summaryrefslogtreecommitdiff
path: root/src/xmpppy-0.5.0rc1/build/lib/xmpp/filetransfer.py
blob: 87ddc2196939be84df0b115ce3042520c81909af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
##   filetransfer.py 
##
##   Copyright (C) 2004 Alexey "Snake" Nezhdanov
##
##   This program is free software; you can redistribute it and/or modify
##   it under the terms of the GNU General Public License as published by
##   the Free Software Foundation; either version 2, or (at your option)
##   any later version.
##
##   This program is distributed in the hope that it will be useful,
##   but WITHOUT ANY WARRANTY; without even the implied warranty of
##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
##   GNU General Public License for more details.

# $Id: filetransfer.py,v 1.6 2004/12/25 20:06:59 snakeru Exp $

"""
This module contains IBB class that is the simple implementation of JEP-0047.
Note that this is just a transport for data. You have to negotiate data transfer before
(via StreamInitiation most probably). Unfortunately SI is not implemented yet.
"""

from protocol import *
from dispatcher import PlugIn
import base64

class IBB(PlugIn):
    """ IBB used to transfer small-sized data chunk over estabilished xmpp connection.
        Data is split into small blocks (by default 3000 bytes each), encoded as base 64
        and sent to another entity that compiles these blocks back into the data chunk.
        This is very inefficiend but should work under any circumstances. Note that 
        using IBB normally should be the last resort.
    """
    def __init__(self):
        """ Initialise internal variables. """
        PlugIn.__init__(self)
        self.DBG_LINE='ibb'
        self._exported_methods=[self.OpenStream]
        self._streams={}
        self._ampnode=Node(NS_AMP+' amp',payload=[Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])

    def plugin(self,owner):
        """ Register handlers for receiving incoming datastreams. Used internally. """
        self._owner.RegisterHandlerOnce('iq',self.StreamOpenReplyHandler) # Move to StreamOpen and specify stanza id
        self._owner.RegisterHandler('iq',self.IqHandler,ns=NS_IBB)
        self._owner.RegisterHandler('message',self.ReceiveHandler,ns=NS_IBB)

    def IqHandler(self,conn,stanza):
        """ Handles streams state change. Used internally. """
        typ=stanza.getType()
        self.DEBUG('IqHandler called typ->%s'%typ,'info')
        if typ=='set' and stanza.getTag('open',namespace=NS_IBB): self.StreamOpenHandler(conn,stanza)
        elif typ=='set' and stanza.getTag('close',namespace=NS_IBB): self.StreamCloseHandler(conn,stanza)
        elif typ=='result': self.StreamCommitHandler(conn,stanza)
        elif typ=='error': self.StreamOpenReplyHandler(conn,stanza)
        else: conn.send(Error(stanza,ERR_BAD_REQUEST))
        raise NodeProcessed

    def StreamOpenHandler(self,conn,stanza):
        """ Handles opening of new incoming stream. Used internally. """
        """
<iq type='set' 
    from='romeo@montague.net/orchard'
    to='juliet@capulet.com/balcony'
    id='inband_1'>
  <open sid='mySID' 
        block-size='4096'
        xmlns='http://jabber.org/protocol/ibb'/>
</iq>
"""
        err=None
        sid,blocksize=stanza.getTagAttr('open','sid'),stanza.getTagAttr('open','block-size')
        self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info')
        try: blocksize=int(blocksize)
        except: err=ERR_BAD_REQUEST
        if not sid or not blocksize: err=ERR_BAD_REQUEST
        elif sid in self._streams.keys(): err=ERR_UNEXPECTED_REQUEST
        if err: rep=Error(stanza,err)
        else:
            self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info')
            rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()})
            self._streams[sid]={'direction':'<'+str(stanza.getFrom()),'block-size':blocksize,'fp':open('/tmp/xmpp_file_'+sid,'w'),'seq':0,'syn_id':stanza.getID()}
        conn.send(rep)

    def OpenStream(self,sid,to,fp,blocksize=3000):
        """ Start new stream. You should provide stream id 'sid', the endpoind jid 'to',
            the file object containing info for send 'fp'. Also the desired blocksize can be specified.
            Take into account that recommended stanza size is 4k and IBB uses base64 encoding
            that increases size of data by 1/3."""
        if sid in self._streams.keys(): return
        if not JID(to).getResource(): return
        self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0}
        self._owner.RegisterCycleHandler(self.SendHandler)
        syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})])
        self._owner.send(syn)
        self._streams[sid]['syn_id']=syn.getID()
        return self._streams[sid]

    def SendHandler(self,conn):
        """ Send next portion of data if it is time to do it. Used internally. """
        self.DEBUG('SendHandler called','info')
        for sid in self._streams.keys():
            stream=self._streams[sid]
            if stream['direction'][:2]=='|>': cont=1
            elif stream['direction'][0]=='>':
                chunk=stream['fp'].read(stream['block-size'])
                if chunk:
                    datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk))
                    stream['seq']+=1
                    if stream['seq']==65536: stream['seq']=0
                    conn.send(Protocol('message',stream['direction'][1:],payload=[datanode,self._ampnode]))
                else:
                    """ notify the other side about stream closing
                        notify the local user about sucessfull send
                        delete the local stream"""
                    conn.send(Protocol('iq',stream['direction'][1:],'set',payload=[Node(NS_IBB+' close',{'sid':sid})]))
                    conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream)
                    del self._streams[sid]
                    self._owner.UnregisterCycleHandler(self.SendHandler)

                    """
<message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
  <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
    qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
    WpdWpR0uQsuJe7+vh3NWn59/gTc5MDlX8dS9p0ovStmNcyLhxVgmqS8ZKhsblVeu
    IpQ0JgavABqibJolc3BKrVtVV1igKiX/N7Pi8RtY1K18toaMDhdEfhBRzO/XB0+P
    AQhYlRjNacGcslkhXqNjK5Va4tuOAPy2n1Q8UUrHbUd0g+xJ9Bm0G0LZXyvCWyKH
    kuNEHFQiLuCY6Iv0myq6iX6tjuHehZlFSh80b5BVV9tNLwNR5Eqz1klxMhoghJOA
  </data>
  <amp xmlns='http://jabber.org/protocol/amp'>
    <rule condition='deliver-at' value='stored' action='error'/>
    <rule condition='match-resource' value='exact' action='error'/>
  </amp>
</message>
"""

    def ReceiveHandler(self,conn,stanza):
        """ Receive next portion of incoming datastream and store it write
            it to temporary file. Used internally.
        """
        sid,seq,data=stanza.getTagAttr('data','sid'),stanza.getTagAttr('data','seq'),stanza.getTagData('data')
        self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info')
        try: seq=int(seq); data=base64.decodestring(data)
        except: seq=''; data=''
        err=None
        if not sid in self._streams.keys(): err=ERR_ITEM_NOT_FOUND
        else:
            stream=self._streams[sid]
            if not data: err=ERR_BAD_REQUEST
            elif seq<>stream['seq']: err=ERR_UNEXPECTED_REQUEST
            else:
                self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok')
                stream['seq']+=1
                stream['fp'].write(data)
        if err:
            self.DEBUG('Error on receive: %s'%err,'error')
            conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0))

    def StreamCloseHandler(self,conn,stanza):
        """ Handle stream closure due to all data transmitted.
            Raise xmpppy event specifying successfull data receive. """
        sid=stanza.getTagAttr('close','sid')
        self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info')
        if sid in self._streams.keys():
            conn.send(stanza.buildReply('result'))
            conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',self._streams[sid])
            del self._streams[sid]
        else: conn.send(Error(stanza,ERR_ITEM_NOT_FOUND))

    def StreamBrokenHandler(self,conn,stanza):
        """ Handle stream closure due to all some error while receiving data.
            Raise xmpppy event specifying unsuccessfull data receive. """
        syn_id=stanza.getID()
        self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info')
        for sid in self._streams.keys():
            stream=self._streams[sid]
            if stream['syn_id']==syn_id:
                if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
                else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
                del self._streams[sid]

    def StreamOpenReplyHandler(self,conn,stanza):
        """ Handle remote side reply about is it agree or not to receive our datastream.
            Used internally. Raises xmpppy event specfiying if the data transfer
            is agreed upon."""
        syn_id=stanza.getID()
        self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info')
        for sid in self._streams.keys():
            stream=self._streams[sid]
            if stream['syn_id']==syn_id:
                if stanza.getType()=='error':
                    if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
                    else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
                    del self._streams[sid]
                elif stanza.getType()=='result':
                    if stream['direction'][0]=='|':
                        stream['direction']=stream['direction'][1:]
                        conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream)
                    else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST))