From a3d111be12144d9dea80165d84cd8168714d3d54 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 23 Mar 2011 23:00:41 -0400 Subject: Added new XEP-0050 implementation. Backward incompatibility alert! Please see examples/adhoc_provider.py for how to use the new plugin implementation, or the test examples in the files tests/test_stream_xep_0050.py and tests/test_stanza_xep_0050.py. Major changes: - May now have zero-step commands. Useful if a command is intended to be a dynamic status report that doesn't require any user input. - May use payloads other than data forms, such as a completely custom stanza type. - May include multiple payload items, such as multiple data forms, or a form and a custom stanza type. - Includes a command user API for calling adhoc commands on remote agents and managing the workflow. - Added support for note elements. Todo: - Add prev action support. You may use register_plugin('old_0050') to continue using the previous XEP-0050 implementation. --- tests/test_stream_xep_0050.py | 686 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 686 insertions(+) create mode 100644 tests/test_stream_xep_0050.py (limited to 'tests/test_stream_xep_0050.py') diff --git a/tests/test_stream_xep_0050.py b/tests/test_stream_xep_0050.py new file mode 100644 index 00000000..11b293c8 --- /dev/null +++ b/tests/test_stream_xep_0050.py @@ -0,0 +1,686 @@ +import time +import threading + +from sleekxmpp.test import * + + +class TestAdHocCommands(SleekTest): + + def setUp(self): + self.stream_start(mode='client', + plugins=['xep_0030', 'xep_0004', 'xep_0050']) + + # Real session IDs don't make for nice tests, so use + # a dummy value. + self.xmpp['xep_0050'].new_session = lambda: '_sessionid_' + + def tearDown(self): + self.stream_close() + + def testZeroStepCommand(self): + """Test running a command with no steps.""" + + def handle_command(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='result') + form.addField(var='foo', ftype='text-single', + label='Foo', value='bar') + + session['payload'] = form + session['next'] = None + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + bar + + + + + """) + + def testOneStepCommand(self): + """Test running a single step command.""" + results = [] + + def handle_command(iq, session): + + def handle_form(form, session): + results.append(form['values']['foo']) + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = form + session['next'] = handle_form + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + blah + + + + + """) + + self.send(""" + + + + """) + + self.assertEqual(results, ['blah'], + "Command handler was not executed: %s" % results) + + def testTwoStepCommand(self): + """Test using a two-stage command.""" + results = [] + + def handle_command(iq, session): + + def handle_step2(form, session): + results.append(form['values']['bar']) + + def handle_step1(form, session): + results.append(form['values']['foo']) + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='bar', ftype='text-single', label='Bar') + + session['payload'] = form + session['next'] = handle_step2 + session['has_next'] = False + + return session + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = form + session['next'] = handle_step1 + session['has_next'] = True + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + blah + + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + meh + + + + + """) + self.send(""" + + + + """) + + self.assertEqual(results, ['blah', 'meh'], + "Command handler was not executed: %s" % results) + + def testCancelCommand(self): + """Test canceling command.""" + results = [] + + def handle_command(iq, session): + + def handle_form(form, session): + results.append(form['values']['foo']) + + def handle_cancel(iq, session): + results.append('canceled') + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = form + session['next'] = handle_form + session['cancel'] = handle_cancel + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + blah + + + + + """) + + self.send(""" + + + + """) + + self.assertEqual(results, ['canceled'], + "Cancelation handler not executed: %s" % results) + + def testCommandNote(self): + """Test adding notes to commands.""" + + def handle_command(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='result') + form.addField(var='foo', ftype='text-single', + label='Foo', value='bar') + + session['payload'] = form + session['next'] = None + session['has_next'] = False + session['notes'] = [('info', 'testing notes')] + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + testing notes + + + bar + + + + + """) + + + + def testMultiPayloads(self): + """Test using commands with multiple payloads.""" + results = [] + + def handle_command(iq, session): + + def handle_form(forms, session): + for form in forms: + results.append(form['values']['FORM_TYPE']) + + form1 = self.xmpp['xep_0004'].makeForm('form') + form1.addField(var='FORM_TYPE', ftype='hidden', value='form_1') + form1.addField(var='foo', ftype='text-single', label='Foo') + + form2 = self.xmpp['xep_0004'].makeForm('form') + form2.addField(var='FORM_TYPE', ftype='hidden', value='form_2') + form2.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = [form1, form2] + session['next'] = handle_form + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + form_1 + + + + + + form_2 + + + + + + """) + + self.recv(""" + + + + + form_1 + + + bar + + + + + form_2 + + + bar + + + + + """) + + self.send(""" + + + + """) + + self.assertEqual(results, [['form_1'], ['form_2']], + "Command handler was not executed: %s" % results) + + def testClientAPI(self): + """Test using client-side API for commands.""" + results = [] + + def handle_complete(iq, session): + for item in session['custom_data']: + results.append(item) + + def handle_step2(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='submit') + form.addField(var='bar', value='123') + + session['custom_data'].append('baz') + session['payload'] = form + session['next'] = handle_complete + self.xmpp['xep_0050'].complete_command(session) + + def handle_step1(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='submit') + form.addField(var='foo', value='42') + + session['custom_data'].append('bar') + session['payload'] = form + session['next'] = handle_step2 + self.xmpp['xep_0050'].continue_command(session) + + session = {'custom_data': ['foo'], + 'next': handle_step1} + + self.xmpp['xep_0050'].start_command( + 'foo@example.com', + 'test_client', + session) + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + + """) + + self.send(""" + + + + + 42 + + + + + """) + + self.recv(""" + + + + + + + + """) + + self.send(""" + + + + + 123 + + + + + """) + + self.recv(""" + + + + """) + + # Give the event queue time to process + time.sleep(0.3) + + self.failUnless(results == ['foo', 'bar', 'baz'], + 'Incomplete command workflow: %s' % results) + + def testClientAPICancel(self): + """Test using client-side cancel API for commands.""" + results = [] + + def handle_canceled(iq, session): + for item in session['custom_data']: + results.append(item) + + def handle_step1(iq, session): + session['custom_data'].append('bar') + session['next'] = handle_canceled + self.xmpp['xep_0050'].cancel_command(session) + + session = {'custom_data': ['foo'], + 'next': handle_step1} + + self.xmpp['xep_0050'].start_command( + 'foo@example.com', + 'test_client', + session) + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + + """) + + self.send(""" + + + + """) + + self.recv(""" + + + + """) + + # Give the event queue time to process + time.sleep(0.3) + + self.failUnless(results == ['foo', 'bar'], + 'Incomplete command workflow: %s' % results) + + def testClientAPIError(self): + """Test using client-side error API for commands.""" + results = [] + + def handle_error(iq, session): + for item in session['custom_data']: + results.append(item) + + session = {'custom_data': ['foo'], + 'error': handle_error} + + self.xmpp['xep_0050'].start_command( + 'foo@example.com', + 'test_client', + session) + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + """) + + # Give the event queue time to process + time.sleep(0.3) + + self.failUnless(results == ['foo'], + 'Incomplete command workflow: %s' % results) + + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestAdHocCommands) -- cgit v1.2.3