diff options
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r-- | sleekxmpp/xmlstream/xmlstream.py | 73 |
1 files changed, 37 insertions, 36 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 97d977e8..0de4d035 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -33,7 +33,7 @@ if sys.version_info < (3, 0): # The time in seconds to wait before timing out waiting for response stanzas. RESPONSE_TIMEOUT = 10 -# The number of threads to use to handle XML stream events. This is not the +# The number of threads to use to handle XML stream events. This is not the # same as the number of custom event handling threads. HANDLER_THREADS must # be at least 1. HANDLER_THREADS = 1 @@ -53,9 +53,9 @@ class XMLStream(object): """ An XML stream connection manager and event dispatcher. - The XMLStream class abstracts away the issues of establishing a - connection with a server and sending and receiving XML "stanzas". - A stanza is a complete XML element that is a direct child of a root + The XMLStream class abstracts away the issues of establishing a + connection with a server and sending and receiving XML "stanzas". + A stanza is a complete XML element that is a direct child of a root document element. Two streams are used, one for each communication direction, over the same socket. Once the connection is closed, both streams should be complete and valid XML documents. @@ -65,7 +65,7 @@ class XMLStream(object): to events in a SAX XML parser. Custom -- Triggered manually. Scheduled -- Triggered based on time delays. - + Typically, stanzas are first processed by a stream event handler which will then trigger custom events to continue further processing, especially since custom event handlers may run in individual threads. @@ -75,7 +75,7 @@ class XMLStream(object): address -- The hostname and port of the server. default_ns -- The default XML namespace that will be applied to all non-namespaced stanzas. - event_queue -- A queue of stream, custom, and scheduled + event_queue -- A queue of stream, custom, and scheduled events to be processed. filesocket -- A filesocket created from the main connection socket. Required for ElementTree.iterparse. @@ -117,7 +117,7 @@ class XMLStream(object): send_xml -- Send an XML string on the stream. set_socket -- Set the stream's socket and generate a new filesocket. - start_stream_handler -- Meant to be overridden. + start_stream_handler -- Meant to be overridden. start_tls -- Establish a TLS connection and restart the stream. """ @@ -148,12 +148,12 @@ class XMLStream(object): # TODO: Integrate the new state machine. self.state = StateMachine() - self.state.addStates({'connected': False, - 'is client': False, - 'ssl': False, - 'tls': False, - 'reconnect': True, - 'processing': False, + self.state.addStates({'connected': False, + 'is client': False, + 'ssl': False, + 'tls': False, + 'reconnect': True, + 'processing': False, 'disconnecting': False}) self.address = (host, int(port)) @@ -170,7 +170,7 @@ class XMLStream(object): self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue) - + self.namespace_map = {} self.__thread = {} @@ -179,7 +179,7 @@ class XMLStream(object): self.run = True - def connect(self, host='', port=0, use_ssl=False, + def connect(self, host='', port=0, use_ssl=False, use_tls=True, reattempt=True): """ Create a new socket and connect to the server. @@ -224,7 +224,7 @@ class XMLStream(object): self.state.set('connected', True) return True except socket.error as serr: - error_msg = "Could not connect. Socket Error #%s: %s" + error_msg = "Could not connect. Socket Error #%s: %s" logging.error(error_msg % (serr.errno, serr.strerror)) time.sleep(1) @@ -265,8 +265,8 @@ class XMLStream(object): """ Reset the stream's state and reconnect to the server. """ - self.state.set('tls',False) - self.state.set('ssl',False) + self.state.set('tls', False) + self.state.set('ssl', False) time.sleep(1) self.connect() @@ -302,8 +302,8 @@ class XMLStream(object): """ if self.ssl_support: logging.info("Negotiating TLS") - self.socket = ssl.wrap_socket(self.socket, - ssl_version=ssl.PROTOCOL_TLSv1, + self.socket = ssl.wrap_socket(self.socket, + ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) self.socket.do_handshake() self.set_socket(self.socket) @@ -347,7 +347,7 @@ class XMLStream(object): def register_handler(self, handler, before=None, after=None): """ - Add a stream event handler that will be executed when a matching + Add a stream event handler that will be executed when a matching stanza is received. Arguments: @@ -372,7 +372,7 @@ class XMLStream(object): idx += 1 return False - def schedule(self, name, seconds, callback, args=None, + def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False): """ Schedule a callback function to execute after a given delay. @@ -387,7 +387,7 @@ class XMLStream(object): repeat -- Flag indicating if the scheduled event should be reset and repeat after executing. """ - self.scheduler.add(name, seconds, callback, args, kwargs, + self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.event_queue) def incoming_filter(self, xml): @@ -420,11 +420,11 @@ class XMLStream(object): Arguments: threaded -- If threaded=True then event dispatcher will run - in a separate thread, allowing for the stream to be used - in the background for another application. Defaults - to True. + in a separate thread, allowing for the stream to be + used in the background for another application. + Defaults to True. - Event handlers and the send queue will be threaded + Event handlers and the send queue will be threaded regardless of this parameter's value. """ self.scheduler.process(threaded=True) @@ -450,7 +450,7 @@ class XMLStream(object): Start processing the XML streams. Processing will continue after any recoverable errors - if reconnections are allowed. + if reconnections are allowed. """ firstrun = True @@ -465,10 +465,10 @@ class XMLStream(object): self.send_raw(self.stream_header) # The call to self.__read_xml will block and prevent # the body of the loop from running until a diconnect - # occurs. After any reconnection, the stream header will + # occurs. After any reconnection, the stream header will # be resent and processing will resume. while self.run and self.__read_xml(): - # Ensure the stream header is sent for any + # Ensure the stream header is sent for any # new connections. if self.state['is client']: self.send_raw(self.stream_header) @@ -506,7 +506,7 @@ class XMLStream(object): Parse the incoming XML stream, raising stream events for each received stanza. """ - depth = 0 + depth = 0 root = None for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')): if event == b'start': @@ -532,7 +532,7 @@ class XMLStream(object): self.__spawn_event(xml) except RestartStream: return True - if root: + if root: # Keep the root element empty of children to # save on memory use. root.clear() @@ -547,8 +547,8 @@ class XMLStream(object): Arguments: xml -- The XML stanza to analyze. """ - logging.debug("RECV: %s" % tostring(xml, - xmlns=self.default_ns, + logging.debug("RECV: %s" % tostring(xml, + xmlns=self.default_ns, stream=self)) # Apply any preprocessing filters. xml = self.incoming_filter(xml) @@ -571,7 +571,7 @@ class XMLStream(object): stanza_copy = stanza_type(self, copy.deepcopy(xml)) handler.prerun(stanza_copy) self.event_queue.put(('stanza', handler, stanza_copy)) - if handler.checkDelete(): + if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler)) unhandled = False @@ -608,7 +608,8 @@ class XMLStream(object): try: handler.run(args[0]) except Exception as e: - logging.exception('Error processing event handler: %s' % handler.name) + error_msg = 'Error processing event handler: %s' + logging.exception(error_msg % handler.name) args[0].exception(e) elif etype == 'schedule': try: |