From efe1b9f5a90fb445f765ee26aac671056a065ecb Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 9 Dec 2011 23:56:39 -0800 Subject: Allow sending stanzas on session_end. May set self.disconnect_wait=True so that all disconnect calls wait for the send queue to empty, unless explicitly overridden with wait=False. The session_end now fires before closing the socket so that final stanzas may be sent, such as unavailable presences for components. --- sleekxmpp/xmlstream/xmlstream.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) (limited to 'sleekxmpp') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1a0b6241..3e569082 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -276,6 +276,13 @@ class XMLStream(object): #: the stream will be restarted in the event of an error. self.auto_reconnect = True + #: The :attr:`disconnect_wait` setting is the default value + #: for controlling if the system waits for the send queue to + #: empty before ending the stream. This may be overridden by + #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`. + #: The default :attr:`disconnect_wait` value is ``False``. + self.disconnect_wait = False + #: A list of DNS results that have not yet been tried. self.dns_answers = [] @@ -402,6 +409,7 @@ class XMLStream(object): try: while elapsed < delay and not self.stop.is_set(): time.sleep(0.1) + elapsed += 0.1 except KeyboardInterrupt: self.stop.set() return False @@ -519,7 +527,7 @@ class XMLStream(object): self.session_timeout, _handle_session_timeout) - def disconnect(self, reconnect=False, wait=False): + def disconnect(self, reconnect=False, wait=None): """Terminate processing and close the XML streams. Optionally, the connection may be reconnected and @@ -538,14 +546,20 @@ class XMLStream(object): and processing should be restarted. Defaults to ``False``. :param wait: Flag indicating if the send queue should - be emptied before disconnecting. + be emptied before disconnecting, overriding + :attr:`disconnect_wait`. """ self.state.transition('connected', 'disconnected', func=self._disconnect, args=(reconnect, wait)) - def _disconnect(self, reconnect=False, wait=False): + def _disconnect(self, reconnect=False, wait=None): + self.event('session_end', direct=True) + # Wait for the send queue to empty. - if wait: + if wait is not None: + if wait: + self.send_queue.join() + elif self.disconnect_wait: self.send_queue.join() # Send the end of stream marker. @@ -566,7 +580,6 @@ class XMLStream(object): self.event('socket_error', serr) finally: #clear your application state - self.event('session_end', direct=True) self.event("disconnected", direct=True) return True @@ -1119,6 +1132,7 @@ class XMLStream(object): # Additional passes will be made only if an error occurs and # reconnecting is permitted. while True: + shutdown = False try: # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect @@ -1136,16 +1150,16 @@ class XMLStream(object): if not self.__read_xml(): # If the server terminated the stream, end processing break - except SyntaxError as e: - log.error("Error reading from XML stream.") - self.exception(e) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") - self.stop.set() + self.event('killed', direct=True) + shutdown = True except SystemExit: log.debug("SystemExit in _process") - self.stop.set() - self.scheduler.quit() + shutdown = True + except SyntaxError as e: + log.error("Error reading from XML stream.") + self.exception(e) except Socket.error as serr: self.event('socket_error', serr) log.exception('Socket Error') @@ -1154,7 +1168,8 @@ class XMLStream(object): log.exception('Connection error.') self.exception(e) - if not self.stop.is_set() and self.auto_reconnect: + if not shutdown and not self.stop.is_set() \ + and self.auto_reconnect: self.reconnect() else: self.disconnect() -- cgit v1.2.3