NAME
    Data::Queue::Shared - High-performance shared-memory MPMC queues for
    Linux

SYNOPSIS
        use Data::Queue::Shared;

        # Integer queue (lock-free Vyukov MPMC)
        my $q = Data::Queue::Shared::Int->new('/tmp/myq.shm', 1024);

        # Anonymous queue (fork-inherited, no filesystem)
        my $anon = Data::Queue::Shared::Int->new(undef, 1024);

        # memfd-backed queue (shareable via fd passing)
        my $mq  = Data::Queue::Shared::Str->new_memfd("my_queue", 1024);
        my $fd  = $mq->memfd;          # pass via SCM_RIGHTS or fork
        my $mq2 = Data::Queue::Shared::Str->new_from_fd($fd);

        $q->push(42);
        my $val = $q->pop;             # non-blocking, undef if empty

        # Blocking pop (waits for data)
        $val = $q->pop_wait;           # infinite wait
        $val = $q->pop_wait(1.5);      # 1.5 second timeout

        # Batch operations
        my $pushed = $q->push_multi(1, 2, 3, 4, 5);
        my @vals   = $q->pop_multi(10);    # pop up to 10

        # String queue (mutex-protected, circular arena)
        my $sq = Data::Queue::Shared::Str->new('/tmp/strq.shm', 1024);
        $sq->push("hello world");
        my $msg = $sq->pop;

        # With explicit arena size (default: capacity * 256); use a new
        # file, since an existing file keeps the parameters in its header
        my $sq2 = Data::Queue::Shared::Str->new('/tmp/strq2.shm', 1024, 1048576);

        # Multiprocess: the child opens the same file as $q
        if (fork() == 0) {
            my $child = Data::Queue::Shared::Int->new('/tmp/myq.shm', 1024);
            $child->push(99);
            exit;
        }
        wait;
        print $q->pop, "\n";           # 99

DESCRIPTION
    Data::Queue::Shared provides bounded MPMC (multi-producer,
    multi-consumer) queues stored in shared memory mapped with
    mmap(MAP_SHARED) (file-backed, anonymous, or memfd), enabling efficient
    multiprocess data sharing on Linux.

    Linux-only. Requires 64-bit Perl.

  Variants
    Data::Queue::Shared::Int - int64 values, lock-free (16 bytes/slot)
        Uses the Vyukov bounded MPMC algorithm. Push and pop are lock-free
        (CAS-based). Well suited to integer job IDs, counters, and indices.

    Data::Queue::Shared::Int32 - int32 values, lock-free (8 bytes/slot)
    Data::Queue::Shared::Int16 - int16 values, lock-free (8 bytes/slot)
        Compact variants with 32-bit Vyukov sequence numbers. Each slot
        takes half the memory of an Int slot, so twice as many slots fit in
        a cache line. Same lock-free algorithm and same API as Int.
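    The Int variants are documented as using the Vyukov bounded MPMC
    algorithm. The pure-Perl model below is a sketch of that algorithm's
    per-slot sequence-number bookkeeping only; it is not the module's code.
    It runs in a single process, so the compare-and-swap steps a real
    multi-producer queue needs are reduced to plain assignments (marked in
    the comments). The package VyukovModel and its enqueue/dequeue methods
    are hypothetical names.

```perl
use strict;
use warnings;

# Single-process model of the per-slot sequence numbers used by the
# Vyukov bounded MPMC algorithm. In a real multi-producer queue the
# head/tail updates are atomic compare-and-swap operations with a retry
# loop; this model has one caller, so they become plain assignments.
package VyukovModel;

sub new {
    my ($class, $capacity) = @_;
    die "capacity must be a power of 2\n"
        unless $capacity > 0 && ($capacity & ($capacity - 1)) == 0;
    return bless {
        mask => $capacity - 1,
        seq  => [ 0 .. $capacity - 1 ],    # slot i starts with sequence i
        data => [ (undef) x $capacity ],
        head => 0,                         # next position to dequeue
        tail => 0,                         # next position to enqueue
    }, $class;
}

sub enqueue {
    my ($self, $value) = @_;
    my $pos  = $self->{tail};
    my $slot = $pos & $self->{mask};
    # seq == pos: slot is free for this lap; seq < pos: still occupied (full).
    # (seq > pos would mean another producer won the race; not possible here.)
    return 0 if $self->{seq}[$slot] < $pos;
    $self->{tail}        = $pos + 1;       # real queue: CAS(tail, pos, pos + 1)
    $self->{data}[$slot] = $value;
    $self->{seq}[$slot]  = $pos + 1;       # publish the value to consumers
    return 1;
}

sub dequeue {
    my ($self) = @_;
    my $pos  = $self->{head};
    my $slot = $pos & $self->{mask};
    # seq == pos + 1: a producer has published here; smaller means empty.
    return undef if $self->{seq}[$slot] < $pos + 1;
    $self->{head} = $pos + 1;              # real queue: CAS(head, pos, pos + 1)
    my $value = $self->{data}[$slot];
    $self->{data}[$slot] = undef;
    $self->{seq}[$slot]  = $pos + $self->{mask} + 1;   # free for the next lap
    return $value;
}

package main;

my $q = VyukovModel->new(4);
$q->enqueue($_) for 10, 20, 30, 40;
print $q->enqueue(50) ? "pushed\n" : "full\n";    # full
print $q->dequeue, "\n";                           # 10
print $q->enqueue(50) ? "pushed\n" : "full\n";    # pushed (slot 0, second lap)
```

    A slot's sequence number tells a producer whether the slot is free for
    the current lap (seq == pos) and tells a consumer whether a value has
    been published (seq == pos + 1); after a pop the slot is advanced one
    full lap (pos + capacity). Producers and consumers coordinate only
    through these counters and the two positions, which is what lets the
    real queue avoid a mutex.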
        For Int32 and Int16, values outside the type range are silently
        truncated (standard C cast semantics).

    Data::Queue::Shared::Str - byte string values, mutex-protected
        Uses a futex-based mutex with a circular arena for variable-length
        string storage. Preserves the UTF-8 flag. Well suited to messages,
        serialized data, and filenames.

  Features
    *   File-backed mmap for cross-process sharing
    *   Lock-free MPMC for integer queues (Vyukov algorithm)
    *   Futex-based blocking wait with timeout (no busy-spin)
    *   PID-based stale lock recovery (dead process detection)
    *   Batch push/pop operations
    *   Circular arena for zero-fragmentation string storage
    *   Optional keyword API via XS::Parse::Keyword (zero method-dispatch
        overhead)

  Constructor
        # Int queue (Int32 and Int16 take the same arguments)
        my $q = Data::Queue::Shared::Int->new($path, $capacity);

        # Str queue
        my $q = Data::Queue::Shared::Str->new($path, $capacity);
        my $q = Data::Queue::Shared::Str->new($path, $capacity, $arena_bytes);

    Creates or opens a shared queue backed by the file $path. $capacity is
    rounded up to the next power of 2. When an existing file is opened, its
    parameters are read from the stored header. Multiple processes can open
    the same file simultaneously.

    Pass "undef" for $path to create an anonymous queue using
    "MAP_SHARED|MAP_ANONYMOUS". Anonymous queues are shared with child
    processes via fork() but cannot be opened by unrelated processes.

  memfd Constructor
        my $q = Data::Queue::Shared::Int->new_memfd($name, $capacity);
        my $q = Data::Queue::Shared::Str->new_memfd($name, $capacity);
        my $q = Data::Queue::Shared::Str->new_memfd($name, $cap, $arena);

    Creates a queue backed by memfd_create(2). There is no filesystem path;
    the backing memory is identified by a file descriptor. Use memfd() to
    retrieve the fd and pass it to other processes via "SCM_RIGHTS" (Unix
    domain socket fd passing) or fork() inheritance.

        my $q2 = Data::Queue::Shared::Int->new_from_fd($fd);
        my $q2 = Data::Queue::Shared::Str->new_from_fd($fd);

    Opens a queue from a received memfd. The fd is duplicated internally
    with dup(2), so the caller's copy of $fd can be closed independently.
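    The constructors round $capacity up to the next power of 2. A minimal
    sketch of that rounding follows; next_pow2 is a hypothetical helper,
    not part of Data::Queue::Shared. It also shows the usual reason for
    power-of-2 sizes, which is assumed here rather than stated by the
    module: a position can be mapped to a slot with a cheap bit mask
    instead of a modulo.

```perl
use strict;
use warnings;

# Hypothetical helper: round a requested capacity up to the next power of 2.
sub next_pow2 {
    my ($n) = @_;
    die "capacity must be >= 1\n" unless $n >= 1;
    my $p = 1;
    $p <<= 1 while $p < $n;
    return $p;
}

for my $requested (1, 1000, 1024, 1025) {
    my $cap  = next_pow2($requested);
    my $mask = $cap - 1;                   # slot index = position & mask
    printf "%4d -> %4d (mask 0x%03x)\n", $requested, $cap, $mask;
}

# With a power-of-2 size, masking selects the same slot as modulo:
my $mask = next_pow2(1000) - 1;
print +(5000 & $mask) == (5000 % ($mask + 1)) ? "same slot\n" : "differs\n";
```

    Under this rounding, a queue created with a capacity of 1000 would
    report a capacity of 1024.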
        my $fd = $q->memfd;    # backing fd (-1 if file-backed/anonymous)

    For Str queues, $arena_bytes sets the size of the string storage arena
    (default: "$capacity * 256", minimum 4096, maximum 4GB). Strings are
    stored in a circular arena; the total bytes of all queued strings
    cannot exceed the arena capacity. Individual strings are limited to
    ~2GB.

API
  Core operations
        my $ok  = $q->push($value);             # non-blocking, false if full
        my $val = $q->pop;                      # non-blocking, undef if empty
        my $ok  = $q->push_wait($value);        # blocking, infinite wait
        my $ok  = $q->push_wait($value, $secs); # blocking with timeout
        my $val = $q->pop_wait;                 # blocking, infinite wait
        my $val = $q->pop_wait($secs);          # blocking with timeout
        my $val = $q->peek;                     # read front without consuming

    "peek" returns the front element without removing it ("undef" if
    empty). For Int, this is a best-effort snapshot (racy under concurrent
    MPMC access). For Str, it is exact (mutex-protected).

  Deque operations (Str only)
        my $ok  = $q->push_front($value);              # non-blocking push to front
        my $ok  = $q->push_front_wait($value);         # blocking push to front
        my $ok  = $q->push_front_wait($value, $secs);  # with timeout
        my $val = $q->pop_back;                        # non-blocking pop from back
        my $val = $q->pop_back_wait;                   # blocking pop from back
        my $val = $q->pop_back_wait($secs);            # with timeout

    "push_front" inserts at the head, which is useful for requeueing failed
    jobs. "pop_back" removes from the tail, which is useful for
    work-stealing or undo. Neither is available for the integer variants
    (the Vyukov algorithm is strictly FIFO).

  Batch operations
        my $n = $q->push_multi(@values);                # non-blocking, returns pushed count
        my @v = $q->pop_multi($count);                  # non-blocking, pop up to $count
        my $n = $q->push_wait_multi($timeout, @values); # blocking batch push
        my @v = $q->pop_wait_multi($n, $timeout);       # block for >=1, grab up to $n
        my @v = $q->drain;                              # pop all elements
        my @v = $q->drain($max);                        # pop up to $max elements

    "pop_wait_multi" blocks until at least one element is available (or
    the timeout expires), then grabs up to $n elements without blocking.
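    The blocking batch contract can be modelled without the module. The
    sketch below is an illustration under stated assumptions, not the
    module's implementation: a plain array stands in for the queue, short
    Time::HiRes sleeps stand in for the futex wait, and
    pop_wait_multi_model is a hypothetical name. It assumes the $timeout
    convention documented for the batch calls (-1 = infinite, 0 = try
    once, positive = seconds).

```perl
use strict;
use warnings;
use Time::HiRes qw(time sleep);

# Hypothetical model of the documented pop_wait_multi contract, using a
# plain array as the "queue": wait until at least one element is present
# or the timeout expires, then take up to $n elements without waiting again.
# $timeout: -1 = wait forever, 0 = try once, > 0 = seconds.
# Short sleeps stand in for the module's futex wait; in this single-process
# model nothing else fills the array, so -1 on an empty array never returns.
sub pop_wait_multi_model {
    my ($items, $n, $timeout) = @_;
    my $deadline = $timeout > 0 ? time() + $timeout : undef;
    until (@$items) {
        return if $timeout == 0;                             # try once
        return if defined $deadline && time() >= $deadline;  # timed out
        sleep(0.001);
    }
    my $take = $n < @$items ? $n : scalar @$items;
    return splice(@$items, 0, $take);
}

my @queue = (1 .. 5);
my @got = pop_wait_multi_model(\@queue, 3, 1.0);
print "got: @got; left: @queue\n";    # got: 1 2 3; left: 4 5

my $t0   = time();
my @none = pop_wait_multi_model([], 3, 0.05);
printf "%d items after %.2fs\n", scalar @none, time() - $t0;
```

    Only the first element is waited for: once anything is available, the
    call takes what is there (up to $n) and returns, so callers should not
    assume they receive a full batch.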
    On timeout, "pop_wait_multi" returns an empty list. "push_wait_multi"
    pushes all values, blocking whenever the queue is full. For both calls,
    $timeout is in seconds (-1 = infinite, 0 = try once).

  Status
        my $n   = $q->size;       # approximate for Int (lock-free), exact for Str
        my $cap = $q->capacity;   # max elements
        my $ok  = $q->is_empty;
        my $ok  = $q->is_full;

  Management
        $q->clear;                # remove all elements
        $q->sync;                 # msync(2): flush to disk for crash durability
        $q->unlink;               # remove backing file
        Class->unlink($path);     # class method form
        my $p = $q->path;         # backing file path
        my $s = $q->stats;        # diagnostic hashref

    Stats keys: "size", "capacity", "mmap_size", "push_ok", "pop_ok",
    "push_full", "pop_empty", "recoveries", "push_waiters", "pop_waiters".
    Str queues additionally include "arena_cap" and "arena_used". All
    counters are approximate under concurrent access (diagnostic only).
    "push_waiters" and "pop_waiters" give the number of currently blocked
    producers and consumers.

  Event Loop Integration (eventfd)
        my $fd = $q->eventfd;     # create eventfd, returns fd number
        $q->eventfd_set($fd);     # use an existing fd (e.g. inherited via fork)
        my $fd = $q->fileno;      # current eventfd (-1 if none)
        $q->notify;               # signal eventfd (call after push)
        $q->eventfd_consume;      # drain notification counter

    Notification is opt-in: "push" does not write to the eventfd
    automatically. Call "notify" explicitly after pushing. This gives full
    control over batching (push N items, notify once) and adds no overhead
    when eventfd is not used.

        use EV;
        use Data::Queue::Shared;

        my $q  = Data::Queue::Shared::Str->new($path, 1024);
        my $fd = $q->eventfd;

        my $w = EV::io $fd, EV::READ, sub {
            $q->eventfd_consume;
            while (defined(my $item = $q->pop)) {
                process($item);   # application-defined handler
            }
        };

        # Producer side:
        $q->push($item);
        $q->notify;               # wake the EV watcher

        EV::run;

    For cross-process notification, create the eventfd before fork().
    Child processes inherit the fd and should call eventfd_set($fd) on
    their own queue handle. A write from any process sharing the fd wakes
    all event-loop watchers on it.
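    A minimal, core-Perl sketch of the opt-in notification pattern
    described above: the producer pushes a batch and notifies once, and the
    consumer drains the notification before popping until empty. A pipe
    stands in for the eventfd, a plain array stands in for the queue, and
    notify()/consume_note() are hypothetical helpers rather than the
    module's methods.

```perl
use strict;
use warnings;
use IO::Select;

# Opt-in notification pattern: push N items, notify once; the consumer
# drains the notification, then pops until the queue is empty.
# A pipe stands in for the eventfd and a plain array for the queue.
pipe(my $rd, my $wr) or die "pipe: $!";
my @queue;

sub notify { syswrite($wr, "x", 1) // die "write: $!" }

sub consume_note {
    # A pipe keeps one byte per notification (an eventfd keeps a single
    # counter instead), so read until nothing is left.
    my $sel = IO::Select->new($rd);
    my $buf;
    while ($sel->can_read(0)) {
        sysread($rd, $buf, 4096) or last;
    }
}

# Producer: three pushes, one notification.
push @queue, $_ for qw(a b c);
notify();

# Consumer: wait for readiness (the role an EV::io watcher plays).
if (IO::Select->new($rd)->can_read(1)) {
    consume_note();
    while (defined(my $item = shift @queue)) {
        print "got $item\n";
    }
}
```

    Draining the notification before popping, as the EV example above
    does, avoids a lost wake-up: a notify() that races with the pop loop
    leaves the fd readable, so the watcher fires again instead of leaving
    an item unprocessed.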
  Crash Safety
    If a process dies while holding the Str queue mutex, other processes
    detect the stale lock within 2 seconds via PID tracking and recover
    automatically. The integer queues are lock-free and need no crash
    recovery for normal push/pop operations.

  Keyword API
    When XS::Parse::Keyword is installed at build time, keyword forms that
    bypass method dispatch are available:

        use Data::Queue::Shared::Int;    # activates q_int_* keywords

        q_int_push $q, $value;
        my $val = q_int_pop $q;
        my $val = q_int_peek $q;
        my $n   = q_int_size $q;

    Replace "int" with "int32", "int16", or "str" for the other variants.
    Keywords are lexically scoped and require "use" (not "require").

BENCHMARKS
    Throughput versus other Perl queue/IPC modules, 200K items, single
    process and cross-process, Linux x86_64. Run
    "perl -Mblib bench/vs.pl 200000" to reproduce.

    SINGLE-PROCESS INTEGER PUSH+POP (interleaved)
        Module                       Rate
        Data::Queue::Shared::Int   5.0M/s
        MCE::Queue                 1.8M/s
        POSIX::RT::MQ              806K/s
        IPC::Msg (SysV)            802K/s
        IPC::Transit                58K/s
        Forks::Queue (Shmem)        11K/s

    SINGLE-PROCESS STRING PUSH+POP (~50B, interleaved)
        Module                       Rate
        Data::Queue::Shared::Str   2.6M/s
        MCE::Queue                 1.5M/s
        POSIX::RT::MQ              990K/s
        IPC::Msg (SysV)            857K/s
        Forks::Queue (Shmem)        11K/s

    BATCH PUSH+POP (100 per batch, integers)
        Module                       Rate
        Shared::Int push_multi    14.9M/s
        MCE::Queue                 4.5M/s

    CROSS-PROCESS (1 producer + 1 consumer, integers)
        Module                       Rate
        Shared::Int                6.0M/s
        MCE::Queue                 4.1M/s
        POSIX::RT::MQ              1.2M/s
        IPC::Msg (SysV)            956K/s
        Forks::Queue (Shmem)         4K/s

    Key takeaways:

    *   2.8x faster than MCE::Queue for single-process integer ops
    *   1.5x faster than MCE::Queue for cross-process integers
    *   5-6x faster than kernel IPC (POSIX mq / SysV msgq) for integers
        (about 6.2x single-process; 5.0x to 6.3x cross-process)
    *   3.3x faster than MCE::Queue for batch integer ops (100 items per
        batch)
    *   True concurrent MPMC (MCE::Queue is workers-to-manager only)

AUTHOR
    vividsnow

LICENSE
    This is free software; you can redistribute it and/or modify it under
    the same terms as Perl itself.