From 332a5c2553db41de777473a1e1be9cd1522c9496 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Thu, 31 Mar 2016 18:54:41 +0100 Subject: Move the src directory to poezio, for better cython compatibility. --- src/tabs/__init__.py | 13 - src/tabs/adhoc_commands_list.py | 57 -- src/tabs/basetabs.py | 881 -------------------- src/tabs/bookmarkstab.py | 145 ---- src/tabs/conversationtab.py | 484 ----------- src/tabs/data_forms.py | 75 -- src/tabs/listtab.py | 202 ----- src/tabs/muclisttab.py | 70 -- src/tabs/muctab.py | 1720 --------------------------------------- src/tabs/privatetab.py | 362 -------- src/tabs/rostertab.py | 1280 ----------------------------- src/tabs/xmltab.py | 360 -------- 12 files changed, 5649 deletions(-) delete mode 100644 src/tabs/__init__.py delete mode 100644 src/tabs/adhoc_commands_list.py delete mode 100644 src/tabs/basetabs.py delete mode 100644 src/tabs/bookmarkstab.py delete mode 100644 src/tabs/conversationtab.py delete mode 100644 src/tabs/data_forms.py delete mode 100644 src/tabs/listtab.py delete mode 100644 src/tabs/muclisttab.py delete mode 100644 src/tabs/muctab.py delete mode 100644 src/tabs/privatetab.py delete mode 100644 src/tabs/rostertab.py delete mode 100644 src/tabs/xmltab.py (limited to 'src/tabs') diff --git a/src/tabs/__init__.py b/src/tabs/__init__.py deleted file mode 100644 index d0a881a6..00000000 --- a/src/tabs/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from . basetabs import Tab, ChatTab, GapTab, OneToOneTab -from . basetabs import STATE_PRIORITY -from . rostertab import RosterInfoTab -from . muctab import MucTab, NS_MUC_USER -from . privatetab import PrivateTab -from . conversationtab import ConversationTab, StaticConversationTab,\ - DynamicConversationTab -from . xmltab import XMLTab -from . listtab import ListTab -from . muclisttab import MucListTab -from . adhoc_commands_list import AdhocCommandsListTab -from . data_forms import DataFormsTab -from . bookmarkstab import BookmarksTab diff --git a/src/tabs/adhoc_commands_list.py b/src/tabs/adhoc_commands_list.py deleted file mode 100644 index 10ebf22b..00000000 --- a/src/tabs/adhoc_commands_list.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -A tab listing the ad-hoc commands on a specific JID. The user can -select one of them and start executing it, or just close the tab and do -nothing. -""" - -import logging -log = logging.getLogger(__name__) - -from . import ListTab - -from slixmpp.plugins.xep_0030.stanza.items import DiscoItem - -class AdhocCommandsListTab(ListTab): - plugin_commands = {} - plugin_keys = {} - - def __init__(self, jid): - ListTab.__init__(self, jid.full, - "“Enter”: execute selected command.", - 'Ad-hoc commands of JID %s (Loading)' % jid, - (('Node', 0), ('Description', 1))) - self.key_func['^M'] = self.execute_selected_command - - def execute_selected_command(self): - if not self.listview or not self.listview.get_selected_row(): - return - node, name, jid = self.listview.get_selected_row() - session = {'next': self.core.on_next_adhoc_step, - 'error': self.core.on_adhoc_error} - self.core.xmpp.plugin['xep_0050'].start_command(jid, node, session) - - def get_columns_sizes(self): - return {'Node': int(self.width * 3 / 8), - 'Description': int(self.width * 5 / 8)} - - def on_list_received(self, iq): - """ - Fill the listview with the value from the received iq - """ - if iq['type'] == 'error': - self.set_error(iq['error']['type'], iq['error']['code'], iq['error']['text']) - return - def get_items(): - substanza = iq['disco_items'] - for item in substanza['substanzas']: - if isinstance(item, DiscoItem): - yield item - items = [(item['node'], item['name'] or '', item['jid']) for item in get_items()] - self.listview.set_lines(items) - self.info_header.message = 'Ad-hoc commands of JID %s' % self.name - if self.core.current_tab() is self: - self.refresh() - else: - self.state = 'highlight' - self.refresh_tab_win() - self.core.doupdate() diff --git a/src/tabs/basetabs.py b/src/tabs/basetabs.py deleted file mode 100644 index bb0c0ea4..00000000 --- a/src/tabs/basetabs.py +++ /dev/null @@ -1,881 +0,0 @@ -""" -Module for the base Tabs - -The root class Tab defines the generic interface and attributes of a -tab. A tab organizes various Windows around the screen depending -of the tab specificity. If the tab shows messages, it will also -reference a buffer containing the messages. - -Each subclass should redefine its own refresh() and resize() method -according to its windows. - -This module also defines ChatTabs, the parent class for all tabs -revolving around chats. -""" - -import logging -log = logging.getLogger(__name__) - -import singleton -import string -import time -import weakref -from datetime import datetime, timedelta -from xml.etree import cElementTree as ET - -import core -import timed_events -import windows -import xhtml -from common import safeJID -from config import config -from decorators import refresh_wrapper -from logger import logger -from text_buffer import TextBuffer -from theming import get_theme, dump_tuple -from decorators import command_args_parser - -# getters for tab colors (lambdas, so that they are dynamic) -STATE_COLORS = { - 'disconnected': lambda: get_theme().COLOR_TAB_DISCONNECTED, - 'scrolled': lambda: get_theme().COLOR_TAB_SCROLLED, - 'nonempty': lambda: get_theme().COLOR_TAB_NONEMPTY, - 'joined': lambda: get_theme().COLOR_TAB_JOINED, - 'message': lambda: get_theme().COLOR_TAB_NEW_MESSAGE, - 'composing': lambda: get_theme().COLOR_TAB_COMPOSING, - 'highlight': lambda: get_theme().COLOR_TAB_HIGHLIGHT, - 'private': lambda: get_theme().COLOR_TAB_PRIVATE, - 'normal': lambda: get_theme().COLOR_TAB_NORMAL, - 'current': lambda: get_theme().COLOR_TAB_CURRENT, - 'attention': lambda: get_theme().COLOR_TAB_ATTENTION, - } -VERTICAL_STATE_COLORS = { - 'disconnected': lambda: get_theme().COLOR_VERTICAL_TAB_DISCONNECTED, - 'scrolled': lambda: get_theme().COLOR_VERTICAL_TAB_SCROLLED, - 'nonempty': lambda: get_theme().COLOR_VERTICAL_TAB_NONEMPTY, - 'joined': lambda: get_theme().COLOR_VERTICAL_TAB_JOINED, - 'message': lambda: get_theme().COLOR_VERTICAL_TAB_NEW_MESSAGE, - 'composing': lambda: get_theme().COLOR_VERTICAL_TAB_COMPOSING, - 'highlight': lambda: get_theme().COLOR_VERTICAL_TAB_HIGHLIGHT, - 'private': lambda: get_theme().COLOR_VERTICAL_TAB_PRIVATE, - 'normal': lambda: get_theme().COLOR_VERTICAL_TAB_NORMAL, - 'current': lambda: get_theme().COLOR_VERTICAL_TAB_CURRENT, - 'attention': lambda: get_theme().COLOR_VERTICAL_TAB_ATTENTION, - } - - -# priority of the different tab states when using Alt+e -# higher means more priority, < 0 means not selectable -STATE_PRIORITY = { - 'normal': -1, - 'current': -1, - 'disconnected': 0, - 'nonempty': 0.1, - 'scrolled': 0.5, - 'joined': 0.8, - 'composing': 0.9, - 'message': 1, - 'highlight': 2, - 'private': 2, - 'attention': 3 - } - -class Tab(object): - tab_core = None - size_manager = None - - plugin_commands = {} - plugin_keys = {} - def __init__(self): - if not hasattr(self, 'name'): - self.name = self.__class__.__name__ - self.input = None - self.closed = False - self._state = 'normal' - self._prev_state = None - - self.need_resize = False - self.key_func = {} # each tab should add their keys in there - # and use them in on_input - self.commands = {} # and their own commands - - - @property - def size(self): - if not Tab.size_manager: - Tab.size_manager = self.core.size - return Tab.size_manager - - @property - def core(self): - if not Tab.tab_core: - Tab.tab_core = singleton.Singleton(core.Core) - return Tab.tab_core - - @property - def nb(self): - for index, tab in enumerate(self.core.tabs): - if tab == self: - return index - return len(self.core.tabs) - - @property - def tab_win(self): - if not Tab.tab_core: - Tab.tab_core = singleton.Singleton(core.Core) - return Tab.tab_core.tab_win - - @property - def left_tab_win(self): - if not Tab.tab_core: - Tab.tab_core = singleton.Singleton(core.Core) - return Tab.tab_core.left_tab_win - - @staticmethod - def tab_win_height(): - """ - Returns 1 or 0, depending on if we are using the vertical tab list - or not. - """ - if config.get('enable_vertical_tab_list'): - return 0 - return 1 - - @property - def info_win(self): - return self.core.information_win - - @property - def color(self): - return STATE_COLORS[self._state]() - - @property - def vertical_color(self): - return VERTICAL_STATE_COLORS[self._state]() - - @property - def state(self): - return self._state - - @state.setter - def state(self, value): - if not value in STATE_COLORS: - log.debug("Invalid value for tab state: %s", value) - elif STATE_PRIORITY[value] < STATE_PRIORITY[self._state] and \ - value not in ('current', 'disconnected') and \ - not (self._state == 'scrolled' and value == 'disconnected'): - log.debug("Did not set state because of lower priority, asked: %s, kept: %s", value, self._state) - elif self._state == 'disconnected' and value not in ('joined', 'current'): - log.debug('Did not set state because disconnected tabs remain visible') - else: - self._state = value - if self._state == 'current': - self._prev_state = None - - def set_state(self, value): - self._state = value - - def save_state(self): - if self._state != 'composing': - self._prev_state = self._state - - def restore_state(self): - if self.state == 'composing' and self._prev_state: - self._state = self._prev_state - self._prev_state = None - elif not self._prev_state: - self._state = 'normal' - - @staticmethod - def resize(scr): - Tab.height, Tab.width = scr.getmaxyx() - windows.Win._tab_win = scr - - def missing_command_callback(self, command_name): - """ - Callback executed when a command is not found. - Returns True if the callback took care of displaying - the error message, False otherwise. - """ - return False - - def register_command(self, name, func, *, desc='', shortdesc='', completion=None, usage=''): - """ - Add a command - """ - if name in self.commands: - return - if not desc and shortdesc: - desc = shortdesc - self.commands[name] = core.Command(func, desc, completion, shortdesc, usage) - - def complete_commands(self, the_input): - """ - Does command completion on the specified input for both global and tab-specific - commands. - This should be called from the completion method (on tab, for example), passing - the input where completion is to be made. - It can completion the command name itself or an argument of the command. - Returns True if a completion was made, False else. - """ - txt = the_input.get_text() - # check if this is a command - if txt.startswith('/') and not txt.startswith('//'): - position = the_input.get_argument_position(quoted=False) - if position == 0: - words = ['/%s'% (name) for name in sorted(self.core.commands)] +\ - ['/%s' % (name) for name in sorted(self.commands)] - the_input.new_completion(words, 0) - # Do not try to cycle command completion if there was only - # one possibily. The next tab will complete the argument. - # Otherwise we would need to add a useless space before being - # able to complete the arguments. - hit_copy = set(the_input.hit_list) - while not hit_copy: - whitespace = the_input.text.find(' ') - if whitespace == -1: - whitespace = len(the_input.text) - the_input.text = the_input.text[:whitespace-1] + the_input.text[whitespace:] - the_input.new_completion(words, 0) - hit_copy = set(the_input.hit_list) - if len(hit_copy) == 1: - the_input.do_command(' ') - the_input.reset_completion() - return True - # check if we are in the middle of the command name - elif len(txt.split()) > 1 or\ - (txt.endswith(' ') and not the_input.last_completion): - command_name = txt.split()[0][1:] - if command_name in self.commands: - command = self.commands[command_name] - elif command_name in self.core.commands: - command = self.core.commands[command_name] - else: # Unknown command, cannot complete - return False - if command[2] is None: - return False # There's no completion function - else: - return command[2](the_input) - return False - - def execute_command(self, provided_text): - """ - Execute the command in the input and return False if - the input didn't contain a command - """ - txt = provided_text or self.input.key_enter() - if txt.startswith('/') and not txt.startswith('//') and\ - not txt.startswith('/me '): - command = txt.strip().split()[0][1:] - arg = txt[2+len(command):] # jump the '/' and the ' ' - func = None - if command in self.commands: # check tab-specific commands - func = self.commands[command][0] - elif command in self.core.commands: # check global commands - func = self.core.commands[command][0] - else: - low = command.lower() - if low in self.commands: - func = self.commands[low][0] - elif low in self.core.commands: - func = self.core.commands[low][0] - else: - if self.missing_command_callback is not None: - error_handled = self.missing_command_callback(low) - if not error_handled: - self.core.information("Unknown command (%s)" % - (command), - 'Error') - if command in ('correct', 'say'): # hack - arg = xhtml.convert_simple_to_full_colors(arg) - else: - arg = xhtml.clean_text_simple(arg) - if func: - if hasattr(self.input, "reset_completion"): - self.input.reset_completion() - func(arg) - return True - else: - return False - - def refresh_tab_win(self): - if config.get('enable_vertical_tab_list'): - if self.left_tab_win and not self.size.core_degrade_x: - self.left_tab_win.refresh() - elif not self.size.core_degrade_y: - self.tab_win.refresh() - - def refresh(self): - """ - Called on each screen refresh (when something has changed) - """ - pass - - def get_name(self): - """ - get the name of the tab - """ - return self.name - - def get_nick(self): - """ - Get the nick of the tab (defaults to its name) - """ - return self.name - - def get_text_window(self): - """ - Returns the principal TextWin window, if there's one - """ - return None - - def on_input(self, key, raw): - """ - raw indicates if the key should activate the associated command or not. - """ - pass - - def update_commands(self): - for c in self.plugin_commands: - if not c in self.commands: - self.commands[c] = self.plugin_commands[c] - - def update_keys(self): - for k in self.plugin_keys: - if not k in self.key_func: - self.key_func[k] = self.plugin_keys[k] - - def on_lose_focus(self): - """ - called when this tab loses the focus. - """ - self.state = 'normal' - - def on_gain_focus(self): - """ - called when this tab gains the focus. - """ - self.state = 'current' - - def on_scroll_down(self): - """ - Defines what happens when we scroll down - """ - pass - - def on_scroll_up(self): - """ - Defines what happens when we scroll up - """ - pass - - def on_line_up(self): - """ - Defines what happens when we scroll one line up - """ - pass - - def on_line_down(self): - """ - Defines what happens when we scroll one line up - """ - pass - - def on_half_scroll_down(self): - """ - Defines what happens when we scroll half a screen down - """ - pass - - def on_half_scroll_up(self): - """ - Defines what happens when we scroll half a screen up - """ - pass - - def on_info_win_size_changed(self): - """ - Called when the window with the informations is resized - """ - pass - - def on_close(self): - """ - Called when the tab is to be closed - """ - if self.input: - self.input.on_delete() - self.closed = True - - def matching_names(self): - """ - Returns a list of strings that are used to name a tab with the /win - command. For example you could switch to a tab that returns - ['hello', 'coucou'] using /win hel, or /win coucou - If not implemented in the tab, it just doesn’t match with anything. - """ - return [] - - def __del__(self): - log.debug('------ Closing tab %s', self.__class__.__name__) - -class GapTab(Tab): - - def __bool__(self): - return False - - def __len__(self): - return 0 - - @property - def name(self): - return '' - - def refresh(self): - log.debug('WARNING: refresh() called on a gap tab, this should not happen') - -class ChatTab(Tab): - """ - A tab containing a chat of any type. - Just use this class instead of Tab if the tab needs a recent-words completion - Also, ^M is already bound to on_enter - And also, add the /say command - """ - plugin_commands = {} - plugin_keys = {} - def __init__(self, jid=''): - Tab.__init__(self) - self.name = jid - self.text_win = None - self._text_buffer = TextBuffer() - self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive" - # We keep a reference of the event that will set our chatstate to "paused", so that - # we can delete it or change it if we need to - self.timed_event_paused = None - # Keeps the last sent message to complete it easily in completion_correct, and to replace it. - self.last_sent_message = None - self.key_func['M-v'] = self.move_separator - self.key_func['M-h'] = self.scroll_separator - self.key_func['M-/'] = self.last_words_completion - self.key_func['^M'] = self.on_enter - self.register_command('say', self.command_say, - usage='', - shortdesc='Send the message.') - self.register_command('xhtml', self.command_xhtml, - usage='', - shortdesc='Send custom XHTML.') - self.register_command('clear', self.command_clear, - shortdesc='Clear the current buffer.') - self.register_command('correct', self.command_correct, - desc='Fix the last message with whatever you want.', - shortdesc='Correct the last message.', - completion=self.completion_correct) - self.chat_state = None - self.update_commands() - self.update_keys() - - # Get the logs - log_nb = config.get('load_log') - logs = self.load_logs(log_nb) - - if logs: - for message in logs: - self._text_buffer.add_message(**message) - - @property - def is_muc(self): - return False - - def load_logs(self, log_nb): - logs = logger.get_logs(safeJID(self.name).bare, log_nb) - return logs - - def log_message(self, txt, nickname, time=None, typ=1): - """ - Log the messages in the archives. - """ - name = safeJID(self.name).bare - if not logger.log_message(name, nickname, txt, date=time, typ=typ): - self.core.information('Unable to write in the log file', 'Error') - - def add_message(self, txt, time=None, nickname=None, forced_user=None, - nick_color=None, identifier=None, jid=None, history=None, - typ=1, highlight=False): - self.log_message(txt, nickname, time=time, typ=typ) - self._text_buffer.add_message(txt, time=time, - nickname=nickname, - highlight=highlight, - nick_color=nick_color, - history=history, - user=forced_user, - identifier=identifier, - jid=jid) - - def modify_message(self, txt, old_id, new_id, user=None, jid=None, nickname=None): - self.log_message(txt, nickname, typ=1) - message = self._text_buffer.modify_message(txt, old_id, new_id, time=time, user=user, jid=jid) - if message: - self.text_win.modify_message(old_id, message) - self.core.refresh_window() - return True - return False - - def last_words_completion(self): - """ - Complete the input with words recently said - """ - # build the list of the recent words - char_we_dont_want = string.punctuation+' ’„“”…«»' - words = list() - for msg in self._text_buffer.messages[:-40:-1]: - if not msg: - continue - txt = xhtml.clean_text(msg.txt) - for char in char_we_dont_want: - txt = txt.replace(char, ' ') - for word in txt.split(): - if len(word) >= 4 and word not in words: - words.append(word) - words.extend([word for word in config.get('words').split(':') if word]) - self.input.auto_completion(words, ' ', quotify=False) - - def on_enter(self): - txt = self.input.key_enter() - if txt: - if not self.execute_command(txt): - if txt.startswith('//'): - txt = txt[1:] - self.command_say(xhtml.convert_simple_to_full_colors(txt)) - self.cancel_paused_delay() - - @command_args_parser.raw - def command_xhtml(self, xhtml): - """" - /xhtml - """ - message = self.generate_xhtml_message(xhtml) - if message: - message.send() - - def generate_xhtml_message(self, arg): - if not arg: - return - try: - body = xhtml.clean_text(xhtml.xhtml_to_poezio_colors(arg)) - ET.fromstring(arg) - except: - self.core.information('Could not send custom xhtml', 'Error') - log.error('/xhtml: Unable to send custom xhtml', exc_info=True) - return - - msg = self.core.xmpp.make_message(self.get_dest_jid()) - msg['body'] = body - msg.enable('html') - msg['html']['body'] = arg - return msg - - def get_dest_jid(self): - return self.name - - @refresh_wrapper.always - def command_clear(self, ignored): - """ - /clear - """ - self._text_buffer.messages = [] - self.text_win.rebuild_everything(self._text_buffer) - - def send_chat_state(self, state, always_send=False): - """ - Send an empty chatstate message - """ - if not self.is_muc or self.joined: - if state in ('active', 'inactive', 'gone') and self.inactive and not always_send: - return - if (config.get_by_tabname('send_chat_states', self.general_jid) - and self.remote_wants_chatstates is not False): - msg = self.core.xmpp.make_message(self.get_dest_jid()) - msg['type'] = self.message_type - msg['chat_state'] = state - self.chat_state = state - msg.send() - return True - - def send_composing_chat_state(self, empty_after): - """ - Send the "active" or "composing" chatstate, depending - on the the current status of the input - """ - name = self.general_jid - if (config.get_by_tabname('send_chat_states', name) - and self.remote_wants_chatstates): - needed = 'inactive' if self.inactive else 'active' - self.cancel_paused_delay() - if not empty_after: - if self.chat_state != "composing": - self.send_chat_state("composing") - self.set_paused_delay(True) - elif empty_after and self.chat_state != needed: - self.send_chat_state(needed, True) - - def set_paused_delay(self, composing): - """ - we create a timed event that will put us to paused - in a few seconds - """ - if not config.get_by_tabname('send_chat_states', self.general_jid): - return - # First, cancel the delay if it already exists, before rescheduling - # it at a new date - self.cancel_paused_delay() - new_event = timed_events.DelayedEvent(4, self.send_chat_state, 'paused') - self.core.add_timed_event(new_event) - self.timed_event_paused = new_event - - def cancel_paused_delay(self): - """ - Remove that event from the list and set it to None. - Called for example when the input is emptied, or when the message - is sent - """ - if self.timed_event_paused is not None: - self.core.remove_timed_event(self.timed_event_paused) - self.timed_event_paused = None - - @command_args_parser.raw - def command_correct(self, line): - """ - /correct - """ - if not line: - self.core.command_help('correct') - return - if not self.last_sent_message: - self.core.information('There is no message to correct.') - return - self.command_say(line, correct=True) - - def completion_correct(self, the_input): - if self.last_sent_message and the_input.get_argument_position() == 1: - return the_input.auto_completion([self.last_sent_message['body']], '', quotify=False) - - @property - def inactive(self): - """Whether we should send inactive or active as a chatstate""" - return self.core.status.show in ('xa', 'away') or\ - (hasattr(self, 'directed_presence') and not self.directed_presence) - - def move_separator(self): - self.text_win.remove_line_separator() - self.text_win.add_line_separator(self._text_buffer) - self.text_win.refresh() - self.input.refresh() - - def get_conversation_messages(self): - return self._text_buffer.messages - - def check_scrolled(self): - if self.text_win.pos != 0: - self.state = 'scrolled' - - @command_args_parser.raw - def command_say(self, line, correct=False): - pass - - def on_line_up(self): - return self.text_win.scroll_up(1) - - def on_line_down(self): - return self.text_win.scroll_down(1) - - def on_scroll_up(self): - return self.text_win.scroll_up(self.text_win.height-1) - - def on_scroll_down(self): - return self.text_win.scroll_down(self.text_win.height-1) - - def on_half_scroll_up(self): - return self.text_win.scroll_up((self.text_win.height-1) // 2) - - def on_half_scroll_down(self): - return self.text_win.scroll_down((self.text_win.height-1) // 2) - - @refresh_wrapper.always - def scroll_separator(self): - self.text_win.scroll_to_separator() - -class OneToOneTab(ChatTab): - - def __init__(self, jid=''): - ChatTab.__init__(self, jid) - - # Set to true once the first disco is done - self.__initial_disco = False - # change this to True or False when - # we know that the remote user wants chatstates, or not. - # None means we don’t know yet, and we send only "active" chatstates - self._remote_wants_chatstates = None - self.remote_supports_attention = True - self.remote_supports_receipts = True - self.check_features() - - @property - def remote_wants_chatstates(self): - return self._remote_wants_chatstates - - @remote_wants_chatstates.setter - def remote_wants_chatstates(self, value): - old_value = self._remote_wants_chatstates - self._remote_wants_chatstates = value - if (old_value is None and value != None) or \ - (old_value != value and value != None): - ok = get_theme().CHAR_OK - nope = get_theme().CHAR_EMPTY - support = ok if value else nope - if value: - msg = '\x19%s}Contact supports chat states [%s].' - else: - msg = '\x19%s}Contact does not support chat states [%s].' - color = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - msg = msg % (color, support) - self.add_message(msg, typ=0) - self.core.refresh_window() - - def ack_message(self, msg_id, msg_jid): - """ - Ack a message - """ - new_msg = self._text_buffer.ack_message(msg_id, msg_jid) - if new_msg: - self.text_win.modify_message(msg_id, new_msg) - self.core.refresh_window() - - def nack_message(self, error, msg_id, msg_jid): - """ - Ack a message - """ - new_msg = self._text_buffer.nack_message(error, msg_id, msg_jid) - if new_msg: - self.text_win.modify_message(msg_id, new_msg) - self.core.refresh_window() - return True - return False - - @command_args_parser.raw - def command_xhtml(self, xhtml_data): - message = self.generate_xhtml_message(xhtml_data) - if message: - message['type'] = 'chat' - if self.remote_supports_receipts: - message._add_receipt = True - if self.remote_wants_chatstates: - message['chat_sate'] = 'active' - message.send() - body = xhtml.xhtml_to_poezio_colors(xhtml_data, force=True) - self._text_buffer.add_message(body, nickname=self.core.own_nick, - identifier=message['id'],) - self.refresh() - - def check_features(self): - "check the features supported by the other party" - if safeJID(self.get_dest_jid()).resource: - self.core.xmpp.plugin['xep_0030'].get_info( - jid=self.get_dest_jid(), timeout=5, - callback=self.features_checked) - - @command_args_parser.raw - def command_attention(self, message): - """/attention [message]""" - if message is not '': - self.command_say(message, attention=True) - else: - msg = self.core.xmpp.make_message(self.get_dest_jid()) - msg['type'] = 'chat' - msg['attention'] = True - msg.send() - - @command_args_parser.raw - def command_say(self, line, correct=False, attention=False): - pass - - def missing_command_callback(self, command_name): - if command_name not in ('correct', 'attention'): - return False - - if command_name == 'correct': - feature = 'message correction' - elif command_name == 'attention': - feature = 'attention requests' - msg = ('%s does not support %s, therefore the /%s ' - 'command is currently disabled in this tab.') - msg = msg % (self.name, feature, command_name) - self.core.information(msg, 'Info') - return True - - def _feature_attention(self, features): - "Check for the 'attention' features" - if 'urn:xmpp:attention:0' in features: - self.remote_supports_attention = True - self.register_command('attention', self.command_attention, - usage='[message]', - shortdesc='Request the attention.', - desc='Attention: Request the attention of ' - 'the contact. Can also send a message' - ' along with the attention.') - else: - self.remote_supports_attention = False - return self.remote_supports_attention - - def _feature_correct(self, features): - "Check for the 'correction' feature" - if not 'urn:xmpp:message-correct:0' in features: - if 'correct' in self.commands: - del self.commands['correct'] - elif not 'correct' in self.commands: - self.register_command('correct', self.command_correct, - desc='Fix the last message with whatever you want.', - shortdesc='Correct the last message.', - completion=self.completion_correct) - return 'correct' in self.commands - - def _feature_receipts(self, features): - "Check for the 'receipts' feature" - if 'urn:xmpp:receipts' in features: - self.remote_supports_receipts = True - else: - self.remote_supports_receipts = False - return self.remote_supports_receipts - - def features_checked(self, iq): - "Features check callback" - features = iq['disco_info'].get_features() or [] - before = ('correct' in self.commands, - self.remote_supports_attention, - self.remote_supports_receipts) - correct = self._feature_correct(features) - attention = self._feature_attention(features) - receipts = self._feature_receipts(features) - - if (correct, attention, receipts) == before and self.__initial_disco: - return - else: - self.__initial_disco = True - - if not (correct or attention or receipts): - return # don’t display anything - - ok = get_theme().CHAR_OK - nope = get_theme().CHAR_EMPTY - - correct = ok if correct else nope - attention = ok if attention else nope - receipts = ok if receipts else nope - - msg = ('\x19%s}Contact supports: correction [%s], ' - 'attention [%s], receipts [%s].') - color = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - msg = msg % (color, correct, attention, receipts) - self.add_message(msg, typ=0) - self.core.refresh_window() - - diff --git a/src/tabs/bookmarkstab.py b/src/tabs/bookmarkstab.py deleted file mode 100644 index 7f5069ea..00000000 --- a/src/tabs/bookmarkstab.py +++ /dev/null @@ -1,145 +0,0 @@ -""" -Defines the data-forms Tab -""" - -import logging -log = logging.getLogger(__name__) - -import windows -from bookmarks import Bookmark, BookmarkList, stanza_storage -from tabs import Tab -from common import safeJID - - -class BookmarksTab(Tab): - """ - A tab displaying lines of bookmarks, each bookmark having - a 4 widgets to set the jid/password/autojoin/storage method - """ - plugin_commands = {} - def __init__(self, bookmarks: BookmarkList): - Tab.__init__(self) - self.name = "Bookmarks" - self.bookmarks = bookmarks - self.new_bookmarks = [] - self.removed_bookmarks = [] - self.header_win = windows.ColumnHeaderWin(('room@server/nickname', - 'password', - 'autojoin', - 'storage')) - self.bookmarks_win = windows.BookmarksWin(self.bookmarks, - self.height-4, - self.width, 1, 0) - self.help_win = windows.HelpText('Ctrl+Y: save, Ctrl+G: cancel, ' - '↑↓: change lines, tab: change ' - 'column, M-a: add bookmark, C-k' - ': delete bookmark') - self.info_header = windows.BookmarksInfoWin() - self.key_func['KEY_UP'] = self.bookmarks_win.go_to_previous_line_input - self.key_func['KEY_DOWN'] = self.bookmarks_win.go_to_next_line_input - self.key_func['^I'] = self.bookmarks_win.go_to_next_horizontal_input - self.key_func['^G'] = self.on_cancel - self.key_func['^Y'] = self.on_save - self.key_func['M-a'] = self.add_bookmark - self.key_func['^K'] = self.del_bookmark - self.resize() - self.update_commands() - - def add_bookmark(self): - new_bookmark = Bookmark(safeJID('room@example.tld/nick'), method='local') - self.new_bookmarks.append(new_bookmark) - self.bookmarks_win.add_bookmark(new_bookmark) - - def del_bookmark(self): - current = self.bookmarks_win.del_current_bookmark() - if current in self.new_bookmarks: - self.new_bookmarks.remove(current) - else: - self.removed_bookmarks.append(current) - - def on_cancel(self): - self.core.close_tab() - return True - - def on_save(self): - self.bookmarks_win.save() - if find_duplicates(self.new_bookmarks): - self.core.information('Duplicate bookmarks in list (saving aborted)', 'Error') - return - for bm in self.new_bookmarks: - if safeJID(bm.jid): - if not self.bookmarks[bm.jid]: - self.bookmarks.append(bm) - else: - self.core.information('Invalid JID for bookmark: %s/%s' % (bm.jid, bm.nick), 'Error') - return - - for bm in self.removed_bookmarks: - if bm in self.bookmarks: - self.bookmarks.remove(bm) - - def send_cb(success): - if success: - self.core.information('Bookmarks saved.', 'Info') - else: - self.core.information('Remote bookmarks not saved.', 'Error') - log.debug('alerte %s', str(stanza_storage(self.bookmarks.bookmarks))) - self.bookmarks.save(self.core.xmpp, callback=send_cb) - self.core.close_tab() - return True - - def on_input(self, key, raw=False): - if key in self.key_func: - res = self.key_func[key]() - if res: - return res - self.bookmarks_win.refresh_current_input() - else: - self.bookmarks_win.on_input(key) - - def resize(self): - self.need_resize = False - self.header_win.resize_columns({ - 'room@server/nickname': self.width//3, - 'password': self.width//3, - 'autojoin': self.width//6, - 'storage': self.width//6 - }) - info_height = self.core.information_win_size - tab_height = Tab.tab_win_height() - self.header_win.resize(1, self.width, 0, 0) - self.bookmarks_win.resize(self.height - 3 - tab_height - info_height, - self.width, 1, 0) - self.help_win.resize(1, self.width, self.height - 1, 0) - self.info_header.resize(1, self.width, - self.height - 2 - tab_height - info_height, 0) - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height - 3: - return - info_height = self.core.information_win_size - tab_height = Tab.tab_win_height() - self.bookmarks_win.resize(self.height - 3 - tab_height - info_height, - self.width, 1, 0) - self.info_header.resize(1, self.width, - self.height - 2 - tab_height - info_height, 0) - - def refresh(self): - if self.need_resize: - self.resize() - self.header_win.refresh() - self.refresh_tab_win() - self.help_win.refresh() - self.info_header.refresh(self.bookmarks.preferred) - self.info_win.refresh() - self.bookmarks_win.refresh() - - -def find_duplicates(bm_list): - jids = set() - for bookmark in bm_list: - if bookmark.jid in jids: - return True - jids.add(bookmark.jid) - return False - diff --git a/src/tabs/conversationtab.py b/src/tabs/conversationtab.py deleted file mode 100644 index 1d8c60a4..00000000 --- a/src/tabs/conversationtab.py +++ /dev/null @@ -1,484 +0,0 @@ -""" -Module for the ConversationTabs - -A ConversationTab is a direct chat between two JIDs, outside of a room. - -There are two different instances of a ConversationTab: -- A DynamicConversationTab that implements XEP-0296 (best practices for - resource locking), which means it will switch the resource it is - focused on depending on the presences received. This is the default. -- A StaticConversationTab that will stay focused on one resource all - the time. - -""" -import logging -log = logging.getLogger(__name__) - -import curses - -from . basetabs import OneToOneTab, Tab - -import common -import fixes -import windows -import xhtml -from common import safeJID -from config import config -from decorators import refresh_wrapper -from roster import roster -from theming import get_theme, dump_tuple -from decorators import command_args_parser - -class ConversationTab(OneToOneTab): - """ - The tab containg a normal conversation (not from a MUC) - Must not be instantiated, use Static or Dynamic version only. - """ - plugin_commands = {} - plugin_keys = {} - additional_informations = {} - message_type = 'chat' - def __init__(self, jid): - OneToOneTab.__init__(self, jid) - self.nick = None - self.nick_sent = False - self.state = 'normal' - self.name = jid # a conversation tab is linked to one specific full jid OR bare jid - self.text_win = windows.TextWin() - self._text_buffer.add_window(self.text_win) - self.upper_bar = windows.ConversationStatusMessageWin() - self.input = windows.MessageInput() - # keys - self.key_func['^I'] = self.completion - # commands - self.register_command('unquery', self.command_unquery, - shortdesc='Close the tab.') - self.register_command('close', self.command_unquery, - shortdesc='Close the tab.') - self.register_command('version', self.command_version, - desc='Get the software version of the current interlocutor (usually its XMPP client and Operating System).', - shortdesc='Get the software version of the user.') - self.register_command('info', self.command_info, - shortdesc='Get the status of the contact.') - self.register_command('last_activity', self.command_last_activity, - usage='[jid]', - desc='Get the last activity of the given or the current contact.', - shortdesc='Get the activity.', - completion=self.core.completion_last_activity) - self.resize() - self.update_commands() - self.update_keys() - - @property - def general_jid(self): - return safeJID(self.name).bare - - @staticmethod - def add_information_element(plugin_name, callback): - """ - Lets a plugin add its own information to the ConversationInfoWin - """ - ConversationTab.additional_informations[plugin_name] = callback - - @staticmethod - def remove_information_element(plugin_name): - del ConversationTab.additional_informations[plugin_name] - - def completion(self): - self.complete_commands(self.input) - - @command_args_parser.raw - def command_say(self, line, attention=False, correct=False): - msg = self.core.xmpp.make_message(self.get_dest_jid()) - msg['type'] = 'chat' - msg['body'] = line - if not self.nick_sent: - msg['nick'] = self.core.own_nick - self.nick_sent = True - # trigger the event BEFORE looking for colors. - # and before displaying the message in the window - # This lets a plugin insert \x19xxx} colors, that will - # be converted in xhtml. - self.core.events.trigger('conversation_say', msg, self) - if not msg['body']: - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - return - replaced = False - if correct or msg['replace']['id']: - msg['replace']['id'] = self.last_sent_message['id'] - if config.get_by_tabname('group_corrections', self.name): - try: - self.modify_message(msg['body'], self.last_sent_message['id'], msg['id'], jid=self.core.xmpp.boundjid, - nickname=self.core.own_nick) - replaced = True - except: - log.error('Unable to correct a message', exc_info=True) - else: - del msg['replace'] - if msg['body'].find('\x19') != -1: - msg.enable('html') - msg['html']['body'] = xhtml.poezio_colors_to_html(msg['body']) - msg['body'] = xhtml.clean_text(msg['body']) - if (config.get_by_tabname('send_chat_states', self.general_jid) and - self.remote_wants_chatstates is not False): - needed = 'inactive' if self.inactive else 'active' - msg['chat_state'] = needed - if attention and self.remote_supports_attention: - msg['attention'] = True - self.core.events.trigger('conversation_say_after', msg, self) - if not msg['body']: - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - return - if not replaced: - self.add_message(msg['body'], - nickname=self.core.own_nick, - nick_color=get_theme().COLOR_OWN_NICK, - identifier=msg['id'], - jid=self.core.xmpp.boundjid, - typ=1) - - self.last_sent_message = msg - if self.remote_supports_receipts: - msg._add_receipt = True - msg.send() - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - - @command_args_parser.quoted(0, 1) - def command_last_activity(self, args): - """ - /last_activity [jid] - """ - if args and args[0]: - return self.core.command_last_activity(args[0]) - - def callback(iq): - if iq['type'] != 'result': - if iq['error']['type'] == 'auth': - self.core.information('You are not allowed to see the activity of this contact.', 'Error') - else: - self.core.information('Error retrieving the activity', 'Error') - return - seconds = iq['last_activity']['seconds'] - status = iq['last_activity']['status'] - from_ = iq['from'] - msg = '\x19%s}The last activity of %s was %s ago%s' - if not safeJID(from_).user: - msg = '\x19%s}The uptime of %s is %s.' % ( - dump_tuple(get_theme().COLOR_INFORMATION_TEXT), - from_, - common.parse_secs_to_str(seconds)) - else: - msg = '\x19%s}The last activity of %s was %s ago%s' % ( - dump_tuple(get_theme().COLOR_INFORMATION_TEXT), - from_, - common.parse_secs_to_str(seconds), - (' and his/her last status was %s' % status) if status else '',) - self.add_message(msg) - self.core.refresh_window() - - self.core.xmpp.plugin['xep_0012'].get_last_activity(self.get_dest_jid(), callback=callback) - - @refresh_wrapper.conditional - @command_args_parser.ignored - def command_info(self): - contact = roster[self.get_dest_jid()] - jid = safeJID(self.get_dest_jid()) - if contact: - if jid.resource: - resource = contact[jid.full] - else: - resource = contact.get_highest_priority_resource() - else: - resource = None - if resource: - status = ('Status: %s' % resource.status) if resource.status else '' - self._text_buffer.add_message("\x19%(info_col)s}Show: %(show)s, %(status)s\x19o" % { - 'show': resource.show or 'available', 'status': status, 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}) - return True - else: - self._text_buffer.add_message("\x19%(info_col)s}No information available\x19o" % {'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}) - return True - - @command_args_parser.ignored - def command_unquery(self): - self.core.close_tab() - - @command_args_parser.quoted(0, 1) - def command_version(self, args): - """ - /version [jid] - """ - def callback(res): - if not res: - return self.core.information('Could not get the software version from %s' % (jid,), 'Warning') - version = '%s is running %s version %s on %s' % (jid, - res.get('name') or 'an unknown software', - res.get('version') or 'unknown', - res.get('os') or 'an unknown platform') - self.core.information(version, 'Info') - if args: - return self.core.command_version(args[0]) - jid = safeJID(self.name) - if not jid.resource: - if jid in roster: - resource = roster[jid].get_highest_priority_resource() - jid = resource.jid if resource else jid - fixes.get_version(self.core.xmpp, jid, - callback=callback) - - def resize(self): - self.need_resize = False - if self.size.tab_degrade_y: - display_bar = False - info_win_height = 0 - tab_win_height = 0 - bar_height = 0 - else: - display_bar = True - info_win_height = self.core.information_win_size - tab_win_height = Tab.tab_win_height() - bar_height = 1 - - self.text_win.resize(self.height - 2 - bar_height - info_win_height - - tab_win_height, - self.width, bar_height, 0) - self.text_win.rebuild_everything(self._text_buffer) - if display_bar: - self.upper_bar.resize(1, self.width, 0, 0) - self.info_header.resize(1, self.width, - self.height - 2 - info_win_height - - tab_win_height, - 0) - self.input.resize(1, self.width, self.height - 1, 0) - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - display_bar = display_info_win = not self.size.tab_degrade_y - - self.text_win.refresh() - - if display_bar: - self.upper_bar.refresh(self.get_dest_jid(), roster[self.get_dest_jid()]) - self.info_header.refresh(self.get_dest_jid(), roster[self.get_dest_jid()], self.text_win, self.chatstate, ConversationTab.additional_informations) - - if display_info_win: - self.info_win.refresh() - self.refresh_tab_win() - self.input.refresh() - - def refresh_info_header(self): - self.info_header.refresh(self.get_dest_jid(), roster[self.get_dest_jid()], - self.text_win, self.chatstate, ConversationTab.additional_informations) - self.input.refresh() - - def get_nick(self): - jid = safeJID(self.name) - contact = roster[jid.bare] - if contact: - return contact.name or jid.user - else: - if self.nick: - return self.nick - return jid.user - - def on_input(self, key, raw): - if not raw and key in self.key_func: - self.key_func[key]() - return False - self.input.do_command(key, raw=raw) - empty_after = self.input.get_text() == '' or (self.input.get_text().startswith('/') and not self.input.get_text().startswith('//')) - self.send_composing_chat_state(empty_after) - return False - - def on_lose_focus(self): - contact = roster[self.get_dest_jid()] - jid = safeJID(self.get_dest_jid()) - if contact: - if jid.resource: - resource = contact[jid.full] - else: - resource = contact.get_highest_priority_resource() - else: - resource = None - if self.input.text: - self.state = 'nonempty' - else: - self.state = 'normal' - self.text_win.remove_line_separator() - self.text_win.add_line_separator(self._text_buffer) - if (config.get_by_tabname('send_chat_states', self.general_jid) - and (not self.input.get_text() - or not self.input.get_text().startswith('//'))): - if resource: - self.send_chat_state('inactive') - self.check_scrolled() - - def on_gain_focus(self): - contact = roster[self.get_dest_jid()] - jid = safeJID(self.get_dest_jid()) - if contact: - if jid.resource: - resource = contact[jid.full] - else: - resource = contact.get_highest_priority_resource() - else: - resource = None - - self.state = 'current' - curses.curs_set(1) - if (config.get_by_tabname('send_chat_states', self.general_jid) - and (not self.input.get_text() - or not self.input.get_text().startswith('//'))): - if resource: - self.send_chat_state('active') - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height-3: - return - self.text_win.resize(self.height-3-self.core.information_win_size - Tab.tab_win_height(), self.width, 1, 0) - self.info_header.resize(1, self.width, self.height-2-self.core.information_win_size - Tab.tab_win_height(), 0) - - def get_text_window(self): - return self.text_win - - def on_close(self): - Tab.on_close(self) - if config.get_by_tabname('send_chat_states', self.general_jid): - self.send_chat_state('gone') - - def matching_names(self): - res = [] - jid = safeJID(self.name) - res.append((2, jid.bare)) - res.append((1, jid.user)) - contact = roster[self.name] - if contact and contact.name: - res.append((0, contact.name)) - return res - -class DynamicConversationTab(ConversationTab): - """ - A conversation tab associated with one bare JID that can be “locked” to - a full jid, and unlocked, as described in the XEP-0296. - Only one DynamicConversationTab can be opened for a given jid. - """ - def __init__(self, jid, resource=None): - self.locked_resource = None - self.name = safeJID(jid).bare - if resource: - self.lock(resource) - self.info_header = windows.DynamicConversationInfoWin() - ConversationTab.__init__(self, jid) - self.register_command('unlock', self.unlock_command, - shortdesc='Unlock the conversation from a particular resource.') - - def lock(self, resource): - """ - Lock the tab to the resource. - """ - assert(resource) - if resource != self.locked_resource: - self.locked_resource = resource - info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - jid_c = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID) - - message = ('%(info)sConversation locked to ' - '%(jid_c)s%(jid)s/%(resource)s%(info)s.') % { - 'info': info, - 'jid_c': jid_c, - 'jid': self.name, - 'resource': resource} - self.add_message(message, typ=0) - self.check_features() - - def unlock_command(self, arg=None): - self.unlock() - self.refresh_info_header() - - def unlock(self, from_=None): - """ - Unlock the tab from a resource. It is now “associated” with the bare - jid. - """ - self.remote_wants_chatstates = None - if self.locked_resource != None: - self.locked_resource = None - info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - jid_c = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID) - - if from_: - message = ('%(info)sConversation unlocked (received activity' - ' from %(jid_c)s%(jid)s%(info)s).') % { - 'info': info, - 'jid_c': jid_c, - 'jid': from_} - self.add_message(message, typ=0) - else: - message = '%sConversation unlocked.' % info - self.add_message(message, typ=0) - - def get_dest_jid(self): - """ - Returns the full jid (using the locked resource), or the bare jid if - the conversation is not locked. - """ - if self.locked_resource: - return "%s/%s" % (self.name, self.locked_resource) - return self.name - - def refresh(self): - """ - Different from the parent class only for the info_header object. - """ - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - display_bar = display_info_win = not self.size.tab_degrade_y - - self.text_win.refresh() - if display_bar: - self.upper_bar.refresh(self.name, roster[self.name]) - if self.locked_resource: - displayed_jid = "%s/%s" % (self.name, self.locked_resource) - else: - displayed_jid = self.name - self.info_header.refresh(displayed_jid, roster[self.name], - self.text_win, self.chatstate, - ConversationTab.additional_informations) - if display_info_win: - self.info_win.refresh() - - self.refresh_tab_win() - self.input.refresh() - - def refresh_info_header(self): - """ - Different from the parent class only for the info_header object. - """ - if self.locked_resource: - displayed_jid = "%s/%s" % (self.name, self.locked_resource) - else: - displayed_jid = self.name - self.info_header.refresh(displayed_jid, roster[self.name], - self.text_win, self.chatstate, ConversationTab.additional_informations) - self.input.refresh() - -class StaticConversationTab(ConversationTab): - """ - A conversation tab associated with one Full JID. It cannot be locked to - an different resource or unlocked. - """ - def __init__(self, jid): - assert(safeJID(jid).resource) - self.info_header = windows.ConversationInfoWin() - ConversationTab.__init__(self, jid) - - diff --git a/src/tabs/data_forms.py b/src/tabs/data_forms.py deleted file mode 100644 index 0fad2974..00000000 --- a/src/tabs/data_forms.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Defines the data-forms Tab -""" - -import logging -log = logging.getLogger(__name__) - -import windows -from tabs import Tab - -class DataFormsTab(Tab): - """ - A tab contaning various window type, displaying - a form that the user needs to fill. - """ - plugin_commands = {} - def __init__(self, form, on_cancel, on_send, kwargs): - Tab.__init__(self) - self._form = form - self._on_cancel = on_cancel - self._on_send = on_send - self._kwargs = kwargs - self.fields = [] - for field in self._form: - self.fields.append(field) - self.topic_win = windows.Topic() - self.form_win = windows.FormWin(form, self.height-4, self.width, 1, 0) - self.help_win = windows.HelpText("Ctrl+Y: send form, Ctrl+G: cancel") - self.help_win_dyn = windows.HelpText() - self.key_func['KEY_UP'] = self.form_win.go_to_previous_input - self.key_func['KEY_DOWN'] = self.form_win.go_to_next_input - self.key_func['^G'] = self.on_cancel - self.key_func['^Y'] = self.on_send - self.resize() - self.update_commands() - - def on_cancel(self): - self._on_cancel(self._form, **self._kwargs) - return True - - def on_send(self): - self._form.reply() - self.form_win.reply() - self._on_send(self._form, **self._kwargs) - return True - - def on_input(self, key, raw=False): - if key in self.key_func: - res = self.key_func[key]() - if res: - return res - self.help_win_dyn.refresh(self.form_win.get_help_message()) - self.form_win.refresh_current_input() - else: - self.form_win.on_input(key) - - def resize(self): - self.need_resize = False - self.topic_win.resize(1, self.width, 0, 0) - self.form_win.resize(self.height - 3 - Tab.tab_win_height(), - self.width, 1, 0) - self.help_win.resize(1, self.width, self.height - 1, 0) - self.help_win_dyn.resize(1, self.width, - self.height - 2 - Tab.tab_win_height(), 0) - self.lines = [] - - def refresh(self): - if self.need_resize: - self.resize() - self.topic_win.refresh(self._form['title']) - self.refresh_tab_win() - self.help_win.refresh() - self.help_win_dyn.refresh(self.form_win.get_help_message()) - self.form_win.refresh() - diff --git a/src/tabs/listtab.py b/src/tabs/listtab.py deleted file mode 100644 index 4d8bab9c..00000000 --- a/src/tabs/listtab.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -A generic tab that displays a serie of items in a scrollable, searchable, -sortable list. It should be inherited, to actually provide methods that -insert items in the list, and that lets the user interact with them. -""" - -import logging -log = logging.getLogger(__name__) - -import curses -import collections - -import windows -from common import safeJID -from decorators import refresh_wrapper - -from . import Tab - - -class ListTab(Tab): - plugin_commands = {} - plugin_keys = {} - - def __init__(self, name, help_message, header_text, cols): - """Parameters: - name: The name of the tab - help_message: The default help message displayed instead of the - input - header_text: The text displayed on the header line, at the top of - the tab - cols: a tuple of 2-tuples. e.g. (('column1_name', number), - ('column2_name', number)) - """ - Tab.__init__(self) - self.state = 'normal' - self.name = name - columns = collections.OrderedDict() - for col, num in cols: - columns[col] = num - self.list_header = windows.ColumnHeaderWin(list(columns)) - self.listview = windows.ListWin(columns) - self.info_header = windows.MucListInfoWin(header_text) - self.default_help_message = windows.HelpText(help_message) - self.input = self.default_help_message - self.key_func["KEY_DOWN"] = self.move_cursor_down - self.key_func["KEY_UP"] = self.move_cursor_up - self.key_func['^I'] = self.completion - self.key_func["/"] = self.on_slash - self.key_func['KEY_LEFT'] = self.list_header.sel_column_left - self.key_func['KEY_RIGHT'] = self.list_header.sel_column_right - self.key_func[' '] = self.sort_by - self.register_command('close', self.close, - shortdesc='Close this tab.') - self.resize() - self.update_keys() - self.update_commands() - - def get_columns_sizes(self): - """ - Must be implemented in subclasses. Must return a dict like this: - {'column1_name': size1, - 'column2_name': size2} - Where the size are calculated based on the size of the tab etc - """ - raise NotImplementedError - - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - if self.size.tab_degrade_y: - display_info_win = False - else: - display_info_win = True - - self.info_header.refresh(window=self.listview) - if display_info_win: - self.info_win.refresh() - self.refresh_tab_win() - self.list_header.refresh() - self.listview.refresh() - self.input.refresh() - - def resize(self): - if self.size.tab_degrade_y: - info_win_height = 0 - tab_win_height = 0 - else: - info_win_height = self.core.information_win_size - tab_win_height = Tab.tab_win_height() - - self.info_header.resize(1, self.width, - self.height - 2 - info_win_height - - tab_win_height, - 0) - column_size = self.get_columns_sizes() - self.list_header.resize_columns(column_size) - self.list_header.resize(1, self.width, 0, 0) - self.listview.resize_columns(column_size) - self.listview.resize(self.height - 3 - info_win_height - tab_win_height, - self.width, 1, 0) - self.input.resize(1, self.width, self.height-1, 0) - - def on_slash(self): - """ - '/' is pressed, activate the input - """ - curses.curs_set(1) - self.input = windows.CommandInput("", self.reset_help_message, self.execute_slash_command) - self.input.resize(1, self.width, self.height-1, 0) - self.input.do_command("/") # we add the slash - - def close(self, arg=None): - self.input.on_delete() - self.core.close_tab(self) - - def set_error(self, msg, code, body): - """ - If there's an error (retrieving the values etc) - """ - self._error_message = 'Error: %(code)s - %(msg)s: %(body)s' % {'msg':msg, 'body':body, 'code':code} - self.info_header.message = self._error_message - self.info_header.refresh() - curses.doupdate() - - def sort_by(self): - if self.list_header.get_order(): - self.listview.sort_by_column( - col_name=self.list_header.get_sel_column(), - asc=False) - self.list_header.set_order(False) - self.list_header.refresh() - else: - self.listview.sort_by_column( - col_name=self.list_header.get_sel_column(), - asc=True) - self.list_header.set_order(True) - self.list_header.refresh() - self.core.doupdate() - - @refresh_wrapper.always - def reset_help_message(self, _=None): - if self.closed: - return True - curses.curs_set(0) - self.input = self.default_help_message - self.input.resize(1, self.width, self.height-1, 0) - return True - - def execute_slash_command(self, txt): - if txt.startswith('/'): - self.input.key_enter() - self.execute_command(txt) - return self.reset_help_message() - - def completion(self): - if isinstance(self.input, windows.Input): - self.complete_commands(self.input) - - def on_input(self, key, raw): - res = self.input.do_command(key, raw=raw) - if res and not isinstance(self.input, windows.Input): - return True - elif res: - return False - if not raw and key in self.key_func: - return self.key_func[key]() - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height-3: - return - self.info_header.resize(1, self.width, self.height-2-self.core.information_win_size - Tab.tab_win_height(), 0) - self.listview.resize(self.height-3-self.core.information_win_size - Tab.tab_win_height(), self.width, 1, 0) - - def on_lose_focus(self): - self.state = 'normal' - - def on_gain_focus(self): - self.state = 'current' - curses.curs_set(0) - - def on_scroll_up(self): - return self.listview.scroll_up() - - def on_scroll_down(self): - return self.listview.scroll_down() - - def move_cursor_up(self): - self.listview.move_cursor_up() - self.listview.refresh() - self.core.doupdate() - - def move_cursor_down(self): - self.listview.move_cursor_down() - self.listview.refresh() - self.core.doupdate() - - def matching_names(self): - return [(2, self.name)] - - diff --git a/src/tabs/muclisttab.py b/src/tabs/muclisttab.py deleted file mode 100644 index 92d55190..00000000 --- a/src/tabs/muclisttab.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -A MucListTab is a tab listing the rooms on a conference server. - -It has no functionnality except scrolling the list, and allowing the -user to join the rooms. -""" -import logging -log = logging.getLogger(__name__) - -from . import ListTab - -from slixmpp.plugins.xep_0030.stanza.items import DiscoItem - -class MucListTab(ListTab): - """ - A tab listing rooms from a specific server, displaying various information, - scrollable, and letting the user join them, etc - """ - plugin_commands = {} - plugin_keys = {} - - def __init__(self, server): - ListTab.__init__(self, server, - "“j”: join room.", - 'Chatroom list on server %s (Loading)' % server, - (('node-part', 0), ('name', 2), ('users', 3))) - self.key_func['j'] = self.join_selected - self.key_func['J'] = self.join_selected_no_focus - self.key_func['^M'] = self.join_selected - - def get_columns_sizes(self): - return {'node-part': int(self.width* 2 / 8), - 'name': int(self.width * 5 / 8), - 'users': self.width - int(self.width * 2 / 8) - - int(self.width * 5 / 8)} - - def join_selected_no_focus(self): - return - - def on_muc_list_item_received(self, iq): - """ - Callback called when a disco#items result is received - Used with command_list - """ - if iq['type'] == 'error': - self.set_error(iq['error']['type'], iq['error']['code'], iq['error']['text']) - return - def get_items(): - substanza = iq['disco_items'] - for item in substanza['substanzas']: - if isinstance(item, DiscoItem): - yield (item['jid'], item['node'], item['name']) - items = [(item[0].split('@')[0], - item[0], - item[2] or '', '') for item in get_items()] - self.listview.set_lines(items) - self.info_header.message = 'Chatroom list on server %s' % self.name - if self.core.current_tab() is self: - self.refresh() - else: - self.state = 'highlight' - self.refresh_tab_win() - self.core.doupdate() - - def join_selected(self): - row = self.listview.get_selected_row() - if not row: - return - self.core.command_join(row[1]) - diff --git a/src/tabs/muctab.py b/src/tabs/muctab.py deleted file mode 100644 index 1f3ec6d8..00000000 --- a/src/tabs/muctab.py +++ /dev/null @@ -1,1720 +0,0 @@ -""" -Module for the MucTab - -A MucTab is a tab for multi-user chats as defined in XEP-0045. - -It keeps track of many things such as part/joins, maintains an -user list, and updates private tabs when necessary. -""" - -import logging -log = logging.getLogger(__name__) - -import bisect -import curses -import os -import random -import re -from datetime import datetime - -from . import ChatTab, Tab - -import common -import fixes -import multiuserchat as muc -import timed_events -import windows -import xhtml -from common import safeJID -from config import config -from decorators import refresh_wrapper, command_args_parser -from logger import logger -from roster import roster -from theming import get_theme, dump_tuple -from user import User - - -SHOW_NAME = { - 'dnd': 'busy', - 'away': 'away', - 'xa': 'not available', - 'chat': 'chatty', - '': 'available' - } - -NS_MUC_USER = 'http://jabber.org/protocol/muc#user' - - -class MucTab(ChatTab): - """ - The tab containing a multi-user-chat room. - It contains an userlist, an input, a topic, an information and a chat zone - """ - message_type = 'groupchat' - plugin_commands = {} - plugin_keys = {} - def __init__(self, jid, nick, password=None): - self.joined = False - ChatTab.__init__(self, jid) - if self.joined == False: - self._state = 'disconnected' - self.own_nick = nick - self.name = jid - self.password = password - self.users = [] - self.privates = [] # private conversations - self.topic = '' - self.topic_from = '' - self.remote_wants_chatstates = True - # Self ping event, so we can cancel it when we leave the room - self.self_ping_event = None - # We send active, composing and paused states to the MUC because - # the chatstate may or may not be filtered by the MUC, - # that’s not our problem. - self.topic_win = windows.Topic() - self.text_win = windows.TextWin() - self._text_buffer.add_window(self.text_win) - self.v_separator = windows.VerticalSeparator() - self.user_win = windows.UserList() - self.info_header = windows.MucInfoWin() - self.input = windows.MessageInput() - self.ignores = [] # set of Users - # keys - self.key_func['^I'] = self.completion - self.key_func['M-u'] = self.scroll_user_list_down - self.key_func['M-y'] = self.scroll_user_list_up - self.key_func['M-n'] = self.go_to_next_hl - self.key_func['M-p'] = self.go_to_prev_hl - # commands - self.register_command('ignore', self.command_ignore, - usage='', - desc='Ignore a specified nickname.', - shortdesc='Ignore someone', - completion=self.completion_ignore) - self.register_command('unignore', self.command_unignore, - usage='', - desc='Remove the specified nickname from the ignore list.', - shortdesc='Unignore someone.', - completion=self.completion_unignore) - self.register_command('kick', self.command_kick, - usage=' [reason]', - desc='Kick the user with the specified nickname.' - ' You also can give an optional reason.', - shortdesc='Kick someone.', - completion=self.completion_quoted) - self.register_command('ban', self.command_ban, - usage=' [reason]', - desc='Ban the user with the specified nickname.' - ' You also can give an optional reason.', - shortdesc='Ban someone', - completion=self.completion_quoted) - self.register_command('role', self.command_role, - usage=' [reason]', - desc='Set the role of an user. Roles can be:' - ' none, visitor, participant, moderator.' - ' You also can give an optional reason.', - shortdesc='Set the role of an user.', - completion=self.completion_role) - self.register_command('affiliation', self.command_affiliation, - usage=' ', - desc='Set the affiliation of an user. Affiliations can be:' - ' outcast, none, member, admin, owner.', - shortdesc='Set the affiliation of an user.', - completion=self.completion_affiliation) - self.register_command('topic', self.command_topic, - usage='', - desc='Change the subject of the room.', - shortdesc='Change the subject.', - completion=self.completion_topic) - self.register_command('query', self.command_query, - usage=' [message]', - desc='Open a private conversation with . This nick' - ' has to be present in the room you\'re currently in.' - ' If you specified a message after the nickname, it ' - 'will immediately be sent to this user.', - shortdesc='Query an user.', - completion=self.completion_quoted) - self.register_command('part', self.command_part, - usage='[message]', - desc='Disconnect from a room. You can' - ' specify an optional message.', - shortdesc='Leave the room.') - self.register_command('close', self.command_close, - usage='[message]', - desc='Disconnect from a room and close the tab.' - ' You can specify an optional message if ' - 'you are still connected.', - shortdesc='Close the tab.') - self.register_command('nick', self.command_nick, - usage='', - desc='Change your nickname in the current room.', - shortdesc='Change your nickname.', - completion=self.completion_nick) - self.register_command('recolor', self.command_recolor, - usage='[random]', - desc='Re-assign a color to all participants of the' - ' current room, based on the last time they talked.' - ' Use this if the participants currently talking ' - 'have too many identical colors. Use /recolor random' - ' for a non-deterministic result.', - shortdesc='Change the nicks colors.', - completion=self.completion_recolor) - self.register_command('color', self.command_color, - usage=' ', - desc='Fix a color for a nick. Use "unset" instead of a color' - ' to remove the attribution', - shortdesc='Fix a color for a nick.', - completion=self.completion_color) - self.register_command('cycle', self.command_cycle, - usage='[message]', - desc='Leave the current room and rejoin it immediately.', - shortdesc='Leave and re-join the room.') - self.register_command('info', self.command_info, - usage='', - desc='Display some information about the user ' - 'in the MUC: its/his/her role, affiliation,' - ' status and status message.', - shortdesc='Show an user\'s infos.', - completion=self.completion_info) - self.register_command('configure', self.command_configure, - desc='Configure the current room, through a form.', - shortdesc='Configure the room.') - self.register_command('version', self.command_version, - usage='', - desc='Get the software version of the given JID' - ' or nick in room (usually its XMPP client' - ' and Operating System).', - shortdesc='Get the software version of a jid.', - completion=self.completion_version) - self.register_command('names', self.command_names, - desc='Get the users in the room with their roles.', - shortdesc='List the users.') - self.register_command('invite', self.command_invite, - desc='Invite a contact to this room', - usage=' [reason]', - shortdesc='Invite a contact to this room', - completion=self.completion_invite) - - if self.core.xmpp.boundjid.server == "gmail.com": #gmail sucks - del self.commands["nick"] - - self.resize() - self.update_commands() - self.update_keys() - - @property - def general_jid(self): - return self.name - - @property - def is_muc(self): - return True - - @property - def last_connection(self): - last_message = self._text_buffer.last_message - if last_message: - return last_message.time - return None - - @refresh_wrapper.always - def go_to_next_hl(self): - """ - Go to the next HL in the room, or the last - """ - self.text_win.next_highlight() - - @refresh_wrapper.always - def go_to_prev_hl(self): - """ - Go to the previous HL in the room, or the first - """ - self.text_win.previous_highlight() - - def completion_version(self, the_input): - """Completion for /version""" - compare_users = lambda x: x.last_talked - userlist = [] - for user in sorted(self.users, key=compare_users, reverse=True): - if user.nick != self.own_nick: - userlist.append(user.nick) - comp = [] - for jid in (jid for jid in roster.jids() if len(roster[jid])): - for resource in roster[jid].resources: - comp.append(resource.jid) - comp.sort() - userlist.extend(comp) - - return the_input.auto_completion(userlist, quotify=False) - - def completion_info(self, the_input): - """Completion for /info""" - compare_users = lambda x: x.last_talked - userlist = [] - for user in sorted(self.users, key=compare_users, reverse=True): - userlist.append(user.nick) - return the_input.auto_completion(userlist, quotify=False) - - def completion_nick(self, the_input): - """Completion for /nick""" - nicks = [os.environ.get('USER'), - config.get('default_nick'), - self.core.get_bookmark_nickname(self.name)] - nicks = [i for i in nicks if i] - return the_input.auto_completion(nicks, '', quotify=False) - - def completion_recolor(self, the_input): - if the_input.get_argument_position() == 1: - return the_input.new_completion(['random'], 1, '', quotify=False) - return True - - def completion_color(self, the_input): - """Completion for /color""" - n = the_input.get_argument_position(quoted=True) - if n == 1: - userlist = [user.nick for user in self.users] - if self.own_nick in userlist: - userlist.remove(self.own_nick) - return the_input.new_completion(userlist, 1, '', quotify=True) - elif n == 2: - colors = [i for i in xhtml.colors if i] - colors.sort() - colors.append('unset') - colors.append('random') - return the_input.new_completion(colors, 2, '', quotify=False) - - def completion_ignore(self, the_input): - """Completion for /ignore""" - userlist = [user.nick for user in self.users] - if self.own_nick in userlist: - userlist.remove(self.own_nick) - userlist.sort() - return the_input.auto_completion(userlist, quotify=False) - - def completion_role(self, the_input): - """Completion for /role""" - n = the_input.get_argument_position(quoted=True) - if n == 1: - userlist = [user.nick for user in self.users] - if self.own_nick in userlist: - userlist.remove(self.own_nick) - return the_input.new_completion(userlist, 1, '', quotify=True) - elif n == 2: - possible_roles = ['none', 'visitor', 'participant', 'moderator'] - return the_input.new_completion(possible_roles, 2, '', - quotify=True) - - def completion_affiliation(self, the_input): - """Completion for /affiliation""" - n = the_input.get_argument_position(quoted=True) - if n == 1: - userlist = [user.nick for user in self.users] - if self.own_nick in userlist: - userlist.remove(self.own_nick) - jidlist = [user.jid.bare for user in self.users] - if self.core.xmpp.boundjid.bare in jidlist: - jidlist.remove(self.core.xmpp.boundjid.bare) - userlist.extend(jidlist) - return the_input.new_completion(userlist, 1, '', quotify=True) - elif n == 2: - possible_affiliations = ['none', 'member', 'admin', - 'owner', 'outcast'] - return the_input.new_completion(possible_affiliations, 2, '', - quotify=True) - - @command_args_parser.quoted(1, 1, ['']) - def command_invite(self, args): - """/invite [reason]""" - if args is None: - return self.core.command_help('invite') - jid, reason = args - self.core.command_invite('%s %s "%s"' % (jid, self.name, reason)) - - def completion_invite(self, the_input): - """Completion for /invite""" - n = the_input.get_argument_position(quoted=True) - if n == 1: - return the_input.new_completion(roster.jids(), 1, quotify=True) - - def scroll_user_list_up(self): - self.user_win.scroll_up() - self.user_win.refresh(self.users) - self.input.refresh() - - def scroll_user_list_down(self): - self.user_win.scroll_down() - self.user_win.refresh(self.users) - self.input.refresh() - - @command_args_parser.quoted(1) - def command_info(self, args): - """ - /info - """ - if args is None: - return self.core.command_help('info') - nick = args[0] - user = self.get_user_by_name(nick) - if not user: - return self.core.information("Unknown user: %s" % nick, "Error") - theme = get_theme() - inf = '\x19' + dump_tuple(theme.COLOR_INFORMATION_TEXT) + '}' - if user.jid: - user_jid = '%s (\x19%s}%s\x19o%s)' % ( - inf, - dump_tuple(theme.COLOR_MUC_JID), - user.jid, - inf) - else: - user_jid = '' - info = ('\x19%s}%s\x19o%s%s: show: \x19%s}%s\x19o%s, affiliation:' - ' \x19%s}%s\x19o%s, role: \x19%s}%s\x19o%s') % ( - dump_tuple(user.color), - nick, - user_jid, - inf, - dump_tuple(theme.color_show(user.show)), - user.show or 'Available', - inf, - dump_tuple(theme.color_role(user.role)), - user.affiliation or 'None', - inf, - dump_tuple(theme.color_role(user.role)), - user.role or 'None', - '\n%s' % user.status if user.status else '') - self.add_message(info, typ=0) - self.core.refresh_window() - - @command_args_parser.quoted(0) - def command_configure(self, ignored): - """ - /configure - """ - def on_form_received(form): - if not form: - self.core.information( - 'Could not retrieve the configuration form', - 'Error') - return - self.core.open_new_form(form, self.cancel_config, self.send_config) - - fixes.get_room_form(self.core.xmpp, self.name, on_form_received) - - def cancel_config(self, form): - """ - The user do not want to send his/her config, send an iq cancel - """ - muc.cancel_config(self.core.xmpp, self.name) - self.core.close_tab() - - def send_config(self, form): - """ - The user sends his/her config to the server - """ - muc.configure_room(self.core.xmpp, self.name, form) - self.core.close_tab() - - @command_args_parser.raw - def command_cycle(self, msg): - """/cycle [reason]""" - self.command_part(msg) - self.disconnect() - self.user_win.pos = 0 - self.core.disable_private_tabs(self.name) - self.join() - - def join(self): - """ - Join the room - """ - status = self.core.get_status() - if self.last_connection: - delta = datetime.now() - self.last_connection - seconds = delta.seconds + delta.days * 24 * 3600 - else: - seconds = 0 - muc.join_groupchat(self.core, self.name, self.own_nick, - self.password, - status=status.message, - show=status.show, - seconds=seconds) - - @command_args_parser.quoted(0, 1, ['']) - def command_recolor(self, args): - """ - /recolor [random] - Re-assign color to the participants of the room - """ - deterministic = config.get_by_tabname('deterministic_nick_colors', self.name) - if deterministic: - for user in self.users: - if user.nick == self.own_nick: - continue - color = self.search_for_color(user.nick) - if color != '': - continue - user.set_deterministic_color() - if args[0] == 'random': - self.core.information('"random" was provided, but poezio is ' - 'configured to use deterministic colors', - 'Warning') - self.user_win.refresh(self.users) - self.input.refresh() - return - compare_users = lambda x: x.last_talked - users = list(self.users) - sorted_users = sorted(users, key=compare_users, reverse=True) - full_sorted_users = sorted_users[:] - # search our own user, to remove it from the list - # Also remove users whose color is fixed - for user in full_sorted_users: - color = self.search_for_color(user.nick) - if user.nick == self.own_nick: - sorted_users.remove(user) - user.color = get_theme().COLOR_OWN_NICK - elif color != '': - sorted_users.remove(user) - user.change_color(color, deterministic) - colors = list(get_theme().LIST_COLOR_NICKNAMES) - if args[0] == 'random': - random.shuffle(colors) - for i, user in enumerate(sorted_users): - user.color = colors[i % len(colors)] - self.text_win.rebuild_everything(self._text_buffer) - self.user_win.refresh(self.users) - self.text_win.refresh() - self.input.refresh() - - @command_args_parser.quoted(2, 2, ['']) - def command_color(self, args): - """ - /color - Fix a color for a nick. - Use "unset" instead of a color to remove the attribution. - User "random" to attribute a random color. - """ - if args is None: - return self.core.command_help('color') - nick = args[0] - color = args[1].lower() - user = self.get_user_by_name(nick) - if not color in xhtml.colors and color not in ('unset', 'random'): - return self.core.information("Unknown color: %s" % color, 'Error') - if user and user.nick == self.own_nick: - return self.core.information("You cannot change the color of your" - " own nick.", 'Error') - if color == 'unset': - if config.remove_and_save(nick, 'muc_colors'): - self.core.information('Color for nick %s unset' % (nick)) - else: - if color == 'random': - color = random.choice(list(xhtml.colors)) - if user: - user.change_color(color) - config.set_and_save(nick, color, 'muc_colors') - nick_color_aliases = config.get_by_tabname('nick_color_aliases', self.name) - if nick_color_aliases: - # if any user in the room has a nick which is an alias of the - # nick, update its color - for tab in self.core.get_tabs(MucTab): - for u in tab.users: - nick_alias = re.sub('^_*', '', u.nick) - nick_alias = re.sub('_*$', '', nick_alias) - if nick_alias == nick: - u.change_color(color) - self.text_win.rebuild_everything(self._text_buffer) - self.user_win.refresh(self.users) - self.text_win.refresh() - self.input.refresh() - - @command_args_parser.quoted(1) - def command_version(self, args): - """ - /version - """ - def callback(res): - if not res: - return self.core.information('Could not get the software ' - 'version from %s' % (jid,), - 'Warning') - version = '%s is running %s version %s on %s' % ( - jid, - res.get('name') or 'an unknown software', - res.get('version') or 'unknown', - res.get('os') or 'an unknown platform') - self.core.information(version, 'Info') - if args is None: - return self.core.command_help('version') - nick = args[0] - if nick in [user.nick for user in self.users]: - jid = safeJID(self.name).bare - jid = safeJID(jid + '/' + nick) - else: - jid = safeJID(nick) - fixes.get_version(self.core.xmpp, jid, - callback=callback) - - @command_args_parser.quoted(1) - def command_nick(self, args): - """ - /nick - """ - if args is None: - return self.core.command_help('nick') - nick = args[0] - if not self.joined: - return self.core.information('/nick only works in joined rooms', - 'Info') - current_status = self.core.get_status() - if not safeJID(self.name + '/' + nick): - return self.core.information('Invalid nick', 'Info') - muc.change_nick(self.core, self.name, nick, - current_status.message, - current_status.show) - - @command_args_parser.quoted(0, 1, ['']) - def command_part(self, args): - """ - /part [msg] - """ - arg = args[0] - msg = None - if self.joined: - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - char_quit = get_theme().CHAR_QUIT - spec_col = dump_tuple(get_theme().COLOR_QUIT_CHAR) - - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(get_theme().COLOR_OWN_NICK) - else: - color = 3 - - if arg: - msg = ('\x19%(color_spec)s}%(spec)s\x19%(info_col)s} ' - 'You (\x19%(color)s}%(nick)s\x19%(info_col)s})' - ' left the chatroom' - ' (\x19o%(reason)s\x19%(info_col)s})') % { - 'info_col': info_col, 'reason': arg, - 'spec': char_quit, 'color': color, - 'color_spec': spec_col, - 'nick': self.own_nick, - } - else: - msg = ('\x19%(color_spec)s}%(spec)s\x19%(info_col)s} ' - 'You (\x19%(color)s}%(nick)s\x19%(info_col)s})' - ' left the chatroom') % { - 'info_col': info_col, - 'spec': char_quit, 'color': color, - 'color_spec': spec_col, - 'nick': self.own_nick, - } - - self.add_message(msg, typ=2) - self.disconnect() - muc.leave_groupchat(self.core.xmpp, self.name, self.own_nick, arg) - self.core.disable_private_tabs(self.name, reason=msg) - if self == self.core.current_tab(): - self.refresh() - self.core.doupdate() - - @command_args_parser.raw - def command_close(self, msg): - """ - /close [msg] - """ - self.command_part(msg) - self.core.close_tab() - - @command_args_parser.quoted(1, 1) - def command_query(self, args): - """ - /query [message] - """ - if args is None: - return self.core.command_help('query') - nick = args[0] - r = None - for user in self.users: - if user.nick == nick: - r = self.core.open_private_window(self.name, user.nick) - if r and len(args) == 2: - msg = args[1] - self.core.current_tab().command_say( - xhtml.convert_simple_to_full_colors(msg)) - if not r: - self.core.information("Cannot find user: %s" % nick, 'Error') - - @command_args_parser.raw - def command_topic(self, subject): - """ - /topic [new topic] - """ - if not subject: - self._text_buffer.add_message( - "\x19%s}The subject of the room is: %s %s" % - (dump_tuple(get_theme().COLOR_INFORMATION_TEXT), - self.topic, - '(set by %s)' % self.topic_from if self.topic_from - else '')) - self.refresh() - return - - muc.change_subject(self.core.xmpp, self.name, subject) - - @command_args_parser.quoted(0) - def command_names(self, args): - """ - /names - """ - if not self.joined: - return - - aff = { - 'owner': get_theme().CHAR_AFFILIATION_OWNER, - 'admin': get_theme().CHAR_AFFILIATION_ADMIN, - 'member': get_theme().CHAR_AFFILIATION_MEMBER, - 'none': get_theme().CHAR_AFFILIATION_NONE, - } - - colors = {} - colors["visitor"] = dump_tuple(get_theme().COLOR_USER_VISITOR) - colors["moderator"] = dump_tuple(get_theme().COLOR_USER_MODERATOR) - colors["participant"] = dump_tuple(get_theme().COLOR_USER_PARTICIPANT) - color_other = dump_tuple(get_theme().COLOR_USER_NONE) - - buff = ['Users: %s \n' % len(self.users)] - for user in self.users: - affiliation = aff.get(user.affiliation, - get_theme().CHAR_AFFILIATION_NONE) - color = colors.get(user.role, color_other) - buff.append('\x19%s}%s\x19o\x19%s}%s\x19o' % ( - color, affiliation, dump_tuple(user.color), user.nick)) - - buff.append('\n') - message = ' '.join(buff) - - self._text_buffer.add_message(message) - self.text_win.refresh() - self.input.refresh() - - def completion_topic(self, the_input): - if the_input.get_argument_position() == 1: - return the_input.auto_completion([self.topic], '', quotify=False) - - def completion_quoted(self, the_input): - """Nick completion, but with quotes""" - if the_input.get_argument_position(quoted=True) == 1: - compare_users = lambda x: x.last_talked - word_list = [] - for user in sorted(self.users, key=compare_users, reverse=True): - if user.nick != self.own_nick: - word_list.append(user.nick) - - return the_input.new_completion(word_list, 1, quotify=True) - - @command_args_parser.quoted(1, 1) - def command_kick(self, args): - """ - /kick [reason] - """ - if args is None: - return self.core.command_help('kick') - if len(args) == 2: - msg = ' "%s"' % args[1] - else: - msg = '' - self.command_role('"'+args[0]+ '" none'+msg) - - @command_args_parser.quoted(1, 1) - def command_ban(self, args): - """ - /ban [reason] - """ - def callback(iq): - if iq['type'] == 'error': - self.core.room_error(iq, self.name) - if args is None: - return self.core.command_help('ban') - if len(args) > 1: - msg = args[1] - else: - msg = '' - nick = args[0] - - if nick in [user.nick for user in self.users]: - res = muc.set_user_affiliation(self.core.xmpp, self.name, - 'outcast', nick=nick, - callback=callback, reason=msg) - else: - res = muc.set_user_affiliation(self.core.xmpp, self.name, - 'outcast', jid=safeJID(nick), - callback=callback, reason=msg) - if not res: - self.core.information('Could not ban user', 'Error') - - @command_args_parser.quoted(2, 1, ['']) - def command_role(self, args): - """ - /role [reason] - Changes the role of an user - roles can be: none, visitor, participant, moderator - """ - def callback(iq): - if iq['type'] == 'error': - self.core.room_error(iq, self.name) - - if args is None: - return self.core.command_help('role') - - nick, role, reason = args[0], args[1].lower(), args[2] - - valid_roles = ('none', 'visitor', 'participant', 'moderator') - - if not self.joined or role not in valid_roles: - return self.core.information('The role must be one of ' + ', '.join(valid_roles), - 'Error') - - if not safeJID(self.name + '/' + nick): - return self.core.information('Invalid nick', 'Info') - muc.set_user_role(self.core.xmpp, self.name, nick, reason, role, - callback=callback) - - @command_args_parser.quoted(2) - def command_affiliation(self, args): - """ - /affiliation - Changes the affiliation of an user - affiliations can be: outcast, none, member, admin, owner - """ - def callback(iq): - if iq['type'] == 'error': - self.core.room_error(iq, self.name) - - if args is None: - return self.core.command_help('affiliation') - - nick, affiliation = args[0], args[1].lower() - - if not self.joined: - return - - valid_affiliations = ('outcast', 'none', 'member', 'admin', 'owner') - if affiliation not in valid_affiliations: - return self.core.information('The affiliation must be one of ' + ', '.join(valid_affiliations), - 'Error') - - if nick in [user.nick for user in self.users]: - res = muc.set_user_affiliation(self.core.xmpp, self.name, - affiliation, nick=nick, - callback=callback) - else: - res = muc.set_user_affiliation(self.core.xmpp, self.name, - affiliation, jid=safeJID(nick), - callback=callback) - if not res: - self.core.information('Could not set affiliation', 'Error') - - @command_args_parser.raw - def command_say(self, line, correct=False): - """ - /say - Or normal input + enter - """ - needed = 'inactive' if self.inactive else 'active' - msg = self.core.xmpp.make_message(self.name) - msg['type'] = 'groupchat' - msg['body'] = line - # trigger the event BEFORE looking for colors. - # This lets a plugin insert \x19xxx} colors, that will - # be converted in xhtml. - self.core.events.trigger('muc_say', msg, self) - if not msg['body']: - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - return - if msg['body'].find('\x19') != -1: - msg.enable('html') - msg['html']['body'] = xhtml.poezio_colors_to_html(msg['body']) - msg['body'] = xhtml.clean_text(msg['body']) - if (config.get_by_tabname('send_chat_states', self.general_jid) - and self.remote_wants_chatstates is not False): - msg['chat_state'] = needed - if correct: - msg['replace']['id'] = self.last_sent_message['id'] - self.cancel_paused_delay() - self.core.events.trigger('muc_say_after', msg, self) - if not msg['body']: - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - return - self.last_sent_message = msg - msg.send() - self.chat_state = needed - - @command_args_parser.raw - def command_xhtml(self, msg): - message = self.generate_xhtml_message(msg) - if message: - message['type'] = 'groupchat' - message.send() - - @command_args_parser.quoted(1) - def command_ignore(self, args): - """ - /ignore - """ - if args is None: - return self.core.command_help('ignore') - - nick = args[0] - user = self.get_user_by_name(nick) - if not user: - self.core.information('%s is not in the room' % nick) - elif user in self.ignores: - self.core.information('%s is already ignored' % nick) - else: - self.ignores.append(user) - self.core.information("%s is now ignored" % nick, 'info') - - @command_args_parser.quoted(1) - def command_unignore(self, args): - """ - /unignore - """ - if args is None: - return self.core.command_help('unignore') - - nick = args[0] - user = self.get_user_by_name(nick) - if not user: - self.core.information('%s is not in the room' % nick) - elif user not in self.ignores: - self.core.information('%s is not ignored' % nick) - else: - self.ignores.remove(user) - self.core.information('%s is now unignored' % nick) - - def completion_unignore(self, the_input): - if the_input.get_argument_position() == 1: - users = [user.nick for user in self.ignores] - return the_input.auto_completion(users, quotify=False) - - def resize(self): - """ - Resize the whole window. i.e. all its sub-windows - """ - self.need_resize = False - if config.get('hide_user_list') or self.size.tab_degrade_x: - display_user_list = False - text_width = self.width - else: - display_user_list = True - text_width = (self.width // 10) * 9 - - if self.size.tab_degrade_y: - display_info_win = False - tab_win_height = 0 - info_win_height = 0 - else: - display_info_win = True - tab_win_height = Tab.tab_win_height() - info_win_height = self.core.information_win_size - - - self.user_win.resize(self.height - 3 - info_win_height - - tab_win_height, - self.width - (self.width // 10) * 9 - 1, - 1, - (self.width // 10) * 9 + 1) - self.v_separator.resize(self.height - 3 - info_win_height - tab_win_height, - 1, 1, 9 * (self.width // 10)) - - self.topic_win.resize(1, self.width, 0, 0) - - self.text_win.resize(self.height - 3 - info_win_height - - tab_win_height, - text_width, 1, 0) - self.text_win.rebuild_everything(self._text_buffer) - self.info_header.resize(1, self.width, - self.height - 2 - info_win_height - - tab_win_height, - 0) - self.input.resize(1, self.width, self.height-1, 0) - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - if config.get('hide_user_list') or self.size.tab_degrade_x: - display_user_list = False - else: - display_user_list = True - display_info_win = not self.size.tab_degrade_y - - self.topic_win.refresh(self.get_single_line_topic()) - self.text_win.refresh() - if display_user_list: - self.v_separator.refresh() - self.user_win.refresh(self.users) - self.info_header.refresh(self, self.text_win) - self.refresh_tab_win() - if display_info_win: - self.info_win.refresh() - self.input.refresh() - - def on_input(self, key, raw): - if not raw and key in self.key_func: - self.key_func[key]() - return False - self.input.do_command(key, raw=raw) - empty_after = self.input.get_text() == '' - empty_after = empty_after or (self.input.get_text().startswith('/') - and not - self.input.get_text().startswith('//')) - self.send_composing_chat_state(empty_after) - return False - - def completion(self): - """ - Called when Tab is pressed, complete the nickname in the input - """ - if self.complete_commands(self.input): - return - - # If we are not completing a command or a command argument, - # complete a nick - compare_users = lambda x: x.last_talked - word_list = [] - for user in sorted(self.users, key=compare_users, reverse=True): - if user.nick != self.own_nick: - word_list.append(user.nick) - after = config.get('after_completion') + ' ' - input_pos = self.input.pos - if ' ' not in self.input.get_text()[:input_pos] or ( - self.input.last_completion and - self.input.get_text()[:input_pos] == - self.input.last_completion + after): - add_after = after - else: - if not config.get('add_space_after_completion'): - add_after = '' - else: - add_after = ' ' - self.input.auto_completion(word_list, add_after, quotify=False) - empty_after = self.input.get_text() == '' - empty_after = empty_after or (self.input.get_text().startswith('/') - and not - self.input.get_text().startswith('//')) - self.send_composing_chat_state(empty_after) - - def get_nick(self): - if not config.get('show_muc_jid'): - return safeJID(self.name).user - return self.name - - def get_text_window(self): - return self.text_win - - def on_lose_focus(self): - if self.joined: - if self.input.text: - self.state = 'nonempty' - else: - self.state = 'normal' - else: - self.state = 'disconnected' - self.text_win.remove_line_separator() - self.text_win.add_line_separator(self._text_buffer) - if (config.get_by_tabname('send_chat_states', self.general_jid) and - not self.input.get_text()): - self.send_chat_state('inactive') - self.check_scrolled() - - def on_gain_focus(self): - self.state = 'current' - if (self.text_win.built_lines and self.text_win.built_lines[-1] is None - and not config.get('show_useless_separator')): - self.text_win.remove_line_separator() - curses.curs_set(1) - if self.joined and config.get_by_tabname('send_chat_states', - self.general_jid) and not self.input.get_text(): - self.send_chat_state('active') - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height-3: - return - if config.get("hide_user_list"): - text_width = self.width - else: - text_width = (self.width//10)*9 - self.user_win.resize(self.height - 3 - self.core.information_win_size - - Tab.tab_win_height(), - self.width - (self.width // 10) * 9 - 1, - 1, - (self.width // 10) * 9 + 1) - self.v_separator.resize(self.height - 3 - self.core.information_win_size - Tab.tab_win_height(), - 1, 1, 9 * (self.width // 10)) - self.text_win.resize(self.height - 3 - self.core.information_win_size - - Tab.tab_win_height(), - text_width, 1, 0) - self.info_header.resize(1, self.width, - self.height-2-self.core.information_win_size - - Tab.tab_win_height(), - 0) - - def handle_presence(self, presence): - from_nick = presence['from'].resource - from_room = presence['from'].bare - xpath = '{%s}x/{%s}status' % (NS_MUC_USER, NS_MUC_USER) - status_codes = set() - for status_code in presence.findall(xpath): - status_codes.add(status_code.attrib['code']) - - # Check if it's not an error presence. - if presence['type'] == 'error': - return self.core.room_error(presence, from_room) - affiliation = presence['muc']['affiliation'] - show = presence['show'] - status = presence['status'] - role = presence['muc']['role'] - jid = presence['muc']['jid'] - typ = presence['type'] - deterministic = config.get_by_tabname('deterministic_nick_colors', self.name) - color = self.search_for_color(from_nick) - if not self.joined: # user in the room BEFORE us. - # ignore redondant presence message, see bug #1509 - if (from_nick not in [user.nick for user in self.users] - and typ != "unavailable"): - new_user = User(from_nick, affiliation, show, - status, role, jid, deterministic, color) - bisect.insort_left(self.users, new_user) - self.core.events.trigger('muc_join', presence, self) - if '110' in status_codes or self.own_nick == from_nick: - # second part of the condition is a workaround for old - # ejabberd or every gateway in the world that just do - # not send a 110 status code with the presence - self.own_nick = from_nick - self.joined = True - if self.name in self.core.initial_joins: - self.core.initial_joins.remove(self.name) - self._state = 'normal' - elif self != self.core.current_tab(): - self._state = 'joined' - if (self.core.current_tab() is self - and self.core.status.show not in ('xa', 'away')): - self.send_chat_state('active') - new_user.color = get_theme().COLOR_OWN_NICK - - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(new_user.color) - else: - color = 3 - - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - warn_col = dump_tuple(get_theme().COLOR_WARNING_TEXT) - spec_col = dump_tuple(get_theme().COLOR_JOIN_CHAR) - - self.add_message( - '\x19%(color_spec)s}%(spec)s\x19%(info_col)s} You ' - '(\x19%(nick_col)s}%(nick)s\x19%(info_col)s}) joined' - ' the chatroom' % - { - 'nick': from_nick, - 'spec': get_theme().CHAR_JOIN, - 'color_spec': spec_col, - 'nick_col': color, - 'info_col': info_col, - }, - typ=2) - if '201' in status_codes: - self.add_message( - '\x19%(info_col)s}Info: The room ' - 'has been created' % - {'info_col': info_col}, - typ=0) - if '170' in status_codes: - self.add_message( - '\x19%(warn_col)s}Warning:\x19%(info_col)s}' - ' This room is publicly logged' % - {'info_col': info_col, - 'warn_col': warn_col}, - typ=0) - if '100' in status_codes: - self.add_message( - '\x19%(warn_col)s}Warning:\x19%(info_col)s}' - ' This room is not anonymous.' % - {'info_col': info_col, - 'warn_col': warn_col}, - typ=0) - if self.core.current_tab() is not self: - self.refresh_tab_win() - self.core.current_tab().input.refresh() - self.core.doupdate() - self.core.enable_private_tabs(self.name) - # Enable the self ping event, to regularly check if we - # are still in the room. - self.enable_self_ping_event() - else: - change_nick = '303' in status_codes - kick = '307' in status_codes and typ == 'unavailable' - ban = '301' in status_codes and typ == 'unavailable' - shutdown = '332' in status_codes and typ == 'unavailable' - non_member = '322' in status_codes and typ == 'unavailable' - user = self.get_user_by_name(from_nick) - # New user - if not user: - self.core.events.trigger('muc_join', presence, self) - self.on_user_join(from_nick, affiliation, show, status, role, - jid, color) - # nick change - elif change_nick: - self.core.events.trigger('muc_nickchange', presence, self) - self.on_user_nick_change(presence, user, from_nick, from_room) - elif ban: - self.core.events.trigger('muc_ban', presence, self) - self.core.on_user_left_private_conversation(from_room, - from_nick, status) - self.on_user_banned(presence, user, from_nick) - # kick - elif kick: - self.core.events.trigger('muc_kick', presence, self) - self.core.on_user_left_private_conversation(from_room, - from_nick, status) - self.on_user_kicked(presence, user, from_nick) - elif shutdown: - self.core.events.trigger('muc_shutdown', presence, self) - self.on_muc_shutdown() - elif non_member: - self.core.events.trigger('muc_shutdown', presence, self) - self.on_non_member_kicked() - # user quit - elif typ == 'unavailable': - self.on_user_leave_groupchat(user, jid, status, - from_nick, from_room) - # status change - else: - self.on_user_change_status(user, from_nick, from_room, - affiliation, role, show, status) - if self.core.current_tab() is self: - self.text_win.refresh() - self.user_win.refresh_if_changed(self.users) - self.info_header.refresh(self, self.text_win) - self.input.refresh() - self.core.doupdate() - - def on_non_member_kicked(self): - """We have been kicked because the MUC is members-only""" - self.add_message( - '\x19%(info_col)s}You have been kicked because you ' - 'are not a member and the room is now members-only.' % { - 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}, - typ=2) - self.disconnect() - - def on_muc_shutdown(self): - """We have been kicked because the MUC service is shutting down""" - self.add_message( - '\x19%(info_col)s}You have been kicked because the' - ' MUC service is shutting down.' % { - 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}, - typ=2) - self.disconnect() - - def on_user_join(self, from_nick, affiliation, show, status, role, jid, color): - """ - When a new user joins the groupchat - """ - deterministic = config.get_by_tabname('deterministic_nick_colors', self.name) - user = User(from_nick, affiliation, - show, status, role, jid, deterministic, color) - bisect.insort_left(self.users, user) - hide_exit_join = config.get_by_tabname('hide_exit_join', - self.general_jid) - if hide_exit_join != 0: - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(user.color) - else: - color = 3 - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - spec_col = dump_tuple(get_theme().COLOR_JOIN_CHAR) - char_join = get_theme().CHAR_JOIN - if not jid.full: - msg = ('\x19%(color_spec)s}%(spec)s \x19%(color)s}%(nick)s' - '\x19%(info_col)s} joined the chatroom') % { - 'nick': from_nick, 'spec': char_join, - 'color': color, - 'info_col': info_col, - 'color_spec': spec_col, - } - else: - msg = ('\x19%(color_spec)s}%(spec)s \x19%(color)s}%(nick)s' - '\x19%(info_col)s} (\x19%(jid_color)s}%(jid)s\x19' - '%(info_col)s}) joined the chatroom') % { - 'spec': char_join, 'nick': from_nick, - 'color':color, 'jid':jid.full, - 'info_col': info_col, - 'jid_color': dump_tuple(get_theme().COLOR_MUC_JID), - 'color_spec': spec_col, - } - self.add_message(msg, typ=2) - self.core.on_user_rejoined_private_conversation(self.name, from_nick) - - def on_user_nick_change(self, presence, user, from_nick, from_room): - new_nick = presence.find('{%s}x/{%s}item' % (NS_MUC_USER, NS_MUC_USER) - ).attrib['nick'] - if user.nick == self.own_nick: - self.own_nick = new_nick - # also change our nick in all private discussions of this room - self.core.on_muc_own_nickchange(self) - else: - color = config.get_by_tabname(new_nick, 'muc_colors') - if color != '': - deterministic = config.get_by_tabname('deterministic_nick_colors', - self.name) - user.change_color(color, deterministic) - user.change_nick(new_nick) - self.users.remove(user) - bisect.insort_left(self.users, user) - - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(user.color) - else: - color = 3 - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - self.add_message('\x19%(color)s}%(old)s\x19%(info_col)s} is' - ' now known as \x19%(color)s}%(new)s' % { - 'old':from_nick, 'new':new_nick, - 'color':color, 'info_col': info_col}, - typ=2) - # rename the private tabs if needed - self.core.rename_private_tabs(self.name, from_nick, new_nick) - - def on_user_banned(self, presence, user, from_nick): - """ - When someone is banned from a muc - """ - self.users.remove(user) - by = presence.find('{%s}x/{%s}item/{%s}actor' % - (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER)) - reason = presence.find('{%s}x/{%s}item/{%s}reason' % - (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER)) - by = by.attrib['jid'] if by is not None else None - - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - char_kick = get_theme().CHAR_KICK - - if from_nick == self.own_nick: # we are banned - if by: - kick_msg = ('\x191}%(spec)s \x193}You\x19%(info_col)s}' - ' have been banned by \x194}%(by)s') % { - 'spec': char_kick, 'by': by, - 'info_col': info_col} - else: - kick_msg = ('\x191}%(spec)s \x193}You\x19' - '%(info_col)s} have been banned.') % { - 'spec': char_kick, 'info_col': info_col} - self.core.disable_private_tabs(self.name, reason=kick_msg) - self.disconnect() - self.refresh_tab_win() - self.core.current_tab().input.refresh() - self.core.doupdate() - if config.get_by_tabname('autorejoin', self.general_jid): - delay = config.get_by_tabname('autorejoin_delay', - self.general_jid) - delay = common.parse_str_to_secs(delay) - if delay <= 0: - muc.join_groupchat(self.core, self.name, self.own_nick) - else: - self.core.add_timed_event(timed_events.DelayedEvent( - delay, - muc.join_groupchat, - self.core, - self.name, - self.own_nick)) - - else: - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(user.color) - else: - color = 3 - - if by: - kick_msg = ('\x191}%(spec)s \x19%(color)s}' - '%(nick)s\x19%(info_col)s} ' - 'has been banned by \x194}%(by)s') % { - 'spec': char_kick, 'nick': from_nick, - 'color': color, 'by': by, - 'info_col': info_col} - else: - kick_msg = ('\x191}%(spec)s \x19%(color)s}%(nick)s' - '\x19%(info_col)s} has been banned') % { - 'spec': char_kick, 'nick': from_nick, - 'color': color, 'info_col': info_col} - if reason is not None and reason.text: - kick_msg += ('\x19%(info_col)s} Reason: \x196}' - '%(reason)s\x19%(info_col)s}') % { - 'reason': reason.text, 'info_col': info_col} - self.add_message(kick_msg, typ=2) - - def on_user_kicked(self, presence, user, from_nick): - """ - When someone is kicked from a muc - """ - self.users.remove(user) - actor_elem = presence.find('{%s}x/{%s}item/{%s}actor' % - (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER)) - reason = presence.find('{%s}x/{%s}item/{%s}reason' % - (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER)) - by = None - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - char_kick = get_theme().CHAR_KICK - if actor_elem is not None: - by = actor_elem.get('nick') or actor_elem.get('jid') - if from_nick == self.own_nick: # we are kicked - if by: - kick_msg = ('\x191}%(spec)s \x193}You\x19' - '%(info_col)s} have been kicked' - ' by \x193}%(by)s') % { - 'spec': char_kick, 'by': by, - 'info_col': info_col} - else: - kick_msg = ('\x191}%(spec)s \x193}You\x19%(info_col)s}' - ' have been kicked.') % { - 'spec': char_kick, - 'info_col': info_col} - self.core.disable_private_tabs(self.name, reason=kick_msg) - self.disconnect() - self.refresh_tab_win() - self.core.current_tab().input.refresh() - self.core.doupdate() - # try to auto-rejoin - if config.get_by_tabname('autorejoin', self.general_jid): - delay = config.get_by_tabname('autorejoin_delay', - self.general_jid) - delay = common.parse_str_to_secs(delay) - if delay <= 0: - muc.join_groupchat(self.core, self.name, self.own_nick) - else: - self.core.add_timed_event(timed_events.DelayedEvent( - delay, - muc.join_groupchat, - self.core, - self.name, - self.own_nick)) - else: - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(user.color) - else: - color = 3 - if by: - kick_msg = ('\x191}%(spec)s \x19%(color)s}%(nick)s' - '\x19%(info_col)s} has been kicked by ' - '\x193}%(by)s') % { - 'spec': char_kick, 'nick':from_nick, - 'color':color, 'by':by, 'info_col': info_col} - else: - kick_msg = ('\x191}%(spec)s \x19%(color)s}%(nick)s' - '\x19%(info_col)s} has been kicked') % { - 'spec': char_kick, 'nick': from_nick, - 'color':color, 'info_col': info_col} - if reason is not None and reason.text: - kick_msg += ('\x19%(info_col)s} Reason: \x196}' - '%(reason)s') % { - 'reason': reason.text, 'info_col': info_col} - self.add_message(kick_msg, typ=2) - - def on_user_leave_groupchat(self, user, jid, status, from_nick, from_room): - """ - When an user leaves a groupchat - """ - self.users.remove(user) - if self.own_nick == user.nick: - # We are now out of the room. - # Happens with some buggy (? not sure) servers - self.disconnect() - self.core.disable_private_tabs(from_room) - self.refresh_tab_win() - - hide_exit_join = config.get_by_tabname('hide_exit_join', - self.general_jid) - - if hide_exit_join <= -1 or user.has_talked_since(hide_exit_join): - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(user.color) - else: - color = 3 - info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - spec_col = dump_tuple(get_theme().COLOR_QUIT_CHAR) - - if not jid.full: - leave_msg = ('\x19%(color_spec)s}%(spec)s \x19%(color)s}' - '%(nick)s\x19%(info_col)s} has left the ' - 'chatroom') % { - 'nick':from_nick, 'color':color, - 'spec':get_theme().CHAR_QUIT, - 'info_col': info_col, - 'color_spec': spec_col} - else: - jid_col = dump_tuple(get_theme().COLOR_MUC_JID) - leave_msg = ('\x19%(color_spec)s}%(spec)s \x19%(color)s}' - '%(nick)s\x19%(info_col)s} (\x19%(jid_col)s}' - '%(jid)s\x19%(info_col)s}) has left the ' - 'chatroom') % { - 'spec':get_theme().CHAR_QUIT, - 'nick':from_nick, 'color':color, - 'jid':jid.full, 'info_col': info_col, - 'color_spec': spec_col, - 'jid_col': jid_col} - if status: - leave_msg += ' (\x19o%s\x19%s})' % (status, info_col) - self.add_message(leave_msg, typ=2) - self.core.on_user_left_private_conversation(from_room, from_nick, - status) - - def on_user_change_status( - self, user, from_nick, from_room, affiliation, role, show, status): - """ - When an user changes her status - """ - # build the message - display_message = False # flag to know if something significant enough - # to be displayed has changed - if config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - color = dump_tuple(user.color) - else: - color = 3 - if from_nick == self.own_nick: - msg = '\x19%(color)s}You\x19%(info_col)s} changed: ' % { - 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT), - 'color': color} - else: - msg = '\x19%(color)s}%(nick)s\x19%(info_col)s} changed: ' % { - 'nick': from_nick, 'color': color, - 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)} - if affiliation != user.affiliation: - msg += 'affiliation: %s, ' % affiliation - display_message = True - if role != user.role: - msg += 'role: %s, ' % role - display_message = True - if show != user.show and show in SHOW_NAME: - msg += 'show: %s, ' % SHOW_NAME[show] - display_message = True - if status != user.status: - # if the user sets his status to nothing - if status: - msg += 'status: %s, ' % status - display_message = True - elif show in SHOW_NAME and show == user.show: - msg += 'show: %s, ' % SHOW_NAME[show] - display_message = True - if not display_message: - return - msg = msg[:-2] # remove the last ", " - hide_status_change = config.get_by_tabname('hide_status_change', - self.general_jid) - if hide_status_change < -1: - hide_status_change = -1 - if ((hide_status_change == -1 or \ - user.has_talked_since(hide_status_change) or\ - user.nick == self.own_nick)\ - and\ - (affiliation != user.affiliation or\ - role != user.role or\ - show != user.show or\ - status != user.status))\ - or\ - (affiliation != user.affiliation or\ - role != user.role): - # display the message in the room - self._text_buffer.add_message(msg) - self.core.on_user_changed_status_in_private('%s/%s' % - (from_room, from_nick), - msg) - self.users.remove(user) - # finally, effectively change the user status - user.update(affiliation, show, status, role) - bisect.insort_left(self.users, user) - - def disconnect(self): - """ - Set the state of the room as not joined, so - we can know if we can join it, send messages to it, etc - """ - self.users = [] - if self is not self.core.current_tab(): - self.state = 'disconnected' - self.joined = False - self.disable_self_ping_event() - - def get_single_line_topic(self): - """ - Return the topic as a single-line string (for the window header) - """ - return self.topic.replace('\n', '|') - - def log_message(self, txt, nickname, time=None, typ=1): - """ - Log the messages in the archives, if it needs - to be - """ - if time is None and self.joined: # don't log the history messages - if not logger.log_message(self.name, nickname, txt, typ=typ): - self.core.information('Unable to write in the log file', - 'Error') - - def do_highlight(self, txt, time, nickname): - """ - Set the tab color and returns the nick color - """ - highlighted = False - if not time and nickname and nickname != self.own_nick and self.joined: - - if re.search(r'\b' + self.own_nick.lower() + r'\b', txt.lower()): - if self.state != 'current': - self.state = 'highlight' - highlighted = True - else: - highlight_words = config.get_by_tabname('highlight_on', - self.general_jid) - highlight_words = highlight_words.split(':') - for word in highlight_words: - if word and word.lower() in txt.lower(): - if self.state != 'current': - self.state = 'highlight' - highlighted = True - break - if highlighted: - beep_on = config.get('beep_on').split() - if 'highlight' in beep_on and 'message' not in beep_on: - if not config.get_by_tabname('disable_beep', self.name): - curses.beep() - return highlighted - - def get_user_by_name(self, nick): - """ - Gets the user associated with the given nick, or None if not found - """ - for user in self.users: - if user.nick == nick: - return user - return None - - def add_message(self, txt, time=None, nickname=None, **kwargs): - """ - Note that user can be None even if nickname is not None. It happens - when we receive an history message said by someone who is not - in the room anymore - Return True if the message highlighted us. False otherwise. - """ - - # reset self-ping interval - if self.self_ping_event: - self.enable_self_ping_event() - - self.log_message(txt, nickname, time=time, typ=kwargs.get('typ', 1)) - args = dict() - for key, value in kwargs.items(): - if key not in ('typ', 'forced_user'): - args[key] = value - if nickname is not None: - user = self.get_user_by_name(nickname) - else: - user = None - - if user: - user.set_last_talked(datetime.now()) - args['user'] = user - if not user and kwargs.get('forced_user'): - args['user'] = kwargs['forced_user'] - - if (not time and nickname and nickname != self.own_nick - and self.state != 'current'): - if (self.state != 'highlight' and - config.get_by_tabname('notify_messages', self.name)): - self.state = 'message' - if time and not txt.startswith('/me'): - txt = '\x19%(info_col)s}%(txt)s' % { - 'txt': txt, - 'info_col': dump_tuple(get_theme().COLOR_LOG_MSG)} - elif not nickname: - txt = '\x19%(info_col)s}%(txt)s' % { - 'txt': txt, - 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)} - elif not kwargs.get('highlight'): # TODO - args['highlight'] = self.do_highlight(txt, time, nickname) - time = time or datetime.now() - self._text_buffer.add_message(txt, time, nickname, **args) - return args.get('highlight', False) - - def modify_message(self, txt, old_id, new_id, - time=None, nickname=None, user=None, jid=None): - self.log_message(txt, nickname, time=time, typ=1) - highlight = self.do_highlight(txt, time, nickname) - message = self._text_buffer.modify_message(txt, old_id, new_id, - highlight=highlight, - time=time, user=user, - jid=jid) - if message: - self.text_win.modify_message(old_id, message) - return highlight - return False - - def matching_names(self): - return [(1, safeJID(self.name).user), (3, self.name)] - - def enable_self_ping_event(self): - delay = config.get_by_tabname("self_ping_delay", self.general_jid, default=0) - if delay <= 0: # use 0 or some negative value to disable it - return - self.disable_self_ping_event() - self.self_ping_event = timed_events.DelayedEvent(delay, self.send_self_ping) - self.core.add_timed_event(self.self_ping_event) - - def disable_self_ping_event(self): - if self.self_ping_event is not None: - self.core.remove_timed_event(self.self_ping_event) - self.self_ping_event = None - - def send_self_ping(self): - to = self.name + "/" + self.own_nick - self.core.xmpp.plugin['xep_0199'].send_ping(jid=to, - callback=self.on_self_ping_result, - timeout_callback=self.on_self_ping_failed, - timeout=60) - - def on_self_ping_result(self, iq): - if iq["type"] == "error": - self.command_cycle(iq["error"]["text"] or "not in this room") - self.core.refresh_window() - else: # Re-send a self-ping in a few seconds - self.enable_self_ping_event() - - def search_for_color(self, nick): - """ - Search for the color of a nick in the config file. - Also, look at the colors of its possible aliases if nick_color_aliases - is set. - """ - color = config.get_by_tabname(nick, 'muc_colors') - if color != '': - return color - nick_color_aliases = config.get_by_tabname('nick_color_aliases', self.name) - if nick_color_aliases: - nick_alias = re.sub('^_*(.*?)_*$', '\\1', nick) - color = config.get_by_tabname(nick_alias, 'muc_colors') - return color - - def on_self_ping_failed(self, iq): - self.command_cycle("the MUC server is not responding") - self.core.refresh_window() diff --git a/src/tabs/privatetab.py b/src/tabs/privatetab.py deleted file mode 100644 index a715a922..00000000 --- a/src/tabs/privatetab.py +++ /dev/null @@ -1,362 +0,0 @@ -""" -Module for the PrivateTab - -A PrivateTab is a private conversation opened with someone from a MUC -(see muctab.py). The conversation happens with both JID being relative -to the MUC (room@server/nick1 and room@server/nick2). - -This tab references his parent room, and is modified to keep track of -both participant’s nicks. It also has slightly different features than -the ConversationTab (such as tab-completion on nicks from the room). - -""" -import logging -log = logging.getLogger(__name__) - -import curses - -from . import OneToOneTab, MucTab, Tab - -import fixes -import windows -import xhtml -from common import safeJID -from config import config -from decorators import refresh_wrapper -from logger import logger -from theming import get_theme, dump_tuple -from decorators import command_args_parser - -class PrivateTab(OneToOneTab): - """ - The tab containg a private conversation (someone from a MUC) - """ - message_type = 'chat' - plugin_commands = {} - additional_informations = {} - plugin_keys = {} - def __init__(self, name, nick): - OneToOneTab.__init__(self, name) - self.own_nick = nick - self.name = name - self.text_win = windows.TextWin() - self._text_buffer.add_window(self.text_win) - self.info_header = windows.PrivateInfoWin() - self.input = windows.MessageInput() - # keys - self.key_func['^I'] = self.completion - # commands - self.register_command('info', self.command_info, - desc='Display some information about the user in the MUC: its/his/her role, affiliation, status and status message.', - shortdesc='Info about the user.') - self.register_command('unquery', self.command_unquery, - shortdesc='Close the tab.') - self.register_command('close', self.command_unquery, - shortdesc='Close the tab.') - self.register_command('version', self.command_version, - desc='Get the software version of the current interlocutor (usually its XMPP client and Operating System).', - shortdesc='Get the software version of a jid.') - self.resize() - self.parent_muc = self.core.get_tab_by_name(safeJID(name).bare, MucTab) - self.on = True - self.update_commands() - self.update_keys() - - @property - def general_jid(self): - return self.name - - def get_dest_jid(self): - return self.name - - @property - def nick(self): - return self.get_nick() - - @staticmethod - def add_information_element(plugin_name, callback): - """ - Lets a plugin add its own information to the PrivateInfoWin - """ - PrivateTab.additional_informations[plugin_name] = callback - - @staticmethod - def remove_information_element(plugin_name): - del PrivateTab.additional_informations[plugin_name] - - def load_logs(self, log_nb): - logs = logger.get_logs(safeJID(self.name).full.replace('/', '\\'), log_nb) - return logs - - def log_message(self, txt, nickname, time=None, typ=1): - """ - Log the messages in the archives. - """ - if not logger.log_message(self.name, nickname, txt, date=time, typ=typ): - self.core.information('Unable to write in the log file', 'Error') - - def on_close(self): - self.parent_muc.privates.remove(self) - - def completion(self): - """ - Called when Tab is pressed, complete the nickname in the input - """ - if self.complete_commands(self.input): - return - - # If we are not completing a command or a command's argument, complete a nick - compare_users = lambda x: x.last_talked - word_list = [user.nick for user in sorted(self.parent_muc.users, key=compare_users, reverse=True)\ - if user.nick != self.own_nick] - after = config.get('after_completion') + ' ' - input_pos = self.input.pos - if ' ' not in self.input.get_text()[:input_pos] or (self.input.last_completion and\ - self.input.get_text()[:input_pos] == self.input.last_completion + after): - add_after = after - else: - add_after = '' - self.input.auto_completion(word_list, add_after, quotify=False) - empty_after = self.input.get_text() == '' or (self.input.get_text().startswith('/') and not self.input.get_text().startswith('//')) - self.send_composing_chat_state(empty_after) - - @command_args_parser.raw - def command_say(self, line, attention=False, correct=False): - if not self.on: - return - msg = self.core.xmpp.make_message(self.name) - msg['type'] = 'chat' - msg['body'] = line - # trigger the event BEFORE looking for colors. - # This lets a plugin insert \x19xxx} colors, that will - # be converted in xhtml. - self.core.events.trigger('private_say', msg, self) - if not msg['body']: - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - return - user = self.parent_muc.get_user_by_name(self.own_nick) - replaced = False - if correct or msg['replace']['id']: - msg['replace']['id'] = self.last_sent_message['id'] - if config.get_by_tabname('group_corrections', self.name): - try: - self.modify_message(msg['body'], self.last_sent_message['id'], msg['id'], - user=user, jid=self.core.xmpp.boundjid, nickname=self.own_nick) - replaced = True - except: - log.error('Unable to correct a message', exc_info=True) - else: - del msg['replace'] - - if msg['body'].find('\x19') != -1: - msg.enable('html') - msg['html']['body'] = xhtml.poezio_colors_to_html(msg['body']) - msg['body'] = xhtml.clean_text(msg['body']) - if (config.get_by_tabname('send_chat_states', self.general_jid) and - self.remote_wants_chatstates is not False): - needed = 'inactive' if self.inactive else 'active' - msg['chat_state'] = needed - if attention and self.remote_supports_attention: - msg['attention'] = True - self.core.events.trigger('private_say_after', msg, self) - if not msg['body']: - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - return - if not replaced: - self.add_message(msg['body'], - nickname=self.own_nick or self.core.own_nick, - forced_user=user, - nick_color=get_theme().COLOR_OWN_NICK, - identifier=msg['id'], - jid=self.core.xmpp.boundjid, - typ=1) - - self.last_sent_message = msg - if self.remote_supports_receipts: - msg._add_receipt = True - msg.send() - self.cancel_paused_delay() - self.text_win.refresh() - self.input.refresh() - - @command_args_parser.ignored - def command_unquery(self): - """ - /unquery - """ - self.core.close_tab() - - @command_args_parser.quoted(0, 1) - def command_version(self, args): - """ - /version - """ - def callback(res): - if not res: - return self.core.information('Could not get the software version from %s' % (jid,), 'Warning') - version = '%s is running %s version %s on %s' % (jid, - res.get('name') or 'an unknown software', - res.get('version') or 'unknown', - res.get('os') or 'an unknown platform') - self.core.information(version, 'Info') - if args: - return self.core.command_version(args[0]) - jid = safeJID(self.name) - fixes.get_version(self.core.xmpp, jid, - callback=callback) - - @command_args_parser.quoted(0, 1) - def command_info(self, arg): - """ - /info - """ - if arg and arg[0]: - self.parent_muc.command_info(arg[0]) - else: - user = safeJID(self.name).resource - self.parent_muc.command_info(user) - - def resize(self): - self.need_resize = False - - if self.size.tab_degrade_y: - info_win_height = 0 - tab_win_height = 0 - else: - info_win_height = self.core.information_win_size - tab_win_height = Tab.tab_win_height() - - self.text_win.resize(self.height - 2 - info_win_height - tab_win_height, - self.width, 0, 0) - self.text_win.rebuild_everything(self._text_buffer) - self.info_header.resize(1, self.width, - self.height - 2 - info_win_height - - tab_win_height, - 0) - self.input.resize(1, self.width, self.height-1, 0) - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - display_info_win = not self.size.tab_degrade_y - - self.text_win.refresh() - self.info_header.refresh(self.name, self.text_win, self.chatstate, - PrivateTab.additional_informations) - if display_info_win: - self.info_win.refresh() - - self.refresh_tab_win() - self.input.refresh() - - def refresh_info_header(self): - self.info_header.refresh(self.name, self.text_win, self.chatstate, PrivateTab.additional_informations) - self.input.refresh() - - def get_nick(self): - return safeJID(self.name).resource - - def on_input(self, key, raw): - if not raw and key in self.key_func: - self.key_func[key]() - return False - self.input.do_command(key, raw=raw) - if not self.on: - return False - empty_after = self.input.get_text() == '' or (self.input.get_text().startswith('/') and not self.input.get_text().startswith('//')) - tab = self.core.get_tab_by_name(safeJID(self.name).bare, MucTab) - if tab and tab.joined: - self.send_composing_chat_state(empty_after) - return False - - def on_lose_focus(self): - if self.input.text: - self.state = 'nonempty' - else: - self.state = 'normal' - - self.text_win.remove_line_separator() - self.text_win.add_line_separator(self._text_buffer) - tab = self.core.get_tab_by_name(safeJID(self.name).bare, MucTab) - if tab and tab.joined and config.get_by_tabname('send_chat_states', - self.general_jid) and not self.input.get_text() and self.on: - self.send_chat_state('inactive') - self.check_scrolled() - - def on_gain_focus(self): - self.state = 'current' - curses.curs_set(1) - tab = self.core.get_tab_by_name(safeJID(self.name).bare, MucTab) - if tab and tab.joined and config.get_by_tabname('send_chat_states', - self.general_jid,) and not self.input.get_text() and self.on: - self.send_chat_state('active') - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height-3: - return - self.text_win.resize(self.height-2-self.core.information_win_size - Tab.tab_win_height(), self.width, 0, 0) - self.info_header.resize(1, self.width, self.height-2-self.core.information_win_size - Tab.tab_win_height(), 0) - - def get_text_window(self): - return self.text_win - - @refresh_wrapper.conditional - def rename_user(self, old_nick, new_nick): - """ - The user changed her nick in the corresponding muc: update the tab’s name and - display a message. - """ - self.add_message('\x193}%(old)s\x19%(info_col)s} is now known as \x193}%(new)s' % {'old':old_nick, 'new':new_nick, 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}, typ=2) - new_jid = safeJID(self.name).bare+'/'+new_nick - self.name = new_jid - return self.core.current_tab() is self - - @refresh_wrapper.conditional - def user_left(self, status_message, from_nick): - """ - The user left the associated MUC - """ - self.deactivate() - if not status_message: - self.add_message('\x191}%(spec)s \x193}%(nick)s\x19%(info_col)s} has left the room' % {'nick':from_nick, 'spec':get_theme().CHAR_QUIT, 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}, typ=2) - else: - self.add_message('\x191}%(spec)s \x193}%(nick)s\x19%(info_col)s} has left the room (%(status)s)"' % {'nick':from_nick, 'spec':get_theme().CHAR_QUIT, 'status': status_message, 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}, typ=2) - return self.core.current_tab() is self - - @refresh_wrapper.conditional - def user_rejoined(self, nick): - """ - The user (or at least someone with the same nick) came back in the MUC - """ - self.activate() - self.check_features() - tab = self.core.get_tab_by_name(safeJID(self.name).bare, MucTab) - color = 3 - if tab and config.get_by_tabname('display_user_color_in_join_part', - self.general_jid): - user = tab.get_user_by_name(nick) - if user: - color = dump_tuple(user.color) - self.add_message('\x194}%(spec)s \x19%(color)s}%(nick)s\x19%(info_col)s} joined the room' % {'nick':nick, 'color': color, 'spec':get_theme().CHAR_JOIN, 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)}, typ=2) - return self.core.current_tab() is self - - def activate(self, reason=None): - self.on = True - if reason: - self.add_message(txt=reason, typ=2) - - def deactivate(self, reason=None): - self.on = False - self.remote_wants_chatstates = None - if reason: - self.add_message(txt=reason, typ=2) - - def matching_names(self): - return [(3, safeJID(self.name).resource), (4, self.name)] - - diff --git a/src/tabs/rostertab.py b/src/tabs/rostertab.py deleted file mode 100644 index a5c22304..00000000 --- a/src/tabs/rostertab.py +++ /dev/null @@ -1,1280 +0,0 @@ -""" -The RosterInfoTab is the tab showing roster info, the list of contacts, -half of it is dedicated to showing the information buffer, and a small -rectangle shows the current contact info. - -This module also includes functions to match users in the roster. -""" -import logging -log = logging.getLogger(__name__) - -import base64 -import curses -import difflib -import os -import ssl -from os import getenv, path -from functools import partial - -from . import Tab - -import common -import windows -from common import safeJID -from config import config -from contact import Contact, Resource -from decorators import refresh_wrapper -from roster import RosterGroup, roster -from theming import get_theme, dump_tuple -from decorators import command_args_parser - -class RosterInfoTab(Tab): - """ - A tab, splitted in two, containing the roster and infos - """ - plugin_commands = {} - plugin_keys = {} - def __init__(self): - Tab.__init__(self) - self.name = "Roster" - self.v_separator = windows.VerticalSeparator() - self.information_win = windows.TextWin() - self.core.information_buffer.add_window(self.information_win) - self.roster_win = windows.RosterWin() - self.contact_info_win = windows.ContactInfoWin() - self.default_help_message = windows.HelpText("Enter commands with “/”. “o”: toggle offline show") - self.input = self.default_help_message - self.state = 'normal' - self.key_func['^I'] = self.completion - self.key_func["/"] = self.on_slash - # disable most of the roster features when in anonymous mode - if not self.core.xmpp.anon: - self.key_func[' '] = self.on_space - self.key_func["KEY_UP"] = self.move_cursor_up - self.key_func["KEY_DOWN"] = self.move_cursor_down - self.key_func["M-u"] = self.move_cursor_to_next_contact - self.key_func["M-y"] = self.move_cursor_to_prev_contact - self.key_func["M-U"] = self.move_cursor_to_next_group - self.key_func["M-Y"] = self.move_cursor_to_prev_group - self.key_func["M-[1;5B"] = self.move_cursor_to_next_group - self.key_func["M-[1;5A"] = self.move_cursor_to_prev_group - self.key_func["l"] = self.command_last_activity - self.key_func["o"] = self.toggle_offline_show - self.key_func["v"] = self.get_contact_version - self.key_func["i"] = self.show_contact_info - self.key_func["s"] = self.start_search - self.key_func["S"] = self.start_search_slow - self.key_func["n"] = self.change_contact_name - self.register_command('deny', self.command_deny, - usage='[jid]', - desc='Deny your presence to the provided JID (or the ' - 'selected contact in your roster), who is asking' - 'you to be in his/here roster.', - shortdesc='Deny an user your presence.', - completion=self.completion_deny) - self.register_command('accept', self.command_accept, - usage='[jid]', - desc='Allow the provided JID (or the selected contact ' - 'in your roster), to see your presence.', - shortdesc='Allow an user your presence.', - completion=self.completion_deny) - self.register_command('add', self.command_add, - usage='', - desc='Add the specified JID to your roster, ask him to' - ' allow you to see his presence, and allow him to' - ' see your presence.', - shortdesc='Add an user to your roster.') - self.register_command('name', self.command_name, - usage=' [name]', - shortdesc='Set the given JID\'s name.', - completion=self.completion_name) - self.register_command('groupadd', self.command_groupadd, - usage=' ', - desc='Add the given JID to the given group.', - shortdesc='Add an user to a group', - completion=self.completion_groupadd) - self.register_command('groupmove', self.command_groupmove, - usage=' ', - desc='Move the given JID from the old group to the new group.', - shortdesc='Move an user to another group.', - completion=self.completion_groupmove) - self.register_command('groupremove', self.command_groupremove, - usage=' ', - desc='Remove the given JID from the given group.', - shortdesc='Remove an user from a group.', - completion=self.completion_groupremove) - self.register_command('remove', self.command_remove, - usage='[jid]', - desc='Remove the specified JID from your roster. This ' - 'will unsubscribe you from its presence, cancel ' - 'its subscription to yours, and remove the item ' - 'from your roster.', - shortdesc='Remove an user from your roster.', - completion=self.completion_remove) - self.register_command('export', self.command_export, - usage='[/path/to/file]', - desc='Export your contacts into /path/to/file if ' - 'specified, or $HOME/poezio_contacts if not.', - shortdesc='Export your roster to a file.', - completion=partial(self.completion_file, 1)) - self.register_command('import', self.command_import, - usage='[/path/to/file]', - desc='Import your contacts from /path/to/file if ' - 'specified, or $HOME/poezio_contacts if not.', - shortdesc='Import your roster from a file.', - completion=partial(self.completion_file, 1)) - self.register_command('password', self.command_password, - usage='', - shortdesc='Change your password') - - self.register_command('reconnect', self.command_reconnect, - desc='Disconnect from the remote server if you are ' - 'currently connected and then connect to it again.', - shortdesc='Disconnect and reconnect to the server.') - self.register_command('disconnect', self.command_disconnect, - desc='Disconnect from the remote server.', - shortdesc='Disconnect from the server.') - self.register_command('clear', self.command_clear, - shortdesc='Clear the info buffer.') - self.register_command('last_activity', self.command_last_activity, - usage='', - desc='Informs you of the last activity of a JID.', - shortdesc='Get the activity of someone.', - completion=self.core.completion_last_activity) - - self.resize() - self.update_commands() - self.update_keys() - - def check_blocking(self, features): - if 'urn:xmpp:blocking' in features and not self.core.xmpp.anon: - self.register_command('block', self.command_block, - usage='[jid]', - shortdesc='Prevent a JID from talking to you.', - completion=self.completion_block) - self.register_command('unblock', self.command_unblock, - usage='[jid]', - shortdesc='Allow a JID to talk to you.', - completion=self.completion_unblock) - self.register_command('list_blocks', self.command_list_blocks, - shortdesc='Show the blocked contacts.') - self.core.xmpp.del_event_handler('session_start', self.check_blocking) - self.core.xmpp.add_event_handler('blocked_message', self.on_blocked_message) - - def check_saslexternal(self, features): - if 'urn:xmpp:saslcert:1' in features and not self.core.xmpp.anon: - self.register_command('certs', self.command_certs, - desc='List the fingerprints of certificates' - ' which can connect to your account.', - shortdesc='List allowed client certs.') - self.register_command('cert_add', self.command_cert_add, - desc='Add a client certificate to the authorized ones. ' - 'It must have an unique name and be contained in ' - 'a PEM file. [management] is a boolean indicating' - ' if a client connected using this certificate can' - ' manage the certificates itself.', - shortdesc='Add a client certificate.', - usage=' [management]', - completion=self.completion_cert_add) - self.register_command('cert_disable', self.command_cert_disable, - desc='Remove a certificate from the list ' - 'of allowed ones. Clients currently ' - 'using this certificate will not be ' - 'forcefully disconnected.', - shortdesc='Disable a certificate', - usage='') - self.register_command('cert_revoke', self.command_cert_revoke, - desc='Remove a certificate from the list ' - 'of allowed ones. Clients currently ' - 'using this certificate will be ' - 'forcefully disconnected.', - shortdesc='Revoke a certificate', - usage='') - self.register_command('cert_fetch', self.command_cert_fetch, - desc='Retrieve a certificate with its ' - 'name. It will be stored in .', - shortdesc='Fetch a certificate', - usage=' ', - completion=self.completion_cert_fetch) - - @property - def selected_row(self): - return self.roster_win.get_selected_row() - - @command_args_parser.ignored - def command_certs(self): - """ - /certs - """ - def cb(iq): - if iq['type'] == 'error': - self.core.information('Unable to retrieve the certificate list.', - 'Error') - return - certs = [] - for item in iq['sasl_certs']['items']: - users = '\n'.join(item['users']) - certs.append((item['name'], users)) - - if not certs: - return self.core.information('No certificates found', 'Info') - msg = 'Certificates:\n' - msg += '\n'.join(((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1])) for item in certs)) - self.core.information(msg, 'Info') - - self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb, timeout=3) - - @command_args_parser.quoted(2, 1) - def command_cert_add(self, args): - """ - /cert_add [cert-management] - """ - if not args or len(args) < 2: - return self.core.command_help('cert_add') - def cb(iq): - if iq['type'] == 'error': - self.core.information('Unable to add the certificate.', 'Error') - else: - self.core.information('Certificate added.', 'Info') - - name = args[0] - - try: - with open(args[1]) as fd: - crt = fd.read() - crt = crt.replace(ssl.PEM_FOOTER, '').replace(ssl.PEM_HEADER, '').replace(' ', '').replace('\n', '') - except Exception as e: - self.core.information('Unable to read the certificate: %s' % e, 'Error') - return - - if len(args) > 2: - management = args[2] - if management: - management = management.lower() - if management not in ('false', '0'): - management = True - else: - management = False - else: - management = False - else: - management = True - - self.core.xmpp.plugin['xep_0257'].add_cert(name, crt, callback=cb, - allow_management=management) - - def completion_cert_add(self, the_input): - """ - completion for /cert_add [management] - """ - text = the_input.get_text() - args = common.shell_split(text) - n = the_input.get_argument_position() - log.debug('%s %s %s', the_input.text, n, the_input.pos) - if n == 1: - return - elif n == 2: - return self.completion_file(2, the_input) - elif n == 3: - return the_input.new_completion(['true', 'false'], n) - - @command_args_parser.quoted(1) - def command_cert_disable(self, args): - """ - /cert_disable - """ - if not args: - return self.core.command_help('cert_disable') - def cb(iq): - if iq['type'] == 'error': - self.core.information('Unable to disable the certificate.', 'Error') - else: - self.core.information('Certificate disabled.', 'Info') - - name = args[0] - - self.core.xmpp.plugin['xep_0257'].disable_cert(name, callback=cb) - - @command_args_parser.quoted(1) - def command_cert_revoke(self, args): - """ - /cert_revoke - """ - if not args: - return self.core.command_help('cert_revoke') - def cb(iq): - if iq['type'] == 'error': - self.core.information('Unable to revoke the certificate.', 'Error') - else: - self.core.information('Certificate revoked.', 'Info') - - name = args[0] - - self.core.xmpp.plugin['xep_0257'].revoke_cert(name, callback=cb) - - - @command_args_parser.quoted(2) - def command_cert_fetch(self, args): - """ - /cert_fetch - """ - if not args or len(args) < 2: - return self.core.command_help('cert_fetch') - def cb(iq): - if iq['type'] == 'error': - self.core.information('Unable to fetch the certificate.', - 'Error') - return - - cert = None - for item in iq['sasl_certs']['items']: - if item['name'] == name: - cert = base64.b64decode(item['x509cert']) - break - - if not cert: - return self.core.information('Certificate not found.', 'Info') - - cert = ssl.DER_cert_to_PEM_cert(cert) - with open(path, 'w') as fd: - fd.write(cert) - - self.core.information('File stored at %s' % path, 'Info') - - name = args[0] - path = args[1] - - self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb) - - def completion_cert_fetch(self, the_input): - """ - completion for /cert_fetch - """ - text = the_input.get_text() - args = common.shell_split(text) - n = the_input.get_argument_position() - log.debug('%s %s %s', the_input.text, n, the_input.pos) - if n == 1: - return - elif n == 2: - return self.completion_file(2, the_input) - - def on_blocked_message(self, message): - """ - When we try to send a message to a blocked contact - """ - tab = self.core.get_conversation_by_jid(message['from'], False) - if not tab: - log.debug('Received message from nonexistent tab: %s', message['from']) - message = '\x19%(info_col)s}Cannot send message to %(jid)s: contact blocked' % { - 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT), - 'jid': message['from'], - } - tab.add_message(message) - - @command_args_parser.quoted(0, 1) - def command_block(self, args): - """ - /block [jid] - """ - def callback(iq): - if iq['type'] == 'error': - return self.core.information('Could not block the contact.', 'Error') - elif iq['type'] == 'result': - return self.core.information('Contact blocked.', 'Info') - - item = self.roster_win.selected_row - if args: - jid = safeJID(args[0]) - elif isinstance(item, Contact): - jid = item.bare_jid - elif isinstance(item, Resource): - jid = item.jid.bare - self.core.xmpp.plugin['xep_0191'].block(jid, callback=callback) - - def completion_block(self, the_input): - """ - Completion for /block - """ - if the_input.get_argument_position() == 1: - jids = roster.jids() - return the_input.new_completion(jids, 1, '', quotify=False) - - @command_args_parser.quoted(0, 1) - def command_unblock(self, args): - """ - /unblock [jid] - """ - def callback(iq): - if iq['type'] == 'error': - return self.core.information('Could not unblock the contact.', 'Error') - elif iq['type'] == 'result': - return self.core.information('Contact unblocked.', 'Info') - - item = self.roster_win.selected_row - if args: - jid = safeJID(args[0]) - elif isinstance(item, Contact): - jid = item.bare_jid - elif isinstance(item, Resource): - jid = item.jid.bare - self.core.xmpp.plugin['xep_0191'].unblock(jid, callback=callback) - - def completion_unblock(self, the_input): - """ - Completion for /unblock - """ - def on_result(iq): - if iq['type'] == 'error': - return - l = sorted(str(item) for item in iq['blocklist']['items']) - return the_input.new_completion(l, 1, quotify=False) - - if the_input.get_argument_position(): - self.core.xmpp.plugin['xep_0191'].get_blocked(callback=on_result) - return True - - @command_args_parser.ignored - def command_list_blocks(self): - """ - /list_blocks - """ - def callback(iq): - if iq['type'] == 'error': - return self.core.information('Could not retrieve the blocklist.', 'Error') - s = 'List of blocked JIDs:\n' - items = (str(item) for item in iq['blocklist']['items']) - jids = '\n'.join(items) - if jids: - s += jids - else: - s = 'No blocked JIDs.' - self.core.information(s, 'Info') - - self.core.xmpp.plugin['xep_0191'].get_blocked(callback=callback) - - @command_args_parser.ignored - def command_reconnect(self): - """ - /reconnect - """ - if self.core.xmpp.is_connected(): - self.core.disconnect(reconnect=True) - else: - self.core.xmpp.connect() - - @command_args_parser.ignored - def command_disconnect(self): - """ - /disconnect - """ - self.core.disconnect() - - @command_args_parser.quoted(0, 1) - def command_last_activity(self, args): - """ - /activity [jid] - """ - item = self.roster_win.selected_row - if args: - jid = args[0] - elif isinstance(item, Contact): - jid = item.bare_jid - elif isinstance(item, Resource): - jid = item.jid - else: - self.core.information('No JID selected.', 'Error') - return - self.core.command_last_activity(jid) - - def resize(self): - self.need_resize = False - if self.size.tab_degrade_x: - display_info = False - roster_width = self.width - else: - display_info = True - roster_width = self.width // 2 - if self.size.tab_degrade_y: - display_contact_win = False - contact_win_h = 0 - else: - display_contact_win = True - contact_win_h = 4 - if self.size.tab_degrade_y: - tab_win_height = 0 - else: - tab_win_height = Tab.tab_win_height() - - info_width = self.width - roster_width - 1 - if display_info: - self.v_separator.resize(self.height - 1 - tab_win_height, - 1, 0, roster_width) - self.information_win.resize(self.height - 1 - tab_win_height - - contact_win_h, - info_width, 0, roster_width + 1, - self.core.information_buffer) - if display_contact_win: - self.contact_info_win.resize(contact_win_h, - info_width, - self.height - tab_win_height - - contact_win_h - 1, - roster_width + 1) - self.roster_win.resize(self.height - 1 - Tab.tab_win_height(), - roster_width, 0, 0) - self.input.resize(1, self.width, self.height-1, 0) - self.default_help_message.resize(1, self.width, self.height-1, 0) - - def completion(self): - # Check if we are entering a command (with the '/' key) - if isinstance(self.input, windows.Input) and\ - not self.input.help_message: - self.complete_commands(self.input) - - def completion_file(self, complete_number, the_input): - """ - Generic quoted completion for files/paths - (use functools.partial to use directly as a completion - for a command) - """ - text = the_input.get_text() - args = common.shell_split(text) - n = the_input.get_argument_position() - if n == complete_number: - if args[n-1] == '' or len(args) < n+1: - home = os.getenv('HOME') or '/' - return the_input.new_completion([home, '/tmp'], n, quotify=True) - path_ = args[n] - if path.isdir(path_): - dir_ = path_ - base = '' - else: - dir_ = path.dirname(path_) - base = path.basename(path_) - try: - names = os.listdir(dir_) - except OSError: - names = [] - names_filtered = [name for name in names if name.startswith(base)] - if names_filtered: - names = names_filtered - if not names: - names = [path_] - end_list = [] - for name in names: - value = os.path.join(dir_, name) - if not name.startswith('.'): - end_list.append(value) - - return the_input.new_completion(end_list, n, quotify=True) - - @command_args_parser.ignored - def command_clear(self): - """ - /clear - """ - self.core.information_buffer.messages = [] - self.information_win.rebuild_everything(self.core.information_buffer) - self.core.information_win.rebuild_everything(self.core.information_buffer) - self.refresh() - - @command_args_parser.quoted(1) - def command_password(self, args): - """ - /password - """ - def callback(iq): - if iq['type'] == 'result': - self.core.information('Password updated', 'Account') - if config.get('password'): - config.silent_set('password', args[0]) - else: - self.core.information('Unable to change the password', 'Account') - self.core.xmpp.plugin['xep_0077'].change_password(args[0], callback=callback) - - @command_args_parser.quoted(0, 1) - def command_deny(self, args): - """ - /deny [jid] - Denies a JID from our roster - """ - if not args: - item = self.roster_win.selected_row - if isinstance(item, Contact): - jid = item.bare_jid - else: - self.core.information('No subscription to deny') - return - else: - jid = safeJID(args[0]).bare - if not jid in [jid for jid in roster.jids()]: - self.core.information('No subscription to deny') - return - - contact = roster[jid] - if contact: - contact.unauthorize() - self.core.information('Subscription to %s was revoked' % jid, - 'Roster') - - @command_args_parser.quoted(1) - def command_add(self, args): - """ - Add the specified JID to the roster, and set automatically - accept the reverse subscription - """ - if args is None: - self.core.information('No JID specified', 'Error') - return - jid = safeJID(safeJID(args[0]).bare) - if not str(jid): - self.core.information('The provided JID (%s) is not valid' % (args[0],), 'Error') - return - if jid in roster and roster[jid].subscription in ('to', 'both'): - return self.core.information('Already subscribed.', 'Roster') - roster.add(jid) - roster.modified() - self.core.information('%s was added to the roster' % jid, 'Roster') - - @command_args_parser.quoted(1, 1) - def command_name(self, args): - """ - Set a name for the specified JID in your roster - """ - def callback(iq): - if not iq: - self.core.information('The name could not be set.', 'Error') - log.debug('Error in /name:\n%s', iq) - if args is None: - return self.core.command_help('name') - jid = safeJID(args[0]).bare - name = args[1] if len(args) == 2 else '' - - contact = roster[jid] - if contact is None: - self.core.information('No such JID in roster', 'Error') - return - - groups = set(contact.groups) - if 'none' in groups: - groups.remove('none') - subscription = contact.subscription - self.core.xmpp.update_roster(jid, name=name, groups=groups, - subscription=subscription, callback=callback) - - @command_args_parser.quoted(2) - def command_groupadd(self, args): - """ - Add the specified JID to the specified group - """ - if args is None: - return self.core.command_help('groupadd') - jid = safeJID(args[0]).bare - group = args[1] - - contact = roster[jid] - if contact is None: - self.core.information('No such JID in roster', 'Error') - return - - new_groups = set(contact.groups) - if group in new_groups: - self.core.information('JID already in group', 'Error') - return - - roster.modified() - new_groups.add(group) - try: - new_groups.remove('none') - except KeyError: - pass - - name = contact.name - subscription = contact.subscription - - def callback(iq): - if iq: - roster.update_contact_groups(jid) - else: - self.core.information('The group could not be set.', 'Error') - log.debug('Error in groupadd:\n%s', iq) - - self.core.xmpp.update_roster(jid, name=name, groups=new_groups, - subscription=subscription, callback=callback) - - @command_args_parser.quoted(3) - def command_groupmove(self, args): - """ - Remove the specified JID from the first specified group and add it to the second one - """ - if args is None: - return self.core.command_help('groupmove') - jid = safeJID(args[0]).bare - group_from = args[1] - group_to = args[2] - - contact = roster[jid] - if not contact: - self.core.information('No such JID in roster', 'Error') - return - - new_groups = set(contact.groups) - if 'none' in new_groups: - new_groups.remove('none') - - if group_to == 'none' or group_from == 'none': - self.core.information('"none" is not a group.', 'Error') - return - - if group_from not in new_groups: - self.core.information('JID not in first group', 'Error') - return - - if group_to in new_groups: - self.core.information('JID already in second group', 'Error') - return - - if group_to == group_from: - self.core.information('The groups are the same.', 'Error') - return - - roster.modified() - new_groups.add(group_to) - if 'none' in new_groups: - new_groups.remove('none') - - new_groups.remove(group_from) - name = contact.name - subscription = contact.subscription - - def callback(iq): - if iq: - roster.update_contact_groups(contact) - else: - self.core.information('The group could not be set') - log.debug('Error in groupmove:\n%s', iq) - - self.core.xmpp.update_roster(jid, name=name, groups=new_groups, - subscription=subscription, callback=callback) - - @command_args_parser.quoted(2) - def command_groupremove(self, args): - """ - Remove the specified JID from the specified group - """ - if args is None: - return self.core.command_help('groupremove') - - jid = safeJID(args[0]).bare - group = args[1] - - contact = roster[jid] - if contact is None: - self.core.information('No such JID in roster', 'Error') - return - - new_groups = set(contact.groups) - try: - new_groups.remove('none') - except KeyError: - pass - if group not in new_groups: - self.core.information('JID not in group', 'Error') - return - - roster.modified() - - new_groups.remove(group) - name = contact.name - subscription = contact.subscription - - def callback(iq): - if iq: - roster.update_contact_groups(jid) - else: - self.core.information('The group could not be set') - log.debug('Error in groupremove:\n%s', iq) - - self.core.xmpp.update_roster(jid, name=name, groups=new_groups, - subscription=subscription, callback=callback) - - @command_args_parser.quoted(0, 1) - def command_remove(self, args): - """ - Remove the specified JID from the roster. i.e.: unsubscribe - from its presence, and cancel its subscription to our. - """ - if args: - jid = safeJID(args[0]).bare - else: - item = self.roster_win.selected_row - if isinstance(item, Contact): - jid = item.bare_jid - else: - self.core.information('No roster item to remove') - return - roster.remove(jid) - del roster[jid] - - @command_args_parser.quoted(0, 1) - def command_import(self, args): - """ - Import the contacts - """ - if args: - if args[0].startswith('/'): - filepath = args[0] - else: - filepath = path.join(getenv('HOME'), args[0]) - else: - filepath = path.join(getenv('HOME'), 'poezio_contacts') - if not path.isfile(filepath): - self.core.information('The file %s does not exist' % filepath, 'Error') - return - try: - handle = open(filepath, 'r', encoding='utf-8') - lines = handle.readlines() - handle.close() - except IOError: - self.core.information('Could not open %s' % filepath, 'Error') - log.error('Unable to correct a message', exc_info=True) - return - for jid in lines: - self.command_add(jid.lstrip('\n')) - self.core.information('Contacts imported from %s' % filepath, 'Info') - - @command_args_parser.quoted(0, 1) - def command_export(self, args): - """ - Export the contacts - """ - if args: - if args[0].startswith('/'): - filepath = args[0] - else: - filepath = path.join(getenv('HOME'), args[0]) - else: - filepath = path.join(getenv('HOME'), 'poezio_contacts') - if path.isfile(filepath): - self.core.information('The file already exists', 'Error') - return - elif not path.isdir(path.dirname(filepath)): - self.core.information('Parent directory not found', 'Error') - return - if roster.export(filepath): - self.core.information('Contacts exported to %s' % filepath, 'Info') - else: - self.core.information('Failed to export contacts to %s' % filepath, 'Info') - - def completion_remove(self, the_input): - """ - Completion for /remove - """ - jids = [jid for jid in roster.jids()] - return the_input.auto_completion(jids, '', quotify=False) - - def completion_name(self, the_input): - """Completion for /name""" - n = the_input.get_argument_position() - if n == 1: - jids = [jid for jid in roster.jids()] - return the_input.new_completion(jids, n, quotify=True) - return False - - def completion_groupadd(self, the_input): - n = the_input.get_argument_position() - if n == 1: - jids = sorted(jid for jid in roster.jids()) - return the_input.new_completion(jids, n, '', quotify=True) - elif n == 2: - groups = sorted(group for group in roster.groups if group != 'none') - return the_input.new_completion(groups, n, '', quotify=True) - return False - - def completion_groupmove(self, the_input): - args = common.shell_split(the_input.text) - n = the_input.get_argument_position() - if n == 1: - jids = sorted(jid for jid in roster.jids()) - return the_input.new_completion(jids, n, '', quotify=True) - elif n == 2: - contact = roster[args[1]] - if not contact: - return False - groups = list(contact.groups) - if 'none' in groups: - groups.remove('none') - return the_input.new_completion(groups, n, '', quotify=True) - elif n == 3: - groups = sorted(group for group in roster.groups) - return the_input.new_completion(groups, n, '', quotify=True) - return False - - def completion_groupremove(self, the_input): - args = common.shell_split(the_input.text) - n = the_input.get_argument_position() - if n == 1: - jids = sorted(jid for jid in roster.jids()) - return the_input.new_completion(jids, n, '', quotify=True) - elif n == 2: - contact = roster[args[1]] - if contact is None: - return False - groups = sorted(contact.groups) - try: - groups.remove('none') - except ValueError: - pass - return the_input.new_completion(groups, n, '', quotify=True) - return False - - def completion_deny(self, the_input): - """ - Complete the first argument from the list of the - contact with ask=='subscribe' - """ - jids = sorted(str(contact.bare_jid) for contact in roster.contacts.values() - if contact.pending_in) - return the_input.new_completion(jids, 1, '', quotify=False) - - @command_args_parser.quoted(0, 1) - def command_accept(self, args): - """ - Accept a JID from in roster. Authorize it AND subscribe to it - """ - if not args: - item = self.roster_win.selected_row - if isinstance(item, Contact): - jid = item.bare_jid - else: - self.core.information('No subscription to accept') - return - else: - jid = safeJID(args[0]).bare - nodepart = safeJID(jid).user - jid = safeJID(jid) - # crappy transports putting resources inside the node part - if '\\2f' in nodepart: - jid.user = nodepart.split('\\2f')[0] - contact = roster[jid] - if contact is None: - return - contact.pending_in = False - roster.modified() - self.core.xmpp.send_presence(pto=jid, ptype='subscribed') - self.core.xmpp.client_roster.send_last_presence() - if contact.subscription in ('from', 'none') and not contact.pending_out: - self.core.xmpp.send_presence(pto=jid, ptype='subscribe', pnick=self.core.own_nick) - - self.core.information('%s is now authorized' % jid, 'Roster') - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - - display_info = not self.size.tab_degrade_x - display_contact_win = not self.size.tab_degrade_y - - self.roster_win.refresh(roster) - if display_info: - self.v_separator.refresh() - self.information_win.refresh() - if display_contact_win: - self.contact_info_win.refresh( - self.roster_win.get_selected_row()) - self.refresh_tab_win() - self.input.refresh() - - def on_input(self, key, raw): - if key == '^M': - selected_row = self.roster_win.get_selected_row() - res = self.input.do_command(key, raw=raw) - if res and not isinstance(self.input, windows.Input): - return True - elif res: - return False - if key == '^M': - self.core.on_roster_enter_key(selected_row) - return selected_row - elif not raw and key in self.key_func: - return self.key_func[key]() - - @refresh_wrapper.conditional - def toggle_offline_show(self): - """ - Show or hide offline contacts - """ - option = 'roster_show_offline' - value = config.get(option) - success = config.silent_set(option, str(not value)) - roster.modified() - if not success: - self.core.information('Unable to write in the config file', 'Error') - return True - - def on_slash(self): - """ - '/' is pressed, we enter "input mode" - """ - if isinstance(self.input, windows.YesNoInput): - return - curses.curs_set(1) - self.input = windows.CommandInput("", self.reset_help_message, self.execute_slash_command) - self.input.resize(1, self.width, self.height-1, 0) - self.input.do_command("/") # we add the slash - - def reset_help_message(self, _=None): - self.input = self.default_help_message - if self.core.current_tab() is self: - curses.curs_set(0) - self.input.refresh() - self.core.doupdate() - return True - - def execute_slash_command(self, txt): - if txt.startswith('/'): - self.input.key_enter() - self.execute_command(txt) - return self.reset_help_message() - - def on_lose_focus(self): - self.state = 'normal' - - def on_gain_focus(self): - self.state = 'current' - if isinstance(self.input, windows.HelpText): - curses.curs_set(0) - else: - curses.curs_set(1) - - @refresh_wrapper.conditional - def move_cursor_down(self): - if isinstance(self.input, windows.Input) and not self.input.history_disabled: - return - return self.roster_win.move_cursor_down() - - @refresh_wrapper.conditional - def move_cursor_up(self): - if isinstance(self.input, windows.Input) and not self.input.history_disabled: - return - return self.roster_win.move_cursor_up() - - def move_cursor_to_prev_contact(self): - self.roster_win.move_cursor_up() - while not isinstance(self.roster_win.get_selected_row(), Contact): - if not self.roster_win.move_cursor_up(): - break - self.roster_win.refresh(roster) - - def move_cursor_to_next_contact(self): - self.roster_win.move_cursor_down() - while not isinstance(self.roster_win.get_selected_row(), Contact): - if not self.roster_win.move_cursor_down(): - break - self.roster_win.refresh(roster) - - def move_cursor_to_prev_group(self): - self.roster_win.move_cursor_up() - while not isinstance(self.roster_win.get_selected_row(), RosterGroup): - if not self.roster_win.move_cursor_up(): - break - self.roster_win.refresh(roster) - - def move_cursor_to_next_group(self): - self.roster_win.move_cursor_down() - while not isinstance(self.roster_win.get_selected_row(), RosterGroup): - if not self.roster_win.move_cursor_down(): - break - self.roster_win.refresh(roster) - - def on_scroll_down(self): - return self.roster_win.move_cursor_down(self.height // 2) - - def on_scroll_up(self): - return self.roster_win.move_cursor_up(self.height // 2) - - @refresh_wrapper.conditional - def on_space(self): - if isinstance(self.input, windows.Input): - return - selected_row = self.roster_win.get_selected_row() - if isinstance(selected_row, RosterGroup): - selected_row.toggle_folded() - roster.modified() - return True - elif isinstance(selected_row, Contact): - group = "none" - found_group = False - pos = self.roster_win.pos - while not found_group and pos >= 0: - row = self.roster_win.roster_cache[pos] - pos -= 1 - if isinstance(row, RosterGroup): - found_group = True - group = row.name - selected_row.toggle_folded(group) - roster.modified() - return True - return False - - def get_contact_version(self): - """ - Show the versions of the resource(s) currently selected - """ - selected_row = self.roster_win.get_selected_row() - if isinstance(selected_row, Contact): - for resource in selected_row.resources: - self.core.command_version(str(resource.jid)) - elif isinstance(selected_row, Resource): - self.core.command_version(str(selected_row.jid)) - else: - self.core.information('Nothing to get versions from', 'Info') - - def show_contact_info(self): - """ - Show the contact info (resource number, status, presence, etc) - when 'i' is pressed. - """ - selected_row = self.roster_win.get_selected_row() - if isinstance(selected_row, Contact): - cont = selected_row - res = selected_row.get_highest_priority_resource() - acc = [] - acc.append('Contact: %s (%s)' % (cont.bare_jid, res.presence if res else 'unavailable')) - if res: - acc.append('%s connected resource%s' % (len(cont), '' if len(cont) == 1 else 's')) - acc.append('Current status: %s' % res.status) - if cont.tune: - acc.append('Tune: %s' % common.format_tune_string(cont.tune)) - if cont.mood: - acc.append('Mood: %s' % cont.mood) - if cont.activity: - acc.append('Activity: %s' % cont.activity) - if cont.gaming: - acc.append('Game: %s' % (common.format_gaming_string(cont.gaming))) - msg = '\n'.join(acc) - elif isinstance(selected_row, Resource): - res = selected_row - msg = 'Resource: %s (%s)\nCurrent status: %s\nPriority: %s' % ( - res.jid, - res.presence, - res.status, - res.priority) - elif isinstance(selected_row, RosterGroup): - rg = selected_row - msg = 'Group: %s [%s/%s] contacts online' % ( - rg.name, - rg.get_nb_connected_contacts(), - len(rg),) - else: - msg = None - if msg: - self.core.information(msg, 'Info') - - def change_contact_name(self): - """ - Auto-fill a /name command when 'n' is pressed - """ - selected_row = self.roster_win.get_selected_row() - if isinstance(selected_row, Contact): - jid = selected_row.bare_jid - elif isinstance(selected_row, Resource): - jid = safeJID(selected_row.jid).bare - else: - return - self.on_slash() - self.input.text = '/name "%s" ' % jid - self.input.key_end() - self.input.refresh() - - @refresh_wrapper.always - def start_search(self): - """ - Start the search. The input should appear with a short instruction - in it. - """ - if isinstance(self.input, windows.YesNoInput): - return - curses.curs_set(1) - self.input = windows.CommandInput("[Search]", self.on_search_terminate, self.on_search_terminate, self.set_roster_filter) - self.input.resize(1, self.width, self.height-1, 0) - self.input.disable_history() - roster.modified() - self.refresh() - return True - - @refresh_wrapper.always - def start_search_slow(self): - if isinstance(self.input, windows.YesNoInput): - return - curses.curs_set(1) - self.input = windows.CommandInput("[Search]", self.on_search_terminate, self.on_search_terminate, self.set_roster_filter_slow) - self.input.resize(1, self.width, self.height-1, 0) - self.input.disable_history() - return True - - def set_roster_filter_slow(self, txt): - roster.contact_filter = (jid_and_name_match_slow, txt) - roster.modified() - self.refresh() - return False - - def set_roster_filter(self, txt): - roster.contact_filter = (jid_and_name_match, txt) - roster.modified() - self.refresh() - return False - - @refresh_wrapper.always - def on_search_terminate(self, txt): - curses.curs_set(0) - roster.contact_filter = None - self.reset_help_message() - roster.modified() - return True - - def on_close(self): - return - -def diffmatch(search, string): - """ - Use difflib and a loop to check if search_pattern can - be 'almost' found INSIDE a string. - 'almost' being defined by difflib - """ - if len(search) > len(string): - return False - l = len(search) - ratio = 0.7 - for i in range(len(string) - l + 1): - if difflib.SequenceMatcher(None, search, string[i:i+l]).ratio() >= ratio: - return True - return False - -def jid_and_name_match(contact, txt): - """ - Match jid with text precisely - """ - if not txt: - return True - txt = txt.lower() - if txt in safeJID(contact.bare_jid).bare.lower(): - return True - if txt in contact.name.lower(): - return True - return False - -def jid_and_name_match_slow(contact, txt): - """ - A function used to know if a contact in the roster should - be shown in the roster - """ - if not txt: - return True # Everything matches when search is empty - user = safeJID(contact.bare_jid).bare - if diffmatch(txt, user): - return True - if contact.name and diffmatch(txt, contact.name): - return True - return False diff --git a/src/tabs/xmltab.py b/src/tabs/xmltab.py deleted file mode 100644 index b063ad35..00000000 --- a/src/tabs/xmltab.py +++ /dev/null @@ -1,360 +0,0 @@ -""" -The XMLTab is here for debugging purposes, it shows the incoming and -outgoing stanzas. It has a few useful functions that can filter stanzas -in order to only show the relevant ones, and it can also be frozen or -unfrozen on demand so that the relevant information is not drowned by -the traffic. -""" -import logging -log = logging.getLogger(__name__) - -import curses -import os -from slixmpp.xmlstream import matcher -from slixmpp.xmlstream.tostring import tostring -from slixmpp.xmlstream.stanzabase import ElementBase -from xml.etree import ElementTree as ET - -from . import Tab - -import text_buffer -import windows -from xhtml import clean_text -from decorators import command_args_parser, refresh_wrapper -from common import safeJID - - -class MatchJID(object): - - def __init__(self, jid, dest=''): - self.jid = jid - self.dest = dest - - def match(self, xml): - from_ = safeJID(xml['from']) - to_ = safeJID(xml['to']) - if self.jid.full == self.jid.bare: - from_ = from_.bare - to_ = to_.bare - - if self.dest == 'from': - return from_ == self.jid - elif self.dest == 'to': - return to_ == self.jid - return self.jid in (from_, to_) - - def __repr__(self): - return '%s%s%s' % (self.dest, ': ' if self.dest else '', self.jid) - -MATCHERS_MAPPINGS = { - MatchJID: ('JID', lambda obj: repr(obj)), - matcher.MatcherId: ('ID', lambda obj: obj._criteria), - matcher.MatchXMLMask: ('XMLMask', lambda obj: tostring(obj._criteria)), - matcher.MatchXPath: ('XPath', lambda obj: obj._criteria) -} - -class XMLTab(Tab): - def __init__(self): - Tab.__init__(self) - self.state = 'normal' - self.name = 'XMLTab' - self.filters = [] - - self.core_buffer = self.core.xml_buffer - self.filtered_buffer = text_buffer.TextBuffer() - - self.info_header = windows.XMLInfoWin() - self.text_win = windows.XMLTextWin() - self.core_buffer.add_window(self.text_win) - self.default_help_message = windows.HelpText("/ to enter a command") - - self.register_command('close', self.close, - shortdesc="Close this tab.") - self.register_command('clear', self.command_clear, - shortdesc='Clear the current buffer.') - self.register_command('reset', self.command_reset, - shortdesc='Reset the stanza filter.') - self.register_command('filter_id', self.command_filter_id, - usage='', - desc='Show only the stanzas with the id .', - shortdesc='Filter by id.') - self.register_command('filter_xpath', self.command_filter_xpath, - usage='', - desc='Show only the stanzas matching the xpath .' - ' Any occurrences of %n will be replaced by jabber:client.', - shortdesc='Filter by XPath.') - self.register_command('filter_jid', self.command_filter_jid, - usage='', - desc='Show only the stanzas matching the jid in from= or to=.', - shortdesc='Filter by JID.') - self.register_command('filter_from', self.command_filter_from, - usage='', - desc='Show only the stanzas matching the jid in from=.', - shortdesc='Filter by JID from.') - self.register_command('filter_to', self.command_filter_to, - usage='', - desc='Show only the stanzas matching the jid in to=.', - shortdesc='Filter by JID to.') - self.register_command('filter_xmlmask', self.command_filter_xmlmask, - usage='', - desc='Show only the stanzas matching the given xml mask.', - shortdesc='Filter by xml mask.') - self.register_command('dump', self.command_dump, - usage='', - desc='Writes the content of the XML buffer into a file.', - shortdesc='Write in a file.') - self.input = self.default_help_message - self.key_func['^T'] = self.close - self.key_func['^I'] = self.completion - self.key_func["KEY_DOWN"] = self.on_scroll_down - self.key_func["KEY_UP"] = self.on_scroll_up - self.key_func["^K"] = self.on_freeze - self.key_func["/"] = self.on_slash - self.resize() - # Used to display the infobar - self.filter_type = '' - self.filter = '' - - def gen_filter_repr(self): - if not self.filters: - self.filter_type = '' - self.filter = '' - return - filter_types = map(lambda x: MATCHERS_MAPPINGS[type(x)][0], self.filters) - filter_strings = map(lambda x: MATCHERS_MAPPINGS[type(x)][1](x), self.filters) - self.filter_type = ','.join(filter_types) - self.filter = ','.join(filter_strings) - - def update_filters(self, matcher): - if not self.filters: - messages = self.core_buffer.messages[:] - self.filtered_buffer.messages = [] - self.core_buffer.del_window(self.text_win) - self.filtered_buffer.add_window(self.text_win) - else: - messages = self.filtered_buffer.messages - self.filtered_buffer.messages = [] - self.filters.append(matcher) - new_messages = [] - for msg in messages: - try: - if msg.txt.strip() and self.match_stanza(ElementBase(ET.fromstring(clean_text(msg.txt)))): - new_messages.append(msg) - except ET.ParseError: - log.debug('Malformed XML : %s', msg.txt, exc_info=True) - self.filtered_buffer.messages = new_messages - self.text_win.rebuild_everything(self.filtered_buffer) - self.gen_filter_repr() - - def on_freeze(self): - """ - Freeze the display. - """ - self.text_win.toggle_lock() - self.refresh() - - def match_stanza(self, stanza): - for matcher in self.filters: - if not matcher.match(stanza): - return False - return True - - @command_args_parser.raw - def command_filter_xmlmask(self, mask): - """/filter_xmlmask """ - try: - self.update_filters(matcher.MatchXMLMask(mask)) - self.refresh() - except Exception as e: - self.core.information('Invalid XML Mask: %s' % e, 'Error') - self.command_reset('') - - @command_args_parser.raw - def command_filter_to(self, jid): - """/filter_jid_to """ - jid_obj = safeJID(jid) - if not jid_obj: - return self.core.information('Invalid JID: %s' % jid, 'Error') - - self.update_filters(MatchJID(jid_obj, dest='to')) - self.refresh() - - @command_args_parser.raw - def command_filter_from(self, jid): - """/filter_jid_from """ - jid_obj = safeJID(jid) - if not jid_obj: - return self.core.information('Invalid JID: %s' % jid, 'Error') - - self.update_filters(MatchJID(jid_obj, dest='from')) - self.refresh() - - @command_args_parser.raw - def command_filter_jid(self, jid): - """/filter_jid """ - jid_obj = safeJID(jid) - if not jid_obj: - return self.core.information('Invalid JID: %s' % jid, 'Error') - - self.update_filters(MatchJID(jid_obj)) - self.refresh() - - @command_args_parser.quoted(1) - def command_filter_id(self, args): - """/filter_id """ - if args is None: - return self.core.command_help('filter_id') - - self.update_filters(matcher.MatcherId(args[0])) - self.refresh() - - @command_args_parser.raw - def command_filter_xpath(self, xpath): - """/filter_xpath """ - try: - self.update_filters(matcher.MatchXPath(xpath.replace('%n', self.core.xmpp.default_ns))) - self.refresh() - except: - self.core.information('Invalid XML Path', 'Error') - self.command_reset('') - - @command_args_parser.ignored - def command_reset(self): - """/reset""" - if self.filters: - self.filters = [] - self.filtered_buffer.del_window(self.text_win) - self.core_buffer.add_window(self.text_win) - self.text_win.rebuild_everything(self.core_buffer) - self.filter_type = '' - self.filter = '' - self.refresh() - - @command_args_parser.quoted(1) - def command_dump(self, args): - """/dump """ - if args is None: - return self.core.command_help('dump') - if self.filters: - xml = self.filtered_buffer.messages[:] - else: - xml = self.core_buffer.messages[:] - text = '\n'.join(('%s %s %s' % (msg.str_time, msg.nickname, clean_text(msg.txt)) for msg in xml)) - filename = os.path.expandvars(os.path.expanduser(args[0])) - try: - with open(filename, 'w') as fd: - fd.write(text) - except Exception as e: - self.core.information('Could not write the XML dump: %s' % e, 'Error') - - def on_slash(self): - """ - '/' is pressed, activate the input - """ - curses.curs_set(1) - self.input = windows.CommandInput("", self.reset_help_message, self.execute_slash_command) - self.input.resize(1, self.width, self.height-1, 0) - self.input.do_command("/") # we add the slash - - @refresh_wrapper.always - def reset_help_message(self, _=None): - if self.closed: - return True - if self.core.current_tab() is self: - curses.curs_set(0) - self.input = self.default_help_message - return True - - def on_scroll_up(self): - return self.text_win.scroll_up(self.text_win.height-1) - - def on_scroll_down(self): - return self.text_win.scroll_down(self.text_win.height-1) - - @command_args_parser.ignored - def command_clear(self): - """ - /clear - """ - if self.filters: - buffer = self.core_buffer - else: - buffer = self.filtered_buffer - buffer.messages = [] - self.text_win.rebuild_everything(buffer) - self.refresh() - self.core.doupdate() - - def execute_slash_command(self, txt): - if txt.startswith('/'): - self.input.key_enter() - self.execute_command(txt) - return self.reset_help_message() - - def completion(self): - if isinstance(self.input, windows.Input): - self.complete_commands(self.input) - - def on_input(self, key, raw): - res = self.input.do_command(key, raw=raw) - if res: - return True - if not raw and key in self.key_func: - return self.key_func[key]() - - def close(self, arg=None): - self.core.close_tab() - - def resize(self): - self.need_resize = False - if self.size.tab_degrade_y: - info_win_size = 0 - tab_win_height = 0 - else: - info_win_size = self.core.information_win_size - tab_win_height = Tab.tab_win_height() - - self.text_win.resize(self.height - info_win_size - tab_win_height - 2, - self.width, 0, 0) - self.text_win.rebuild_everything(self.core.xml_buffer) - self.info_header.resize(1, self.width, - self.height - 2 - info_win_size - - tab_win_height, - 0) - self.input.resize(1, self.width, self.height-1, 0) - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - - if self.size.tab_degrade_y: - display_info_win = False - else: - display_info_win = True - - self.text_win.refresh() - self.info_header.refresh(self.filter_type, self.filter, self.text_win) - self.refresh_tab_win() - if display_info_win: - self.info_win.refresh() - self.input.refresh() - - def on_lose_focus(self): - self.state = 'normal' - - def on_gain_focus(self): - self.state = 'current' - curses.curs_set(0) - - def on_close(self): - self.command_clear('') - self.core.xml_tab = False - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height-3: - return - self.text_win.resize(self.height-2-self.core.information_win_size - Tab.tab_win_height(), self.width, 0, 0) - self.info_header.resize(1, self.width, self.height-2-self.core.information_win_size - Tab.tab_win_height(), 0) - - -- cgit v1.2.3