diff options
Diffstat (limited to 'src/plugin_manager.py')
-rw-r--r-- | src/plugin_manager.py | 384 |
1 files changed, 0 insertions, 384 deletions
diff --git a/src/plugin_manager.py b/src/plugin_manager.py deleted file mode 100644 index 549753a9..00000000 --- a/src/plugin_manager.py +++ /dev/null @@ -1,384 +0,0 @@ -""" -Plugin manager module. -Define the PluginManager class, the one that glues all the plugins and -the API together. Defines also a bunch of variables related to the -plugin env. -""" - -import os -from os import path -import logging - -import core -import tabs -from plugin import PluginAPI -from config import config - -log = logging.getLogger(__name__) - -class PluginManager(object): - """ - Plugin Manager - Contains all the references to the plugins - And keeps track of everything the plugin has done through the API. - """ - def __init__(self, core): - self.core = core - # module name -> module object - self.modules = {} - # module name -> plugin object - self.plugins = {} - # module name -> dict of commands loaded for the module - self.commands = {} - # module name -> list of event_name/handler pairs loaded for the module - self.event_handlers = {} - # module name -> dict of tab types; tab type -> commands - # loaded by the module - self.tab_commands = {} - # module name → dict of keys/handlers loaded for the module - self.keys = {} - # module name → dict of tab types; tab type → list of keybinds (tuples) - self.tab_keys = {} - self.roster_elements = {} - - from importlib import machinery - self.finder = machinery.PathFinder() - - self.initial_set_plugins_dir() - self.initial_set_plugins_conf_dir() - self.fill_load_path() - - self.plugin_api = PluginAPI(core, self) - - def disable_plugins(self): - for plugin in set(self.plugins.keys()): - try: - self.unload(plugin, notify=False) - except: - pass - - def load(self, name, notify=True): - """ - Load a plugin. - """ - if name in self.plugins: - self.unload(name) - - try: - module = None - loader = self.finder.find_module(name, self.load_path) - if not loader: - self.core.information('Could not find plugin: %s' % name) - return - module = loader.load_module() - except Exception as e: - log.debug("Could not load plugin %s", name, exc_info=True) - self.core.information("Could not load plugin %s: %s" % (name, e), - 'Error') - finally: - if not module: - return - - self.modules[name] = module - self.commands[name] = {} - self.keys[name] = {} - self.tab_keys[name] = {} - self.tab_commands[name] = {} - self.event_handlers[name] = [] - try: - self.plugins[name] = None - self.plugins[name] = module.Plugin(self.plugin_api, self.core, - self.plugins_conf_dir) - except Exception as e: - log.error('Error while loading the plugin %s', name, exc_info=True) - if notify: - self.core.information('Unable to load the plugin %s: %s' % - (name, e), - 'Error') - self.unload(name, notify=False) - else: - if notify: - self.core.information('Plugin %s loaded' % name, 'Info') - - def unload(self, name, notify=True): - if name in self.plugins: - try: - for command in self.commands[name].keys(): - del self.core.commands[command] - for key in self.keys[name].keys(): - del self.core.key_func[key] - for tab in list(self.tab_commands[name].keys()): - for command in self.tab_commands[name][tab][:]: - self.del_tab_command(name, getattr(tabs, tab), - command[0]) - del self.tab_commands[name][tab] - for tab in list(self.tab_keys[name].keys()): - for key in self.tab_keys[name][tab][:]: - self.del_tab_key(name, getattr(tabs, tab), key[0]) - del self.tab_keys[name][tab] - for event_name, handler in self.event_handlers[name][:]: - self.del_event_handler(name, event_name, handler) - - if self.plugins[name] is not None: - self.plugins[name].unload() - del self.plugins[name] - del self.commands[name] - del self.keys[name] - del self.tab_commands[name] - del self.event_handlers[name] - if notify: - self.core.information('Plugin %s unloaded' % name, 'Info') - except Exception as e: - log.debug("Could not unload plugin %s", name, exc_info=True) - self.core.information("Could not unload plugin %s: %s" % - (name, e), - 'Error') - - def add_command(self, module_name, name, handler, help, - completion=None, short='', usage=''): - """ - Add a global command. - """ - if name in self.core.commands: - raise Exception("Command '%s' already exists" % (name,)) - - commands = self.commands[module_name] - commands[name] = core.Command(handler, help, completion, short, usage) - self.core.commands[name] = commands[name] - - def del_command(self, module_name, name): - """ - Remove a global command added through add_command. - """ - if name in self.commands[module_name]: - del self.commands[module_name][name] - if name in self.core.commands: - del self.core.commands[name] - - def add_tab_command(self, module_name, tab_type, name, handler, help, - completion=None, short='', usage=''): - """ - Add a command only for a type of Tab. - """ - commands = self.tab_commands[module_name] - t = tab_type.__name__ - if name in tab_type.plugin_commands: - return - if not t in commands: - commands[t] = [] - commands[t].append((name, handler, help, completion)) - tab_type.plugin_commands[name] = core.Command(handler, help, - completion, short, usage) - for tab in self.core.tabs: - if isinstance(tab, tab_type): - tab.update_commands() - - def del_tab_command(self, module_name, tab_type, name): - """ - Remove a command added through add_tab_command. - """ - commands = self.tab_commands[module_name] - t = tab_type.__name__ - if not t in commands: - return - for command in commands[t]: - if command[0] == name: - commands[t].remove(command) - del tab_type.plugin_commands[name] - for tab in self.core.tabs: - if isinstance(tab, tab_type) and name in tab.commands: - del tab.commands[name] - - def add_tab_key(self, module_name, tab_type, key, handler): - """ - Associate a key binding to a handler only for a type of Tab. - """ - keys = self.tab_keys[module_name] - t = tab_type.__name__ - if key in tab_type.plugin_keys: - return - if not t in keys: - keys[t] = [] - keys[t].append((key, handler)) - tab_type.plugin_keys[key] = handler - for tab in self.core.tabs: - if isinstance(tab, tab_type): - tab.update_keys() - - def del_tab_key(self, module_name, tab_type, key): - """ - Remove a key binding added through add_tab_key. - """ - keys = self.tab_keys[module_name] - t = tab_type.__name__ - if not t in keys: - return - for _key in keys[t]: - if _key[0] == key: - keys[t].remove(_key) - del tab_type.plugin_keys[key] - for tab in self.core.tabs: - if isinstance(tab, tab_type) and key in tab.key_func: - del tab.key_func[key] - - def add_key(self, module_name, key, handler): - """ - Associate a global key binding to a handler, except if it - already exists. - """ - if key in self.core.key_func: - raise Exception("Key '%s' already exists" % (key,)) - keys = self.keys[module_name] - keys[key] = handler - self.core.key_func[key] = handler - - def del_key(self, module_name, key): - """ - Remove a global key binding added by a plugin. - """ - if key in self.keys[module_name]: - del self.keys[module_name][key] - if key in self.core.key_func: - del self.core.commands[key] - - def add_event_handler(self, module_name, event_name, handler, position=0): - """ - Add an event handler. If event_name isn’t in the event list, assume - it is a slixmpp event. - """ - eh = self.event_handlers[module_name] - eh.append((event_name, handler)) - if event_name in self.core.events.events: - self.core.events.add_event_handler(event_name, handler, position) - else: - self.core.xmpp.add_event_handler(event_name, handler) - - def del_event_handler(self, module_name, event_name, handler): - """ - Remove an event handler if it exists. - """ - if event_name in self.core.events.events: - self.core.events.del_event_handler(None, handler) - else: - self.core.xmpp.del_event_handler(event_name, handler) - eh = self.event_handlers[module_name] - eh = list(filter(lambda e: e != (event_name, handler), eh)) - - def completion_load(self, the_input): - """ - completion function that completes the name of the plugins, from - all .py files in plugins_dir - """ - try: - names = set() - for path in self.load_path: - try: - add = set(os.listdir(path)) - names |= add - except: - pass - except OSError as e: - self.core.information('Completion failed: %s' % e, 'Error') - return - plugins_files = [name[:-3] for name in names if name.endswith('.py') - and name != '__init__.py' and not name.startswith('.')] - plugins_files.sort() - position = the_input.get_argument_position(quoted=False) - return the_input.new_completion(plugins_files, position, '', - quotify=False) - - def completion_unload(self, the_input): - """ - completion function that completes the name of loaded plugins - """ - position = the_input.get_argument_position(quoted=False) - return the_input.new_completion(sorted(self.plugins.keys()), position, - '', quotify=False) - - def on_plugins_dir_change(self, new_value): - self.plugins_dir = new_value - self.check_create_plugins_dir() - self.fill_load_path() - - def on_plugins_conf_dir_change(self, new_value): - self.plugins_conf_dir = new_value - self.check_create_plugins_conf_dir() - - def initial_set_plugins_conf_dir(self): - """ - Create the plugins_conf_dir - """ - plugins_conf_dir = config.get('plugins_conf_dir') - if not plugins_conf_dir: - config_home = os.environ.get('XDG_CONFIG_HOME') - if not config_home: - config_home = os.path.join(os.environ.get('HOME'), '.config') - plugins_conf_dir = os.path.join(config_home, 'poezio', 'plugins') - self.plugins_conf_dir = os.path.expanduser(plugins_conf_dir) - self.check_create_plugins_conf_dir() - - def check_create_plugins_conf_dir(self): - """ - Create the plugins config directory if it does not exist. - Returns True on success, False on failure. - """ - if not os.access(self.plugins_conf_dir, os.R_OK | os.X_OK): - try: - os.makedirs(self.plugins_conf_dir) - except OSError: - log.error('Unable to create the plugin conf dir: %s', - self.plugins_conf_dir, exc_info=True) - return False - return True - - def initial_set_plugins_dir(self): - """ - Set the plugins_dir on start - """ - plugins_dir = config.get('plugins_dir') - plugins_dir = plugins_dir or\ - os.path.join(os.environ.get('XDG_DATA_HOME') or\ - os.path.join(os.environ.get('HOME'), - '.local', 'share'), - 'poezio', 'plugins') - self.plugins_dir = os.path.expanduser(plugins_dir) - self.check_create_plugins_dir() - - def check_create_plugins_dir(self): - """ - Create the plugins directory if it does not exist. - Returns True on success, False on failure. - """ - if not os.access(self.plugins_dir, os.R_OK | os.X_OK): - try: - os.makedirs(self.plugins_dir, exist_ok=True) - except OSError: - log.error('Unable to create the plugins dir: %s', - self.plugins_dir, exc_info=True) - return False - return True - - def fill_load_path(self): - """ - Append the global packages and the source directory if available - """ - - self.load_path = [] - - default_plugin_path = path.join(path.dirname(path.dirname(__file__)), - 'plugins') - - if os.access(default_plugin_path, os.R_OK | os.X_OK): - self.load_path.insert(0, default_plugin_path) - - if os.access(self.plugins_dir, os.R_OK | os.X_OK): - self.load_path.append(self.plugins_dir) - - try: - import poezio_plugins - except: - pass - else: - if poezio_plugins.__path__: - self.load_path.append(list(poezio_plugins.__path__)[0]) - |