Synchronous Client

The synchronous client is dead simple. It does not assume anything about your concurrency model (thread vs process) or force you to use it any particular way. It gets out of your way and lets you do what you want.

Examples

If you use ActiveMQ to run these examples, make sure you enable the STOMP connector, (see here for details):

<!-- add this to the config file "activemq.xml" -->
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>

For debugging purposes, it is highly recommended to turn on the logger on level DEBUG:

import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

Producer

from stompest.config import StompConfig
from stompest.sync import Stomp

CONFIG = StompConfig('tcp://localhost:61613')
QUEUE = '/queue/test'

if __name__ == '__main__':
    client = Stomp(CONFIG)
    client.connect()
    client.send(QUEUE, 'test message 1'.encode())
    client.send(QUEUE, 'test message 2'.encode())
    client.disconnect()

Consumer

from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp

CONFIG = StompConfig('tcp://localhost:61613')
QUEUE = '/queue/test'

if __name__ == '__main__':
    client = Stomp(CONFIG)
    client.connect()
    client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})
    while True:
        frame = client.receiveFrame()
        print('Got %s' % frame.info())
        client.ack(frame)
    client.disconnect()

TLS/SSL Consumer

import ssl
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp

context = ssl.create_default_context()
# Disable cert validation for demo only
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

CONFIG = StompConfig('ssl://localhost:61612', sslContext=context)
QUEUE = '/queue/test'

if __name__ == '__main__':
    client = Stomp(CONFIG)
    client.connect()
    client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})
    while True:
        frame = client.receiveFrame()
        print('Got %s' % frame.info())
        client.ack(frame)
    client.disconnect()

An SSL producer would be configured in the same way. The configuration of an SSL-enabled ActiveMQ server is somewhat complicated. The config file for ActiveMQ, activemq.xml, must have the following additions:

<!-- add this to the config file "activemq.xml" -->
<sslContext>
    <sslContext
        keyStore="broker.ks" keyStorePassword="password"
        trustStore="client.ts" trustStorePassword="password"/>
</sslContext>
<transportConnectors>
    <transportConnector name="stomp+ssl" uri="stomp+ssl://0.0.0.0:61612"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
</transportConnectors>

The SSL transport configuration (on port 61612) is shown alongside the standard STOMP configuration (on port 61613) for contrast. More about the required ActiveMQ setup, as well as instructions to generate the files broker.ks and client.ts may be found in the ActiveMQ documentation under How do I use SSL.

API

class stompest.sync.client.Stomp(config)

A synchronous STOMP client. This is the successor of the simple STOMP client in stompest 1.x, but the API is not backward compatible.

Parameters:config – A StompConfig object

See also

StompConfig for how to set session configuration options, StompSession for session state, protocol.commands for all API options which are documented here.

abort(transaction=None, receipt=None)

Send an ABORT frame to abort a STOMP transaction.

ack(frame, receipt=None)

Send an ACK frame for a received MESSAGE frame.

beat()

Create a STOMP heart-beat.

Example:

>>> # you might want to enable logging to trace the wire-level traffic
... import time
>>> client = Stomp(StompConfig('tcp://localhost:61613', version=StompSpec.VERSION_1_1))
>>> client.connect(heartBeats=(100, 100))
>>> start = time.time()
>>> elapsed = lambda t = None: (t or time.time()) - start
>>> times = lambda: 'elapsed: %.2f, last received: %.2f, last sent: %.2f' % (
...     elapsed(), elapsed(client.lastReceived), elapsed(client.lastSent)
... )
>>> while elapsed() < 2 * client.clientHeartBeat / 1000.0:
...     client.canRead(0.8 * client.serverHeartBeat / 1000.0) # poll server heart-beats
...     client.beat() # send client heart-beat
...     print times()
... 
False
elapsed: 0.08, last received: 0.00, last sent: 0.08
False
elapsed: 0.17, last received: 0.00, last sent: 0.17
False
elapsed: 0.25, last received: 0.20, last sent: 0.25
>>> client.canRead() # server will disconnect us because we're not heart-beating any more
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
stompest.error.StompConnectionError: Connection closed [No more data]
>>> print times()
elapsed: 0.50, last received: 0.50, last sent: 0.25
begin(transaction=None, receipt=None)

Send a BEGIN frame to begin a STOMP transaction.

canRead(timeout=None)

Tell whether there is an incoming STOMP frame available for us to read.

Parameters:timeout – This is the time (in seconds) to wait for a frame to become available. If None, we will wait indefinitely.

Note

If the wire-level connection is not available, this method will raise a StompConnectionError!

clientHeartBeat

The negotiated client heart-beat period in ms.

close(flush=True)

Close both the client’s session and transport (that is, the wire-level connection with the broker).

Parameters:flush – Decides whether the session should forget its active subscriptions or not.

Note

If you do not flush the subscriptions, they will be replayed upon this client’s next connect()!

commit(transaction=None, receipt=None)

Send a COMMIT frame to commit a STOMP transaction.

connect(headers=None, versions=None, host=None, heartBeats=None, connectTimeout=None, connectedTimeout=None)

Establish a connection to a STOMP broker. If the wire-level connect fails, attempt a failover according to the settings in the client’s StompConfig object. If there are active subscriptions in the session, replay them when the STOMP connection is established.

Parameters:
  • versions – The STOMP protocol versions we wish to support. The default behavior (None) is the same as for the connect() function of the commands API, but the highest supported version will be the one you specified in the StompConfig object. The version which is valid for the connection about to be initiated will be stored in the session.
  • connectTimeout – This is the time (in seconds) to wait for the wire-level connection to be established. If None, we will wait indefinitely.
  • connectedTimeout – This is the time (in seconds) to wait for the STOMP connection to be established (that is, the broker’s CONNECTED frame to arrive). If None, we will wait indefinitely.

Example:

>>> client = Stomp(StompConfig('tcp://localhost:61613', version=StompSpec.VERSION_1_1))
>>> client.connect()
>>> client.session.version
'1.1'
>>> client.disconnect()
>>> client.connect(versions=[StompSpec.VERSION_1_0])
>>> client.session.version
'1.0'
>>> client.disconnect()
>>> client.session.version
'1.1'

See also

The protocol.failover and protocol.session modules for the details of subscription replay and failover transport.

disconnect(receipt=None)

Send a STOMP DISCONNECT command and terminate the STOMP connection.

Note

Calling this method will clear the session’s active subscriptions unless you request a RECEIPT response from the broker. In the latter case, you have to disconnect the wire-level connection and flush the subscriptions yourself by calling self.close(flush=True).

lastReceived

The last time when data was received.

lastSent

The last time when data was sent.

message(frame)

If you received a MESSAGE frame, this method will produce a token which allows you to match it against its subscription.

Parameters:frame – a MESSAGE frame.

Note

If the client is not aware of the subscription, or if we are not connected, this method will raise a StompProtocolError.

nack(frame, receipt=None)

Send a NACK frame for a received MESSAGE frame.

receipt(frame)

If you received a RECEIPT frame, this method will extract the receipt id which you employed to request that receipt.

Parameters:frame – A MESSAGE frame (a StompFrame object).

Note

If the client is not aware of the outstanding receipt, this method will raise a StompProtocolError.

receiveFrame()

Fetch the next available frame.

Note

If we are not connected, this method will raise a StompConnectionError. Keep in mind that this method will block forever if there are no frames incoming on the wire. Be sure to use peek with self.canRead(timeout) before!

send(destination, body=b'', headers=None, receipt=None)

Send a SEND frame.

sendFrame(frame)

Send a raw STOMP frame.

Parameters:frame – Any STOMP frame (represented as a StompFrame object).

Note

If we are not connected, this method, and all other API commands for sending STOMP frames except connect(), will raise a StompConnectionError. Use this command only if you have to bypass the StompSession logic and you know what you’re doing!

serverHeartBeat

The negotiated server heart-beat period in ms.

session

The StompSession associated to this client.

subscribe(destination, headers=None, receipt=None)

Send a SUBSCRIBE frame to subscribe to a STOMP destination. This method returns a token which you have to keep if you wish to match incoming MESSAGE frames to this subscription or to unsubscribe() later.

transaction(transaction=None, receipt=None)

A context manager for STOMP transactions. Upon entering the with block, a transaction will be begun and upon exiting, that transaction will be committed or (if an error occurred) aborted.

Example:

>>> client = Stomp(StompConfig('tcp://localhost:61613'))
>>> client.connect()
>>> client.subscribe('/queue/test', {'ack': 'client-individual'})
('destination', '/queue/test')
>>> client.canRead(0) # Check that queue is empty.
False
>>> with client.transaction(receipt='important') as transaction:
...     client.send('/queue/test', b'message with transaction header', {StompSpec.TRANSACTION_HEADER: transaction})
...     client.send('/queue/test', b'message without transaction header')
...     raise RuntimeError('poof')
... 
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
RuntimeError: poof
>>> client.receiveFrame()
StompFrame(command='RECEIPT', headers={'receipt-id': 'important-begin'})
>>> client.receiveFrame()
StompFrame(command='RECEIPT', headers={'receipt-id': 'important-abort'})
>>> frame = client.receiveFrame()
>>> frame.command, frame.body
('MESSAGE', b'message without transaction header')
>>> client.ack(frame)
>>> client.canRead(0) # frame with transaction header was dropped by the broker
False
>>> client.disconnect()
unsubscribe(token, receipt=None)

Send an UNSUBSCRIBE frame to terminate an existing subscription.