The other examples did rendering based on events, which this renderer does not. This makes these applications potentially not that efficient, but allows for consistent frame times that make animations, etc. possible. This example serves to show that you can use `zterm` for both types of render scheduling and even change between them without much efford.
341 lines
10 KiB
Zig
341 lines
10 KiB
Zig
// taken from https://github.com/rockorager/libvaxis/blob/main/src/queue.zig (MIT-License)
|
|
// with slight modifications
|
|
|
|
/// Queue implementation. Thread safe. Fixed size. Blocking push and pop. Polling through tryPop and tryPush.
|
|
pub fn Queue(comptime T: type, comptime size: usize) type {
|
|
return struct {
|
|
buf: [size]T = undefined,
|
|
|
|
read_index: usize = 0,
|
|
write_index: usize = 0,
|
|
|
|
mutex: std.Thread.Mutex = .{},
|
|
// blocks when the buffer is full
|
|
not_full: std.Thread.Condition = .{},
|
|
// ...or empty
|
|
not_empty: std.Thread.Condition = .{},
|
|
|
|
const QueueType = @This();
|
|
|
|
/// Pop an item from the queue. Blocks until an item is available.
|
|
pub fn pop(this: *QueueType) T {
|
|
this.mutex.lock();
|
|
defer this.mutex.unlock();
|
|
while (this.isEmptyLH()) {
|
|
this.not_empty.wait(&this.mutex);
|
|
}
|
|
assert(!this.isEmptyLH());
|
|
if (this.isFullLH()) {
|
|
// If we are full, wake up a push that might be
|
|
// waiting here.
|
|
this.not_full.signal();
|
|
}
|
|
|
|
const result = this.buf[this.mask(this.read_index)];
|
|
this.read_index = this.mask2(this.read_index + 1);
|
|
return result;
|
|
}
|
|
|
|
/// Push an item into the queue. Blocks until an item has been
|
|
/// put in the queue.
|
|
pub fn push(this: *QueueType, item: T) void {
|
|
this.mutex.lock();
|
|
defer this.mutex.unlock();
|
|
while (this.isFullLH()) {
|
|
this.not_full.wait(&this.mutex);
|
|
}
|
|
if (this.isEmptyLH()) {
|
|
// If we were empty, wake up a pop if it was waiting.
|
|
this.not_empty.signal();
|
|
}
|
|
assert(!this.isFullLH());
|
|
|
|
this.buf[this.mask(this.write_index)] = item;
|
|
this.write_index = this.mask2(this.write_index + 1);
|
|
}
|
|
|
|
/// Push an item into the queue. Returns true when the item
|
|
/// was successfully placed in the queue, false if the queue
|
|
/// was full.
|
|
pub fn tryPush(this: *QueueType, item: T) bool {
|
|
this.mutex.lock();
|
|
if (this.isFullLH()) {
|
|
this.mutex.unlock();
|
|
return false;
|
|
}
|
|
this.mutex.unlock();
|
|
this.push(item);
|
|
return true;
|
|
}
|
|
|
|
/// Pop an item from the queue. Returns null when no item is
|
|
/// available.
|
|
pub fn tryPop(this: *QueueType) ?T {
|
|
this.mutex.lock();
|
|
if (this.isEmptyLH()) {
|
|
this.mutex.unlock();
|
|
return null;
|
|
}
|
|
this.mutex.unlock();
|
|
return this.pop();
|
|
}
|
|
|
|
/// Poll the queue. This call blocks until events are in the queue
|
|
pub fn poll(this: *QueueType) void {
|
|
this.mutex.lock();
|
|
defer this.mutex.unlock();
|
|
while (this.isEmptyLH()) {
|
|
this.not_empty.wait(&this.mutex);
|
|
}
|
|
assert(!this.isEmptyLH());
|
|
}
|
|
|
|
pub fn lock(this: *QueueType) void {
|
|
this.mutex.lock();
|
|
}
|
|
|
|
pub fn unlock(this: *QueueType) void {
|
|
this.mutex.unlock();
|
|
}
|
|
|
|
/// Used to efficiently drain the queue
|
|
pub fn drain(this: *QueueType) ?T {
|
|
if (this.isEmptyLH()) return null;
|
|
|
|
const result = this.buf[this.mask(this.read_index)];
|
|
this.read_index = this.mask2(this.read_index + 1);
|
|
return result;
|
|
}
|
|
|
|
fn isEmptyLH(this: QueueType) bool {
|
|
return this.write_index == this.read_index;
|
|
}
|
|
|
|
fn isFullLH(this: QueueType) bool {
|
|
return this.mask2(this.write_index + this.buf.len) ==
|
|
this.read_index;
|
|
}
|
|
|
|
/// Returns `true` if the queue is empty and `false` otherwise.
|
|
pub fn isEmpty(this: *QueueType) bool {
|
|
this.mutex.lock();
|
|
defer this.mutex.unlock();
|
|
return this.isEmptyLH();
|
|
}
|
|
|
|
/// Returns `true` if the queue is full and `false` otherwise.
|
|
pub fn isFull(this: *QueueType) bool {
|
|
this.mutex.lock();
|
|
defer this.mutex.unlock();
|
|
return this.isFullLH();
|
|
}
|
|
|
|
/// Returns the length
|
|
pub fn len(this: QueueType) usize {
|
|
const wrap_offset = 2 * this.buf.len *
|
|
@intFromBool(this.write_index < this.read_index);
|
|
const adjusted_write_index = this.write_index + wrap_offset;
|
|
return adjusted_write_index - this.read_index;
|
|
}
|
|
|
|
/// Returns `index` modulo the length of the backing slice.
|
|
fn mask(this: QueueType, index: usize) usize {
|
|
return index % this.buf.len;
|
|
}
|
|
|
|
/// Returns `index` modulo twice the length of the backing slice.
|
|
fn mask2(this: QueueType, index: usize) usize {
|
|
return index % (2 * this.buf.len);
|
|
}
|
|
};
|
|
}
|
|
|
|
const std = @import("std");
|
|
const testing = std.testing;
|
|
const assert = std.debug.assert;
|
|
const Thread = std.Thread;
|
|
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
|
|
|
|
test "Queue: simple push / pop" {
|
|
var queue: Queue(u8, 16) = .{};
|
|
queue.push(1);
|
|
queue.push(2);
|
|
const pop = queue.pop();
|
|
try testing.expectEqual(1, pop);
|
|
try testing.expectEqual(2, queue.pop());
|
|
}
|
|
|
|
fn testPushPop(q: *Queue(u8, 2)) !void {
|
|
q.push(3);
|
|
try testing.expectEqual(2, q.pop());
|
|
}
|
|
|
|
test "Fill, wait to push, pop once in another thread" {
|
|
var queue: Queue(u8, 2) = .{};
|
|
queue.push(1);
|
|
queue.push(2);
|
|
const t = try Thread.spawn(cfg, testPushPop, .{&queue});
|
|
try testing.expectEqual(false, queue.tryPush(3));
|
|
try testing.expectEqual(1, queue.pop());
|
|
t.join();
|
|
try testing.expectEqual(3, queue.pop());
|
|
try testing.expectEqual(null, queue.tryPop());
|
|
}
|
|
|
|
fn testPush(q: *Queue(u8, 2)) void {
|
|
q.push(0);
|
|
q.push(1);
|
|
q.push(2);
|
|
q.push(3);
|
|
q.push(4);
|
|
}
|
|
|
|
test "Try to pop, fill from another thread" {
|
|
var queue: Queue(u8, 2) = .{};
|
|
const thread = try Thread.spawn(cfg, testPush, .{&queue});
|
|
for (0..5) |idx| {
|
|
try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
|
|
}
|
|
thread.join();
|
|
}
|
|
|
|
fn sleepyPop(q: *Queue(u8, 2)) !void {
|
|
// First we wait for the queue to be full.
|
|
while (!q.isFull())
|
|
try Thread.yield();
|
|
|
|
// Then we spuriously wake it up, because that's a thing that can
|
|
// happen.
|
|
q.not_full.signal();
|
|
q.not_empty.signal();
|
|
|
|
// Then give the other thread a good chance of waking up. It's not
|
|
// clear that yield guarantees the other thread will be scheduled,
|
|
// so we'll throw a sleep in here just to be sure. The queue is
|
|
// still full and the push in the other thread is still blocked
|
|
// waiting for space.
|
|
try Thread.yield();
|
|
std.time.sleep(std.time.ns_per_s);
|
|
// Finally, let that other thread go.
|
|
try testing.expectEqual(1, q.pop());
|
|
|
|
// This won't continue until the other thread has had a chance to
|
|
// put at least one item in the queue.
|
|
while (!q.isFull())
|
|
try Thread.yield();
|
|
// But we want to ensure that there's a second push waiting, so
|
|
// here's another sleep.
|
|
std.time.sleep(std.time.ns_per_s / 2);
|
|
|
|
// Another spurious wake...
|
|
q.not_full.signal();
|
|
q.not_empty.signal();
|
|
// And another chance for the other thread to see that it's
|
|
// spurious and go back to sleep.
|
|
try Thread.yield();
|
|
std.time.sleep(std.time.ns_per_s / 2);
|
|
|
|
// Pop that thing and we're done.
|
|
try testing.expectEqual(2, q.pop());
|
|
}
|
|
|
|
test "Fill, block, fill, block" {
|
|
// Fill the queue, block while trying to write another item, have
|
|
// a background thread unblock us, then block while trying to
|
|
// write yet another thing. Have the background thread unblock
|
|
// that too (after some time) then drain the queue. This test
|
|
// fails if the while loop in `push` is turned into an `if`.
|
|
|
|
var queue: Queue(u8, 2) = .{};
|
|
const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
|
|
queue.push(1);
|
|
queue.push(2);
|
|
const now = std.time.milliTimestamp();
|
|
queue.push(3); // This one should block.
|
|
const then = std.time.milliTimestamp();
|
|
|
|
// Just to make sure the sleeps are yielding to this thread, make
|
|
// sure it took at least 900ms to do the push.
|
|
try testing.expect(then - now > 900);
|
|
|
|
// This should block again, waiting for the other thread.
|
|
queue.push(4);
|
|
|
|
// And once that push has gone through, the other thread's done.
|
|
thread.join();
|
|
try testing.expectEqual(3, queue.pop());
|
|
try testing.expectEqual(4, queue.pop());
|
|
}
|
|
|
|
fn sleepyPush(q: *Queue(u8, 1)) !void {
|
|
// Try to ensure the other thread has already started trying to pop.
|
|
try Thread.yield();
|
|
std.time.sleep(std.time.ns_per_s / 2);
|
|
|
|
// Spurious wake
|
|
q.not_full.signal();
|
|
q.not_empty.signal();
|
|
|
|
try Thread.yield();
|
|
std.time.sleep(std.time.ns_per_s / 2);
|
|
|
|
// Stick something in the queue so it can be popped.
|
|
q.push(1);
|
|
// Ensure it's been popped.
|
|
while (!q.isEmpty())
|
|
try Thread.yield();
|
|
// Give the other thread time to block again.
|
|
try Thread.yield();
|
|
std.time.sleep(std.time.ns_per_s / 2);
|
|
|
|
// Spurious wake
|
|
q.not_full.signal();
|
|
q.not_empty.signal();
|
|
|
|
q.push(2);
|
|
}
|
|
|
|
test "Drain, block, drain, block" {
|
|
// This is like fill/block/fill/block, but on the pop end. This
|
|
// test should fail if the `while` loop in `pop` is turned into an
|
|
// `if`.
|
|
|
|
var queue: Queue(u8, 1) = .{};
|
|
const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
|
|
try testing.expectEqual(1, queue.pop());
|
|
try testing.expectEqual(2, queue.pop());
|
|
thread.join();
|
|
}
|
|
|
|
fn readerThread(q: *Queue(u8, 1)) !void {
|
|
try testing.expectEqual(1, q.pop());
|
|
}
|
|
|
|
test "2 readers" {
|
|
// 2 threads read, one thread writes
|
|
var queue: Queue(u8, 1) = .{};
|
|
const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
|
|
const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
|
|
try Thread.yield();
|
|
std.time.sleep(std.time.ns_per_s / 2);
|
|
queue.push(1);
|
|
queue.push(1);
|
|
t1.join();
|
|
t2.join();
|
|
}
|
|
|
|
fn writerThread(q: *Queue(u8, 1)) !void {
|
|
q.push(1);
|
|
}
|
|
|
|
test "2 writers" {
|
|
var queue: Queue(u8, 1) = .{};
|
|
const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
|
|
const t2 = try Thread.spawn(cfg, writerThread, .{&queue});
|
|
|
|
try testing.expectEqual(1, queue.pop());
|
|
try testing.expectEqual(1, queue.pop());
|
|
t1.join();
|
|
t2.join();
|
|
}
|