From 6f3cc77bb52349b1b88d0dd6edd2ba77141bc7f6 Mon Sep 17 00:00:00 2001 From: Vijay Pandurangan Date: Sat, 19 Nov 2011 11:30:44 -0800 Subject: This change stops sleekxmpp from spending huge amounts of time unnecessarily computing logging data that may never be used. This is a HUGE performance improvement; in some of my test runs, unnecessary string creation was accounting for > 60% of all CPU time. Note that using % in a string will _always_ perform the sting substitutions, because the strings are constructed before the function is called. So log.debug('%s' % expensiveoperation()) will take about the same CPU time whether or not the logging level is DEBUG or INFO. if you use , no substitutions are performed unless the string is actually logged --- sleekxmpp/xmlstream/handler/waiter.py | 2 +- sleekxmpp/xmlstream/matcher/xmlmask.py | 3 +-- sleekxmpp/xmlstream/xmlstream.py | 44 +++++++++++++++++----------------- 3 files changed, 24 insertions(+), 25 deletions(-) (limited to 'sleekxmpp/xmlstream') diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py index 25dc161c..c3d0a4e1 100644 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -91,7 +91,7 @@ class Waiter(BaseHandler): stanza = self._payload.get(True, timeout) except queue.Empty: stanza = False - log.warning("Timed out waiting for %s" % self.name) + log.warning("Timed out waiting for %s" , self.name) self.stream().remove_handler(self.name) return stanza diff --git a/sleekxmpp/xmlstream/matcher/xmlmask.py b/sleekxmpp/xmlstream/matcher/xmlmask.py index 53ccc9ba..cd350432 100644 --- a/sleekxmpp/xmlstream/matcher/xmlmask.py +++ b/sleekxmpp/xmlstream/matcher/xmlmask.py @@ -102,8 +102,7 @@ class MatchXMLMask(MatcherBase): try: mask = ET.fromstring(mask) except ExpatError: - log.warning("Expat error: %s\nIn parsing: %s" % ('', mask)) - + log.warning("Expat error: %s\nIn parsing: %s" , '', mask) if not use_ns: # Compare the element without using namespaces. source_tag = source.tag.split('}', 1)[-1] diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index b6d013b0..7b941bf7 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -362,7 +362,7 @@ class XMLStream(object): else: delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) delay = random.normalvariate(delay, delay * 0.1) - log.debug('Waiting %s seconds before connecting.' % delay) + log.debug('Waiting %s seconds before connecting.' , delay) time.sleep(delay) if self.use_proxy: @@ -391,7 +391,7 @@ class XMLStream(object): try: if not self.use_proxy: - log.debug("Connecting to %s:%s" % self.address) + log.debug("Connecting to %s:%s" , self.address) self.socket.connect(self.address) self.set_socket(self.socket, ignore=True) @@ -435,18 +435,18 @@ class XMLStream(object): headers = '\r\n'.join(headers) + '\r\n\r\n' try: - log.debug("Connecting to proxy: %s:%s" % address) + log.debug("Connecting to proxy: %s:%s" , address) self.socket.connect(address) self.send_raw(headers, now=True) resp = '' while '\r\n\r\n' not in resp: resp += self.socket.recv(1024).decode('utf-8') - log.debug('RECV: %s' % resp) + log.debug('RECV: %s' , resp) lines = resp.split('\r\n') if '200' not in lines[0]: self.event('proxy_error', resp) - log.error('Proxy Error: %s' % lines[0]) + log.error('Proxy Error: %s' , lines[0]) return False # Proxy connection established, continue connecting @@ -510,7 +510,7 @@ class XMLStream(object): # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect - log.debug('Waiting for %s from server' % self.stream_footer) + log.debug('Waiting for %s from server' , self.stream_footer) self.stream_end_event.wait(4) if not self.auto_reconnect: self.stop.set() @@ -601,7 +601,7 @@ class XMLStream(object): """ if self.ssl_support: log.info("Negotiating TLS") - log.info("Using SSL version: %s" % str(self.ssl_version)) + log.info("Using SSL version: %s" , str(self.ssl_version)) if self.ca_certs is None: cert_policy = ssl.CERT_NONE else: @@ -759,7 +759,7 @@ class XMLStream(object): try: answers = resolver.query(domain, dns.rdatatype.A) except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): - log.warning("No A records for %s" % domain) + log.warning("No A records for %s" , domain) return [((domain, port), 0, 0)] except dns.exception.Timeout: log.warning("DNS resolution timed out " + \ @@ -808,7 +808,7 @@ class XMLStream(object): if self.dns_answers[0] == address: break self.dns_answers.pop(idx) - log.debug("Trying to connect to %s:%s" % address) + log.debug("Trying to connect to %s:%s" , address) return address def add_event_handler(self, name, pointer, @@ -879,7 +879,7 @@ class XMLStream(object): handler[0](copy.copy(data)) except Exception as e: error_msg = 'Error processing event handler: %s' - log.exception(error_msg % str(handler[0])) + log.exception(error_msg , str(handler[0])) if hasattr(data, 'exception'): data.exception(e) else: @@ -994,7 +994,7 @@ class XMLStream(object): Defaults to self.auto_reconnect. """ if now: - log.debug("SEND (IMMED): %s" % data) + log.debug("SEND (IMMED): %s" , data) try: data = data.encode('utf-8') total = len(data) @@ -1004,10 +1004,10 @@ class XMLStream(object): sent += self.socket.send(data[sent:]) count += 1 if count > 1: - log.debug('SENT: %d chunks' % count) + log.debug('SENT: %d chunks' , count) except Socket.error as serr: self.event('socket_error', serr) - log.warning("Failed to send %s" % data) + log.warning("Failed to send %s" , data) if reconnect is None: reconnect = self.auto_reconnect self.disconnect(reconnect) @@ -1187,7 +1187,7 @@ class XMLStream(object): Arguments: xml -- The XML stanza to analyze. """ - log.debug("RECV: %s" % tostring(xml, + log.debug("RECV: %s" , tostring(xml, xmlns=self.default_ns, stream=self)) # Apply any preprocessing filters. @@ -1232,7 +1232,7 @@ class XMLStream(object): func(*args) except Exception as e: error_msg = 'Error processing event handler: %s' - log.exception(error_msg % str(func)) + log.exception(error_msg , str(func)) if hasattr(orig, 'exception'): orig.exception(e) else: @@ -1267,12 +1267,12 @@ class XMLStream(object): handler.run(args[0]) except Exception as e: error_msg = 'Error processing stream handler: %s' - log.exception(error_msg % handler.name) + log.exception(error_msg , handler.name) orig.exception(e) elif etype == 'schedule': name = args[1] try: - log.debug('Scheduled event: %s: %s' % (name, args[0])) + log.debug('Scheduled event: %s: %s' , name, args[0]) handler(*args[0]) except Exception as e: log.exception('Error processing scheduled task') @@ -1291,7 +1291,7 @@ class XMLStream(object): func(*args) except Exception as e: error_msg = 'Error processing event handler: %s' - log.exception(error_msg % str(func)) + log.exception(error_msg , str(func)) if hasattr(orig, 'exception'): orig.exception(e) else: @@ -1324,7 +1324,7 @@ class XMLStream(object): data = self.send_queue.get(True, 1) except queue.Empty: continue - log.debug("SEND: %s" % data) + log.debug("SEND: %s" , data) try: enc_data = data.encode('utf-8') total = len(enc_data) @@ -1334,15 +1334,15 @@ class XMLStream(object): sent += self.socket.send(enc_data[sent:]) count += 1 if count > 1: - log.debug('SENT: %d chunks' % count) + log.debug('SENT: %d chunks' , count) self.send_queue.task_done() except Socket.error as serr: self.event('socket_error', serr) - log.warning("Failed to send %s" % data) + log.warning("Failed to send %s" , data) self.__failed_send_stanza = data self.disconnect(self.auto_reconnect) except Exception as ex: - log.exception('Unexpected error in send thread: %s' % ex) + log.exception('Unexpected error in send thread: %s' , ex) self.exception(ex) if not self.stop.is_set(): self.disconnect(self.auto_reconnect) -- cgit v1.2.3 From 48af3d33226a8f432b50e3ff9b54c59c2d9e4cda Mon Sep 17 00:00:00 2001 From: Vijay Pandurangan Date: Sat, 19 Nov 2011 15:59:38 -0800 Subject: remove unnecessary copies when only one handler matches. This was taking up ~ 15% of CPU on moderate load. --- sleekxmpp/xmlstream/xmlstream.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) (limited to 'sleekxmpp/xmlstream') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 7b941bf7..1453a2a1 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -873,20 +873,25 @@ class XMLStream(object): event queue. All event handlers will run in the same thread. """ - for handler in self.__event_handlers.get(name, []): + handlers = self.__event_handlers.get(name, []) + for handler in handlers: + #TODO: Data should not be copied, but should be read only, + # but this might break current code so it's left for future. + + out_data = copy.copy(data) if len(handlers) > 1 else data + old_exception = getattr(data, 'exception', None) if direct: try: - handler[0](copy.copy(data)) + handler[0](out_data) except Exception as e: error_msg = 'Error processing event handler: %s' log.exception(error_msg , str(handler[0])) - if hasattr(data, 'exception'): - data.exception(e) + if old_exception: + old_exception(e) else: self.exception(e) else: - self.event_queue.put(('event', handler, copy.copy(data))) - + self.event_queue.put(('event', handler, out_data)) if handler[2]: # If the handler is disposable, we will go ahead and # remove it now instead of waiting for it to be @@ -1201,17 +1206,17 @@ class XMLStream(object): # to run "in stream" will be executed immediately; the rest will # be queued. unhandled = True - for handler in self.__handlers: - if handler.match(stanza): - stanza_copy = copy.copy(stanza) - handler.prerun(stanza_copy) - self.event_queue.put(('stanza', handler, stanza_copy)) - try: - if handler.check_delete(): - self.__handlers.remove(handler) - except: - pass # not thread safe - unhandled = False + matched_handlers = filter(lambda h: h.match(stanza), self.__handlers) + for handler in matched_handlers: + stanza_copy = copy.copy(stanza) if len(matched_handlers) > 1 else stanza + handler.prerun(stanza_copy) + self.event_queue.put(('stanza', handler, stanza_copy)) + try: + if handler.check_delete(): + self.__handlers.remove(handler) + except: + pass # not thread safe + unhandled = False # Some stanzas require responses, such as Iq queries. A default # handler will be executed immediately for this case. -- cgit v1.2.3 From 2332970cf282d76cd5e2ac80e86ab987671486b4 Mon Sep 17 00:00:00 2001 From: Vijay Pandurangan Date: Sat, 19 Nov 2011 16:02:41 -0800 Subject: elide unnecessary copy --- sleekxmpp/xmlstream/xmlstream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'sleekxmpp/xmlstream') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1453a2a1..dcbb8ebb 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -1232,7 +1232,8 @@ class XMLStream(object): func -- The event handler to execute. args -- Arguments to the event handler. """ - orig = copy.copy(args[0]) + # this is always already copied before this is invoked + orig = args[0] try: func(*args) except Exception as e: -- cgit v1.2.3 From e3b9d5abbf8e825e6359f457b50fc51db44869ad Mon Sep 17 00:00:00 2001 From: Vijay Pandurangan Date: Sat, 19 Nov 2011 16:03:17 -0800 Subject: double copy --- sleekxmpp/xmlstream/xmlstream.py | 1 - 1 file changed, 1 deletion(-) (limited to 'sleekxmpp/xmlstream') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index dcbb8ebb..e4374151 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -1285,7 +1285,6 @@ class XMLStream(object): self.exception(e) elif etype == 'event': func, threaded, disposable = handler - orig = copy.copy(args[0]) try: if threaded: x = threading.Thread( -- cgit v1.2.3