summaryrefslogtreecommitdiff
path: root/src/xmpppy-0.5.0rc1/build/lib/xmpp/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/xmpppy-0.5.0rc1/build/lib/xmpp/session.py')
-rw-r--r--src/xmpppy-0.5.0rc1/build/lib/xmpp/session.py349
1 files changed, 349 insertions, 0 deletions
diff --git a/src/xmpppy-0.5.0rc1/build/lib/xmpp/session.py b/src/xmpppy-0.5.0rc1/build/lib/xmpp/session.py
new file mode 100644
index 00000000..24066b32
--- /dev/null
+++ b/src/xmpppy-0.5.0rc1/build/lib/xmpp/session.py
@@ -0,0 +1,349 @@
+##
+## XMPP server
+##
+## Copyright (C) 2004 Alexey "Snake" Nezhdanov
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2, or (at your option)
+## any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+
+__version__="$Id"
+
+"""
+When your handler is called it is getting the session instance as the first argument.
+This is the difference from xmpppy 0.1 where you got the "Client" instance.
+With Session class you can have "multi-session" client instead of having
+one client for each connection. Is is specifically important when you are
+writing the server.
+"""
+
+from protocol import *
+
+# Transport-level flags
+SOCKET_UNCONNECTED =0
+SOCKET_ALIVE =1
+SOCKET_DEAD =2
+# XML-level flags
+STREAM__NOT_OPENED =1
+STREAM__OPENED =2
+STREAM__CLOSING =3
+STREAM__CLOSED =4
+# XMPP-session flags
+SESSION_NOT_AUTHED =1
+SESSION_AUTHED =2
+SESSION_BOUND =3
+SESSION_OPENED =4
+SESSION_CLOSED =5
+
+class Session:
+ """
+ The Session class instance is used for storing all session-related info like
+ credentials, socket/xml stream/session state flags, roster items (in case of
+ client type connection) etc.
+ Session object have no means of discovering is any info is ready to be read.
+ Instead you should use poll() (recomended) or select() methods for this purpose.
+ Session can be one of two types: 'server' and 'client'. 'server' session handles
+ inbound connection and 'client' one used to create an outbound one.
+ Session instance have multitude of internal attributes. The most imporant is the 'peer' one.
+ It is set once the peer is authenticated (client).
+ """
+ def __init__(self,socket,owner,xmlns=None,peer=None):
+ """ When the session is created it's type (client/server) is determined from the beginning.
+ socket argument is the pre-created socket-like object.
+ It must have the following methods: send, recv, fileno, close.
+ owner is the 'master' instance that have Dispatcher plugged into it and generally
+ will take care about all session events.
+ xmlns is the stream namespace that will be used. Client must set this argument
+ If server sets this argument than stream will be dropped if opened with some another namespace.
+ peer is the name of peer instance. This is the flag that differentiates client session from
+ server session. Client must set it to the name of the server that will be connected, server must
+ leave this argument alone.
+ """
+ self.xmlns=xmlns
+ if peer:
+ self.TYP='client'
+ self.peer=peer
+ self._socket_state=SOCKET_UNCONNECTED
+ else:
+ self.TYP='server'
+ self.peer=None
+ self._socket_state=SOCKET_ALIVE
+ self._sock=socket
+ self._send=socket.send
+ self._recv=socket.recv
+ self.fileno=socket.fileno
+ self._registered=0
+
+ self.Dispatcher=owner.Dispatcher
+ self.DBG_LINE='session'
+ self.DEBUG=owner.Dispatcher.DEBUG
+ self._expected={}
+ self._owner=owner
+ if self.TYP=='server': self.ID=`random.random()`[2:]
+ else: self.ID=None
+
+ self.sendbuffer=''
+ self._stream_pos_queued=None
+ self._stream_pos_sent=0
+ self.deliver_key_queue=[]
+ self.deliver_queue_map={}
+ self.stanza_queue=[]
+
+ self._session_state=SESSION_NOT_AUTHED
+ self.waiting_features=[]
+ for feature in [NS_TLS,NS_SASL,NS_BIND,NS_SESSION]:
+ if feature in owner.features: self.waiting_features.append(feature)
+ self.features=[]
+ self.feature_in_process=None
+ self.slave_session=None
+ self.StartStream()
+
+ def StartStream(self):
+ """ This method is used to initialise the internal xml expat parser
+ and to send initial stream header (in case of client connection).
+ Should be used after initial connection and after every stream restart."""
+ self._stream_state=STREAM__NOT_OPENED
+ self.Stream=simplexml.NodeBuilder()
+ self.Stream._dispatch_depth=2
+ self.Stream.dispatch=self._dispatch
+ self.Parse=self.Stream.Parse
+ self.Stream.stream_footer_received=self._stream_close
+ if self.TYP=='client':
+ self.Stream.stream_header_received=self._catch_stream_id
+ self._stream_open()
+ else:
+ self.Stream.stream_header_received=self._stream_open
+
+ def receive(self):
+ """ Reads all pending incoming data.
+ Raises IOError on disconnection.
+ Blocks until at least one byte is read."""
+ try: received = self._recv(10240)
+ except: received = ''
+
+ if len(received): # length of 0 means disconnect
+ self.DEBUG(`self.fileno()`+' '+received,'got')
+ else:
+ self.DEBUG('Socket error while receiving data','error')
+ self.set_socket_state(SOCKET_DEAD)
+ raise IOError("Peer disconnected")
+ return received
+
+ def sendnow(self,chunk):
+ """ Put chunk into "immidiatedly send" queue.
+ Should only be used for auth/TLS stuff and like.
+ If you just want to shedule regular stanza for delivery use enqueue method.
+ """
+ if isinstance(chunk,Node): chunk = chunk.__str__().encode('utf-8')
+ elif type(chunk)==type(u''): chunk = chunk.encode('utf-8')
+ self.enqueue(chunk)
+
+ def enqueue(self,stanza):
+ """ Takes Protocol instance as argument.
+ Puts stanza into "send" fifo queue. Items into the send queue are hold until
+ stream authenticated. After that this method is effectively the same as "sendnow" method."""
+ if isinstance(stanza,Protocol):
+ self.stanza_queue.append(stanza)
+ else: self.sendbuffer+=stanza
+ if self._socket_state>=SOCKET_ALIVE: self.push_queue()
+
+ def push_queue(self,failreason=ERR_RECIPIENT_UNAVAILABLE):
+ """ If stream is authenticated than move items from "send" queue to "immidiatedly send" queue.
+ Else if the stream is failed then return all queued stanzas with error passed as argument.
+ Otherwise do nothing."""
+ # If the stream authed - convert stanza_queue into sendbuffer and set the checkpoints
+
+ if self._stream_state>=STREAM__CLOSED or self._socket_state>=SOCKET_DEAD: # the stream failed. Return all stanzas that are still waiting for delivery.
+ self._owner.deactivatesession(self)
+ for key in self.deliver_key_queue: # Not sure. May be I
+ self._dispatch(Error(self.deliver_queue_map[key],failreason),trusted=1) # should simply re-dispatch it?
+ for stanza in self.stanza_queue: # But such action can invoke
+ self._dispatch(Error(stanza,failreason),trusted=1) # Infinite loops in case of S2S connection...
+ self.deliver_queue_map,self.deliver_key_queue,self.stanza_queue={},[],[]
+ return
+ elif self._session_state>=SESSION_AUTHED: # FIXME! Должен быть какой-то другой флаг.
+ #### LOCK_QUEUE
+ for stanza in self.stanza_queue:
+ txt=stanza.__str__().encode('utf-8')
+ self.sendbuffer+=txt
+ self._stream_pos_queued+=len(txt) # should be re-evaluated for SSL connection.
+ self.deliver_queue_map[self._stream_pos_queued]=stanza # position of the stream when stanza will be successfully and fully sent
+ self.deliver_key_queue.append(self._stream_pos_queued)
+ self.stanza_queue=[]
+ #### UNLOCK_QUEUE
+
+ def flush_queue(self):
+ """ Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent."""
+ if self.sendbuffer:
+ try:
+ # LOCK_QUEUE
+ sent=self._send(self.sendbuffer) # Блокирующая штучка!
+ except:
+ # UNLOCK_QUEUE
+ self.set_socket_state(SOCKET_DEAD)
+ self.DEBUG("Socket error while sending data",'error')
+ return self.terminate_stream()
+ self.DEBUG(`self.fileno()`+' '+self.sendbuffer[:sent],'sent')
+ self._stream_pos_sent+=sent
+ self.sendbuffer=self.sendbuffer[sent:]
+ self._stream_pos_delivered=self._stream_pos_sent # Should be acquired from socket somehow. Take SSL into account.
+ while self.deliver_key_queue and self._stream_pos_delivered>self.deliver_key_queue[0]:
+ del self.deliver_queue_map[self.deliver_key_queue[0]]
+ self.deliver_key_queue.remove(self.deliver_key_queue[0])
+ # UNLOCK_QUEUE
+
+ def _dispatch(self,stanza,trusted=0):
+ """ This is callback that is used to pass the received stanza forth to owner's dispatcher
+ _if_ the stream is authorised. Otherwise the stanza is just dropped.
+ The 'trusted' argument is used to emulate stanza receive.
+ This method is used internally.
+ """
+ self._owner.packets+=1
+ if self._stream_state==STREAM__OPENED or trusted: # if the server really should reject all stanzas after he is closed stream (himeself)?
+ self.DEBUG(stanza.__str__(),'dispatch')
+ stanza.trusted=trusted
+ return self.Dispatcher.dispatch(stanza,self)
+
+ def _catch_stream_id(self,ns=None,tag='stream',attrs={}):
+ """ This callback is used to detect the stream namespace of incoming stream. Used internally. """
+ if not attrs.has_key('id') or not attrs['id']:
+ return self.terminate_stream(STREAM_INVALID_XML)
+ self.ID=attrs['id']
+ if not attrs.has_key('version'): self._owner.Dialback(self)
+
+ def _stream_open(self,ns=None,tag='stream',attrs={}):
+ """ This callback is used to handle opening stream tag of the incoming stream.
+ In the case of client session it just make some validation.
+ Server session also sends server headers and if the stream valid the features node.
+ Used internally. """
+ text='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream'
+ if self.TYP=='client':
+ text+=' to="%s"'%self.peer
+ else:
+ text+=' id="%s"'%self.ID
+ if not attrs.has_key('to'): text+=' from="%s"'%self._owner.servernames[0]
+ else: text+=' from="%s"'%attrs['to']
+ if attrs.has_key('xml:lang'): text+=' xml:lang="%s"'%attrs['xml:lang']
+ if self.xmlns: xmlns=self.xmlns
+ else: xmlns=NS_SERVER
+ text+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK,NS_STREAMS,xmlns)
+ if attrs.has_key('version') or self.TYP=='client': text+=' version="1.0"'
+ self.sendnow(text+'>')
+ self.set_stream_state(STREAM__OPENED)
+ if self.TYP=='client': return
+ if tag<>'stream': return self.terminate_stream(STREAM_INVALID_XML)
+ if ns<>NS_STREAMS: return self.terminate_stream(STREAM_INVALID_NAMESPACE)
+ if self.Stream.xmlns<>self.xmlns: return self.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX)
+ if not attrs.has_key('to'): return self.terminate_stream(STREAM_IMPROPER_ADDRESSING)
+ if attrs['to'] not in self._owner.servernames: return self.terminate_stream(STREAM_HOST_UNKNOWN)
+ self.ourname=attrs['to'].lower()
+ if self.TYP=='server' and attrs.has_key('version'):
+ # send features
+ features=Node('stream:features')
+ if NS_TLS in self.waiting_features:
+ features.NT.starttls.setNamespace(NS_TLS)
+ features.T.starttls.NT.required
+ if NS_SASL in self.waiting_features:
+ features.NT.mechanisms.setNamespace(NS_SASL)
+ for mec in self._owner.SASL.mechanisms:
+ features.T.mechanisms.NT.mechanism=mec
+ else:
+ if NS_BIND in self.waiting_features: features.NT.bind.setNamespace(NS_BIND)
+ if NS_SESSION in self.waiting_features: features.NT.session.setNamespace(NS_SESSION)
+ self.sendnow(features)
+
+ def feature(self,feature):
+ """ Declare some stream feature as activated one. """
+ if feature not in self.features: self.features.append(feature)
+ self.unfeature(feature)
+
+ def unfeature(self,feature):
+ """ Declare some feature as illegal. Illegal features can not be used.
+ Example: BIND feature becomes illegal after Non-SASL auth. """
+ if feature in self.waiting_features: self.waiting_features.remove(feature)
+
+ def _stream_close(self,unregister=1):
+ """ Write the closing stream tag and destroy the underlaying socket. Used internally. """
+ if self._stream_state>=STREAM__CLOSED: return
+ self.set_stream_state(STREAM__CLOSING)
+ self.sendnow('</stream:stream>')
+ self.set_stream_state(STREAM__CLOSED)
+ self.push_queue() # decompose queue really since STREAM__CLOSED
+ self._owner.flush_queues()
+ if unregister: self._owner.unregistersession(self)
+ self._destroy_socket()
+
+ def terminate_stream(self,error=None,unregister=1):
+ """ Notify the peer about stream closure.
+ Ensure that xmlstream is not brokes - i.e. if the stream isn't opened yet -
+ open it before closure.
+ If the error condition is specified than create a stream error and send it along with
+ closing stream tag.
+ Emulate receiving 'unavailable' type presence just before stream closure.
+ """
+ if self._stream_state>=STREAM__CLOSING: return
+ if self._stream_state<STREAM__OPENED:
+ self.set_stream_state(STREAM__CLOSING)
+ self._stream_open()
+ else:
+ self.set_stream_state(STREAM__CLOSING)
+ p=Presence(typ='unavailable')
+ p.setNamespace(NS_CLIENT)
+ self._dispatch(p,trusted=1)
+ if error:
+ if isinstance(error,Node): self.sendnow(error)
+ else: self.sendnow(ErrorNode(error))
+ self._stream_close(unregister=unregister)
+ if self.slave_session:
+ self.slave_session.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED)
+
+ def _destroy_socket(self):
+ """ Break cyclic dependancies to let python's GC free memory right now."""
+ self.Stream.dispatch=None
+ self.Stream.stream_footer_received=None
+ self.Stream.stream_header_received=None
+ self.Stream.destroy()
+ self._sock.close()
+ self.set_socket_state(SOCKET_DEAD)
+
+ def start_feature(self,f):
+ """ Declare some feature as "negotiating now" to prevent other features from start negotiating. """
+ if self.feature_in_process: raise "Starting feature %s over %s !"%(f,self.feature_in_process)
+ self.feature_in_process=f
+
+ def stop_feature(self,f):
+ """ Declare some feature as "negotiated" to allow other features start negotiating. """
+ if self.feature_in_process<>f: raise "Stopping feature %s instead of %s !"%(f,self.feature_in_process)
+ self.feature_in_process=None
+
+ def set_socket_state(self,newstate):
+ """ Change the underlaying socket state.
+ Socket starts with SOCKET_UNCONNECTED state
+ and then proceeds (possibly) to SOCKET_ALIVE
+ and then to SOCKET_DEAD """
+ if self._socket_state<newstate: self._socket_state=newstate
+
+ def set_session_state(self,newstate):
+ """ Change the session state.
+ Session starts with SESSION_NOT_AUTHED state
+ and then comes through
+ SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states.
+ """
+ if self._session_state<newstate:
+ if self._session_state<SESSION_AUTHED and \
+ newstate>=SESSION_AUTHED: self._stream_pos_queued=self._stream_pos_sent
+ self._session_state=newstate
+
+ def set_stream_state(self,newstate):
+ """ Change the underlaying XML stream state
+ Stream starts with STREAM__NOT_OPENED and then proceeds with
+ STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states.
+ Note that some features (like TLS and SASL)
+ requires stream re-start so this state can have non-linear changes. """
+ if self._stream_state<newstate: self._stream_state=newstate