diff options
-rw-r--r-- | sleekxmpp/xmlstream/stanzabase.py | 2 | ||||
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 44 | ||||
-rw-r--r-- | tests/test_stream_filters.py | 88 |
3 files changed, 130 insertions, 4 deletions
diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index 389fe20c..2f864300 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -1251,7 +1251,7 @@ class StanzaBase(ElementBase): stanza sent immediately. Useful for stream initialization. Defaults to ``False``. """ - self.stream.send_raw(self.__str__(), now=now) + self.stream.send(self, now=now) def __copy__(self): """Return a copy of the stanza object that does not share the diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index fb9f91bc..d5029928 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -35,7 +35,7 @@ except ImportError: import sleekxmpp from sleekxmpp.thirdparty.statemachine import StateMachine from sleekxmpp.xmlstream import Scheduler, tostring -from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET +from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase from sleekxmpp.xmlstream.handler import Waiter, XMLCallback from sleekxmpp.xmlstream.matcher import MatchXMLMask @@ -268,6 +268,7 @@ class XMLStream(object): self.__handlers = [] self.__event_handlers = {} self.__event_handlers_lock = threading.Lock() + self.__filters = {'in': [], 'out': []} self._id = 0 self._id_lock = threading.Lock() @@ -743,6 +744,28 @@ class XMLStream(object): """ del self.__root_stanza[stanza_class] + def add_filter(self, mode, handler, order=None): + """Add a filter for incoming or outgoing stanzas. + + These filters are applied before incoming stanzas are + passed to any handlers, and before outgoing stanzas + are put in the send queue. + + Each filter must accept a single stanza, and return + either a stanza or ``None``. If the filter returns + ``None``, then the stanza will be dropped from being + processed for events or from being sent. + + :param mode: One of ``'in'`` or ``'out'``. + :param handler: The filter function. + :param int order: The position to insert the filter in + the list of active filters. + """ + if order: + self.__filters[mode].insert(order, handler) + else: + self.__filters[mode].append(handler) + def add_handler(self, mask, pointer, name=None, disposable=False, threaded=False, filter=False, instream=False): """A shortcut method for registering a handler using XML masks. @@ -994,6 +1017,14 @@ class XMLStream(object): timeout = self.response_timeout if hasattr(mask, 'xml'): mask = mask.xml + + if isinstance(data, ElementBase): + for filter in self.__filters['out']: + if data is not None: + data = filter(data) + if data is None: + return + data = str(data) if mask is not None: log.warning("Use of send mask waiters is deprecated.") @@ -1246,8 +1277,6 @@ class XMLStream(object): :param xml: The :class:`~sleekxmpp.xmlstream.stanzabase.ElementBase` stanza to analyze. """ - log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns, - stream=self)) # Apply any preprocessing filters. xml = self.incoming_filter(xml) @@ -1255,6 +1284,15 @@ class XMLStream(object): # stanza type applies, a generic StanzaBase stanza will be used. stanza = self._build_stanza(xml) + for filter in self.__filters['in']: + if stanza is not None: + stanza = filter(stanza) + if stanza is None: + return + + log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns, + stream=self)) + # Match the stanza against registered handlers. Handlers marked # to run "in stream" will be executed immediately; the rest will # be queued. diff --git a/tests/test_stream_filters.py b/tests/test_stream_filters.py new file mode 100644 index 00000000..ef4d5dc8 --- /dev/null +++ b/tests/test_stream_filters.py @@ -0,0 +1,88 @@ +import time + +from sleekxmpp import Message +from sleekxmpp.test import * +from sleekxmpp.xmlstream.handler import * +from sleekxmpp.xmlstream.matcher import * + + +class TestFilters(SleekTest): + + """ + Test using incoming and outgoing filters. + """ + + def setUp(self): + self.stream_start() + + def tearDown(self): + self.stream_close() + + def testIncoming(self): + + data = [] + + def in_filter(stanza): + if isinstance(stanza, Message): + if stanza['body'] == 'testing': + stanza['subject'] = stanza['body'] + ' filter' + print('>>> %s' % stanza['subject']) + return stanza + + def on_message(msg): + print('<<< %s' % msg['subject']) + data.append(msg['subject']) + + self.xmpp.add_filter('in', in_filter) + self.xmpp.add_event_handler('message', on_message) + + self.recv(""" + <message> + <body>no filter</body> + </message> + """) + + self.recv(""" + <message> + <body>testing</body> + </message> + """) + + time.sleep(0.5) + + self.assertEqual(data, ['', 'testing filter'], + 'Incoming filter did not apply %s' % data) + + def testOutgoing(self): + + def out_filter(stanza): + if isinstance(stanza, Message): + if stanza['body'] == 'testing': + stanza['body'] = 'changed!' + return stanza + + self.xmpp.add_filter('out', out_filter) + + m1 = self.Message() + m1['body'] = 'testing' + m1.send() + + m2 = self.Message() + m2['body'] = 'blah' + m2.send() + + self.send(""" + <message> + <body>changed!</body> + </message> + """) + + self.send(""" + <message> + <body>blah</body> + </message> + """) + + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestFilters) |