diff options
Diffstat (limited to 'sleekxmpp/xmlstream/statemachine.py')
-rw-r--r-- | sleekxmpp/xmlstream/statemachine.py | 43 |
1 files changed, 27 insertions, 16 deletions
diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index 9412d5ad..8939397b 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -86,11 +86,11 @@ class StateMachine(object): start = time.time() while not self.__current_state in from_states or not self.lock.acquire(False): # detect timeout: - if time.time() >= start + wait: return False - self.notifier.wait(wait) + remainder = start + wait - time.time() + if remainder > 0: self.notifier.wait(remainder) + else: return False try: # lock is acquired; all other threads will return false or wait until notify/timeout - self.notifier.clear() if self.__current_state in from_states: # should always be True due to lock # Note that func might throw an exception, but that's OK, it aborts the transition @@ -107,7 +107,8 @@ class StateMachine(object): log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" ) return False finally: - self.notifier.set() + self.notifier.set() # notify any waiting threads that the state has changed. + self.notifier.clear() self.lock.release() @@ -146,14 +147,14 @@ class StateMachine(object): return _StateCtx(self, from_state, to_state, wait) - def ensure(self, state, wait=0.0): + def ensure(self, state, wait=0.0, block_on_transition=False ): ''' Ensure the state machine is currently in `state`, or wait until it enters `state`. ''' - return self.ensure_any( (state,), wait=wait ) + return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition ) - def ensure_any(self, states, wait=0.0): + def ensure_any(self, states, wait=0.0, block_on_transition=False): ''' Ensure we are currently in one of the given `states` or wait until we enter one of those states. @@ -172,16 +173,25 @@ class StateMachine(object): if not state in self.__states: raise ValueError( "StateMachine does not contain state '%s'" % state ) - # Locking never really gained us anything here, since the lock was released - # before the function returned anyways. The only thing it _did_ do was - # increase the probability that this function would block for longer than - # intended if a `transition` function or context was running while holding - # the lock. + # if we're in the middle of a transition, determine whether we should + # 'fall back' to the 'current' state, or wait for the new state, in order to + # avoid an operation occurring in the wrong state. + # TODO another option would be an ensure_ctx that uses a semaphore to allow + # threads to indicate they want to remain in a particular state. + + # will return immediately if no transition is in process. + if block_on_transition: + # we're not in the middle of a transition; don't hold the lock + if self.lock.acquire(False): self.lock.release() + # wait for the transition to complete + else: self.notifier.wait() + start = time.time() while not self.__current_state in states: # detect timeout: - if time.time() >= start + wait: return False - self.notifier.wait(wait) + remainder = start + wait - time.time() + if remainder > 0: self.notifier.wait(remainder) + else: return False return True @@ -227,10 +237,11 @@ class _StateCtx: start = time.time() while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False): # detect timeout: - if time.time() >= start + self.wait: + remainder = start + self.wait - time.time() + if remainder > 0: self.state_machine.notifier.wait(remainder) + else: log.debug('StateMachine timeout while waiting for state: %s', self.from_state ) return False - self.state_machine.notifier.wait(self.wait) self._locked = True # lock has been acquired at this point self.state_machine.notifier.clear() |