Files
zterm/src/queue.zig
Yves Biener cba07b119c
Some checks failed
Zig Project Action / Lint, Spell-check and test zig project (push) Failing after 55s
chor: upgrade to latest zig; remove zg dependency
2025-09-29 23:09:42 +02:00

341 lines
10 KiB
Zig

// taken from https://github.com/rockorager/libvaxis/blob/main/src/queue.zig (MIT-License)
// with slight modifications
/// Queue implementation. Thread safe. Fixed size. Blocking push and pop. Polling through tryPop and tryPush.
pub fn Queue(comptime T: type, comptime size: usize) type {
return struct {
buf: [size]T = undefined,
read_index: usize = 0,
write_index: usize = 0,
mutex: std.Thread.Mutex = .{},
// blocks when the buffer is full
not_full: std.Thread.Condition = .{},
// ...or empty
not_empty: std.Thread.Condition = .{},
const QueueType = @This();
/// Pop an item from the queue. Blocks until an item is available.
pub fn pop(this: *QueueType) T {
this.mutex.lock();
defer this.mutex.unlock();
while (this.isEmptyLH()) {
this.not_empty.wait(&this.mutex);
}
assert(!this.isEmptyLH());
if (this.isFullLH()) {
// If we are full, wake up a push that might be
// waiting here.
this.not_full.signal();
}
const result = this.buf[this.mask(this.read_index)];
this.read_index = this.mask2(this.read_index + 1);
return result;
}
/// Push an item into the queue. Blocks until an item has been
/// put in the queue.
pub fn push(this: *QueueType, item: T) void {
this.mutex.lock();
defer this.mutex.unlock();
while (this.isFullLH()) {
this.not_full.wait(&this.mutex);
}
if (this.isEmptyLH()) {
// If we were empty, wake up a pop if it was waiting.
this.not_empty.signal();
}
assert(!this.isFullLH());
this.buf[this.mask(this.write_index)] = item;
this.write_index = this.mask2(this.write_index + 1);
}
/// Push an item into the queue. Returns true when the item
/// was successfully placed in the queue, false if the queue
/// was full.
pub fn tryPush(this: *QueueType, item: T) bool {
this.mutex.lock();
if (this.isFullLH()) {
this.mutex.unlock();
return false;
}
this.mutex.unlock();
this.push(item);
return true;
}
/// Pop an item from the queue. Returns null when no item is
/// available.
pub fn tryPop(this: *QueueType) ?T {
this.mutex.lock();
if (this.isEmptyLH()) {
this.mutex.unlock();
return null;
}
this.mutex.unlock();
return this.pop();
}
/// Poll the queue. This call blocks until events are in the queue
pub fn poll(this: *QueueType) void {
this.mutex.lock();
defer this.mutex.unlock();
while (this.isEmptyLH()) {
this.not_empty.wait(&this.mutex);
}
assert(!this.isEmptyLH());
}
pub fn lock(this: *QueueType) void {
this.mutex.lock();
}
pub fn unlock(this: *QueueType) void {
this.mutex.unlock();
}
/// Used to efficiently drain the queue
pub fn drain(this: *QueueType) ?T {
if (this.isEmptyLH()) return null;
const result = this.buf[this.mask(this.read_index)];
this.read_index = this.mask2(this.read_index + 1);
return result;
}
fn isEmptyLH(this: QueueType) bool {
return this.write_index == this.read_index;
}
fn isFullLH(this: QueueType) bool {
return this.mask2(this.write_index + this.buf.len) ==
this.read_index;
}
/// Returns `true` if the queue is empty and `false` otherwise.
pub fn isEmpty(this: *QueueType) bool {
this.mutex.lock();
defer this.mutex.unlock();
return this.isEmptyLH();
}
/// Returns `true` if the queue is full and `false` otherwise.
pub fn isFull(this: *QueueType) bool {
this.mutex.lock();
defer this.mutex.unlock();
return this.isFullLH();
}
/// Returns the length
pub fn len(this: QueueType) usize {
const wrap_offset = 2 * this.buf.len *
@intFromBool(this.write_index < this.read_index);
const adjusted_write_index = this.write_index + wrap_offset;
return adjusted_write_index - this.read_index;
}
/// Returns `index` modulo the length of the backing slice.
fn mask(this: QueueType, index: usize) usize {
return index % this.buf.len;
}
/// Returns `index` modulo twice the length of the backing slice.
fn mask2(this: QueueType, index: usize) usize {
return index % (2 * this.buf.len);
}
};
}
const std = @import("std");
const testing = std.testing;
const assert = std.debug.assert;
const Thread = std.Thread;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
var queue: Queue(u8, 16) = .{};
queue.push(1);
queue.push(2);
const pop = queue.pop();
try testing.expectEqual(1, pop);
try testing.expectEqual(2, queue.pop());
}
fn testPushPop(q: *Queue(u8, 2)) !void {
q.push(3);
try testing.expectEqual(2, q.pop());
}
test "Fill, wait to push, pop once in another thread" {
var queue: Queue(u8, 2) = .{};
queue.push(1);
queue.push(2);
const t = try Thread.spawn(cfg, testPushPop, .{&queue});
try testing.expectEqual(false, queue.tryPush(3));
try testing.expectEqual(1, queue.pop());
t.join();
try testing.expectEqual(3, queue.pop());
try testing.expectEqual(null, queue.tryPop());
}
fn testPush(q: *Queue(u8, 2)) void {
q.push(0);
q.push(1);
q.push(2);
q.push(3);
q.push(4);
}
test "Try to pop, fill from another thread" {
var queue: Queue(u8, 2) = .{};
const thread = try Thread.spawn(cfg, testPush, .{&queue});
for (0..5) |idx| {
try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
}
thread.join();
}
fn sleepyPop(q: *Queue(u8, 2)) !void {
// First we wait for the queue to be full.
while (!q.isFull())
try Thread.yield();
// Then we spuriously wake it up, because that's a thing that can
// happen.
q.not_full.signal();
q.not_empty.signal();
// Then give the other thread a good chance of waking up. It's not
// clear that yield guarantees the other thread will be scheduled,
// so we'll throw a sleep in here just to be sure. The queue is
// still full and the push in the other thread is still blocked
// waiting for space.
try Thread.yield();
std.Thread.sleep(std.time.ns_per_s);
// Finally, let that other thread go.
try testing.expectEqual(1, q.pop());
// This won't continue until the other thread has had a chance to
// put at least one item in the queue.
while (!q.isFull())
try Thread.yield();
// But we want to ensure that there's a second push waiting, so
// here's another sleep.
std.Thread.sleep(std.time.ns_per_s / 2);
// Another spurious wake...
q.not_full.signal();
q.not_empty.signal();
// And another chance for the other thread to see that it's
// spurious and go back to sleep.
try Thread.yield();
std.Thread.sleep(std.time.ns_per_s / 2);
// Pop that thing and we're done.
try testing.expectEqual(2, q.pop());
}
test "Fill, block, fill, block" {
// Fill the queue, block while trying to write another item, have
// a background thread unblock us, then block while trying to
// write yet another thing. Have the background thread unblock
// that too (after some time) then drain the queue. This test
// fails if the while loop in `push` is turned into an `if`.
var queue: Queue(u8, 2) = .{};
const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
queue.push(1);
queue.push(2);
const now = std.time.milliTimestamp();
queue.push(3); // This one should block.
const then = std.time.milliTimestamp();
// Just to make sure the sleeps are yielding to this thread, make
// sure it took at least 900ms to do the push.
try testing.expect(then - now > 900);
// This should block again, waiting for the other thread.
queue.push(4);
// And once that push has gone through, the other thread's done.
thread.join();
try testing.expectEqual(3, queue.pop());
try testing.expectEqual(4, queue.pop());
}
fn sleepyPush(q: *Queue(u8, 1)) !void {
// Try to ensure the other thread has already started trying to pop.
try Thread.yield();
std.Thread.sleep(std.time.ns_per_s / 2);
// Spurious wake
q.not_full.signal();
q.not_empty.signal();
try Thread.yield();
std.Thread.sleep(std.time.ns_per_s / 2);
// Stick something in the queue so it can be popped.
q.push(1);
// Ensure it's been popped.
while (!q.isEmpty())
try Thread.yield();
// Give the other thread time to block again.
try Thread.yield();
std.Thread.sleep(std.time.ns_per_s / 2);
// Spurious wake
q.not_full.signal();
q.not_empty.signal();
q.push(2);
}
test "Drain, block, drain, block" {
// This is like fill/block/fill/block, but on the pop end. This
// test should fail if the `while` loop in `pop` is turned into an
// `if`.
var queue: Queue(u8, 1) = .{};
const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
try testing.expectEqual(1, queue.pop());
try testing.expectEqual(2, queue.pop());
thread.join();
}
fn readerThread(q: *Queue(u8, 1)) !void {
try testing.expectEqual(1, q.pop());
}
test "2 readers" {
// 2 threads read, one thread writes
var queue: Queue(u8, 1) = .{};
const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
try Thread.yield();
std.Thread.sleep(std.time.ns_per_s / 2);
queue.push(1);
queue.push(1);
t1.join();
t2.join();
}
fn writerThread(q: *Queue(u8, 1)) !void {
q.push(1);
}
test "2 writers" {
var queue: Queue(u8, 1) = .{};
const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
const t2 = try Thread.spawn(cfg, writerThread, .{&queue});
try testing.expectEqual(1, queue.pop());
try testing.expectEqual(1, queue.pop());
t1.join();
t2.join();
}