name: testing-message-queue description: "Write tests for the bocpy message queue — the lock-free tag-based MPSC ring buffer. Use when: testing send/receive, FIFO ordering, selective receive, timeouts, cross-thread messaging, multi-producer scenarios, tag capacity limits, set_tags behavior, or payload round-trip fidelity."
Testing the Message Queue
This skill describes how to write tests for the bocpy message queue — the
lock-free, tag-based MPSC ring buffer implemented in C that underlies
send()/receive().
Architecture Overview
| Component | Detail |
|---|---|
| Queues | 16 statically allocated ring buffers (BOC_QUEUE_COUNT = 16), each with capacity 16 384. |
| Tags | String labels that map 1:1 to queues. Each unique tag occupies one queue slot. |
| Assignment modes | Automatic (default): tags claim a queue on first send/receive. Explicit: set_tags() pre-assigns tags, clears all messages, and resets queues. |
set_tags([]) | Drains every queue, frees all tag associations, and sets all queues to UNASSIGNED — effectively restoring auto-assign mode. |
| Thread safety | Enqueue is lock-free (atomic CAS on tail). Multiple threads can send to the same tag concurrently. receive spins on the queue with a sleep until a message is available or a timeout elapses. |
Test File Location
Message queue tests live in test/test_message_queue.py.
Fixture: Resetting Queue State
Every test must start with a clean slate. Use an autouse fixture that calls
set_tags([]) before and after each test. This drains all queues and returns
them to auto-assign mode — no set_tags call is needed inside individual tests
unless the test is specifically exercising set_tags behavior.
MAX_QUEUES = 16
@pytest.fixture(autouse=True)
def reset_queues():
set_tags([])
yield
set_tags([])
Why set_tags([]) instead of set_tags([...16 tags...])?
Calling set_tags is optional. The system auto-assigns tags to queues on
first use. The purpose of the fixture is to clear stale state, not to
pre-allocate tags. Using set_tags([]) resets all 16 queues to UNASSIGNED so
every test starts fresh, and most tests can exercise the auto-assignment path
naturally.
Pattern 1 — Basic Send / Receive
The simplest test: send a message, receive it, assert on the result. No
set_tags needed — the tag auto-assigns.
def test_basic():
send("my_tag", "hello")
tag, val = receive("my_tag", 1)
assert tag == "my_tag"
assert val == "hello"
Always pass a timeout to receive so tests fail fast rather than hanging.
Pattern 2 — Verifying FIFO Order
Messages on the same tag are delivered in FIFO order. Send a sequence, then receive and compare element-by-element.
def test_fifo():
for i in range(10):
send("fifo", i)
for i in range(10):
_, val = receive("fifo", 1)
assert val == i
Pattern 3 — Selective Receive Across Multiple Tags
receive accepts a list or tuple of tags and returns the first available match.
Messages on non-matching tags remain in their queues.
def test_selective():
send("skip", "no")
send("want", "yes")
_, val = receive("want", 1)
assert val == "yes"
# "skip" message is still there
_, val = receive("skip", 1)
assert val == "no"
def test_multi_tag_receive():
send("b", "found")
tag, val = receive(["a", "b", "c"], 1)
assert tag == "b"
assert val == "found"
Pattern 4 — Timeout Behavior
| Timeout value | Behavior |
|---|---|
0 | Return immediately — delivers a queued message or (TIMEOUT, None). |
| Positive float | Wait up to that many seconds. |
| Negative (default) | Block indefinitely until a message arrives. |
The optional after callback fires only on timeout and replaces the default
(TIMEOUT, None) return value.
def test_zero_timeout():
tag, val = receive("empty", 0)
assert tag == TIMEOUT
assert val is None
def test_after_callback():
tag, val = receive("miss", 0.05, lambda: ("fallback", 99))
assert tag == "fallback"
assert val == 99
def test_after_not_called_on_success():
called = []
send("ok", "payload")
tag, val = receive(
"ok", 1,
lambda: (called.append(True), ("fb", -1))[-1],
)
assert tag == "ok"
assert called == []
Pattern 5 — Cross-Thread Messaging
send and receive are thread-safe. Use threading.Thread to test
cross-thread delivery.
def test_cross_thread():
def sender():
send("ct", "from_thread")
t = threading.Thread(target=sender)
t.start()
tag, val = receive("ct", 5)
t.join()
assert tag == "ct"
assert val == "from_thread"
def test_bidirectional():
def echo():
_, val = receive("ping", 5)
send("pong", val * 2)
t = threading.Thread(target=echo)
t.start()
send("ping", 21)
_, val = receive("pong", 5)
t.join()
assert val == 42
Pattern 6 — Multiple Concurrent Producers
Multiple threads sending to the same tag exercises the lock-free MPSC enqueue. Collect all messages and verify completeness. Per-producer FIFO ordering is guaranteed (messages from a single thread arrive in send order).
def test_multi_producer():
n_producers = 8
msgs_per = 200
def producer(pid):
for i in range(msgs_per):
send("mp", (pid, i))
threads = [threading.Thread(target=producer, args=(p,))
for p in range(n_producers)]
for t in threads:
t.start()
received = []
for _ in range(n_producers * msgs_per):
_, val = receive("mp", 10)
received.append(val)
for t in threads:
t.join()
# Every (pid, seq) pair delivered exactly once
assert sorted(received) == sorted(
(p, i) for p in range(n_producers) for i in range(msgs_per)
)
Pattern 7 — Tag Capacity Limits
There are exactly 16 queue slots. Exceeding them raises KeyError.
def test_capacity_exceeded():
for i in range(MAX_QUEUES):
send(f"tag{i}", "fill")
with pytest.raises(KeyError, match="tag capacity exceeded"):
send("one_too_many", "boom")
Pattern 8 — Testing set_tags Explicitly
Use set_tags tests to verify pre-assignment, clearing, overflow, and reset
behavior. These tests call set_tags directly inside the test body.
def test_set_tags_clears():
send("old", "stale")
set_tags(["new"])
# Old messages are gone; "new" starts empty.
tag, val = receive("new", 0)
assert tag == TIMEOUT
def test_set_tags_resets_to_auto():
set_tags(["pre"])
send("pre", "msg")
set_tags([]) # back to auto-assign
send("fresh", "works")
tag, val = receive("fresh", 1)
assert val == "works"
def test_set_tags_overflow():
set_tags([f"t{i}" for i in range(MAX_QUEUES)])
with pytest.raises(KeyError, match="tag capacity exceeded"):
send("extra", "x")
def test_set_tags_rejects_too_many():
with pytest.raises(IndexError):
set_tags([f"t{i}" for i in range(MAX_QUEUES + 1)])
Pattern 9 — Payload Round-Trip Fidelity
Messages cross interpreter boundaries via cross-interpreter data (XIData) or
pickle. Use @pytest.mark.parametrize to sweep payload types.
@pytest.mark.parametrize("payload", [
0, -1, 2**31, 3.14, float("inf"),
True, False, None,
"", "hello",
(), (1,), (1, (2, (3,))),
])
def test_roundtrip(payload):
send("rt", payload)
_, val = receive("rt", 1)
assert val == payload
Error Handling Tests
| Error | How to trigger | Expected exception |
|---|---|---|
Non-string tag in send | send(123, "x") | TypeError |
Empty tag list in receive | receive([], 0) | RuntimeError |
| Non-string in tag list | receive([123], 0) | TypeError |
| Tag capacity exceeded | send on 17th unique tag | KeyError |
Too many tags in set_tags | set_tags([...17 items...]) | IndexError |
Non-str in set_tags | set_tags([123]) | TypeError |
Non-sequence in set_tags | set_tags(42) | TypeError |
Common Pitfalls
| Pitfall | Fix |
|---|---|
Calling receive without a timeout | Tests hang forever if no message arrives. Always pass a timeout. |
| Using too many unique tags across tests | Each unique tag consumes a queue slot. The fixture must reset queues between tests via set_tags([]). |
| Forgetting the fixture resets queues | Without the autouse fixture, stale messages or tag assignments leak between tests, causing flaky failures. |
Assuming set_tags is required | It isn't. Tags auto-assign on first use. Only call set_tags when you need to pre-assign or clear queues. |
| Not joining threads before asserting | A thread may still hold the GIL or be mid-send. Always t.join() before final assertions. |
| Sending from more threads than queue capacity | If threads use distinct tags, you'll hit the 16-queue limit. Have concurrent threads share a single tag. |