diff options
Diffstat (limited to 'src/tabs/xmltab.py')
-rw-r--r-- | src/tabs/xmltab.py | 360 |
1 files changed, 0 insertions, 360 deletions
diff --git a/src/tabs/xmltab.py b/src/tabs/xmltab.py deleted file mode 100644 index b063ad35..00000000 --- a/src/tabs/xmltab.py +++ /dev/null @@ -1,360 +0,0 @@ -""" -The XMLTab is here for debugging purposes, it shows the incoming and -outgoing stanzas. It has a few useful functions that can filter stanzas -in order to only show the relevant ones, and it can also be frozen or -unfrozen on demand so that the relevant information is not drowned by -the traffic. -""" -import logging -log = logging.getLogger(__name__) - -import curses -import os -from slixmpp.xmlstream import matcher -from slixmpp.xmlstream.tostring import tostring -from slixmpp.xmlstream.stanzabase import ElementBase -from xml.etree import ElementTree as ET - -from . import Tab - -import text_buffer -import windows -from xhtml import clean_text -from decorators import command_args_parser, refresh_wrapper -from common import safeJID - - -class MatchJID(object): - - def __init__(self, jid, dest=''): - self.jid = jid - self.dest = dest - - def match(self, xml): - from_ = safeJID(xml['from']) - to_ = safeJID(xml['to']) - if self.jid.full == self.jid.bare: - from_ = from_.bare - to_ = to_.bare - - if self.dest == 'from': - return from_ == self.jid - elif self.dest == 'to': - return to_ == self.jid - return self.jid in (from_, to_) - - def __repr__(self): - return '%s%s%s' % (self.dest, ': ' if self.dest else '', self.jid) - -MATCHERS_MAPPINGS = { - MatchJID: ('JID', lambda obj: repr(obj)), - matcher.MatcherId: ('ID', lambda obj: obj._criteria), - matcher.MatchXMLMask: ('XMLMask', lambda obj: tostring(obj._criteria)), - matcher.MatchXPath: ('XPath', lambda obj: obj._criteria) -} - -class XMLTab(Tab): - def __init__(self): - Tab.__init__(self) - self.state = 'normal' - self.name = 'XMLTab' - self.filters = [] - - self.core_buffer = self.core.xml_buffer - self.filtered_buffer = text_buffer.TextBuffer() - - self.info_header = windows.XMLInfoWin() - self.text_win = windows.XMLTextWin() - self.core_buffer.add_window(self.text_win) - self.default_help_message = windows.HelpText("/ to enter a command") - - self.register_command('close', self.close, - shortdesc="Close this tab.") - self.register_command('clear', self.command_clear, - shortdesc='Clear the current buffer.') - self.register_command('reset', self.command_reset, - shortdesc='Reset the stanza filter.') - self.register_command('filter_id', self.command_filter_id, - usage='<id>', - desc='Show only the stanzas with the id <id>.', - shortdesc='Filter by id.') - self.register_command('filter_xpath', self.command_filter_xpath, - usage='<xpath>', - desc='Show only the stanzas matching the xpath <xpath>.' - ' Any occurrences of %n will be replaced by jabber:client.', - shortdesc='Filter by XPath.') - self.register_command('filter_jid', self.command_filter_jid, - usage='<jid>', - desc='Show only the stanzas matching the jid <jid> in from= or to=.', - shortdesc='Filter by JID.') - self.register_command('filter_from', self.command_filter_from, - usage='<jid>', - desc='Show only the stanzas matching the jid <jid> in from=.', - shortdesc='Filter by JID from.') - self.register_command('filter_to', self.command_filter_to, - usage='<jid>', - desc='Show only the stanzas matching the jid <jid> in to=.', - shortdesc='Filter by JID to.') - self.register_command('filter_xmlmask', self.command_filter_xmlmask, - usage='<xml mask>', - desc='Show only the stanzas matching the given xml mask.', - shortdesc='Filter by xml mask.') - self.register_command('dump', self.command_dump, - usage='<filename>', - desc='Writes the content of the XML buffer into a file.', - shortdesc='Write in a file.') - self.input = self.default_help_message - self.key_func['^T'] = self.close - self.key_func['^I'] = self.completion - self.key_func["KEY_DOWN"] = self.on_scroll_down - self.key_func["KEY_UP"] = self.on_scroll_up - self.key_func["^K"] = self.on_freeze - self.key_func["/"] = self.on_slash - self.resize() - # Used to display the infobar - self.filter_type = '' - self.filter = '' - - def gen_filter_repr(self): - if not self.filters: - self.filter_type = '' - self.filter = '' - return - filter_types = map(lambda x: MATCHERS_MAPPINGS[type(x)][0], self.filters) - filter_strings = map(lambda x: MATCHERS_MAPPINGS[type(x)][1](x), self.filters) - self.filter_type = ','.join(filter_types) - self.filter = ','.join(filter_strings) - - def update_filters(self, matcher): - if not self.filters: - messages = self.core_buffer.messages[:] - self.filtered_buffer.messages = [] - self.core_buffer.del_window(self.text_win) - self.filtered_buffer.add_window(self.text_win) - else: - messages = self.filtered_buffer.messages - self.filtered_buffer.messages = [] - self.filters.append(matcher) - new_messages = [] - for msg in messages: - try: - if msg.txt.strip() and self.match_stanza(ElementBase(ET.fromstring(clean_text(msg.txt)))): - new_messages.append(msg) - except ET.ParseError: - log.debug('Malformed XML : %s', msg.txt, exc_info=True) - self.filtered_buffer.messages = new_messages - self.text_win.rebuild_everything(self.filtered_buffer) - self.gen_filter_repr() - - def on_freeze(self): - """ - Freeze the display. - """ - self.text_win.toggle_lock() - self.refresh() - - def match_stanza(self, stanza): - for matcher in self.filters: - if not matcher.match(stanza): - return False - return True - - @command_args_parser.raw - def command_filter_xmlmask(self, mask): - """/filter_xmlmask <xml mask>""" - try: - self.update_filters(matcher.MatchXMLMask(mask)) - self.refresh() - except Exception as e: - self.core.information('Invalid XML Mask: %s' % e, 'Error') - self.command_reset('') - - @command_args_parser.raw - def command_filter_to(self, jid): - """/filter_jid_to <jid>""" - jid_obj = safeJID(jid) - if not jid_obj: - return self.core.information('Invalid JID: %s' % jid, 'Error') - - self.update_filters(MatchJID(jid_obj, dest='to')) - self.refresh() - - @command_args_parser.raw - def command_filter_from(self, jid): - """/filter_jid_from <jid>""" - jid_obj = safeJID(jid) - if not jid_obj: - return self.core.information('Invalid JID: %s' % jid, 'Error') - - self.update_filters(MatchJID(jid_obj, dest='from')) - self.refresh() - - @command_args_parser.raw - def command_filter_jid(self, jid): - """/filter_jid <jid>""" - jid_obj = safeJID(jid) - if not jid_obj: - return self.core.information('Invalid JID: %s' % jid, 'Error') - - self.update_filters(MatchJID(jid_obj)) - self.refresh() - - @command_args_parser.quoted(1) - def command_filter_id(self, args): - """/filter_id <id>""" - if args is None: - return self.core.command_help('filter_id') - - self.update_filters(matcher.MatcherId(args[0])) - self.refresh() - - @command_args_parser.raw - def command_filter_xpath(self, xpath): - """/filter_xpath <xpath>""" - try: - self.update_filters(matcher.MatchXPath(xpath.replace('%n', self.core.xmpp.default_ns))) - self.refresh() - except: - self.core.information('Invalid XML Path', 'Error') - self.command_reset('') - - @command_args_parser.ignored - def command_reset(self): - """/reset""" - if self.filters: - self.filters = [] - self.filtered_buffer.del_window(self.text_win) - self.core_buffer.add_window(self.text_win) - self.text_win.rebuild_everything(self.core_buffer) - self.filter_type = '' - self.filter = '' - self.refresh() - - @command_args_parser.quoted(1) - def command_dump(self, args): - """/dump <filename>""" - if args is None: - return self.core.command_help('dump') - if self.filters: - xml = self.filtered_buffer.messages[:] - else: - xml = self.core_buffer.messages[:] - text = '\n'.join(('%s %s %s' % (msg.str_time, msg.nickname, clean_text(msg.txt)) for msg in xml)) - filename = os.path.expandvars(os.path.expanduser(args[0])) - try: - with open(filename, 'w') as fd: - fd.write(text) - except Exception as e: - self.core.information('Could not write the XML dump: %s' % e, 'Error') - - def on_slash(self): - """ - '/' is pressed, activate the input - """ - curses.curs_set(1) - self.input = windows.CommandInput("", self.reset_help_message, self.execute_slash_command) - self.input.resize(1, self.width, self.height-1, 0) - self.input.do_command("/") # we add the slash - - @refresh_wrapper.always - def reset_help_message(self, _=None): - if self.closed: - return True - if self.core.current_tab() is self: - curses.curs_set(0) - self.input = self.default_help_message - return True - - def on_scroll_up(self): - return self.text_win.scroll_up(self.text_win.height-1) - - def on_scroll_down(self): - return self.text_win.scroll_down(self.text_win.height-1) - - @command_args_parser.ignored - def command_clear(self): - """ - /clear - """ - if self.filters: - buffer = self.core_buffer - else: - buffer = self.filtered_buffer - buffer.messages = [] - self.text_win.rebuild_everything(buffer) - self.refresh() - self.core.doupdate() - - def execute_slash_command(self, txt): - if txt.startswith('/'): - self.input.key_enter() - self.execute_command(txt) - return self.reset_help_message() - - def completion(self): - if isinstance(self.input, windows.Input): - self.complete_commands(self.input) - - def on_input(self, key, raw): - res = self.input.do_command(key, raw=raw) - if res: - return True - if not raw and key in self.key_func: - return self.key_func[key]() - - def close(self, arg=None): - self.core.close_tab() - - def resize(self): - self.need_resize = False - if self.size.tab_degrade_y: - info_win_size = 0 - tab_win_height = 0 - else: - info_win_size = self.core.information_win_size - tab_win_height = Tab.tab_win_height() - - self.text_win.resize(self.height - info_win_size - tab_win_height - 2, - self.width, 0, 0) - self.text_win.rebuild_everything(self.core.xml_buffer) - self.info_header.resize(1, self.width, - self.height - 2 - info_win_size - - tab_win_height, - 0) - self.input.resize(1, self.width, self.height-1, 0) - - def refresh(self): - if self.need_resize: - self.resize() - log.debug(' TAB Refresh: %s', self.__class__.__name__) - - if self.size.tab_degrade_y: - display_info_win = False - else: - display_info_win = True - - self.text_win.refresh() - self.info_header.refresh(self.filter_type, self.filter, self.text_win) - self.refresh_tab_win() - if display_info_win: - self.info_win.refresh() - self.input.refresh() - - def on_lose_focus(self): - self.state = 'normal' - - def on_gain_focus(self): - self.state = 'current' - curses.curs_set(0) - - def on_close(self): - self.command_clear('') - self.core.xml_tab = False - - def on_info_win_size_changed(self): - if self.core.information_win_size >= self.height-3: - return - self.text_win.resize(self.height-2-self.core.information_win_size - Tab.tab_win_height(), self.width, 0, 0) - self.info_header.resize(1, self.width, self.height-2-self.core.information_win_size - Tab.tab_win_height(), 0) - - |