summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sleekxmpp/jid.py69
-rw-r--r--sleekxmpp/plugins/xep_0030/disco.py10
-rw-r--r--sleekxmpp/stanza/iq.py37
-rw-r--r--tests/test_stream_handlers.py29
4 files changed, 110 insertions, 35 deletions
diff --git a/sleekxmpp/jid.py b/sleekxmpp/jid.py
index feab4082..8f1a81d4 100644
--- a/sleekxmpp/jid.py
+++ b/sleekxmpp/jid.py
@@ -69,6 +69,20 @@ JID_CACHE = OrderedDict()
JID_CACHE_LOCK = threading.Lock()
JID_CACHE_MAX_SIZE = 1024
+def _cache(key, parts, locked):
+ JID_CACHE[key] = (parts, locked)
+ if len(JID_CACHE) > JID_CACHE_MAX_SIZE:
+ with JID_CACHE_LOCK:
+ while len(JID_CACHE) > JID_CACHE_MAX_SIZE:
+ found = None
+ for key, item in JID_CACHE.iteritems():
+ if not item[1]: # if not locked
+ found = key
+ break
+ if not found: # more than MAX_SIZE locked
+ # warn?
+ break
+ del JID_CACHE[found]
# pylint: disable=c0103
#: The nodeprep profile of stringprep used to validate the local,
@@ -418,19 +432,29 @@ class JID(object):
# pylint: disable=W0212
def __init__(self, jid=None, **kwargs):
- jid_data = (jid, kwargs.get('local', None),
- kwargs.get('domain', None),
- kwargs.get('resource', None))
-
locked = kwargs.get('cache_lock', False)
-
- if jid_data in JID_CACHE:
- parsed_jid, locked = JID_CACHE[jid_data]
- self._jid = parsed_jid
- else:
- if jid is None:
- jid = ''
-
+ in_local = kwargs.get('local', None)
+ in_domain = kwargs.get('domain', None)
+ in_resource = kwargs.get('resource', None)
+ parts = None
+ if in_local or in_domain or in_resource:
+ parts = (in_local, in_domain, in_resource)
+
+ # only check cache if there is a jid string, or parts, not if there
+ # are both
+ self._jid = None
+ key = None
+ if (jid is not None) and (parts is None):
+ if isinstance(jid, JID):
+ # it's already good to go, and there are no additions
+ self._jid = jid._jid
+ return
+ key = jid
+ self._jid, locked = JID_CACHE.get(jid, (None, locked))
+ elif jid is None and parts is not None:
+ key = parts
+ self._jid, locked = JID_CACHE.get(parts, (None, locked))
+ if not self._jid:
if not jid:
parsed_jid = (None, None, None)
elif not isinstance(jid, JID):
@@ -440,27 +464,16 @@ class JID(object):
local, domain, resource = parsed_jid
- local = kwargs.get('local', local)
- domain = kwargs.get('domain', domain)
- resource = kwargs.get('resource', resource)
-
if 'local' in kwargs:
- local = _escape_node(local)
+ local = _escape_node(in_local)
if 'domain' in kwargs:
- domain = _validate_domain(domain)
+ domain = _validate_domain(in_domain)
if 'resource' in kwargs:
- resource = _validate_resource(resource)
+ resource = _validate_resource(in_resource)
self._jid = (local, domain, resource)
-
- JID_CACHE[jid_data] = (self._jid, locked)
- if len(JID_CACHE) > JID_CACHE_MAX_SIZE:
- with JID_CACHE_LOCK:
- key, item = JID_CACHE.popitem(False)
- if item[1]:
- # Need to reinsert locked JIDs
- JID_CACHE[key] = item
-
+ if key:
+ _cache(key, self._jid, locked)
def unescape(self):
"""Return an unescaped JID object.
diff --git a/sleekxmpp/plugins/xep_0030/disco.py b/sleekxmpp/plugins/xep_0030/disco.py
index 278b4a34..8a397923 100644
--- a/sleekxmpp/plugins/xep_0030/disco.py
+++ b/sleekxmpp/plugins/xep_0030/disco.py
@@ -324,6 +324,8 @@ class XEP_0030(BasePlugin):
callback -- Optional callback to execute when a reply is
received instead of blocking and waiting for
the reply.
+ timeout_callback -- Optional callback to execute when no result
+ has been received in timeout seconds.
"""
if local is None:
if jid is not None and not isinstance(jid, JID):
@@ -364,7 +366,8 @@ class XEP_0030(BasePlugin):
iq['disco_info']['node'] = node if node else ''
return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', True),
- callback=kwargs.get('callback', None))
+ callback=kwargs.get('callback', None),
+ timeout_callback=kwargs.get('timeout_callback', None))
def set_info(self, jid=None, node=None, info=None):
"""
@@ -405,6 +408,8 @@ class XEP_0030(BasePlugin):
iterator -- If True, return a result set iterator using
the XEP-0059 plugin, if the plugin is loaded.
Otherwise the parameter is ignored.
+ timeout_callback -- Optional callback to execute when no result
+ has been received in timeout seconds.
"""
if local or local is None and jid is None:
items = self.api['get_items'](jid, node,
@@ -423,7 +428,8 @@ class XEP_0030(BasePlugin):
else:
return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', True),
- callback=kwargs.get('callback', None))
+ callback=kwargs.get('callback', None),
+ timeout_callback=kwargs.get('timeout_callback', None))
def set_items(self, jid=None, node=None, **kwargs):
"""
diff --git a/sleekxmpp/stanza/iq.py b/sleekxmpp/stanza/iq.py
index f45b3c67..71c0444d 100644
--- a/sleekxmpp/stanza/iq.py
+++ b/sleekxmpp/stanza/iq.py
@@ -154,7 +154,7 @@ class Iq(RootStanza):
StanzaBase.reply(self, clear)
return self
- def send(self, block=True, timeout=None, callback=None, now=False):
+ def send(self, block=True, timeout=None, callback=None, now=False, timeout_callback=None):
"""
Send an <iq> stanza over the XML stream.
@@ -181,15 +181,32 @@ class Iq(RootStanza):
now -- Indicates if the send queue should be skipped and send
the stanza immediately. Used during stream
initialization. Defaults to False.
+ timeout_callback -- Optional reference to a stream handler function.
+ Will be executed when the timeout expires before a
+ response has been received with the originally-sent IQ
+ stanza. Only called if there is a callback parameter
+ (and therefore are in async mode).
"""
if timeout is None:
timeout = self.stream.response_timeout
if callback is not None and self['type'] in ('get', 'set'):
handler_name = 'IqCallback_%s' % self['id']
- handler = Callback(handler_name,
- MatcherId(self['id']),
- callback,
- once=True)
+ if timeout_callback:
+ self.callback = callback
+ self.timeout_callback = timeout_callback
+ self.stream.schedule('IqTimeout_%s' % self['id'],
+ timeout,
+ self._fire_timeout,
+ repeat=False)
+ handler = Callback(handler_name,
+ MatcherId(self['id']),
+ self._handle_result,
+ once=True)
+ else:
+ handler = Callback(handler_name,
+ MatcherId(self['id']),
+ callback,
+ once=True)
self.stream.register_handler(handler)
StanzaBase.send(self, now=now)
return handler_name
@@ -206,6 +223,16 @@ class Iq(RootStanza):
else:
return StanzaBase.send(self, now=now)
+ def _handle_result(self, iq):
+ # we got the IQ, so don't fire the timeout
+ self.stream.scheduler.remove('IqTimeout_%s' % self['id'])
+ self.callback(iq)
+
+ def _fire_timeout(self):
+ # don't fire the handler for the IQ, if it finally does come in
+ self.stream.remove_handler('IqCallback_%s' % self['id'])
+ self.timeout_callback(self)
+
def _set_stanza_values(self, values):
"""
Set multiple stanza interface values using a dictionary.
diff --git a/tests/test_stream_handlers.py b/tests/test_stream_handlers.py
index 7fd4e648..d3850a94 100644
--- a/tests/test_stream_handlers.py
+++ b/tests/test_stream_handlers.py
@@ -153,6 +153,35 @@ class TestHandlers(SleekTest):
self.failUnless(events == ['foo'],
"Iq callback was not executed: %s" % events)
+ def testIqTimeoutCallback(self):
+ """Test that iq.send(tcallback=handle_foo, timeout_callback=handle_timeout) works."""
+ events = []
+
+ def handle_foo(iq):
+ events.append('foo')
+
+ def handle_timeout(iq):
+ events.append('timeout')
+
+ iq = self.Iq()
+ iq['type'] = 'get'
+ iq['id'] = 'test-foo'
+ iq['to'] = 'user@localhost'
+ iq['query'] = 'foo'
+ iq.send(callback=handle_foo, timeout_callback=handle_timeout, timeout=0.05)
+
+ self.send("""
+ <iq type="get" id="test-foo" to="user@localhost">
+ <query xmlns="foo" />
+ </iq>
+ """)
+
+ # Give event queue time to process
+ time.sleep(1)
+
+ self.failUnless(events == ['timeout'],
+ "Iq timeout was not executed: %s" % events)
+
def testMultipleHandlersForStanza(self):
"""
Test that multiple handlers for a single stanza work