Protocol Layer¶
The protocol
package is a collection of generic components each of which you can use independently for your own STOMP related functionality.
Note
Please restrict your imports to the main package stompest.protocol
. The subpackage structure is potentially unstable.
Failover Transport¶
-
class
stompest.protocol.failover.
StompFailoverTransport
(uri)¶ Looping over this object, you can produce a series of tuples (broker, delay in s). When the failover scheme does not allow further failover, a
StompConnectTimeout
error is raised.Parameters: uri – A failover URI. Example:
>>> from stompest.protocol import StompFailoverTransport >>> from stompest.error import StompConnectTimeout >>> failover = StompFailoverTransport('failover:(tcp://remote1:61615,tcp://localhost:61616)?randomize=false,startupMaxReconnectAttempts=3,initialReconnectDelay=7,maxReconnectDelay=8,maxReconnectAttempts=0') >>> try: ... for (broker, delay) in failover: ... print 'broker: %s, delay: %f' % (broker, delay) ... except StompConnectTimeout as e: ... print 'timeout: %s' % e ... broker: {'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, delay: 0.000000 broker: {'host': 'localhost', 'protocol': 'tcp', 'port': 61616}, delay: 0.007000 broker: {'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, delay: 0.008000 broker: {'host': 'localhost', 'protocol': 'tcp', 'port': 61616}, delay: 0.008000 timeout: Reconnect timeout: 3 attempts >>> try: ... for (broker, delay) in failover: ... print 'broker: %s, delay: %f' % (broker, delay) ... except StompConnectTimeout as e: ... print 'timeout: %s' % e ... broker: {'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, delay: 0.000000 timeout: Reconnect timeout: 0 attempts
See also
The
StompFailoverUri
which parses failover transport URIs.
-
class
stompest.protocol.failover.
StompFailoverUri
(uri)¶ This is a parser for the failover URI scheme used in stompest. The parsed parameters are available in the attributes
brokers
andoptions
. The Failover transport syntax is very close to the one used in ActiveMQ.Parameters: uri – A failover URI. Its basic form is:
'failover:(uri1,...,uriN)?transportOptions'
or:
'failover:uri1,...,uriN'
Example:
>>> from stompest.protocol import StompFailoverUri >>> uri = StompFailoverUri('failover:(tcp://remote1:61615,tcp://localhost:61616)?randomize=false,startupMaxReconnectAttempts=3,initialReconnectDelay=7,maxReconnectDelay=8,maxReconnectAttempts=0') >>> print uri.brokers [{'host': 'remote1', 'protocol': 'tcp', 'port': 61615}, {'host': 'localhost', 'protocol': 'tcp', 'port': 61616}] >>> print uri.options {'initialReconnectDelay': 7, 'maxReconnectDelay': 8, 'backOffMultiplier': 2.0, 'startupMaxReconnectAttempts': 3, 'priorityBackup': False, 'maxReconnectAttempts': 0, 'reconnectDelayJitter': 0, 'useExponentialBackOff': True, 'randomize': False}
Supported Options:
option type default description initialReconnectDelay int 10
how long to wait before the first reconnect attempt (in ms) maxReconnectDelay int 30000
the maximum amount of time we ever wait between reconnect attempts (in ms) useExponentialBackOff bool True
should an exponential backoff be used between reconnect attempts backOffMultiplier float 2.0
the exponent used in the exponential backoff attempts maxReconnectAttempts int -1
-1
means retry forever0
means don’t retry (only try connection once but no retry)> 0
means the maximum number of reconnect attempts before an error is sent back to the clientstartupMaxReconnectAttempts int 0
if not 0
, then this is the maximum number of reconnect attempts before an error is sent back to the client on the first attempt by the client to start a connection, once connected the maxReconnectAttempts option takes precedencereconnectDelayJitter int 0
jitter in ms by which reconnect delay is blurred in order to avoid stampeding randomize bool True
use a random algorithm to choose the the URI to use for reconnect from the list provided priorityBackup bool False
if set, prefer local connections to remote connections See also
StompFailoverTransport
, failover transport of ActiveMQ.
Session State¶
The StompSession
object implements an abstract STOMP protocol session, where “abstract” means that it is entirely client or transport agnostic. The session API builds upon the low-level and stateless API of the protocol.commands
module, but it also keeps track of the session state (e.g., STOMP protocol version negotiation, active subscriptions). You can use the API provided by StompSession
independently of the stompest clients to roll your own STOMP client.
Note
Being stateful implies that the session keeps track of subscriptions, receipts, and transactions, so keep track of them yourself, too! – Unless you like to be surprised by a spurious StompProtocolError
...
See also
The stateless API in the module protocol.commands
for all API command parameters which are not documented here.
Example:
>>> from stompest.protocol import StompFrame, StompSession, StompSpec
>>> session = StompSession(StompSpec.VERSION_1_1)
>>> session.connect(login='', passcode='')
StompFrame(command='CONNECT', headers={'passcode': '', 'login': '', 'host': '', 'accept-version': '1.0,1.1'})
>>> print(session.version, session.state)
1.1 connecting
>>> session.connected(StompFrame(StompSpec.CONNECTED, {StompSpec.SESSION_HEADER: 'tete-a-tete'})) # The broker only understands STOMP 1.0.
>>> print(session.version, session.state)
1.0 connected
>>> session.disconnect()
StompFrame(command='DISCONNECT')
>>> print(session.version, session.state)
1.0 disconnecting
>>> session.close()
>>> print(session.version, session.state)
1.1 disconnected
-
class
stompest.protocol.session.
StompSession
(version=None, check=True)¶ This object implements an abstract STOMP protocol session.
Parameters: - version – The highest (and at the same time default) STOMP protocol version.
- check – This flag decides whether the session should accept commands only in the proper session states (
True
) or in any session state (False
).
-
abort
(transaction, receipt=None)¶ Create an ABORT frame to abort a STOMP transaction.
Parameters: transaction – See transaction()
.Note
If you try and abort a transaction which is not pending, this will result in a
StompProtocolError
.
-
ack
(frame, receipt=None)¶ Create an ACK frame for a received MESSAGE frame.
-
beat
()¶ Create a STOMP heart-beat.
-
begin
(transaction=None, receipt=None)¶ Create a BEGIN frame and begin an abstract STOMP transaction.
Parameters: transaction – See transaction()
.Note
If you try and begin a pending transaction twice, this will result in a
StompProtocolError
.
-
clientHeartBeat
¶ The negotiated client heart-beat period in ms.
-
close
(flush=True)¶ Clean up the session: Set the state to
DISCONNECTED
, remove all information related to an eventual broker connection, clear all pending transactions and receipts.Parameters: flush – Clear all active subscriptions. This flag controls whether the next connect()
will replay the currently active subscriptions or will wipe the slate clean.
-
commit
(transaction, receipt=None)¶ Send a COMMIT command to commit a STOMP transaction.
Parameters: transaction – See transaction()
.Note
If you try and commit a transaction which is not pending, this will result in a
StompProtocolError
.
-
connect
(login=None, passcode=None, headers=None, versions=None, host=None, heartBeats=None)¶ Create a CONNECT frame and set the session state to
CONNECTING
.
-
connected
(frame)¶ Handle a CONNECTED frame and set the session state to
CONNECTED
.
-
disconnect
(receipt=None)¶ Create a DISCONNECT frame and set the session state to
DISCONNECTING
.
-
id
¶ The session id for the current client-broker connection.
-
lastReceived
¶ The last time when data was received.
-
lastSent
¶ The last time when data was sent.
-
message
(frame)¶ Handle a MESSAGE frame. Returns a token which you can use to match this message to its subscription.
See also
The
subscribe()
method.
-
nack
(frame, receipt=None)¶ Create a NACK frame for a received MESSAGE frame.
-
receipt
(frame)¶ Handle a RECEIPT frame. Returns the receipt id which you can use to match this receipt to the command that requested it.
-
received
()¶ Notify the session that data was received (counts as server heart-beat).
-
replay
()¶ Flush all active subscriptions and return an iterator over the
subscribe()
parameters (destinations, header, receipt, context) which you can consume to replay the subscriptions upon the nextconnect()
.
-
send
(destination, body='', headers=None, receipt=None)¶ Create a SEND frame.
-
sent
()¶ Notify the session that data was sent (counts as client heart-beat).
-
server
¶ The server id for the current client-broker connection.
-
serverHeartBeat
¶ The negotiated server heart-beat period in ms.
-
state
¶ The current session state.
-
subscribe
(destination, headers=None, receipt=None, context=None)¶ Create a SUBSCRIBE frame and keep track of the subscription assiocated to it. This method returns a token which you have to keep if you wish to match incoming MESSAGE frames to this subscription with
message()
or tounsubscribe()
later.Parameters: context – An arbitrary context object which you can use to store any information related to the subscription at hand.
-
subscription
(token)¶ For a given subscription token, obtain the corresponding subscription context.
Parameters: token – The result of the subscribe()
method call which you used to initiate the subscription in question.
-
transaction
(transaction=None)¶ Generate a transaction id which can be used for
begin()
,abort()
, andcommit()
.Parameters: transaction – A valid transaction id, or None
(automatically generate a unique id).
-
unsubscribe
(token, receipt=None)¶ Create an UNSUBSCRIBE frame and lose track of the subscription assiocated to it.
-
version
¶ The STOMP protocol version of the current client-broker connection (if any), or the version you created this session with (otherwise).
STOMP Frame Representation¶
-
class
stompest.protocol.frame.
StompFrame
(command, headers=None, body='', rawHeaders=None, version=None)¶ This object represents a STOMP frame.
Parameters: - command – A valid STOMP command.
- headers – The STOMP headers (represented as a
dict
), orNone
(no headers). - body – The frame body. The body will be cast as a binary string
str
(Python 2) orbytes
(Python 3). - rawHeaders – The raw STOMP headers (represented as a collection of (header, value) pairs), or
None
(no raw headers). - version – A valid STOMP protocol version, or
None
(equivalent to theDEFAULT_VERSION
attribute of theStompSpec
class).
Note
The frame’s attributes are internally stored as arbitrary Python objects. The frame’s
version
attribute controls the wire-level encoding of itscommand
andheaders
(depending on STOMP protocol version, this may be ASCII or UTF-8), while itsbody
is not encoded at all (it’s just cast as astr
).Example:
>>> from stompest.protocol import StompFrame, StompSpec >>> frame = StompFrame(StompSpec.SEND, rawHeaders=[('foo', 'bar1'), ('foo', 'bar2')]) >>> frame StompFrame(command='SEND', rawHeaders=[('foo', 'bar1'), ('foo', 'bar2')]) >>> bytes(frame) b'SEND\nfoo:bar1\nfoo:bar2\n\n\x00' >>> dict(frame) {'command': 'SEND', 'rawHeaders': [('foo', 'bar1'), ('foo', 'bar2')]} >>> frame.headers {'foo': 'bar1'} >>> frame.headers = {'foo': 'bar3'} >>> frame.headers {'foo': 'bar1'} >>> frame.unraw() >>> frame StompFrame(command='SEND', headers={'foo': 'bar1'}) >>> bytes(frame) b'SEND\nfoo:bar1\n\n\x00' >>> frame.headers = {'foo': 'bar4'} >>> frame.headers {'foo': 'bar4'} >>> frame = StompFrame(StompSpec.SEND, rawHeaders=[('some french', b'fen\xc3\xaatre'.decode('utf-8'))], version=StompSpec.VERSION_1_0) >>> bytes(frame) Traceback (most recent call last): File "<stdin>", line 1, in <module> UnicodeEncodeError: 'ascii' codec can't encode character 'ê' in position 15: ordinal not in range(128) >>> frame.version = StompSpec.VERSION_1_1 >>> bytes(frame) b'SEND\nsome french:fen\xc3\xaatre\n\n\x00' >>> frame.headers {'some french': 'fenêtre'}
-
info
()¶ Produce a log-friendly representation of the frame (show only non-trivial content, and truncate the message to INFO_LENGTH characters).
-
unraw
()¶ If the frame has raw headers, copy their deduplicated version to the
headers
attribute, and remove the raw headers afterwards.
-
class
stompest.protocol.frame.
StompHeartBeat
¶ This object represents a STOMP heart-beat. Its string representation (via
__str__()
) renders the wire-level STOMP heart-beat.
Commands¶
This module implements a low-level and stateless API for all commands of the STOMP protocol version supported by stompest. All STOMP command frames are represented as StompFrame
objects. It forms the basis for StompSession
which represents the full state of an abstract STOMP protocol session and (via StompSession
) of both high-level STOMP clients. You can use the commands API independently of other stompest modules to roll your own STOMP related functionality.
Note
Whenever you have to pass a version parameter to a command, this is because the behavior of that command depends on the STOMP protocol version of your current session. The default version is the value of StompSpec.DEFAULT_VERSION
, which is currently '1.0'
but may change in upcoming versions of stompest (or you might override it yourself). Any command which does not conform to the STOMP protocol version in question will result in a StompProtocolError
. The version parameter will always be the last argument in the signature; since command signatures may vary with a new STOMP protocol version, you are advised to always specify it as a keyword (as opposed to a positional) argument.
Examples:
>>> from stompest.protocol import commands, StompFrame, StompSpec
>>> versions = list(commands.versions(StompSpec.VERSION_1_1))
>>> versions
['1.0', '1.1']
>>> commands.connect(versions=versions)
StompFrame(command='CONNECT', headers={'host': '', 'accept-version': '1.0,1.1'})
>>> frame, token = commands.subscribe('/queue/test', {StompSpec.ACK_HEADER: 'client-individual', 'activemq.prefetchSize': '100'})
>>> frame = StompFrame(StompSpec.MESSAGE, {StompSpec.DESTINATION_HEADER: '/queue/test', StompSpec.MESSAGE_ID_HEADER: '007'}, b'hello')
>>> frame
StompFrame(command='MESSAGE', headers={'destination': '/queue/test', 'message-id': '007'}, body=b'hello')
>>> commands.message(frame) == token # This message matches your subscription.
True
>>> commands.message(frame)
('destination', '/queue/test')
>>> frame.version = StompSpec.VERSION_1_1
>>> commands.message(frame)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
stompest.error.StompProtocolError: Invalid MESSAGE frame (subscription header mandatory in version 1.1) [headers={'destination': '/queue/test', 'message-id': '007'}]
>>> commands.disconnect(receipt='message-12345')
StompFrame(command='DISCONNECT', headers={'receipt': 'message-12345'})
See also
Specification of STOMP protocols 1.0 and 1.1, your favorite broker’s documentation for additional STOMP headers.
-
stompest.protocol.commands.
abort
(transaction, receipt=None, version=None)¶ Create an ABORT frame.
Parameters: - transaction – The id of the transaction.
- receipt – See
disconnect()
.
-
stompest.protocol.commands.
ack
(frame, transactions=None, receipt=None)¶ Create an ACK frame for a received MESSAGE frame.
Parameters: - frame – The
StompFrame
object representing the MESSAGE frame we wish to ack. - transactions – The ids of currently active transactions — only if the frame is part of one of these transactions, the transaction header is included in the ACK frame.
- receipt – See
disconnect()
.
- frame – The
-
stompest.protocol.commands.
beat
(version=None)¶ Create a STOMP heart-beat.
-
stompest.protocol.commands.
begin
(transaction, receipt=None, version=None)¶ Create a BEGIN frame.
Parameters: - transaction – The id of the transaction.
- receipt – See
disconnect()
.
-
stompest.protocol.commands.
commit
(transaction, receipt=None, version=None)¶ Create a COMMIT frame.
Parameters: - transaction – The id of the transaction.
- receipt – See
disconnect()
.
-
stompest.protocol.commands.
connect
(login=None, passcode=None, headers=None, versions=None, host=None, heartBeats=None)¶ Create a CONNECT frame.
Parameters: - login – The login header. The default is
None
, which means that no such header will be added. - passcode – The passcode header. The default is
None
, which means that no such header will be added. - headers – Additional STOMP headers.
- versions – A list of the STOMP versions we wish to support. The default is
None
, which means that we will offer the broker to accept any version prior or equal to the default STOMP protocol version. - host – The host header which gives this client a human readable name on the broker side.
- heartBeats – A pair (client heart-beat, server heart-beat) of integer heart-beat intervals in ms. Both intervals must be non-negative. A client heart-beat of 0 means that no heart-beats will be sent by the client. Similarly, a server heart-beat of 0 means that the client does not expect heart-beats from the server.
- login – The login header. The default is
-
stompest.protocol.commands.
connected
(frame, versions=None)¶ Handle a CONNECTED frame.
Parameters: versions – The same versions parameter you used to create the CONNECT frame.
-
stompest.protocol.commands.
disconnect
(receipt=None, version=None)¶ Create a DISCONNECT frame.
Parameters: receipt – Add a receipt header with this id to request a RECEIPT frame from the broker. If None
, no such header is added.
-
stompest.protocol.commands.
error
(frame)¶ Handle an ERROR frame. Does not really do anything except checking that this is an ERROR frame.
-
stompest.protocol.commands.
message
(frame)¶ Handle a MESSAGE frame. Returns a token which you can use to match this message to its subscription.
See also
The
subscribe()
command.
-
stompest.protocol.commands.
nack
(frame, transactions=None, receipt=None)¶ Create a NACK frame for a received MESSAGE frame.
Parameters: - frame – The
StompFrame
object representing the MESSAGE frame we wish to nack. - transactions – The ids of currently active transactions — only if the frame is part of one of these transactions, the transaction header is included in the NACK frame.
- receipt – See
disconnect()
.
- frame – The
-
stompest.protocol.commands.
negotiateHeartBeat
(client, server)¶ Determine the negotiated heart-beating period.
Parameters: - client – The client’s proposed heart-beating period.
- server – The server’s proposed heart-beating period.
-
stompest.protocol.commands.
receipt
(frame)¶ Handle a RECEIPT frame. Returns the receipt id which you can use to match this receipt to the command that requested it.
-
stompest.protocol.commands.
send
(destination, body='', headers=None, receipt=None, version=None)¶ Create a SEND frame.
Parameters: - destination – Destination for the frame.
- body – Binary message body. If the body contains null-bytes, it must be accompanied by the STOMP header content-length which specifies the number of bytes in the message body.
- headers – Additional STOMP headers.
- receipt – See
disconnect()
.
-
stompest.protocol.commands.
stomp
(login=None, passcode=None, headers=None, versions=None, host=None, heartBeats=None)¶ Create a STOMP frame. Not supported in STOMP protocol 1.0, synonymous to
connect()
for STOMP protocol 1.1 and higher.
-
stompest.protocol.commands.
subscribe
(destination, headers, receipt=None, version=None)¶ Create a pair (frame, token) of a SUBSCRIBE frame and a token which you have to keep if you wish to match incoming MESSAGE frames to this subscription with
message()
or tounsubscribe()
later.Parameters: - destination – Destination for the subscription.
- headers – Additional STOMP headers.
- receipt – See
disconnect()
.
-
stompest.protocol.commands.
unsubscribe
(token, receipt=None, version=None)¶ Create an UNSUBSCRIBE frame.
Parameters: - token – The result of the
subscribe()
command which you used to initiate the subscription in question. - receipt – See
disconnect()
.
- token – The result of the
STOMP Protocol Specification¶
-
class
stompest.protocol.spec.
StompSpec
¶ This class hosts all constants related to the STOMP protocol specification in its various versions. There really isn’t much to document, but you are invited to take a look at all available constants in the source code. Wait a minute ... one attribute is particularly noteworthy, name
DEFAULT_VERSION
— which currently is'1.0'
(but this may change in upcoming stompest releases, so you’re advised to always explicitly define which STOMP protocol version you are going to use).See also
Specification of STOMP protocol, your favorite broker’s documentation for additional STOMP headers.
-
classmethod
version
(version=None)¶ Check whether version is a valid STOMP protocol version.
Parameters: version – A candidate version, or None
(which is equivalent to the value ofStompSpec.DEFAULT_VERSION
).
-
classmethod
versions
(version)¶ Obtain all versions prior or equal to version.
-
classmethod
Wire-Level Parser¶
-
class
stompest.protocol.parser.
StompParser
(version=None)¶ This is a parser for a wire-level byte-stream of STOMP frames.
Parameters: version – A valid STOMP protocol version, or None
(equivalent to theDEFAULT_VERSION
attribute of theStompSpec
class).Example:
>>> from stompest.protocol import StompParser >>> parser = StompParser('1.0') # STOMP 1.0 does not support the NACK command. >>> messages = [b'RECEIPT\nreceipt-id:message-12345\n\n\x00', b'NACK\nsubscription:0\nmessage-id:007\n\n\x00'] >>> for message in messages: ... parser.add(message) ... Traceback (most recent call last): File "<stdin>", line 1, in <module> stompest.error.StompFrameError: Invalid command: 'NACK' >>> parser.get() StompFrame(command='RECEIPT', rawHeaders=[('receipt-id', 'message-12345')]) >>> parser.canRead() False >>> parser.get() None >>> parser = StompParser('1.1') >>> parser.add(messages[1]) >>> parser.get() StompFrame(command='NACK', rawHeaders=[('subscription', '0'), ('message-id', '007')], version='1.1')
-
add
(data)¶ Add a byte-stream of wire-level data.
Parameters: data – A byte-stream, i.e., a str
-like (Python 2) orbytes
-like (Python 3) object.
-
canRead
()¶ Indicates whether there are frames available.
-
get
()¶ Return the next frame as a
StompFrame
object (if any), orNone
(otherwise).
-
reset
()¶ Reset internal state, including all fully or partially parsed frames.
-