summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sleekxmpp/xmlstream/stanzabase.py2
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py44
-rw-r--r--tests/test_stream_filters.py88
3 files changed, 130 insertions, 4 deletions
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py
index 389fe20c..2f864300 100644
--- a/sleekxmpp/xmlstream/stanzabase.py
+++ b/sleekxmpp/xmlstream/stanzabase.py
@@ -1251,7 +1251,7 @@ class StanzaBase(ElementBase):
stanza sent immediately. Useful for stream
initialization. Defaults to ``False``.
"""
- self.stream.send_raw(self.__str__(), now=now)
+ self.stream.send(self, now=now)
def __copy__(self):
"""Return a copy of the stanza object that does not share the
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index fb9f91bc..d5029928 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -35,7 +35,7 @@ except ImportError:
import sleekxmpp
from sleekxmpp.thirdparty.statemachine import StateMachine
from sleekxmpp.xmlstream import Scheduler, tostring
-from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
+from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase
from sleekxmpp.xmlstream.handler import Waiter, XMLCallback
from sleekxmpp.xmlstream.matcher import MatchXMLMask
@@ -268,6 +268,7 @@ class XMLStream(object):
self.__handlers = []
self.__event_handlers = {}
self.__event_handlers_lock = threading.Lock()
+ self.__filters = {'in': [], 'out': []}
self._id = 0
self._id_lock = threading.Lock()
@@ -743,6 +744,28 @@ class XMLStream(object):
"""
del self.__root_stanza[stanza_class]
+ def add_filter(self, mode, handler, order=None):
+ """Add a filter for incoming or outgoing stanzas.
+
+ These filters are applied before incoming stanzas are
+ passed to any handlers, and before outgoing stanzas
+ are put in the send queue.
+
+ Each filter must accept a single stanza, and return
+ either a stanza or ``None``. If the filter returns
+ ``None``, then the stanza will be dropped from being
+ processed for events or from being sent.
+
+ :param mode: One of ``'in'`` or ``'out'``.
+ :param handler: The filter function.
+ :param int order: The position to insert the filter in
+ the list of active filters.
+ """
+ if order:
+ self.__filters[mode].insert(order, handler)
+ else:
+ self.__filters[mode].append(handler)
+
def add_handler(self, mask, pointer, name=None, disposable=False,
threaded=False, filter=False, instream=False):
"""A shortcut method for registering a handler using XML masks.
@@ -994,6 +1017,14 @@ class XMLStream(object):
timeout = self.response_timeout
if hasattr(mask, 'xml'):
mask = mask.xml
+
+ if isinstance(data, ElementBase):
+ for filter in self.__filters['out']:
+ if data is not None:
+ data = filter(data)
+ if data is None:
+ return
+
data = str(data)
if mask is not None:
log.warning("Use of send mask waiters is deprecated.")
@@ -1246,8 +1277,6 @@ class XMLStream(object):
:param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase`
stanza to analyze.
"""
- log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns,
- stream=self))
# Apply any preprocessing filters.
xml = self.incoming_filter(xml)
@@ -1255,6 +1284,15 @@ class XMLStream(object):
# stanza type applies, a generic StanzaBase stanza will be used.
stanza = self._build_stanza(xml)
+ for filter in self.__filters['in']:
+ if stanza is not None:
+ stanza = filter(stanza)
+ if stanza is None:
+ return
+
+ log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns,
+ stream=self))
+
# Match the stanza against registered handlers. Handlers marked
# to run "in stream" will be executed immediately; the rest will
# be queued.
diff --git a/tests/test_stream_filters.py b/tests/test_stream_filters.py
new file mode 100644
index 00000000..ef4d5dc8
--- /dev/null
+++ b/tests/test_stream_filters.py
@@ -0,0 +1,88 @@
+import time
+
+from sleekxmpp import Message
+from sleekxmpp.test import *
+from sleekxmpp.xmlstream.handler import *
+from sleekxmpp.xmlstream.matcher import *
+
+
+class TestFilters(SleekTest):
+
+ """
+ Test using incoming and outgoing filters.
+ """
+
+ def setUp(self):
+ self.stream_start()
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testIncoming(self):
+
+ data = []
+
+ def in_filter(stanza):
+ if isinstance(stanza, Message):
+ if stanza['body'] == 'testing':
+ stanza['subject'] = stanza['body'] + ' filter'
+ print('>>> %s' % stanza['subject'])
+ return stanza
+
+ def on_message(msg):
+ print('<<< %s' % msg['subject'])
+ data.append(msg['subject'])
+
+ self.xmpp.add_filter('in', in_filter)
+ self.xmpp.add_event_handler('message', on_message)
+
+ self.recv("""
+ <message>
+ <body>no filter</body>
+ </message>
+ """)
+
+ self.recv("""
+ <message>
+ <body>testing</body>
+ </message>
+ """)
+
+ time.sleep(0.5)
+
+ self.assertEqual(data, ['', 'testing filter'],
+ 'Incoming filter did not apply %s' % data)
+
+ def testOutgoing(self):
+
+ def out_filter(stanza):
+ if isinstance(stanza, Message):
+ if stanza['body'] == 'testing':
+ stanza['body'] = 'changed!'
+ return stanza
+
+ self.xmpp.add_filter('out', out_filter)
+
+ m1 = self.Message()
+ m1['body'] = 'testing'
+ m1.send()
+
+ m2 = self.Message()
+ m2['body'] = 'blah'
+ m2.send()
+
+ self.send("""
+ <message>
+ <body>changed!</body>
+ </message>
+ """)
+
+ self.send("""
+ <message>
+ <body>blah</body>
+ </message>
+ """)
+
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestFilters)