from gpg import gnupg from sleekxmpp.xmlstream.stanzabase import JID from xml.etree import cElementTree as ET import xml.sax.saxutils import logging log = logging.getLogger(__name__) from plugin import BasePlugin from tabs import ConversationTab NS_SIGNED = "jabber:x:signed" NS_ENCRYPTED = "jabber:x:encrypted" SIGNED_ATTACHED_MESSAGE = """-----BEGIN PGP SIGNED MESSAGE----- Hash: %(hash)s %(clear)s -----BEGIN PGP SIGNATURE----- Version: GnuPG %(data)s -----END PGP SIGNATURE----- """ ENCRYPTED_MESSAGE = """-----BEGIN PGP MESSAGE----- Version: GnuPG %(data)s -----END PGP MESSAGE-----""" class Plugin(BasePlugin): def init(self): self.contacts = {} # a dict of {full-JID: 'signed'/'valid'/'invalid'/'disabled'} # Whenever we receive a signed presence from a JID, we add it to this # dict, this way we know if we can encrypt the messages we will send to # this JID. # If that resource sends a non-signed presence, then we remove it # from that dict and stop encrypting our messages. # 'disabled' means that the user do NOT want to encrypt its messages # even if the key is valid. self.gpg = gnupg.GPG() self.keyid = self.config.get('keyid', '') or None self.passphrase = self.config.get('passphrase', '') or None if not self.keyid: self.core.information('No GPG keyid provided in the configuration', 'Warning') self.add_event_handler('send_normal_presence', self.sign_presence) self.add_event_handler('normal_presence', self.on_normal_presence) self.add_event_handler('conversation_say_after', self.on_conversation_say) self.add_event_handler('conversation_msg', self.on_conversation_msg) self.add_tab_command(ConversationTab, 'gpg', self.command_gpg, usage=' [jid] [keyid]', help='Force or disable gpg encryption with the fulljid of the current conversation. The setkey argument lets you associate a keyid with the given bare JID.', short='Manage the GPG status', completion=self.gpg_completion) ConversationTab.add_information_element('gpg', self.display_encryption_status) def cleanup(self): self.send_unsigned_presence() ConversationTab.remove_information_element('gpg') self.del_tab_command(ConversationTab, 'gpg') def sign_presence(self, presence): """ Sign every normal presence we send """ signed_element = ET.Element('{%s}x' % (NS_SIGNED,)) t = self.gpg.sign(presence['status'], keyid=self.keyid, passphrase=self.passphrase, detach=True) if not t: self.core.information('Could not sign presence. Disabling GPG module', 'Info') self.core.plugin_manager.unload('gpg') return text = xml.sax.saxutils.escape(str(t)) signed_element.text = self.remove_gpg_headers(text) presence.append(signed_element) def send_unsigned_presence(self): """ Send our current presence, to everyone, but unsigned, to indicate that we cannot/do not want to encrypt/decrypt messages. """ current_presence = self.core.get_status() self.core.command_status('%s %s' % (current_presence.show or 'available', current_presence.message,)) def on_normal_presence(self, presence, resource): """ Check if it’s signed, if it is and we can verify the signature, add 'valid' or 'invalid' into the dict. If it cannot be verified, just add 'signed'. Otherwise, do nothing. """ signed = presence.find('{%s}x' % (NS_SIGNED,)) bare = presence['from'].bare full = presence['from'].full if signed is None: if bare in self.contacts.keys(): del self.contacts[bare] return if self.config.has_section('keys') and bare in self.config.options('keys'): self.contacts[full] = 'invalid' for hash_ in ('SHA1', 'SHA256'): to_verify = SIGNED_ATTACHED_MESSAGE % {'clear': presence['status'], 'data': signed.text, 'hash': hash_} verify = self.gpg.verify(to_verify) if verify: self.contacts[full] = 'valid' break else: self.contacts[full] = 'signed' def on_conversation_say(self, message, tab): """ Check if the contact has a signed AND veryfied signature. If yes, encrypt the message with her key. """ to = message['to'] if not message['body']: # there’s nothing to encrypt if this is a chatstate, for example return signed = to.full in self.contacts.keys() if signed: veryfied = self.contacts[to.full] in ('valid', 'forced') else: veryfied = False if veryfied: # remove the xhtm_im body if present, because that # cannot be encrypted. del message['xhtml_im'] encrypted_element = ET.Element('{%s}x' % (NS_ENCRYPTED,)) text = self.gpg.encrypt(message['body'], self.config.get(to.bare, '', section='keys'), always_trust=True) if not text: self.core.information('Could not encrypt message to %s' % (to.full),) # If we could not encrypt the message, don't send anything message['body'] = '' return encrypted_element.text = self.remove_gpg_headers(xml.sax.saxutils.escape(str(text))) message.append(encrypted_element) message['body'] = 'This message has been encrypted using the GPG key with id: %s' % self.keyid def on_conversation_msg(self, message, tab): """ Check if the message is encrypted, and decrypt it if we can. """ encrypted = message.find('{%s}x' % (NS_ENCRYPTED,)) fro = message['from'] if encrypted is not None: if self.config.has_section('keys') and fro.bare in self.config.options('keys'): keyid = self.config.get(fro.bare, '', 'keys') decrypted = self.gpg.decrypt(ENCRYPTED_MESSAGE % {'data': str(encrypted.text)}, passphrase=self.passphrase) if not decrypted: self.core.information('Could not decrypt message from %s' % (fro.full),) return message['body'] = str(decrypted) def display_encryption_status(self, jid): """ Returns the status of encryption for the associated jid. This is to be used in the ConversationTab’s InfoWin. """ if jid.full not in self.contacts.keys(): return '' status = self.contacts[jid.full] if status in ('valid', 'invalid', 'signed'): return ' GPG Key: %s (%s)' % (status, 'encrypted' if status == 'valid' else 'NOT encrypted',) else: return ' GPG: Encryption %s' % (status,) def command_gpg(self, args): """ A command to force or disable the encryption, or to assign a keyid to a JID """ args = args.split() if not args: return self.core.command_help("gpg") if len(args) >= 2: jid = JID(args[1]) else: if isinstance(self.core.current_tab(), ConversationTab): jid = JID(self.core.current_tab().get_name()) else: return command = args[0] if command == 'force' or command == 'enable': # we can force encryption only with contact having an associated # key, otherwise we cannot encrypt at all if self.config.has_section('keys') and jid.bare in self.config.options('keys'): self.contacts[JID(jid).full] = 'forced' else: self.core.information('Cannot force encryption: no key associated with %s' % (jid.bare), 'Info') elif command == 'disable': self.contacts[JID(jid).full] = 'disabled' elif command == 'setkey': if len(args) != 3: return self.core.command_help("gpg") if not self.config.has_section('keys'): self.config.add_section('keys') self.config.set(jid.bare, args[2], 'keys') self.config.write() self.core.refresh_window() def gpg_completion(self, the_input): return the_input.auto_completion(['force', 'disable', 'setkey'], ' ') def remove_gpg_headers(self, text): lines = text.splitlines() while lines[0].strip() != '': lines.pop(0) while lines[0].strip() == '': lines.pop(0) res = [] for line in lines: if not line.startswith('---'): res.append(line) return '\n'.join(res)