diff options
Diffstat (limited to 'src/tabs/basetabs.py')
-rw-r--r-- | src/tabs/basetabs.py | 666 |
1 files changed, 666 insertions, 0 deletions
diff --git a/src/tabs/basetabs.py b/src/tabs/basetabs.py new file mode 100644 index 00000000..86ba9e1f --- /dev/null +++ b/src/tabs/basetabs.py @@ -0,0 +1,666 @@ +""" +A Tab object is a way to organize various Windows (see windows.py) +around the screen at once. +A tab is then composed of multiple Buffers. +Each Tab object has different refresh() and resize() methods, defining how its +Windows are displayed, resized, etc. +""" + +from gettext import gettext as _ + +import logging +log = logging.getLogger(__name__) + +import singleton +import string +import time +import weakref +from datetime import datetime, timedelta +from xml.etree import cElementTree as ET + +import core +import timed_events +import windows +import xhtml +from common import safeJID +from config import config +from decorators import refresh_wrapper +from logger import logger +from text_buffer import TextBuffer, CorrectionError +from theming import get_theme + + +MIN_WIDTH = 42 +MIN_HEIGHT = 6 + +STATE_COLORS = { + 'disconnected': lambda: get_theme().COLOR_TAB_DISCONNECTED, + 'scrolled': lambda: get_theme().COLOR_TAB_SCROLLED, + 'joined': lambda: get_theme().COLOR_TAB_JOINED, + 'message': lambda: get_theme().COLOR_TAB_NEW_MESSAGE, + 'highlight': lambda: get_theme().COLOR_TAB_HIGHLIGHT, + 'private': lambda: get_theme().COLOR_TAB_PRIVATE, + 'normal': lambda: get_theme().COLOR_TAB_NORMAL, + 'current': lambda: get_theme().COLOR_TAB_CURRENT, + 'attention': lambda: get_theme().COLOR_TAB_ATTENTION, + } + +VERTICAL_STATE_COLORS = { + 'disconnected': lambda: get_theme().COLOR_VERTICAL_TAB_DISCONNECTED, + 'scrolled': lambda: get_theme().COLOR_VERTICAL_TAB_SCROLLED, + 'joined': lambda: get_theme().COLOR_VERTICAL_TAB_JOINED, + 'message': lambda: get_theme().COLOR_VERTICAL_TAB_NEW_MESSAGE, + 'highlight': lambda: get_theme().COLOR_VERTICAL_TAB_HIGHLIGHT, + 'private': lambda: get_theme().COLOR_VERTICAL_TAB_PRIVATE, + 'normal': lambda: get_theme().COLOR_VERTICAL_TAB_NORMAL, + 'current': lambda: get_theme().COLOR_VERTICAL_TAB_CURRENT, + 'attention': lambda: get_theme().COLOR_VERTICAL_TAB_ATTENTION, + } + + +STATE_PRIORITY = { + 'normal': -1, + 'current': -1, + 'disconnected': 0, + 'scrolled': 0.5, + 'message': 1, + 'joined': 1, + 'highlight': 2, + 'private': 2, + 'attention': 3 + } + +class Tab(object): + tab_core = None + + plugin_commands = {} + plugin_keys = {} + def __init__(self): + self.input = None + self._state = 'normal' + + self.need_resize = False + self.need_resize = False + self.key_func = {} # each tab should add their keys in there + # and use them in on_input + self.commands = {} # and their own commands + + + @property + def core(self): + if not Tab.tab_core: + Tab.tab_core = singleton.Singleton(core.Core) + return Tab.tab_core + + @property + def nb(self): + for index, tab in enumerate(self.core.tabs): + if tab == self: + return index + return len(self.core.tabs) + + @property + def tab_win(self): + if not Tab.tab_core: + Tab.tab_core = singleton.Singleton(core.Core) + return Tab.tab_core.tab_win + + @property + def left_tab_win(self): + if not Tab.tab_core: + Tab.tab_core = singleton.Singleton(core.Core) + return Tab.tab_core.left_tab_win + + @staticmethod + def tab_win_height(): + """ + Returns 1 or 0, depending on if we are using the vertical tab list + or not. + """ + if config.get('enable_vertical_tab_list', 'false') == 'true': + return 0 + return 1 + + @property + def info_win(self): + return self.core.information_win + + @property + def color(self): + return STATE_COLORS[self._state]() + + @property + def vertical_color(self): + return VERTICAL_STATE_COLORS[self._state]() + + @property + def state(self): + return self._state + + @state.setter + def state(self, value): + if not value in STATE_COLORS: + log.debug("Invalid value for tab state: %s", value) + elif STATE_PRIORITY[value] < STATE_PRIORITY[self._state] and \ + value not in ('current', 'disconnected') and \ + not (self._state == 'scrolled' and value == 'disconnected'): + log.debug("Did not set state because of lower priority, asked: %s, kept: %s", value, self._state) + elif self._state == 'disconnected' and value not in ('joined', 'current'): + log.debug('Did not set state because disconnected tabs remain visible') + else: + self._state = value + + @staticmethod + def resize(scr): + Tab.size = (Tab.height, Tab.width) = scr.getmaxyx() + if Tab.height < MIN_HEIGHT or Tab.width < MIN_WIDTH: + Tab.visible = False + else: + Tab.visible = True + windows.Win._tab_win = scr + + def register_command(self, name, func, *, desc='', shortdesc='', completion=None, usage=''): + """ + Add a command + """ + if name in self.commands: + return + if not desc and shortdesc: + desc = shortdesc + self.commands[name] = core.Command(func, desc, completion, shortdesc, usage) + + def complete_commands(self, the_input): + """ + Does command completion on the specified input for both global and tab-specific + commands. + This should be called from the completion method (on tab, for example), passing + the input where completion is to be made. + It can completion the command name itself or an argument of the command. + Returns True if a completion was made, False else. + """ + txt = the_input.get_text() + # check if this is a command + if txt.startswith('/') and not txt.startswith('//'): + position = the_input.get_argument_position(quoted=False) + if position == 0: + words = ['/%s'% (name) for name in sorted(self.core.commands)] +\ + ['/%s' % (name) for name in sorted(self.commands)] + the_input.new_completion(words, 0) + # Do not try to cycle command completion if there was only + # one possibily. The next tab will complete the argument. + # Otherwise we would need to add a useless space before being + # able to complete the arguments. + hit_copy = set(the_input.hit_list) + while not hit_copy: + whitespace = the_input.text.find(' ') + if whitespace == -1: + whitespace = len(the_input.text) + the_input.text = the_input.text[:whitespace-1] + the_input.text[whitespace:] + the_input.new_completion(words, 0) + hit_copy = set(the_input.hit_list) + if len(hit_copy) == 1: + the_input.do_command(' ') + the_input.reset_completion() + return True + # check if we are in the middle of the command name + elif len(txt.split()) > 1 or\ + (txt.endswith(' ') and not the_input.last_completion): + command_name = txt.split()[0][1:] + if command_name in self.commands: + command = self.commands[command_name] + elif command_name in self.core.commands: + command = self.core.commands[command_name] + else: # Unknown command, cannot complete + return False + if command[2] is None: + return False # There's no completion function + else: + return command[2](the_input) + return True + return False + + def execute_command(self, provided_text): + """ + Execute the command in the input and return False if + the input didn't contain a command + """ + txt = provided_text or self.input.key_enter() + if txt.startswith('/') and not txt.startswith('//') and\ + not txt.startswith('/me '): + command = txt.strip().split()[0][1:] + arg = txt[2+len(command):] # jump the '/' and the ' ' + func = None + if command in self.commands: # check tab-specific commands + func = self.commands[command][0] + elif command in self.core.commands: # check global commands + func = self.core.commands[command][0] + else: + low = command.lower() + if low in self.commands: + func = self.commands[low][0] + elif low in self.core.commands: + func = self.core.commands[low][0] + else: + self.core.information(_("Unknown command (%s)") % (command), _('Error')) + if command in ('correct', 'say'): # hack + arg = xhtml.convert_simple_to_full_colors(arg) + else: + arg = xhtml.clean_text_simple(arg) + if func: + func(arg) + return True + else: + return False + + def refresh_tab_win(self): + if self.left_tab_win: + self.left_tab_win.refresh() + else: + self.tab_win.refresh() + + def refresh(self): + """ + Called on each screen refresh (when something has changed) + """ + pass + + def get_name(self): + """ + get the name of the tab + """ + return self.__class__.__name__ + + def get_nick(self): + """ + Get the nick of the tab (defaults to its name) + """ + return self.get_name() + + def get_text_window(self): + """ + Returns the principal TextWin window, if there's one + """ + return None + + def on_input(self, key, raw): + """ + raw indicates if the key should activate the associated command or not. + """ + pass + + def update_commands(self): + for c in self.plugin_commands: + if not c in self.commands: + self.commands[c] = self.plugin_commands[c] + + def update_keys(self): + for k in self.plugin_keys: + if not k in self.key_func: + self.key_func[k] = self.plugin_keys[k] + + def on_lose_focus(self): + """ + called when this tab loses the focus. + """ + self.state = 'normal' + + def on_gain_focus(self): + """ + called when this tab gains the focus. + """ + self.state = 'current' + + def on_scroll_down(self): + """ + Defines what happens when we scroll down + """ + pass + + def on_scroll_up(self): + """ + Defines what happens when we scroll up + """ + pass + + def on_line_up(self): + """ + Defines what happens when we scroll one line up + """ + pass + + def on_line_down(self): + """ + Defines what happens when we scroll one line up + """ + pass + + def on_half_scroll_down(self): + """ + Defines what happens when we scroll half a screen down + """ + pass + + def on_half_scroll_up(self): + """ + Defines what happens when we scroll half a screen up + """ + pass + + def on_info_win_size_changed(self): + """ + Called when the window with the informations is resized + """ + pass + + def on_close(self): + """ + Called when the tab is to be closed + """ + if self.input: + self.input.on_delete() + + def matching_names(self): + """ + Returns a list of strings that are used to name a tab with the /win + command. For example you could switch to a tab that returns + ['hello', 'coucou'] using /win hel, or /win coucou + If not implemented in the tab, it just doesn’t match with anything. + """ + return [] + + def __del__(self): + log.debug('------ Closing tab %s', self.__class__.__name__) + +class GapTab(Tab): + + def __bool__(self): + return False + + def __len__(self): + return 0 + + def get_name(self): + return '' + + def refresh(self): + log.debug('WARNING: refresh() called on a gap tab, this should not happen') + +class ChatTab(Tab): + """ + A tab containing a chat of any type. + Just use this class instead of Tab if the tab needs a recent-words completion + Also, ^M is already bound to on_enter + And also, add the /say command + """ + plugin_commands = {} + plugin_keys = {} + def __init__(self, jid=''): + Tab.__init__(self) + self.name = jid + self.text_win = None + self._text_buffer = TextBuffer() + self.remote_wants_chatstates = None # change this to True or False when + # we know that the remote user wants chatstates, or not. + # None means we don’t know yet, and we send only "active" chatstates + self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive" + # We keep a weakref of the event that will set our chatstate to "paused", so that + # we can delete it or change it if we need to + self.timed_event_paused = None + # if that’s None, then no paused chatstate was sent recently + # if that’s a weakref returning None, then a paused chatstate was sent + # since the last input + self.remote_supports_attention = False + # Keeps the last sent message to complete it easily in completion_correct, and to replace it. + self.last_sent_message = None + self.key_func['M-v'] = self.move_separator + self.key_func['M-h'] = self.scroll_separator + self.key_func['M-/'] = self.last_words_completion + self.key_func['^M'] = self.on_enter + self.register_command('say', self.command_say, + usage=_('<message>'), + shortdesc=_('Send the message.')) + self.register_command('xhtml', self.command_xhtml, + usage=_('<custom xhtml>'), + shortdesc=_('Send custom XHTML.')) + self.register_command('clear', self.command_clear, + shortdesc=_('Clear the current buffer.')) + self.register_command('correct', self.command_correct, + desc=_('Fix the last message with whatever you want.'), + shortdesc=_('Correct the last message.'), + completion=self.completion_correct) + self.chat_state = None + self.update_commands() + self.update_keys() + + # Get the logs + log_nb = config.get('load_log', 10) + logs = self.load_logs(log_nb) + + if logs: + for message in logs: + self._text_buffer.add_message(**message) + + @property + def is_muc(self): + return False + + def load_logs(self, log_nb): + logs = logger.get_logs(safeJID(self.get_name()).bare, log_nb) + + def log_message(self, txt, nickname, time=None, typ=1): + """ + Log the messages in the archives. + """ + name = safeJID(self.name).bare + if not logger.log_message(name, nickname, txt, date=time, typ=typ): + self.core.information(_('Unable to write in the log file'), 'Error') + + def add_message(self, txt, time=None, nickname=None, forced_user=None, nick_color=None, identifier=None, jid=None, history=None, typ=1): + self.log_message(txt, nickname, time=time, typ=typ) + self._text_buffer.add_message(txt, time=time, + nickname=nickname, + nick_color=nick_color, + history=history, + user=forced_user, + identifier=identifier, + jid=jid) + + def modify_message(self, txt, old_id, new_id, user=None,jid=None, nickname=None): + self.log_message(txt, nickname, typ=1) + message = self._text_buffer.modify_message(txt, old_id, new_id, time=time, user=user, jid=jid) + if message: + self.text_win.modify_message(old_id, message) + self.core.refresh_window() + return True + return False + + def last_words_completion(self): + """ + Complete the input with words recently said + """ + # build the list of the recent words + char_we_dont_want = string.punctuation+' ’„“”…«»' + words = list() + for msg in self._text_buffer.messages[:-40:-1]: + if not msg: + continue + txt = xhtml.clean_text(msg.txt) + for char in char_we_dont_want: + txt = txt.replace(char, ' ') + for word in txt.split(): + if len(word) >= 4 and word not in words: + words.append(word) + words.extend([word for word in config.get('words', '').split(':') if word]) + self.input.auto_completion(words, ' ', quotify=False) + + def on_enter(self): + txt = self.input.key_enter() + if txt: + if not self.execute_command(txt): + if txt.startswith('//'): + txt = txt[1:] + self.command_say(xhtml.convert_simple_to_full_colors(txt)) + self.cancel_paused_delay() + + def command_xhtml(self, arg): + """" + /xhtml <custom xhtml> + """ + message = self.generate_xhtml_message(arg) + if message: + message.send() + + def generate_xhtml_message(self, arg): + if not arg: + return + try: + body = xhtml.clean_text(xhtml.xhtml_to_poezio_colors(arg)) + # The <body /> element is the only allowable child of the <xhtm-im> + arg = "<body xmlns='http://www.w3.org/1999/xhtml'>%s</body>" % (arg,) + ET.fromstring(arg) + except: + self.core.information('Could not send custom xhtml', 'Error') + log.error('/xhtml: Unable to send custom xhtml', exc_info=True) + return + + msg = self.core.xmpp.make_message(self.get_dest_jid()) + msg['body'] = body + msg.enable('html') + msg['html']['body'] = arg + return msg + + def get_dest_jid(self): + return self.get_name() + + @refresh_wrapper.always + def command_clear(self, args): + """ + /clear + """ + self._text_buffer.messages = [] + self.text_win.rebuild_everything(self._text_buffer) + + def send_chat_state(self, state, always_send=False): + """ + Send an empty chatstate message + """ + if not self.is_muc or self.joined: + if state in ('active', 'inactive', 'gone') and self.inactive and not always_send: + return + if config.get_by_tabname('send_chat_states', 'true', self.general_jid, True) and \ + self.remote_wants_chatstates is not False: + msg = self.core.xmpp.make_message(self.get_dest_jid()) + msg['type'] = self.message_type + msg['chat_state'] = state + self.chat_state = state + msg.send() + + def send_composing_chat_state(self, empty_after): + """ + Send the "active" or "composing" chatstate, depending + on the the current status of the input + """ + name = self.general_jid + if config.get_by_tabname('send_chat_states', 'true', name, True) == 'true' and self.remote_wants_chatstates: + needed = 'inactive' if self.inactive else 'active' + self.cancel_paused_delay() + if not empty_after: + if self.chat_state != "composing": + self.send_chat_state("composing") + self.set_paused_delay(True) + elif empty_after and self.chat_state != needed: + self.send_chat_state(needed, True) + + def set_paused_delay(self, composing): + """ + we create a timed event that will put us to paused + in a few seconds + """ + if config.get_by_tabname('send_chat_states', 'true', self.general_jid, True) != 'true': + return + if self.timed_event_paused: + # check the weakref + event = self.timed_event_paused() + if event: + # the event already exists: we just update + # its date + event.change_date(datetime.now() + timedelta(seconds=4)) + return + new_event = timed_events.DelayedEvent(4, self.send_chat_state, 'paused') + self.core.add_timed_event(new_event) + self.timed_event_paused = weakref.ref(new_event) + + def cancel_paused_delay(self): + """ + Remove that event from the list and set it to None. + Called for example when the input is emptied, or when the message + is sent + """ + if self.timed_event_paused: + event = self.timed_event_paused() + if event: + self.core.remove_timed_event(event) + del event + self.timed_event_paused = None + + def command_correct(self, line): + """ + /correct <fixed message> + """ + if not line: + self.core.command_help('correct') + return + if not self.last_sent_message: + self.core.information(_('There is no message to correct.')) + return + self.command_say(line, correct=True) + + def completion_correct(self, the_input): + if self.last_sent_message and the_input.get_argument_position() == 1: + return the_input.auto_completion([self.last_sent_message['body']], '', quotify=False) + + @property + def inactive(self): + """Whether we should send inactive or active as a chatstate""" + return self.core.status.show in ('xa', 'away') or\ + (hasattr(self, 'directed_presence') and not self.directed_presence) + + def move_separator(self): + self.text_win.remove_line_separator() + self.text_win.add_line_separator(self._text_buffer) + self.text_win.refresh() + self.input.refresh() + + def get_conversation_messages(self): + return self._text_buffer.messages + + def check_scrolled(self): + if self.text_win.pos != 0: + self.state = 'scrolled' + + def command_say(self, line, correct=False): + pass + + def on_line_up(self): + return self.text_win.scroll_up(1) + + def on_line_down(self): + return self.text_win.scroll_down(1) + + def on_scroll_up(self): + return self.text_win.scroll_up(self.text_win.height-1) + + def on_scroll_down(self): + return self.text_win.scroll_down(self.text_win.height-1) + + def on_half_scroll_up(self): + return self.text_win.scroll_up((self.text_win.height-1) // 2) + + def on_half_scroll_down(self): + return self.text_win.scroll_down((self.text_win.height-1) // 2) + + @refresh_wrapper.always + def scroll_separator(self): + self.text_win.scroll_to_separator() + + |