diff options
Diffstat (limited to 'src/xmpppy-0.5.0rc1/xmpp/session.py')
-rw-r--r-- | src/xmpppy-0.5.0rc1/xmpp/session.py | 349 |
1 files changed, 0 insertions, 349 deletions
diff --git a/src/xmpppy-0.5.0rc1/xmpp/session.py b/src/xmpppy-0.5.0rc1/xmpp/session.py deleted file mode 100644 index 24066b32..00000000 --- a/src/xmpppy-0.5.0rc1/xmpp/session.py +++ /dev/null @@ -1,349 +0,0 @@ -## -## XMPP server -## -## Copyright (C) 2004 Alexey "Snake" Nezhdanov -## -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2, or (at your option) -## any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. - -__version__="$Id" - -""" -When your handler is called it is getting the session instance as the first argument. -This is the difference from xmpppy 0.1 where you got the "Client" instance. -With Session class you can have "multi-session" client instead of having -one client for each connection. Is is specifically important when you are -writing the server. -""" - -from protocol import * - -# Transport-level flags -SOCKET_UNCONNECTED =0 -SOCKET_ALIVE =1 -SOCKET_DEAD =2 -# XML-level flags -STREAM__NOT_OPENED =1 -STREAM__OPENED =2 -STREAM__CLOSING =3 -STREAM__CLOSED =4 -# XMPP-session flags -SESSION_NOT_AUTHED =1 -SESSION_AUTHED =2 -SESSION_BOUND =3 -SESSION_OPENED =4 -SESSION_CLOSED =5 - -class Session: - """ - The Session class instance is used for storing all session-related info like - credentials, socket/xml stream/session state flags, roster items (in case of - client type connection) etc. - Session object have no means of discovering is any info is ready to be read. - Instead you should use poll() (recomended) or select() methods for this purpose. - Session can be one of two types: 'server' and 'client'. 'server' session handles - inbound connection and 'client' one used to create an outbound one. - Session instance have multitude of internal attributes. The most imporant is the 'peer' one. - It is set once the peer is authenticated (client). - """ - def __init__(self,socket,owner,xmlns=None,peer=None): - """ When the session is created it's type (client/server) is determined from the beginning. - socket argument is the pre-created socket-like object. - It must have the following methods: send, recv, fileno, close. - owner is the 'master' instance that have Dispatcher plugged into it and generally - will take care about all session events. - xmlns is the stream namespace that will be used. Client must set this argument - If server sets this argument than stream will be dropped if opened with some another namespace. - peer is the name of peer instance. This is the flag that differentiates client session from - server session. Client must set it to the name of the server that will be connected, server must - leave this argument alone. - """ - self.xmlns=xmlns - if peer: - self.TYP='client' - self.peer=peer - self._socket_state=SOCKET_UNCONNECTED - else: - self.TYP='server' - self.peer=None - self._socket_state=SOCKET_ALIVE - self._sock=socket - self._send=socket.send - self._recv=socket.recv - self.fileno=socket.fileno - self._registered=0 - - self.Dispatcher=owner.Dispatcher - self.DBG_LINE='session' - self.DEBUG=owner.Dispatcher.DEBUG - self._expected={} - self._owner=owner - if self.TYP=='server': self.ID=`random.random()`[2:] - else: self.ID=None - - self.sendbuffer='' - self._stream_pos_queued=None - self._stream_pos_sent=0 - self.deliver_key_queue=[] - self.deliver_queue_map={} - self.stanza_queue=[] - - self._session_state=SESSION_NOT_AUTHED - self.waiting_features=[] - for feature in [NS_TLS,NS_SASL,NS_BIND,NS_SESSION]: - if feature in owner.features: self.waiting_features.append(feature) - self.features=[] - self.feature_in_process=None - self.slave_session=None - self.StartStream() - - def StartStream(self): - """ This method is used to initialise the internal xml expat parser - and to send initial stream header (in case of client connection). - Should be used after initial connection and after every stream restart.""" - self._stream_state=STREAM__NOT_OPENED - self.Stream=simplexml.NodeBuilder() - self.Stream._dispatch_depth=2 - self.Stream.dispatch=self._dispatch - self.Parse=self.Stream.Parse - self.Stream.stream_footer_received=self._stream_close - if self.TYP=='client': - self.Stream.stream_header_received=self._catch_stream_id - self._stream_open() - else: - self.Stream.stream_header_received=self._stream_open - - def receive(self): - """ Reads all pending incoming data. - Raises IOError on disconnection. - Blocks until at least one byte is read.""" - try: received = self._recv(10240) - except: received = '' - - if len(received): # length of 0 means disconnect - self.DEBUG(`self.fileno()`+' '+received,'got') - else: - self.DEBUG('Socket error while receiving data','error') - self.set_socket_state(SOCKET_DEAD) - raise IOError("Peer disconnected") - return received - - def sendnow(self,chunk): - """ Put chunk into "immidiatedly send" queue. - Should only be used for auth/TLS stuff and like. - If you just want to shedule regular stanza for delivery use enqueue method. - """ - if isinstance(chunk,Node): chunk = chunk.__str__().encode('utf-8') - elif type(chunk)==type(u''): chunk = chunk.encode('utf-8') - self.enqueue(chunk) - - def enqueue(self,stanza): - """ Takes Protocol instance as argument. - Puts stanza into "send" fifo queue. Items into the send queue are hold until - stream authenticated. After that this method is effectively the same as "sendnow" method.""" - if isinstance(stanza,Protocol): - self.stanza_queue.append(stanza) - else: self.sendbuffer+=stanza - if self._socket_state>=SOCKET_ALIVE: self.push_queue() - - def push_queue(self,failreason=ERR_RECIPIENT_UNAVAILABLE): - """ If stream is authenticated than move items from "send" queue to "immidiatedly send" queue. - Else if the stream is failed then return all queued stanzas with error passed as argument. - Otherwise do nothing.""" - # If the stream authed - convert stanza_queue into sendbuffer and set the checkpoints - - if self._stream_state>=STREAM__CLOSED or self._socket_state>=SOCKET_DEAD: # the stream failed. Return all stanzas that are still waiting for delivery. - self._owner.deactivatesession(self) - for key in self.deliver_key_queue: # Not sure. May be I - self._dispatch(Error(self.deliver_queue_map[key],failreason),trusted=1) # should simply re-dispatch it? - for stanza in self.stanza_queue: # But such action can invoke - self._dispatch(Error(stanza,failreason),trusted=1) # Infinite loops in case of S2S connection... - self.deliver_queue_map,self.deliver_key_queue,self.stanza_queue={},[],[] - return - elif self._session_state>=SESSION_AUTHED: # FIXME! Должен быть какой-то другой флаг. - #### LOCK_QUEUE - for stanza in self.stanza_queue: - txt=stanza.__str__().encode('utf-8') - self.sendbuffer+=txt - self._stream_pos_queued+=len(txt) # should be re-evaluated for SSL connection. - self.deliver_queue_map[self._stream_pos_queued]=stanza # position of the stream when stanza will be successfully and fully sent - self.deliver_key_queue.append(self._stream_pos_queued) - self.stanza_queue=[] - #### UNLOCK_QUEUE - - def flush_queue(self): - """ Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent.""" - if self.sendbuffer: - try: - # LOCK_QUEUE - sent=self._send(self.sendbuffer) # Блокирующая штучка! - except: - # UNLOCK_QUEUE - self.set_socket_state(SOCKET_DEAD) - self.DEBUG("Socket error while sending data",'error') - return self.terminate_stream() - self.DEBUG(`self.fileno()`+' '+self.sendbuffer[:sent],'sent') - self._stream_pos_sent+=sent - self.sendbuffer=self.sendbuffer[sent:] - self._stream_pos_delivered=self._stream_pos_sent # Should be acquired from socket somehow. Take SSL into account. - while self.deliver_key_queue and self._stream_pos_delivered>self.deliver_key_queue[0]: - del self.deliver_queue_map[self.deliver_key_queue[0]] - self.deliver_key_queue.remove(self.deliver_key_queue[0]) - # UNLOCK_QUEUE - - def _dispatch(self,stanza,trusted=0): - """ This is callback that is used to pass the received stanza forth to owner's dispatcher - _if_ the stream is authorised. Otherwise the stanza is just dropped. - The 'trusted' argument is used to emulate stanza receive. - This method is used internally. - """ - self._owner.packets+=1 - if self._stream_state==STREAM__OPENED or trusted: # if the server really should reject all stanzas after he is closed stream (himeself)? - self.DEBUG(stanza.__str__(),'dispatch') - stanza.trusted=trusted - return self.Dispatcher.dispatch(stanza,self) - - def _catch_stream_id(self,ns=None,tag='stream',attrs={}): - """ This callback is used to detect the stream namespace of incoming stream. Used internally. """ - if not attrs.has_key('id') or not attrs['id']: - return self.terminate_stream(STREAM_INVALID_XML) - self.ID=attrs['id'] - if not attrs.has_key('version'): self._owner.Dialback(self) - - def _stream_open(self,ns=None,tag='stream',attrs={}): - """ This callback is used to handle opening stream tag of the incoming stream. - In the case of client session it just make some validation. - Server session also sends server headers and if the stream valid the features node. - Used internally. """ - text='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream' - if self.TYP=='client': - text+=' to="%s"'%self.peer - else: - text+=' id="%s"'%self.ID - if not attrs.has_key('to'): text+=' from="%s"'%self._owner.servernames[0] - else: text+=' from="%s"'%attrs['to'] - if attrs.has_key('xml:lang'): text+=' xml:lang="%s"'%attrs['xml:lang'] - if self.xmlns: xmlns=self.xmlns - else: xmlns=NS_SERVER - text+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK,NS_STREAMS,xmlns) - if attrs.has_key('version') or self.TYP=='client': text+=' version="1.0"' - self.sendnow(text+'>') - self.set_stream_state(STREAM__OPENED) - if self.TYP=='client': return - if tag<>'stream': return self.terminate_stream(STREAM_INVALID_XML) - if ns<>NS_STREAMS: return self.terminate_stream(STREAM_INVALID_NAMESPACE) - if self.Stream.xmlns<>self.xmlns: return self.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX) - if not attrs.has_key('to'): return self.terminate_stream(STREAM_IMPROPER_ADDRESSING) - if attrs['to'] not in self._owner.servernames: return self.terminate_stream(STREAM_HOST_UNKNOWN) - self.ourname=attrs['to'].lower() - if self.TYP=='server' and attrs.has_key('version'): - # send features - features=Node('stream:features') - if NS_TLS in self.waiting_features: - features.NT.starttls.setNamespace(NS_TLS) - features.T.starttls.NT.required - if NS_SASL in self.waiting_features: - features.NT.mechanisms.setNamespace(NS_SASL) - for mec in self._owner.SASL.mechanisms: - features.T.mechanisms.NT.mechanism=mec - else: - if NS_BIND in self.waiting_features: features.NT.bind.setNamespace(NS_BIND) - if NS_SESSION in self.waiting_features: features.NT.session.setNamespace(NS_SESSION) - self.sendnow(features) - - def feature(self,feature): - """ Declare some stream feature as activated one. """ - if feature not in self.features: self.features.append(feature) - self.unfeature(feature) - - def unfeature(self,feature): - """ Declare some feature as illegal. Illegal features can not be used. - Example: BIND feature becomes illegal after Non-SASL auth. """ - if feature in self.waiting_features: self.waiting_features.remove(feature) - - def _stream_close(self,unregister=1): - """ Write the closing stream tag and destroy the underlaying socket. Used internally. """ - if self._stream_state>=STREAM__CLOSED: return - self.set_stream_state(STREAM__CLOSING) - self.sendnow('</stream:stream>') - self.set_stream_state(STREAM__CLOSED) - self.push_queue() # decompose queue really since STREAM__CLOSED - self._owner.flush_queues() - if unregister: self._owner.unregistersession(self) - self._destroy_socket() - - def terminate_stream(self,error=None,unregister=1): - """ Notify the peer about stream closure. - Ensure that xmlstream is not brokes - i.e. if the stream isn't opened yet - - open it before closure. - If the error condition is specified than create a stream error and send it along with - closing stream tag. - Emulate receiving 'unavailable' type presence just before stream closure. - """ - if self._stream_state>=STREAM__CLOSING: return - if self._stream_state<STREAM__OPENED: - self.set_stream_state(STREAM__CLOSING) - self._stream_open() - else: - self.set_stream_state(STREAM__CLOSING) - p=Presence(typ='unavailable') - p.setNamespace(NS_CLIENT) - self._dispatch(p,trusted=1) - if error: - if isinstance(error,Node): self.sendnow(error) - else: self.sendnow(ErrorNode(error)) - self._stream_close(unregister=unregister) - if self.slave_session: - self.slave_session.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED) - - def _destroy_socket(self): - """ Break cyclic dependancies to let python's GC free memory right now.""" - self.Stream.dispatch=None - self.Stream.stream_footer_received=None - self.Stream.stream_header_received=None - self.Stream.destroy() - self._sock.close() - self.set_socket_state(SOCKET_DEAD) - - def start_feature(self,f): - """ Declare some feature as "negotiating now" to prevent other features from start negotiating. """ - if self.feature_in_process: raise "Starting feature %s over %s !"%(f,self.feature_in_process) - self.feature_in_process=f - - def stop_feature(self,f): - """ Declare some feature as "negotiated" to allow other features start negotiating. """ - if self.feature_in_process<>f: raise "Stopping feature %s instead of %s !"%(f,self.feature_in_process) - self.feature_in_process=None - - def set_socket_state(self,newstate): - """ Change the underlaying socket state. - Socket starts with SOCKET_UNCONNECTED state - and then proceeds (possibly) to SOCKET_ALIVE - and then to SOCKET_DEAD """ - if self._socket_state<newstate: self._socket_state=newstate - - def set_session_state(self,newstate): - """ Change the session state. - Session starts with SESSION_NOT_AUTHED state - and then comes through - SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states. - """ - if self._session_state<newstate: - if self._session_state<SESSION_AUTHED and \ - newstate>=SESSION_AUTHED: self._stream_pos_queued=self._stream_pos_sent - self._session_state=newstate - - def set_stream_state(self,newstate): - """ Change the underlaying XML stream state - Stream starts with STREAM__NOT_OPENED and then proceeds with - STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states. - Note that some features (like TLS and SASL) - requires stream re-start so this state can have non-linear changes. """ - if self._stream_state<newstate: self._stream_state=newstate |