From 6e93982fdf9c66673ad298148beaefc725f9440c Mon Sep 17 00:00:00 2001 From: Tom Nichols Date: Fri, 2 Jul 2010 16:46:34 -0400 Subject: trying to get xmlstream to reconnect on stream failure --- sleekxmpp/xmlstream/xmlstream.py | 84 +++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 35 deletions(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 842dfee2..a464da0d 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -42,9 +42,6 @@ if sys.version_info < (3, 0): class RestartStream(Exception): pass -class CloseStream(Exception): - pass - stanza_extensions = {} RECONNECT_MAX_DELAY = 3600 @@ -86,7 +83,9 @@ class XMLStream(object): self.namespace_map = {} - self.run = True + # booleans are not volatile in Python and changes + # do not seem to be detected easily between threads. + self.quit = threading.Event() def setSocket(self, socket): "Set the socket" @@ -123,7 +122,7 @@ class XMLStream(object): # holds the state lock. delay = 1.0 # reconnection delay - while self.run: + while not self.quit.is_set(): logging.debug('connecting....') try: if host and port: @@ -153,14 +152,14 @@ class XMLStream(object): if not reattempt: return False except: logging.exception("Connection error") - if not reattempt: return False + if not reattempt: return False # quiesce if rconnection fails: # This algorithm based loosely on Twisted internet.protocol # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY) delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) - logging.debug('Waiting %fs until next reconnect attempt...', delay) + logging.debug('Waiting %.3fs until next reconnect attempt...', delay) time.sleep(delay) @@ -192,8 +191,8 @@ class XMLStream(object): raise RestartStream() def process(self, threaded=True): + self.quit.clear() self.scheduler.process(threaded=True) - self.run = True for t in range(0, HANDLER_THREADS): th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) th.setDaemon(True) @@ -217,16 +216,16 @@ class XMLStream(object): def _process(self): "Start processing the socket." logging.debug('Process thread starting...') - while self.run: - if not self.state.ensure('connected',wait=2): continue + while not self.quit.is_set(): + if not self.state.ensure('connected',wait=2, block_on_transition=True): continue try: + logging.debug(' ------------------------------- starting process loop...') self.sendPriorityRaw(self.stream_header) - while self.run and self.__readXML(): pass + self.__readXML() # this loops until the stream is terminated. except socket.timeout: - logging.debug('socket rcv timeout') - pass - except CloseStream: - # TODO warn that the listener thread is exiting!!! + # TODO currently this will re-send a stream header if this exception occurs. + # I don't think that's intended behavior. + logging.warn('socket rcv timeout') pass except RestartStream: logging.debug("Restarting stream...") @@ -236,13 +235,25 @@ class XMLStream(object): logging.debug("System interrupt detected") self.shutdown() self.eventqueue.put(('quit', None, None)) - except cElementTree.XMLParserError: - logging.warn('XML RCV parsing error!', exc_info=1) - # don't restart the stream on an XML parse error. except: logging.exception('Unexpected error in RCV thread') - if self.should_reconnect: - self.disconnect(reconnect=True) + + # if the RCV socket is terminated for whatever reason, our only sane choice of action is an attempt + # to re-establish the connection. + if not self.quit.is_set(): + logging.info( 'about to reconnect..........' ) + logging.info( 'about to reconnect..........' ) + logging.info( 'about to reconnect..........' ) + logging.info( 'about to reconnect..........' ) + try: + self.disconnect(reconnect=self.should_reconnect, error=True) + except: + logging.exception( "WTF disconnect!" ) + logging.info( 'reconnect complete!' ) + logging.info( 'reconnect complete!' ) + logging.info( 'reconnect complete!' ) + logging.info( 'reconnect complete!' ) + logging.info( 'reconnect complete!' ) logging.debug('Quitting Process thread') @@ -262,8 +273,8 @@ class XMLStream(object): if event == b'end': edepth += -1 if edepth == 0 and event == b'end': - # what is this case exactly? Premature EOF? - logging.debug("Ending readXML loop") + logging.warn("Premature EOF from read socket; Ending readXML loop") + # this is a premature EOF as far as I can tell; raise an exception so the stream get closed and re-established cleanly. return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -271,13 +282,14 @@ class XMLStream(object): if root: root.clear() if event == b'start': edepth += 1 - logging.debug("Exiting readXML loop") + logging.warn("Exiting readXML loop") + # TODO under what conditions will this _ever_ occur? return False def _sendThread(self): logging.debug('send thread starting...') - while self.run: - if not self.state.ensure('connected',wait=2): continue + while not self.quit.is_set(): + if not self.state.ensure('connected',wait=2, block_on_transition=True): continue data = None try: @@ -299,7 +311,7 @@ class XMLStream(object): # some sort of event that could be handled by a common thread or the reader # thread to perform reconnect and then re-initialize the handler threads as well. if self.should_reconnect: - self.disconnect(reconnect=True) + self.disconnect(reconnect=True, error=True) def sendRaw(self, data): self.sendqueue.put((1, data)) @@ -309,16 +321,18 @@ class XMLStream(object): self.sendqueue.put((0, data)) return True - def disconnect(self, reconnect=False): + def disconnect(self, reconnect=False, error=False): + logging.info('AAAAAAAAAAAAAAAAAAAAAAAA') with self.state.transition_ctx('connected','disconnected') as locked: + logging.info('BBBBBBBBBBBBBBBBBBBBBBBBBB') if not locked: logging.warning("Already disconnected.") return + logging.debug("Disconnecting...") - self.sendRaw(self.stream_footer) - time.sleep(5) - #send end of stream - #wait for end of stream back + # don't send a footer on error; if the stream is already closed, + # this won't get sent until the stream is re-initialized! + if not error: self.sendRaw(self.stream_footer) #send end of stream try: # self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() @@ -336,7 +350,7 @@ class XMLStream(object): Disconnects and shuts down all event threads. ''' self.disconnect() - self.run = False + self.quit.set() self.scheduler.run = False def incoming_filter(self, xmlobj): @@ -344,9 +358,9 @@ class XMLStream(object): def __spawnEvent(self, xmlobj): "watching xmlOut and processes handlers" + if logging.getLogger().isEnabledFor(logging.DEBUG): + logging.debug("RECV: %s" % cElementTree.tostring(xmlobj)) #convert XML into Stanza - # TODO surround this log statement with an if, it's expensive - logging.debug("RECV: %s" % cElementTree.tostring(xmlobj)) xmlobj = self.incoming_filter(xmlobj) stanza = None for stanza_class in self.__root_stanza: @@ -374,7 +388,7 @@ class XMLStream(object): def _eventRunner(self): logging.debug("Loading event runner") - while self.run: + while not self.quit.is_set(): try: event = self.eventqueue.get(True, timeout=5) except queue.Empty: -- cgit v1.2.3