From 78bd21b7cfe34f2183d052a292df01f1d3ad9269 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Mon, 11 Feb 2013 18:08:34 -0800 Subject: Support using messages for IBB data transfer. --- sleekxmpp/plugins/xep_0047/ibb.py | 20 ++++++++++----- sleekxmpp/plugins/xep_0047/stream.py | 47 ++++++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/sleekxmpp/plugins/xep_0047/ibb.py b/sleekxmpp/plugins/xep_0047/ibb.py index fb48a9b9..e341433f 100644 --- a/sleekxmpp/plugins/xep_0047/ibb.py +++ b/sleekxmpp/plugins/xep_0047/ibb.py @@ -36,6 +36,7 @@ class XEP_0047(BasePlugin): register_stanza_plugin(Iq, Open) register_stanza_plugin(Iq, Close) register_stanza_plugin(Iq, Data) + register_stanza_plugin(Message, Data) self.xmpp.register_handler(Callback( 'IBB Open', @@ -52,10 +53,16 @@ class XEP_0047(BasePlugin): StanzaPath('iq@type=set/ibb_data'), self._handle_data)) + self.xmpp.register_handler(Callback( + 'IBB Message Data', + StanzaPath('message/ibb_data'), + self._handle_data)) + def plugin_end(self): self.xmpp.remove_handler('IBB Open') self.xmpp.remove_handler('IBB Close') self.xmpp.remove_handler('IBB Data') + self.xmpp.remove_handler('IBB Message Data') self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/ibb') def session_bind(self, jid): @@ -69,7 +76,7 @@ class XEP_0047(BasePlugin): return True return False - def open_stream(self, jid, block_size=4096, sid=None, window=1, + def open_stream(self, jid, block_size=4096, sid=None, window=1, use_messages=False, ifrom=None, block=True, timeout=None, callback=None): if sid is None: sid = str(uuid.uuid4()) @@ -83,7 +90,8 @@ class XEP_0047(BasePlugin): iq['ibb_open']['stanza'] = 'iq' stream = IBBytestream(self.xmpp, sid, block_size, - iq['to'], iq['from'], window) + iq['to'], iq['from'], window, + use_messages) with self._stream_lock: self.pending_streams[iq['id']] = stream @@ -139,11 +147,11 @@ class XEP_0047(BasePlugin): self.xmpp.event('ibb_stream_start', stream) - def _handle_data(self, iq): - sid = iq['ibb_data']['sid'] + def _handle_data(self, stanza): + sid = stanza['ibb_data']['sid'] stream = self.streams.get(sid, None) - if stream is not None and iq['from'] != stream.sender: - stream._recv_data(iq) + if stream is not None and stanza['from'] != stream.sender: + stream._recv_data(stanza) else: raise XMPPError('item-not-found') diff --git a/sleekxmpp/plugins/xep_0047/stream.py b/sleekxmpp/plugins/xep_0047/stream.py index b49a077b..adc86450 100644 --- a/sleekxmpp/plugins/xep_0047/stream.py +++ b/sleekxmpp/plugins/xep_0047/stream.py @@ -2,6 +2,7 @@ import socket import threading import logging +from sleekxmpp.stanza import Iq from sleekxmpp.util import Queue from sleekxmpp.exceptions import XMPPError @@ -11,11 +12,12 @@ log = logging.getLogger(__name__) class IBBytestream(object): - def __init__(self, xmpp, sid, block_size, to, ifrom, window_size=1): + def __init__(self, xmpp, sid, block_size, to, ifrom, window_size=1, use_messages=False): self.xmpp = xmpp self.sid = sid self.block_size = block_size self.window_size = window_size + self.use_messages = use_messages self.receiver = to self.sender = ifrom @@ -46,16 +48,27 @@ class IBBytestream(object): with self._send_seq_lock: self.send_seq = (self.send_seq + 1) % 65535 seq = self.send_seq - iq = self.xmpp.Iq() - iq['type'] = 'set' - iq['to'] = self.receiver - iq['from'] = self.sender - iq['ibb_data']['sid'] = self.sid - iq['ibb_data']['seq'] = seq - iq['ibb_data']['data'] = data - self.window_empty.clear() - self.window_ids.add(iq['id']) - iq.send(block=False, callback=self._recv_ack) + if self.use_messages: + msg = self.xmpp.Message() + msg['to'] = self.receiver + msg['from'] = self.sender + msg['id'] = self.xmpp.new_id() + msg['ibb_data']['sid'] = self.sid + msg['ibb_data']['seq'] = seq + msg['ibb_data']['data'] = data + msg.send() + self.send_window.release() + else: + iq = self.xmpp.Iq() + iq['type'] = 'set' + iq['to'] = self.receiver + iq['from'] = self.sender + iq['ibb_data']['sid'] = self.sid + iq['ibb_data']['seq'] = seq + iq['ibb_data']['data'] = data + self.window_empty.clear() + self.window_ids.add(iq['id']) + iq.send(block=False, callback=self._recv_ack) return len(data) def sendall(self, data): @@ -71,23 +84,25 @@ class IBBytestream(object): if iq['type'] == 'error': self.close() - def _recv_data(self, iq): + def _recv_data(self, stanza): with self._recv_seq_lock: - new_seq = iq['ibb_data']['seq'] + new_seq = stanza['ibb_data']['seq'] if new_seq != (self.recv_seq + 1) % 65535: self.close() raise XMPPError('unexpected-request') self.recv_seq = new_seq - data = iq['ibb_data']['data'] + data = stanza['ibb_data']['data'] if len(data) > self.block_size: self.close() raise XMPPError('not-acceptable') self.recv_queue.put(data) self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) - iq.reply() - iq.send() + + if isinstance(stanza, Iq): + stanza.reply() + stanza.send() def recv(self, *args, **kwargs): return self.read(block=True) -- cgit v1.2.3 From fbf79755d72e35b365729d6f7e7d85fdadfb1403 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Mon, 11 Feb 2013 19:49:38 -0800 Subject: Track which verstrings are being checked, so we don't request duplicates. --- sleekxmpp/plugins/xep_0115/caps.py | 50 +++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/sleekxmpp/plugins/xep_0115/caps.py b/sleekxmpp/plugins/xep_0115/caps.py index 8bad1410..1d8d9e42 100644 --- a/sleekxmpp/plugins/xep_0115/caps.py +++ b/sleekxmpp/plugins/xep_0115/caps.py @@ -9,8 +9,9 @@ import logging import hashlib import base64 +import threading -import sleekxmpp +from sleekxmpp import __version__ from sleekxmpp.stanza import StreamFeatures, Presence, Iq from sleekxmpp.xmlstream import register_stanza_plugin, JID from sleekxmpp.xmlstream.handler import Callback @@ -45,8 +46,7 @@ class XEP_0115(BasePlugin): 'md5': hashlib.md5} if self.caps_node is None: - ver = sleekxmpp.__version__ - self.caps_node = 'http://sleekxmpp.com/ver/%s' % ver + self.caps_node = 'http://sleekxmpp.com/ver/%s' % __version__ register_stanza_plugin(Presence, stanza.Capabilities) register_stanza_plugin(StreamFeatures, stanza.Capabilities) @@ -90,6 +90,9 @@ class XEP_0115(BasePlugin): disco.assign_verstring = self.assign_verstring disco.get_verstring = self.get_verstring + self._processing_lock = threading.Lock() + self._processing = set() + def plugin_end(self): self.xmpp['xep_0030'].del_feature(feature=stanza.Capabilities.namespace) self.xmpp.del_filter('out', self._filter_add_caps) @@ -139,13 +142,15 @@ class XEP_0115(BasePlugin): self.xmpp.event('entity_caps_legacy', pres) return + ver = pres['caps']['ver'] + existing_verstring = self.get_verstring(pres['from'].full) - if str(existing_verstring) == str(pres['caps']['ver']): + if str(existing_verstring) == str(ver): return - existing_caps = self.get_caps(verstring=pres['caps']['ver']) + existing_caps = self.get_caps(verstring=ver) if existing_caps is not None: - self.assign_verstring(pres['from'], pres['caps']['ver']) + self.assign_verstring(pres['from'], ver) return if pres['caps']['hash'] not in self.hashes: @@ -156,9 +161,16 @@ class XEP_0115(BasePlugin): except XMPPError: return - log.debug("New caps verification string: %s", pres['caps']['ver']) + # Only lookup the same caps once at a time. + with self._processing_lock: + if ver in self._processing: + log.debug('Already processing verstring %s' % ver) + return + self._processing.add(ver) + + log.debug("New caps verification string: %s", ver) try: - node = '%s#%s' % (pres['caps']['node'], pres['caps']['ver']) + node = '%s#%s' % (pres['caps']['node'], ver) caps = self.xmpp['xep_0030'].get_info(pres['from'], node) if isinstance(caps, Iq): @@ -168,7 +180,10 @@ class XEP_0115(BasePlugin): pres['caps']['ver']): self.assign_verstring(pres['from'], pres['caps']['ver']) except XMPPError: - log.debug("Could not retrieve disco#info results for caps") + log.debug("Could not retrieve disco#info results for caps for %s", node) + + with self._processing_lock: + self._processing.remove(ver) def _validate_caps(self, caps, hash, check_verstring): # Check Identities @@ -179,7 +194,6 @@ class XEP_0115(BasePlugin): return False # Check Features - full_features = caps.get_features(dedupe=False) deduped_features = caps.get_features() if len(full_features) != len(deduped_features): @@ -272,7 +286,7 @@ class XEP_0115(BasePlugin): binary = hash(S.encode('utf8')).digest() return base64.b64encode(binary).decode('utf-8') - def update_caps(self, jid=None, node=None): + def update_caps(self, jid=None, node=None, preserve=False): try: info = self.xmpp['xep_0030'].get_info(jid, node, local=True) if isinstance(info, Iq): @@ -286,19 +300,11 @@ class XEP_0115(BasePlugin): self.assign_verstring(jid, ver) if self.xmpp.session_started_event.is_set() and self.broadcast: - # Check if we've sent directed presence. If we haven't, we - # can just send a normal presence stanza. If we have, then - # we will send presence to each contact individually so - # that we don't clobber existing statuses. - directed = False or self.xmpp.is_component - for contact in self.xmpp.roster[jid]: - if self.xmpp.roster[jid][contact].last_status is not None: - directed = True - if not directed: - self.xmpp.roster[jid].send_last_presence() - else: + if self.xmpp.is_component or preserve: for contact in self.xmpp.roster[jid]: self.xmpp.roster[jid][contact].send_last_presence() + else: + self.xmpp.roster[jid].send_last_presence() except XMPPError: return -- cgit v1.2.3 From cdeae7e72fb1d624fc45ffafcef08e30ac606720 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Mon, 11 Feb 2013 19:53:57 -0800 Subject: Make legacy caps log more useful, until we support legacy caps. --- sleekxmpp/plugins/xep_0115/caps.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/plugins/xep_0115/caps.py b/sleekxmpp/plugins/xep_0115/caps.py index 1d8d9e42..fac1cc7c 100644 --- a/sleekxmpp/plugins/xep_0115/caps.py +++ b/sleekxmpp/plugins/xep_0115/caps.py @@ -138,7 +138,10 @@ class XEP_0115(BasePlugin): def _process_caps(self, pres): if not pres['caps']['hash']: - log.debug("Received unsupported legacy caps.") + log.debug("Received unsupported legacy caps: %s, %s, %s", + pres['caps']['node'], + pres['caps']['ver'], + pres['caps']['ext']) self.xmpp.event('entity_caps_legacy', pres) return -- cgit v1.2.3 From 99ecb166d377ea316c95830a74efc73f5c2b9182 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Mon, 11 Feb 2013 20:01:53 -0800 Subject: More caps cleanup --- sleekxmpp/plugins/xep_0115/caps.py | 43 ++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/sleekxmpp/plugins/xep_0115/caps.py b/sleekxmpp/plugins/xep_0115/caps.py index fac1cc7c..41b5c52e 100644 --- a/sleekxmpp/plugins/xep_0115/caps.py +++ b/sleekxmpp/plugins/xep_0115/caps.py @@ -207,29 +207,32 @@ class XEP_0115(BasePlugin): form_types = [] deduped_form_types = set() for stanza in caps['substanzas']: - if isinstance(stanza, self.xmpp['xep_0004'].stanza.Form): - if 'FORM_TYPE' in stanza['fields']: - f_type = tuple(stanza['fields']['FORM_TYPE']['value']) - form_types.append(f_type) - deduped_form_types.add(f_type) - if len(form_types) != len(deduped_form_types): - log.debug("Duplicated FORM_TYPE values, " + \ - "invalid for caps") + if not isinstance(stanza, self.xmpp['xep_0004'].stanza.Form): + log.debug("Non form extension found, ignoring for caps") + caps.xml.remove(stanza.xml) + continue + if 'FORM_TYPE' in stanza['fields']: + f_type = tuple(stanza['fields']['FORM_TYPE']['value']) + form_types.append(f_type) + deduped_form_types.add(f_type) + if len(form_types) != len(deduped_form_types): + log.debug("Duplicated FORM_TYPE values, " + \ + "invalid for caps") + return False + + if len(f_type) > 1: + deduped_type = set(f_type) + if len(f_type) != len(deduped_type): + log.debug("Extra FORM_TYPE data, invalid for caps") return False - if len(f_type) > 1: - deduped_type = set(f_type) - if len(f_type) != len(deduped_type): - log.debug("Extra FORM_TYPE data, invalid for caps") - return False - - if stanza['fields']['FORM_TYPE']['type'] != 'hidden': - log.debug("Field FORM_TYPE type not 'hidden', " + \ - "ignoring form for caps") - caps.xml.remove(stanza.xml) - else: - log.debug("No FORM_TYPE found, ignoring form for caps") + if stanza['fields']['FORM_TYPE']['type'] != 'hidden': + log.debug("Field FORM_TYPE type not 'hidden', " + \ + "ignoring form for caps") caps.xml.remove(stanza.xml) + else: + log.debug("No FORM_TYPE found, ignoring form for caps") + caps.xml.remove(stanza.xml) verstring = self.generate_verstring(caps, hash) if verstring != check_verstring: -- cgit v1.2.3 From 55e50ad9798247f1730ae51ebe39e9ebf3a643a9 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Tue, 12 Feb 2013 01:30:44 -0800 Subject: Add XEP-0079 Advanced Message Processing plugin. --- sleekxmpp/plugins/__init__.py | 1 + sleekxmpp/plugins/xep_0079/__init__.py | 18 +++++++ sleekxmpp/plugins/xep_0079/amp.py | 79 ++++++++++++++++++++++++++++ sleekxmpp/plugins/xep_0079/stanza.py | 96 ++++++++++++++++++++++++++++++++++ 4 files changed, 194 insertions(+) create mode 100644 sleekxmpp/plugins/xep_0079/__init__.py create mode 100644 sleekxmpp/plugins/xep_0079/amp.py create mode 100644 sleekxmpp/plugins/xep_0079/stanza.py diff --git a/sleekxmpp/plugins/__init__.py b/sleekxmpp/plugins/__init__.py index c2d89c46..61b5ff9c 100644 --- a/sleekxmpp/plugins/__init__.py +++ b/sleekxmpp/plugins/__init__.py @@ -35,6 +35,7 @@ __all__ = [ 'xep_0066', # Out of Band Data 'xep_0077', # In-Band Registration # 'xep_0078', # Non-SASL auth. Don't automatically load + 'xep_0079', # Advanced Message Processing 'xep_0080', # User Location 'xep_0082', # XMPP Date and Time Profiles 'xep_0084', # User Avatar diff --git a/sleekxmpp/plugins/xep_0079/__init__.py b/sleekxmpp/plugins/xep_0079/__init__.py new file mode 100644 index 00000000..09e66715 --- /dev/null +++ b/sleekxmpp/plugins/xep_0079/__init__.py @@ -0,0 +1,18 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0079.stanza import ( + AMP, Rule, InvalidRules, UnsupportedConditions, + UnsupportedActions, FailedRules, FailedRule, + AMPFeature) +from sleekxmpp.plugins.xep_0079.amp import XEP_0079 + + +register_plugin(XEP_0079) diff --git a/sleekxmpp/plugins/xep_0079/amp.py b/sleekxmpp/plugins/xep_0079/amp.py new file mode 100644 index 00000000..918fb841 --- /dev/null +++ b/sleekxmpp/plugins/xep_0079/amp.py @@ -0,0 +1,79 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permissio +""" + +import logging + +from sleekxmpp.stanza import Message, Error, StreamFeatures +from sleekxmpp.xmlstream import register_stanza_plugin +from sleekxmpp.xmlstream.matcher import StanzaPath, MatchMany +from sleekxmpp.xmlstream.handler import Callback +from sleekxmpp.plugins import BasePlugin +from sleekxmpp.plugins.xep_0079 import stanza + + +log = logging.getLogger(__name__) + + +class XEP_0079(BasePlugin): + + """ + XEP-0079 Advanced Message Processing + """ + + name = 'xep_0079' + description = 'XEP-0079: Advanced Message Processing' + dependencies = set(['xep_0030']) + stanza = stanza + + def plugin_init(self): + register_stanza_plugin(Message, stanza.AMP) + register_stanza_plugin(Error, stanza.InvalidRules) + register_stanza_plugin(Error, stanza.UnsupportedConditions) + register_stanza_plugin(Error, stanza.UnsupportedActions) + register_stanza_plugin(Error, stanza.FailedRules) + + self.xmpp.register_handler( + Callback('AMP Response', + MatchMany([ + StanzaPath('message/error/failed_rules'), + StanzaPath('message/amp') + ]), + self._handle_amp_response)) + + if not self.xmpp.is_component: + self.xmpp.register_feature('amp', + self._handle_amp_feature, + restart=False, + order=9000) + register_stanza_plugin(StreamFeatures, stanza.AMPFeature) + + def plugin_end(self): + self.xmpp.remove_handler('AMP Response') + + def _handle_amp_response(self, msg): + log.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') + if msg['type'] == 'error': + self.xmpp.event('amp_error', msg) + elif msg['amp']['status'] in ('alert', 'notify'): + self.xmpp.event('amp_%s' % msg['amp']['status'], msg) + + def _handle_amp_feature(self, features): + log.debug('Advanced Message Processing is available.') + self.xmpp.features.add('amp') + + def discover_support(self, jid=None, **iqargs): + if jid is None: + if self.xmpp.is_component: + jid = self.xmpp.server_host + else: + jid = self.xmpp.boundjid.host + + return self.xmpp['xep_0030'].get_info( + jid=jid, + node='http://jabber.org/protocol/amp', + **iqargs) diff --git a/sleekxmpp/plugins/xep_0079/stanza.py b/sleekxmpp/plugins/xep_0079/stanza.py new file mode 100644 index 00000000..cb6932d6 --- /dev/null +++ b/sleekxmpp/plugins/xep_0079/stanza.py @@ -0,0 +1,96 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from __future__ import unicode_literals + +from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin + + +class AMP(ElementBase): + namespace = 'http://jabber.org/protocol/amp' + name = 'amp' + plugin_attrib = 'amp' + interfaces = set(['from', 'to', 'status', 'per_hop']) + + def get_from(self): + return JID(self._get_attr('from')) + + def set_from(self, value): + return self._set_attr('from', str(value)) + + def get_to(self): + return JID(self._get_attr('from')) + + def set_to(self, value): + return self._set_attr('to', str(value)) + + def get_per_hop(self): + return self._get_attr('per-hop') == 'true' + + def set_per_hop(self, value): + if value: + return self._set_attr('per-hop', 'true') + else: + return self._del_attr('per-hop') + + def del_per_hop(self): + return self._del_attr('per-hop') + + def add_rule(self, action, condition, value): + rule = Rule(parent=self) + rule['action'] = action + rule['condition'] = condition + rule['value'] = value + + +class Rule(ElementBase): + namespace = 'http://jabber.org/protocol/amp' + name = 'rule' + plugin_attrib = name + plugin_multi_attrib = 'rules' + interfaces = set(['action', 'condition', 'value']) + + +class InvalidRules(ElementBase): + namespace = 'http://jabber.org/protocol/amp' + name = 'invalid-rules' + plugin_attrib = 'invalid_rules' + + +class UnsupportedConditions(ElementBase): + namespace = 'http://jabber.org/protocol/amp' + name = 'unsupported-conditions' + plugin_attrib = 'unsupported_conditions' + + +class UnsupportedActions(ElementBase): + namespace = 'http://jabber.org/protocol/amp' + name = 'unsupported-actions' + plugin_attrib = 'unsupported_actions' + + +class FailedRule(Rule): + namespace = 'http://jabber.org/protocol/amp#errors' + + +class FailedRules(ElementBase): + namespace = 'http://jabber.org/protocol/amp#errors' + name = 'failed-rules' + plugin_attrib = 'failed_rules' + + +class AMPFeature(ElementBase): + namespace = 'http://jabber.org/features/amp' + name = 'amp' + + +register_stanza_plugin(AMP, Rule, iterable=True) +register_stanza_plugin(InvalidRules, Rule, iterable=True) +register_stanza_plugin(UnsupportedConditions, Rule, iterable=True) +register_stanza_plugin(UnsupportedActions, Rule, iterable=True) +register_stanza_plugin(FailedRules, FailedRule, iterable=True) -- cgit v1.2.3