summaryrefslogtreecommitdiff
path: root/sleekxmpp
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp')
-rw-r--r--sleekxmpp/__init__.py4
-rw-r--r--sleekxmpp/xmlstream/statemachine.py43
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py72
3 files changed, 70 insertions, 49 deletions
diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py
index 3379e45d..3b2b5e0f 100644
--- a/sleekxmpp/__init__.py
+++ b/sleekxmpp/__init__.py
@@ -159,11 +159,11 @@ class ClientXMPP(basexmpp, XMLStream):
def reconnect(self):
self.disconnect(reconnect=True)
- def disconnect(self, reconnect=False):
+ def disconnect(self, reconnect=False, error=False):
self.event("disconnected")
self.authenticated = False
self.sessionstarted = False
- XMLStream.disconnect(self, reconnect)
+ XMLStream.disconnect(self, reconnect, error)
def registerFeature(self, mask, pointer, breaker = False):
"""Register a stream feature."""
diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py
index 9412d5ad..8939397b 100644
--- a/sleekxmpp/xmlstream/statemachine.py
+++ b/sleekxmpp/xmlstream/statemachine.py
@@ -86,11 +86,11 @@ class StateMachine(object):
start = time.time()
while not self.__current_state in from_states or not self.lock.acquire(False):
# detect timeout:
- if time.time() >= start + wait: return False
- self.notifier.wait(wait)
+ remainder = start + wait - time.time()
+ if remainder > 0: self.notifier.wait(remainder)
+ else: return False
try: # lock is acquired; all other threads will return false or wait until notify/timeout
- self.notifier.clear()
if self.__current_state in from_states: # should always be True due to lock
# Note that func might throw an exception, but that's OK, it aborts the transition
@@ -107,7 +107,8 @@ class StateMachine(object):
log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" )
return False
finally:
- self.notifier.set()
+ self.notifier.set() # notify any waiting threads that the state has changed.
+ self.notifier.clear()
self.lock.release()
@@ -146,14 +147,14 @@ class StateMachine(object):
return _StateCtx(self, from_state, to_state, wait)
- def ensure(self, state, wait=0.0):
+ def ensure(self, state, wait=0.0, block_on_transition=False ):
'''
Ensure the state machine is currently in `state`, or wait until it enters `state`.
'''
- return self.ensure_any( (state,), wait=wait )
+ return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition )
- def ensure_any(self, states, wait=0.0):
+ def ensure_any(self, states, wait=0.0, block_on_transition=False):
'''
Ensure we are currently in one of the given `states` or wait until
we enter one of those states.
@@ -172,16 +173,25 @@ class StateMachine(object):
if not state in self.__states:
raise ValueError( "StateMachine does not contain state '%s'" % state )
- # Locking never really gained us anything here, since the lock was released
- # before the function returned anyways. The only thing it _did_ do was
- # increase the probability that this function would block for longer than
- # intended if a `transition` function or context was running while holding
- # the lock.
+ # if we're in the middle of a transition, determine whether we should
+ # 'fall back' to the 'current' state, or wait for the new state, in order to
+ # avoid an operation occurring in the wrong state.
+ # TODO another option would be an ensure_ctx that uses a semaphore to allow
+ # threads to indicate they want to remain in a particular state.
+
+ # will return immediately if no transition is in process.
+ if block_on_transition:
+ # we're not in the middle of a transition; don't hold the lock
+ if self.lock.acquire(False): self.lock.release()
+ # wait for the transition to complete
+ else: self.notifier.wait()
+
start = time.time()
while not self.__current_state in states:
# detect timeout:
- if time.time() >= start + wait: return False
- self.notifier.wait(wait)
+ remainder = start + wait - time.time()
+ if remainder > 0: self.notifier.wait(remainder)
+ else: return False
return True
@@ -227,10 +237,11 @@ class _StateCtx:
start = time.time()
while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False):
# detect timeout:
- if time.time() >= start + self.wait:
+ remainder = start + self.wait - time.time()
+ if remainder > 0: self.state_machine.notifier.wait(remainder)
+ else:
log.debug('StateMachine timeout while waiting for state: %s', self.from_state )
return False
- self.state_machine.notifier.wait(self.wait)
self._locked = True # lock has been acquired at this point
self.state_machine.notifier.clear()
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 1fb3f8bb..88ec219c 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -83,7 +83,9 @@ class XMLStream(object):
self.namespace_map = {}
- self.run = True
+ # booleans are not volatile in Python and changes
+ # do not seem to be detected easily between threads.
+ self.quit = threading.Event()
def setSocket(self, socket):
"Set the socket"
@@ -120,7 +122,7 @@ class XMLStream(object):
# holds the state lock.
delay = 1.0 # reconnection delay
- while self.run:
+ while not self.quit.is_set():
logging.debug('connecting....')
try:
if host and port:
@@ -150,14 +152,14 @@ class XMLStream(object):
if not reattempt: return False
except:
logging.exception("Connection error")
- if not reattempt: return False
+ if not reattempt: return False
# quiesce if rconnection fails:
# This algorithm based loosely on Twisted internet.protocol
# http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310
delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY)
delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER)
- logging.debug('Waiting %fs until next reconnect attempt...', delay)
+ logging.debug('Waiting %.3fs until next reconnect attempt...', delay)
time.sleep(delay)
@@ -189,8 +191,8 @@ class XMLStream(object):
raise RestartStream()
def process(self, threaded=True):
+ self.quit.clear()
self.scheduler.process(threaded=True)
- self.run = True
for t in range(0, HANDLER_THREADS):
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True)
@@ -214,14 +216,16 @@ class XMLStream(object):
def _process(self):
"Start processing the socket."
logging.debug('Process thread starting...')
- while self.run:
- if not self.state.ensure('connected',wait=2): continue
+ while not self.quit.is_set():
+ if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
try:
+ logging.debug(' ------------------------------- starting process loop...')
self.sendPriorityRaw(self.stream_header)
- while self.run and self.__readXML(): pass
+ self.__readXML() # this loops until the stream is terminated.
except socket.timeout:
- logging.debug('socket rcv timeout')
- pass
+ # TODO currently this will re-send a stream header if this exception occurs.
+ # I don't think that's intended behavior.
+ logging.warn('socket rcv timeout')
except RestartStream:
logging.debug("Restarting stream...")
continue # DON'T re-initialize the stream -- this exception is sent
@@ -230,14 +234,19 @@ class XMLStream(object):
logging.debug("System interrupt detected")
self.shutdown()
self.eventqueue.put(('quit', None, None))
- except cElementTree.XMLParserError: #if there is an xml parsing exception, assume stream needs to be restarted
- logging.warn('XML RCV parsing error!', exc_info=1)
- if self.should_reconnect: self.disconnect(reconnect=True)
- else: self.disconnect()
except:
logging.exception('Unexpected error in RCV thread')
- if self.should_reconnect: self.disconnect(reconnect=True)
- else: self.disconnect()
+
+ # if the RCV socket is terminated for whatever reason (e.g. we reach this point of
+ # code,) our only sane choice of action is an attempt to re-establish the connection.
+ if not self.quit.is_set():
+ logging.info( 'about to reconnect..........' )
+ try:
+ self.disconnect(reconnect=self.should_reconnect, error=True)
+ except:
+ logging.exception( "WTF disconnect!" )
+ logging.info( 'reconnect complete!' )
+ logging.info( 'reconnect complete!' )
logging.debug('Quitting Process thread')
@@ -257,8 +266,8 @@ class XMLStream(object):
if event == b'end':
edepth += -1
if edepth == 0 and event == b'end':
- # what is this case exactly? Premature EOF?
- logging.debug("Ending readXML loop")
+ logging.warn("Premature EOF from read socket; Ending readXML loop")
+ # this is a premature EOF as far as I can tell; raise an exception so the stream get closed and re-established cleanly.
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@@ -266,13 +275,14 @@ class XMLStream(object):
if root: root.clear()
if event == b'start':
edepth += 1
- logging.debug("Exiting readXML loop")
+ logging.warn("Exiting readXML loop")
+ # TODO under what conditions will this _ever_ occur?
return False
def _sendThread(self):
logging.debug('send thread starting...')
- while self.run:
- if not self.state.ensure('connected',wait=2): continue
+ while not self.quit.is_set():
+ if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
data = None
try:
@@ -294,7 +304,7 @@ class XMLStream(object):
# some sort of event that could be handled by a common thread or the reader
# thread to perform reconnect and then re-initialize the handler threads as well.
if self.should_reconnect:
- self.disconnect(reconnect=True)
+ self.disconnect(reconnect=True, error=True)
def sendRaw(self, data):
self.sendqueue.put((1, data))
@@ -304,16 +314,16 @@ class XMLStream(object):
self.sendqueue.put((0, data))
return True
- def disconnect(self, reconnect=False):
+ def disconnect(self, reconnect=False, error=False):
with self.state.transition_ctx('connected','disconnected') as locked:
if not locked:
logging.warning("Already disconnected.")
return
+
logging.debug("Disconnecting...")
- self.sendRaw(self.stream_footer)
- time.sleep(5)
- #send end of stream
- #wait for end of stream back
+ # don't send a footer on error; if the stream is already closed,
+ # this won't get sent until the stream is re-initialized!
+ if not error: self.sendRaw(self.stream_footer) #send end of stream
try:
# self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
@@ -331,7 +341,7 @@ class XMLStream(object):
Disconnects and shuts down all event threads.
'''
self.disconnect()
- self.run = False
+ self.quit.set()
self.scheduler.run = False
def incoming_filter(self, xmlobj):
@@ -339,9 +349,9 @@ class XMLStream(object):
def __spawnEvent(self, xmlobj):
"watching xmlOut and processes handlers"
+ if logging.getLogger().isEnabledFor(logging.DEBUG):
+ logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
#convert XML into Stanza
- # TODO surround this log statement with an if, it's expensive
- logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
xmlobj = self.incoming_filter(xmlobj)
stanza = None
for stanza_class in self.__root_stanza:
@@ -369,7 +379,7 @@ class XMLStream(object):
def _eventRunner(self):
logging.debug("Loading event runner")
- while self.run:
+ while not self.quit.is_set():
try:
event = self.eventqueue.get(True, timeout=5)
except queue.Empty: