rockorager/ourio
An asynchronous IO runtime
Ourio (pronounced "oreo", think "Ouroboros") is an asynchronous IO runtime built heavily around the semantics of io_uring. The design is inspired by libxev, which is in turn inspired by TigerBeetle.
Ourio takes a slightly different approach: it is designed to encourage a
message passing style of asynchronous IO. Users of the library give each task
a Context, which contains a pointer, a callback, and a message. The message is
implemented as a u16, and generally you should use an enum for it. The idea is
that you can minimize the number of callback functions required by tagging tasks
with a small amount of semantic meaning in the msg field.
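The tagging idea can be sketched without the runtime at all. The block below is a minimal illustration, not Ourio's API: `FakeTask`, `Counters`, and the `Msg` variants are hypothetical stand-ins, and only the u16 message and the opaque pointer mirror what the paragraph above describes.

```zig
const std = @import("std");

// Hypothetical message tags. Backing the enum with u16 matches the width of
// the msg field described above.
const Msg = enum(u16) { timer_fired, client_readable, client_writable };

// Hypothetical stand-in for a completed task: only the two fields this
// callback needs. This is NOT the library's Task type.
const FakeTask = struct {
    msg: u16,
    ptr: ?*anyopaque = null,
};

// Hypothetical application state reached through the opaque pointer.
const Counters = struct { timers: u32 = 0, reads: u32 = 0, writes: u32 = 0 };

// One callback serves every kind of completion by switching on the tag.
fn onCompletion(task: FakeTask) void {
    const counters: *Counters = @ptrCast(@alignCast(task.ptr.?));
    const msg: Msg = @enumFromInt(task.msg);
    switch (msg) {
        .timer_fired => counters.timers += 1,
        .client_readable => counters.reads += 1,
        .client_writable => counters.writes += 1,
    }
}

pub fn main() void {
    var counters: Counters = .{};
    const tags = [_]Msg{ .client_readable, .timer_fired, .client_readable };
    for (tags) |tag| {
        onCompletion(.{ .msg = @intFromEnum(tag), .ptr = &counters });
    }
    std.debug.print("timers={d} reads={d} writes={d}\n", .{
        counters.timers,
        counters.reads,
        counters.writes,
    });
}
```

Because the switch is exhaustive over the enum, adding a new `Msg` variant becomes a compile error until the callback handles it, which is one reason an enum is a better fit for the field than bare integers.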
Ourio has io_uring and kqueue backends. Ourio supports the msg_ring
capability of io_uring to pass a completion from one ring to another. This
allows a multithreaded application to implement message passing using io_uring
(or kqueue, if that's your flavor). Multithreaded applications should plan to
use one Ring per thread. Submission onto the runtime is not thread safe, so
any message passing must occur using msg_ring rather than directly submitting
a task to another thread's Ring.
Ourio also includes a fully mockable IO runtime to make it easy to unit test your async code.
Each IO operation creates a Task. When scheduling a task on the runtime, the
caller receives a pointer to the Task, at which point they can cancel it or
set a deadline.
// Timers are always relative time
const task = try rt.timer(.{.sec = 3}, .{.cb = onCompletion, .msg = 0});
// If the deadline expires, the task will be sent to the onCompletion callback
// with a result of error.Canceled. Deadlines are always absolute time
try task.setDeadline(rt, .{.sec = std.time.timestamp() + 3});
// Alternatively, we can hold on to the pointer for the task while it is with
// the runtime and cancel it. The Context we give to the cancel function lets
// us know the result of the cancelation, but we will also receive a message
// from the original task with error.Canceled. We can ignore the cancel result
// by using the default context value
try task.cancel(rt, .{});
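Since timers take a relative duration while deadlines take an absolute time, it can help to keep the conversion in one place. The following is a small sketch under stated assumptions: `Timespec` and `deadlineAfter` are hypothetical names, and only the `.sec` field is taken from the snippet above.

```zig
const std = @import("std");

// Hypothetical stand-in for the runtime's time value. Only `.sec` appears in
// the snippet above; `nsec` is an assumption added for completeness.
const Timespec = struct { sec: i64 = 0, nsec: i64 = 0 };

/// Turn a relative delay into an absolute deadline, given the current time.
fn deadlineAfter(now_sec: i64, delay_sec: i64) Timespec {
    return .{ .sec = now_sec + delay_sec };
}

pub fn main() void {
    const now = std.time.timestamp();
    // A relative value such as .{ .sec = 3 } suits a timer; a deadline needs
    // the same 3 seconds added to the current wall-clock time.
    const deadline = deadlineAfter(now, 3);
    std.debug.print("deadline is {d}s after now\n", .{deadline.sec - now});
}
```

Passing the current time in as a parameter keeps the helper deterministic, so it can be unit tested without touching the clock.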
Say we accept a connection in one thread, and want to send the file descriptor
to another for handling.
// Spawn a thread with a queue of 16 entries. When this function returns, the
// thread is idle and waiting to receive tasks via msgRing
const thread = main_rt.spawnThread(16);
const target_task = try main_rt.getTask();
target_task.* = .{
    .userdata = &foo,
    .msg = @intFromEnum(Msg.some_message),
    .cb = Worker.onCompletion,
    .req = .{ .userfd = fd },
};
// Send target_task from the main_rt thread to the spawned thread's Ring. That
// Ring will then process the task as a completion, i.e.
// Worker.onCompletion will be called with this task. That thread can then
// schedule a recv, a write, etc. on the file descriptor it just received, or do
// arbitrary work
_ = try main_rt.msgRing(&thread.ring, target_task, .{});
You can have multiple Rings in a single thread. One could be a priority
Ring, or handle specific types of tasks, etc. Poll any Ring from any other
Ring.
const fd = rt1.backend.pollableFd();
_ = try rt2.poll(fd, .{
    .cb = onCompletion,
    .msg = @intFromEnum(Msg.rt1_has_completions),
});
An example implementation of an asynchronous writer to two file descriptors:
const std = @import("std");
const io = @import("ourio");
const posix = std.posix;
const Allocator = std.mem.Allocator;
pub const MultiWriter = struct {
    fd1: posix.fd_t,
    fd1_written: usize = 0,
    fd2: posix.fd_t,
    fd2_written: usize = 0,
    // Default to an empty list so init only has to set the descriptors
    buf: std.ArrayListUnmanaged(u8) = .empty,

    pub const Msg = enum { fd1, fd2 };

    pub fn init(fd1: posix.fd_t, fd2: posix.fd_t) MultiWriter {
        return .{ .fd1 = fd1, .fd2 = fd2 };
    }

    // Buffer bytes; nothing is written until flush is called
    pub fn write(self: *MultiWriter, gpa: Allocator, bytes: []const u8) !void {
        try self.buf.appendSlice(gpa, bytes);
    }

    // Schedule a write of the unwritten remainder to each file descriptor
    pub fn flush(self: *MultiWriter, rt: *io.Ring) !void {
        if (self.fd1_written < self.buf.items.len) {
            _ = try rt.write(self.fd1, self.buf.items[self.fd1_written..], .{
                .ptr = self,
                .msg = @intFromEnum(Msg.fd1),
                .cb = MultiWriter.onCompletion,
            });
        }

        if (self.fd2_written < self.buf.items.len) {
            _ = try rt.write(self.fd2, self.buf.items[self.fd2_written..], .{
                .ptr = self,
                .msg = @intFromEnum(Msg.fd2),
                .cb = MultiWriter.onCompletion,
            });
        }
    }

    pub fn onCompletion(rt: *io.Ring, task: io.Task) anyerror!void {
        const self = task.userdataCast(MultiWriter);
        const result = task.result.?;

        const n = try result.write;
        // The msg tag tells us which descriptor this completion belongs to
        switch (task.msgToEnum(MultiWriter.Msg)) {
            .fd1 => self.fd1_written += n,
            .fd2 => self.fd2_written += n,
        }

        const len = self.buf.items.len;

        // Partial write on either descriptor: schedule the remainder
        if (self.fd1_written < len or self.fd2_written < len)
            return self.flush(rt);

        // Both descriptors have the full buffer; reset for the next batch
        self.fd1_written = 0;
        self.fd2_written = 0;
        self.buf.clearRetainingCapacity();
    }
};
pub fn main() !void {
    var gpa: std.heap.DebugAllocator(.{}) = .init;
    var rt: io.Ring = try .init(gpa.allocator(), 16);
    defer rt.deinit();

    // Pretend I created some files
    const fd1: posix.fd_t = 5;
    const fd2: posix.fd_t = 6;

    var mw: MultiWriter = .init(fd1, fd2);
    try mw.write(gpa.allocator(), "Hello, world!");
    try mw.flush(&rt);

    try rt.run(.until_done);
}