summaryrefslogtreecommitdiff
path: root/src/tabs/basetabs.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/tabs/basetabs.py')
-rw-r--r--src/tabs/basetabs.py666
1 files changed, 666 insertions, 0 deletions
diff --git a/src/tabs/basetabs.py b/src/tabs/basetabs.py
new file mode 100644
index 00000000..86ba9e1f
--- /dev/null
+++ b/src/tabs/basetabs.py
@@ -0,0 +1,666 @@
+"""
+A Tab object is a way to organize various Windows (see windows.py)
+around the screen at once.
+A tab is then composed of multiple Buffers.
+Each Tab object has different refresh() and resize() methods, defining how its
+Windows are displayed, resized, etc.
+"""
+
+from gettext import gettext as _
+
+import logging
+log = logging.getLogger(__name__)
+
+import singleton
+import string
+import time
+import weakref
+from datetime import datetime, timedelta
+from xml.etree import cElementTree as ET
+
+import core
+import timed_events
+import windows
+import xhtml
+from common import safeJID
+from config import config
+from decorators import refresh_wrapper
+from logger import logger
+from text_buffer import TextBuffer, CorrectionError
+from theming import get_theme
+
+
+MIN_WIDTH = 42
+MIN_HEIGHT = 6
+
+STATE_COLORS = {
+ 'disconnected': lambda: get_theme().COLOR_TAB_DISCONNECTED,
+ 'scrolled': lambda: get_theme().COLOR_TAB_SCROLLED,
+ 'joined': lambda: get_theme().COLOR_TAB_JOINED,
+ 'message': lambda: get_theme().COLOR_TAB_NEW_MESSAGE,
+ 'highlight': lambda: get_theme().COLOR_TAB_HIGHLIGHT,
+ 'private': lambda: get_theme().COLOR_TAB_PRIVATE,
+ 'normal': lambda: get_theme().COLOR_TAB_NORMAL,
+ 'current': lambda: get_theme().COLOR_TAB_CURRENT,
+ 'attention': lambda: get_theme().COLOR_TAB_ATTENTION,
+ }
+
+VERTICAL_STATE_COLORS = {
+ 'disconnected': lambda: get_theme().COLOR_VERTICAL_TAB_DISCONNECTED,
+ 'scrolled': lambda: get_theme().COLOR_VERTICAL_TAB_SCROLLED,
+ 'joined': lambda: get_theme().COLOR_VERTICAL_TAB_JOINED,
+ 'message': lambda: get_theme().COLOR_VERTICAL_TAB_NEW_MESSAGE,
+ 'highlight': lambda: get_theme().COLOR_VERTICAL_TAB_HIGHLIGHT,
+ 'private': lambda: get_theme().COLOR_VERTICAL_TAB_PRIVATE,
+ 'normal': lambda: get_theme().COLOR_VERTICAL_TAB_NORMAL,
+ 'current': lambda: get_theme().COLOR_VERTICAL_TAB_CURRENT,
+ 'attention': lambda: get_theme().COLOR_VERTICAL_TAB_ATTENTION,
+ }
+
+
+STATE_PRIORITY = {
+ 'normal': -1,
+ 'current': -1,
+ 'disconnected': 0,
+ 'scrolled': 0.5,
+ 'message': 1,
+ 'joined': 1,
+ 'highlight': 2,
+ 'private': 2,
+ 'attention': 3
+ }
+
+class Tab(object):
+ tab_core = None
+
+ plugin_commands = {}
+ plugin_keys = {}
+ def __init__(self):
+ self.input = None
+ self._state = 'normal'
+
+ self.need_resize = False
+ self.need_resize = False
+ self.key_func = {} # each tab should add their keys in there
+ # and use them in on_input
+ self.commands = {} # and their own commands
+
+
+ @property
+ def core(self):
+ if not Tab.tab_core:
+ Tab.tab_core = singleton.Singleton(core.Core)
+ return Tab.tab_core
+
+ @property
+ def nb(self):
+ for index, tab in enumerate(self.core.tabs):
+ if tab == self:
+ return index
+ return len(self.core.tabs)
+
+ @property
+ def tab_win(self):
+ if not Tab.tab_core:
+ Tab.tab_core = singleton.Singleton(core.Core)
+ return Tab.tab_core.tab_win
+
+ @property
+ def left_tab_win(self):
+ if not Tab.tab_core:
+ Tab.tab_core = singleton.Singleton(core.Core)
+ return Tab.tab_core.left_tab_win
+
+ @staticmethod
+ def tab_win_height():
+ """
+ Returns 1 or 0, depending on if we are using the vertical tab list
+ or not.
+ """
+ if config.get('enable_vertical_tab_list', 'false') == 'true':
+ return 0
+ return 1
+
+ @property
+ def info_win(self):
+ return self.core.information_win
+
+ @property
+ def color(self):
+ return STATE_COLORS[self._state]()
+
+ @property
+ def vertical_color(self):
+ return VERTICAL_STATE_COLORS[self._state]()
+
+ @property
+ def state(self):
+ return self._state
+
+ @state.setter
+ def state(self, value):
+ if not value in STATE_COLORS:
+ log.debug("Invalid value for tab state: %s", value)
+ elif STATE_PRIORITY[value] < STATE_PRIORITY[self._state] and \
+ value not in ('current', 'disconnected') and \
+ not (self._state == 'scrolled' and value == 'disconnected'):
+ log.debug("Did not set state because of lower priority, asked: %s, kept: %s", value, self._state)
+ elif self._state == 'disconnected' and value not in ('joined', 'current'):
+ log.debug('Did not set state because disconnected tabs remain visible')
+ else:
+ self._state = value
+
+ @staticmethod
+ def resize(scr):
+ Tab.size = (Tab.height, Tab.width) = scr.getmaxyx()
+ if Tab.height < MIN_HEIGHT or Tab.width < MIN_WIDTH:
+ Tab.visible = False
+ else:
+ Tab.visible = True
+ windows.Win._tab_win = scr
+
+ def register_command(self, name, func, *, desc='', shortdesc='', completion=None, usage=''):
+ """
+ Add a command
+ """
+ if name in self.commands:
+ return
+ if not desc and shortdesc:
+ desc = shortdesc
+ self.commands[name] = core.Command(func, desc, completion, shortdesc, usage)
+
+ def complete_commands(self, the_input):
+ """
+ Does command completion on the specified input for both global and tab-specific
+ commands.
+ This should be called from the completion method (on tab, for example), passing
+ the input where completion is to be made.
+ It can completion the command name itself or an argument of the command.
+ Returns True if a completion was made, False else.
+ """
+ txt = the_input.get_text()
+ # check if this is a command
+ if txt.startswith('/') and not txt.startswith('//'):
+ position = the_input.get_argument_position(quoted=False)
+ if position == 0:
+ words = ['/%s'% (name) for name in sorted(self.core.commands)] +\
+ ['/%s' % (name) for name in sorted(self.commands)]
+ the_input.new_completion(words, 0)
+ # Do not try to cycle command completion if there was only
+ # one possibily. The next tab will complete the argument.
+ # Otherwise we would need to add a useless space before being
+ # able to complete the arguments.
+ hit_copy = set(the_input.hit_list)
+ while not hit_copy:
+ whitespace = the_input.text.find(' ')
+ if whitespace == -1:
+ whitespace = len(the_input.text)
+ the_input.text = the_input.text[:whitespace-1] + the_input.text[whitespace:]
+ the_input.new_completion(words, 0)
+ hit_copy = set(the_input.hit_list)
+ if len(hit_copy) == 1:
+ the_input.do_command(' ')
+ the_input.reset_completion()
+ return True
+ # check if we are in the middle of the command name
+ elif len(txt.split()) > 1 or\
+ (txt.endswith(' ') and not the_input.last_completion):
+ command_name = txt.split()[0][1:]
+ if command_name in self.commands:
+ command = self.commands[command_name]
+ elif command_name in self.core.commands:
+ command = self.core.commands[command_name]
+ else: # Unknown command, cannot complete
+ return False
+ if command[2] is None:
+ return False # There's no completion function
+ else:
+ return command[2](the_input)
+ return True
+ return False
+
+ def execute_command(self, provided_text):
+ """
+ Execute the command in the input and return False if
+ the input didn't contain a command
+ """
+ txt = provided_text or self.input.key_enter()
+ if txt.startswith('/') and not txt.startswith('//') and\
+ not txt.startswith('/me '):
+ command = txt.strip().split()[0][1:]
+ arg = txt[2+len(command):] # jump the '/' and the ' '
+ func = None
+ if command in self.commands: # check tab-specific commands
+ func = self.commands[command][0]
+ elif command in self.core.commands: # check global commands
+ func = self.core.commands[command][0]
+ else:
+ low = command.lower()
+ if low in self.commands:
+ func = self.commands[low][0]
+ elif low in self.core.commands:
+ func = self.core.commands[low][0]
+ else:
+ self.core.information(_("Unknown command (%s)") % (command), _('Error'))
+ if command in ('correct', 'say'): # hack
+ arg = xhtml.convert_simple_to_full_colors(arg)
+ else:
+ arg = xhtml.clean_text_simple(arg)
+ if func:
+ func(arg)
+ return True
+ else:
+ return False
+
+ def refresh_tab_win(self):
+ if self.left_tab_win:
+ self.left_tab_win.refresh()
+ else:
+ self.tab_win.refresh()
+
+ def refresh(self):
+ """
+ Called on each screen refresh (when something has changed)
+ """
+ pass
+
+ def get_name(self):
+ """
+ get the name of the tab
+ """
+ return self.__class__.__name__
+
+ def get_nick(self):
+ """
+ Get the nick of the tab (defaults to its name)
+ """
+ return self.get_name()
+
+ def get_text_window(self):
+ """
+ Returns the principal TextWin window, if there's one
+ """
+ return None
+
+ def on_input(self, key, raw):
+ """
+ raw indicates if the key should activate the associated command or not.
+ """
+ pass
+
+ def update_commands(self):
+ for c in self.plugin_commands:
+ if not c in self.commands:
+ self.commands[c] = self.plugin_commands[c]
+
+ def update_keys(self):
+ for k in self.plugin_keys:
+ if not k in self.key_func:
+ self.key_func[k] = self.plugin_keys[k]
+
+ def on_lose_focus(self):
+ """
+ called when this tab loses the focus.
+ """
+ self.state = 'normal'
+
+ def on_gain_focus(self):
+ """
+ called when this tab gains the focus.
+ """
+ self.state = 'current'
+
+ def on_scroll_down(self):
+ """
+ Defines what happens when we scroll down
+ """
+ pass
+
+ def on_scroll_up(self):
+ """
+ Defines what happens when we scroll up
+ """
+ pass
+
+ def on_line_up(self):
+ """
+ Defines what happens when we scroll one line up
+ """
+ pass
+
+ def on_line_down(self):
+ """
+ Defines what happens when we scroll one line up
+ """
+ pass
+
+ def on_half_scroll_down(self):
+ """
+ Defines what happens when we scroll half a screen down
+ """
+ pass
+
+ def on_half_scroll_up(self):
+ """
+ Defines what happens when we scroll half a screen up
+ """
+ pass
+
+ def on_info_win_size_changed(self):
+ """
+ Called when the window with the informations is resized
+ """
+ pass
+
+ def on_close(self):
+ """
+ Called when the tab is to be closed
+ """
+ if self.input:
+ self.input.on_delete()
+
+ def matching_names(self):
+ """
+ Returns a list of strings that are used to name a tab with the /win
+ command. For example you could switch to a tab that returns
+ ['hello', 'coucou'] using /win hel, or /win coucou
+ If not implemented in the tab, it just doesn’t match with anything.
+ """
+ return []
+
+ def __del__(self):
+ log.debug('------ Closing tab %s', self.__class__.__name__)
+
+class GapTab(Tab):
+
+ def __bool__(self):
+ return False
+
+ def __len__(self):
+ return 0
+
+ def get_name(self):
+ return ''
+
+ def refresh(self):
+ log.debug('WARNING: refresh() called on a gap tab, this should not happen')
+
+class ChatTab(Tab):
+ """
+ A tab containing a chat of any type.
+ Just use this class instead of Tab if the tab needs a recent-words completion
+ Also, ^M is already bound to on_enter
+ And also, add the /say command
+ """
+ plugin_commands = {}
+ plugin_keys = {}
+ def __init__(self, jid=''):
+ Tab.__init__(self)
+ self.name = jid
+ self.text_win = None
+ self._text_buffer = TextBuffer()
+ self.remote_wants_chatstates = None # change this to True or False when
+ # we know that the remote user wants chatstates, or not.
+ # None means we don’t know yet, and we send only "active" chatstates
+ self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive"
+ # We keep a weakref of the event that will set our chatstate to "paused", so that
+ # we can delete it or change it if we need to
+ self.timed_event_paused = None
+ # if that’s None, then no paused chatstate was sent recently
+ # if that’s a weakref returning None, then a paused chatstate was sent
+ # since the last input
+ self.remote_supports_attention = False
+ # Keeps the last sent message to complete it easily in completion_correct, and to replace it.
+ self.last_sent_message = None
+ self.key_func['M-v'] = self.move_separator
+ self.key_func['M-h'] = self.scroll_separator
+ self.key_func['M-/'] = self.last_words_completion
+ self.key_func['^M'] = self.on_enter
+ self.register_command('say', self.command_say,
+ usage=_('<message>'),
+ shortdesc=_('Send the message.'))
+ self.register_command('xhtml', self.command_xhtml,
+ usage=_('<custom xhtml>'),
+ shortdesc=_('Send custom XHTML.'))
+ self.register_command('clear', self.command_clear,
+ shortdesc=_('Clear the current buffer.'))
+ self.register_command('correct', self.command_correct,
+ desc=_('Fix the last message with whatever you want.'),
+ shortdesc=_('Correct the last message.'),
+ completion=self.completion_correct)
+ self.chat_state = None
+ self.update_commands()
+ self.update_keys()
+
+ # Get the logs
+ log_nb = config.get('load_log', 10)
+ logs = self.load_logs(log_nb)
+
+ if logs:
+ for message in logs:
+ self._text_buffer.add_message(**message)
+
+ @property
+ def is_muc(self):
+ return False
+
+ def load_logs(self, log_nb):
+ logs = logger.get_logs(safeJID(self.get_name()).bare, log_nb)
+
+ def log_message(self, txt, nickname, time=None, typ=1):
+ """
+ Log the messages in the archives.
+ """
+ name = safeJID(self.name).bare
+ if not logger.log_message(name, nickname, txt, date=time, typ=typ):
+ self.core.information(_('Unable to write in the log file'), 'Error')
+
+ def add_message(self, txt, time=None, nickname=None, forced_user=None, nick_color=None, identifier=None, jid=None, history=None, typ=1):
+ self.log_message(txt, nickname, time=time, typ=typ)
+ self._text_buffer.add_message(txt, time=time,
+ nickname=nickname,
+ nick_color=nick_color,
+ history=history,
+ user=forced_user,
+ identifier=identifier,
+ jid=jid)
+
+ def modify_message(self, txt, old_id, new_id, user=None,jid=None, nickname=None):
+ self.log_message(txt, nickname, typ=1)
+ message = self._text_buffer.modify_message(txt, old_id, new_id, time=time, user=user, jid=jid)
+ if message:
+ self.text_win.modify_message(old_id, message)
+ self.core.refresh_window()
+ return True
+ return False
+
+ def last_words_completion(self):
+ """
+ Complete the input with words recently said
+ """
+ # build the list of the recent words
+ char_we_dont_want = string.punctuation+' ’„“”…«»'
+ words = list()
+ for msg in self._text_buffer.messages[:-40:-1]:
+ if not msg:
+ continue
+ txt = xhtml.clean_text(msg.txt)
+ for char in char_we_dont_want:
+ txt = txt.replace(char, ' ')
+ for word in txt.split():
+ if len(word) >= 4 and word not in words:
+ words.append(word)
+ words.extend([word for word in config.get('words', '').split(':') if word])
+ self.input.auto_completion(words, ' ', quotify=False)
+
+ def on_enter(self):
+ txt = self.input.key_enter()
+ if txt:
+ if not self.execute_command(txt):
+ if txt.startswith('//'):
+ txt = txt[1:]
+ self.command_say(xhtml.convert_simple_to_full_colors(txt))
+ self.cancel_paused_delay()
+
+ def command_xhtml(self, arg):
+ """"
+ /xhtml <custom xhtml>
+ """
+ message = self.generate_xhtml_message(arg)
+ if message:
+ message.send()
+
+ def generate_xhtml_message(self, arg):
+ if not arg:
+ return
+ try:
+ body = xhtml.clean_text(xhtml.xhtml_to_poezio_colors(arg))
+ # The <body /> element is the only allowable child of the <xhtm-im>
+ arg = "<body xmlns='http://www.w3.org/1999/xhtml'>%s</body>" % (arg,)
+ ET.fromstring(arg)
+ except:
+ self.core.information('Could not send custom xhtml', 'Error')
+ log.error('/xhtml: Unable to send custom xhtml', exc_info=True)
+ return
+
+ msg = self.core.xmpp.make_message(self.get_dest_jid())
+ msg['body'] = body
+ msg.enable('html')
+ msg['html']['body'] = arg
+ return msg
+
+ def get_dest_jid(self):
+ return self.get_name()
+
+ @refresh_wrapper.always
+ def command_clear(self, args):
+ """
+ /clear
+ """
+ self._text_buffer.messages = []
+ self.text_win.rebuild_everything(self._text_buffer)
+
+ def send_chat_state(self, state, always_send=False):
+ """
+ Send an empty chatstate message
+ """
+ if not self.is_muc or self.joined:
+ if state in ('active', 'inactive', 'gone') and self.inactive and not always_send:
+ return
+ if config.get_by_tabname('send_chat_states', 'true', self.general_jid, True) and \
+ self.remote_wants_chatstates is not False:
+ msg = self.core.xmpp.make_message(self.get_dest_jid())
+ msg['type'] = self.message_type
+ msg['chat_state'] = state
+ self.chat_state = state
+ msg.send()
+
+ def send_composing_chat_state(self, empty_after):
+ """
+ Send the "active" or "composing" chatstate, depending
+ on the the current status of the input
+ """
+ name = self.general_jid
+ if config.get_by_tabname('send_chat_states', 'true', name, True) == 'true' and self.remote_wants_chatstates:
+ needed = 'inactive' if self.inactive else 'active'
+ self.cancel_paused_delay()
+ if not empty_after:
+ if self.chat_state != "composing":
+ self.send_chat_state("composing")
+ self.set_paused_delay(True)
+ elif empty_after and self.chat_state != needed:
+ self.send_chat_state(needed, True)
+
+ def set_paused_delay(self, composing):
+ """
+ we create a timed event that will put us to paused
+ in a few seconds
+ """
+ if config.get_by_tabname('send_chat_states', 'true', self.general_jid, True) != 'true':
+ return
+ if self.timed_event_paused:
+ # check the weakref
+ event = self.timed_event_paused()
+ if event:
+ # the event already exists: we just update
+ # its date
+ event.change_date(datetime.now() + timedelta(seconds=4))
+ return
+ new_event = timed_events.DelayedEvent(4, self.send_chat_state, 'paused')
+ self.core.add_timed_event(new_event)
+ self.timed_event_paused = weakref.ref(new_event)
+
+ def cancel_paused_delay(self):
+ """
+ Remove that event from the list and set it to None.
+ Called for example when the input is emptied, or when the message
+ is sent
+ """
+ if self.timed_event_paused:
+ event = self.timed_event_paused()
+ if event:
+ self.core.remove_timed_event(event)
+ del event
+ self.timed_event_paused = None
+
+ def command_correct(self, line):
+ """
+ /correct <fixed message>
+ """
+ if not line:
+ self.core.command_help('correct')
+ return
+ if not self.last_sent_message:
+ self.core.information(_('There is no message to correct.'))
+ return
+ self.command_say(line, correct=True)
+
+ def completion_correct(self, the_input):
+ if self.last_sent_message and the_input.get_argument_position() == 1:
+ return the_input.auto_completion([self.last_sent_message['body']], '', quotify=False)
+
+ @property
+ def inactive(self):
+ """Whether we should send inactive or active as a chatstate"""
+ return self.core.status.show in ('xa', 'away') or\
+ (hasattr(self, 'directed_presence') and not self.directed_presence)
+
+ def move_separator(self):
+ self.text_win.remove_line_separator()
+ self.text_win.add_line_separator(self._text_buffer)
+ self.text_win.refresh()
+ self.input.refresh()
+
+ def get_conversation_messages(self):
+ return self._text_buffer.messages
+
+ def check_scrolled(self):
+ if self.text_win.pos != 0:
+ self.state = 'scrolled'
+
+ def command_say(self, line, correct=False):
+ pass
+
+ def on_line_up(self):
+ return self.text_win.scroll_up(1)
+
+ def on_line_down(self):
+ return self.text_win.scroll_down(1)
+
+ def on_scroll_up(self):
+ return self.text_win.scroll_up(self.text_win.height-1)
+
+ def on_scroll_down(self):
+ return self.text_win.scroll_down(self.text_win.height-1)
+
+ def on_half_scroll_up(self):
+ return self.text_win.scroll_up((self.text_win.height-1) // 2)
+
+ def on_half_scroll_down(self):
+ return self.text_win.scroll_down((self.text_win.height-1) // 2)
+
+ @refresh_wrapper.always
+ def scroll_separator(self):
+ self.text_win.scroll_to_separator()
+
+