summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/filesocket.py2
-rw-r--r--sleekxmpp/xmlstream/stanzabase.py12
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py153
3 files changed, 113 insertions, 54 deletions
diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py
index 441ff875..fd81864b 100644
--- a/sleekxmpp/xmlstream/filesocket.py
+++ b/sleekxmpp/xmlstream/filesocket.py
@@ -22,6 +22,8 @@ class FileSocket(_fileobject):
def read(self, size=4096):
"""Read data from the socket as if it were a file."""
+ if self._sock is None:
+ return None
data = self._sock.recv(size)
if data is not None:
return data
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py
index 28f78f3c..f1a9e1f5 100644
--- a/sleekxmpp/xmlstream/stanzabase.py
+++ b/sleekxmpp/xmlstream/stanzabase.py
@@ -1255,9 +1255,15 @@ class StanzaBase(ElementBase):
log.exception('Error handling {%s}%s stanza' % (self.namespace,
self.name))
- def send(self):
- """Queue the stanza to be sent on the XML stream."""
- self.stream.sendRaw(self.__str__())
+ def send(self, now=False):
+ """
+ Queue the stanza to be sent on the XML stream.
+ Arguments:
+ now -- Indicates if the queue should be skipped and the
+ stanza sent immediately. Useful for stream
+ initialization. Defaults to False.
+ """
+ self.stream.send_raw(self.__str__(), now=now)
def __copy__(self):
"""
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 1c165562..2d72de5f 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -17,6 +17,7 @@ import sys
import threading
import time
import types
+import random
try:
import queue
except ImportError:
@@ -45,6 +46,9 @@ HANDLER_THREADS = 1
# Flag indicating if the SSL library is available for use.
SSL_SUPPORT = True
+# Maximum time to delay between connection attempts is one hour.
+RECONNECT_MAX_DELAY = 3600
+
log = logging.getLogger(__name__)
@@ -104,7 +108,11 @@ class XMLStream(object):
use_ssl -- Flag indicating if SSL should be used.
use_tls -- Flag indicating if TLS should be used.
stop -- threading Event used to stop all threads.
- auto_reconnect-- Flag to determine whether we auto reconnect.
+
+ auto_reconnect -- Flag to determine whether we auto reconnect.
+ reconnect_max_delay -- Maximum time to delay between connection
+ attempts. Defaults to RECONNECT_MAX_DELAY,
+ which is one hour.
Methods:
add_event_handler -- Add a handler for a custom event.
@@ -155,6 +163,8 @@ class XMLStream(object):
self.ca_certs = None
self.response_timeout = RESPONSE_TIMEOUT
+ self.reconnect_delay = None
+ self.reconnect_max_delay = RECONNECT_MAX_DELAY
self.state = StateMachine(('disconnected', 'connected'))
self.state._set_state('disconnected')
@@ -178,6 +188,8 @@ class XMLStream(object):
self.stop = threading.Event()
self.stream_end_event = threading.Event()
self.stream_end_event.set()
+ self.session_started_event = threading.Event()
+
self.event_queue = queue.Queue()
self.send_queue = queue.Queue()
self.scheduler = Scheduler(self.event_queue, self.stop)
@@ -300,6 +312,15 @@ class XMLStream(object):
self.stop.clear()
self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM)
self.socket.settimeout(None)
+
+ if self.reconnect_delay is None:
+ delay = 1.0
+ else:
+ delay = min(self.reconnect_delay * 2, self.reconnect_max_delay)
+ delay = random.normalvariate(delay, delay * 0.1)
+ log.debug('Waiting %s seconds before connecting.' % delay)
+ time.sleep(delay)
+
if self.use_ssl and self.ssl_support:
log.debug("Socket Wrapped for SSL")
if self.ca_certs is None:
@@ -309,7 +330,7 @@ class XMLStream(object):
ssl_socket = ssl.wrap_socket(self.socket,
ca_certs=self.ca_certs,
- certs_reqs=cert_policy)
+ cert_reqs=cert_policy)
if hasattr(self.socket, 'socket'):
# We are using a testing socket, so preserve the top
@@ -324,13 +345,14 @@ class XMLStream(object):
self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state
self.event("connected", direct=True)
+ self.reconnect_delay = 1.0
return True
except Socket.error as serr:
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
self.event('socket_error', serr)
log.error(error_msg % (self.address[0], self.address[1],
serr.errno, serr.strerror))
- time.sleep(1)
+ self.reconnect_delay = delay
return False
def disconnect(self, reconnect=False):
@@ -350,7 +372,8 @@ class XMLStream(object):
def _disconnect(self, reconnect=False):
# Send the end of stream marker.
- self.send_raw(self.stream_footer)
+ self.send_raw(self.stream_footer, now=True)
+ self.session_started_event.clear()
# Wait for confirmation that the stream was
# closed in the other direction.
self.auto_reconnect = reconnect
@@ -643,7 +666,7 @@ class XMLStream(object):
"""
return xml
- def send(self, data, mask=None, timeout=None):
+ def send(self, data, mask=None, timeout=None, now=False):
"""
A wrapper for send_raw for sending stanza objects.
@@ -657,10 +680,13 @@ class XMLStream(object):
or a timeout occurs.
timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT.
+ now -- Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to False.
"""
if timeout is None:
timeout = self.response_timeout
-
if hasattr(mask, 'xml'):
mask = mask.xml
data = str(data)
@@ -669,21 +695,11 @@ class XMLStream(object):
wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask))
self.register_handler(wait_for)
- self.send_raw(data)
+ self.send_raw(data, now)
if mask is not None:
return wait_for.wait(timeout)
- def send_raw(self, data):
- """
- Send raw data across the stream.
-
- Arguments:
- data -- Any string value.
- """
- self.send_queue.put(data)
- return True
-
- def send_xml(self, data, mask=None, timeout=None):
+ def send_xml(self, data, mask=None, timeout=None, now=False):
"""
Send an XML object on the stream, and optionally wait
for a response.
@@ -696,10 +712,39 @@ class XMLStream(object):
or a timeout occurs.
timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT.
+ now -- Indicates if the send queue should be skipped,
+ sending the stanza immediately. Useful mainly
+ for stream initialization stanzas.
+ Defaults to False.
"""
if timeout is None:
timeout = self.response_timeout
- return self.send(tostring(data), mask, timeout)
+ return self.send(tostring(data), mask, timeout, now)
+
+ def send_raw(self, data, now=False, reconnect=None):
+ """
+ Send raw data across the stream.
+
+ Arguments:
+ data -- Any string value.
+ reconnect -- Indicates if the stream should be
+ restarted if there is an error sending
+ the stanza. Used mainly for testing.
+ Defaults to self.auto_reconnect.
+ """
+ if now:
+ log.debug("SEND (IMMED): %s" % data)
+ try:
+ self.socket.send(data.encode('utf-8'))
+ except Socket.error as serr:
+ self.event('socket_error', serr)
+ log.warning("Failed to send %s" % data)
+ if reconnect is None:
+ reconnect = self.auto_reconnect
+ self.disconnect(reconnect)
+ else:
+ self.send_queue.put(data)
+ return True
def process(self, threaded=True):
"""
@@ -753,7 +798,7 @@ class XMLStream(object):
firstrun = False
try:
if self.is_client:
- self.send_raw(self.stream_header)
+ self.send_raw(self.stream_header, now=True)
# The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect
# occurs. After any reconnection, the stream header will
@@ -762,7 +807,7 @@ class XMLStream(object):
# Ensure the stream header is sent for any
# new connections.
if self.is_client:
- self.send_raw(self.stream_header)
+ self.send_raw(self.stream_header, now=True)
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process")
self.stop.set()
@@ -790,35 +835,39 @@ class XMLStream(object):
"""
depth = 0
root = None
- for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')):
- if event == b'start':
- if depth == 0:
- # We have received the start of the root element.
- root = xml
- # Perform any stream initialization actions, such
- # as handshakes.
- self.stream_end_event.clear()
- self.start_stream_handler(root)
- depth += 1
- if event == b'end':
- depth -= 1
- if depth == 0:
- # The stream's root element has closed,
- # terminating the stream.
- log.debug("End of stream recieved")
- self.stream_end_event.set()
- return False
- elif depth == 1:
- # We only raise events for stanzas that are direct
- # children of the root element.
- try:
- self.__spawn_event(xml)
- except RestartStream:
- return True
- if root:
- # Keep the root element empty of children to
- # save on memory use.
- root.clear()
+ try:
+ for (event, xml) in ET.iterparse(self.filesocket,
+ (b'end', b'start')):
+ if event == b'start':
+ if depth == 0:
+ # We have received the start of the root element.
+ root = xml
+ # Perform any stream initialization actions, such
+ # as handshakes.
+ self.stream_end_event.clear()
+ self.start_stream_handler(root)
+ depth += 1
+ if event == b'end':
+ depth -= 1
+ if depth == 0:
+ # The stream's root element has closed,
+ # terminating the stream.
+ log.debug("End of stream recieved")
+ self.stream_end_event.set()
+ return False
+ elif depth == 1:
+ # We only raise events for stanzas that are direct
+ # children of the root element.
+ try:
+ self.__spawn_event(xml)
+ except RestartStream:
+ return True
+ if root:
+ # Keep the root element empty of children to
+ # save on memory use.
+ root.clear()
+ except SyntaxError:
+ log.error("Error reading from XML stream.")
log.debug("Ending read XML loop")
def _build_stanza(self, xml, default_ns=None):
@@ -971,6 +1020,7 @@ class XMLStream(object):
"""
try:
while not self.stop.isSet():
+ self.session_started_event.wait()
try:
data = self.send_queue.get(True, 1)
except queue.Empty:
@@ -978,7 +1028,8 @@ class XMLStream(object):
log.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
- except:
+ except Socket.error as serr:
+ self.event('socket_error', serr)
log.warning("Failed to send %s" % data)
self.disconnect(self.auto_reconnect)
except KeyboardInterrupt: