summaryrefslogtreecommitdiff
path: root/tests/test_statemachine.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_statemachine.py')
-rw-r--r--tests/test_statemachine.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py
index e44b8e48..0046dd02 100644
--- a/tests/test_statemachine.py
+++ b/tests/test_statemachine.py
@@ -256,6 +256,73 @@ class testStateMachine(unittest.TestCase):
self.assertTrue( s['three'] )
+ def testTransitionsDontUnintentionallyBlock(self):
+ '''
+ There was a bug where a long-running transition (e.g. one with a 'func'
+ arg or a `transition_ctx` call would cause any `transition` or `ensure`
+ call to block since the lock is acquired before checking the current
+ state. Attempts to acquire the mutex need to be non-blocking so when a
+ timeout is _not_ given, the caller can return immediately. At the same
+ time, threads that _do_ want to wait need the ability to be notified
+ (to avoid waiting beyond when the lock is released) so we've moved to a
+ combination of a plain-ol `threading.Lock` to act as mutex, and a
+ `threading.Event` to perform notification for threads who choose to wait.
+ '''
+
+ s = sm.StateMachine(('one','two','three'))
+
+ with s.transition_ctx('two','three') as result:
+ self.failIf( result )
+ self.assertTrue( s['one'] )
+ self.failIf( s.current_state in ('two','three') )
+
+ self.assertTrue( s['one'] )
+
+ statuses = {'t1':"not started",
+ 't2':'not started'}
+
+ def t1():
+ print 'thread 1 started'
+ # no wait, so this should 'return False' immediately.
+ self.failIf( s.transition('two','three') )
+ statuses['t1'] = 'complete'
+ print 'thread 1 transitioned'
+
+ def t2():
+ print 'thread 2 started'
+ self.failIf( s['two'] )
+ self.failIf( s['three'] )
+ # we want this thread to acquire the lock, but for
+ # the second thread not to wait on the first.
+ with s.transition_ctx('one','two', 10) as locked:
+ statuses['t2'] = 'started'
+ print 'thread 2 has entered context'
+ self.assertTrue( locked )
+ # give thread1 a chance to complete while this
+ # thread still owns the lock
+ time.sleep(5)
+ self.assertTrue( s['two'] )
+ statuses['t2'] = 'complete'
+
+ t1 = threading.Thread(target=t1)
+ t2 = threading.Thread(target=t2)
+
+ t2.start() # this should acquire the lock
+ time.sleep(.2)
+ self.assertEqual( 'started', statuses['t2'] )
+ t1.start() # but it shouldn't prevent thread 1 from completing
+ time.sleep(1)
+
+ self.assertEqual( 'complete', statuses['t1'] )
+
+ t1.join()
+ t2.join()
+
+ self.assertEqual( 'complete', statuses['t2'] )
+
+ self.assertTrue( s['two'] )
+
+
suite = unittest.TestLoader().loadTestsFromTestCase(testStateMachine)
if __name__ == '__main__': unittest.main()