Protocol Layer

The protocol package is a collection of generic components each of which you can use independently for your own STOMP related functionality.

Note

Please restrict your imports to the main package stompest.protocol. The subpackage structure is potentially unstable.

Failover Transport

class stompest.protocol.failover.StompFailoverTransport(uri)

Looping over this object, you can produce a series of tuples (broker, delay in s). When the failover scheme does not allow further failover, a StompConnectTimeout error is raised.

Parameters:uri – A failover URI.

Example:

>>> from stompest.protocol import StompFailoverTransport
>>> from stompest.error import StompConnectTimeout
>>> failover = StompFailoverTransport('failover:(tcp://remote1:61615,tcp://localhost:61616)?randomize=false,startupMaxReconnectAttempts=3,initialReconnectDelay=7,maxReconnectDelay=8,maxReconnectAttempts=0')
>>> try:
...     for (broker, delay) in failover:
...         print 'broker: %s, delay: %f' % (broker, delay)                                                                       
... except StompConnectTimeout as e:
...     print 'timeout: %s' % e
... 
broker: {'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, delay: 0.000000
broker: {'host': 'localhost', 'protocol': 'tcp', 'port': 61616}, delay: 0.007000
broker: {'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, delay: 0.008000
broker: {'host': 'localhost', 'protocol': 'tcp', 'port': 61616}, delay: 0.008000
timeout: Reconnect timeout: 3 attempts
>>> try:
...     for (broker, delay) in failover:
...         print 'broker: %s, delay: %f' % (broker, delay)
... except StompConnectTimeout as e:
...     print 'timeout: %s' % e
... 
broker: {'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, delay: 0.000000
timeout: Reconnect timeout: 0 attempts

See also

The StompFailoverUri which parses failover transport URIs.

class stompest.protocol.failover.StompFailoverUri(uri)

This is a parser for the failover URI scheme used in stompest. The parsed parameters are available in the attributes brokers and options. The Failover transport syntax is very close to the one used in ActiveMQ.

Parameters:uri

A failover URI. Its basic form is:

'failover:(uri1,...,uriN)?transportOptions'

or:

'failover:uri1,...,uriN'

Example:

>>> from stompest.protocol import StompFailoverUri    
>>> uri = StompFailoverUri('failover:(tcp://remote1:61615,tcp://localhost:61616)?randomize=false,startupMaxReconnectAttempts=3,initialReconnectDelay=7,maxReconnectDelay=8,maxReconnectAttempts=0')
>>> print uri.brokers
[{'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, {'host': 'localhost', 'protocol': 'tcp', 'port': 61616}]
>>> print uri.options
{'initialReconnectDelay': 7, 'maxReconnectDelay': 8, 'backOffMultiplier': 2.0, 'startupMaxReconnectAttempts': 3, 'priorityBackup': False, 'maxReconnectAttempts': 0, 'reconnectDelayJitter': 0, 'useExponentialBackOff': True, 'randomize': False}

Supported Options:

option type default description
initialReconnectDelay int 10 how long to wait before the first reconnect attempt (in ms)
maxReconnectDelay int 30000 the maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOff bool True should an exponential backoff be used between reconnect attempts
backOffMultiplier float 2.0 the exponent used in the exponential backoff attempts
maxReconnectAttempts int -1 -1 means retry forever 0 means don’t retry (only try connection once but no retry) > 0 means the maximum number of reconnect attempts before an error is sent back to the client
startupMaxReconnectAttempts int 0 if not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client on the first attempt by the client to start a connection, once connected the maxReconnectAttempts option takes precedence
reconnectDelayJitter int 0 jitter in ms by which reconnect delay is blurred in order to avoid stampeding
randomize bool True use a random algorithm to choose the the URI to use for reconnect from the list provided
priorityBackup bool False if set, prefer local connections to remote connections

Session State

The StompSession object implements an abstract STOMP protocol session, where “abstract” means that it is entirely client or transport agnostic. The session API builds upon the low-level and stateless API of the protocol.commands module, but it also keeps track of the session state (e.g., STOMP protocol version negotiation, active subscriptions). You can use the API provided by StompSession independently of the stompest clients to roll your own STOMP client.

Note

Being stateful implies that the session keeps track of subscriptions, receipts, and transactions, so keep track of them yourself, too! – Unless you like to be surprised by a spurious StompProtocolError ...

See also

The stateless API in the module protocol.commands for all API command parameters which are not documented here.

Example:

>>> from stompest.protocol import StompFrame, StompSession, StompSpec
>>> session = StompSession(StompSpec.VERSION_1_1)
>>> session.connect(login='', passcode='')
StompFrame(command='CONNECT', headers={'passcode': '', 'login': '', 'host': '', 'accept-version': '1.0,1.1'})
>>> print(session.version, session.state)
1.1 connecting
>>> session.connected(StompFrame(StompSpec.CONNECTED, {StompSpec.SESSION_HEADER: 'tete-a-tete'})) # The broker only understands STOMP 1.0.
>>> print(session.version, session.state)
1.0 connected
>>> session.disconnect()
StompFrame(command='DISCONNECT')
>>> print(session.version, session.state)
1.0 disconnecting
>>> session.close()
>>> print(session.version, session.state)
1.1 disconnected
class stompest.protocol.session.StompSession(version=None, check=True)

This object implements an abstract STOMP protocol session.

Parameters:
  • version – The highest (and at the same time default) STOMP protocol version.
  • check – This flag decides whether the session should accept commands only in the proper session states (True) or in any session state (False).
abort(transaction, receipt=None)

Create an ABORT frame to abort a STOMP transaction.

Parameters:transaction – See transaction().

Note

If you try and abort a transaction which is not pending, this will result in a StompProtocolError.

ack(frame, receipt=None)

Create an ACK frame for a received MESSAGE frame.

beat()

Create a STOMP heart-beat.

begin(transaction=None, receipt=None)

Create a BEGIN frame and begin an abstract STOMP transaction.

Parameters:transaction – See transaction().

Note

If you try and begin a pending transaction twice, this will result in a StompProtocolError.

clientHeartBeat

The negotiated client heart-beat period in ms.

close(flush=True)

Clean up the session: Set the state to DISCONNECTED, remove all information related to an eventual broker connection, clear all pending transactions and receipts.

Parameters:flush – Clear all active subscriptions. This flag controls whether the next connect() will replay the currently active subscriptions or will wipe the slate clean.
commit(transaction, receipt=None)

Send a COMMIT command to commit a STOMP transaction.

Parameters:transaction – See transaction().

Note

If you try and commit a transaction which is not pending, this will result in a StompProtocolError.

connect(login=None, passcode=None, headers=None, versions=None, host=None, heartBeats=None)

Create a CONNECT frame and set the session state to CONNECTING.

connected(frame)

Handle a CONNECTED frame and set the session state to CONNECTED.

disconnect(receipt=None)

Create a DISCONNECT frame and set the session state to DISCONNECTING.

id

The session id for the current client-broker connection.

lastReceived

The last time when data was received.

lastSent

The last time when data was sent.

message(frame)

Handle a MESSAGE frame. Returns a token which you can use to match this message to its subscription.

See also

The subscribe() method.

nack(frame, receipt=None)

Create a NACK frame for a received MESSAGE frame.

receipt(frame)

Handle a RECEIPT frame. Returns the receipt id which you can use to match this receipt to the command that requested it.

received()

Notify the session that data was received (counts as server heart-beat).

replay()

Flush all active subscriptions and return an iterator over the subscribe() parameters (destinations, header, receipt, context) which you can consume to replay the subscriptions upon the next connect().

send(destination, body='', headers=None, receipt=None)

Create a SEND frame.

sent()

Notify the session that data was sent (counts as client heart-beat).

server

The server id for the current client-broker connection.

serverHeartBeat

The negotiated server heart-beat period in ms.

state

The current session state.

subscribe(destination, headers=None, receipt=None, context=None)

Create a SUBSCRIBE frame and keep track of the subscription assiocated to it. This method returns a token which you have to keep if you wish to match incoming MESSAGE frames to this subscription with message() or to unsubscribe() later.

Parameters:context – An arbitrary context object which you can use to store any information related to the subscription at hand.
subscription(token)

For a given subscription token, obtain the corresponding subscription context.

Parameters:token – The result of the subscribe() method call which you used to initiate the subscription in question.
transaction(transaction=None)

Generate a transaction id which can be used for begin(), abort(), and commit().

Parameters:transaction – A valid transaction id, or None (automatically generate a unique id).
unsubscribe(token, receipt=None)

Create an UNSUBSCRIBE frame and lose track of the subscription assiocated to it.

version

The STOMP protocol version of the current client-broker connection (if any), or the version you created this session with (otherwise).

STOMP Frame Representation

class stompest.protocol.frame.StompFrame(command, headers=None, body='', rawHeaders=None, version=None)

This object represents a STOMP frame.

Parameters:
  • command – A valid STOMP command.
  • headers – The STOMP headers (represented as a dict), or None (no headers).
  • body – The frame body. The body will be cast as a binary string str (Python 2) or bytes (Python 3).
  • rawHeaders – The raw STOMP headers (represented as a collection of (header, value) pairs), or None (no raw headers).
  • version – A valid STOMP protocol version, or None (equivalent to the DEFAULT_VERSION attribute of the StompSpec class).

Note

The frame’s attributes are internally stored as arbitrary Python objects. The frame’s version attribute controls the wire-level encoding of its command and headers (depending on STOMP protocol version, this may be ASCII or UTF-8), while its body is not encoded at all (it’s just cast as a str).

Example:

>>> from stompest.protocol import StompFrame, StompSpec
>>> frame = StompFrame(StompSpec.SEND, rawHeaders=[('foo', 'bar1'), ('foo', 'bar2')])
>>> frame
StompFrame(command='SEND', rawHeaders=[('foo', 'bar1'), ('foo', 'bar2')])
>>> bytes(frame)
b'SEND\nfoo:bar1\nfoo:bar2\n\n\x00'
>>> dict(frame)
{'command': 'SEND', 'rawHeaders': [('foo', 'bar1'), ('foo', 'bar2')]}
>>> frame.headers
{'foo': 'bar1'}
>>> frame.headers = {'foo': 'bar3'}
>>> frame.headers
{'foo': 'bar1'}
>>> frame.unraw()
>>> frame
StompFrame(command='SEND', headers={'foo': 'bar1'})
>>> bytes(frame)
b'SEND\nfoo:bar1\n\n\x00'
>>> frame.headers = {'foo': 'bar4'}
>>> frame.headers
{'foo': 'bar4'}
>>> frame = StompFrame(StompSpec.SEND, rawHeaders=[('some french', b'fen\xc3\xaatre'.decode('utf-8'))], version=StompSpec.VERSION_1_0)
>>> bytes(frame)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode character 'ê' in position 15: ordinal not in range(128)
>>> frame.version = StompSpec.VERSION_1_1
>>> bytes(frame)
b'SEND\nsome french:fen\xc3\xaatre\n\n\x00'
>>> frame.headers
{'some french': 'fenêtre'}
info()

Produce a log-friendly representation of the frame (show only non-trivial content, and truncate the message to INFO_LENGTH characters).

unraw()

If the frame has raw headers, copy their deduplicated version to the headers attribute, and remove the raw headers afterwards.

class stompest.protocol.frame.StompHeartBeat

This object represents a STOMP heart-beat. Its string representation (via __str__()) renders the wire-level STOMP heart-beat.

Commands

This module implements a low-level and stateless API for all commands of the STOMP protocol version supported by stompest. All STOMP command frames are represented as StompFrame objects. It forms the basis for StompSession which represents the full state of an abstract STOMP protocol session and (via StompSession) of both high-level STOMP clients. You can use the commands API independently of other stompest modules to roll your own STOMP related functionality.

Note

Whenever you have to pass a version parameter to a command, this is because the behavior of that command depends on the STOMP protocol version of your current session. The default version is the value of StompSpec.DEFAULT_VERSION, which is currently '1.0' but may change in upcoming versions of stompest (or you might override it yourself). Any command which does not conform to the STOMP protocol version in question will result in a StompProtocolError. The version parameter will always be the last argument in the signature; since command signatures may vary with a new STOMP protocol version, you are advised to always specify it as a keyword (as opposed to a positional) argument.

Examples:

>>> from stompest.protocol import commands, StompFrame, StompSpec
>>> versions = list(commands.versions(StompSpec.VERSION_1_1))
>>> versions
['1.0', '1.1']
>>> commands.connect(versions=versions)
StompFrame(command='CONNECT', headers={'host': '', 'accept-version': '1.0,1.1'})
>>> frame, token = commands.subscribe('/queue/test', {StompSpec.ACK_HEADER: 'client-individual', 'activemq.prefetchSize': '100'})
>>> frame = StompFrame(StompSpec.MESSAGE, {StompSpec.DESTINATION_HEADER: '/queue/test', StompSpec.MESSAGE_ID_HEADER: '007'}, b'hello')
>>> frame
StompFrame(command='MESSAGE', headers={'destination': '/queue/test', 'message-id': '007'}, body=b'hello')
>>> commands.message(frame) == token # This message matches your subscription.
True
>>> commands.message(frame)
('destination', '/queue/test')
>>> frame.version = StompSpec.VERSION_1_1
>>> commands.message(frame)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
stompest.error.StompProtocolError: Invalid MESSAGE frame (subscription header mandatory in version 1.1) [headers={'destination': '/queue/test', 'message-id': '007'}]
>>> commands.disconnect(receipt='message-12345')
StompFrame(command='DISCONNECT', headers={'receipt': 'message-12345'})

See also

Specification of STOMP protocols 1.0 and 1.1, your favorite broker’s documentation for additional STOMP headers.

stompest.protocol.commands.abort(transaction, receipt=None, version=None)

Create an ABORT frame.

Parameters:
  • transaction – The id of the transaction.
  • receipt – See disconnect().
stompest.protocol.commands.ack(frame, transactions=None, receipt=None)

Create an ACK frame for a received MESSAGE frame.

Parameters:
  • frame – The StompFrame object representing the MESSAGE frame we wish to ack.
  • transactions – The ids of currently active transactions — only if the frame is part of one of these transactions, the transaction header is included in the ACK frame.
  • receipt – See disconnect().
stompest.protocol.commands.beat(version=None)

Create a STOMP heart-beat.

stompest.protocol.commands.begin(transaction, receipt=None, version=None)

Create a BEGIN frame.

Parameters:
  • transaction – The id of the transaction.
  • receipt – See disconnect().
stompest.protocol.commands.commit(transaction, receipt=None, version=None)

Create a COMMIT frame.

Parameters:
  • transaction – The id of the transaction.
  • receipt – See disconnect().
stompest.protocol.commands.connect(login=None, passcode=None, headers=None, versions=None, host=None, heartBeats=None)

Create a CONNECT frame.

Parameters:
  • login – The login header. The default is None, which means that no such header will be added.
  • passcode – The passcode header. The default is None, which means that no such header will be added.
  • headers – Additional STOMP headers.
  • versions – A list of the STOMP versions we wish to support. The default is None, which means that we will offer the broker to accept any version prior or equal to the default STOMP protocol version.
  • host – The host header which gives this client a human readable name on the broker side.
  • heartBeats – A pair (client heart-beat, server heart-beat) of integer heart-beat intervals in ms. Both intervals must be non-negative. A client heart-beat of 0 means that no heart-beats will be sent by the client. Similarly, a server heart-beat of 0 means that the client does not expect heart-beats from the server.
stompest.protocol.commands.connected(frame, versions=None)

Handle a CONNECTED frame.

Parameters:versions – The same versions parameter you used to create the CONNECT frame.
stompest.protocol.commands.disconnect(receipt=None, version=None)

Create a DISCONNECT frame.

Parameters:receipt – Add a receipt header with this id to request a RECEIPT frame from the broker. If None, no such header is added.
stompest.protocol.commands.error(frame)

Handle an ERROR frame. Does not really do anything except checking that this is an ERROR frame.

stompest.protocol.commands.message(frame)

Handle a MESSAGE frame. Returns a token which you can use to match this message to its subscription.

See also

The subscribe() command.

stompest.protocol.commands.nack(frame, transactions=None, receipt=None)

Create a NACK frame for a received MESSAGE frame.

Parameters:
  • frame – The StompFrame object representing the MESSAGE frame we wish to nack.
  • transactions – The ids of currently active transactions — only if the frame is part of one of these transactions, the transaction header is included in the NACK frame.
  • receipt – See disconnect().
stompest.protocol.commands.negotiateHeartBeat(client, server)

Determine the negotiated heart-beating period.

Parameters:
  • client – The client’s proposed heart-beating period.
  • server – The server’s proposed heart-beating period.
stompest.protocol.commands.receipt(frame)

Handle a RECEIPT frame. Returns the receipt id which you can use to match this receipt to the command that requested it.

stompest.protocol.commands.send(destination, body='', headers=None, receipt=None, version=None)

Create a SEND frame.

Parameters:
  • destination – Destination for the frame.
  • body – Binary message body. If the body contains null-bytes, it must be accompanied by the STOMP header content-length which specifies the number of bytes in the message body.
  • headers – Additional STOMP headers.
  • receipt – See disconnect().
stompest.protocol.commands.stomp(login=None, passcode=None, headers=None, versions=None, host=None, heartBeats=None)

Create a STOMP frame. Not supported in STOMP protocol 1.0, synonymous to connect() for STOMP protocol 1.1 and higher.

stompest.protocol.commands.subscribe(destination, headers, receipt=None, version=None)

Create a pair (frame, token) of a SUBSCRIBE frame and a token which you have to keep if you wish to match incoming MESSAGE frames to this subscription with message() or to unsubscribe() later.

Parameters:
  • destination – Destination for the subscription.
  • headers – Additional STOMP headers.
  • receipt – See disconnect().
stompest.protocol.commands.unsubscribe(token, receipt=None, version=None)

Create an UNSUBSCRIBE frame.

Parameters:
  • token – The result of the subscribe() command which you used to initiate the subscription in question.
  • receipt – See disconnect().

STOMP Protocol Specification

class stompest.protocol.spec.StompSpec

This class hosts all constants related to the STOMP protocol specification in its various versions. There really isn’t much to document, but you are invited to take a look at all available constants in the source code. Wait a minute ... one attribute is particularly noteworthy, name DEFAULT_VERSION — which currently is '1.0' (but this may change in upcoming stompest releases, so you’re advised to always explicitly define which STOMP protocol version you are going to use).

See also

Specification of STOMP protocol, your favorite broker’s documentation for additional STOMP headers.

classmethod version(version=None)

Check whether version is a valid STOMP protocol version.

Parameters:version – A candidate version, or None (which is equivalent to the value of StompSpec.DEFAULT_VERSION).
classmethod versions(version)

Obtain all versions prior or equal to version.

Wire-Level Parser

class stompest.protocol.parser.StompParser(version=None)

This is a parser for a wire-level byte-stream of STOMP frames.

Parameters:version – A valid STOMP protocol version, or None (equivalent to the DEFAULT_VERSION attribute of the StompSpec class).

Example:

>>> from stompest.protocol import StompParser
>>> parser = StompParser('1.0') # STOMP 1.0 does not support the NACK command.
>>> messages = [b'RECEIPT\nreceipt-id:message-12345\n\n\x00', b'NACK\nsubscription:0\nmessage-id:007\n\n\x00']
>>> for message in messages:
...     parser.add(message)
... 
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
stompest.error.StompFrameError: Invalid command: 'NACK'
>>> parser.get()
StompFrame(command='RECEIPT', rawHeaders=[('receipt-id', 'message-12345')])
>>> parser.canRead()
False
>>> parser.get()
None
>>> parser = StompParser('1.1')
>>> parser.add(messages[1])
>>> parser.get()
StompFrame(command='NACK', rawHeaders=[('subscription', '0'), ('message-id', '007')], version='1.1')    
add(data)

Add a byte-stream of wire-level data.

Parameters:data – A byte-stream, i.e., a str-like (Python 2) or bytes-like (Python 3) object.
canRead()

Indicates whether there are frames available.

get()

Return the next frame as a StompFrame object (if any), or None (otherwise).

reset()

Reset internal state, including all fully or partially parsed frames.