From 0a23f84ec328898aad2c723128471f42a1f12022 Mon Sep 17 00:00:00 2001 From: Tom Nichols Date: Thu, 1 Jul 2010 15:10:22 -0400 Subject: fix for statemachine where operations would unintentionally block if the lock was acquired in a long-running transition --- tests/test_statemachine.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) (limited to 'tests') diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py index e44b8e48..0046dd02 100644 --- a/tests/test_statemachine.py +++ b/tests/test_statemachine.py @@ -256,6 +256,73 @@ class testStateMachine(unittest.TestCase): self.assertTrue( s['three'] ) + def testTransitionsDontUnintentionallyBlock(self): + ''' + There was a bug where a long-running transition (e.g. one with a 'func' + arg or a `transition_ctx` call would cause any `transition` or `ensure` + call to block since the lock is acquired before checking the current + state. Attempts to acquire the mutex need to be non-blocking so when a + timeout is _not_ given, the caller can return immediately. At the same + time, threads that _do_ want to wait need the ability to be notified + (to avoid waiting beyond when the lock is released) so we've moved to a + combination of a plain-ol `threading.Lock` to act as mutex, and a + `threading.Event` to perform notification for threads who choose to wait. + ''' + + s = sm.StateMachine(('one','two','three')) + + with s.transition_ctx('two','three') as result: + self.failIf( result ) + self.assertTrue( s['one'] ) + self.failIf( s.current_state in ('two','three') ) + + self.assertTrue( s['one'] ) + + statuses = {'t1':"not started", + 't2':'not started'} + + def t1(): + print 'thread 1 started' + # no wait, so this should 'return False' immediately. + self.failIf( s.transition('two','three') ) + statuses['t1'] = 'complete' + print 'thread 1 transitioned' + + def t2(): + print 'thread 2 started' + self.failIf( s['two'] ) + self.failIf( s['three'] ) + # we want this thread to acquire the lock, but for + # the second thread not to wait on the first. + with s.transition_ctx('one','two', 10) as locked: + statuses['t2'] = 'started' + print 'thread 2 has entered context' + self.assertTrue( locked ) + # give thread1 a chance to complete while this + # thread still owns the lock + time.sleep(5) + self.assertTrue( s['two'] ) + statuses['t2'] = 'complete' + + t1 = threading.Thread(target=t1) + t2 = threading.Thread(target=t2) + + t2.start() # this should acquire the lock + time.sleep(.2) + self.assertEqual( 'started', statuses['t2'] ) + t1.start() # but it shouldn't prevent thread 1 from completing + time.sleep(1) + + self.assertEqual( 'complete', statuses['t1'] ) + + t1.join() + t2.join() + + self.assertEqual( 'complete', statuses['t2'] ) + + self.assertTrue( s['two'] ) + + suite = unittest.TestLoader().loadTestsFromTestCase(testStateMachine) if __name__ == '__main__': unittest.main() -- cgit v1.2.3