repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

zmx / src
Eric Bower  ·  2026-04-26

ipc.zig

  1const std = @import("std");
  2const posix = std.posix;
  3const cross = @import("cross.zig");
  4const socket = @import("socket.zig");
  5
  6pub const Tag = enum(u8) {
  7    Input = 0,
  8    Output = 1,
  9    Resize = 2,
 10    Detach = 3,
 11    DetachAll = 4,
 12    Kill = 5,
 13    Info = 6,
 14    Init = 7,
 15    History = 8,
 16    Run = 9,
 17    Ack = 10,
 18    Switch = 11,
 19    Write = 12,
 20    TaskComplete = 13,
 21    // Non-exhaustive: this enum comes off the wire via bytesToValue and
 22    // @enumFromInt, so out-of-range values (14-255) are representable
 23    // rather than UB. Switches must handle `_` (unknown tag).
 24    _,
 25};
 26
 27comptime {
 28    if (@typeInfo(Tag).@"enum".is_exhaustive) @compileError(
 29        "ipc.Tag must stay non-exhaustive — old daemons rely on `_` to ignore unknown tags",
 30    );
 31}
 32
 33pub const Header = packed struct {
 34    tag: Tag,
 35    len: u32,
 36};
 37
 38pub const Resize = packed struct {
 39    rows: u16,
 40    cols: u16,
 41};
 42
 43pub fn getTerminalSize(fd: i32) Resize {
 44    var ws: cross.c.struct_winsize = undefined;
 45    if (cross.c.ioctl(fd, cross.c.TIOCGWINSZ, &ws) == 0 and ws.ws_row > 0 and ws.ws_col > 0) {
 46        return .{ .rows = ws.ws_row, .cols = ws.ws_col };
 47    }
 48    return .{ .rows = 24, .cols = 80 };
 49}
 50
 51pub const MAX_CMD_LEN = 256;
 52pub const MAX_CWD_LEN = 256;
 53
 54/// Frozen wire shape. Do NOT add fields — new stats go in new `Tag` values
 55/// so old daemons (whose `_` arm ignores unknown tags) stay reachable.
 56/// Changing `@sizeOf(Info)` breaks `zmx list` against running daemons.
 57pub const Info = extern struct {
 58    clients_len: u64,
 59    pid: i32,
 60    cmd_len: u16,
 61    cwd_len: u16,
 62    cmd: [MAX_CMD_LEN]u8,
 63    cwd: [MAX_CWD_LEN]u8,
 64    created_at: u64,
 65    task_ended_at: u64,
 66    task_exit_code: u8,
 67};
 68
 69pub fn expectedLength(data: []const u8) ?usize {
 70    if (data.len < @sizeOf(Header)) return null;
 71    const header = std.mem.bytesToValue(Header, data[0..@sizeOf(Header)]);
 72    // header.len comes off the wire; widen to usize before adding so a
 73    // near-u32-max value can't wrap (panic in safe mode, UB in release).
 74    return @as(usize, @sizeOf(Header)) + @as(usize, header.len);
 75}
 76
 77pub fn send(fd: i32, tag: Tag, data: []const u8) !void {
 78    const header = Header{
 79        .tag = tag,
 80        .len = @intCast(data.len),
 81    };
 82    const header_bytes = std.mem.asBytes(&header);
 83    try writeAll(fd, header_bytes);
 84    if (data.len > 0) {
 85        try writeAll(fd, data);
 86    }
 87}
 88
 89pub fn appendMessage(
 90    alloc: std.mem.Allocator,
 91    list: *std.ArrayList(u8),
 92    tag: Tag,
 93    data: []const u8,
 94) !void {
 95    std.log.info("sending ipc message tag={s}", .{@tagName(tag)});
 96    const header = Header{
 97        .tag = tag,
 98        .len = @intCast(data.len),
 99    };
100    try list.appendSlice(alloc, std.mem.asBytes(&header));
101    if (data.len > 0) {
102        try list.appendSlice(alloc, data);
103    }
104}
105
106fn writeAll(fd: i32, data: []const u8) !void {
107    var index: usize = 0;
108    while (index < data.len) {
109        const n = try posix.write(fd, data[index..]);
110        if (n == 0) return error.DiskQuota;
111        index += n;
112    }
113}
114
115pub const Message = struct {
116    tag: Tag,
117    data: []u8,
118
119    pub fn deinit(self: Message, alloc: std.mem.Allocator) void {
120        if (self.data.len > 0) {
121            alloc.free(self.data);
122        }
123    }
124};
125
126pub const SocketMsg = struct {
127    header: Header,
128    payload: []const u8,
129};
130
131pub const SocketBuffer = struct {
132    buf: std.ArrayList(u8),
133    alloc: std.mem.Allocator,
134    head: usize,
135
136    pub fn init(alloc: std.mem.Allocator) !SocketBuffer {
137        return .{
138            .buf = try std.ArrayList(u8).initCapacity(alloc, 4096),
139            .alloc = alloc,
140            .head = 0,
141        };
142    }
143
144    pub fn deinit(self: *SocketBuffer) void {
145        self.buf.deinit(self.alloc);
146    }
147
148    /// Reads from fd into buffer.
149    /// Returns number of bytes read.
150    /// Propagates error.WouldBlock and other errors to caller.
151    /// Returns 0 on EOF.
152    pub fn read(self: *SocketBuffer, fd: i32) !usize {
153        if (self.head > 0) {
154            const remaining = self.buf.items.len - self.head;
155            if (remaining > 0) {
156                std.mem.copyForwards(u8, self.buf.items[0..remaining], self.buf.items[self.head..]);
157                self.buf.items.len = remaining;
158            } else {
159                self.buf.clearRetainingCapacity();
160            }
161            self.head = 0;
162        }
163
164        var tmp: [4096]u8 = undefined;
165        const n = try posix.read(fd, &tmp);
166        if (n > 0) {
167            try self.buf.appendSlice(self.alloc, tmp[0..n]);
168        }
169        return n;
170    }
171
172    /// Returns the next complete message or `null` when none available.
173    /// `buf` is advanced automatically; caller keeps the returned slices
174    /// valid until the following `next()` (or `deinit`).
175    pub fn next(self: *SocketBuffer) ?SocketMsg {
176        const available = self.buf.items[self.head..];
177        const total = expectedLength(available) orelse return null;
178        if (available.len < total) return null;
179
180        const hdr = std.mem.bytesToValue(Header, available[0..@sizeOf(Header)]);
181        const pay = available[@sizeOf(Header)..total];
182
183        self.head += total;
184        return .{ .header = hdr, .payload = pay };
185    }
186};
187
188const ConnectError = error{
189    ConnectionRefused,
190    Unexpected,
191};
192
193/// Connect-only liveness check. Callers that don't read `Info` should use
194/// this (not `probeSession`) so they survive `Info` shape changes.
195pub fn connectSession(socket_path: []const u8) ConnectError!i32 {
196    return socket.sessionConnect(socket_path) catch |err| switch (err) {
197        error.ConnectionRefused => return error.ConnectionRefused,
198        else => return error.Unexpected,
199    };
200}
201
202const SessionProbeError = error{
203    Timeout,
204    ConnectionRefused,
205    Unexpected,
206    InfoSizeMismatch,
207};
208
209const SessionProbeResult = struct {
210    fd: i32,
211    info: Info,
212};
213
214pub fn probeSession(
215    alloc: std.mem.Allocator,
216    socket_path: []const u8,
217) SessionProbeError!SessionProbeResult {
218    const timeout_ms = 1000;
219    const fd = try connectSession(socket_path);
220    errdefer posix.close(fd);
221
222    send(fd, .Info, "") catch return error.Unexpected;
223
224    var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
225    const poll_result = posix.poll(&poll_fds, timeout_ms) catch return error.Unexpected;
226    if (poll_result == 0) {
227        return error.Timeout;
228    }
229
230    var sb = SocketBuffer.init(alloc) catch return error.Unexpected;
231    defer sb.deinit();
232
233    const n = sb.read(fd) catch return error.Unexpected;
234    if (n == 0) return error.Unexpected;
235
236    while (sb.next()) |msg| {
237        if (msg.header.tag == .Info) {
238            if (msg.payload.len != @sizeOf(Info)) return error.InfoSizeMismatch;
239            return .{
240                .fd = fd,
241                .info = std.mem.bytesToValue(Info, msg.payload[0..@sizeOf(Info)]),
242            };
243        }
244    }
245    return error.Unexpected;
246}
247
248//  WIRE PROTOCOL FREEZE — read before "fixing" any test below.
249//
250//  Changing these constants does not fix the test; it breaks every
251//  running daemon for every user until they `pkill -f zmx`.
252//
253//  Need a new field?   → add a new `Tag` value (next free integer).
254//  Need to remove one? → don't. Reserve the integer, stop sending it.
255test "Info wire size is frozen" {
256    try std.testing.expectEqual(@as(usize, 552), @sizeOf(Info));
257    // packed struct{u8,u32} backs to u40 → @sizeOf rounds to 8, not 5.
258    try std.testing.expectEqual(@as(usize, 8), @sizeOf(Header));
259}
260
261test "Tag wire values are frozen" {
262    inline for (.{
263        .{ Tag.Input, 0 },  .{ Tag.Output, 1 },        .{ Tag.Resize, 2 },
264        .{ Tag.Detach, 3 }, .{ Tag.DetachAll, 4 },     .{ Tag.Kill, 5 },
265        .{ Tag.Info, 6 },   .{ Tag.Init, 7 },          .{ Tag.History, 8 },
266        .{ Tag.Run, 9 },    .{ Tag.Ack, 10 },          .{ Tag.Switch, 11 },
267        .{ Tag.Write, 12 }, .{ Tag.TaskComplete, 13 },
268    }) |p| try std.testing.expectEqual(@as(u8, p[1]), @intFromEnum(p[0]));
269}
270
271test "zeroed Info has no stack garbage in wire bytes" {
272    var info = std.mem.zeroes(Info);
273    info.clients_len = 3;
274    info.pid = 999;
275    info.task_exit_code = 7;
276    const bytes = std.mem.asBytes(&info);
277    // Tail padding after task_exit_code must be zero (asBytes ships it).
278    const last_field_end = @offsetOf(Info, "task_exit_code") + @sizeOf(u8);
279    for (bytes[last_field_end..]) |b| try std.testing.expectEqual(@as(u8, 0), b);
280}