summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream')
-rw-r--r--sleekxmpp/xmlstream/stanzabase.py10
-rw-r--r--sleekxmpp/xmlstream/tostring.py (renamed from sleekxmpp/xmlstream/tostring/tostring.py)31
-rw-r--r--sleekxmpp/xmlstream/tostring/__init__.py19
-rw-r--r--sleekxmpp/xmlstream/tostring/tostring26.py110
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py157
5 files changed, 154 insertions, 173 deletions
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py
index d9a4636a..a2826ead 100644
--- a/sleekxmpp/xmlstream/stanzabase.py
+++ b/sleekxmpp/xmlstream/stanzabase.py
@@ -482,7 +482,8 @@ class ElementBase(object):
if plugin:
if plugin not in self.plugins:
self.init_plugin(plugin)
- handler = getattr(self.plugins[plugin], set_method, None)
+ handler = getattr(self.plugins[plugin],
+ set_method, None)
if handler:
return handler(value)
@@ -1064,7 +1065,9 @@ class ElementBase(object):
Defaults to True.
"""
stanza_ns = '' if top_level_ns else self.namespace
- return tostring(self.xml, xmlns='', stanza_ns=stanza_ns)
+ return tostring(self.xml, xmlns='',
+ stanza_ns=stanza_ns,
+ top_level=not top_level_ns)
def __repr__(self):
"""
@@ -1282,7 +1285,8 @@ class StanzaBase(ElementBase):
stanza_ns = '' if top_level_ns else self.namespace
return tostring(self.xml, xmlns='',
stanza_ns=stanza_ns,
- stream=self.stream)
+ stream=self.stream,
+ top_level=not top_level_ns)
# To comply with PEP8, method names now use underscores.
diff --git a/sleekxmpp/xmlstream/tostring/tostring.py b/sleekxmpp/xmlstream/tostring.py
index 38b08d82..f9674b15 100644
--- a/sleekxmpp/xmlstream/tostring/tostring.py
+++ b/sleekxmpp/xmlstream/tostring.py
@@ -6,8 +6,14 @@
See the file LICENSE for copying permission.
"""
+import sys
-def tostring(xml=None, xmlns='', stanza_ns='', stream=None, outbuffer=''):
+if sys.version_info < (3, 0):
+ import types
+
+
+def tostring(xml=None, xmlns='', stanza_ns='', stream=None,
+ outbuffer='', top_level=False):
"""
Serialize an XML object to a Unicode string.
@@ -26,6 +32,8 @@ def tostring(xml=None, xmlns='', stanza_ns='', stream=None, outbuffer=''):
stream -- The XML stream that generated the XML object.
outbuffer -- Optional buffer for storing serializations during
recursive calls.
+ top_level -- Indicates that the element is the outermost
+ element.
"""
# Add previous results to the start of the output.
output = [outbuffer]
@@ -39,14 +47,21 @@ def tostring(xml=None, xmlns='', stanza_ns='', stream=None, outbuffer=''):
else:
tag_xmlns = ''
+ default_ns = ''
+ stream_ns = ''
+ if stream:
+ default_ns = stream.default_ns
+ stream_ns = stream.stream_ns
+
# Output the tag name and derived namespace of the element.
namespace = ''
- if tag_xmlns not in ['', xmlns, stanza_ns]:
+ if top_level and tag_xmlns not in ['', default_ns, stream_ns] or \
+ tag_xmlns not in ['', xmlns, stanza_ns, stream_ns]:
namespace = ' xmlns="%s"' % tag_xmlns
- if stream and tag_xmlns in stream.namespace_map:
- mapped_namespace = stream.namespace_map[tag_xmlns]
- if mapped_namespace:
- tag_name = "%s:%s" % (mapped_namespace, tag_name)
+ if stream and tag_xmlns in stream.namespace_map:
+ mapped_namespace = stream.namespace_map[tag_xmlns]
+ if mapped_namespace:
+ tag_name = "%s:%s" % (mapped_namespace, tag_name)
output.append("<%s" % tag_name)
output.append(namespace)
@@ -93,6 +108,10 @@ def xml_escape(text):
Arguments:
text -- The XML text to convert.
"""
+ if sys.version_info < (3, 0):
+ if type(text) != types.UnicodeType:
+ text = unicode(text, 'utf-8', 'ignore')
+
text = list(text)
escapes = {'&': '&amp;',
'<': '&lt;',
diff --git a/sleekxmpp/xmlstream/tostring/__init__.py b/sleekxmpp/xmlstream/tostring/__init__.py
deleted file mode 100644
index 5852cba2..00000000
--- a/sleekxmpp/xmlstream/tostring/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-import sys
-
-# Import the correct tostring and xml_escape functions based on the Python
-# version in order to properly handle Unicode.
-
-if sys.version_info < (3, 0):
- from sleekxmpp.xmlstream.tostring.tostring26 import tostring, xml_escape
-else:
- from sleekxmpp.xmlstream.tostring.tostring import tostring, xml_escape
-
-__all__ = ['tostring', 'xml_escape']
diff --git a/sleekxmpp/xmlstream/tostring/tostring26.py b/sleekxmpp/xmlstream/tostring/tostring26.py
deleted file mode 100644
index 11501780..00000000
--- a/sleekxmpp/xmlstream/tostring/tostring26.py
+++ /dev/null
@@ -1,110 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-from __future__ import unicode_literals
-import types
-
-
-def tostring(xml=None, xmlns='', stanza_ns='', stream=None, outbuffer=''):
- """
- Serialize an XML object to a Unicode string.
-
- If namespaces are provided using xmlns or stanza_ns, then elements
- that use those namespaces will not include the xmlns attribute in
- the output.
-
- Arguments:
- xml -- The XML object to serialize. If the value is None,
- then the XML object contained in this stanza
- object will be used.
- xmlns -- Optional namespace of an element wrapping the XML
- object.
- stanza_ns -- The namespace of the stanza object that contains
- the XML object.
- stream -- The XML stream that generated the XML object.
- outbuffer -- Optional buffer for storing serializations during
- recursive calls.
- """
- # Add previous results to the start of the output.
- output = [outbuffer]
-
- # Extract the element's tag name.
- tag_name = xml.tag.split('}', 1)[-1]
-
- # Extract the element's namespace if it is defined.
- if '}' in xml.tag:
- tag_xmlns = xml.tag.split('}', 1)[0][1:]
- else:
- tag_xmlns = u''
-
- # Output the tag name and derived namespace of the element.
- namespace = u''
- if tag_xmlns not in ['', xmlns, stanza_ns]:
- namespace = u' xmlns="%s"' % tag_xmlns
- if stream and tag_xmlns in stream.namespace_map:
- mapped_namespace = stream.namespace_map[tag_xmlns]
- if mapped_namespace:
- tag_name = u"%s:%s" % (mapped_namespace, tag_name)
- output.append(u"<%s" % tag_name)
- output.append(namespace)
-
- # Output escaped attribute values.
- for attrib, value in xml.attrib.items():
- value = xml_escape(value)
- if '}' not in attrib:
- output.append(' %s="%s"' % (attrib, value))
- else:
- attrib_ns = attrib.split('}')[0][1:]
- attrib = attrib.split('}')[1]
- if stream and attrib_ns in stream.namespace_map:
- mapped_ns = stream.namespace_map[attrib_ns]
- if mapped_ns:
- output.append(' %s:%s="%s"' % (mapped_ns,
- attrib,
- value))
-
- if len(xml) or xml.text:
- # If there are additional child elements to serialize.
- output.append(u">")
- if xml.text:
- output.append(xml_escape(xml.text))
- if len(xml):
- for child in xml.getchildren():
- output.append(tostring(child, tag_xmlns, stanza_ns, stream))
- output.append(u"</%s>" % tag_name)
- elif xml.text:
- # If we only have text content.
- output.append(u">%s</%s>" % (xml_escape(xml.text), tag_name))
- else:
- # Empty element.
- output.append(u" />")
- if xml.tail:
- # If there is additional text after the element.
- output.append(xml_escape(xml.tail))
- return u''.join(output)
-
-
-def xml_escape(text):
- """
- Convert special characters in XML to escape sequences.
-
- Arguments:
- text -- The XML text to convert.
- """
- if type(text) != types.UnicodeType:
- text = list(unicode(text, 'utf-8', 'ignore'))
- else:
- text = list(text)
- escapes = {u'&': u'&amp;',
- u'<': u'&lt;',
- u'>': u'&gt;',
- u"'": u'&apos;',
- u'"': u'&quot;'}
- for i, c in enumerate(text):
- text[i] = escapes.get(c, c)
- return u''.join(text)
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 5bc71f04..5ba4269f 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -8,6 +8,7 @@
from __future__ import with_statement, unicode_literals
+import base64
import copy
import logging
import signal
@@ -23,6 +24,7 @@ try:
except ImportError:
import Queue as queue
+import sleekxmpp
from sleekxmpp.thirdparty.statemachine import StateMachine
from sleekxmpp.xmlstream import Scheduler, tostring
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
@@ -107,7 +109,13 @@ class XMLStream(object):
stream_header -- The closing tag of the stream's root element.
use_ssl -- Flag indicating if SSL should be used.
use_tls -- Flag indicating if TLS should be used.
+ use_proxy -- Flag indicating that an HTTP Proxy should be used.
stop -- threading Event used to stop all threads.
+ proxy_config -- An optional dictionary with the following entries:
+ host -- The host offering proxy services.
+ port -- The port for the proxy service.
+ username -- Optional username for the proxy.
+ password -- Optional password for the proxy.
auto_reconnect -- Flag to determine whether we auto reconnect.
reconnect_max_delay -- Maximum time to delay between connection
@@ -180,6 +188,9 @@ class XMLStream(object):
self.use_ssl = False
self.use_tls = False
+ self.use_proxy = False
+
+ self.proxy_config = {}
self.default_ns = ''
self.stream_header = "<stream>"
@@ -322,6 +333,12 @@ class XMLStream(object):
log.debug('Waiting %s seconds before connecting.' % delay)
time.sleep(delay)
+ if self.use_proxy:
+ connected = self._connect_proxy()
+ if not connected:
+ self.reconnect_delay = delay
+ return False
+
if self.use_ssl and self.ssl_support:
log.debug("Socket Wrapped for SSL")
if self.ca_certs is None:
@@ -341,8 +358,10 @@ class XMLStream(object):
self.socket = ssl_socket
try:
- log.debug("Connecting to %s:%s" % self.address)
- self.socket.connect(self.address)
+ if not self.use_proxy:
+ log.debug("Connecting to %s:%s" % self.address)
+ self.socket.connect(self.address)
+
self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state
self.event("connected", direct=True)
@@ -356,22 +375,86 @@ class XMLStream(object):
self.reconnect_delay = delay
return False
- def disconnect(self, reconnect=False):
+ def _connect_proxy(self):
+ """Attempt to connect using an HTTP Proxy."""
+
+ # Extract the proxy address, and optional credentials
+ address = (self.proxy_config['host'], int(self.proxy_config['port']))
+ cred = None
+ if self.proxy_config['username']:
+ username = self.proxy_config['username']
+ password = self.proxy_config['password']
+
+ cred = '%s:%s' % (username, password)
+ if sys.version_info < (3, 0):
+ cred = bytes(cred)
+ else:
+ cred = bytes(cred, 'utf-8')
+ cred = base64.b64encode(cred).decode('utf-8')
+
+ # Build the HTTP headers for connecting to the XMPP server
+ headers = ['CONNECT %s:%s HTTP/1.0' % self.address,
+ 'Host: %s:%s' % self.address,
+ 'Proxy-Connection: Keep-Alive',
+ 'Pragma: no-cache',
+ 'User-Agent: SleekXMPP/%s' % sleekxmpp.__version__]
+ if cred:
+ headers.append('Proxy-Authorization: Basic %s' % cred)
+ headers = '\r\n'.join(headers) + '\r\n\r\n'
+
+ try:
+ log.debug("Connecting to proxy: %s:%s" % address)
+ self.socket.connect(address)
+ self.send_raw(headers, now=True)
+ resp = ''
+ while '\r\n\r\n' not in resp:
+ resp += self.socket.recv(1024).decode('utf-8')
+ log.debug('RECV: %s' % resp)
+
+ lines = resp.split('\r\n')
+ if '200' not in lines[0]:
+ self.event('proxy_error', resp)
+ log.error('Proxy Error: %s' % lines[0])
+ return False
+
+ # Proxy connection established, continue connecting
+ # with the XMPP server.
+ return True
+ except Socket.error as serr:
+ error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
+ self.event('socket_error', serr)
+ log.error(error_msg % (self.address[0], self.address[1],
+ serr.errno, serr.strerror))
+ return False
+
+ def disconnect(self, reconnect=False, wait=False):
"""
Terminate processing and close the XML streams.
Optionally, the connection may be reconnected and
resume processing afterwards.
+ If the disconnect should take place after all items
+ in the send queue have been sent, use wait=True. However,
+ take note: If you are constantly adding items to the queue
+ such that it is never empty, then the disconnect will
+ not occur and the call will continue to block.
+
Arguments:
reconnect -- Flag indicating if the connection
and processing should be restarted.
Defaults to False.
+ wait -- Flag indicating if the send queue should
+ be emptied before disconnecting.
"""
self.state.transition('connected', 'disconnected', wait=0.0,
- func=self._disconnect, args=(reconnect,))
+ func=self._disconnect, args=(reconnect, wait))
+
+ def _disconnect(self, reconnect=False, wait=False):
+ # Wait for the send queue to empty.
+ if wait:
+ self.send_queue.join()
- def _disconnect(self, reconnect=False):
# Send the end of stream marker.
self.send_raw(self.stream_footer, now=True)
self.session_started_event.clear()
@@ -748,7 +831,7 @@ class XMLStream(object):
self.send_queue.put(data)
return True
- def process(self, threaded=True):
+ def process(self, **kwargs):
"""
Initialize the XML streams and begin processing events.
@@ -756,15 +839,29 @@ class XMLStream(object):
by HANDLER_THREADS.
Arguments:
+ block -- If block=False then event dispatcher will run
+ in a separate thread, allowing for the stream to be
+ used in the background for another application.
+ Otherwise, process(block=True) blocks the current thread.
+ Defaults to False.
+
+ **threaded is deprecated and included for API compatibility**
threaded -- If threaded=True then event dispatcher will run
in a separate thread, allowing for the stream to be
used in the background for another application.
Defaults to True.
- Event handlers and the send queue will be threaded
- regardless of this parameter's value.
+ Event handlers and the send queue will be threaded
+ regardless of these parameters.
"""
- self._thread_excepthook()
+ if 'threaded' in kwargs and 'block' in kwargs:
+ raise ValueError("process() called with both " + \
+ "block and threaded arguments")
+ elif 'block' in kwargs:
+ threaded = not(kwargs.get('block', False))
+ else:
+ threaded = kwargs.get('threaded', True)
+
self.scheduler.process(threaded=True)
def start_thread(name, target):
@@ -944,13 +1041,14 @@ class XMLStream(object):
func -- The event handler to execute.
args -- Arguments to the event handler.
"""
+ orig = copy.copy(args[0])
try:
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg % str(func))
- if hasattr(args[0], 'exception'):
- args[0].exception(e)
+ if hasattr(orig, 'exception'):
+ orig.exception(e)
def _event_runner(self):
"""
@@ -973,6 +1071,7 @@ class XMLStream(object):
etype, handler = event[0:2]
args = event[2:]
+ orig = copy.copy(args[0])
if etype == 'stanza':
try:
@@ -980,7 +1079,7 @@ class XMLStream(object):
except Exception as e:
error_msg = 'Error processing stream handler: %s'
log.exception(error_msg % handler.name)
- args[0].exception(e)
+ orig.exception(e)
elif etype == 'schedule':
try:
log.debug('Scheduled event: %s' % args)
@@ -989,6 +1088,7 @@ class XMLStream(object):
log.exception('Error processing scheduled task')
elif etype == 'event':
func, threaded, disposable = handler
+ orig = copy.copy(args[0])
try:
if threaded:
x = threading.Thread(
@@ -1001,8 +1101,8 @@ class XMLStream(object):
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg % str(func))
- if hasattr(args[0], 'exception'):
- args[0].exception(e)
+ if hasattr(orig, 'exception'):
+ orig.exception(e)
elif etype == 'quit':
log.debug("Quitting event runner thread")
return False
@@ -1034,6 +1134,7 @@ class XMLStream(object):
log.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
+ self.send_queue.task_done()
except Socket.error as serr:
self.event('socket_error', serr)
log.warning("Failed to send %s" % data)
@@ -1049,30 +1150,16 @@ class XMLStream(object):
self.event_queue.put(('quit', None, None))
return
- def _thread_excepthook(self):
+ def exception(self, exception):
"""
- If a threaded event handler raises an exception, there is no way to
- catch it except with an excepthook. Currently, each thread has its own
- excepthook, but ideally we could use the main sys.excepthook.
+ Process an unknown exception.
- Modifies threading.Thread to use sys.excepthook when an exception
- is not caught.
- """
- init_old = threading.Thread.__init__
-
- def init(self, *args, **kwargs):
- init_old(self, *args, **kwargs)
- run_old = self.run
+ Meant to be overridden.
- def run_with_except_hook(*args, **kw):
- try:
- run_old(*args, **kw)
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- sys.excepthook(*sys.exc_info())
- self.run = run_with_except_hook
- threading.Thread.__init__ = init
+ Arguments:
+ exception -- An unhandled exception object.
+ """
+ pass
# To comply with PEP8, method names now use underscores.