Asynchronous Client

The asynchronous client is based on Twisted, a very mature and powerful asynchronous programming framework. It supports destination specific message and error handlers (with default “poison pill” error handling), concurrent message processing, graceful shutdown, and connect and disconnect timeouts.

TLS/SSL support may be configured on the StompConfig object in exactly the same way as demonstrated in the sync client, but you need to enable TLS in Twisted via pip install 'twisted[tls]'.

Examples

Producer

import json
import logging

from twisted.internet import defer, task

from stompest.config import StompConfig

from stompest.async import Stomp
from stompest.async.listener import ReceiptListener

class Producer(object):
    QUEUE = '/queue/testIn'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self, _):
        client = Stomp(self.config)
        yield client.connect()
        client.add(ReceiptListener(1.0))
        for j in range(10):
            yield client.send(self.QUEUE, json.dumps({'count': j}).encode(), receipt='message-%d' % j)
        client.disconnect(receipt='bye')
        yield client.disconnected # graceful disconnect: waits until all receipts have arrived

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    task.react(Producer().run)

Transformer

import json
import logging

from twisted.internet import reactor, defer

from stompest.config import StompConfig
from stompest.protocol import StompSpec

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener

class IncrementTransformer(object):
    IN_QUEUE = '/queue/testIn'
    OUT_QUEUE = '/queue/testOut'
    ERROR_QUEUE = '/queue/testTransformerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self):
        client = Stomp(self.config)
        yield client.connect()
        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize': '100',
        }
        client.subscribe(self.IN_QUEUE, headers, listener=SubscriptionListener(self.addOne, errorDestination=self.ERROR_QUEUE))

    def addOne(self, client, frame):
        """
        NOTE: you can return a Deferred here
        """
        data = json.loads(frame.body.decode())
        data['count'] += 1
        client.send(self.OUT_QUEUE, json.dumps(data).encode())

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    IncrementTransformer().run()
    reactor.run()

Consumer

import json
import logging

from twisted.internet import defer, reactor

from stompest.config import StompConfig
from stompest.protocol import StompSpec

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener


class Consumer(object):
    QUEUE = '/queue/testOut'
    ERROR_QUEUE = '/queue/testConsumerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self):
        client = Stomp(self.config)
        yield client.connect()
        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize': '100',
        }
        client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE))

    def consume(self, client, frame):
        """
        NOTE: you can return a Deferred here
        """
        data = json.loads(frame.body.decode())
        print('Received frame with count %d' % data['count'])

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    Consumer().run()
    reactor.run()

API

class stompest.async.client.Stomp(config, listenersFactory=None, endpointFactory=None)

An asynchronous STOMP client for the Twisted framework.

Parameters:
  • config – A StompConfig object.
  • listenersFactory – The listeners which this (parameterless) function produces will be added to the connection each time connect() is called. The default behavior (None) is to use defaultListeners() in the module async.listener.
  • endpointFactory – This function produces a Twisted endpoint which will be used to establish the wire-level connection. It accepts two arguments broker (as it is produced by iteration over an StompFailoverTransport) and timeout (connect timeout in seconds, None meaning that we will wait indefinitely). The default behavior (None) is to use endpointFactory() in the module async.util.

Note

All API methods which may request a RECEIPT frame from the broker – which is indicated by the receipt parameter – will wait for the RECEIPT response until this client’s ReceiptListener‘s timeout (given that one was added to this client, which by default is not the case). Here, “wait” is to be understood in the asynchronous sense that the method’s twisted.internet.defer.Deferred result will only call back then. If receipt is None, no such header is sent, and the callback will be triggered earlier.

See also

StompConfig for how to set configuration options, StompSession for session state, protocol.commands for all API options which are documented here. Details on endpoints can be found in the Twisted endpoint howto.

abort(transaction=None, receipt=None)

Send an ABORT frame to abort a STOMP transaction.

ack(frame, receipt=None)

Send an ACK frame for a received MESSAGE frame.

add(listener)

Add a listener to this client. For the interface definition, cf. Listener.

begin(transaction=None, receipt=None)

Send a BEGIN frame to begin a STOMP transaction.

commit(transaction=None, receipt=None)

Send a COMMIT frame to commit a STOMP transaction.

connect(headers=None, versions=None, host=None, heartBeats=None, connectTimeout=None, connectedTimeout=None)

Establish a connection to a STOMP broker. If the wire-level connect fails, attempt a failover according to the settings in the client’s StompConfig object. If there are active subscriptions in the session, replay them when the STOMP connection is established.

Parameters:
  • versions – The STOMP protocol versions we wish to support. The default behavior (None) is the same as for the connect() function of the commands API, but the highest supported version will be the one you specified in the StompConfig object. The version which is valid for the connection about to be initiated will be stored in the session.
  • connectTimeout – This is the time (in seconds) to wait for the wire-level connection to be established. If None, we will wait indefinitely.
  • connectedTimeout – This is the time (in seconds) to wait for the STOMP connection to be established (that is, the broker’s CONNECTED frame to arrive). If None, we will wait indefinitely.

See also

The protocol.failover and session modules for the details of subscription replay and failover transport.

disconnect(self, receipt=None, reason=None, timeout=None)

Send a DISCONNECT frame and terminate the STOMP connection.

Parameters:
  • reason – A disconnect reason (a Exception) to err back. Example: versions=['1.0', '1.1']
  • timeout – This is the time (in seconds) to wait for a graceful disconnect, that is, for pending message handlers to complete. If timeout is None, we will wait indefinitely.

Note

The session‘s active subscriptions will be cleared if no failure has been passed to this method. This allows you to replay the subscriptions upon reconnect. If you do not wish to do so, you have to clear the subscriptions yourself by calling the close() method of the session. The result of any (user-requested or not) disconnect event is available via the disconnected property.

disconnected

This twisted.internet.defer.Deferred calls back when the connection to the broker was lost. It will err back when the connection loss was unexpected or caused by another error.

nack(frame, receipt=None)

Send a NACK frame for a received MESSAGE frame.

remove(listener)

Remove a listener from this client.

send(destination, body=b'', headers=None, receipt=None)

Send a SEND frame.

sendFrame(*args, **kwargs)

Send a raw STOMP frame.

Note

If we are not connected, this method, and all other API commands for sending STOMP frames except connect(), will raise a StompConnectionError. Use this command only if you have to bypass the StompSession logic and you know what you’re doing!

session

The StompSession associated to this client.

subscribe(destination, headers=None, receipt=None, listener=None)
Parameters:listener – An optional Listener object which will be added to this connection to handle events associated to this subscription.

Send a SUBSCRIBE frame to subscribe to a STOMP destination. The callback value of the twisted.internet.defer.Deferred which this method returns is a token which is used internally to match incoming MESSAGE frames and must be kept if you wish to unsubscribe() later.

unsubscribe(token, receipt=None)

Send an UNSUBSCRIBE frame to terminate an existing subscription.

Parameters:token – The result of the subscribe() command which initiated the subscription in question.
class stompest.async.listener.ConnectListener

Waits for the CONNECTED frame to arrive.

class stompest.async.listener.DisconnectListener

Handles graceful disconnect.

class stompest.async.listener.ErrorListener

Handles ERROR frames.

class stompest.async.listener.HeartBeatListener(thresholds=None)

Handles heart-beating.

Parameters:thresholds – tolerance thresholds (relative to the negotiated heart-beat periods). The default None is equivalent to the content of the class atrribute DEFAULT_HEART_BEAT_THRESHOLDS. Example: {'client': 0.6, 'server' 2.5} means that the client will send a heart-beat if it had shown no activity for 60 % of the negotiated client heart-beat period and that the client will disconnect if the server has shown no activity for 250 % of the negotiated server heart-beat period.
class stompest.async.listener.Listener

This base class defines the interface for the handlers of possible asynchronous STOMP connection events. You may implement any subset of these event handlers and add the resulting listener to the Stomp connection.

class stompest.async.listener.ReceiptListener(timeout=None)
Parameters:timeout – When a STOMP frame was sent to the broker and a RECEIPT frame was requested, this is the time (in seconds) to wait for RECEIPT frames to arrive. If None, we will wait indefinitely.

Example:

>>> client.add(ReceiptListener(1.0))
class stompest.async.listener.SubscriptionListener(handler, ack=True, errorDestination=None, onMessageFailed=None)

Corresponds to a STOMP subscription.

Parameters:
  • handler – A callable f(client, frame) which accepts a Stomp connection and the received StompFrame.
  • ack – Check this option if you wish to automatically ack MESSAGE frames after they were handled (successfully or not).
  • errorDestination – If a frame was not handled successfully, forward a copy of the offending frame to this destination. Example: errorDestination='/queue/back-to-square-one'
  • onMessageFailed – You can specify a custom error handler which must be a callable with signature f(connection, failure, frame, errorDestination). Note that a non-trivial choice of this error handler overrides the default behavior (forward frame to error destination and ack it).

See also

The unit tests in the module tests.async_client_integration_test cover a couple of usage scenarios.

onConnectionLost(connection, reason)

Forget everything about this listener’s subscription and unregister from the connection.

onMessage(connection, frame, context)

Handle a message originating from this listener’s subscription.

onSubscribe(connection, frame, context)

Set the ack header of the SUBSCRIBE frame initiating this listener’s subscription to the value of the class atrribute DEFAULT_ACK_MODE (if it isn’t set already). Keep a copy of the headers for handling messages originating from this subscription.

onUnsubscribe(connection, frame, context)

Forget everything about this listener’s subscription and unregister from the connection.