From 5c769632e8d35ea76990a1de3ab405c57a21482e Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Mon, 21 Jul 2014 20:31:37 +0200 Subject: Make connect(), abort() and reconnect() work All the auto_reconnect, connect_retry logic and that kind of stuf has been entirely removed. --- slixmpp/xmlstream/xmlstream.py | 133 ++++++++++++++--------------------------- 1 file changed, 46 insertions(+), 87 deletions(-) diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 81b5b55f..8f7ecd7e 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -142,11 +142,6 @@ class XMLStream(object): self._der_cert = None - #: The current amount to time to delay attempting to reconnect. - #: This value doubles (with some jitter) with each failed - #: connection attempt up to :attr:`reconnect_max_delay` seconds. - self.reconnect_delay = None - #: The connection state machine tracks if the stream is #: ``'connected'`` or ``'disconnected'``. self.state = StateMachine(('disconnected', 'connected')) @@ -241,7 +236,6 @@ class XMLStream(object): self.__thread_cond = threading.Condition() self.__active_threads = set() self._use_daemons = False - self._disconnect_wait_for_threads = True self._id = 0 self._id_lock = threading.Lock() @@ -249,13 +243,6 @@ class XMLStream(object): #: We use an ID prefix to ensure that all ID values are unique. self._id_prefix = '%s-' % uuid.uuid4() - #: The :attr:`disconnect_wait` setting is the default value - #: for controlling if the system waits for the send queue to - #: empty before ending the stream. This may be overridden by - #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`. - #: The default :attr:`disconnect_wait` value is ``False``. - self.disconnect_wait = False - #: A list of DNS results that have not yet been tried. self.dns_answers = [] @@ -380,6 +367,7 @@ class XMLStream(object): self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end")) def connection_made(self, transport): + self.event("connected") self.transport = transport self.socket = self.transport.get_extra_info("socket") self.init_parser() @@ -396,102 +384,72 @@ class XMLStream(object): stream=self, top_level=True, open_only=True)) - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(self.xml_root) - - # We have a successful stream connection, so reset - # exponential backoff for new reconnect attempts. - self.reconnect_delay = 1.0 self.xml_depth += 1 if event == 'end': self.xml_depth -= 1 if self.xml_depth == 0: # The stream's root element has closed, # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False + log.debug("End of stream received") + self.abort() elif self.xml_depth == 1: # We only raise events for stanzas that are direct # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True + self.__spawn_event(xml) if self.xml_root is not None: # Keep the root element empty of children to # save on memory use. self.xml_root.clear() - def connection_lost(self): + def eof_received(self): + """ + When the TCP connection is properly closed by the remote end + """ + log.debug("eof_received") + + def connection_lost(self, exception): + """ + On any kind of disconnection + """ + log.warning("connection_lost: %s", (exception,)) + if self.end_session_on_disconnect: + self.event('session_end') self.parser = None self.transport = None self.socket = None + self.event("disconnected") - def _session_timeout_check(self, event=None): - """ - Add check to ensure that a session is established within - a reasonable amount of time. - """ + def disconnect(self, wait=2.0): + """Close the XML stream and wait for an acknowldgement from the server for + at most `wait` seconds. After the given number of seconds has + passed without a response from the serveur, abort() is called. If + wait is 0.0, this is equivalent to calling abort() directly. + + Does nothing if we are not connected. + + :param wait: Time to wait for a response from the server. - def _handle_session_timeout(): - if not self.session_started_event.is_set(): - log.debug("Session start has taken more " + \ - "than %d seconds", self.session_timeout) - self.disconnect(reconnect=self.auto_reconnect) - - self.schedule("Session timeout check", - self.session_timeout, - _handle_session_timeout) - - def disconnect(self, reconnect=False, wait=None, send_close=True): - """Terminate processing and close the XML streams. - - Optionally, the connection may be reconnected and - resume processing afterwards. - - If the disconnect should take place after all items - in the send queue have been sent, use ``wait=True``. - - .. warning:: - - If you are constantly adding items to the queue - such that it is never empty, then the disconnect will - not occur and the call will continue to block. - - :param reconnect: Flag indicating if the connection - and processing should be restarted. - Defaults to ``False``. - :param wait: Flag indicating if the send queue should - be emptied before disconnecting, overriding - :attr:`disconnect_wait`. - :param send_close: Flag indicating if the stream footer - should be sent before terminating the - connection. Setting this to ``False`` - prevents error loops when trying to - disconnect after a socket error. """ - # TODO - pass + if self.transport: + self.send_raw(self.stream_footer) + self.schedule('Disconnect wait', wait, + self.abort, repeat=False) def abort(self): - self.session_started_event.clear() - if self._disconnect_wait_for_threads: - self._wait_for_threads() - try: - self.socket.shutdown(Socket.SHUT_RDWR) - self.socket.close() - except Socket.error: - pass - self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True) - self.event("killed", direct=True) - - def reconnect(self, reattempt=True, wait=False, send_close=True): - """Reset the stream's state and reconnect to the server.""" + """ + Forcibly close the connection + """ + if self.transport: + self.transport.abort() + self.event("killed", direct=True) + + def reconnect(self, wait=2.0): + """Calls disconnect(), and once we are disconnected (after the timeout, or + when the server acknowledgement is received), call connect() + """ log.debug("reconnecting...") - self.connect() + self.disconnect(wait) + self.add_event_handler('disconnected', self.connect, disposable=True) def configure_socket(self): """Set timeout and other options for self.socket. @@ -581,9 +539,10 @@ class XMLStream(object): repeat=True) def _remove_schedules(self, event): - """Remove whitespace keepalive and certificate expiration schedules.""" + """Remove some schedules that become pointless when disconnected""" self.cancel_schedule('Whitespace Keepalive') self.cancel_schedule('Certificate Expiration') + self.cancel_schedule('Disconnect wait') def start_stream_handler(self, xml): """Perform any initialization actions, such as handshakes, -- cgit v1.2.3