summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0055/search.py
blob: a45b52a7ea61a0d64b3f398e18be6966a6fe59fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import logging

from slixmpp import CoroutineCallback, StanzaPath, Iq, register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import StanzaBase

from . import stanza


class XEP_0055(BasePlugin):
    """
    XEP-0055: Jabber Search

    The config options are only useful for a "server-side" search feature,
    and if the ``provide_search`` option is set to True.

    Config options
    ==============

    ``form_fields``: names of the fields added to the search form and
    declared as reported columns in the default results. The default value
    is a set, so the order in which the fields are emitted is not guaranteed.

    ``form_instructions``: text assigned to the form's ``instructions``.

    ``form_title``: text assigned to the form's ``title``.

    ``provide_search``: when true, the plugin advertises the search feature
    and answers incoming search IQs.

    API
    ===

    ``search_get_form``: customize the search form content (ie fields)

    ``search_query``: return search results
    """
    name = "xep_0055"
    description = "XEP-0055: Jabber search"
    dependencies = {"xep_0004", "xep_0030"}
    stanza = stanza
    default_config = {
        "form_fields": {"first", "last"},
        "form_instructions": "",
        "form_title": "",
        "provide_search": True
    }

    def plugin_init(self):
        """
        Register the search stanzas and, if ``provide_search`` is enabled,
        the disco feature, the incoming IQ handler and the default API
        handlers (``search_get_form`` and ``search_query``).
        """
        register_stanza_plugin(Iq, stanza.Search)
        register_stanza_plugin(stanza.Search, self.xmpp["xep_0004"].stanza.Form)

        if self.provide_search:
            self.xmpp["xep_0030"].add_feature(stanza.Search.namespace)
            self.xmpp.register_handler(
                CoroutineCallback(
                    "search",
                    StanzaPath("/iq/search"),
                    self._handle_search,
                )
            )
            self.api.register(self._get_form, "search_get_form")
            self.api.register(self._get_results, "search_query")

    async def _handle_search(self, iq: StanzaBase):
        """
        Answer an incoming search IQ.

        A request whose data form carries values is treated as a query and
        dispatched to the ``search_query`` API; any other request is answered
        with the search form obtained from the ``search_get_form`` API. In
        both cases a hidden ``FORM_TYPE`` field is added before the reply is
        sent.

        :param iq: The received IQ stanza containing a search payload.
        """
        # The handler path matches every IQ that has a search payload,
        # including "result" and "error" responses (e.g. the answer to a
        # query built with make_search_iq). Responses must never be replied
        # to, otherwise two entities could answer each other indefinitely.
        if iq["type"] not in ("get", "set"):
            return

        if iq["search"]["form"].get_values():
            reply = await self.api["search_query"](None, None, iq.get_from(), iq)
            # Enforce the form type even if a custom API handler forgot it.
            reply["search"]["form"]["type"] = "result"
        else:
            reply = await self.api["search_get_form"](None, None, iq.get_from(), iq)
        reply["search"]["form"].add_field(
            "FORM_TYPE", value=stanza.Search.namespace, ftype="hidden"
        )
        reply.send()

    async def _get_form(self, jid, node, ifrom, iq):
        """
        Default ``search_get_form`` handler: build the search form reply.

        The form gets the configured title and instructions and one field
        per entry of ``form_fields``.

        :param jid: Unused.
        :param node: Unused.
        :param ifrom: Unused.
        :param iq: The request stanza the reply is derived from.
        :returns: The reply stanza (not sent).
        """
        reply = iq.reply()
        form = reply["search"]["form"]
        form["title"] = self.form_title
        form["instructions"] = self.form_instructions
        for field in self.form_fields:
            form.add_field(field)
        return reply

    async def _get_results(self, jid, node, ifrom, iq):
        """
        Default ``search_query`` handler: build a result form with no items.

        The reply form has type ``result`` and declares one reported field
        per entry of ``form_fields``; no result items are added.

        :param jid: Unused.
        :param node: Unused.
        :param ifrom: Unused.
        :param iq: The request stanza the reply is derived from.
        :returns: The reply stanza (not sent).
        """
        reply = iq.reply()
        form = reply["search"]["form"]
        form["type"] = "result"

        for field in self.form_fields:
            form.add_reported(field)
        return reply

    def make_search_iq(self, **kwargs):
        """
        Build an IQ of type ``set`` carrying a search form of type
        ``submit`` with the hidden ``FORM_TYPE`` field already added.

        :param kwargs: Passed through to ``self.xmpp.make_iq``.
        :returns: The IQ stanza (not sent); the caller adds its own search
                  fields to ``iq["search"]["form"]``.
        """
        iq = self.xmpp.make_iq(itype="set", **kwargs)
        iq["search"]["form"].set_type("submit")
        iq["search"]["form"].add_field(
            "FORM_TYPE", value=stanza.Search.namespace, ftype="hidden"
        )
        return iq


# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)