summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/connection.py1
-rw-r--r--src/tabs.py118
2 files changed, 118 insertions, 1 deletions
diff --git a/src/connection.py b/src/connection.py
index f4db0816..dc4b170a 100644
--- a/src/connection.py
+++ b/src/connection.py
@@ -60,6 +60,7 @@ class Connection(sleekxmpp.ClientXMPP):
self.register_plugin('xep_0048')
self.register_plugin('xep_0071')
self.register_plugin('xep_0085')
+ self.register_plugin('xep_0191')
if config.get('send_poezio_info', 'true') == 'true':
info = {'name':'poezio',
'version':'0.8-dev'}
diff --git a/src/tabs.py b/src/tabs.py
index 7a9affb8..af62ea8c 100644
--- a/src/tabs.py
+++ b/src/tabs.py
@@ -39,7 +39,7 @@ import multiuserchat as muc
from theming import get_theme
-from sleekxmpp.xmlstream.stanzabase import JID
+from sleekxmpp import JID, InvalidJID
from sleekxmpp.xmlstream import matcher
from sleekxmpp.xmlstream.handler import Callback
from config import config
@@ -1859,10 +1859,126 @@ class RosterInfoTab(Tab):
self.commands['export'] = (self.command_export, _("Usage: /export [/path/to/file]\nExport: Export your contacts into /path/to/file if specified, or $HOME/poezio_contacts if not."), self.completion_file)
self.commands['import'] = (self.command_import, _("Usage: /import [/path/to/file]\nImport: Import your contacts from /path/to/file if specified, or $HOME/poezio_contacts if not."), self.completion_file)
self.commands['clear_infos'] = (self.command_clear_infos, _("Usage: /clear_infos\nClear Infos: Use this command to clear the info buffer."), None)
+ self.core.xmpp.add_event_handler('session_start',
+ lambda event: self.core.xmpp.plugin['xep_0030'].get_info(
+ jid=self.core.xmpp.boundjid.domain,
+ block=False, timeout=5, callback=self.check_blocking)
+ )
self.resize()
self.update_commands()
self.update_keys()
+ def check_blocking(self, iq):
+ if iq['type'] == 'error':
+ return
+ if 'urn:xmpp:blocking' in iq['disco_info'].get_features():
+ self.commands['block'] = (self.command_block, _("Usage: /block [jid]\nBlock: prevent a JID from talking to you."), self.completion_block)
+ self.commands['unblock'] = (self.command_unblock, _("Usage: /unblock [jid]\nUnblock: allow a JID to talk to you."), self.completion_unblock)
+ self.commands['list_blocks'] = (self.command_list_blocks, _("Usage: /list_blocks\nList Blocks: Retrieve the list of the blocked contacts."), None)
+ self.core.xmpp.del_event_handler('session_start', self.check_blocking)
+ self.core.xmpp.add_event_handler('blocked_message', self.on_blocked_message)
+ else:
+ self.core.information('Simple blocking not supported by the server', 'Info')
+
+ def on_blocked_message(self, message):
+ """
+ When we try to send a message to a blocked contact
+ """
+ tab = self.core.get_conversation_by_jid(message['from'], False)
+ if not tab:
+ log.debug('Received message from nonexistent tab: %s', message['from'])
+ message = '\x19%(info_col)s}Cannot send message to %(jid)s: contact blocked' % {
+ 'info_col': get_theme().COLOR_INFORMATION_TEXT[0],
+ 'jid': message['from'],
+ }
+ tab.add_message(message)
+
+ def command_block(self, arg):
+ """
+ /block [jid]
+ """
+ def callback(iq):
+ if iq['type'] == 'error':
+ return self.core.information('Could not block the contact.', 'Error')
+ elif iq['type'] == 'result':
+ return self.core.information('Contact blocked.', 'Info')
+
+ item = self.roster_win.selected_row
+ if arg:
+ try:
+ jid = JID(arg)
+ except InvalidJID:
+ self.core.information('JID not well-formed', 'Error')
+ return
+ elif isinstance(item, Contact):
+ jid = item.bare_jid
+ elif isinstance(item, Resource):
+ jid = item.jid.bare
+ self.core.xmpp.plugin['xep_0191'].block(jid, block=False, callback=callback)
+
+ def completion_block(self, the_input):
+ """
+ Completion for /block
+ """
+ jids = roster.jids()
+ return the_input.auto_completion(jids, '', quotify=False)
+
+ def command_unblock(self, arg):
+ """
+ /unblock [jid]
+ """
+ def callback(iq):
+ if iq['type'] == 'error':
+ return self.core.information('Could not unblock the contact.', 'Error')
+ elif iq['type'] == 'result':
+ return self.core.information('Contact unblocked.', 'Info')
+
+ item = self.roster_win.selected_row
+ if arg:
+ try:
+ jid = JID(arg)
+ except InvalidJID:
+ self.core.information('JID not well-formed', 'Error')
+ return
+ elif isinstance(item, Contact):
+ jid = item.bare_jid
+ elif isinstance(item, Resource):
+ jid = item.jid.bare
+ self.core.xmpp.plugin['xep_0191'].unblock(jid, block=False, callback=callback)
+
+ def completion_unblock(self, the_input):
+ """
+ Completion for /unblock
+ """
+ try:
+ iq = self.core.xmpp.plugin['xep_0191'].get_blocked(block=True)
+ except Exception as e:
+ iq = e.iq
+ finally:
+ if iq['type'] == 'error':
+ return
+ l = [str(item) for item in iq['blocklist']['items']]
+ return the_input.auto_completion(l, quotify=False)
+
+ def command_list_blocks(self, arg=None):
+ """
+ /list_blocks
+ """
+ def callback(iq):
+ if iq['type'] == 'error':
+ return self.core.information('Could not retrieve the blocklist.', 'Error')
+ s = 'List of blocked JIDs:\n'
+ log.debug('COCUCOCOCOCOCOCOC\n%s\n\n', iq['blocklist']['items'])
+ items = (str(item) for item in iq['blocklist']['items'])
+ jids = '\n'.join(items)
+ if jids:
+ s += jids
+ else:
+ s = 'No blocked JIDs.'
+ self.core.information(s, 'Info')
+
+ self.core.xmpp.plugin['xep_0191'].get_blocked(block=False, callback=callback)
+
def resize(self):
if not self.visible:
return