summaryrefslogtreecommitdiff
path: root/slixmpp
diff options
context:
space:
mode:
Diffstat (limited to 'slixmpp')
-rw-r--r--slixmpp/clientxmpp.py1
-rw-r--r--slixmpp/componentxmpp.py2
-rw-r--r--slixmpp/features/feature_bind/bind.py1
-rw-r--r--slixmpp/features/feature_mechanisms/mechanisms.py4
-rw-r--r--slixmpp/features/feature_session/session.py1
-rw-r--r--slixmpp/features/feature_starttls/starttls.py2
-rw-r--r--slixmpp/plugins/xep_0078/legacyauth.py1
-rw-r--r--slixmpp/plugins/xep_0115/caps.py2
-rw-r--r--slixmpp/plugins/xep_0198/stream_management.py9
-rw-r--r--slixmpp/stanza/iq.py3
-rw-r--r--slixmpp/xmlstream/stanzabase.py2
-rw-r--r--slixmpp/xmlstream/xmlstream.py186
12 files changed, 36 insertions, 178 deletions
diff --git a/slixmpp/clientxmpp.py b/slixmpp/clientxmpp.py
index ae9010d4..fa36ad56 100644
--- a/slixmpp/clientxmpp.py
+++ b/slixmpp/clientxmpp.py
@@ -144,7 +144,6 @@ class ClientXMPP(BaseXMPP):
:param use_ssl: Indicates if the older SSL connection method
should be used. Defaults to ``False``.
"""
- self.session_started_event.clear()
# If an address was provided, disable using DNS SRV lookup;
# otherwise, use the domain from the client JID with the standard
diff --git a/slixmpp/componentxmpp.py b/slixmpp/componentxmpp.py
index c640c3dc..632db189 100644
--- a/slixmpp/componentxmpp.py
+++ b/slixmpp/componentxmpp.py
@@ -143,7 +143,7 @@ class ComponentXMPP(BaseXMPP):
handshake = ET.Element('{jabber:component:accept}handshake')
handshake.text = hashlib.sha1(pre_hash).hexdigest().lower()
- self.send_xml(handshake, now=True)
+ self.send_xml(handshake)
def _handle_handshake(self, xml):
"""The handshake has been accepted.
diff --git a/slixmpp/features/feature_bind/bind.py b/slixmpp/features/feature_bind/bind.py
index f636abf9..e26c3ce6 100644
--- a/slixmpp/features/feature_bind/bind.py
+++ b/slixmpp/features/feature_bind/bind.py
@@ -64,5 +64,4 @@ class FeatureBind(BasePlugin):
if 'session' not in self.features['features']:
log.debug("Established Session")
self.xmpp.sessionstarted = True
- self.xmpp.session_started_event.set()
self.xmpp.event('session_start')
diff --git a/slixmpp/features/feature_mechanisms/mechanisms.py b/slixmpp/features/feature_mechanisms/mechanisms.py
index 3cbb83f2..00dd481c 100644
--- a/slixmpp/features/feature_mechanisms/mechanisms.py
+++ b/slixmpp/features/feature_mechanisms/mechanisms.py
@@ -196,7 +196,7 @@ class FeatureMechanisms(BasePlugin):
self.attempted_mechs.add(self.mech.name)
self.xmpp.disconnect()
else:
- resp.send(now=True)
+ resp.send()
return True
@@ -217,7 +217,7 @@ class FeatureMechanisms(BasePlugin):
else:
if resp.get_value() == '':
resp.del_value()
- resp.send(now=True)
+ resp.send()
def _handle_success(self, stanza):
"""SASL authentication succeeded. Restart the stream."""
diff --git a/slixmpp/features/feature_session/session.py b/slixmpp/features/feature_session/session.py
index 08f7480f..e57405db 100644
--- a/slixmpp/features/feature_session/session.py
+++ b/slixmpp/features/feature_session/session.py
@@ -51,5 +51,4 @@ class FeatureSession(BasePlugin):
log.debug("Established Session")
self.xmpp.sessionstarted = True
- self.xmpp.session_started_event.set()
self.xmpp.event('session_start')
diff --git a/slixmpp/features/feature_starttls/starttls.py b/slixmpp/features/feature_starttls/starttls.py
index a05f755b..5aad8ed4 100644
--- a/slixmpp/features/feature_starttls/starttls.py
+++ b/slixmpp/features/feature_starttls/starttls.py
@@ -55,7 +55,7 @@ class FeatureSTARTTLS(BasePlugin):
elif self.xmpp.disable_starttls:
return False
else:
- self.xmpp.send(features['starttls'], now=True)
+ self.xmpp.send(features['starttls'])
return True
def _handle_starttls_proceed(self, proceed):
diff --git a/slixmpp/plugins/xep_0078/legacyauth.py b/slixmpp/plugins/xep_0078/legacyauth.py
index d3826e59..8d2ea230 100644
--- a/slixmpp/plugins/xep_0078/legacyauth.py
+++ b/slixmpp/plugins/xep_0078/legacyauth.py
@@ -141,7 +141,6 @@ class XEP_0078(BasePlugin):
log.debug("Established Session")
self.xmpp.sessionstarted = True
- self.xmpp.session_started_event.set()
self.xmpp.event('session_start')
return True
diff --git a/slixmpp/plugins/xep_0115/caps.py b/slixmpp/plugins/xep_0115/caps.py
index 378c6ae0..c548de11 100644
--- a/slixmpp/plugins/xep_0115/caps.py
+++ b/slixmpp/plugins/xep_0115/caps.py
@@ -305,7 +305,7 @@ class XEP_0115(BasePlugin):
self.cache_caps(ver, info)
self.assign_verstring(jid, ver)
- if self.xmpp.session_started_event.is_set() and self.broadcast:
+ if self.xmpp.sessionstarted and self.broadcast:
if self.xmpp.is_component or preserve:
for contact in self.xmpp.roster[jid]:
self.xmpp.roster[jid][contact].send_last_presence()
diff --git a/slixmpp/plugins/xep_0198/stream_management.py b/slixmpp/plugins/xep_0198/stream_management.py
index 64052fc5..acf37cd7 100644
--- a/slixmpp/plugins/xep_0198/stream_management.py
+++ b/slixmpp/plugins/xep_0198/stream_management.py
@@ -173,7 +173,7 @@ class XEP_0198(BasePlugin):
ack = stanza.Ack(self.xmpp)
with self.handled_lock:
ack['h'] = self.handled
- self.xmpp.send_raw(str(ack), now=True)
+ self.xmpp.send_raw(str(ack))
def request_ack(self, e=None):
"""Request an ack from the server."""
@@ -199,14 +199,14 @@ class XEP_0198(BasePlugin):
self.enabled.set()
enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume
- enable.send(now=True)
+ enable.send()
self.handled = 0
elif self.sm_id and self.allow_resume:
self.enabled.set()
resume = stanza.Resume(self.xmpp)
resume['h'] = self.handled
resume['previd'] = self.sm_id
- resume.send(now=True)
+ resume.send()
# Wait for a response before allowing stream feature processing
# to continue. The actual result processing will be done in the
@@ -239,8 +239,7 @@ class XEP_0198(BasePlugin):
self.xmpp.features.add('stream_management')
self._handle_ack(stanza)
for id, stanza in self.unacked_queue:
- self.xmpp.send(stanza, now=True, use_filters=False)
- self.xmpp.session_started_event.set()
+ self.xmpp.send(stanza, use_filters=False)
self.xmpp.event('session_resumed', stanza)
def _handle_failed(self, stanza):
diff --git a/slixmpp/stanza/iq.py b/slixmpp/stanza/iq.py
index 4be819b0..605e226a 100644
--- a/slixmpp/stanza/iq.py
+++ b/slixmpp/stanza/iq.py
@@ -191,9 +191,6 @@ class Iq(RootStanza):
stanza. Only called if there is a callback parameter
(and therefore are in async mode).
"""
- if timeout is None:
- timeout = self.stream.response_timeout
-
if self.stream.session_bind_event.is_set():
matcher = MatchIDSender({
'id': self['id'],
diff --git a/slixmpp/xmlstream/stanzabase.py b/slixmpp/xmlstream/stanzabase.py
index 0921dde4..75555c34 100644
--- a/slixmpp/xmlstream/stanzabase.py
+++ b/slixmpp/xmlstream/stanzabase.py
@@ -1580,7 +1580,7 @@ class StanzaBase(ElementBase):
stanza sent immediately. Useful for stream
initialization. Defaults to ``False``.
"""
- self.stream.send(self, now=now)
+ self.stream.send(self)
def __copy__(self):
"""Return a copy of the stanza object that does not share the
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 2846d4a4..81b5b55f 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -45,35 +45,6 @@ from slixmpp.xmlstream.resolver import resolve, default_resolver
#: The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 30
-#: The time in seconds to wait for events from the event queue, and also the
-#: time between checks for the process stop signal.
-WAIT_TIMEOUT = 1.0
-
-#: The number of threads to use to handle XML stream events. This is not the
-#: same as the number of custom event handling threads.
-#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations
-#: with a GIL, this should be left at 1, but for implemetnations without
-#: a GIL increasing this value can provide better performance.
-HANDLER_THREADS = 1
-
-#: The time in seconds to delay between attempts to resend data
-#: after an SSL error.
-SSL_RETRY_DELAY = 0.5
-
-#: The maximum number of times to attempt resending data due to
-#: an SSL error.
-SSL_RETRY_MAX = 10
-
-#: Maximum time to delay between connection attempts is one hour.
-RECONNECT_MAX_DELAY = 600
-
-#: Maximum number of attempts to connect to the server before quitting
-#: and raising a 'connect_failed' event. Setting this to ``None`` will
-#: allow infinite reconnection attempts, and using ``0`` will disable
-#: reconnections. Defaults to ``None``.
-RECONNECT_MAX_ATTEMPTS = None
-
-
log = logging.getLogger(__name__)
@@ -83,6 +54,11 @@ class RestartStream(Exception):
resending the stream header.
"""
+class NotConnectedError(Exception):
+ """
+ Raised when we try to send something over the wire but we are not
+ connected.
+ """
class XMLStream(object):
"""
@@ -166,36 +142,11 @@ class XMLStream(object):
self._der_cert = None
- #: The time in seconds to wait for events from the event queue,
- #: and also the time between checks for the process stop signal.
- self.wait_timeout = WAIT_TIMEOUT
-
- #: The time in seconds to wait before timing out waiting
- #: for response stanzas.
- self.response_timeout = RESPONSE_TIMEOUT
-
#: The current amount to time to delay attempting to reconnect.
#: This value doubles (with some jitter) with each failed
#: connection attempt up to :attr:`reconnect_max_delay` seconds.
self.reconnect_delay = None
- #: Maximum time to delay between connection attempts is one hour.
- self.reconnect_max_delay = RECONNECT_MAX_DELAY
-
- #: Maximum number of attempts to connect to the server before
- #: quitting and raising a 'connect_failed' event. Setting to
- #: ``None`` allows infinite reattempts, while setting it to ``0``
- #: will disable reconnection attempts. Defaults to ``None``.
- self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS
-
- #: The time in seconds to delay between attempts to resend data
- #: after an SSL error.
- self.ssl_retry_max = SSL_RETRY_MAX
-
- #: The maximum number of times to attempt resending data due to
- #: an SSL error.
- self.ssl_retry_delay = SSL_RETRY_DELAY
-
#: The connection state machine tracks if the stream is
#: ``'connected'`` or ``'disconnected'``.
self.state = StateMachine(('disconnected', 'connected'))
@@ -267,20 +218,6 @@ class XMLStream(object):
#: :attr:`whitespace_keepalive` is enabled.
self.whitespace_keepalive_interval = 300
- #: An :class:`~threading.Event` to signal receiving a closing
- #: stream tag from the server.
- self.stream_end_event = threading.Event()
- self.stream_end_event.set()
-
- #: An :class:`~threading.Event` to signal the start of a stream
- #: session. Until this event fires, the send queue is not used
- #: and data is sent immediately over the wire.
- self.session_started_event = threading.Event()
-
- #: The default time in seconds to wait for a session to start
- #: after connecting before reconnecting and trying again.
- self.session_timeout = 45
-
#: Flag for controlling if the session can be considered ended
#: if the connection is terminated.
self.end_session_on_disconnect = True
@@ -312,10 +249,6 @@ class XMLStream(object):
#: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4()
- #: The :attr:`auto_reconnnect` setting controls whether or not
- #: the stream will be restarted in the event of an error.
- self.auto_reconnect = True
-
#: The :attr:`disconnect_wait` setting is the default value
#: for controlling if the system waits for the send queue to
#: empty before ending the stream. This may be overridden by
@@ -331,7 +264,6 @@ class XMLStream(object):
#: ``_xmpp-client._tcp`` service.
self.dns_service = None
- self.add_event_handler('connected', self._session_timeout_check)
self.add_event_handler('disconnected', self._remove_schedules)
self.add_event_handler('session_start', self._start_keepalive)
self.add_event_handler('session_start', self._cert_expiration)
@@ -887,12 +819,11 @@ class XMLStream(object):
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
# processed in the queue.
- with self.__event_handlers_lock:
- try:
- h_index = self.__event_handlers[name].index(handler)
- self.__event_handlers[name].pop(h_index)
- except:
- pass
+ try:
+ h_index = self.__event_handlers[name].index(handler)
+ self.__event_handlers[name].pop(h_index)
+ except:
+ pass
def schedule(self, name, seconds, callback, args=tuple(),
kwargs={}, repeat=False):
@@ -954,34 +885,18 @@ class XMLStream(object):
"""
return xml
- def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
+ def send(self, data, use_filters=True):
"""A wrapper for :meth:`send_raw()` for sending stanza objects.
May optionally block until an expected response is received.
:param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
stanza to send on the stream.
- :param mask: **DEPRECATED**
- An XML string snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- :param int timeout: Time in seconds to wait for a response before
- continuing. Defaults to :attr:`response_timeout`.
- :param bool now: Indicates if the send queue should be skipped,
- sending the stanza immediately. Useful mainly
- for stream initialization stanzas.
- Defaults to ``False``.
:param bool use_filters: Indicates if outgoing filters should be
applied to the given stanza data. Disabling
filters is useful when resending stanzas.
Defaults to ``True``.
"""
- if timeout is None:
- timeout = self.response_timeout
- if hasattr(mask, 'xml'):
- mask = mask.xml
-
if isinstance(data, ElementBase):
if use_filters:
for filter in self.__filters['out']:
@@ -989,61 +904,37 @@ class XMLStream(object):
if data is None:
return
- if mask is not None:
- log.warning("Use of send mask waiters is deprecated.")
- wait_for = Waiter("SendWait_%s" % self.new_id(),
- MatchXMLMask(mask))
- self.register_handler(wait_for)
-
if isinstance(data, ElementBase):
- with self.send_queue_lock:
- if use_filters:
- for filter in self.__filters['out_sync']:
- data = filter(data)
- if data is None:
- return
- str_data = tostring(data.xml, xmlns=self.default_ns,
- stream=self,
- top_level=True)
- self.send_raw(str_data)
+ if use_filters:
+ for filter in self.__filters['out_sync']:
+ data = filter(data)
+ if data is None:
+ return
+ str_data = tostring(data.xml, xmlns=self.default_ns,
+ stream=self,
+ top_level=True)
+ self.send_raw(str_data)
else:
self.send_raw(data)
- if mask is not None:
- return wait_for.wait(timeout)
- def send_xml(self, data, mask=None, timeout=None, now=False):
- """Send an XML object on the stream, and optionally wait
- for a response.
+ def send_xml(self, data):
+ """Send an XML object on the stream
:param data: The :class:`~xml.etree.ElementTree.Element` XML object
to send on the stream.
- :param mask: **DEPRECATED**
- An XML string snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- :param int timeout: Time in seconds to wait for a response before
- continuing. Defaults to :attr:`response_timeout`.
- :param bool now: Indicates if the send queue should be skipped,
- sending the stanza immediately. Useful mainly
- for stream initialization stanzas.
- Defaults to ``False``.
"""
- if timeout is None:
- timeout = self.response_timeout
- return self.send(tostring(data), mask, timeout, now)
+ return self.send(tostring(data))
def send_raw(self, data):
"""Send raw data across the stream.
:param string data: Any bytes or utf-8 string value.
"""
+ if not self.transport:
+ raise NotConnectedError()
if isinstance(data, str):
data = data.encode('utf-8')
- if not self.transport:
- logger.error("Cannot send data, we are not connected.")
- else:
- self.transport.write(data)
+ self.transport.write(data)
def _start_thread(self, name, target, track=True):
self.__thread[name] = threading.Thread(name=name, target=target)
@@ -1055,31 +946,6 @@ class XMLStream(object):
with self.__thread_cond:
self.__thread_count += 1
- def _end_thread(self, name, early=False):
- with self.__thread_cond:
- curr_thread = threading.current_thread().name
- if curr_thread in self.__active_threads:
- self.__thread_count -= 1
- self.__active_threads.remove(curr_thread)
-
- if early:
- log.debug('Threading deadlock prevention!')
- log.debug(("Marked %s thread as ended due to " + \
- "disconnect() call. %s threads remain.") % (
- name, self.__thread_count))
- else:
- log.debug("Stopped %s thread. %s threads remain." % (
- name, self.__thread_count))
-
- else:
- log.debug(("Finished exiting %s thread after early " + \
- "termination from disconnect() call. " + \
- "%s threads remain.") % (
- name, self.__thread_count))
-
- if self.__thread_count == 0:
- self.__thread_cond.notify()
-
def _build_stanza(self, xml, default_ns=None):
"""Create a stanza object from a given XML object.