import time import logging import unittest from sleekxmpp.test import SleekTest from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin class TestAdHocCommands(SleekTest): def setUp(self): self.stream_start(mode='client', plugins=['xep_0030', 'xep_0004', 'xep_0050']) # Real session IDs don't make for nice tests, so use # a dummy value. self.xmpp['xep_0050'].new_session = lambda: '_sessionid_' def tearDown(self): self.stream_close() def testInitialPayloadCommand(self): """Test a command with an initial payload.""" class TestPayload(ElementBase): name = 'foo' namespace = 'test' interfaces = set(['bar']) plugin_attrib = name Command = self.xmpp['xep_0050'].stanza.Command register_stanza_plugin(Command, TestPayload, iterable=True) def handle_command(iq, session): initial = session['payload'] logging.debug(initial) new_payload = TestPayload() if initial: new_payload['bar'] = 'Received: %s' % initial['bar'] else: new_payload['bar'] = 'Failed' logging.debug(initial) session['payload'] = new_payload session['next'] = None session['has_next'] = False return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" """) def testZeroStepCommand(self): """Test running a command with no steps.""" def handle_command(iq, session): form = self.xmpp['xep_0004'].makeForm(ftype='result') form.addField(var='foo', ftype='text-single', label='Foo', value='bar') session['payload'] = form session['next'] = None session['has_next'] = False return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" bar """) def testOneStepCommand(self): """Test running a single step command.""" results = [] def handle_command(iq, session): def handle_form(form, session): results.append(form.get_values()['foo']) session['payload'] = None form = self.xmpp['xep_0004'].makeForm('form') form.addField(var='foo', ftype='text-single', label='Foo') session['payload'] = form session['next'] = handle_form session['has_next'] = False return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" """) self.recv(""" blah """) self.send(""" """) self.assertEqual(results, ['blah'], "Command handler was not executed: %s" % results) def testTwoStepCommand(self): """Test using a two-stage command.""" results = [] def handle_command(iq, session): def handle_step2(form, session): results.append(form.get_values()['bar']) session['payload'] = None def handle_step1(form, session): results.append(form.get_values()['foo']) form = self.xmpp['xep_0004'].makeForm('form') form.addField(var='bar', ftype='text-single', label='Bar') session['payload'] = form session['next'] = handle_step2 session['has_next'] = False return session form = self.xmpp['xep_0004'].makeForm('form') form.addField(var='foo', ftype='text-single', label='Foo') session['payload'] = form session['next'] = handle_step1 session['has_next'] = True return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" """) self.recv(""" blah """) self.send(""" """) self.recv(""" meh """) self.send(""" """) self.assertEqual(results, ['blah', 'meh'], "Command handler was not executed: %s" % results) def testCancelCommand(self): """Test canceling command.""" results = [] def handle_command(iq, session): def handle_form(form, session): results.append(form['values']['foo']) def handle_cancel(iq, session): results.append('canceled') form = self.xmpp['xep_0004'].makeForm('form') form.addField(var='foo', ftype='text-single', label='Foo') session['payload'] = form session['next'] = handle_form session['cancel'] = handle_cancel session['has_next'] = False return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" """) self.recv(""" blah """) self.send(""" """) self.assertEqual(results, ['canceled'], "Cancelation handler not executed: %s" % results) def testCommandNote(self): """Test adding notes to commands.""" def handle_command(iq, session): form = self.xmpp['xep_0004'].makeForm(ftype='result') form.addField(var='foo', ftype='text-single', label='Foo', value='bar') session['payload'] = form session['next'] = None session['has_next'] = False session['notes'] = [('info', 'testing notes')] return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" testing notes bar """) def testMultiPayloads(self): """Test using commands with multiple payloads.""" results = [] def handle_command(iq, session): def handle_form(forms, session): for form in forms: results.append(form.get_values()['FORM_TYPE']) session['payload'] = None form1 = self.xmpp['xep_0004'].makeForm('form') form1.addField(var='FORM_TYPE', ftype='hidden', value='form_1') form1.addField(var='foo', ftype='text-single', label='Foo') form2 = self.xmpp['xep_0004'].makeForm('form') form2.addField(var='FORM_TYPE', ftype='hidden', value='form_2') form2.addField(var='foo', ftype='text-single', label='Foo') session['payload'] = [form1, form2] session['next'] = handle_form session['has_next'] = False return session self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', 'Do Foo', handle_command) self.recv(""" """) self.send(""" form_1 form_2 """) self.recv(""" form_1 bar form_2 bar """) self.send(""" """) self.assertEqual(results, [['form_1'], ['form_2']], "Command handler was not executed: %s" % results) def testClientAPI(self): """Test using client-side API for commands.""" results = [] def handle_complete(iq, session): for item in session['custom_data']: results.append(item) def handle_step2(iq, session): form = self.xmpp['xep_0004'].makeForm(ftype='submit') form.addField(var='bar', value='123') session['custom_data'].append('baz') session['payload'] = form session['next'] = handle_complete self.xmpp['xep_0050'].complete_command(session) def handle_step1(iq, session): form = self.xmpp['xep_0004'].makeForm(ftype='submit') form.addField(var='foo', value='42') session['custom_data'].append('bar') session['payload'] = form session['next'] = handle_step2 self.xmpp['xep_0050'].continue_command(session) session = {'custom_data': ['foo'], 'next': handle_step1} self.xmpp['xep_0050'].start_command( 'foo@example.com', 'test_client', session) self.send(""" """) self.recv(""" """) self.send(""" 42 """) self.recv(""" """) self.send(""" 123 """) self.recv(""" """) # Give the event queue time to process time.sleep(0.3) self.failUnless(results == ['foo', 'bar', 'baz'], 'Incomplete command workflow: %s' % results) def testClientAPICancel(self): """Test using client-side cancel API for commands.""" results = [] def handle_canceled(iq, session): for item in session['custom_data']: results.append(item) def handle_step1(iq, session): session['custom_data'].append('bar') session['next'] = handle_canceled self.xmpp['xep_0050'].cancel_command(session) session = {'custom_data': ['foo'], 'next': handle_step1} self.xmpp['xep_0050'].start_command( 'foo@example.com', 'test_client', session) self.send(""" """) self.recv(""" """) self.send(""" """) self.recv(""" """) # Give the event queue time to process time.sleep(0.3) self.failUnless(results == ['foo', 'bar'], 'Incomplete command workflow: %s' % results) def testClientAPIError(self): """Test using client-side error API for commands.""" results = [] def handle_error(iq, session): for item in session['custom_data']: results.append(item) session = {'custom_data': ['foo'], 'error': handle_error} self.xmpp['xep_0050'].start_command( 'foo@example.com', 'test_client', session) self.send(""" """) self.recv(""" """) # Give the event queue time to process time.sleep(0.3) self.failUnless(results == ['foo'], 'Incomplete command workflow: %s' % results) def testClientAPIErrorStrippedResponse(self): """Test errors that don't include the command substanza.""" results = [] def handle_error(iq, session): for item in session['custom_data']: results.append(item) session = {'custom_data': ['foo'], 'error': handle_error} self.xmpp['xep_0050'].start_command( 'foo@example.com', 'test_client', session) self.send(""" """) self.recv(""" """) # Give the event queue time to process time.sleep(0.3) self.failUnless(results == ['foo'], 'Incomplete command workflow: %s' % results) suite = unittest.TestLoader().loadTestsFromTestCase(TestAdHocCommands)