1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
"""
from __future__ import with_statement
import threading
import time
import logging
class StateMachine(object):
    """
    A small thread-aware state machine.

    The machine holds a list of known states and a single current state.
    All blocking operations (`transition`, `transition_any`, `ensure`,
    `ensure_any`) synchronize on `self.lock`, a `threading.Condition`
    wrapping an `RLock`; every successful transition notifies all waiters.

    The first state ever added becomes both the default state (used by
    `reset`) and the initial current state. A machine created without
    states reports `None` as its current state until states are added.

    The machine can be used as a context manager to hold its lock across
    several operations::

        with state_machine:
            ...  # the lock is held here
    """

    def __init__(self, states=None):
        """
        Create the machine and register the given `states`, if any.

        `states` is an iterable of state names; the first one becomes the
        default and initial state. Raises `IndexError` if `states`
        contains a duplicate.
        """
        self.lock = threading.Condition(threading.RLock())
        self.__states = []
        # Set by addStates() as soon as the first state is registered.
        self.__has_default = False
        self.__default_state = None
        self.__current_state = None
        if states is not None:
            self.addStates(states)

    def addStates(self, states):
        """
        Register additional states.

        Raises `IndexError` if a state is already registered; states that
        precede the duplicate in `states` remain registered.
        """
        with self.lock:
            for state in states:
                if state in self.__states:
                    # Wrap in a 1-tuple so tuple-valued states format safely.
                    raise IndexError("The state '%s' is already in the StateMachine." % (state,))
                self.__states.append(state)
            if not self.__has_default and self.__states:
                # The first registered state is the default/initial state.
                self.__default_state = self.__states[0]
                self.__current_state = self.__default_state
                self.__has_default = True

    def _require_known(self, states, label, plural_method):
        """
        Raise `ValueError` unless every entry of `states` is a registered state.

        `label` names the argument in the error message and `plural_method`
        is the method suggested when a tuple/list was passed where a single
        state was expected.
        """
        for state in states:
            if isinstance(state, (tuple, list)):
                raise ValueError("State %s should be a string. Did you mean to call 'StateMachine.%s()?" % (str(state), plural_method))
            if state not in self.__states:
                raise ValueError("StateMachine does not contain %s %s." % (label, state))

    def _await_any(self, states, wait):
        """
        Wait, with the lock held, until the current state is one of `states`.

        Returns `True` once the current state is in `states`, or `False`
        if `wait` seconds elapse first. With `wait` of 0 no blocking occurs.
        """
        deadline = time.time() + wait
        while self.__current_state not in states:
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            # Only wait for what is left of the budget, so that unrelated
            # notifications cannot stretch the total wait beyond `wait`.
            self.lock.wait(remaining)
        return True

    def transition(self, from_state, to_state, wait=0.0, func=None, args=(), kwargs=None):
        '''
        Transition from the given `from_state` to the given `to_state`.

        This method returns `True` if the state machine is now in `to_state`.
        It returns `False` if a timeout occurred and the transition did not
        take place.

        If `wait` is 0 (the default), this method returns immediately if the
        state machine is not in `from_state`.

        If you want the thread to block and transition once the state machine
        enters `from_state`, set `wait` to a positive number of seconds. Note
        there is no 'block indefinitely' flag since this leads to deadlock.
        If you want to wait indefinitely, choose a reasonable value for `wait`
        (e.g. 20 seconds) and do so in a while loop like so:

        ::

            while not thread_should_exit and not state_machine.transition('disconnected', 'connecting', wait=20 ):
                pass # timeout will occur every 20s unless transition occurs
            if thread_should_exit: return
            # perform actions here after successful transition

        This allows the thread to be responsive by setting
        `thread_should_exit=True`.

        The optional `func` argument allows the user to pass a callable
        operation which occurs within the context of the state transition
        (i.e. while the state machine is locked). If `func` returns a true
        value, the transition will occur. If `func` returns a non-true value
        or if an exception is thrown, the transition will not occur. Any
        thrown exception is not caught by the state machine and is the
        caller's responsibility to handle. If `func` completes normally, this
        method returns the value returned by `func`. If values for `args` and
        `kwargs` are provided, they are expanded and passed like so:
        `func( *args, **kwargs )`.

        Raises `ValueError` if `from_state` or `to_state` is not a
        registered state.
        '''
        return self.transition_any((from_state,), to_state, wait=wait,
                                   func=func, args=args, kwargs=kwargs)

    def transition_any(self, from_states, to_state, wait=0.0, func=None, args=(), kwargs=None):
        '''
        Transition from any of the given `from_states` to the given `to_state`.

        Behaves like `transition`, except that the transition is performed
        as soon as the machine is in any one of `from_states`.

        Raises `ValueError` if any of `from_states` or `to_state` is not a
        registered state.
        '''
        with self.lock:
            self._require_known(from_states, 'from_state', 'transition_any')
            if to_state not in self.__states:
                raise ValueError("StateMachine does not contain to_state %s." % (to_state,))

            if not self._await_any(from_states, wait):
                return False  # timed out before entering one of from_states

            return_val = True
            # Note that func might throw an exception, but that's OK,
            # it aborts the transition.
            if func is not None:
                return_val = func(*(args or ()), **(kwargs or {}))
            # Some 'false' value returned from func, indicating that the
            # transition should not occur:
            if not return_val:
                return return_val

            logging.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
            self.__current_state = to_state
            self.lock.notify_all()  # wake threads blocked in transition/ensure
            # Some 'true' value returned by func, or True if func was None.
            return return_val

    def ensure(self, state, wait=0.0):
        '''
        Ensure the state machine is currently in `state`, or wait up to
        `wait` seconds until it enters `state`.

        Returns `True` if the machine is in `state`, `False` on timeout.
        Raises `ValueError` if `state` is not a registered state.
        '''
        return self.ensure_any((state,), wait=wait)

    def ensure_any(self, states, wait=0.0):
        '''
        Ensure the state machine is currently in one of the given `states`,
        or wait up to `wait` seconds until it enters one of them.

        Returns `True` if the machine is in one of `states`, `False` on
        timeout. Raises `ValueError` if any of `states` is not registered.
        '''
        with self.lock:
            self._require_known(states, 'state', 'ensure_any')
            return self._await_any(states, wait)

    def reset(self):
        '''
        Return the machine to its default state (the first state added).

        The lock is held for the whole operation so the current state cannot
        change between reading it and transitioning away from it. Returns
        `True` after the reset, or `False` if no states are registered.
        '''
        with self.lock:
            if not self.__has_default:
                return False
            return self.transition(self.__current_state, self.__default_state)

    def current_state(self):
        '''
        Return the current state name (non-blocking, non-synchronized).
        '''
        return self.__current_state

    def __getitem__(self, state):
        '''
        Non-blocking, non-synchronized test to determine if we are in the given state.
        Use `StateMachine.ensure(state)` to wait until the machine enters a certain state.
        '''
        return self.__current_state == state

    def __enter__(self):
        '''
        Acquire the machine's lock and return the machine itself.
        '''
        self.lock.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''
        Notify all waiting threads and release the lock acquired by `__enter__`.
        '''
        self.lock.notify_all()
        self.lock.release()
        return False  # re-raise any exception
|