""" Plugin manager module. Define the PluginManager class, the one that glues all the plugins and the API together. Defines also a bunch of variables related to the plugin env. """ import imp import os from os import path import logging from gettext import gettext as _ from sys import version_info import core import tabs from plugin import PluginAPI from config import config log = logging.getLogger(__name__) class PluginManager(object): """ Plugin Manager Contains all the references to the plugins And keeps track of everything the plugin has done through the API. """ def __init__(self, core): self.core = core # module name -> module object self.modules = {} # module name -> plugin object self.plugins = {} # module name -> dict of commands loaded for the module self.commands = {} # module name -> list of event_name/handler pairs loaded for the module self.event_handlers = {} # module name -> dict of tab types; tab type -> commands # loaded by the module self.tab_commands = {} # module name → dict of keys/handlers loaded for the module self.keys = {} # module name → dict of tab types; tab type → list of keybinds (tuples) self.tab_keys = {} self.roster_elements = {} if version_info[1] >= 3: # 3.3 & > from importlib import machinery self.finder = machinery.PathFinder() self.initial_set_plugins_dir() self.initial_set_plugins_conf_dir() self.fill_load_path() self.plugin_api = PluginAPI(core, self) def disable_plugins(self): for plugin in set(self.plugins.keys()): try: self.unload(plugin, notify=False) except: pass def load(self, name, notify=True): """ Load a plugin. """ if name in self.plugins: self.unload(name) try: module = None if version_info[1] < 3: # < 3.3 if name in self.modules: imp.acquire_lock() module = imp.reload(self.modules[name]) else: file, filename, info = imp.find_module(name, self.load_path) imp.acquire_lock() module = imp.load_module(name, file, filename, info) else: # 3.3 & > loader = self.finder.find_module(name, self.load_path) if not loader: self.core.information('Could not find plugin: %s' % name) return module = loader.load_module() except Exception as e: log.debug("Could not load plugin %s", name, exc_info=True) self.core.information("Could not load plugin %s: %s" % (name, e), 'Error') finally: if version_info[1] < 3 and imp.lock_held(): imp.release_lock() if not module: return self.modules[name] = module self.commands[name] = {} self.keys[name] = {} self.tab_keys[name] = {} self.tab_commands[name] = {} self.event_handlers[name] = [] try: self.plugins[name] = None self.plugins[name] = module.Plugin(self.plugin_api, self.core, self.plugins_conf_dir) except Exception as e: log.error('Error while loading the plugin %s', name, exc_info=True) if notify: self.core.information(_('Unable to load the plugin %s: %s') % (name, e), 'Error') self.unload(name, notify=False) else: if notify: self.core.information('Plugin %s loaded' % name, 'Info') def unload(self, name, notify=True): if name in self.plugins: try: for command in self.commands[name].keys(): del self.core.commands[command] for key in self.keys[name].keys(): del self.core.key_func[key] for tab in list(self.tab_commands[name].keys()): for command in self.tab_commands[name][tab][:]: self.del_tab_command(name, getattr(tabs, tab), command[0]) del self.tab_commands[name][tab] for tab in list(self.tab_keys[name].keys()): for key in self.tab_keys[name][tab][:]: self.del_tab_key(name, getattr(tabs, tab), key[0]) del self.tab_keys[name][tab] for event_name, handler in self.event_handlers[name][:]: self.del_event_handler(name, event_name, handler) if self.plugins[name] is not None: self.plugins[name].unload() del self.plugins[name] del self.commands[name] del self.keys[name] del self.tab_commands[name] del self.event_handlers[name] if notify: self.core.information('Plugin %s unloaded' % name, 'Info') except Exception as e: log.debug("Could not unload plugin %s", name, exc_info=True) self.core.information(_("Could not unload plugin %s: %s") % (name, e), 'Error') def add_command(self, module_name, name, handler, help, completion=None, short='', usage=''): """ Add a global command. """ if name in self.core.commands: raise Exception(_("Command '%s' already exists") % (name,)) commands = self.commands[module_name] commands[name] = core.Command(handler, help, completion, short, usage) self.core.commands[name] = commands[name] def del_command(self, module_name, name): """ Remove a global command added through add_command. """ if name in self.commands[module_name]: del self.commands[module_name][name] if name in self.core.commands: del self.core.commands[name] def add_tab_command(self, module_name, tab_type, name, handler, help, completion=None, short='', usage=''): """ Add a command only for a type of Tab. """ commands = self.tab_commands[module_name] t = tab_type.__name__ if name in tab_type.plugin_commands: return if not t in commands: commands[t] = [] commands[t].append((name, handler, help, completion)) tab_type.plugin_commands[name] = core.Command(handler, help, completion, short, usage) for tab in self.core.tabs: if isinstance(tab, tab_type): tab.update_commands() def del_tab_command(self, module_name, tab_type, name): """ Remove a command added through add_tab_command. """ commands = self.tab_commands[module_name] t = tab_type.__name__ if not t in commands: return for command in commands[t]: if command[0] == name: commands[t].remove(command) del tab_type.plugin_commands[name] for tab in self.core.tabs: if isinstance(tab, tab_type) and name in tab.commands: del tab.commands[name] def add_tab_key(self, module_name, tab_type, key, handler): """ Associate a key binding to a handler only for a type of Tab. """ keys = self.tab_keys[module_name] t = tab_type.__name__ if key in tab_type.plugin_keys: return if not t in keys: keys[t] = [] keys[t].append((key, handler)) tab_type.plugin_keys[key] = handler for tab in self.core.tabs: if isinstance(tab, tab_type): tab.update_keys() def del_tab_key(self, module_name, tab_type, key): """ Remove a key binding added through add_tab_key. """ keys = self.tab_keys[module_name] t = tab_type.__name__ if not t in keys: return for _key in keys[t]: if _key[0] == key: keys[t].remove(_key) del tab_type.plugin_keys[key] for tab in self.core.tabs: if isinstance(tab, tab_type) and key in tab.key_func: del tab.key_func[key] def add_key(self, module_name, key, handler): """ Associate a global key binding to a handler, except if it already exists. """ if key in self.core.key_func: raise Exception(_("Key '%s' already exists") % (key,)) keys = self.keys[module_name] keys[key] = handler self.core.key_func[key] = handler def del_key(self, module_name, key): """ Remove a global key binding added by a plugin. """ if key in self.keys[module_name]: del self.keys[module_name][key] if key in self.core.key_func: del self.core.commands[key] def add_event_handler(self, module_name, event_name, handler, position=0): """ Add an event handler. If event_name isn’t in the event list, assume it is a sleekxmpp event. """ eh = self.event_handlers[module_name] eh.append((event_name, handler)) if event_name in self.core.events.events: self.core.events.add_event_handler(event_name, handler, position) else: self.core.xmpp.add_event_handler(event_name, handler) def del_event_handler(self, module_name, event_name, handler): """ Remove an event handler if it exists. """ if event_name in self.core.events.events: self.core.events.del_event_handler(None, handler) else: self.core.xmpp.del_event_handler(event_name, handler) eh = self.event_handlers[module_name] eh = list(filter(lambda e: e != (event_name, handler), eh)) def completion_load(self, the_input): """ completion function that completes the name of the plugins, from all .py files in plugins_dir """ try: names = set() for path in self.load_path: try: add = set(os.listdir(path)) names |= add except: pass except OSError as e: self.core.information(_('Completion failed: %s' % e), 'Error') return plugins_files = [name[:-3] for name in names if name.endswith('.py') and name != '__init__.py' and not name.startswith('.')] plugins_files.sort() position = the_input.get_argument_position(quoted=False) return the_input.new_completion(plugins_files, position, '', quotify=False) def completion_unload(self, the_input): """ completion function that completes the name of loaded plugins """ position = the_input.get_argument_position(quoted=False) return the_input.new_completion(sorted(self.plugins.keys()), position, '', quotify=False) def on_plugins_dir_change(self, new_value): self.plugins_dir = new_value self.check_create_plugins_dir() self.fill_load_path() def on_plugins_conf_dir_change(self, new_value): self.plugins_conf_dir = new_value self.check_create_plugins_conf_dir() def initial_set_plugins_conf_dir(self): """ Create the plugins_conf_dir """ plugins_conf_dir = config.get('plugins_conf_dir', '') if not plugins_conf_dir: config_home = os.environ.get('XDG_CONFIG_HOME') if not config_home: config_home = os.path.join(os.environ.get('HOME'), '.config') plugins_conf_dir = os.path.join(config_home, 'poezio', 'plugins') self.plugins_conf_dir = os.path.expanduser(plugins_conf_dir) self.check_create_plugins_conf_dir() def check_create_plugins_conf_dir(self): """ Create the plugins config directory if it does not exist. Returns True on success, False on failure. """ if not os.access(self.plugins_conf_dir, os.R_OK | os.X_OK): try: os.makedirs(self.plugins_conf_dir) except OSError: log.error('Unable to create the plugin conf dir: %s', plugins_conf_dir, exc_info=True) return False return True def initial_set_plugins_dir(self): """ Set the plugins_dir on start """ plugins_dir = config.get('plugins_dir', '') plugins_dir = plugins_dir or\ os.path.join(os.environ.get('XDG_DATA_HOME') or\ os.path.join(os.environ.get('HOME'), '.local', 'share'), 'poezio', 'plugins') self.plugins_dir = os.path.expanduser(plugins_dir) self.check_create_plugins_dir() def check_create_plugins_dir(self): """ Create the plugins directory if it does not exist. Returns True on success, False on failure. """ if not os.access(self.plugins_dir, os.R_OK | os.X_OK): try: os.makedirs(self.plugins_dir, exist_ok=True) except OSError: log.error('Unable to create the plugins dir: %s', self.plugins_dir, exc_info=True) return False return True def fill_load_path(self): """ Append the global packages and the source directory if available """ self.load_path = [] default_plugin_path = path.join(path.dirname(path.dirname(__file__)), 'plugins') if os.access(default_plugin_path, os.R_OK | os.X_OK): self.load_path.insert(0, default_plugin_path) if os.access(self.plugins_dir, os.R_OK | os.X_OK): self.load_path.append(self.plugins_dir) try: import poezio_plugins except: pass else: if poezio_plugins.__path__: self.load_path.append(list(poezio_plugins.__path__)[0])