From a3d111be12144d9dea80165d84cd8168714d3d54 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 23 Mar 2011 23:00:41 -0400 Subject: Added new XEP-0050 implementation. Backward incompatibility alert! Please see examples/adhoc_provider.py for how to use the new plugin implementation, or the test examples in the files tests/test_stream_xep_0050.py and tests/test_stanza_xep_0050.py. Major changes: - May now have zero-step commands. Useful if a command is intended to be a dynamic status report that doesn't require any user input. - May use payloads other than data forms, such as a completely custom stanza type. - May include multiple payload items, such as multiple data forms, or a form and a custom stanza type. - Includes a command user API for calling adhoc commands on remote agents and managing the workflow. - Added support for note elements. Todo: - Add prev action support. You may use register_plugin('old_0050') to continue using the previous XEP-0050 implementation. --- tests/test_stanza_xep_0050.py | 114 +++++++ tests/test_stream_xep_0050.py | 686 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 800 insertions(+) create mode 100644 tests/test_stanza_xep_0050.py create mode 100644 tests/test_stream_xep_0050.py (limited to 'tests') diff --git a/tests/test_stanza_xep_0050.py b/tests/test_stanza_xep_0050.py new file mode 100644 index 00000000..ae584de4 --- /dev/null +++ b/tests/test_stanza_xep_0050.py @@ -0,0 +1,114 @@ +from sleekxmpp import Iq +from sleekxmpp.test import * +from sleekxmpp.plugins.xep_0050 import Command + + +class TestAdHocCommandStanzas(SleekTest): + + def setUp(self): + register_stanza_plugin(Iq, Command) + + def testAction(self): + """Test using the action attribute.""" + iq = self.Iq() + iq['type'] = 'set' + iq['command']['node'] = 'foo' + + iq['command']['action'] = 'execute' + self.failUnless(iq['command']['action'] == 'execute') + + iq['command']['action'] = 'complete' + self.failUnless(iq['command']['action'] == 'complete') + + iq['command']['action'] = 'cancel' + self.failUnless(iq['command']['action'] == 'cancel') + + def testSetActions(self): + """Test setting next actions in a command stanza.""" + iq = self.Iq() + iq['type'] = 'result' + iq['command']['node'] = 'foo' + iq['command']['actions'] = ['prev', 'next'] + + self.check(iq, """ + + + + + + + + + """) + + def testGetActions(self): + """Test retrieving next actions from a command stanza.""" + iq = self.Iq() + iq['command']['node'] = 'foo' + iq['command']['actions'] = ['prev', 'next'] + + results = iq['command']['actions'] + expected = ['prev', 'next'] + self.assertEqual(results, expected, + "Incorrect next actions: %s" % results) + + def testDelActions(self): + """Test removing next actions from a command stanza.""" + iq = self.Iq() + iq['type'] = 'result' + iq['command']['node'] = 'foo' + iq['command']['actions'] = ['prev', 'next'] + + del iq['command']['actions'] + + self.check(iq, """ + + + + """) + + def testAddNote(self): + """Test adding a command note.""" + iq = self.Iq() + iq['type'] = 'result' + iq['command']['node'] = 'foo' + iq['command'].add_note('Danger!', ntype='warning') + + self.check(iq, """ + + + Danger! + + + """) + + def testNotes(self): + """Test using command notes.""" + iq = self.Iq() + iq['type'] = 'result' + iq['command']['node'] = 'foo' + + notes = [('info', 'Interesting...'), + ('warning', 'Danger!'), + ('error', "I can't let you do that")] + iq['command']['notes'] = notes + + self.failUnless(iq['command']['notes'] == notes, + "Notes don't match: %s %s" % (notes, iq['command']['notes'])) + + self.check(iq, """ + + + Interesting... + Danger! + I can't let you do that + + + """) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestAdHocCommandStanzas) diff --git a/tests/test_stream_xep_0050.py b/tests/test_stream_xep_0050.py new file mode 100644 index 00000000..11b293c8 --- /dev/null +++ b/tests/test_stream_xep_0050.py @@ -0,0 +1,686 @@ +import time +import threading + +from sleekxmpp.test import * + + +class TestAdHocCommands(SleekTest): + + def setUp(self): + self.stream_start(mode='client', + plugins=['xep_0030', 'xep_0004', 'xep_0050']) + + # Real session IDs don't make for nice tests, so use + # a dummy value. + self.xmpp['xep_0050'].new_session = lambda: '_sessionid_' + + def tearDown(self): + self.stream_close() + + def testZeroStepCommand(self): + """Test running a command with no steps.""" + + def handle_command(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='result') + form.addField(var='foo', ftype='text-single', + label='Foo', value='bar') + + session['payload'] = form + session['next'] = None + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + bar + + + + + """) + + def testOneStepCommand(self): + """Test running a single step command.""" + results = [] + + def handle_command(iq, session): + + def handle_form(form, session): + results.append(form['values']['foo']) + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = form + session['next'] = handle_form + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + blah + + + + + """) + + self.send(""" + + + + """) + + self.assertEqual(results, ['blah'], + "Command handler was not executed: %s" % results) + + def testTwoStepCommand(self): + """Test using a two-stage command.""" + results = [] + + def handle_command(iq, session): + + def handle_step2(form, session): + results.append(form['values']['bar']) + + def handle_step1(form, session): + results.append(form['values']['foo']) + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='bar', ftype='text-single', label='Bar') + + session['payload'] = form + session['next'] = handle_step2 + session['has_next'] = False + + return session + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = form + session['next'] = handle_step1 + session['has_next'] = True + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + blah + + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + meh + + + + + """) + self.send(""" + + + + """) + + self.assertEqual(results, ['blah', 'meh'], + "Command handler was not executed: %s" % results) + + def testCancelCommand(self): + """Test canceling command.""" + results = [] + + def handle_command(iq, session): + + def handle_form(form, session): + results.append(form['values']['foo']) + + def handle_cancel(iq, session): + results.append('canceled') + + form = self.xmpp['xep_0004'].makeForm('form') + form.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = form + session['next'] = handle_form + session['cancel'] = handle_cancel + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + + + + """) + + self.recv(""" + + + + + blah + + + + + """) + + self.send(""" + + + + """) + + self.assertEqual(results, ['canceled'], + "Cancelation handler not executed: %s" % results) + + def testCommandNote(self): + """Test adding notes to commands.""" + + def handle_command(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='result') + form.addField(var='foo', ftype='text-single', + label='Foo', value='bar') + + session['payload'] = form + session['next'] = None + session['has_next'] = False + session['notes'] = [('info', 'testing notes')] + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + testing notes + + + bar + + + + + """) + + + + def testMultiPayloads(self): + """Test using commands with multiple payloads.""" + results = [] + + def handle_command(iq, session): + + def handle_form(forms, session): + for form in forms: + results.append(form['values']['FORM_TYPE']) + + form1 = self.xmpp['xep_0004'].makeForm('form') + form1.addField(var='FORM_TYPE', ftype='hidden', value='form_1') + form1.addField(var='foo', ftype='text-single', label='Foo') + + form2 = self.xmpp['xep_0004'].makeForm('form') + form2.addField(var='FORM_TYPE', ftype='hidden', value='form_2') + form2.addField(var='foo', ftype='text-single', label='Foo') + + session['payload'] = [form1, form2] + session['next'] = handle_form + session['has_next'] = False + + return session + + self.xmpp['xep_0050'].add_command('tester@localhost', 'foo', + 'Do Foo', handle_command) + + self.recv(""" + + + + """) + + self.send(""" + + + + + + + + form_1 + + + + + + form_2 + + + + + + """) + + self.recv(""" + + + + + form_1 + + + bar + + + + + form_2 + + + bar + + + + + """) + + self.send(""" + + + + """) + + self.assertEqual(results, [['form_1'], ['form_2']], + "Command handler was not executed: %s" % results) + + def testClientAPI(self): + """Test using client-side API for commands.""" + results = [] + + def handle_complete(iq, session): + for item in session['custom_data']: + results.append(item) + + def handle_step2(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='submit') + form.addField(var='bar', value='123') + + session['custom_data'].append('baz') + session['payload'] = form + session['next'] = handle_complete + self.xmpp['xep_0050'].complete_command(session) + + def handle_step1(iq, session): + form = self.xmpp['xep_0004'].makeForm(ftype='submit') + form.addField(var='foo', value='42') + + session['custom_data'].append('bar') + session['payload'] = form + session['next'] = handle_step2 + self.xmpp['xep_0050'].continue_command(session) + + session = {'custom_data': ['foo'], + 'next': handle_step1} + + self.xmpp['xep_0050'].start_command( + 'foo@example.com', + 'test_client', + session) + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + + """) + + self.send(""" + + + + + 42 + + + + + """) + + self.recv(""" + + + + + + + + """) + + self.send(""" + + + + + 123 + + + + + """) + + self.recv(""" + + + + """) + + # Give the event queue time to process + time.sleep(0.3) + + self.failUnless(results == ['foo', 'bar', 'baz'], + 'Incomplete command workflow: %s' % results) + + def testClientAPICancel(self): + """Test using client-side cancel API for commands.""" + results = [] + + def handle_canceled(iq, session): + for item in session['custom_data']: + results.append(item) + + def handle_step1(iq, session): + session['custom_data'].append('bar') + session['next'] = handle_canceled + self.xmpp['xep_0050'].cancel_command(session) + + session = {'custom_data': ['foo'], + 'next': handle_step1} + + self.xmpp['xep_0050'].start_command( + 'foo@example.com', + 'test_client', + session) + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + + """) + + self.send(""" + + + + """) + + self.recv(""" + + + + """) + + # Give the event queue time to process + time.sleep(0.3) + + self.failUnless(results == ['foo', 'bar'], + 'Incomplete command workflow: %s' % results) + + def testClientAPIError(self): + """Test using client-side error API for commands.""" + results = [] + + def handle_error(iq, session): + for item in session['custom_data']: + results.append(item) + + session = {'custom_data': ['foo'], + 'error': handle_error} + + self.xmpp['xep_0050'].start_command( + 'foo@example.com', + 'test_client', + session) + + self.send(""" + + + + """) + + self.recv(""" + + + + + + + """) + + # Give the event queue time to process + time.sleep(0.3) + + self.failUnless(results == ['foo'], + 'Incomplete command workflow: %s' % results) + + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestAdHocCommands) -- cgit v1.2.3