summaryrefslogtreecommitdiff
path: root/slixmpp/xmlstream
diff options
context:
space:
mode:
authorFlorent Le Coz <louiz@louiz.org>2014-07-21 20:27:53 +0200
committerFlorent Le Coz <louiz@louiz.org>2014-07-21 20:32:09 +0200
commit373505f48351a310f7d7ddf0da92fd604682daf1 (patch)
treef9e8acc71e984efde700d65944345f9c7c29bda1 /slixmpp/xmlstream
parenta2cad40f9163f7f14a5607853ff42f458844462e (diff)
downloadslixmpp-373505f48351a310f7d7ddf0da92fd604682daf1.tar.gz
slixmpp-373505f48351a310f7d7ddf0da92fd604682daf1.tar.bz2
slixmpp-373505f48351a310f7d7ddf0da92fd604682daf1.tar.xz
slixmpp-373505f48351a310f7d7ddf0da92fd604682daf1.zip
Clean a new bunch of stuf
Diffstat (limited to 'slixmpp/xmlstream')
-rw-r--r--slixmpp/xmlstream/stanzabase.py2
-rw-r--r--slixmpp/xmlstream/xmlstream.py186
2 files changed, 27 insertions, 161 deletions
diff --git a/slixmpp/xmlstream/stanzabase.py b/slixmpp/xmlstream/stanzabase.py
index 0921dde4..75555c34 100644
--- a/slixmpp/xmlstream/stanzabase.py
+++ b/slixmpp/xmlstream/stanzabase.py
@@ -1580,7 +1580,7 @@ class StanzaBase(ElementBase):
stanza sent immediately. Useful for stream
initialization. Defaults to ``False``.
"""
- self.stream.send(self, now=now)
+ self.stream.send(self)
def __copy__(self):
"""Return a copy of the stanza object that does not share the
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 2846d4a4..81b5b55f 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -45,35 +45,6 @@ from slixmpp.xmlstream.resolver import resolve, default_resolver
#: The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 30
-#: The time in seconds to wait for events from the event queue, and also the
-#: time between checks for the process stop signal.
-WAIT_TIMEOUT = 1.0
-
-#: The number of threads to use to handle XML stream events. This is not the
-#: same as the number of custom event handling threads.
-#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations
-#: with a GIL, this should be left at 1, but for implemetnations without
-#: a GIL increasing this value can provide better performance.
-HANDLER_THREADS = 1
-
-#: The time in seconds to delay between attempts to resend data
-#: after an SSL error.
-SSL_RETRY_DELAY = 0.5
-
-#: The maximum number of times to attempt resending data due to
-#: an SSL error.
-SSL_RETRY_MAX = 10
-
-#: Maximum time to delay between connection attempts is one hour.
-RECONNECT_MAX_DELAY = 600
-
-#: Maximum number of attempts to connect to the server before quitting
-#: and raising a 'connect_failed' event. Setting this to ``None`` will
-#: allow infinite reconnection attempts, and using ``0`` will disable
-#: reconnections. Defaults to ``None``.
-RECONNECT_MAX_ATTEMPTS = None
-
-
log = logging.getLogger(__name__)
@@ -83,6 +54,11 @@ class RestartStream(Exception):
resending the stream header.
"""
+class NotConnectedError(Exception):
+ """
+ Raised when we try to send something over the wire but we are not
+ connected.
+ """
class XMLStream(object):
"""
@@ -166,36 +142,11 @@ class XMLStream(object):
self._der_cert = None
- #: The time in seconds to wait for events from the event queue,
- #: and also the time between checks for the process stop signal.
- self.wait_timeout = WAIT_TIMEOUT
-
- #: The time in seconds to wait before timing out waiting
- #: for response stanzas.
- self.response_timeout = RESPONSE_TIMEOUT
-
#: The current amount to time to delay attempting to reconnect.
#: This value doubles (with some jitter) with each failed
#: connection attempt up to :attr:`reconnect_max_delay` seconds.
self.reconnect_delay = None
- #: Maximum time to delay between connection attempts is one hour.
- self.reconnect_max_delay = RECONNECT_MAX_DELAY
-
- #: Maximum number of attempts to connect to the server before
- #: quitting and raising a 'connect_failed' event. Setting to
- #: ``None`` allows infinite reattempts, while setting it to ``0``
- #: will disable reconnection attempts. Defaults to ``None``.
- self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS
-
- #: The time in seconds to delay between attempts to resend data
- #: after an SSL error.
- self.ssl_retry_max = SSL_RETRY_MAX
-
- #: The maximum number of times to attempt resending data due to
- #: an SSL error.
- self.ssl_retry_delay = SSL_RETRY_DELAY
-
#: The connection state machine tracks if the stream is
#: ``'connected'`` or ``'disconnected'``.
self.state = StateMachine(('disconnected', 'connected'))
@@ -267,20 +218,6 @@ class XMLStream(object):
#: :attr:`whitespace_keepalive` is enabled.
self.whitespace_keepalive_interval = 300
- #: An :class:`~threading.Event` to signal receiving a closing
- #: stream tag from the server.
- self.stream_end_event = threading.Event()
- self.stream_end_event.set()
-
- #: An :class:`~threading.Event` to signal the start of a stream
- #: session. Until this event fires, the send queue is not used
- #: and data is sent immediately over the wire.
- self.session_started_event = threading.Event()
-
- #: The default time in seconds to wait for a session to start
- #: after connecting before reconnecting and trying again.
- self.session_timeout = 45
-
#: Flag for controlling if the session can be considered ended
#: if the connection is terminated.
self.end_session_on_disconnect = True
@@ -312,10 +249,6 @@ class XMLStream(object):
#: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4()
- #: The :attr:`auto_reconnnect` setting controls whether or not
- #: the stream will be restarted in the event of an error.
- self.auto_reconnect = True
-
#: The :attr:`disconnect_wait` setting is the default value
#: for controlling if the system waits for the send queue to
#: empty before ending the stream. This may be overridden by
@@ -331,7 +264,6 @@ class XMLStream(object):
#: ``_xmpp-client._tcp`` service.
self.dns_service = None
- self.add_event_handler('connected', self._session_timeout_check)
self.add_event_handler('disconnected', self._remove_schedules)
self.add_event_handler('session_start', self._start_keepalive)
self.add_event_handler('session_start', self._cert_expiration)
@@ -887,12 +819,11 @@ class XMLStream(object):
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
# processed in the queue.
- with self.__event_handlers_lock:
- try:
- h_index = self.__event_handlers[name].index(handler)
- self.__event_handlers[name].pop(h_index)
- except:
- pass
+ try:
+ h_index = self.__event_handlers[name].index(handler)
+ self.__event_handlers[name].pop(h_index)
+ except:
+ pass
def schedule(self, name, seconds, callback, args=tuple(),
kwargs={}, repeat=False):
@@ -954,34 +885,18 @@ class XMLStream(object):
"""
return xml
- def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
+ def send(self, data, use_filters=True):
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
May optionally block until an expected response is received.
:param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
stanza to send on the stream.
- :param mask: **DEPRECATED**
- An XML string snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- :param int timeout: Time in seconds to wait for a response before
- continuing. Defaults to :attr:`response_timeout`.
- :param bool now: Indicates if the send queue should be skipped,
- sending the stanza immediately. Useful mainly
- for stream initialization stanzas.
- Defaults to ``False``.
:param bool use_filters: Indicates if outgoing filters should be
applied to the given stanza data. Disabling
filters is useful when resending stanzas.
Defaults to ``True``.
"""
- if timeout is None:
- timeout = self.response_timeout
- if hasattr(mask, 'xml'):
- mask = mask.xml
-
if isinstance(data, ElementBase):
if use_filters:
for filter in self.__filters['out']:
@@ -989,61 +904,37 @@ class XMLStream(object):
if data is None:
return
- if mask is not None:
- log.warning("Use of send mask waiters is deprecated.")
- wait_for = Waiter("SendWait_%s" % self.new_id(),
- MatchXMLMask(mask))
- self.register_handler(wait_for)
-
if isinstance(data, ElementBase):
- with self.send_queue_lock:
- if use_filters:
- for filter in self.__filters['out_sync']:
- data = filter(data)
- if data is None:
- return
- str_data = tostring(data.xml, xmlns=self.default_ns,
- stream=self,
- top_level=True)
- self.send_raw(str_data)
+ if use_filters:
+ for filter in self.__filters['out_sync']:
+ data = filter(data)
+ if data is None:
+ return
+ str_data = tostring(data.xml, xmlns=self.default_ns,
+ stream=self,
+ top_level=True)
+ self.send_raw(str_data)
else:
self.send_raw(data)
- if mask is not None:
- return wait_for.wait(timeout)
- def send_xml(self, data, mask=None, timeout=None, now=False):
- """Send an XML object on the stream, and optionally wait
- for a response.
+ def send_xml(self, data):
+ """Send an XML object on the stream
:param data: The :class:`~xml.etree.ElementTree.Element` XML object
to send on the stream.
- :param mask: **DEPRECATED**
- An XML string snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- :param int timeout: Time in seconds to wait for a response before
- continuing. Defaults to :attr:`response_timeout`.
- :param bool now: Indicates if the send queue should be skipped,
- sending the stanza immediately. Useful mainly
- for stream initialization stanzas.
- Defaults to ``False``.
"""
- if timeout is None:
- timeout = self.response_timeout
- return self.send(tostring(data), mask, timeout, now)
+ return self.send(tostring(data))
def send_raw(self, data):
"""Send raw data across the stream.
:param string data: Any bytes or utf-8 string value.
"""
+ if not self.transport:
+ raise NotConnectedError()
if isinstance(data, str):
data = data.encode('utf-8')
- if not self.transport:
- logger.error("Cannot send data, we are not connected.")
- else:
- self.transport.write(data)
+ self.transport.write(data)
def _start_thread(self, name, target, track=True):
self.__thread[name] = threading.Thread(name=name, target=target)
@@ -1055,31 +946,6 @@ class XMLStream(object):
with self.__thread_cond:
self.__thread_count += 1
- def _end_thread(self, name, early=False):
- with self.__thread_cond:
- curr_thread = threading.current_thread().name
- if curr_thread in self.__active_threads:
- self.__thread_count -= 1
- self.__active_threads.remove(curr_thread)
-
- if early:
- log.debug('Threading deadlock prevention!')
- log.debug(("Marked %s thread as ended due to " + \
- "disconnect() call. %s threads remain.") % (
- name, self.__thread_count))
- else:
- log.debug("Stopped %s thread. %s threads remain." % (
- name, self.__thread_count))
-
- else:
- log.debug(("Finished exiting %s thread after early " + \
- "termination from disconnect() call. " + \
- "%s threads remain.") % (
- name, self.__thread_count))
-
- if self.__thread_count == 0:
- self.__thread_cond.notify()
-
def _build_stanza(self, xml, default_ns=None):
"""Create a stanza object from a given XML object.