"""
    SleekXMPP: The Sleek XMPP Library
    Copyright (C) 2010  Nathanael C. Fritz
    This file is part of SleekXMPP.

    See the file license.txt for copying permission.
"""
from __future__ import with_statement
import threading
import time
import logging


class StateMachine(object):
    '''
    A small state machine whose transitions are guarded by a condition
    variable built on a re-entrant lock.

    The first state passed to the constructor is the default state and the
    initial current state.  `transition*()` and `ensure*()` hold `self.lock`
    while they inspect or change the current state; `current_state()` and
    `__getitem__` read it without taking the lock.
    '''

    def __init__(self, states=()):
        '''
        Create a state machine from an ordered collection of state names.

        Raises `IndexError` if `states` is empty (there would be no default
        state) or contains a duplicate.
        '''
        self.lock = threading.Condition(threading.RLock())
        self.__states = []
        self.addStates(states)
        if not self.__states:
            # Same exception type as the bare `[0]` lookup used to raise,
            # but with a message that explains the problem.
            raise IndexError("StateMachine requires at least one state.")
        self.__default_state = self.__states[0]
        self.__current_state = self.__default_state

    def addStates(self, states):
        '''
        Register additional states.

        Raises `IndexError` on the first state that is already registered;
        states that precede it in `states` stay registered.
        '''
        with self.lock:
            for state in states:
                if state in self.__states:
                    raise IndexError("The state '%s' is already in the StateMachine." % state)
                self.__states.append(state)

    def transition(self, from_state, to_state, wait=0.0):
        '''
        Transition from the given `from_state` to the given `to_state`.
        This method will return `True` if the state machine is now in
        `to_state`.  It will return `False` if a timeout occurred and the
        transition did not occur.  If `wait` is 0 (the default), this method
        returns immediately if the state machine is not in `from_state`.

        If you want the thread to block and transition once the state machine
        enters `from_state`, set `wait` to a positive number of seconds.
        Note there is no 'block indefinitely' flag since this leads to
        deadlock.  If you want to wait indefinitely, choose a reasonable value
        for `wait` (e.g. 20 seconds) and do so in a while loop like so:

        ::

            while not thread_should_exit and not state_machine.transition('disconnected', 'connecting', wait=20 ):
                    pass # timeout will occur every 20s unless transition occurs
            if thread_should_exit: return
            # perform actions here after successful transition

        This allows the thread to be interrupted by setting
        `thread_should_exit=True`
        '''
        return self.transition_any((from_state,), to_state, wait=wait)

    def transition_any(self, from_states, to_state, wait=0.0):
        '''
        Transition from any of the given `from_states` to the given
        `to_state`.

        Returns `True` once the transition has been made, or `False` if the
        machine did not enter one of `from_states` within `wait` seconds.
        Raises `ValueError` if any named state is unknown, or if an element
        of `from_states` is itself a tuple or list.
        '''
        # Snapshot so one-shot iterables survive both validation and the
        # membership tests below.
        from_states = tuple(from_states)
        with self.lock:
            for state in from_states:
                if isinstance(state, (tuple, list)):
                    raise ValueError("State %s should be a string. Did you mean to call 'StateMachine.transition_any()?" % str(state))
                if state not in self.__states:
                    raise ValueError("StateMachine does not contain from_state %s." % state)
            if to_state not in self.__states:
                raise ValueError("StateMachine does not contain to_state %s." % (to_state,))

            if not self._wait_for_any(from_states, wait):
                return False

            # The lock is still held, so the current state is guaranteed to
            # be one of `from_states` at this point.
            logging.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
            self.__current_state = to_state
            self.lock.notify_all()
            return True

    def ensure(self, state, wait=0.0):
        '''
        Ensure the state machine is currently in `state`, or wait up to
        `wait` seconds until it enters `state`.

        Returns `True` if the machine is in `state`, `False` on timeout.
        '''
        return self.ensure_any((state,), wait=wait)

    def ensure_any(self, states, wait=0.0):
        '''
        Ensure we are currently in one of the given `states`, or wait up to
        `wait` seconds until we enter one of them.

        Returns `True` if the machine is in one of `states`, `False` on
        timeout.  Raises `ValueError` if any named state is unknown, or if an
        element of `states` is itself a tuple or list.
        '''
        states = tuple(states)
        with self.lock:
            for state in states:
                if isinstance(state, (tuple, list)):
                    raise ValueError("State %s should be a string. Did you mean to call 'StateMachine.ensure_any()'?" % str(state))
                if state not in self.__states:
                    raise ValueError("StateMachine does not contain state %s." % state)

            return self._wait_for_any(states, wait)

    def _wait_for_any(self, states, wait):
        '''
        Block until the current state is one of `states` or `wait` seconds
        have elapsed.  Must be called with `self.lock` held.

        Returns `True` if the current state is in `states`, else `False`.
        '''
        deadline = time.time() + wait
        while self.__current_state not in states:
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            # Wait only for the time that is left, so wake-ups caused by
            # unrelated transitions cannot stretch the total timeout.
            self.lock.wait(remaining)
        return True

    def reset(self):
        '''
        Return the machine to its default (first registered) state.
        '''
        # Hold the lock across the read and the transition so another thread
        # cannot change the state in between; the RLock makes this re-entrant.
        with self.lock:
            self.transition(self.__current_state, self.__default_state)

    def current_state(self):
        '''
        Return the current state name.
        '''
        return self.__current_state

    def __getitem__(self, state):
        '''
        Non-blocking, non-synchronized test to determine if we are in the
        given state.  Use `StateMachine.ensure(state)` to wait until the
        machine enters a certain state.
        '''
        return self.__current_state == state

    def __enter__(self):
        '''
        Acquire the state machine's lock and return the machine itself.
        '''
        self.lock.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''
        Wake any waiting threads and release the lock.
        '''
        self.lock.notify_all()
        self.lock.release()
        return False  # re-raise any exception