summaryrefslogtreecommitdiff
path: root/src/xmpppy-0.5.0rc1/xmpp/filetransfer.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/xmpppy-0.5.0rc1/xmpp/filetransfer.py')
-rw-r--r--src/xmpppy-0.5.0rc1/xmpp/filetransfer.py199
1 files changed, 199 insertions, 0 deletions
diff --git a/src/xmpppy-0.5.0rc1/xmpp/filetransfer.py b/src/xmpppy-0.5.0rc1/xmpp/filetransfer.py
new file mode 100644
index 00000000..87ddc219
--- /dev/null
+++ b/src/xmpppy-0.5.0rc1/xmpp/filetransfer.py
@@ -0,0 +1,199 @@
+## filetransfer.py
+##
+## Copyright (C) 2004 Alexey "Snake" Nezhdanov
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2, or (at your option)
+## any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+
+# $Id: filetransfer.py,v 1.6 2004/12/25 20:06:59 snakeru Exp $
+
+"""
+This module contains IBB class that is the simple implementation of JEP-0047.
+Note that this is just a transport for data. You have to negotiate data transfer before
+(via StreamInitiation most probably). Unfortunately SI is not implemented yet.
+"""
+
+from protocol import *
+from dispatcher import PlugIn
+import base64
+
+class IBB(PlugIn):
+ """ IBB used to transfer small-sized data chunk over estabilished xmpp connection.
+ Data is split into small blocks (by default 3000 bytes each), encoded as base 64
+ and sent to another entity that compiles these blocks back into the data chunk.
+ This is very inefficiend but should work under any circumstances. Note that
+ using IBB normally should be the last resort.
+ """
+ def __init__(self):
+ """ Initialise internal variables. """
+ PlugIn.__init__(self)
+ self.DBG_LINE='ibb'
+ self._exported_methods=[self.OpenStream]
+ self._streams={}
+ self._ampnode=Node(NS_AMP+' amp',payload=[Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])
+
+ def plugin(self,owner):
+ """ Register handlers for receiving incoming datastreams. Used internally. """
+ self._owner.RegisterHandlerOnce('iq',self.StreamOpenReplyHandler) # Move to StreamOpen and specify stanza id
+ self._owner.RegisterHandler('iq',self.IqHandler,ns=NS_IBB)
+ self._owner.RegisterHandler('message',self.ReceiveHandler,ns=NS_IBB)
+
+ def IqHandler(self,conn,stanza):
+ """ Handles streams state change. Used internally. """
+ typ=stanza.getType()
+ self.DEBUG('IqHandler called typ->%s'%typ,'info')
+ if typ=='set' and stanza.getTag('open',namespace=NS_IBB): self.StreamOpenHandler(conn,stanza)
+ elif typ=='set' and stanza.getTag('close',namespace=NS_IBB): self.StreamCloseHandler(conn,stanza)
+ elif typ=='result': self.StreamCommitHandler(conn,stanza)
+ elif typ=='error': self.StreamOpenReplyHandler(conn,stanza)
+ else: conn.send(Error(stanza,ERR_BAD_REQUEST))
+ raise NodeProcessed
+
+ def StreamOpenHandler(self,conn,stanza):
+ """ Handles opening of new incoming stream. Used internally. """
+ """
+<iq type='set'
+ from='romeo@montague.net/orchard'
+ to='juliet@capulet.com/balcony'
+ id='inband_1'>
+ <open sid='mySID'
+ block-size='4096'
+ xmlns='http://jabber.org/protocol/ibb'/>
+</iq>
+"""
+ err=None
+ sid,blocksize=stanza.getTagAttr('open','sid'),stanza.getTagAttr('open','block-size')
+ self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info')
+ try: blocksize=int(blocksize)
+ except: err=ERR_BAD_REQUEST
+ if not sid or not blocksize: err=ERR_BAD_REQUEST
+ elif sid in self._streams.keys(): err=ERR_UNEXPECTED_REQUEST
+ if err: rep=Error(stanza,err)
+ else:
+ self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info')
+ rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()})
+ self._streams[sid]={'direction':'<'+str(stanza.getFrom()),'block-size':blocksize,'fp':open('/tmp/xmpp_file_'+sid,'w'),'seq':0,'syn_id':stanza.getID()}
+ conn.send(rep)
+
+ def OpenStream(self,sid,to,fp,blocksize=3000):
+ """ Start new stream. You should provide stream id 'sid', the endpoind jid 'to',
+ the file object containing info for send 'fp'. Also the desired blocksize can be specified.
+ Take into account that recommended stanza size is 4k and IBB uses base64 encoding
+ that increases size of data by 1/3."""
+ if sid in self._streams.keys(): return
+ if not JID(to).getResource(): return
+ self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0}
+ self._owner.RegisterCycleHandler(self.SendHandler)
+ syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})])
+ self._owner.send(syn)
+ self._streams[sid]['syn_id']=syn.getID()
+ return self._streams[sid]
+
+ def SendHandler(self,conn):
+ """ Send next portion of data if it is time to do it. Used internally. """
+ self.DEBUG('SendHandler called','info')
+ for sid in self._streams.keys():
+ stream=self._streams[sid]
+ if stream['direction'][:2]=='|>': cont=1
+ elif stream['direction'][0]=='>':
+ chunk=stream['fp'].read(stream['block-size'])
+ if chunk:
+ datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk))
+ stream['seq']+=1
+ if stream['seq']==65536: stream['seq']=0
+ conn.send(Protocol('message',stream['direction'][1:],payload=[datanode,self._ampnode]))
+ else:
+ """ notify the other side about stream closing
+ notify the local user about sucessfull send
+ delete the local stream"""
+ conn.send(Protocol('iq',stream['direction'][1:],'set',payload=[Node(NS_IBB+' close',{'sid':sid})]))
+ conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream)
+ del self._streams[sid]
+ self._owner.UnregisterCycleHandler(self.SendHandler)
+
+ """
+<message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
+ <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
+ qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
+ WpdWpR0uQsuJe7+vh3NWn59/gTc5MDlX8dS9p0ovStmNcyLhxVgmqS8ZKhsblVeu
+ IpQ0JgavABqibJolc3BKrVtVV1igKiX/N7Pi8RtY1K18toaMDhdEfhBRzO/XB0+P
+ AQhYlRjNacGcslkhXqNjK5Va4tuOAPy2n1Q8UUrHbUd0g+xJ9Bm0G0LZXyvCWyKH
+ kuNEHFQiLuCY6Iv0myq6iX6tjuHehZlFSh80b5BVV9tNLwNR5Eqz1klxMhoghJOA
+ </data>
+ <amp xmlns='http://jabber.org/protocol/amp'>
+ <rule condition='deliver-at' value='stored' action='error'/>
+ <rule condition='match-resource' value='exact' action='error'/>
+ </amp>
+</message>
+"""
+
+ def ReceiveHandler(self,conn,stanza):
+ """ Receive next portion of incoming datastream and store it write
+ it to temporary file. Used internally.
+ """
+ sid,seq,data=stanza.getTagAttr('data','sid'),stanza.getTagAttr('data','seq'),stanza.getTagData('data')
+ self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info')
+ try: seq=int(seq); data=base64.decodestring(data)
+ except: seq=''; data=''
+ err=None
+ if not sid in self._streams.keys(): err=ERR_ITEM_NOT_FOUND
+ else:
+ stream=self._streams[sid]
+ if not data: err=ERR_BAD_REQUEST
+ elif seq<>stream['seq']: err=ERR_UNEXPECTED_REQUEST
+ else:
+ self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok')
+ stream['seq']+=1
+ stream['fp'].write(data)
+ if err:
+ self.DEBUG('Error on receive: %s'%err,'error')
+ conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0))
+
+ def StreamCloseHandler(self,conn,stanza):
+ """ Handle stream closure due to all data transmitted.
+ Raise xmpppy event specifying successfull data receive. """
+ sid=stanza.getTagAttr('close','sid')
+ self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info')
+ if sid in self._streams.keys():
+ conn.send(stanza.buildReply('result'))
+ conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',self._streams[sid])
+ del self._streams[sid]
+ else: conn.send(Error(stanza,ERR_ITEM_NOT_FOUND))
+
+ def StreamBrokenHandler(self,conn,stanza):
+ """ Handle stream closure due to all some error while receiving data.
+ Raise xmpppy event specifying unsuccessfull data receive. """
+ syn_id=stanza.getID()
+ self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info')
+ for sid in self._streams.keys():
+ stream=self._streams[sid]
+ if stream['syn_id']==syn_id:
+ if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
+ else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
+ del self._streams[sid]
+
+ def StreamOpenReplyHandler(self,conn,stanza):
+ """ Handle remote side reply about is it agree or not to receive our datastream.
+ Used internally. Raises xmpppy event specfiying if the data transfer
+ is agreed upon."""
+ syn_id=stanza.getID()
+ self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info')
+ for sid in self._streams.keys():
+ stream=self._streams[sid]
+ if stream['syn_id']==syn_id:
+ if stanza.getType()=='error':
+ if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
+ else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
+ del self._streams[sid]
+ elif stanza.getType()=='result':
+ if stream['direction'][0]=='|':
+ stream['direction']=stream['direction'][1:]
+ conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream)
+ else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST))