summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/xmlstream/xmlstream.py230
1 files changed, 39 insertions, 191 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 1540ae7a..8552dc6b 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -23,7 +23,6 @@ import signal
import socket as Socket
import ssl
import sys
-import threading
import time
import random
import weakref
@@ -46,7 +45,6 @@ RESPONSE_TIMEOUT = 30
log = logging.getLogger(__name__)
-
class NotConnectedError(Exception):
"""
Raised when we try to send something over the wire but we are not
@@ -205,28 +203,15 @@ class XMLStream(object):
#: if the connection is terminated.
self.end_session_on_disconnect = True
- #: A queue of string data to be sent over the stream.
- self.send_queue = Queue()
- self.send_queue_lock = threading.Lock()
- self.send_lock = threading.RLock()
-
- self.__failed_send_stanza = None
-
#: A mapping of XML namespaces to well-known prefixes.
self.namespace_map = {StanzaBase.xml_ns: 'xml'}
- self.__thread = {}
self.__root_stanza = []
self.__handlers = []
self.__event_handlers = {}
self.__filters = {'in': [], 'out': [], 'out_sync': []}
- self.__thread_count = 0
- self.__thread_cond = threading.Condition()
- self.__active_threads = set()
- self._use_daemons = False
self._id = 0
- self._id_lock = threading.Lock()
#: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4()
@@ -241,53 +226,6 @@ class XMLStream(object):
self.add_event_handler('disconnected', self._remove_schedules)
self.add_event_handler('session_start', self._start_keepalive)
- self.add_event_handler('session_start', self._cert_expiration)
-
- def use_signals(self, signals=None):
- """Register signal handlers for ``SIGHUP`` and ``SIGTERM``.
-
- By using signals, a ``'killed'`` event will be raised when the
- application is terminated.
-
- If a signal handler already existed, it will be executed first,
- before the ``'killed'`` event is raised.
-
- :param list signals: A list of signal names to be monitored.
- Defaults to ``['SIGHUP', 'SIGTERM']``.
- """
- if signals is None:
- signals = ['SIGHUP', 'SIGTERM']
-
- existing_handlers = {}
- for sig_name in signals:
- if hasattr(signal, sig_name):
- sig = getattr(signal, sig_name)
- handler = signal.getsignal(sig)
- if handler:
- existing_handlers[sig] = handler
-
- def handle_kill(signum, frame):
- """
- Capture kill event and disconnect cleanly after first
- spawning the ``'killed'`` event.
- """
-
- if signum in existing_handlers and \
- existing_handlers[signum] != handle_kill:
- existing_handlers[signum](signum, frame)
-
- self.event("killed", direct=True)
- self.disconnect()
-
- try:
- for sig_name in signals:
- if hasattr(signal, sig_name):
- sig = getattr(signal, sig_name)
- signal.signal(sig, handle_kill)
- self.__signals_installed = True
- except:
- log.debug("Can not set interrupt signal handlers. " + \
- "Slixmpp is not running from a main thread.")
def new_id(self):
"""Generate and return a new stream ID in hexadecimal form.
@@ -296,9 +234,8 @@ class XMLStream(object):
ID values. Using this method ensures that all new ID values
are unique in this stream.
"""
- with self._id_lock:
- self._id += 1
- return self.get_id()
+ self._id += 1
+ return self.get_id()
def get_id(self):
"""Return the current unique stream ID in hexadecimal form."""
@@ -350,11 +287,16 @@ class XMLStream(object):
asyncio.async(connect_routine)
def init_parser(self):
+ """init the XML parser. The parser must always be reset for each new
+ connexion
+ """
self.xml_depth = 0
self.xml_root = None
self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end"))
def connection_made(self, transport):
+ """Called when the TCP connection has been established with the server
+ """
self.event("connected")
self.transport = transport
self.socket = self.transport.get_extra_info("socket")
@@ -362,6 +304,12 @@ class XMLStream(object):
self.send_raw(self.stream_header)
def data_received(self, data):
+ """Called when incoming data is received on the socket.
+
+ We feed that data to the parser and the see if this produced any XML
+ event. This could trigger one or more event (a stanza is received,
+ the stream is opened, etc).
+ """
self.parser.feed(data)
for event, xml in self.parser.read_events():
if event == 'start':
@@ -372,6 +320,7 @@ class XMLStream(object):
stream=self,
top_level=True,
open_only=True))
+ self.start_stream_handler(self.xml_root)
self.xml_depth += 1
if event == 'end':
self.xml_depth -= 1
@@ -381,8 +330,8 @@ class XMLStream(object):
log.debug("End of stream received")
self.abort()
elif self.xml_depth == 1:
- # We only raise events for stanzas that are direct
- # children of the root element.
+ # A stanza is an XML element that is a direct child of
+ # the root element, hence the check of depth == 1
self.__spawn_event(xml)
if self.xml_root is not None:
# Keep the root element empty of children to
@@ -390,28 +339,31 @@ class XMLStream(object):
self.xml_root.clear()
def eof_received(self):
- """
- When the TCP connection is properly closed by the remote end
+ """When the TCP connection is properly closed by the remote end
"""
log.debug("eof_received")
def connection_lost(self, exception):
+ """On any kind of disconnection, initiated by us or not. This signals the
+ closure of the TCP connection
"""
- On any kind of disconnection
- """
- log.warning("connection_lost: %s", (exception,))
+ log.info("connection_lost: %s", (exception,))
+ self.event("disconnected")
if self.end_session_on_disconnect:
self.event('session_end')
+ # All these objects are associated with one TCP connection. Since
+ # we are not connected anymore, destroy them
self.parser = None
self.transport = None
self.socket = None
- self.event("disconnected")
def disconnect(self, wait=2.0):
"""Close the XML stream and wait for an acknowldgement from the server for
at most `wait` seconds. After the given number of seconds has
- passed without a response from the serveur, abort() is called. If
- wait is 0.0, this is equivalent to calling abort() directly.
+ passed without a response from the serveur, or when the server
+ successfuly responds with a closure of its own stream, abort() is
+ called. If wait is 0.0, this is almost equivalent to calling abort()
+ directly.
Does nothing if we are not connected.
@@ -429,7 +381,7 @@ class XMLStream(object):
"""
if self.transport:
self.transport.abort()
- self.event("killed", direct=True)
+ self.event("killed")
def reconnect(self, wait=2.0):
"""Calls disconnect(), and once we are disconnected (after the timeout, or
@@ -475,40 +427,6 @@ class XMLStream(object):
server_hostname=self.address[0])
asyncio.async(ssl_connect_routine)
- def _cert_expiration(self, event):
- """Schedule an event for when the TLS certificate expires."""
-
- if not self._der_cert:
- log.warn("TLS or SSL was enabled, but no certificate was found.")
- return
-
- def restart():
- if not self.event_handled('ssl_expired_cert'):
- log.warn("The server certificate has expired. Restarting.")
- self.reconnect()
- else:
- pem_cert = ssl.DER_cert_to_PEM_cert(self._der_cert)
- self.event('ssl_expired_cert', pem_cert)
-
- cert_ttl = cert.get_ttl(self._der_cert)
- if cert_ttl is None:
- return
-
- if cert_ttl.days < 0:
- log.warn('CERT: Certificate has expired.')
- restart()
-
- try:
- total_seconds = cert_ttl.total_seconds()
- except AttributeError:
- # for Python < 2.7
- total_seconds = (cert_ttl.microseconds + (cert_ttl.seconds + cert_ttl.days * 24 * 3600) * 10**6) / 10**6
-
- log.info('CERT: Time until certificate expiration: %s' % cert_ttl)
- self.schedule('Certificate Expiration',
- total_seconds,
- restart)
-
def _start_keepalive(self, event):
"""Begin sending whitespace periodically to keep the connection alive.
@@ -596,39 +514,6 @@ class XMLStream(object):
"""Remove an incoming or outgoing filter."""
self.__filters[mode].remove(handler)
- def add_handler(self, mask, pointer, name=None, disposable=False,
- threaded=False, filter=False, instream=False):
- """A shortcut method for registering a handler using XML masks.
-
- The use of :meth:`register_handler()` is preferred.
-
- :param mask: An XML snippet matching the structure of the
- stanzas that will be passed to this handler.
- :param pointer: The handler function itself.
- :parm name: A unique name for the handler. A name will
- be generated if one is not provided.
- :param disposable: Indicates if the handler should be discarded
- after one use.
- :param threaded: **DEPRECATED**.
- Remains for backwards compatibility.
- :param filter: **DEPRECATED**.
- Remains for backwards compatibility.
- :param instream: Indicates if the handler should execute during
- stream processing and not during normal event
- processing.
- """
- # To prevent circular dependencies, we must load the matcher
- # and handler classes here.
-
- if name is None:
- name = 'add_handler_%s' % self.new_id()
- self.register_handler(
- XMLCallback(name,
- MatchXMLMask(mask, self.default_ns),
- pointer,
- once=disposable,
- instream=instream))
-
def register_handler(self, handler, before=None, after=None):
"""Add a stream event handler that will be executed when a matching
stanza is received.
@@ -688,22 +573,19 @@ class XMLStream(object):
else:
return next(self.dns_answers)
- def add_event_handler(self, name, pointer,
- threaded=False, disposable=False):
+ def add_event_handler(self, name, pointer, disposable=False):
"""Add a custom event handler that will be executed whenever
its event is manually triggered.
:param name: The name of the event that will trigger
this handler.
:param pointer: The function to execute.
- :param threaded: If set to ``True``, the handler will execute
- in its own thread. Defaults to ``False``.
:param disposable: If set to ``True``, the handler will be
discarded after one use. Defaults to ``False``.
"""
if not name in self.__event_handlers:
self.__event_handlers[name] = []
- self.__event_handlers[name].append((pointer, threaded, disposable))
+ self.__event_handlers[name].append((pointer, disposable))
def del_event_handler(self, name, pointer):
"""Remove a function as a handler for an event.
@@ -747,30 +629,32 @@ class XMLStream(object):
for handler in handlers:
#TODO: Data should not be copied, but should be read only,
# but this might break current code so it's left for future.
+ handler_callback, disposable = handler
out_data = copy.copy(data) if len(handlers) > 1 else data
old_exception = getattr(data, 'exception', None)
if direct:
try:
- handler[0](out_data)
+ handler_callback(out_data)
except Exception as e:
error_msg = 'Error processing event handler: %s'
- log.exception(error_msg, str(handler[0]))
+ log.exception(error_msg, str(handler_callback))
if old_exception:
old_exception(e)
else:
self.exception(e)
else:
self.run_event(('event', handler, out_data))
- if handler[2]:
+ if disposable:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
# processed in the queue.
try:
h_index = self.__event_handlers[name].index(handler)
- self.__event_handlers[name].pop(h_index)
except:
pass
+ else:
+ self.__event_handlers[name].pop(h_index)
def schedule(self, name, seconds, callback, args=tuple(),
kwargs={}, repeat=False):
@@ -877,22 +761,13 @@ class XMLStream(object):
:param string data: Any bytes or utf-8 string value.
"""
+ print("SEND: %s" % (data))
if not self.transport:
raise NotConnectedError()
if isinstance(data, str):
data = data.encode('utf-8')
self.transport.write(data)
- def _start_thread(self, name, target, track=True):
- self.__thread[name] = threading.Thread(name=name, target=target)
- self.__thread[name].daemon = self._use_daemons
- self.__thread[name].start()
-
- if track:
- self.__active_threads.add(name)
- with self.__thread_cond:
- self.__thread_count += 1
-
def _build_stanza(self, xml, default_ns=None):
"""Create a stanza object from a given XML object.
@@ -965,25 +840,6 @@ class XMLStream(object):
if unhandled:
stanza.unhandled()
- def _threaded_event_wrapper(self, func, args):
- """Capture exceptions for event handlers that run
- in individual threads.
-
- :param func: The event handler to execute.
- :param args: Arguments to the event handler.
- """
- # this is always already copied before this is invoked
- orig = args[0]
- try:
- func(*args)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg, str(func))
- if hasattr(orig, 'exception'):
- orig.exception(e)
- else:
- self.exception(e)
-
def run_event(self, event):
etype, handler = event[0:2]
args = event[2:]
@@ -1005,17 +861,9 @@ class XMLStream(object):
log.exception('Error processing scheduled task')
self.exception(e)
elif etype == 'event':
- func, threaded, disposable = handler
+ func, disposable = handler
try:
- if threaded:
- x = threading.Thread(
- name="Event_%s" % str(func),
- target=self._threaded_event_wrapper,
- args=(func, args))
- x.daemon = self._use_daemons
- x.start()
- else:
- func(*args)
+ func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg, str(func))