summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/xmlstream/xmlstream.py133
1 files changed, 46 insertions, 87 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 81b5b55f..8f7ecd7e 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -142,11 +142,6 @@ class XMLStream(object):
self._der_cert = None
- #: The current amount to time to delay attempting to reconnect.
- #: This value doubles (with some jitter) with each failed
- #: connection attempt up to :attr:`reconnect_max_delay` seconds.
- self.reconnect_delay = None
-
#: The connection state machine tracks if the stream is
#: ``'connected'`` or ``'disconnected'``.
self.state = StateMachine(('disconnected', 'connected'))
@@ -241,7 +236,6 @@ class XMLStream(object):
self.__thread_cond = threading.Condition()
self.__active_threads = set()
self._use_daemons = False
- self._disconnect_wait_for_threads = True
self._id = 0
self._id_lock = threading.Lock()
@@ -249,13 +243,6 @@ class XMLStream(object):
#: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4()
- #: The :attr:`disconnect_wait` setting is the default value
- #: for controlling if the system waits for the send queue to
- #: empty before ending the stream. This may be overridden by
- #: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`.
- #: The default :attr:`disconnect_wait` value is ``False``.
- self.disconnect_wait = False
-
#: A list of DNS results that have not yet been tried.
self.dns_answers = []
@@ -380,6 +367,7 @@ class XMLStream(object):
self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end"))
def connection_made(self, transport):
+ self.event("connected")
self.transport = transport
self.socket = self.transport.get_extra_info("socket")
self.init_parser()
@@ -396,102 +384,72 @@ class XMLStream(object):
stream=self,
top_level=True,
open_only=True))
- # Perform any stream initialization actions, such
- # as handshakes.
- self.stream_end_event.clear()
- self.start_stream_handler(self.xml_root)
-
- # We have a successful stream connection, so reset
- # exponential backoff for new reconnect attempts.
- self.reconnect_delay = 1.0
self.xml_depth += 1
if event == 'end':
self.xml_depth -= 1
if self.xml_depth == 0:
# The stream's root element has closed,
# terminating the stream.
- log.debug("End of stream recieved")
- self.stream_end_event.set()
- return False
+ log.debug("End of stream received")
+ self.abort()
elif self.xml_depth == 1:
# We only raise events for stanzas that are direct
# children of the root element.
- try:
- self.__spawn_event(xml)
- except RestartStream:
- return True
+ self.__spawn_event(xml)
if self.xml_root is not None:
# Keep the root element empty of children to
# save on memory use.
self.xml_root.clear()
- def connection_lost(self):
+ def eof_received(self):
+ """
+ When the TCP connection is properly closed by the remote end
+ """
+ log.debug("eof_received")
+
+ def connection_lost(self, exception):
+ """
+ On any kind of disconnection
+ """
+ log.warning("connection_lost: %s", (exception,))
+ if self.end_session_on_disconnect:
+ self.event('session_end')
self.parser = None
self.transport = None
self.socket = None
+ self.event("disconnected")
- def _session_timeout_check(self, event=None):
- """
- Add check to ensure that a session is established within
- a reasonable amount of time.
- """
+ def disconnect(self, wait=2.0):
+ """Close the XML stream and wait for an acknowldgement from the server for
+ at most `wait` seconds. After the given number of seconds has
+ passed without a response from the serveur, abort() is called. If
+ wait is 0.0, this is equivalent to calling abort() directly.
+
+ Does nothing if we are not connected.
+
+ :param wait: Time to wait for a response from the server.
- def _handle_session_timeout():
- if not self.session_started_event.is_set():
- log.debug("Session start has taken more " + \
- "than %d seconds", self.session_timeout)
- self.disconnect(reconnect=self.auto_reconnect)
-
- self.schedule("Session timeout check",
- self.session_timeout,
- _handle_session_timeout)
-
- def disconnect(self, reconnect=False, wait=None, send_close=True):
- """Terminate processing and close the XML streams.
-
- Optionally, the connection may be reconnected and
- resume processing afterwards.
-
- If the disconnect should take place after all items
- in the send queue have been sent, use ``wait=True``.
-
- .. warning::
-
- If you are constantly adding items to the queue
- such that it is never empty, then the disconnect will
- not occur and the call will continue to block.
-
- :param reconnect: Flag indicating if the connection
- and processing should be restarted.
- Defaults to ``False``.
- :param wait: Flag indicating if the send queue should
- be emptied before disconnecting, overriding
- :attr:`disconnect_wait`.
- :param send_close: Flag indicating if the stream footer
- should be sent before terminating the
- connection. Setting this to ``False``
- prevents error loops when trying to
- disconnect after a socket error.
"""
- # TODO
- pass
+ if self.transport:
+ self.send_raw(self.stream_footer)
+ self.schedule('Disconnect wait', wait,
+ self.abort, repeat=False)
def abort(self):
- self.session_started_event.clear()
- if self._disconnect_wait_for_threads:
- self._wait_for_threads()
- try:
- self.socket.shutdown(Socket.SHUT_RDWR)
- self.socket.close()
- except Socket.error:
- pass
- self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True)
- self.event("killed", direct=True)
-
- def reconnect(self, reattempt=True, wait=False, send_close=True):
- """Reset the stream's state and reconnect to the server."""
+ """
+ Forcibly close the connection
+ """
+ if self.transport:
+ self.transport.abort()
+ self.event("killed", direct=True)
+
+ def reconnect(self, wait=2.0):
+ """Calls disconnect(), and once we are disconnected (after the timeout, or
+ when the server acknowledgement is received), call connect()
+ """
log.debug("reconnecting...")
- self.connect()
+ self.disconnect(wait)
+ self.add_event_handler('disconnected', self.connect, disposable=True)
def configure_socket(self):
"""Set timeout and other options for self.socket.
@@ -581,9 +539,10 @@ class XMLStream(object):
repeat=True)
def _remove_schedules(self, event):
- """Remove whitespace keepalive and certificate expiration schedules."""
+ """Remove some schedules that become pointless when disconnected"""
self.cancel_schedule('Whitespace Keepalive')
self.cancel_schedule('Certificate Expiration')
+ self.cancel_schedule('Disconnect wait')
def start_stream_handler(self, xml):
"""Perform any initialization actions, such as handshakes,