From 332a5c2553db41de777473a1e1be9cd1522c9496 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Thu, 31 Mar 2016 18:54:41 +0100 Subject: Move the src directory to poezio, for better cython compatibility. --- poezio/tabs/rostertab.py | 1280 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1280 insertions(+) create mode 100644 poezio/tabs/rostertab.py (limited to 'poezio/tabs/rostertab.py') diff --git a/poezio/tabs/rostertab.py b/poezio/tabs/rostertab.py new file mode 100644 index 00000000..a5c22304 --- /dev/null +++ b/poezio/tabs/rostertab.py @@ -0,0 +1,1280 @@ +""" +The RosterInfoTab is the tab showing roster info, the list of contacts, +half of it is dedicated to showing the information buffer, and a small +rectangle shows the current contact info. + +This module also includes functions to match users in the roster. +""" +import logging +log = logging.getLogger(__name__) + +import base64 +import curses +import difflib +import os +import ssl +from os import getenv, path +from functools import partial + +from . import Tab + +import common +import windows +from common import safeJID +from config import config +from contact import Contact, Resource +from decorators import refresh_wrapper +from roster import RosterGroup, roster +from theming import get_theme, dump_tuple +from decorators import command_args_parser + +class RosterInfoTab(Tab): + """ + A tab, splitted in two, containing the roster and infos + """ + plugin_commands = {} + plugin_keys = {} + def __init__(self): + Tab.__init__(self) + self.name = "Roster" + self.v_separator = windows.VerticalSeparator() + self.information_win = windows.TextWin() + self.core.information_buffer.add_window(self.information_win) + self.roster_win = windows.RosterWin() + self.contact_info_win = windows.ContactInfoWin() + self.default_help_message = windows.HelpText("Enter commands with “/”. “o”: toggle offline show") + self.input = self.default_help_message + self.state = 'normal' + self.key_func['^I'] = self.completion + self.key_func["/"] = self.on_slash + # disable most of the roster features when in anonymous mode + if not self.core.xmpp.anon: + self.key_func[' '] = self.on_space + self.key_func["KEY_UP"] = self.move_cursor_up + self.key_func["KEY_DOWN"] = self.move_cursor_down + self.key_func["M-u"] = self.move_cursor_to_next_contact + self.key_func["M-y"] = self.move_cursor_to_prev_contact + self.key_func["M-U"] = self.move_cursor_to_next_group + self.key_func["M-Y"] = self.move_cursor_to_prev_group + self.key_func["M-[1;5B"] = self.move_cursor_to_next_group + self.key_func["M-[1;5A"] = self.move_cursor_to_prev_group + self.key_func["l"] = self.command_last_activity + self.key_func["o"] = self.toggle_offline_show + self.key_func["v"] = self.get_contact_version + self.key_func["i"] = self.show_contact_info + self.key_func["s"] = self.start_search + self.key_func["S"] = self.start_search_slow + self.key_func["n"] = self.change_contact_name + self.register_command('deny', self.command_deny, + usage='[jid]', + desc='Deny your presence to the provided JID (or the ' + 'selected contact in your roster), who is asking' + 'you to be in his/here roster.', + shortdesc='Deny an user your presence.', + completion=self.completion_deny) + self.register_command('accept', self.command_accept, + usage='[jid]', + desc='Allow the provided JID (or the selected contact ' + 'in your roster), to see your presence.', + shortdesc='Allow an user your presence.', + completion=self.completion_deny) + self.register_command('add', self.command_add, + usage='', + desc='Add the specified JID to your roster, ask him to' + ' allow you to see his presence, and allow him to' + ' see your presence.', + shortdesc='Add an user to your roster.') + self.register_command('name', self.command_name, + usage=' [name]', + shortdesc='Set the given JID\'s name.', + completion=self.completion_name) + self.register_command('groupadd', self.command_groupadd, + usage=' ', + desc='Add the given JID to the given group.', + shortdesc='Add an user to a group', + completion=self.completion_groupadd) + self.register_command('groupmove', self.command_groupmove, + usage=' ', + desc='Move the given JID from the old group to the new group.', + shortdesc='Move an user to another group.', + completion=self.completion_groupmove) + self.register_command('groupremove', self.command_groupremove, + usage=' ', + desc='Remove the given JID from the given group.', + shortdesc='Remove an user from a group.', + completion=self.completion_groupremove) + self.register_command('remove', self.command_remove, + usage='[jid]', + desc='Remove the specified JID from your roster. This ' + 'will unsubscribe you from its presence, cancel ' + 'its subscription to yours, and remove the item ' + 'from your roster.', + shortdesc='Remove an user from your roster.', + completion=self.completion_remove) + self.register_command('export', self.command_export, + usage='[/path/to/file]', + desc='Export your contacts into /path/to/file if ' + 'specified, or $HOME/poezio_contacts if not.', + shortdesc='Export your roster to a file.', + completion=partial(self.completion_file, 1)) + self.register_command('import', self.command_import, + usage='[/path/to/file]', + desc='Import your contacts from /path/to/file if ' + 'specified, or $HOME/poezio_contacts if not.', + shortdesc='Import your roster from a file.', + completion=partial(self.completion_file, 1)) + self.register_command('password', self.command_password, + usage='', + shortdesc='Change your password') + + self.register_command('reconnect', self.command_reconnect, + desc='Disconnect from the remote server if you are ' + 'currently connected and then connect to it again.', + shortdesc='Disconnect and reconnect to the server.') + self.register_command('disconnect', self.command_disconnect, + desc='Disconnect from the remote server.', + shortdesc='Disconnect from the server.') + self.register_command('clear', self.command_clear, + shortdesc='Clear the info buffer.') + self.register_command('last_activity', self.command_last_activity, + usage='', + desc='Informs you of the last activity of a JID.', + shortdesc='Get the activity of someone.', + completion=self.core.completion_last_activity) + + self.resize() + self.update_commands() + self.update_keys() + + def check_blocking(self, features): + if 'urn:xmpp:blocking' in features and not self.core.xmpp.anon: + self.register_command('block', self.command_block, + usage='[jid]', + shortdesc='Prevent a JID from talking to you.', + completion=self.completion_block) + self.register_command('unblock', self.command_unblock, + usage='[jid]', + shortdesc='Allow a JID to talk to you.', + completion=self.completion_unblock) + self.register_command('list_blocks', self.command_list_blocks, + shortdesc='Show the blocked contacts.') + self.core.xmpp.del_event_handler('session_start', self.check_blocking) + self.core.xmpp.add_event_handler('blocked_message', self.on_blocked_message) + + def check_saslexternal(self, features): + if 'urn:xmpp:saslcert:1' in features and not self.core.xmpp.anon: + self.register_command('certs', self.command_certs, + desc='List the fingerprints of certificates' + ' which can connect to your account.', + shortdesc='List allowed client certs.') + self.register_command('cert_add', self.command_cert_add, + desc='Add a client certificate to the authorized ones. ' + 'It must have an unique name and be contained in ' + 'a PEM file. [management] is a boolean indicating' + ' if a client connected using this certificate can' + ' manage the certificates itself.', + shortdesc='Add a client certificate.', + usage=' [management]', + completion=self.completion_cert_add) + self.register_command('cert_disable', self.command_cert_disable, + desc='Remove a certificate from the list ' + 'of allowed ones. Clients currently ' + 'using this certificate will not be ' + 'forcefully disconnected.', + shortdesc='Disable a certificate', + usage='') + self.register_command('cert_revoke', self.command_cert_revoke, + desc='Remove a certificate from the list ' + 'of allowed ones. Clients currently ' + 'using this certificate will be ' + 'forcefully disconnected.', + shortdesc='Revoke a certificate', + usage='') + self.register_command('cert_fetch', self.command_cert_fetch, + desc='Retrieve a certificate with its ' + 'name. It will be stored in .', + shortdesc='Fetch a certificate', + usage=' ', + completion=self.completion_cert_fetch) + + @property + def selected_row(self): + return self.roster_win.get_selected_row() + + @command_args_parser.ignored + def command_certs(self): + """ + /certs + """ + def cb(iq): + if iq['type'] == 'error': + self.core.information('Unable to retrieve the certificate list.', + 'Error') + return + certs = [] + for item in iq['sasl_certs']['items']: + users = '\n'.join(item['users']) + certs.append((item['name'], users)) + + if not certs: + return self.core.information('No certificates found', 'Info') + msg = 'Certificates:\n' + msg += '\n'.join(((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1])) for item in certs)) + self.core.information(msg, 'Info') + + self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb, timeout=3) + + @command_args_parser.quoted(2, 1) + def command_cert_add(self, args): + """ + /cert_add [cert-management] + """ + if not args or len(args) < 2: + return self.core.command_help('cert_add') + def cb(iq): + if iq['type'] == 'error': + self.core.information('Unable to add the certificate.', 'Error') + else: + self.core.information('Certificate added.', 'Info') + + name = args[0] + + try: + with open(args[1]) as fd: + crt = fd.read() + crt = crt.replace(ssl.PEM_FOOTER, '').replace(ssl.PEM_HEADER, '').replace(' ', '').replace('\n', '') + except Exception as e: + self.core.information('Unable to read the certificate: %s' % e, 'Error') + return + + if len(args) > 2: + management = args[2] + if management: + management = management.lower() + if management not in ('false', '0'): + management = True + else: + management = False + else: + management = False + else: + management = True + + self.core.xmpp.plugin['xep_0257'].add_cert(name, crt, callback=cb, + allow_management=management) + + def completion_cert_add(self, the_input): + """ + completion for /cert_add [management] + """ + text = the_input.get_text() + args = common.shell_split(text) + n = the_input.get_argument_position() + log.debug('%s %s %s', the_input.text, n, the_input.pos) + if n == 1: + return + elif n == 2: + return self.completion_file(2, the_input) + elif n == 3: + return the_input.new_completion(['true', 'false'], n) + + @command_args_parser.quoted(1) + def command_cert_disable(self, args): + """ + /cert_disable + """ + if not args: + return self.core.command_help('cert_disable') + def cb(iq): + if iq['type'] == 'error': + self.core.information('Unable to disable the certificate.', 'Error') + else: + self.core.information('Certificate disabled.', 'Info') + + name = args[0] + + self.core.xmpp.plugin['xep_0257'].disable_cert(name, callback=cb) + + @command_args_parser.quoted(1) + def command_cert_revoke(self, args): + """ + /cert_revoke + """ + if not args: + return self.core.command_help('cert_revoke') + def cb(iq): + if iq['type'] == 'error': + self.core.information('Unable to revoke the certificate.', 'Error') + else: + self.core.information('Certificate revoked.', 'Info') + + name = args[0] + + self.core.xmpp.plugin['xep_0257'].revoke_cert(name, callback=cb) + + + @command_args_parser.quoted(2) + def command_cert_fetch(self, args): + """ + /cert_fetch + """ + if not args or len(args) < 2: + return self.core.command_help('cert_fetch') + def cb(iq): + if iq['type'] == 'error': + self.core.information('Unable to fetch the certificate.', + 'Error') + return + + cert = None + for item in iq['sasl_certs']['items']: + if item['name'] == name: + cert = base64.b64decode(item['x509cert']) + break + + if not cert: + return self.core.information('Certificate not found.', 'Info') + + cert = ssl.DER_cert_to_PEM_cert(cert) + with open(path, 'w') as fd: + fd.write(cert) + + self.core.information('File stored at %s' % path, 'Info') + + name = args[0] + path = args[1] + + self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb) + + def completion_cert_fetch(self, the_input): + """ + completion for /cert_fetch + """ + text = the_input.get_text() + args = common.shell_split(text) + n = the_input.get_argument_position() + log.debug('%s %s %s', the_input.text, n, the_input.pos) + if n == 1: + return + elif n == 2: + return self.completion_file(2, the_input) + + def on_blocked_message(self, message): + """ + When we try to send a message to a blocked contact + """ + tab = self.core.get_conversation_by_jid(message['from'], False) + if not tab: + log.debug('Received message from nonexistent tab: %s', message['from']) + message = '\x19%(info_col)s}Cannot send message to %(jid)s: contact blocked' % { + 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT), + 'jid': message['from'], + } + tab.add_message(message) + + @command_args_parser.quoted(0, 1) + def command_block(self, args): + """ + /block [jid] + """ + def callback(iq): + if iq['type'] == 'error': + return self.core.information('Could not block the contact.', 'Error') + elif iq['type'] == 'result': + return self.core.information('Contact blocked.', 'Info') + + item = self.roster_win.selected_row + if args: + jid = safeJID(args[0]) + elif isinstance(item, Contact): + jid = item.bare_jid + elif isinstance(item, Resource): + jid = item.jid.bare + self.core.xmpp.plugin['xep_0191'].block(jid, callback=callback) + + def completion_block(self, the_input): + """ + Completion for /block + """ + if the_input.get_argument_position() == 1: + jids = roster.jids() + return the_input.new_completion(jids, 1, '', quotify=False) + + @command_args_parser.quoted(0, 1) + def command_unblock(self, args): + """ + /unblock [jid] + """ + def callback(iq): + if iq['type'] == 'error': + return self.core.information('Could not unblock the contact.', 'Error') + elif iq['type'] == 'result': + return self.core.information('Contact unblocked.', 'Info') + + item = self.roster_win.selected_row + if args: + jid = safeJID(args[0]) + elif isinstance(item, Contact): + jid = item.bare_jid + elif isinstance(item, Resource): + jid = item.jid.bare + self.core.xmpp.plugin['xep_0191'].unblock(jid, callback=callback) + + def completion_unblock(self, the_input): + """ + Completion for /unblock + """ + def on_result(iq): + if iq['type'] == 'error': + return + l = sorted(str(item) for item in iq['blocklist']['items']) + return the_input.new_completion(l, 1, quotify=False) + + if the_input.get_argument_position(): + self.core.xmpp.plugin['xep_0191'].get_blocked(callback=on_result) + return True + + @command_args_parser.ignored + def command_list_blocks(self): + """ + /list_blocks + """ + def callback(iq): + if iq['type'] == 'error': + return self.core.information('Could not retrieve the blocklist.', 'Error') + s = 'List of blocked JIDs:\n' + items = (str(item) for item in iq['blocklist']['items']) + jids = '\n'.join(items) + if jids: + s += jids + else: + s = 'No blocked JIDs.' + self.core.information(s, 'Info') + + self.core.xmpp.plugin['xep_0191'].get_blocked(callback=callback) + + @command_args_parser.ignored + def command_reconnect(self): + """ + /reconnect + """ + if self.core.xmpp.is_connected(): + self.core.disconnect(reconnect=True) + else: + self.core.xmpp.connect() + + @command_args_parser.ignored + def command_disconnect(self): + """ + /disconnect + """ + self.core.disconnect() + + @command_args_parser.quoted(0, 1) + def command_last_activity(self, args): + """ + /activity [jid] + """ + item = self.roster_win.selected_row + if args: + jid = args[0] + elif isinstance(item, Contact): + jid = item.bare_jid + elif isinstance(item, Resource): + jid = item.jid + else: + self.core.information('No JID selected.', 'Error') + return + self.core.command_last_activity(jid) + + def resize(self): + self.need_resize = False + if self.size.tab_degrade_x: + display_info = False + roster_width = self.width + else: + display_info = True + roster_width = self.width // 2 + if self.size.tab_degrade_y: + display_contact_win = False + contact_win_h = 0 + else: + display_contact_win = True + contact_win_h = 4 + if self.size.tab_degrade_y: + tab_win_height = 0 + else: + tab_win_height = Tab.tab_win_height() + + info_width = self.width - roster_width - 1 + if display_info: + self.v_separator.resize(self.height - 1 - tab_win_height, + 1, 0, roster_width) + self.information_win.resize(self.height - 1 - tab_win_height + - contact_win_h, + info_width, 0, roster_width + 1, + self.core.information_buffer) + if display_contact_win: + self.contact_info_win.resize(contact_win_h, + info_width, + self.height - tab_win_height + - contact_win_h - 1, + roster_width + 1) + self.roster_win.resize(self.height - 1 - Tab.tab_win_height(), + roster_width, 0, 0) + self.input.resize(1, self.width, self.height-1, 0) + self.default_help_message.resize(1, self.width, self.height-1, 0) + + def completion(self): + # Check if we are entering a command (with the '/' key) + if isinstance(self.input, windows.Input) and\ + not self.input.help_message: + self.complete_commands(self.input) + + def completion_file(self, complete_number, the_input): + """ + Generic quoted completion for files/paths + (use functools.partial to use directly as a completion + for a command) + """ + text = the_input.get_text() + args = common.shell_split(text) + n = the_input.get_argument_position() + if n == complete_number: + if args[n-1] == '' or len(args) < n+1: + home = os.getenv('HOME') or '/' + return the_input.new_completion([home, '/tmp'], n, quotify=True) + path_ = args[n] + if path.isdir(path_): + dir_ = path_ + base = '' + else: + dir_ = path.dirname(path_) + base = path.basename(path_) + try: + names = os.listdir(dir_) + except OSError: + names = [] + names_filtered = [name for name in names if name.startswith(base)] + if names_filtered: + names = names_filtered + if not names: + names = [path_] + end_list = [] + for name in names: + value = os.path.join(dir_, name) + if not name.startswith('.'): + end_list.append(value) + + return the_input.new_completion(end_list, n, quotify=True) + + @command_args_parser.ignored + def command_clear(self): + """ + /clear + """ + self.core.information_buffer.messages = [] + self.information_win.rebuild_everything(self.core.information_buffer) + self.core.information_win.rebuild_everything(self.core.information_buffer) + self.refresh() + + @command_args_parser.quoted(1) + def command_password(self, args): + """ + /password + """ + def callback(iq): + if iq['type'] == 'result': + self.core.information('Password updated', 'Account') + if config.get('password'): + config.silent_set('password', args[0]) + else: + self.core.information('Unable to change the password', 'Account') + self.core.xmpp.plugin['xep_0077'].change_password(args[0], callback=callback) + + @command_args_parser.quoted(0, 1) + def command_deny(self, args): + """ + /deny [jid] + Denies a JID from our roster + """ + if not args: + item = self.roster_win.selected_row + if isinstance(item, Contact): + jid = item.bare_jid + else: + self.core.information('No subscription to deny') + return + else: + jid = safeJID(args[0]).bare + if not jid in [jid for jid in roster.jids()]: + self.core.information('No subscription to deny') + return + + contact = roster[jid] + if contact: + contact.unauthorize() + self.core.information('Subscription to %s was revoked' % jid, + 'Roster') + + @command_args_parser.quoted(1) + def command_add(self, args): + """ + Add the specified JID to the roster, and set automatically + accept the reverse subscription + """ + if args is None: + self.core.information('No JID specified', 'Error') + return + jid = safeJID(safeJID(args[0]).bare) + if not str(jid): + self.core.information('The provided JID (%s) is not valid' % (args[0],), 'Error') + return + if jid in roster and roster[jid].subscription in ('to', 'both'): + return self.core.information('Already subscribed.', 'Roster') + roster.add(jid) + roster.modified() + self.core.information('%s was added to the roster' % jid, 'Roster') + + @command_args_parser.quoted(1, 1) + def command_name(self, args): + """ + Set a name for the specified JID in your roster + """ + def callback(iq): + if not iq: + self.core.information('The name could not be set.', 'Error') + log.debug('Error in /name:\n%s', iq) + if args is None: + return self.core.command_help('name') + jid = safeJID(args[0]).bare + name = args[1] if len(args) == 2 else '' + + contact = roster[jid] + if contact is None: + self.core.information('No such JID in roster', 'Error') + return + + groups = set(contact.groups) + if 'none' in groups: + groups.remove('none') + subscription = contact.subscription + self.core.xmpp.update_roster(jid, name=name, groups=groups, + subscription=subscription, callback=callback) + + @command_args_parser.quoted(2) + def command_groupadd(self, args): + """ + Add the specified JID to the specified group + """ + if args is None: + return self.core.command_help('groupadd') + jid = safeJID(args[0]).bare + group = args[1] + + contact = roster[jid] + if contact is None: + self.core.information('No such JID in roster', 'Error') + return + + new_groups = set(contact.groups) + if group in new_groups: + self.core.information('JID already in group', 'Error') + return + + roster.modified() + new_groups.add(group) + try: + new_groups.remove('none') + except KeyError: + pass + + name = contact.name + subscription = contact.subscription + + def callback(iq): + if iq: + roster.update_contact_groups(jid) + else: + self.core.information('The group could not be set.', 'Error') + log.debug('Error in groupadd:\n%s', iq) + + self.core.xmpp.update_roster(jid, name=name, groups=new_groups, + subscription=subscription, callback=callback) + + @command_args_parser.quoted(3) + def command_groupmove(self, args): + """ + Remove the specified JID from the first specified group and add it to the second one + """ + if args is None: + return self.core.command_help('groupmove') + jid = safeJID(args[0]).bare + group_from = args[1] + group_to = args[2] + + contact = roster[jid] + if not contact: + self.core.information('No such JID in roster', 'Error') + return + + new_groups = set(contact.groups) + if 'none' in new_groups: + new_groups.remove('none') + + if group_to == 'none' or group_from == 'none': + self.core.information('"none" is not a group.', 'Error') + return + + if group_from not in new_groups: + self.core.information('JID not in first group', 'Error') + return + + if group_to in new_groups: + self.core.information('JID already in second group', 'Error') + return + + if group_to == group_from: + self.core.information('The groups are the same.', 'Error') + return + + roster.modified() + new_groups.add(group_to) + if 'none' in new_groups: + new_groups.remove('none') + + new_groups.remove(group_from) + name = contact.name + subscription = contact.subscription + + def callback(iq): + if iq: + roster.update_contact_groups(contact) + else: + self.core.information('The group could not be set') + log.debug('Error in groupmove:\n%s', iq) + + self.core.xmpp.update_roster(jid, name=name, groups=new_groups, + subscription=subscription, callback=callback) + + @command_args_parser.quoted(2) + def command_groupremove(self, args): + """ + Remove the specified JID from the specified group + """ + if args is None: + return self.core.command_help('groupremove') + + jid = safeJID(args[0]).bare + group = args[1] + + contact = roster[jid] + if contact is None: + self.core.information('No such JID in roster', 'Error') + return + + new_groups = set(contact.groups) + try: + new_groups.remove('none') + except KeyError: + pass + if group not in new_groups: + self.core.information('JID not in group', 'Error') + return + + roster.modified() + + new_groups.remove(group) + name = contact.name + subscription = contact.subscription + + def callback(iq): + if iq: + roster.update_contact_groups(jid) + else: + self.core.information('The group could not be set') + log.debug('Error in groupremove:\n%s', iq) + + self.core.xmpp.update_roster(jid, name=name, groups=new_groups, + subscription=subscription, callback=callback) + + @command_args_parser.quoted(0, 1) + def command_remove(self, args): + """ + Remove the specified JID from the roster. i.e.: unsubscribe + from its presence, and cancel its subscription to our. + """ + if args: + jid = safeJID(args[0]).bare + else: + item = self.roster_win.selected_row + if isinstance(item, Contact): + jid = item.bare_jid + else: + self.core.information('No roster item to remove') + return + roster.remove(jid) + del roster[jid] + + @command_args_parser.quoted(0, 1) + def command_import(self, args): + """ + Import the contacts + """ + if args: + if args[0].startswith('/'): + filepath = args[0] + else: + filepath = path.join(getenv('HOME'), args[0]) + else: + filepath = path.join(getenv('HOME'), 'poezio_contacts') + if not path.isfile(filepath): + self.core.information('The file %s does not exist' % filepath, 'Error') + return + try: + handle = open(filepath, 'r', encoding='utf-8') + lines = handle.readlines() + handle.close() + except IOError: + self.core.information('Could not open %s' % filepath, 'Error') + log.error('Unable to correct a message', exc_info=True) + return + for jid in lines: + self.command_add(jid.lstrip('\n')) + self.core.information('Contacts imported from %s' % filepath, 'Info') + + @command_args_parser.quoted(0, 1) + def command_export(self, args): + """ + Export the contacts + """ + if args: + if args[0].startswith('/'): + filepath = args[0] + else: + filepath = path.join(getenv('HOME'), args[0]) + else: + filepath = path.join(getenv('HOME'), 'poezio_contacts') + if path.isfile(filepath): + self.core.information('The file already exists', 'Error') + return + elif not path.isdir(path.dirname(filepath)): + self.core.information('Parent directory not found', 'Error') + return + if roster.export(filepath): + self.core.information('Contacts exported to %s' % filepath, 'Info') + else: + self.core.information('Failed to export contacts to %s' % filepath, 'Info') + + def completion_remove(self, the_input): + """ + Completion for /remove + """ + jids = [jid for jid in roster.jids()] + return the_input.auto_completion(jids, '', quotify=False) + + def completion_name(self, the_input): + """Completion for /name""" + n = the_input.get_argument_position() + if n == 1: + jids = [jid for jid in roster.jids()] + return the_input.new_completion(jids, n, quotify=True) + return False + + def completion_groupadd(self, the_input): + n = the_input.get_argument_position() + if n == 1: + jids = sorted(jid for jid in roster.jids()) + return the_input.new_completion(jids, n, '', quotify=True) + elif n == 2: + groups = sorted(group for group in roster.groups if group != 'none') + return the_input.new_completion(groups, n, '', quotify=True) + return False + + def completion_groupmove(self, the_input): + args = common.shell_split(the_input.text) + n = the_input.get_argument_position() + if n == 1: + jids = sorted(jid for jid in roster.jids()) + return the_input.new_completion(jids, n, '', quotify=True) + elif n == 2: + contact = roster[args[1]] + if not contact: + return False + groups = list(contact.groups) + if 'none' in groups: + groups.remove('none') + return the_input.new_completion(groups, n, '', quotify=True) + elif n == 3: + groups = sorted(group for group in roster.groups) + return the_input.new_completion(groups, n, '', quotify=True) + return False + + def completion_groupremove(self, the_input): + args = common.shell_split(the_input.text) + n = the_input.get_argument_position() + if n == 1: + jids = sorted(jid for jid in roster.jids()) + return the_input.new_completion(jids, n, '', quotify=True) + elif n == 2: + contact = roster[args[1]] + if contact is None: + return False + groups = sorted(contact.groups) + try: + groups.remove('none') + except ValueError: + pass + return the_input.new_completion(groups, n, '', quotify=True) + return False + + def completion_deny(self, the_input): + """ + Complete the first argument from the list of the + contact with ask=='subscribe' + """ + jids = sorted(str(contact.bare_jid) for contact in roster.contacts.values() + if contact.pending_in) + return the_input.new_completion(jids, 1, '', quotify=False) + + @command_args_parser.quoted(0, 1) + def command_accept(self, args): + """ + Accept a JID from in roster. Authorize it AND subscribe to it + """ + if not args: + item = self.roster_win.selected_row + if isinstance(item, Contact): + jid = item.bare_jid + else: + self.core.information('No subscription to accept') + return + else: + jid = safeJID(args[0]).bare + nodepart = safeJID(jid).user + jid = safeJID(jid) + # crappy transports putting resources inside the node part + if '\\2f' in nodepart: + jid.user = nodepart.split('\\2f')[0] + contact = roster[jid] + if contact is None: + return + contact.pending_in = False + roster.modified() + self.core.xmpp.send_presence(pto=jid, ptype='subscribed') + self.core.xmpp.client_roster.send_last_presence() + if contact.subscription in ('from', 'none') and not contact.pending_out: + self.core.xmpp.send_presence(pto=jid, ptype='subscribe', pnick=self.core.own_nick) + + self.core.information('%s is now authorized' % jid, 'Roster') + + def refresh(self): + if self.need_resize: + self.resize() + log.debug(' TAB Refresh: %s', self.__class__.__name__) + + display_info = not self.size.tab_degrade_x + display_contact_win = not self.size.tab_degrade_y + + self.roster_win.refresh(roster) + if display_info: + self.v_separator.refresh() + self.information_win.refresh() + if display_contact_win: + self.contact_info_win.refresh( + self.roster_win.get_selected_row()) + self.refresh_tab_win() + self.input.refresh() + + def on_input(self, key, raw): + if key == '^M': + selected_row = self.roster_win.get_selected_row() + res = self.input.do_command(key, raw=raw) + if res and not isinstance(self.input, windows.Input): + return True + elif res: + return False + if key == '^M': + self.core.on_roster_enter_key(selected_row) + return selected_row + elif not raw and key in self.key_func: + return self.key_func[key]() + + @refresh_wrapper.conditional + def toggle_offline_show(self): + """ + Show or hide offline contacts + """ + option = 'roster_show_offline' + value = config.get(option) + success = config.silent_set(option, str(not value)) + roster.modified() + if not success: + self.core.information('Unable to write in the config file', 'Error') + return True + + def on_slash(self): + """ + '/' is pressed, we enter "input mode" + """ + if isinstance(self.input, windows.YesNoInput): + return + curses.curs_set(1) + self.input = windows.CommandInput("", self.reset_help_message, self.execute_slash_command) + self.input.resize(1, self.width, self.height-1, 0) + self.input.do_command("/") # we add the slash + + def reset_help_message(self, _=None): + self.input = self.default_help_message + if self.core.current_tab() is self: + curses.curs_set(0) + self.input.refresh() + self.core.doupdate() + return True + + def execute_slash_command(self, txt): + if txt.startswith('/'): + self.input.key_enter() + self.execute_command(txt) + return self.reset_help_message() + + def on_lose_focus(self): + self.state = 'normal' + + def on_gain_focus(self): + self.state = 'current' + if isinstance(self.input, windows.HelpText): + curses.curs_set(0) + else: + curses.curs_set(1) + + @refresh_wrapper.conditional + def move_cursor_down(self): + if isinstance(self.input, windows.Input) and not self.input.history_disabled: + return + return self.roster_win.move_cursor_down() + + @refresh_wrapper.conditional + def move_cursor_up(self): + if isinstance(self.input, windows.Input) and not self.input.history_disabled: + return + return self.roster_win.move_cursor_up() + + def move_cursor_to_prev_contact(self): + self.roster_win.move_cursor_up() + while not isinstance(self.roster_win.get_selected_row(), Contact): + if not self.roster_win.move_cursor_up(): + break + self.roster_win.refresh(roster) + + def move_cursor_to_next_contact(self): + self.roster_win.move_cursor_down() + while not isinstance(self.roster_win.get_selected_row(), Contact): + if not self.roster_win.move_cursor_down(): + break + self.roster_win.refresh(roster) + + def move_cursor_to_prev_group(self): + self.roster_win.move_cursor_up() + while not isinstance(self.roster_win.get_selected_row(), RosterGroup): + if not self.roster_win.move_cursor_up(): + break + self.roster_win.refresh(roster) + + def move_cursor_to_next_group(self): + self.roster_win.move_cursor_down() + while not isinstance(self.roster_win.get_selected_row(), RosterGroup): + if not self.roster_win.move_cursor_down(): + break + self.roster_win.refresh(roster) + + def on_scroll_down(self): + return self.roster_win.move_cursor_down(self.height // 2) + + def on_scroll_up(self): + return self.roster_win.move_cursor_up(self.height // 2) + + @refresh_wrapper.conditional + def on_space(self): + if isinstance(self.input, windows.Input): + return + selected_row = self.roster_win.get_selected_row() + if isinstance(selected_row, RosterGroup): + selected_row.toggle_folded() + roster.modified() + return True + elif isinstance(selected_row, Contact): + group = "none" + found_group = False + pos = self.roster_win.pos + while not found_group and pos >= 0: + row = self.roster_win.roster_cache[pos] + pos -= 1 + if isinstance(row, RosterGroup): + found_group = True + group = row.name + selected_row.toggle_folded(group) + roster.modified() + return True + return False + + def get_contact_version(self): + """ + Show the versions of the resource(s) currently selected + """ + selected_row = self.roster_win.get_selected_row() + if isinstance(selected_row, Contact): + for resource in selected_row.resources: + self.core.command_version(str(resource.jid)) + elif isinstance(selected_row, Resource): + self.core.command_version(str(selected_row.jid)) + else: + self.core.information('Nothing to get versions from', 'Info') + + def show_contact_info(self): + """ + Show the contact info (resource number, status, presence, etc) + when 'i' is pressed. + """ + selected_row = self.roster_win.get_selected_row() + if isinstance(selected_row, Contact): + cont = selected_row + res = selected_row.get_highest_priority_resource() + acc = [] + acc.append('Contact: %s (%s)' % (cont.bare_jid, res.presence if res else 'unavailable')) + if res: + acc.append('%s connected resource%s' % (len(cont), '' if len(cont) == 1 else 's')) + acc.append('Current status: %s' % res.status) + if cont.tune: + acc.append('Tune: %s' % common.format_tune_string(cont.tune)) + if cont.mood: + acc.append('Mood: %s' % cont.mood) + if cont.activity: + acc.append('Activity: %s' % cont.activity) + if cont.gaming: + acc.append('Game: %s' % (common.format_gaming_string(cont.gaming))) + msg = '\n'.join(acc) + elif isinstance(selected_row, Resource): + res = selected_row + msg = 'Resource: %s (%s)\nCurrent status: %s\nPriority: %s' % ( + res.jid, + res.presence, + res.status, + res.priority) + elif isinstance(selected_row, RosterGroup): + rg = selected_row + msg = 'Group: %s [%s/%s] contacts online' % ( + rg.name, + rg.get_nb_connected_contacts(), + len(rg),) + else: + msg = None + if msg: + self.core.information(msg, 'Info') + + def change_contact_name(self): + """ + Auto-fill a /name command when 'n' is pressed + """ + selected_row = self.roster_win.get_selected_row() + if isinstance(selected_row, Contact): + jid = selected_row.bare_jid + elif isinstance(selected_row, Resource): + jid = safeJID(selected_row.jid).bare + else: + return + self.on_slash() + self.input.text = '/name "%s" ' % jid + self.input.key_end() + self.input.refresh() + + @refresh_wrapper.always + def start_search(self): + """ + Start the search. The input should appear with a short instruction + in it. + """ + if isinstance(self.input, windows.YesNoInput): + return + curses.curs_set(1) + self.input = windows.CommandInput("[Search]", self.on_search_terminate, self.on_search_terminate, self.set_roster_filter) + self.input.resize(1, self.width, self.height-1, 0) + self.input.disable_history() + roster.modified() + self.refresh() + return True + + @refresh_wrapper.always + def start_search_slow(self): + if isinstance(self.input, windows.YesNoInput): + return + curses.curs_set(1) + self.input = windows.CommandInput("[Search]", self.on_search_terminate, self.on_search_terminate, self.set_roster_filter_slow) + self.input.resize(1, self.width, self.height-1, 0) + self.input.disable_history() + return True + + def set_roster_filter_slow(self, txt): + roster.contact_filter = (jid_and_name_match_slow, txt) + roster.modified() + self.refresh() + return False + + def set_roster_filter(self, txt): + roster.contact_filter = (jid_and_name_match, txt) + roster.modified() + self.refresh() + return False + + @refresh_wrapper.always + def on_search_terminate(self, txt): + curses.curs_set(0) + roster.contact_filter = None + self.reset_help_message() + roster.modified() + return True + + def on_close(self): + return + +def diffmatch(search, string): + """ + Use difflib and a loop to check if search_pattern can + be 'almost' found INSIDE a string. + 'almost' being defined by difflib + """ + if len(search) > len(string): + return False + l = len(search) + ratio = 0.7 + for i in range(len(string) - l + 1): + if difflib.SequenceMatcher(None, search, string[i:i+l]).ratio() >= ratio: + return True + return False + +def jid_and_name_match(contact, txt): + """ + Match jid with text precisely + """ + if not txt: + return True + txt = txt.lower() + if txt in safeJID(contact.bare_jid).bare.lower(): + return True + if txt in contact.name.lower(): + return True + return False + +def jid_and_name_match_slow(contact, txt): + """ + A function used to know if a contact in the roster should + be shown in the roster + """ + if not txt: + return True # Everything matches when search is empty + user = safeJID(contact.bare_jid).bare + if diffmatch(txt, user): + return True + if contact.name and diffmatch(txt, contact.name): + return True + return False -- cgit v1.2.3