summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/plugins/xep_0461/__init__.py6
-rw-r--r--slixmpp/plugins/xep_0461/reply.py48
-rw-r--r--slixmpp/plugins/xep_0461/stanza.py47
-rw-r--r--tests/test_stanza_xep_0461.py48
-rw-r--r--tests/test_stream_xep_0461.py48
5 files changed, 197 insertions, 0 deletions
diff --git a/slixmpp/plugins/xep_0461/__init__.py b/slixmpp/plugins/xep_0461/__init__.py
new file mode 100644
index 00000000..1e9b2829
--- /dev/null
+++ b/slixmpp/plugins/xep_0461/__init__.py
@@ -0,0 +1,6 @@
+from slixmpp.plugins.base import register_plugin
+
+from .reply import XEP_0461
+from . import stanza
+
+register_plugin(XEP_0461)
diff --git a/slixmpp/plugins/xep_0461/reply.py b/slixmpp/plugins/xep_0461/reply.py
new file mode 100644
index 00000000..6607012a
--- /dev/null
+++ b/slixmpp/plugins/xep_0461/reply.py
@@ -0,0 +1,48 @@
+from slixmpp.plugins import BasePlugin
+from slixmpp.types import JidStr
+from slixmpp.xmlstream import StanzaBase
+from slixmpp.xmlstream.handler import Callback
+from slixmpp.xmlstream.matcher import StanzaPath
+
+from . import stanza
+
+
+class XEP_0461(BasePlugin):
+ """XEP-0461: Message Replies"""
+
+ name = "xep_0461"
+ description = "XEP-0461: Message Replies"
+
+ dependencies = {"xep_0030"}
+ stanza = stanza
+ namespace = stanza.NS
+
+ def plugin_init(self) -> None:
+ stanza.register_plugins()
+ self.xmpp.register_handler(
+ Callback(
+ "Message replied to",
+ StanzaPath("message/reply"),
+ self._handle_reply_to_message,
+ )
+ )
+
+ def plugin_end(self):
+ self.xmpp.plugin["xep_0030"].del_feature(feature=stanza.NS)
+
+ def session_bind(self, jid):
+ self.xmpp.plugin["xep_0030"].add_feature(feature=stanza.NS)
+
+ def _handle_reply_to_message(self, msg: StanzaBase):
+ self.xmpp.event("message_reply", msg)
+
+ def send_reply(self, reply_to: JidStr, reply_id: str, **msg_kwargs):
+ """
+
+ :param reply_to: Full JID of the quoted author
+ :param reply_id: ID of the message to reply to
+ """
+ msg = self.xmpp.make_message(**msg_kwargs)
+ msg["reply"]["to"] = reply_to
+ msg["reply"]["id"] = reply_id
+ msg.send()
diff --git a/slixmpp/plugins/xep_0461/stanza.py b/slixmpp/plugins/xep_0461/stanza.py
new file mode 100644
index 00000000..b99b2745
--- /dev/null
+++ b/slixmpp/plugins/xep_0461/stanza.py
@@ -0,0 +1,47 @@
+from slixmpp.stanza import Message
+from slixmpp.xmlstream import ElementBase, register_stanza_plugin
+
+NS = "urn:xmpp:reply:0"
+
+
+class Reply(ElementBase):
+ namespace = NS
+ name = "reply"
+ plugin_attrib = "reply"
+ interfaces = {"id", "to"}
+
+
+class FeatureFallBack(ElementBase):
+ # should also be a multi attrib
+ namespace = "urn:xmpp:feature-fallback:0"
+ name = "fallback"
+ plugin_attrib = "feature_fallback"
+ interfaces = {"for"}
+
+ def get_stripped_body(self):
+ # only works for a single fallback_body attrib
+ start = self["fallback_body"]["start"]
+ end = self["fallback_body"]["end"]
+ body = self.parent()["body"]
+ try:
+ start = int(start)
+ end = int(end)
+ except ValueError:
+ return body
+ else:
+ return body[:start] + body[end:]
+
+
+class FallBackBody(ElementBase):
+ # According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
+ # this should be a multi_attrib *but* since it's a protoXEP, we'll see...
+ namespace = FeatureFallBack.namespace
+ name = "body"
+ plugin_attrib = "fallback_body"
+ interfaces = {"start", "end"}
+
+
+def register_plugins():
+ register_stanza_plugin(Message, Reply)
+ register_stanza_plugin(Message, FeatureFallBack)
+ register_stanza_plugin(FeatureFallBack, FallBackBody)
diff --git a/tests/test_stanza_xep_0461.py b/tests/test_stanza_xep_0461.py
new file mode 100644
index 00000000..b9550481
--- /dev/null
+++ b/tests/test_stanza_xep_0461.py
@@ -0,0 +1,48 @@
+import unittest
+from slixmpp import Message
+from slixmpp.test import SlixTest
+from slixmpp.plugins.xep_0461 import stanza
+
+
+class TestReply(SlixTest):
+ def setUp(self):
+ stanza.register_plugins()
+
+ def testReply(self):
+ message = Message()
+ message["reply"]["id"] = "some-id"
+ message["body"] = "some-body"
+
+ self.check(
+ message,
+ """
+ <message>
+ <reply xmlns="urn:xmpp:reply:0" id="some-id" />
+ <body>some-body</body>
+ </message>
+ """,
+ )
+
+ def testFallback(self):
+ message = Message()
+ message["body"] = "12345\nrealbody"
+ message["feature_fallback"]["for"] = "NS"
+ message["feature_fallback"]["fallback_body"]["start"] = "0"
+ message["feature_fallback"]["fallback_body"]["end"] = "6"
+
+ self.check(
+ message,
+ """
+ <message xmlns="jabber:client">
+ <body>12345\nrealbody</body>
+ <fallback xmlns='urn:xmpp:feature-fallback:0' for='NS'>
+ <body start="0" end="6" />
+ </fallback>
+ </message>
+ """,
+ )
+
+ assert message["feature_fallback"].get_stripped_body() == "realbody"
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestReply)
diff --git a/tests/test_stream_xep_0461.py b/tests/test_stream_xep_0461.py
new file mode 100644
index 00000000..b73a9964
--- /dev/null
+++ b/tests/test_stream_xep_0461.py
@@ -0,0 +1,48 @@
+import logging
+import unittest
+from slixmpp.test import SlixTest
+
+
+class TestReply(SlixTest):
+ def setUp(self):
+ self.stream_start(plugins=["xep_0461"])
+
+ def tearDown(self):
+ self.stream_close()
+
+ def testFallBackBody(self):
+ async def on_reply(msg):
+ start = msg["feature_fallback"]["fallback_body"]["start"]
+ end = msg["feature_fallback"]["fallback_body"]["end"]
+ self.xmpp["xep_0461"].send_reply(
+ reply_to=msg.get_from(),
+ reply_id=msg.get_id(),
+ mto="test@test.com",
+ mbody=f"{start} to {end}",
+ )
+
+ self.xmpp.add_event_handler("message_reply", on_reply)
+
+ self.recv(
+ """
+ <message id="other-id" from="from@from.com/res">
+ <reply xmlns="urn:xmpp:reply:0" id="some-id" />
+ <body>&gt; quoted\nsome-body</body>
+ <fallback xmlns='urn:xmpp:feature-fallback:0' for='urn:xmpp:reply:0'>
+ <body start="0" end="8" />
+ </fallback>
+ </message>
+ """
+ )
+ self.send(
+ """
+ <message xmlns="jabber:client" to="test@test.com" type="normal">
+ <reply xmlns="urn:xmpp:reply:0" id="other-id" to="from@from.com/res" />
+ <body>0 to 8</body>
+ </message>
+ """
+ )
+
+
+logging.basicConfig(level=logging.DEBUG)
+suite = unittest.TestLoader().loadTestsFromTestCase(TestReply)