summaryrefslogtreecommitdiff
path: root/src/xmpppy-0.5.0rc1/xmpp/session.py
blob: 24066b32785657acdf81b3230386025db6f003c5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
##
##   XMPP server
##
##   Copyright (C) 2004 Alexey "Snake" Nezhdanov
##
##   This program is free software; you can redistribute it and/or modify
##   it under the terms of the GNU General Public License as published by
##   the Free Software Foundation; either version 2, or (at your option)
##   any later version.
##
##   This program is distributed in the hope that it will be useful,
##   but WITHOUT ANY WARRANTY; without even the implied warranty of
##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
##   GNU General Public License for more details.

__version__="$Id"

"""
When your handler is called, it gets the session instance as the first argument.
This differs from xmpppy 0.1, where you got the "Client" instance.
With the Session class you can have a "multi-session" client instead of having
one client for each connection. This is especially important when you are
writing a server.
"""

from protocol import *

# Transport-level flags: state of the underlying socket.
# The numeric order is significant: the session code compares these values
# with < and >= and only ever moves a state forward.
SOCKET_UNCONNECTED  =0      # initial state of a 'client' session
SOCKET_ALIVE        =1      # initial state of a 'server' session
SOCKET_DEAD         =2      # set after a send/recv failure or after the socket is closed
# XML-level flags: state of the XML stream (ordered, see above).
STREAM__NOT_OPENED =1       # set by StartStream()
STREAM__OPENED     =2       # set once our stream header has been queued
STREAM__CLOSING    =3       # closing has started
STREAM__CLOSED     =4       # closing stream tag has been queued
# XMPP-session flags: state of the XMPP session (ordered, see above).
SESSION_NOT_AUTHED =1       # initial state of every session
SESSION_AUTHED     =2       # from this state on queued stanzas are moved to the send buffer
SESSION_BOUND      =3
SESSION_OPENED     =4
SESSION_CLOSED     =5

class Session:
    """
    The Session class instance is used for storing all session-related info like
    credentials, socket/xml stream/session state flags, roster items (in case of
    client type connection) etc.
    A Session object has no means of discovering whether any data is ready to be read.
    Instead you should use the poll() (recommended) or select() methods for this purpose.
    A Session can be one of two types: 'server' and 'client'. A 'server' session handles
    an inbound connection and a 'client' one is used to create an outbound one.
    A Session instance has a multitude of internal attributes. The most important is 'peer'.
    It is set once the peer is authenticated (client).
    """
    def __init__(self,socket,owner,xmlns=None,peer=None):
        """ Create a session on top of an already created socket-like object.
            The session type (client/server) is fixed here and never changes.

            socket -- pre-created socket-like object. It must have the following
                      methods: send, recv, fileno, close.
            owner  -- the 'master' instance. It must provide a Dispatcher attribute
                      (which in turn provides DEBUG) and a 'features' container;
                      it takes care of all session events.
            xmlns  -- the stream namespace that will be used. A client must set it.
                      If a server sets it, an incoming stream opened with another
                      namespace is terminated.
            peer   -- name of the peer. This is the flag that differentiates a client
                      session from a server one: a client must set it to the name of
                      the server it connects to, a server must leave it alone.

            The constructor finishes by calling StartStream().
            """
        import random   # local import: this module never imports random explicitly

        self.xmlns=xmlns
        if peer:
            self.TYP='client'
            self.peer=peer
            self._socket_state=SOCKET_UNCONNECTED
        else:
            self.TYP='server'
            self.peer=None
            self._socket_state=SOCKET_ALIVE
        self._sock=socket
        self._send=socket.send
        self._recv=socket.recv
        self.fileno=socket.fileno
        self._registered=0

        self.Dispatcher=owner.Dispatcher
        self.DBG_LINE='session'
        self.DEBUG=owner.Dispatcher.DEBUG
        self._expected={}
        self._owner=owner
        # A server session generates the stream id itself (the digits after "0."
        # of a random float); a client learns it from the peer's stream header.
        if self.TYP=='server': self.ID=repr(random.random())[2:]
        else: self.ID=None

        self.sendbuffer=''                  # raw data waiting to be written to the socket
        self._stream_pos_queued=None        # initialised when the session becomes authenticated
        self._stream_pos_sent=0             # number of bytes already written to the socket
        self.deliver_key_queue=[]           # stream positions at which queued stanzas end
        self.deliver_queue_map={}           # stream position -> stanza
        self.stanza_queue=[]                # stanzas held back until the stream is authenticated

        self._session_state=SESSION_NOT_AUTHED
        # Features the owner supports and that still may be negotiated on this stream.
        self.waiting_features=[feature for feature in (NS_TLS,NS_SASL,NS_BIND,NS_SESSION)
                               if feature in owner.features]
        self.features=[]
        self.feature_in_process=None
        self.slave_session=None
        self.StartStream()

    def StartStream(self):
        """ (Re)create the XML parser for this session and hook the session callbacks
            into it. A client session additionally sends its stream header right away;
            a server session waits for the peer's header instead.
            Should be used after initial connection and after every stream restart."""
        self._stream_state=STREAM__NOT_OPENED
        builder=simplexml.NodeBuilder()
        self.Stream=builder
        builder._dispatch_depth=2
        builder.dispatch=self._dispatch
        self.Parse=builder.Parse
        builder.stream_footer_received=self._stream_close
        if self.TYP!='client':
            # server side: answer with our header once the peer has opened the stream
            builder.stream_header_received=self._stream_open
            return
        # client side: pick up the stream id from the peer's header, open the stream ourselves
        builder.stream_header_received=self._catch_stream_id
        self._stream_open()

    def receive(self):
        """ Reads all pending incoming data.
            Raises IOError on disconnection.
            Blocks until at least one byte is read."""
        try: received = self._recv(10240)
        except: received = ''

        if len(received): # length of 0 means disconnect
            self.DEBUG(`self.fileno()`+' '+received,'got')
        else:
            self.DEBUG('Socket error while receiving data','error')
            self.set_socket_state(SOCKET_DEAD)
            raise IOError("Peer disconnected")
        return received

    def sendnow(self,chunk):
        """ Put chunk into the "immediately send" queue.
            A Node is serialised and encoded to utf-8, a unicode string (or an instance
            of a unicode subclass) is encoded to utf-8, anything else is passed on as is.
            Should only be used for auth/TLS stuff and the like.
            If you just want to schedule a regular stanza for delivery use the enqueue method.
        """
        if isinstance(chunk,Node): chunk = chunk.__str__().encode('utf-8')
        elif isinstance(chunk,type(u'')): chunk = chunk.encode('utf-8')
        self.enqueue(chunk)

    def enqueue(self,stanza):
        """ Queue something for sending.
            A Protocol instance goes into the "send" fifo queue, where it is held until
            the stream is authenticated. Anything else is appended directly to the raw
            send buffer. If the socket is alive (or already dead) push_queue() is
            called afterwards to process the queue."""
        if not isinstance(stanza,Protocol):
            self.sendbuffer+=stanza
        else:
            self.stanza_queue.append(stanza)
        if self._socket_state<SOCKET_ALIVE: return      # not connected yet - just keep it queued
        self.push_queue()

    def push_queue(self,failreason=ERR_RECIPIENT_UNAVAILABLE):
        """ Process the "send" queue according to the current state.
            If the stream or the socket has failed: deactivate the session at the owner and
            bounce every stanza that is still awaiting delivery back through the dispatcher
            as an error built from 'failreason'.
            Else, if the session is authenticated: move the queued stanzas into the send
            buffer and remember the stream position at which each of them ends.
            Otherwise do nothing."""
        if self._stream_state>=STREAM__CLOSED or self._socket_state>=SOCKET_DEAD:
            # The stream failed. Return all stanzas that are still waiting for delivery.
            # (Original author's doubt: maybe they should simply be re-dispatched, but
            # that could cause infinite loops in case of an S2S connection.)
            self._owner.deactivatesession(self)
            for position in self.deliver_key_queue:
                unconfirmed=self.deliver_queue_map[position]
                self._dispatch(Error(unconfirmed,failreason),trusted=1)
            for pending in self.stanza_queue:
                self._dispatch(Error(pending,failreason),trusted=1)
            self.deliver_queue_map={}
            self.deliver_key_queue=[]
            self.stanza_queue=[]
            return
        if self._session_state>=SESSION_AUTHED:     # FIXME! There should be some other flag.
            #### LOCK_QUEUE
            for stanza in self.stanza_queue:
                encoded=stanza.__str__().encode('utf-8')
                self.sendbuffer+=encoded
                self._stream_pos_queued+=len(encoded)   # should be re-evaluated for SSL connection.
                # stream position at which this stanza will be successfully and fully sent
                checkpoint=self._stream_pos_queued
                self.deliver_queue_map[checkpoint]=stanza
                self.deliver_key_queue.append(checkpoint)
            self.stanza_queue=[]
            #### UNLOCK_QUEUE

    def flush_queue(self):
        """ Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent."""
        if self.sendbuffer:
            try:
                # LOCK_QUEUE
                sent=self._send(self.sendbuffer)    # âÌÏËÉÒÕÀÝÁÑ ÛÔÕÞËÁ!
            except:
                # UNLOCK_QUEUE
                self.set_socket_state(SOCKET_DEAD)
                self.DEBUG("Socket error while sending data",'error')
                return self.terminate_stream()
            self.DEBUG(`self.fileno()`+' '+self.sendbuffer[:sent],'sent')
            self._stream_pos_sent+=sent
            self.sendbuffer=self.sendbuffer[sent:]
            self._stream_pos_delivered=self._stream_pos_sent            # Should be acquired from socket somehow. Take SSL into account.
            while self.deliver_key_queue and self._stream_pos_delivered>self.deliver_key_queue[0]:
                del self.deliver_queue_map[self.deliver_key_queue[0]]
                self.deliver_key_queue.remove(self.deliver_key_queue[0])
            # UNLOCK_QUEUE

    def _dispatch(self,stanza,trusted=0):
        """ Callback that hands a received stanza over to the owner's dispatcher.
            The owner's packet counter is incremented for every call. The stanza is
            forwarded only if the stream is in the opened state or 'trusted' is set;
            otherwise it is silently dropped and None is returned.
            'trusted' is used to emulate the receipt of a stanza.
            This method is used internally.
        """
        self._owner.packets+=1
        # Open question from the original author: should the server really reject
        # all stanzas after it has closed the stream itself?
        accepted=self._stream_state==STREAM__OPENED or trusted
        if not accepted: return
        self.DEBUG(stanza.__str__(),'dispatch')
        stanza.trusted=trusted
        return self.Dispatcher.dispatch(stanza,self)

    def _catch_stream_id(self,ns=None,tag='stream',attrs={}):
        """ This callback is used to detect the stream namespace of incoming stream. Used internally. """
        if not attrs.has_key('id') or not attrs['id']:
            return self.terminate_stream(STREAM_INVALID_XML)
        self.ID=attrs['id']
        if not attrs.has_key('version'): self._owner.Dialback(self)

    def _stream_open(self,ns=None,tag='stream',attrs=None):
        """ Send our stream header and, for a server session, validate the peer's one.
            A client session only sends its header (addressed to self.peer) and returns.
            A server session answers the incoming header described by ns/tag/attrs,
            then terminates the stream with the matching stream error if the tag,
            the namespaces or the 'to' address are not acceptable. If the peer sent
            a 'version' attribute the stream features node is sent as well.
            Values echoed from the peer's header are XML-escaped before being sent back.
            Used internally. """
        from xml.sax.saxutils import quoteattr   # local import: quotes and escapes attribute values
        if attrs is None: attrs={}
        text='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream'
        if self.TYP=='client':
            text+=' to="%s"'%self.peer
        else:
            text+=' id="%s"'%self.ID
            # 'to' comes from the remote side, so it must not be pasted in verbatim
            if 'to' in attrs: text+=' from=%s'%quoteattr(attrs['to'])
            else: text+=' from="%s"'%self._owner.servernames[0]
        if 'xml:lang' in attrs: text+=' xml:lang=%s'%quoteattr(attrs['xml:lang'])
        xmlns=self.xmlns or NS_SERVER
        text+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK,NS_STREAMS,xmlns)
        if 'version' in attrs or self.TYP=='client': text+=' version="1.0"'
        self.sendnow(text+'>')
        self.set_stream_state(STREAM__OPENED)
        if self.TYP=='client': return
        # The checks run only after our own header went out, so that a stream error
        # can be delivered inside a properly opened stream.
        if tag!='stream': return self.terminate_stream(STREAM_INVALID_XML)
        if ns!=NS_STREAMS: return self.terminate_stream(STREAM_INVALID_NAMESPACE)
        if self.Stream.xmlns!=self.xmlns: return self.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX)
        if 'to' not in attrs: return self.terminate_stream(STREAM_IMPROPER_ADDRESSING)
        if attrs['to'] not in self._owner.servernames: return self.terminate_stream(STREAM_HOST_UNKNOWN)
        self.ourname=attrs['to'].lower()
        if self.TYP=='server' and 'version' in attrs:
            # send features
            features=Node('stream:features')
            if NS_TLS in self.waiting_features:
                features.NT.starttls.setNamespace(NS_TLS)
                features.T.starttls.NT.required     # attribute access alone adds the <required/> child
            if NS_SASL in self.waiting_features:
                features.NT.mechanisms.setNamespace(NS_SASL)
                for mec in self._owner.SASL.mechanisms:
                    features.T.mechanisms.NT.mechanism=mec
            else:
                # bind/session are offered only when SASL is no longer pending
                if NS_BIND in self.waiting_features: features.NT.bind.setNamespace(NS_BIND)
                if NS_SESSION in self.waiting_features: features.NT.session.setNamespace(NS_SESSION)
            self.sendnow(features)

    def feature(self,feature):
        """ Declare some stream feature as activated one. """
        if feature not in self.features: self.features.append(feature)
        self.unfeature(feature)

    def unfeature(self,feature):
        """ Declare some feature as illegal. Illegal features can not be used.
            Example: BIND feature becomes illegal after Non-SASL auth. """
        if feature in self.waiting_features: self.waiting_features.remove(feature)

    def _stream_close(self,unregister=1):
        """ Write the closing stream tag and destroy the underlying socket.
            Does nothing if the stream is already closed.
            unregister -- when true, the session is also unregistered at the owner.
            Also installed by StartStream() as the parser's stream-footer callback.
            Used internally. """
        if self._stream_state>=STREAM__CLOSED: return
        self.set_stream_state(STREAM__CLOSING)
        self.sendnow('</stream:stream>')
        self.set_stream_state(STREAM__CLOSED)
        # With the stream in STREAM__CLOSED state push_queue() takes its failure
        # branch and bounces every stanza that is still queued.
        self.push_queue()       # decompose queue really since STREAM__CLOSED
        self._owner.flush_queues()
        if unregister: self._owner.unregistersession(self)
        self._destroy_socket()

    def terminate_stream(self,error=None,unregister=1):
        """ Notify the peer about stream closure. Does nothing if closing already started.
            Ensure that the xml stream is not broken - i.e. if the stream isn't opened
            yet, open it before closure. If the stream was opened, emulate receiving
            an 'unavailable' type presence just before stream closure.
            If an error condition is specified, create a stream error (unless a ready
            Node was passed) and send it along with the closing stream tag.
            unregister is passed on to _stream_close().
            If there is a slave session it is terminated as well, with the
            STREAM_REMOTE_CONNECTION_FAILED error.
        """
        if self._stream_state>=STREAM__CLOSING: return
        never_opened=self._stream_state<STREAM__OPENED     # must be sampled before the state changes
        self.set_stream_state(STREAM__CLOSING)
        if never_opened:
            self._stream_open()
        else:
            farewell=Presence(typ='unavailable')
            farewell.setNamespace(NS_CLIENT)
            self._dispatch(farewell,trusted=1)
        if error:
            if not isinstance(error,Node): error=ErrorNode(error)
            self.sendnow(error)
        self._stream_close(unregister=unregister)
        if self.slave_session:
            self.slave_session.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED)

    def _destroy_socket(self):
        """ Detach the session callbacks from the parser, destroy the parser and close
            the socket. Breaking these cyclic references lets python's GC free the
            memory right now. The socket state becomes SOCKET_DEAD afterwards."""
        parser=self.Stream
        for hook in ('dispatch','stream_footer_received','stream_header_received'):
            setattr(parser,hook,None)
        parser.destroy()
        self._sock.close()
        self.set_socket_state(SOCKET_DEAD)

    def start_feature(self,f):
        """ Declare some feature as "negotiating now" to prevent other features from start negotiating. """
        if self.feature_in_process: raise "Starting feature %s over %s !"%(f,self.feature_in_process)
        self.feature_in_process=f

    def stop_feature(self,f):
        """ Declare some feature as "negotiated" to allow other features start negotiating. """
        if self.feature_in_process<>f: raise "Stopping feature %s instead of %s !"%(f,self.feature_in_process)
        self.feature_in_process=None

    def set_socket_state(self,newstate):
        """ Change the underlaying socket state.
            Socket starts with SOCKET_UNCONNECTED state
            and then proceeds (possibly) to SOCKET_ALIVE
            and then to SOCKET_DEAD """
        if self._socket_state<newstate: self._socket_state=newstate

    def set_session_state(self,newstate):
        """ Change the session state.
            Session starts with SESSION_NOT_AUTHED state
            and then comes through 
            SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states.
        """
        if self._session_state<newstate:
            if self._session_state<SESSION_AUTHED and \
               newstate>=SESSION_AUTHED: self._stream_pos_queued=self._stream_pos_sent
            self._session_state=newstate

    def set_stream_state(self,newstate):
        """ Change the underlaying XML stream state
            Stream starts with STREAM__NOT_OPENED and then proceeds with
            STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states.
            Note that some features (like TLS and SASL)
            requires stream re-start so this state can have non-linear changes. """
        if self._stream_state<newstate: self._stream_state=newstate