From 373505f48351a310f7d7ddf0da92fd604682daf1 Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Mon, 21 Jul 2014 20:27:53 +0200 Subject: Clean a new bunch of stuf --- slixmpp/xmlstream/stanzabase.py | 2 +- slixmpp/xmlstream/xmlstream.py | 186 ++++++---------------------------------- 2 files changed, 27 insertions(+), 161 deletions(-) (limited to 'slixmpp/xmlstream') diff --git a/slixmpp/xmlstream/stanzabase.py b/slixmpp/xmlstream/stanzabase.py index 0921dde4..75555c34 100644 --- a/slixmpp/xmlstream/stanzabase.py +++ b/slixmpp/xmlstream/stanzabase.py @@ -1580,7 +1580,7 @@ class StanzaBase(ElementBase): stanza sent immediately. Useful for stream initialization. Defaults to ``False``. """ - self.stream.send(self, now=now) + self.stream.send(self) def __copy__(self): """Return a copy of the stanza object that does not share the diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py index 2846d4a4..81b5b55f 100644 --- a/slixmpp/xmlstream/xmlstream.py +++ b/slixmpp/xmlstream/xmlstream.py @@ -45,35 +45,6 @@ from slixmpp.xmlstream.resolver import resolve, default_resolver #: The time in seconds to wait before timing out waiting for response stanzas. RESPONSE_TIMEOUT = 30 -#: The time in seconds to wait for events from the event queue, and also the -#: time between checks for the process stop signal. -WAIT_TIMEOUT = 1.0 - -#: The number of threads to use to handle XML stream events. This is not the -#: same as the number of custom event handling threads. -#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations -#: with a GIL, this should be left at 1, but for implemetnations without -#: a GIL increasing this value can provide better performance. -HANDLER_THREADS = 1 - -#: The time in seconds to delay between attempts to resend data -#: after an SSL error. -SSL_RETRY_DELAY = 0.5 - -#: The maximum number of times to attempt resending data due to -#: an SSL error. -SSL_RETRY_MAX = 10 - -#: Maximum time to delay between connection attempts is one hour. -RECONNECT_MAX_DELAY = 600 - -#: Maximum number of attempts to connect to the server before quitting -#: and raising a 'connect_failed' event. Setting this to ``None`` will -#: allow infinite reconnection attempts, and using ``0`` will disable -#: reconnections. Defaults to ``None``. -RECONNECT_MAX_ATTEMPTS = None - - log = logging.getLogger(__name__) @@ -83,6 +54,11 @@ class RestartStream(Exception): resending the stream header. """ +class NotConnectedError(Exception): + """ + Raised when we try to send something over the wire but we are not + connected. + """ class XMLStream(object): """ @@ -166,36 +142,11 @@ class XMLStream(object): self._der_cert = None - #: The time in seconds to wait for events from the event queue, - #: and also the time between checks for the process stop signal. - self.wait_timeout = WAIT_TIMEOUT - - #: The time in seconds to wait before timing out waiting - #: for response stanzas. - self.response_timeout = RESPONSE_TIMEOUT - #: The current amount to time to delay attempting to reconnect. #: This value doubles (with some jitter) with each failed #: connection attempt up to :attr:`reconnect_max_delay` seconds. self.reconnect_delay = None - #: Maximum time to delay between connection attempts is one hour. - self.reconnect_max_delay = RECONNECT_MAX_DELAY - - #: Maximum number of attempts to connect to the server before - #: quitting and raising a 'connect_failed' event. Setting to - #: ``None`` allows infinite reattempts, while setting it to ``0`` - #: will disable reconnection attempts. Defaults to ``None``. - self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS - - #: The time in seconds to delay between attempts to resend data - #: after an SSL error. - self.ssl_retry_max = SSL_RETRY_MAX - - #: The maximum number of times to attempt resending data due to - #: an SSL error. - self.ssl_retry_delay = SSL_RETRY_DELAY - #: The connection state machine tracks if the stream is #: ``'connected'`` or ``'disconnected'``. self.state = StateMachine(('disconnected', 'connected')) @@ -267,20 +218,6 @@ class XMLStream(object): #: :attr:`whitespace_keepalive` is enabled. self.whitespace_keepalive_interval = 300 - #: An :class:`~threading.Event` to signal receiving a closing - #: stream tag from the server. - self.stream_end_event = threading.Event() - self.stream_end_event.set() - - #: An :class:`~threading.Event` to signal the start of a stream - #: session. Until this event fires, the send queue is not used - #: and data is sent immediately over the wire. - self.session_started_event = threading.Event() - - #: The default time in seconds to wait for a session to start - #: after connecting before reconnecting and trying again. - self.session_timeout = 45 - #: Flag for controlling if the session can be considered ended #: if the connection is terminated. self.end_session_on_disconnect = True @@ -312,10 +249,6 @@ class XMLStream(object): #: We use an ID prefix to ensure that all ID values are unique. self._id_prefix = '%s-' % uuid.uuid4() - #: The :attr:`auto_reconnnect` setting controls whether or not - #: the stream will be restarted in the event of an error. - self.auto_reconnect = True - #: The :attr:`disconnect_wait` setting is the default value #: for controlling if the system waits for the send queue to #: empty before ending the stream. This may be overridden by @@ -331,7 +264,6 @@ class XMLStream(object): #: ``_xmpp-client._tcp`` service. self.dns_service = None - self.add_event_handler('connected', self._session_timeout_check) self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('session_start', self._start_keepalive) self.add_event_handler('session_start', self._cert_expiration) @@ -887,12 +819,11 @@ class XMLStream(object): # If the handler is disposable, we will go ahead and # remove it now instead of waiting for it to be # processed in the queue. - with self.__event_handlers_lock: - try: - h_index = self.__event_handlers[name].index(handler) - self.__event_handlers[name].pop(h_index) - except: - pass + try: + h_index = self.__event_handlers[name].index(handler) + self.__event_handlers[name].pop(h_index) + except: + pass def schedule(self, name, seconds, callback, args=tuple(), kwargs={}, repeat=False): @@ -954,34 +885,18 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None, now=False, use_filters=True): + def send(self, data, use_filters=True): """A wrapper for :meth:`send_raw()` for sending stanza objects. May optionally block until an expected response is received. :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` stanza to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. :param bool use_filters: Indicates if outgoing filters should be applied to the given stanza data. Disabling filters is useful when resending stanzas. Defaults to ``True``. """ - if timeout is None: - timeout = self.response_timeout - if hasattr(mask, 'xml'): - mask = mask.xml - if isinstance(data, ElementBase): if use_filters: for filter in self.__filters['out']: @@ -989,61 +904,37 @@ class XMLStream(object): if data is None: return - if mask is not None: - log.warning("Use of send mask waiters is deprecated.") - wait_for = Waiter("SendWait_%s" % self.new_id(), - MatchXMLMask(mask)) - self.register_handler(wait_for) - if isinstance(data, ElementBase): - with self.send_queue_lock: - if use_filters: - for filter in self.__filters['out_sync']: - data = filter(data) - if data is None: - return - str_data = tostring(data.xml, xmlns=self.default_ns, - stream=self, - top_level=True) - self.send_raw(str_data) + if use_filters: + for filter in self.__filters['out_sync']: + data = filter(data) + if data is None: + return + str_data = tostring(data.xml, xmlns=self.default_ns, + stream=self, + top_level=True) + self.send_raw(str_data) else: self.send_raw(data) - if mask is not None: - return wait_for.wait(timeout) - def send_xml(self, data, mask=None, timeout=None, now=False): - """Send an XML object on the stream, and optionally wait - for a response. + def send_xml(self, data): + """Send an XML object on the stream :param data: The :class:`~xml.etree.ElementTree.Element` XML object to send on the stream. - :param mask: **DEPRECATED** - An XML string snippet matching the structure - of the expected response. Execution will block - in this thread until the response is received - or a timeout occurs. - :param int timeout: Time in seconds to wait for a response before - continuing. Defaults to :attr:`response_timeout`. - :param bool now: Indicates if the send queue should be skipped, - sending the stanza immediately. Useful mainly - for stream initialization stanzas. - Defaults to ``False``. """ - if timeout is None: - timeout = self.response_timeout - return self.send(tostring(data), mask, timeout, now) + return self.send(tostring(data)) def send_raw(self, data): """Send raw data across the stream. :param string data: Any bytes or utf-8 string value. """ + if not self.transport: + raise NotConnectedError() if isinstance(data, str): data = data.encode('utf-8') - if not self.transport: - logger.error("Cannot send data, we are not connected.") - else: - self.transport.write(data) + self.transport.write(data) def _start_thread(self, name, target, track=True): self.__thread[name] = threading.Thread(name=name, target=target) @@ -1055,31 +946,6 @@ class XMLStream(object): with self.__thread_cond: self.__thread_count += 1 - def _end_thread(self, name, early=False): - with self.__thread_cond: - curr_thread = threading.current_thread().name - if curr_thread in self.__active_threads: - self.__thread_count -= 1 - self.__active_threads.remove(curr_thread) - - if early: - log.debug('Threading deadlock prevention!') - log.debug(("Marked %s thread as ended due to " + \ - "disconnect() call. %s threads remain.") % ( - name, self.__thread_count)) - else: - log.debug("Stopped %s thread. %s threads remain." % ( - name, self.__thread_count)) - - else: - log.debug(("Finished exiting %s thread after early " + \ - "termination from disconnect() call. " + \ - "%s threads remain.") % ( - name, self.__thread_count)) - - if self.__thread_count == 0: - self.__thread_cond.notify() - def _build_stanza(self, xml, default_ns=None): """Create a stanza object from a given XML object. -- cgit v1.2.3