diff options
-rw-r--r-- | slixmpp/plugins/xep_0461/__init__.py | 6 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0461/reply.py | 48 | ||||
-rw-r--r-- | slixmpp/plugins/xep_0461/stanza.py | 47 | ||||
-rw-r--r-- | tests/test_stanza_xep_0461.py | 48 | ||||
-rw-r--r-- | tests/test_stream_xep_0461.py | 48 |
5 files changed, 197 insertions, 0 deletions
diff --git a/slixmpp/plugins/xep_0461/__init__.py b/slixmpp/plugins/xep_0461/__init__.py new file mode 100644 index 00000000..1e9b2829 --- /dev/null +++ b/slixmpp/plugins/xep_0461/__init__.py @@ -0,0 +1,6 @@ +from slixmpp.plugins.base import register_plugin + +from .reply import XEP_0461 +from . import stanza + +register_plugin(XEP_0461) diff --git a/slixmpp/plugins/xep_0461/reply.py b/slixmpp/plugins/xep_0461/reply.py new file mode 100644 index 00000000..6607012a --- /dev/null +++ b/slixmpp/plugins/xep_0461/reply.py @@ -0,0 +1,48 @@ +from slixmpp.plugins import BasePlugin +from slixmpp.types import JidStr +from slixmpp.xmlstream import StanzaBase +from slixmpp.xmlstream.handler import Callback +from slixmpp.xmlstream.matcher import StanzaPath + +from . import stanza + + +class XEP_0461(BasePlugin): + """XEP-0461: Message Replies""" + + name = "xep_0461" + description = "XEP-0461: Message Replies" + + dependencies = {"xep_0030"} + stanza = stanza + namespace = stanza.NS + + def plugin_init(self) -> None: + stanza.register_plugins() + self.xmpp.register_handler( + Callback( + "Message replied to", + StanzaPath("message/reply"), + self._handle_reply_to_message, + ) + ) + + def plugin_end(self): + self.xmpp.plugin["xep_0030"].del_feature(feature=stanza.NS) + + def session_bind(self, jid): + self.xmpp.plugin["xep_0030"].add_feature(feature=stanza.NS) + + def _handle_reply_to_message(self, msg: StanzaBase): + self.xmpp.event("message_reply", msg) + + def send_reply(self, reply_to: JidStr, reply_id: str, **msg_kwargs): + """ + + :param reply_to: Full JID of the quoted author + :param reply_id: ID of the message to reply to + """ + msg = self.xmpp.make_message(**msg_kwargs) + msg["reply"]["to"] = reply_to + msg["reply"]["id"] = reply_id + msg.send() diff --git a/slixmpp/plugins/xep_0461/stanza.py b/slixmpp/plugins/xep_0461/stanza.py new file mode 100644 index 00000000..b99b2745 --- /dev/null +++ b/slixmpp/plugins/xep_0461/stanza.py @@ -0,0 +1,47 @@ +from slixmpp.stanza import Message +from slixmpp.xmlstream import ElementBase, register_stanza_plugin + +NS = "urn:xmpp:reply:0" + + +class Reply(ElementBase): + namespace = NS + name = "reply" + plugin_attrib = "reply" + interfaces = {"id", "to"} + + +class FeatureFallBack(ElementBase): + # should also be a multi attrib + namespace = "urn:xmpp:feature-fallback:0" + name = "fallback" + plugin_attrib = "feature_fallback" + interfaces = {"for"} + + def get_stripped_body(self): + # only works for a single fallback_body attrib + start = self["fallback_body"]["start"] + end = self["fallback_body"]["end"] + body = self.parent()["body"] + try: + start = int(start) + end = int(end) + except ValueError: + return body + else: + return body[:start] + body[end:] + + +class FallBackBody(ElementBase): + # According to https://xmpp.org/extensions/inbox/compatibility-fallback.html + # this should be a multi_attrib *but* since it's a protoXEP, we'll see... + namespace = FeatureFallBack.namespace + name = "body" + plugin_attrib = "fallback_body" + interfaces = {"start", "end"} + + +def register_plugins(): + register_stanza_plugin(Message, Reply) + register_stanza_plugin(Message, FeatureFallBack) + register_stanza_plugin(FeatureFallBack, FallBackBody) diff --git a/tests/test_stanza_xep_0461.py b/tests/test_stanza_xep_0461.py new file mode 100644 index 00000000..b9550481 --- /dev/null +++ b/tests/test_stanza_xep_0461.py @@ -0,0 +1,48 @@ +import unittest +from slixmpp import Message +from slixmpp.test import SlixTest +from slixmpp.plugins.xep_0461 import stanza + + +class TestReply(SlixTest): + def setUp(self): + stanza.register_plugins() + + def testReply(self): + message = Message() + message["reply"]["id"] = "some-id" + message["body"] = "some-body" + + self.check( + message, + """ + <message> + <reply xmlns="urn:xmpp:reply:0" id="some-id" /> + <body>some-body</body> + </message> + """, + ) + + def testFallback(self): + message = Message() + message["body"] = "12345\nrealbody" + message["feature_fallback"]["for"] = "NS" + message["feature_fallback"]["fallback_body"]["start"] = "0" + message["feature_fallback"]["fallback_body"]["end"] = "6" + + self.check( + message, + """ + <message xmlns="jabber:client"> + <body>12345\nrealbody</body> + <fallback xmlns='urn:xmpp:feature-fallback:0' for='NS'> + <body start="0" end="6" /> + </fallback> + </message> + """, + ) + + assert message["feature_fallback"].get_stripped_body() == "realbody" + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestReply) diff --git a/tests/test_stream_xep_0461.py b/tests/test_stream_xep_0461.py new file mode 100644 index 00000000..b73a9964 --- /dev/null +++ b/tests/test_stream_xep_0461.py @@ -0,0 +1,48 @@ +import logging +import unittest +from slixmpp.test import SlixTest + + +class TestReply(SlixTest): + def setUp(self): + self.stream_start(plugins=["xep_0461"]) + + def tearDown(self): + self.stream_close() + + def testFallBackBody(self): + async def on_reply(msg): + start = msg["feature_fallback"]["fallback_body"]["start"] + end = msg["feature_fallback"]["fallback_body"]["end"] + self.xmpp["xep_0461"].send_reply( + reply_to=msg.get_from(), + reply_id=msg.get_id(), + mto="test@test.com", + mbody=f"{start} to {end}", + ) + + self.xmpp.add_event_handler("message_reply", on_reply) + + self.recv( + """ + <message id="other-id" from="from@from.com/res"> + <reply xmlns="urn:xmpp:reply:0" id="some-id" /> + <body>> quoted\nsome-body</body> + <fallback xmlns='urn:xmpp:feature-fallback:0' for='urn:xmpp:reply:0'> + <body start="0" end="8" /> + </fallback> + </message> + """ + ) + self.send( + """ + <message xmlns="jabber:client" to="test@test.com" type="normal"> + <reply xmlns="urn:xmpp:reply:0" id="other-id" to="from@from.com/res" /> + <body>0 to 8</body> + </message> + """ + ) + + +logging.basicConfig(level=logging.DEBUG) +suite = unittest.TestLoader().loadTestsFromTestCase(TestReply) |