baseplate.message_queue
This module provides a thin wrapper around POSIX Message queues.
Note
This implementation uses POSIX Message queues and is not portable to all operating systems.
There are also various limits on the sizes of queues:
- The msgqueue rlimit limits the amount of space the user can use on message queues.
- The fs.mqueue.msg_max and fs.mqueue.msgsize_max sysctls limit the maximum number of messages and the maximum size of each message which a queue can be configured to have.
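As a rough illustration of the first limit, the msgqueue rlimit of the current process can be inspected from Python's standard library. This is a minimal sketch, not part of baseplate: msgqueue_rlimit is a hypothetical helper name, and it assumes a platform where the resource module exposes RLIMIT_MSGQUEUE (Linux); elsewhere it returns None.

```python
import resource


def msgqueue_rlimit():
    """Return the (soft, hard) msgqueue rlimit in bytes, or None if unavailable.

    Hypothetical helper for illustration; not part of baseplate.
    """
    limit_id = getattr(resource, "RLIMIT_MSGQUEUE", None)
    if limit_id is None:
        # The platform does not expose this limit (for example, non-Linux).
        return None
    return resource.getrlimit(limit_id)


print(msgqueue_rlimit())
```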
Minimal Example
Here’s a minimal, artificial example of a separate producer and consumer process pair (run the producer then the consumer):
# producer.py
from baseplate.message_queue import MessageQueue
# If the queue doesn't already exist, we'll create it.
mq = MessageQueue(
    "/baseplate-testing", max_messages=1, max_message_size=1)
message = "1"
mq.put(message)
print("Put Message: %s" % message)
You should see:
Put Message: 1
After running the producer once, we have a single message pushed on to our POSIX message queue. Next up, run the consumer:
# consumer.py
from baseplate.message_queue import MessageQueue
mq = MessageQueue(
    "/baseplate-testing", max_messages=1, max_message_size=1)
# Unless a `timeout` kwarg is passed, this will block until
# we can pop a message from the queue.
message = mq.get()
print("Get Message: %s" % message)
You’ll end up seeing:
Get Message: 1
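The consumer above blocks until a message arrives. When that is not desirable, a timeout can be passed to get() and the resulting TimedOutError handled. The following is a minimal sketch: drain is a hypothetical helper, not part of baseplate, and the fallback TimedOutError class is only a stand-in so the sketch can be run without baseplate installed.

```python
try:
    from baseplate.message_queue import TimedOutError
except ImportError:
    # Stand-in so this sketch can be run without baseplate installed.
    class TimedOutError(Exception):
        pass


def drain(queue, timeout=0.1):
    """Collect messages until queue.get() times out.

    Hypothetical helper; `queue` is anything with a MessageQueue-like
    get(timeout=...) method.
    """
    messages = []
    while True:
        try:
            messages.append(queue.get(timeout=timeout))
        except TimedOutError:
            # The queue stayed empty for `timeout` seconds; stop reading.
            return messages
```

With a real queue this might be called as drain(mq, timeout=1.0), which returns whatever messages were available and then gives up after one second of silence.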
The /baseplate-testing value is the name of the queue. Queue names should start with a forward slash, followed by one or more characters (but no additional slashes).
Multiple processes can bind to the same queue by specifying the same queue name.
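The naming rule can be checked before a queue is created. This is a minimal sketch of such a check, assuming the 255-character maximum given in the class reference counts the leading slash; is_valid_queue_name is a hypothetical helper, not part of baseplate.

```python
def is_valid_queue_name(name):
    """Check a queue name against the documented naming rule.

    Hypothetical helper: a leading slash, then one or more characters,
    no further slashes, and at most 255 characters in total.
    """
    return (
        isinstance(name, str)
        and 2 <= len(name) <= 255
        and name.startswith("/")
        and "/" not in name[1:]
    )


print(is_valid_queue_name("/baseplate-testing"))  # True
print(is_valid_queue_name("baseplate-testing"))   # False: no leading slash
print(is_valid_queue_name("/a/b"))                # False: extra slash
```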
Message Queue Default Limits
Most operating systems with POSIX queues include very low defaults for the maximum message size and maximum queue depths. On Linux 2.6+, you can list and check the values for these by running:
$ ls /proc/sys/fs/mqueue/
msg_default msg_max msgsize_default msgsize_max queues_max
$ cat /proc/sys/fs/mqueue/msgsize_max
8192
Explaining these in detail is outside the scope of this document, so we refer you to POSIX Message queues (or man 7 mq_overview) for a detailed description of what these mean.
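The same values can be read programmatically. The sketch below assumes the Linux /proc layout shown above; read_mqueue_limits is a hypothetical helper, not part of baseplate, and it returns an empty dict when the directory does not exist.

```python
import os


def read_mqueue_limits(base="/proc/sys/fs/mqueue"):
    """Return {sysctl_name: int_value} for each readable file under `base`.

    Hypothetical helper for illustration; not part of baseplate.
    """
    limits = {}
    if not os.path.isdir(base):
        return limits
    for entry in sorted(os.listdir(base)):
        try:
            with open(os.path.join(base, entry)) as f:
                limits[entry] = int(f.read().strip())
        except (OSError, ValueError):
            # Skip entries that cannot be read or are not plain integers.
            continue
    return limits


print(read_mqueue_limits())
```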
Gotchas
If you attempt to create a POSIX Queue where one of your provided values is over the limits defined under /proc/sys/fs/mqueue/, you’ll probably end up seeing a vague ValueError exception. Here’s an example:
>>> from baseplate.message_queue import MessageQueue
>>> mq = MessageQueue(
...     "/over-the-limit", max_messages=11, max_message_size=8096)
Traceback (most recent call last):
File "<input>", line 2, in <module>
File "/home/myuser/baseplate/baseplate/message_queue.py", line 83, in __init__
max_message_size=max_message_size,
ValueError: Invalid parameter(s)
Since the default value for /proc/sys/fs/mqueue/msg_max on Linux is 10, our max_messages=11 is invalid. You can raise these limits by doing something like this as a privileged user:
$ echo "50" > /proc/sys/fs/mqueue/msg_max
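Because the ValueError does not say which parameter was rejected, it can help to compare the requested values against the limits before creating the queue. This is a minimal sketch: check_queue_params is a hypothetical helper, not part of baseplate, and limits is assumed to be a dict of the integer values found under /proc/sys/fs/mqueue/.

```python
def check_queue_params(max_messages, max_message_size, limits):
    """Return human-readable problems; an empty list means the values fit.

    Hypothetical helper. `limits` maps sysctl names such as "msg_max"
    and "msgsize_max" to their integer values.
    """
    problems = []
    requested = {"msg_max": max_messages, "msgsize_max": max_message_size}
    for sysctl, value in requested.items():
        limit = limits.get(sysctl)
        if limit is not None and value > limit:
            problems.append(
                "%d exceeds fs.mqueue.%s (%d)" % (value, sysctl, limit)
            )
    return problems


# Values from the traceback above, checked against the limits shown earlier.
print(check_queue_params(11, 8096, {"msg_max": 10, "msgsize_max": 8192}))
# ['11 exceeds fs.mqueue.msg_max (10)']
```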
CLI Usage
The message_queue module can also be run as a command-line tool to consume, log, and discard messages from a given queue:
python -m baseplate.message_queue --read /queue
or to write arbitrary messages to the queue:
echo hello! | python -m baseplate.message_queue --write /queue
See --help for more info.
baseplate.message_queue
A gevent-friendly POSIX message queue.
class baseplate.message_queue.MessageQueue(name, max_messages, max_message_size)
An inter-process message queue that is gevent-friendly but does not require gevent.
name should be a string of up to 255 characters consisting of an initial slash, followed by one or more characters, none of which are slashes.
Note: This relies on POSIX message queues being available and select(2)-able like other file descriptors. Not all operating systems support this.
get(timeout=None)
Read a message from the queue.
Parameters: timeout (float) – If the queue is empty, the call will block up to timeout seconds, or forever if None.
Raises: TimedOutError – The queue was empty for the allowed duration of the call.
put(message, timeout=None)
Add a message to the queue.
Parameters: timeout (float) – If the queue is full, the call will block up to timeout seconds, or forever if None.
Raises: TimedOutError – The queue was full for the allowed duration of the call.
unlink()
Remove the queue from the system.
The queue is not actually removed until the last active user closes it.
close()
Close the queue, freeing related resources.
This must be called explicitly if queues are created/destroyed on the fly. It is not automatically called when the object is reclaimed by Python.
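Since close() is not called automatically, one way to make sure it runs is to wrap the queue in contextlib.closing from the standard library, which calls close() when the block exits, even on error. This is a minimal sketch: put_and_close is a hypothetical helper, not part of baseplate, and it relies only on the put() and close() methods documented above.

```python
from contextlib import closing


def put_and_close(queue, message, timeout=None):
    """Put one message, then close the queue even if put() raises.

    Hypothetical helper; `queue` is anything with MessageQueue-like
    put(message, timeout=...) and close() methods.
    """
    with closing(queue):
        queue.put(message, timeout=timeout)


# Possible usage with a real queue (requires baseplate and POSIX queues):
# from baseplate.message_queue import MessageQueue
# mq = MessageQueue("/baseplate-testing", max_messages=1, max_message_size=1)
# put_and_close(mq, "1", timeout=1.0)
```

Note that this closes the queue handle only; the queue itself stays on the system until unlink() is called.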