Asynchronous Client¶
The asynchronous client is based on Twisted, a very mature and powerful asynchronous programming framework. It supports destination specific message and error handlers (with default “poison pill” error handling), concurrent message processing, graceful shutdown, and connect and disconnect timeouts.
TLS/SSL support may be configured on the StompConfig
object
in exactly the same way as demonstrated in the sync client, but you need to enable TLS in Twisted via pip install 'twisted[tls]'
.
See also
STOMP protocol specification, Twisted API documentation, Apache ActiveMQ - Stomp, Apache ActiveMQ - SSL
Examples¶
Producer¶
import json
import logging
from twisted.internet import defer, task
from stompest.config import StompConfig
from stompest.async import Stomp
from stompest.async.listener import ReceiptListener
class Producer(object):
QUEUE = '/queue/testIn'
def __init__(self, config=None):
if config is None:
config = StompConfig('tcp://localhost:61613')
self.config = config
@defer.inlineCallbacks
def run(self, _):
client = Stomp(self.config)
yield client.connect()
client.add(ReceiptListener(1.0))
for j in range(10):
yield client.send(self.QUEUE, json.dumps({'count': j}).encode(), receipt='message-%d' % j)
client.disconnect(receipt='bye')
yield client.disconnected # graceful disconnect: waits until all receipts have arrived
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
task.react(Producer().run)
Transformer¶
import json
import logging
from twisted.internet import reactor, defer
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
class IncrementTransformer(object):
IN_QUEUE = '/queue/testIn'
OUT_QUEUE = '/queue/testOut'
ERROR_QUEUE = '/queue/testTransformerError'
def __init__(self, config=None):
if config is None:
config = StompConfig('tcp://localhost:61613')
self.config = config
@defer.inlineCallbacks
def run(self):
client = Stomp(self.config)
yield client.connect()
headers = {
# client-individual mode is necessary for concurrent processing
# (requires ActiveMQ >= 5.2)
StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
# the maximal number of messages the broker will let you work on at the same time
'activemq.prefetchSize': '100',
}
client.subscribe(self.IN_QUEUE, headers, listener=SubscriptionListener(self.addOne, errorDestination=self.ERROR_QUEUE))
def addOne(self, client, frame):
"""
NOTE: you can return a Deferred here
"""
data = json.loads(frame.body.decode())
data['count'] += 1
client.send(self.OUT_QUEUE, json.dumps(data).encode())
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
IncrementTransformer().run()
reactor.run()
Consumer¶
import json
import logging
from twisted.internet import defer, reactor
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
class Consumer(object):
QUEUE = '/queue/testOut'
ERROR_QUEUE = '/queue/testConsumerError'
def __init__(self, config=None):
if config is None:
config = StompConfig('tcp://localhost:61613')
self.config = config
@defer.inlineCallbacks
def run(self):
client = Stomp(self.config)
yield client.connect()
headers = {
# client-individual mode is necessary for concurrent processing
# (requires ActiveMQ >= 5.2)
StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
# the maximal number of messages the broker will let you work on at the same time
'activemq.prefetchSize': '100',
}
client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE))
def consume(self, client, frame):
"""
NOTE: you can return a Deferred here
"""
data = json.loads(frame.body.decode())
print('Received frame with count %d' % data['count'])
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
Consumer().run()
reactor.run()
API¶
-
class
stompest.async.client.
Stomp
(config, listenersFactory=None, endpointFactory=None)¶ An asynchronous STOMP client for the Twisted framework.
Parameters: - config – A
StompConfig
object. - listenersFactory – The listeners which this (parameterless) function produces will be added to the connection each time
connect()
is called. The default behavior (None
) is to usedefaultListeners()
in the moduleasync.listener
. - endpointFactory – This function produces a Twisted endpoint which will be used to establish the wire-level connection. It accepts two arguments broker (as it is produced by iteration over an
StompFailoverTransport
) and timeout (connect timeout in seconds,None
meaning that we will wait indefinitely). The default behavior (None
) is to useendpointFactory()
in the moduleasync.util
.
Note
All API methods which may request a RECEIPT frame from the broker – which is indicated by the receipt parameter – will wait for the RECEIPT response until this client’s
ReceiptListener
‘s timeout (given that one was added to this client, which by default is not the case). Here, “wait” is to be understood in the asynchronous sense that the method’stwisted.internet.defer.Deferred
result will only call back then. If receipt isNone
, no such header is sent, and the callback will be triggered earlier.See also
StompConfig
for how to set configuration options,StompSession
for session state,protocol.commands
for all API options which are documented here. Details on endpoints can be found in the Twisted endpoint howto.-
abort
(transaction=None, receipt=None)¶ Send an ABORT frame to abort a STOMP transaction.
-
ack
(frame, receipt=None)¶ Send an ACK frame for a received MESSAGE frame.
-
begin
(transaction=None, receipt=None)¶ Send a BEGIN frame to begin a STOMP transaction.
-
commit
(transaction=None, receipt=None)¶ Send a COMMIT frame to commit a STOMP transaction.
-
connect
(headers=None, versions=None, host=None, heartBeats=None, connectTimeout=None, connectedTimeout=None)¶ Establish a connection to a STOMP broker. If the wire-level connect fails, attempt a failover according to the settings in the client’s
StompConfig
object. If there are active subscriptions in thesession
, replay them when the STOMP connection is established.Parameters: - versions – The STOMP protocol versions we wish to support. The default behavior (
None
) is the same as for theconnect()
function of the commands API, but the highest supported version will be the one you specified in theStompConfig
object. The version which is valid for the connection about to be initiated will be stored in thesession
. - connectTimeout – This is the time (in seconds) to wait for the wire-level connection to be established. If
None
, we will wait indefinitely. - connectedTimeout – This is the time (in seconds) to wait for the STOMP connection to be established (that is, the broker’s CONNECTED frame to arrive). If
None
, we will wait indefinitely.
See also
The
protocol.failover
andsession
modules for the details of subscription replay and failover transport.- versions – The STOMP protocol versions we wish to support. The default behavior (
-
disconnect
(self, receipt=None, reason=None, timeout=None)¶ Send a DISCONNECT frame and terminate the STOMP connection.
Parameters: - reason – A disconnect reason (a
Exception
) to err back. Example:versions=['1.0', '1.1']
- timeout – This is the time (in seconds) to wait for a graceful disconnect, that is, for pending message handlers to complete. If timeout is
None
, we will wait indefinitely.
Note
The
session
‘s active subscriptions will be cleared if no failure has been passed to this method. This allows you to replay the subscriptions upon reconnect. If you do not wish to do so, you have to clear the subscriptions yourself by calling theclose()
method of thesession
. The result of any (user-requested or not) disconnect event is available via thedisconnected
property.- reason – A disconnect reason (a
-
disconnected
¶ This
twisted.internet.defer.Deferred
calls back when the connection to the broker was lost. It will err back when the connection loss was unexpected or caused by another error.
-
nack
(frame, receipt=None)¶ Send a NACK frame for a received MESSAGE frame.
-
remove
(listener)¶ Remove a listener from this client.
-
send
(destination, body=b'', headers=None, receipt=None)¶ Send a SEND frame.
-
sendFrame
(*args, **kwargs)¶ Send a raw STOMP frame.
Note
If we are not connected, this method, and all other API commands for sending STOMP frames except
connect()
, will raise aStompConnectionError
. Use this command only if you have to bypass theStompSession
logic and you know what you’re doing!
-
session
¶ The
StompSession
associated to this client.
-
subscribe
(destination, headers=None, receipt=None, listener=None)¶ Parameters: listener – An optional Listener
object which will be added to this connection to handle events associated to this subscription.Send a SUBSCRIBE frame to subscribe to a STOMP destination. The callback value of the
twisted.internet.defer.Deferred
which this method returns is a token which is used internally to match incoming MESSAGE frames and must be kept if you wish tounsubscribe()
later.
-
unsubscribe
(token, receipt=None)¶ Send an UNSUBSCRIBE frame to terminate an existing subscription.
Parameters: token – The result of the subscribe()
command which initiated the subscription in question.
- config – A
-
class
stompest.async.listener.
ConnectListener
¶ Waits for the CONNECTED frame to arrive.
-
class
stompest.async.listener.
DisconnectListener
¶ Handles graceful disconnect.
-
class
stompest.async.listener.
ErrorListener
¶ Handles ERROR frames.
-
class
stompest.async.listener.
HeartBeatListener
(thresholds=None)¶ Handles heart-beating.
Parameters: thresholds – tolerance thresholds (relative to the negotiated heart-beat periods). The default None
is equivalent to the content of the class atrributeDEFAULT_HEART_BEAT_THRESHOLDS
. Example:{'client': 0.6, 'server' 2.5}
means that the client will send a heart-beat if it had shown no activity for 60 % of the negotiated client heart-beat period and that the client will disconnect if the server has shown no activity for 250 % of the negotiated server heart-beat period.
-
class
stompest.async.listener.
Listener
¶ This base class defines the interface for the handlers of possible asynchronous STOMP connection events. You may implement any subset of these event handlers and add the resulting listener to the
Stomp
connection.
-
class
stompest.async.listener.
ReceiptListener
(timeout=None)¶ Parameters: timeout – When a STOMP frame was sent to the broker and a RECEIPT frame was requested, this is the time (in seconds) to wait for RECEIPT frames to arrive. If None
, we will wait indefinitely.Example:
>>> client.add(ReceiptListener(1.0))
-
class
stompest.async.listener.
SubscriptionListener
(handler, ack=True, errorDestination=None, onMessageFailed=None)¶ Corresponds to a STOMP subscription.
Parameters: - handler – A callable
f(client, frame)
which accepts aStomp
connection and the receivedStompFrame
. - ack – Check this option if you wish to automatically ack MESSAGE frames after they were handled (successfully or not).
- errorDestination – If a frame was not handled successfully, forward a copy of the offending frame to this destination. Example:
errorDestination='/queue/back-to-square-one'
- onMessageFailed – You can specify a custom error handler which must be a callable with signature
f(connection, failure, frame, errorDestination)
. Note that a non-trivial choice of this error handler overrides the default behavior (forward frame to error destination and ack it).
See also
The unit tests in the module
tests.async_client_integration_test
cover a couple of usage scenarios.-
onConnectionLost
(connection, reason)¶ Forget everything about this listener’s subscription and unregister from the connection.
-
onMessage
(connection, frame, context)¶ Handle a message originating from this listener’s subscription.
-
onSubscribe
(connection, frame, context)¶ Set the ack header of the SUBSCRIBE frame initiating this listener’s subscription to the value of the class atrribute
DEFAULT_ACK_MODE
(if it isn’t set already). Keep a copy of the headers for handling messages originating from this subscription.
-
onUnsubscribe
(connection, frame, context)¶ Forget everything about this listener’s subscription and unregister from the connection.
- handler – A callable