diff options
Diffstat (limited to 'sleekxmpp')
-rw-r--r-- | sleekxmpp/__init__.py | 4 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/statemachine.py | 43 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 72 |
3 files changed, 70 insertions, 49 deletions
diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 3379e45d..3b2b5e0f 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -159,11 +159,11 @@ class ClientXMPP(basexmpp, XMLStream): def reconnect(self): self.disconnect(reconnect=True) - def disconnect(self, reconnect=False): + def disconnect(self, reconnect=False, error=False): self.event("disconnected") self.authenticated = False self.sessionstarted = False - XMLStream.disconnect(self, reconnect) + XMLStream.disconnect(self, reconnect, error) def registerFeature(self, mask, pointer, breaker = False): """Register a stream feature.""" diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index 9412d5ad..8939397b 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -86,11 +86,11 @@ class StateMachine(object): start = time.time() while not self.__current_state in from_states or not self.lock.acquire(False): # detect timeout: - if time.time() >= start + wait: return False - self.notifier.wait(wait) + remainder = start + wait - time.time() + if remainder > 0: self.notifier.wait(remainder) + else: return False try: # lock is acquired; all other threads will return false or wait until notify/timeout - self.notifier.clear() if self.__current_state in from_states: # should always be True due to lock # Note that func might throw an exception, but that's OK, it aborts the transition @@ -107,7 +107,8 @@ class StateMachine(object): log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" ) return False finally: - self.notifier.set() + self.notifier.set() # notify any waiting threads that the state has changed. + self.notifier.clear() self.lock.release() @@ -146,14 +147,14 @@ class StateMachine(object): return _StateCtx(self, from_state, to_state, wait) - def ensure(self, state, wait=0.0): + def ensure(self, state, wait=0.0, block_on_transition=False ): ''' Ensure the state machine is currently in `state`, or wait until it enters `state`. ''' - return self.ensure_any( (state,), wait=wait ) + return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition ) - def ensure_any(self, states, wait=0.0): + def ensure_any(self, states, wait=0.0, block_on_transition=False): ''' Ensure we are currently in one of the given `states` or wait until we enter one of those states. @@ -172,16 +173,25 @@ class StateMachine(object): if not state in self.__states: raise ValueError( "StateMachine does not contain state '%s'" % state ) - # Locking never really gained us anything here, since the lock was released - # before the function returned anyways. The only thing it _did_ do was - # increase the probability that this function would block for longer than - # intended if a `transition` function or context was running while holding - # the lock. + # if we're in the middle of a transition, determine whether we should + # 'fall back' to the 'current' state, or wait for the new state, in order to + # avoid an operation occurring in the wrong state. + # TODO another option would be an ensure_ctx that uses a semaphore to allow + # threads to indicate they want to remain in a particular state. + + # will return immediately if no transition is in process. + if block_on_transition: + # we're not in the middle of a transition; don't hold the lock + if self.lock.acquire(False): self.lock.release() + # wait for the transition to complete + else: self.notifier.wait() + start = time.time() while not self.__current_state in states: # detect timeout: - if time.time() >= start + wait: return False - self.notifier.wait(wait) + remainder = start + wait - time.time() + if remainder > 0: self.notifier.wait(remainder) + else: return False return True @@ -227,10 +237,11 @@ class _StateCtx: start = time.time() while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False): # detect timeout: - if time.time() >= start + self.wait: + remainder = start + self.wait - time.time() + if remainder > 0: self.state_machine.notifier.wait(remainder) + else: log.debug('StateMachine timeout while waiting for state: %s', self.from_state ) return False - self.state_machine.notifier.wait(self.wait) self._locked = True # lock has been acquired at this point self.state_machine.notifier.clear() diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1fb3f8bb..88ec219c 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -83,7 +83,9 @@ class XMLStream(object): self.namespace_map = {} - self.run = True + # booleans are not volatile in Python and changes + # do not seem to be detected easily between threads. + self.quit = threading.Event() def setSocket(self, socket): "Set the socket" @@ -120,7 +122,7 @@ class XMLStream(object): # holds the state lock. delay = 1.0 # reconnection delay - while self.run: + while not self.quit.is_set(): logging.debug('connecting....') try: if host and port: @@ -150,14 +152,14 @@ class XMLStream(object): if not reattempt: return False except: logging.exception("Connection error") - if not reattempt: return False + if not reattempt: return False # quiesce if rconnection fails: # This algorithm based loosely on Twisted internet.protocol # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY) delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) - logging.debug('Waiting %fs until next reconnect attempt...', delay) + logging.debug('Waiting %.3fs until next reconnect attempt...', delay) time.sleep(delay) @@ -189,8 +191,8 @@ class XMLStream(object): raise RestartStream() def process(self, threaded=True): + self.quit.clear() self.scheduler.process(threaded=True) - self.run = True for t in range(0, HANDLER_THREADS): th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) th.setDaemon(True) @@ -214,14 +216,16 @@ class XMLStream(object): def _process(self): "Start processing the socket." logging.debug('Process thread starting...') - while self.run: - if not self.state.ensure('connected',wait=2): continue + while not self.quit.is_set(): + if not self.state.ensure('connected',wait=2, block_on_transition=True): continue try: + logging.debug(' ------------------------------- starting process loop...') self.sendPriorityRaw(self.stream_header) - while self.run and self.__readXML(): pass + self.__readXML() # this loops until the stream is terminated. except socket.timeout: - logging.debug('socket rcv timeout') - pass + # TODO currently this will re-send a stream header if this exception occurs. + # I don't think that's intended behavior. + logging.warn('socket rcv timeout') except RestartStream: logging.debug("Restarting stream...") continue # DON'T re-initialize the stream -- this exception is sent @@ -230,14 +234,19 @@ class XMLStream(object): logging.debug("System interrupt detected") self.shutdown() self.eventqueue.put(('quit', None, None)) - except cElementTree.XMLParserError: #if there is an xml parsing exception, assume stream needs to be restarted - logging.warn('XML RCV parsing error!', exc_info=1) - if self.should_reconnect: self.disconnect(reconnect=True) - else: self.disconnect() except: logging.exception('Unexpected error in RCV thread') - if self.should_reconnect: self.disconnect(reconnect=True) - else: self.disconnect() + + # if the RCV socket is terminated for whatever reason (e.g. we reach this point of + # code,) our only sane choice of action is an attempt to re-establish the connection. + if not self.quit.is_set(): + logging.info( 'about to reconnect..........' ) + try: + self.disconnect(reconnect=self.should_reconnect, error=True) + except: + logging.exception( "WTF disconnect!" ) + logging.info( 'reconnect complete!' ) + logging.info( 'reconnect complete!' ) logging.debug('Quitting Process thread') @@ -257,8 +266,8 @@ class XMLStream(object): if event == b'end': edepth += -1 if edepth == 0 and event == b'end': - # what is this case exactly? Premature EOF? - logging.debug("Ending readXML loop") + logging.warn("Premature EOF from read socket; Ending readXML loop") + # this is a premature EOF as far as I can tell; raise an exception so the stream get closed and re-established cleanly. return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -266,13 +275,14 @@ class XMLStream(object): if root: root.clear() if event == b'start': edepth += 1 - logging.debug("Exiting readXML loop") + logging.warn("Exiting readXML loop") + # TODO under what conditions will this _ever_ occur? return False def _sendThread(self): logging.debug('send thread starting...') - while self.run: - if not self.state.ensure('connected',wait=2): continue + while not self.quit.is_set(): + if not self.state.ensure('connected',wait=2, block_on_transition=True): continue data = None try: @@ -294,7 +304,7 @@ class XMLStream(object): # some sort of event that could be handled by a common thread or the reader # thread to perform reconnect and then re-initialize the handler threads as well. if self.should_reconnect: - self.disconnect(reconnect=True) + self.disconnect(reconnect=True, error=True) def sendRaw(self, data): self.sendqueue.put((1, data)) @@ -304,16 +314,16 @@ class XMLStream(object): self.sendqueue.put((0, data)) return True - def disconnect(self, reconnect=False): + def disconnect(self, reconnect=False, error=False): with self.state.transition_ctx('connected','disconnected') as locked: if not locked: logging.warning("Already disconnected.") return + logging.debug("Disconnecting...") - self.sendRaw(self.stream_footer) - time.sleep(5) - #send end of stream - #wait for end of stream back + # don't send a footer on error; if the stream is already closed, + # this won't get sent until the stream is re-initialized! + if not error: self.sendRaw(self.stream_footer) #send end of stream try: # self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() @@ -331,7 +341,7 @@ class XMLStream(object): Disconnects and shuts down all event threads. ''' self.disconnect() - self.run = False + self.quit.set() self.scheduler.run = False def incoming_filter(self, xmlobj): @@ -339,9 +349,9 @@ class XMLStream(object): def __spawnEvent(self, xmlobj): "watching xmlOut and processes handlers" + if logging.getLogger().isEnabledFor(logging.DEBUG): + logging.debug("RECV: %s" % cElementTree.tostring(xmlobj)) #convert XML into Stanza - # TODO surround this log statement with an if, it's expensive - logging.debug("RECV: %s" % cElementTree.tostring(xmlobj)) xmlobj = self.incoming_filter(xmlobj) stanza = None for stanza_class in self.__root_stanza: @@ -369,7 +379,7 @@ class XMLStream(object): def _eventRunner(self): logging.debug("Loading event runner") - while self.run: + while not self.quit.is_set(): try: event = self.eventqueue.get(True, timeout=5) except queue.Empty: |