summaryrefslogtreecommitdiff
path: root/sleekxmpp/xmlstream/statemachine.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/xmlstream/statemachine.py')
-rw-r--r--sleekxmpp/xmlstream/statemachine.py43
1 files changed, 27 insertions, 16 deletions
diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py
index 9412d5ad..8939397b 100644
--- a/sleekxmpp/xmlstream/statemachine.py
+++ b/sleekxmpp/xmlstream/statemachine.py
@@ -86,11 +86,11 @@ class StateMachine(object):
start = time.time()
while not self.__current_state in from_states or not self.lock.acquire(False):
# detect timeout:
- if time.time() >= start + wait: return False
- self.notifier.wait(wait)
+ remainder = start + wait - time.time()
+ if remainder > 0: self.notifier.wait(remainder)
+ else: return False
try: # lock is acquired; all other threads will return false or wait until notify/timeout
- self.notifier.clear()
if self.__current_state in from_states: # should always be True due to lock
# Note that func might throw an exception, but that's OK, it aborts the transition
@@ -107,7 +107,8 @@ class StateMachine(object):
log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" )
return False
finally:
- self.notifier.set()
+ self.notifier.set() # notify any waiting threads that the state has changed.
+ self.notifier.clear()
self.lock.release()
@@ -146,14 +147,14 @@ class StateMachine(object):
return _StateCtx(self, from_state, to_state, wait)
- def ensure(self, state, wait=0.0):
+ def ensure(self, state, wait=0.0, block_on_transition=False ):
'''
Ensure the state machine is currently in `state`, or wait until it enters `state`.
'''
- return self.ensure_any( (state,), wait=wait )
+ return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition )
- def ensure_any(self, states, wait=0.0):
+ def ensure_any(self, states, wait=0.0, block_on_transition=False):
'''
Ensure we are currently in one of the given `states` or wait until
we enter one of those states.
@@ -172,16 +173,25 @@ class StateMachine(object):
if not state in self.__states:
raise ValueError( "StateMachine does not contain state '%s'" % state )
- # Locking never really gained us anything here, since the lock was released
- # before the function returned anyways. The only thing it _did_ do was
- # increase the probability that this function would block for longer than
- # intended if a `transition` function or context was running while holding
- # the lock.
+ # if we're in the middle of a transition, determine whether we should
+ # 'fall back' to the 'current' state, or wait for the new state, in order to
+ # avoid an operation occurring in the wrong state.
+ # TODO another option would be an ensure_ctx that uses a semaphore to allow
+ # threads to indicate they want to remain in a particular state.
+
+ # will return immediately if no transition is in process.
+ if block_on_transition:
+ # we're not in the middle of a transition; don't hold the lock
+ if self.lock.acquire(False): self.lock.release()
+ # wait for the transition to complete
+ else: self.notifier.wait()
+
start = time.time()
while not self.__current_state in states:
# detect timeout:
- if time.time() >= start + wait: return False
- self.notifier.wait(wait)
+ remainder = start + wait - time.time()
+ if remainder > 0: self.notifier.wait(remainder)
+ else: return False
return True
@@ -227,10 +237,11 @@ class _StateCtx:
start = time.time()
while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False):
# detect timeout:
- if time.time() >= start + self.wait:
+ remainder = start + self.wait - time.time()
+ if remainder > 0: self.state_machine.notifier.wait(remainder)
+ else:
log.debug('StateMachine timeout while waiting for state: %s', self.from_state )
return False
- self.state_machine.notifier.wait(self.wait)
self._locked = True # lock has been acquired at this point
self.state_machine.notifier.clear()