""" Plugin manager module. Define the PluginManager class, the one that glues all the plugins and the API together. Defines also a bunch of variables related to the plugin env. """ import os from os import path import logging from poezio import tabs from poezio.core.structs import Command, Completion from poezio.plugin import PluginAPI from poezio.config import config log = logging.getLogger(__name__) class PluginManager(object): """ Plugin Manager Contains all the references to the plugins And keeps track of everything the plugin has done through the API. """ def __init__(self, core): self.core = core # module name -> module object self.modules = {} # module name -> plugin object self.plugins = {} # module name -> dict of commands loaded for the module self.commands = {} # module name -> list of event_name/handler pairs loaded for the module self.event_handlers = {} # module name -> dict of tab types; tab type -> commands # loaded by the module self.tab_commands = {} # module name → dict of keys/handlers loaded for the module self.keys = {} # module name → dict of tab types; tab type → list of keybinds (tuples) self.tab_keys = {} self.roster_elements = {} from importlib import machinery self.finder = machinery.PathFinder() self.initial_set_plugins_dir() self.initial_set_plugins_conf_dir() self.fill_load_path() self.plugin_api = PluginAPI(core, self) def disable_plugins(self): for plugin in set(self.plugins.keys()): try: self.unload(plugin, notify=False) except: pass def load(self, name, notify=True): """ Load a plugin. """ if name in self.plugins: self.unload(name) try: module = None loader = self.finder.find_module(name, self.load_path) if not loader: self.core.information('Could not find plugin: %s' % name) return module = loader.load_module() except Exception as e: log.debug("Could not load plugin %s", name, exc_info=True) self.core.information("Could not load plugin %s: %s" % (name, e), 'Error') finally: if not module: return self.modules[name] = module self.commands[name] = {} self.keys[name] = {} self.tab_keys[name] = {} self.tab_commands[name] = {} self.event_handlers[name] = [] try: self.plugins[name] = None self.plugins[name] = module.Plugin(self.plugin_api, self.core, self.plugins_conf_dir) except Exception as e: log.error('Error while loading the plugin %s', name, exc_info=True) if notify: self.core.information('Unable to load the plugin %s: %s' % (name, e), 'Error') self.unload(name, notify=False) else: if notify: self.core.information('Plugin %s loaded' % name, 'Info') def unload(self, name, notify=True): if name in self.plugins: try: for command in self.commands[name].keys(): del self.core.commands[command] for key in self.keys[name].keys(): del self.core.key_func[key] for tab in list(self.tab_commands[name].keys()): for command in self.tab_commands[name][tab][:]: self.del_tab_command(name, getattr(tabs, tab), command[0]) del self.tab_commands[name][tab] for tab in list(self.tab_keys[name].keys()): for key in self.tab_keys[name][tab][:]: self.del_tab_key(name, getattr(tabs, tab), key[0]) del self.tab_keys[name][tab] for event_name, handler in self.event_handlers[name][:]: self.del_event_handler(name, event_name, handler) if self.plugins[name] is not None: self.plugins[name].unload() del self.plugins[name] del self.commands[name] del self.keys[name] del self.tab_commands[name] del self.event_handlers[name] if notify: self.core.information('Plugin %s unloaded' % name, 'Info') except Exception as e: log.debug("Could not unload plugin %s", name, exc_info=True) self.core.information("Could not unload plugin %s: %s" % (name, e), 'Error') def add_command(self, module_name, name, handler, help, completion=None, short='', usage=''): """ Add a global command. """ if name in self.core.commands: raise Exception("Command '%s' already exists" % (name,)) commands = self.commands[module_name] commands[name] = Command(handler, help, completion, short, usage) self.core.commands[name] = commands[name] def del_command(self, module_name, name): """ Remove a global command added through add_command. """ if name in self.commands[module_name]: del self.commands[module_name][name] if name in self.core.commands: del self.core.commands[name] def add_tab_command(self, module_name, tab_type, name, handler, help, completion=None, short='', usage=''): """ Add a command only for a type of Tab. """ commands = self.tab_commands[module_name] t = tab_type.__name__ if name in tab_type.plugin_commands: return if not t in commands: commands[t] = [] commands[t].append((name, handler, help, completion)) tab_type.plugin_commands[name] = Command(handler, help, completion, short, usage) for tab in self.core.tabs: if isinstance(tab, tab_type): tab.update_commands() def del_tab_command(self, module_name, tab_type, name): """ Remove a command added through add_tab_command. """ commands = self.tab_commands[module_name] t = tab_type.__name__ if not t in commands: return for command in commands[t]: if command[0] == name: commands[t].remove(command) del tab_type.plugin_commands[name] for tab in self.core.tabs: if isinstance(tab, tab_type) and name in tab.commands: del tab.commands[name] def add_tab_key(self, module_name, tab_type, key, handler): """ Associate a key binding to a handler only for a type of Tab. """ keys = self.tab_keys[module_name] t = tab_type.__name__ if key in tab_type.plugin_keys: return if not t in keys: keys[t] = [] keys[t].append((key, handler)) tab_type.plugin_keys[key] = handler for tab in self.core.tabs: if isinstance(tab, tab_type): tab.update_keys() def del_tab_key(self, module_name, tab_type, key): """ Remove a key binding added through add_tab_key. """ keys = self.tab_keys[module_name] t = tab_type.__name__ if not t in keys: return for _key in keys[t]: if _key[0] == key: keys[t].remove(_key) del tab_type.plugin_keys[key] for tab in self.core.tabs: if isinstance(tab, tab_type) and key in tab.key_func: del tab.key_func[key] def add_key(self, module_name, key, handler): """ Associate a global key binding to a handler, except if it already exists. """ if key in self.core.key_func: raise Exception("Key '%s' already exists" % (key,)) keys = self.keys[module_name] keys[key] = handler self.core.key_func[key] = handler def del_key(self, module_name, key): """ Remove a global key binding added by a plugin. """ if key in self.keys[module_name]: del self.keys[module_name][key] if key in self.core.key_func: del self.core.commands[key] def add_event_handler(self, module_name, event_name, handler, position=0): """ Add an event handler. If event_name isn’t in the event list, assume it is a slixmpp event. """ eh = self.event_handlers[module_name] eh.append((event_name, handler)) if event_name in self.core.events.events: self.core.events.add_event_handler(event_name, handler, position) else: self.core.xmpp.add_event_handler(event_name, handler) def del_event_handler(self, module_name, event_name, handler): """ Remove an event handler if it exists. """ if event_name in self.core.events.events: self.core.events.del_event_handler(None, handler) else: self.core.xmpp.del_event_handler(event_name, handler) eh = self.event_handlers[module_name] eh = [e for e in eh if e != (event_name, handler)] def completion_load(self, the_input): """ completion function that completes the name of the plugins, from all .py files in plugins_dir """ try: names = set() for path in self.load_path: try: add = set(os.listdir(path)) names |= add except: pass except OSError as e: self.core.information('Completion failed: %s' % e, 'Error') return False plugins_files = [name[:-3] for name in names if name.endswith('.py') and name != '__init__.py' and not name.startswith('.')] plugins_files.sort() position = the_input.get_argument_position(quoted=False) return Completion(the_input.new_completion, plugins_files, position, '', quotify=False) def completion_unload(self, the_input): """ completion function that completes the name of loaded plugins """ position = the_input.get_argument_position(quoted=False) return Completion(the_input.new_completion, sorted(self.plugins.keys()), position, '', quotify=False) def on_plugins_dir_change(self, new_value): self.plugins_dir = new_value self.check_create_plugins_dir() self.fill_load_path() def on_plugins_conf_dir_change(self, new_value): self.plugins_conf_dir = new_value self.check_create_plugins_conf_dir() def initial_set_plugins_conf_dir(self): """ Create the plugins_conf_dir """ plugins_conf_dir = config.get('plugins_conf_dir') if not plugins_conf_dir: config_home = os.environ.get('XDG_CONFIG_HOME') if not config_home: config_home = os.path.join(os.environ.get('HOME'), '.config') plugins_conf_dir = os.path.join(config_home, 'poezio', 'plugins') self.plugins_conf_dir = os.path.expanduser(plugins_conf_dir) self.check_create_plugins_conf_dir() def check_create_plugins_conf_dir(self): """ Create the plugins config directory if it does not exist. Returns True on success, False on failure. """ if not os.access(self.plugins_conf_dir, os.R_OK | os.X_OK): try: os.makedirs(self.plugins_conf_dir) except OSError: log.error('Unable to create the plugin conf dir: %s', self.plugins_conf_dir, exc_info=True) return False return True def initial_set_plugins_dir(self): """ Set the plugins_dir on start """ plugins_dir = config.get('plugins_dir') plugins_dir = plugins_dir or\ os.path.join(os.environ.get('XDG_DATA_HOME') or\ os.path.join(os.environ.get('HOME'), '.local', 'share'), 'poezio', 'plugins') self.plugins_dir = os.path.expanduser(plugins_dir) self.check_create_plugins_dir() def check_create_plugins_dir(self): """ Create the plugins directory if it does not exist. Returns True on success, False on failure. """ if not os.access(self.plugins_dir, os.R_OK | os.X_OK): try: os.makedirs(self.plugins_dir, exist_ok=True) except OSError: log.error('Unable to create the plugins dir: %s', self.plugins_dir, exc_info=True) return False return True def fill_load_path(self): """ Append the global packages and the source directory if available """ self.load_path = [] default_plugin_path = path.join(path.dirname(path.dirname(__file__)), 'plugins') if os.access(default_plugin_path, os.R_OK | os.X_OK): self.load_path.insert(0, default_plugin_path) if os.access(self.plugins_dir, os.R_OK | os.X_OK): self.load_path.append(self.plugins_dir) try: import poezio_plugins except: pass else: if poezio_plugins.__path__: self.load_path.append(list(poezio_plugins.__path__)[0])