import time
import logging
import unittest
from sleekxmpp.test import SleekTest
from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin
class TestAdHocCommands(SleekTest):
    """Tests for the ``xep_0050`` plugin's command API.

    Each test runs on a client-mode test stream with the ``xep_0030``,
    ``xep_0004`` and ``xep_0050`` plugins loaded (see ``setUp``).  The
    first group of tests registers a handler through ``add_command``;
    the ``testClientAPI*`` tests drive ``start_command`` and the
    ``continue_command`` / ``complete_command`` / ``cancel_command``
    calls made from the session callbacks.
    """

    # NOTE(review): the literals passed to self.recv()/self.send() below
    # contain only newlines and bare field values -- presumably the XML
    # markup was lost before this file reached review.  They are kept
    # byte-for-byte (continuation lines at column 0); confirm against
    # the original fixtures before relying on these tests.

    def setUp(self):
        """Start the client-mode test stream and pin the session ID."""
        self.stream_start(mode='client',
                          plugins=['xep_0030', 'xep_0004', 'xep_0050'])

        # Real session IDs don't make for nice tests, so use
        # a dummy value.
        self.xmpp['xep_0050'].new_session = lambda: '_sessionid_'

    def tearDown(self):
        """Close the test stream opened by ``setUp``."""
        self.stream_close()

    def testInitialPayloadCommand(self):
        """Test a command with an initial payload.

        A throwaway ``TestPayload`` stanza is registered as an iterable
        plugin of the Command stanza.  The handler answers with a new
        ``TestPayload`` whose ``bar`` value echoes the incoming payload,
        or ``'Failed'`` when the incoming payload is falsy.
        """

        class TestPayload(ElementBase):
            name = 'foo'
            namespace = 'test'
            interfaces = set(['bar'])
            plugin_attrib = name

        Command = self.xmpp['xep_0050'].stanza.Command
        register_stanza_plugin(Command, TestPayload, iterable=True)

        def handle_command(iq, session):
            initial = session['payload']
            logging.debug(initial)

            new_payload = TestPayload()
            if initial:
                new_payload['bar'] = 'Received: %s' % initial['bar']
            else:
                new_payload['bar'] = 'Failed'

            session['payload'] = new_payload
            session['next'] = None
            session['has_next'] = False

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
""")

    def testZeroStepCommand(self):
        """Test running a command with no steps.

        The handler immediately returns a ``result`` form with a single
        ``foo`` field and no ``next`` callback.
        """

        def handle_command(iq, session):
            form = self.xmpp['xep_0004'].makeForm(ftype='result')
            form.addField(var='foo', ftype='text-single',
                          label='Foo', value='bar')

            session['payload'] = form
            session['next'] = None
            session['has_next'] = False

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
bar
""")

    def testOneStepCommand(self):
        """Test running a single step command.

        The handler returns a form with a ``foo`` field and installs
        ``handle_form`` as the ``next`` callback; that callback records
        the submitted ``foo`` value in ``results``.
        """
        results = []

        def handle_command(iq, session):

            def handle_form(form, session):
                results.append(form.get_values()['foo'])
                session['payload'] = None

            form = self.xmpp['xep_0004'].makeForm('form')
            form.addField(var='foo', ftype='text-single', label='Foo')

            session['payload'] = form
            session['next'] = handle_form
            session['has_next'] = False

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
""")

        self.recv("""
blah
""")

        self.send("""
""")

        self.assertEqual(results, ['blah'],
                         "Command handler was not executed: %s" % results)

    def testTwoStepCommand(self):
        """Test using a two-stage command.

        The handler hands out a ``foo`` form with ``has_next`` set;
        ``handle_step1`` records ``foo`` and hands out a ``bar`` form;
        ``handle_step2`` records ``bar``.
        """
        results = []

        def handle_command(iq, session):

            def handle_step2(form, session):
                results.append(form.get_values()['bar'])
                session['payload'] = None

            def handle_step1(form, session):
                results.append(form.get_values()['foo'])

                form = self.xmpp['xep_0004'].makeForm('form')
                form.addField(var='bar', ftype='text-single', label='Bar')

                session['payload'] = form
                session['next'] = handle_step2
                session['has_next'] = False

                return session

            form = self.xmpp['xep_0004'].makeForm('form')
            form.addField(var='foo', ftype='text-single', label='Foo')

            session['payload'] = form
            session['next'] = handle_step1
            session['has_next'] = True

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
""")

        self.recv("""
blah
""")

        self.send("""
""")

        self.recv("""
meh
""")

        self.send("""
""")

        self.assertEqual(results, ['blah', 'meh'],
                         "Command handler was not executed: %s" % results)

    def testCancelCommand(self):
        """Test canceling command.

        The session carries both a ``next`` and a ``cancel`` callback;
        the test expects only the cancel callback to have recorded
        anything in ``results``.
        """
        results = []

        def handle_command(iq, session):

            def handle_form(form, session):
                results.append(form['values']['foo'])

            def handle_cancel(iq, session):
                results.append('canceled')

            form = self.xmpp['xep_0004'].makeForm('form')
            form.addField(var='foo', ftype='text-single', label='Foo')

            session['payload'] = form
            session['next'] = handle_form
            session['cancel'] = handle_cancel
            session['has_next'] = False

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
""")

        self.recv("""
blah
""")

        self.send("""
""")

        self.assertEqual(results, ['canceled'],
                         "Cancelation handler not executed: %s" % results)

    def testCommandNote(self):
        """Test adding notes to commands.

        Same as the zero-step case, except that the handler also sets
        ``session['notes']`` to a single ``('info', 'testing notes')``
        entry.
        """

        def handle_command(iq, session):
            form = self.xmpp['xep_0004'].makeForm(ftype='result')
            form.addField(var='foo', ftype='text-single',
                          label='Foo', value='bar')

            session['payload'] = form
            session['next'] = None
            session['has_next'] = False
            session['notes'] = [('info', 'testing notes')]

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
testing notes
bar
""")

    def testMultiPayloads(self):
        """Test using commands with multiple payloads.

        The handler sets the session payload to a list of two forms,
        each tagged with a hidden ``FORM_TYPE`` field; ``handle_form``
        records the ``FORM_TYPE`` value of every form it is given.
        """
        results = []

        def handle_command(iq, session):

            def handle_form(forms, session):
                for form in forms:
                    results.append(form.get_values()['FORM_TYPE'])
                session['payload'] = None

            form1 = self.xmpp['xep_0004'].makeForm('form')
            form1.addField(var='FORM_TYPE', ftype='hidden', value='form_1')
            form1.addField(var='foo', ftype='text-single', label='Foo')

            form2 = self.xmpp['xep_0004'].makeForm('form')
            form2.addField(var='FORM_TYPE', ftype='hidden', value='form_2')
            form2.addField(var='foo', ftype='text-single', label='Foo')

            session['payload'] = [form1, form2]
            session['next'] = handle_form
            session['has_next'] = False

            return session

        self.xmpp['xep_0050'].add_command('tester@localhost', 'foo',
                                          'Do Foo', handle_command)

        self.recv("""
""")

        self.send("""
form_1
form_2
""")

        self.recv("""
form_1
bar
form_2
bar
""")

        self.send("""
""")

        self.assertEqual(results, [['form_1'], ['form_2']],
                         "Command handler was not executed: %s" % results)

    def testClientAPI(self):
        """Test using client-side API for commands.

        The session starts with ``custom_data == ['foo']``.
        ``handle_step1`` appends ``'bar'`` and continues the command,
        ``handle_step2`` appends ``'baz'`` and completes it, and
        ``handle_complete`` copies ``custom_data`` into ``results``.
        """
        results = []

        def handle_complete(iq, session):
            results.extend(session['custom_data'])

        def handle_step2(iq, session):
            form = self.xmpp['xep_0004'].makeForm(ftype='submit')
            form.addField(var='bar', value='123')

            session['custom_data'].append('baz')
            session['payload'] = form
            session['next'] = handle_complete
            self.xmpp['xep_0050'].complete_command(session)

        def handle_step1(iq, session):
            form = self.xmpp['xep_0004'].makeForm(ftype='submit')
            form.addField(var='foo', value='42')

            session['custom_data'].append('bar')
            session['payload'] = form
            session['next'] = handle_step2
            self.xmpp['xep_0050'].continue_command(session)

        session = {'custom_data': ['foo'],
                   'next': handle_step1}

        self.xmpp['xep_0050'].start_command(
            'foo@example.com',
            'test_client',
            session)

        self.send("""
""")

        self.recv("""
""")

        self.send("""
42
""")

        self.recv("""
""")

        self.send("""
123
""")

        self.recv("""
""")

        # Give the event queue time to process
        time.sleep(0.3)

        # assertEqual instead of the deprecated failUnless alias
        # (removed in Python 3.12); matches the other tests in this class.
        self.assertEqual(results, ['foo', 'bar', 'baz'],
                         'Incomplete command workflow: %s' % results)

    def testClientAPICancel(self):
        """Test using client-side cancel API for commands.

        ``handle_step1`` appends ``'bar'`` to ``custom_data``, installs
        ``handle_canceled`` as ``next`` and cancels the command;
        ``handle_canceled`` copies ``custom_data`` into ``results``.
        """
        results = []

        def handle_canceled(iq, session):
            results.extend(session['custom_data'])

        def handle_step1(iq, session):
            session['custom_data'].append('bar')
            session['next'] = handle_canceled
            self.xmpp['xep_0050'].cancel_command(session)

        session = {'custom_data': ['foo'],
                   'next': handle_step1}

        self.xmpp['xep_0050'].start_command(
            'foo@example.com',
            'test_client',
            session)

        self.send("""
""")

        self.recv("""
""")

        self.send("""
""")

        self.recv("""
""")

        # Give the event queue time to process
        time.sleep(0.3)

        # assertEqual instead of the deprecated failUnless alias
        # (removed in Python 3.12); matches the other tests in this class.
        self.assertEqual(results, ['foo', 'bar'],
                         'Incomplete command workflow: %s' % results)

    def testClientAPIError(self):
        """Test using client-side error API for commands.

        The session only carries an ``error`` callback, which copies
        ``custom_data`` into ``results``.
        """
        results = []

        def handle_error(iq, session):
            results.extend(session['custom_data'])

        session = {'custom_data': ['foo'],
                   'error': handle_error}

        self.xmpp['xep_0050'].start_command(
            'foo@example.com',
            'test_client',
            session)

        self.send("""
""")

        self.recv("""
""")

        # Give the event queue time to process
        time.sleep(0.3)

        # assertEqual instead of the deprecated failUnless alias
        # (removed in Python 3.12); matches the other tests in this class.
        self.assertEqual(results, ['foo'],
                         'Incomplete command workflow: %s' % results)

    def testClientAPIErrorStrippedResponse(self):
        """Test errors that don't include the command substanza.

        Uses the same session layout as ``testClientAPIError``: only an
        ``error`` callback, which copies ``custom_data`` into
        ``results``.
        """
        results = []

        def handle_error(iq, session):
            results.extend(session['custom_data'])

        session = {'custom_data': ['foo'],
                   'error': handle_error}

        self.xmpp['xep_0050'].start_command(
            'foo@example.com',
            'test_client',
            session)

        self.send("""
""")

        self.recv("""
""")

        # Give the event queue time to process
        time.sleep(0.3)

        # assertEqual instead of the deprecated failUnless alias
        # (removed in Python 3.12); matches the other tests in this class.
        self.assertEqual(results, ['foo'],
                         'Incomplete command workflow: %s' % results)
# Module-level suite built from the test methods of TestAdHocCommands.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestAdHocCommands)