main zmx / src / ipc.zig
Eric Bower  ·  2026-06-19
  1const std = @import("std");
  2const posix = std.posix;
  3const cross = @import("cross.zig");
  4const socket = @import("socket.zig");
  5
/// One-byte message discriminator stored in the `tag` field of every
/// `Header`.
///
/// The explicit integer assigned to each member is the value that travels
/// on the wire, so members are never renumbered or reused.
pub const Tag = enum(u8) {
    Input = 0,
    Output = 1,
    Resize = 2,
    Detach = 3,
    DetachAll = 4,
    Kill = 5,
    Info = 6,
    Init = 7,
    History = 8,
    Run = 9,
    Ack = 10,
    Switch = 11,
    Write = 12,
    TaskComplete = 13,
    // Non-exhaustive: this enum comes off the wire via bytesToValue and
    // @enumFromInt, so out-of-range values (14-255) are representable
    // rather than UB. Switches must handle `_` (unknown tag).
    _,
};
 26
 27comptime {
 28    if (@typeInfo(Tag).@"enum".is_exhaustive) @compileError(
 29        "ipc.Tag must stay non-exhaustive — old daemons rely on `_` to ignore unknown tags",
 30    );
 31}
 32
 33pub const Header = packed struct {
 34    tag: Tag,
 35    len: u32,
 36};
 37
 38pub const Resize = packed struct {
 39    rows: u16,
 40    cols: u16,
 41};
 42
 43pub fn getTerminalSize(fd: i32) Resize {
 44    var ws: cross.c.struct_winsize = undefined;
 45    if (cross.c.ioctl(fd, cross.c.TIOCGWINSZ, &ws) == 0 and ws.ws_row > 0 and ws.ws_col > 0) {
 46        return .{ .rows = ws.ws_row, .cols = ws.ws_col };
 47    }
 48    return .{ .rows = 24, .cols = 160 };
 49}
 50
/// Byte capacity of the fixed-size `Info.cmd` array.
pub const MAX_CMD_LEN = 256;
/// Byte capacity of the fixed-size `Info.cwd` array.
pub const MAX_CWD_LEN = 256;
 53
/// Frozen wire shape. Do NOT add fields — new stats go in new `Tag` values
/// so old daemons (whose `_` arm ignores unknown tags) stay reachable.
/// Changing `@sizeOf(Info)` breaks `zmx list` against running daemons.
///
/// Declared `extern` so the field order and padding follow the C ABI; the
/// "Info wire size is frozen" test pins the resulting size at 552 bytes.
pub const Info = extern struct {
    clients_len: u64,
    pid: i32,
    // NOTE(review): presumably the number of meaningful bytes in `cmd` and
    // `cwd` respectively — confirm against the code that fills this struct.
    cmd_len: u16,
    cwd_len: u16,
    // Fixed-capacity byte arrays; capacities come from MAX_CMD_LEN / MAX_CWD_LEN.
    cmd: [MAX_CMD_LEN]u8,
    cwd: [MAX_CWD_LEN]u8,
    // NOTE(review): timestamp units are not visible in this file — confirm.
    created_at: u64,
    task_ended_at: u64,
    // Last field; the struct's tail padding follows it.
    task_exit_code: u8,
};
 68
 69pub fn expectedLength(data: []const u8) ?usize {
 70    if (data.len < @sizeOf(Header)) return null;
 71    const header = std.mem.bytesToValue(Header, data[0..@sizeOf(Header)]);
 72    // header.len comes off the wire; widen to usize before adding so a
 73    // near-u32-max value can't wrap (panic in safe mode, UB in release).
 74    return @as(usize, @sizeOf(Header)) + @as(usize, header.len);
 75}
 76
 77pub fn send(fd: i32, tag: Tag, data: []const u8) !void {
 78    const header = Header{
 79        .tag = tag,
 80        .len = @intCast(data.len),
 81    };
 82    const header_bytes = std.mem.asBytes(&header);
 83    try writeAll(fd, header_bytes);
 84    if (data.len > 0) {
 85        try writeAll(fd, data);
 86    }
 87}
 88
 89pub fn appendMessage(
 90    alloc: std.mem.Allocator,
 91    list: *std.ArrayList(u8),
 92    tag: Tag,
 93    data: []const u8,
 94) !void {
 95    const header = Header{
 96        .tag = tag,
 97        .len = @intCast(data.len),
 98    };
 99    // Guarantee capacity for header + payload in one check to avoid
100    // intermediate realloc between the two appends on the hot path.
101    try list.ensureTotalCapacity(alloc, list.items.len + @sizeOf(Header) + data.len);
102    list.appendSliceAssumeCapacity(std.mem.asBytes(&header));
103    if (data.len > 0) {
104        list.appendSliceAssumeCapacity(data);
105    }
106}
107
/// Writes every byte of `data` to `fd`, calling `posix.write` again after
/// each short write until nothing is left.
///
/// A write that reports zero bytes is surfaced as `error.DiskQuota`; any
/// error returned by `posix.write` propagates unchanged. Empty `data`
/// performs no write at all.
fn writeAll(fd: i32, data: []const u8) !void {
    var pending = data;
    while (pending.len > 0) {
        const written = try posix.write(fd, pending);
        if (written == 0) return error.DiskQuota;
        pending = pending[written..];
    }
}
116
/// A message whose payload is a mutable slice released through `deinit`.
pub const Message = struct {
    tag: Tag,
    data: []u8,

    /// Frees `data` using `alloc`, which must be the allocator the slice was
    /// allocated with. A zero-length payload is never handed to the
    /// allocator.
    pub fn deinit(self: Message, alloc: std.mem.Allocator) void {
        if (self.data.len == 0) return;
        alloc.free(self.data);
    }
};
127
/// One complete message as yielded by `SocketBuffer.next`.
pub const SocketMsg = struct {
    // Decoded copy of the message header.
    header: Header,
    // Slice into the `SocketBuffer`'s own storage (not a copy).
    payload: []const u8,
};
132
/// Accumulates bytes read from a file descriptor and hands them back one
/// complete framed message at a time.
///
/// `buf.items[head..]` is the data not yet returned by `next()`; bytes
/// before `head` have been consumed and are dropped on the next `read()`.
pub const SocketBuffer = struct {
    buf: std.ArrayList(u8),
    alloc: std.mem.Allocator,
    head: usize,

    /// Creates an empty buffer with 4096 bytes of capacity reserved.
    pub fn init(alloc: std.mem.Allocator) !SocketBuffer {
        const storage = try std.ArrayList(u8).initCapacity(alloc, 4096);
        return .{ .buf = storage, .alloc = alloc, .head = 0 };
    }

    /// Releases the backing storage.
    pub fn deinit(self: *SocketBuffer) void {
        self.buf.deinit(self.alloc);
    }

    /// Performs a single `posix.read` of up to 4096 bytes from `fd` and
    /// appends the result to the buffer.
    ///
    /// Before reading, already-consumed bytes are discarded by moving the
    /// unread tail to the front. Returns the number of bytes read, with 0
    /// meaning EOF. `error.WouldBlock` and every other read or allocation
    /// error propagate to the caller.
    pub fn read(self: *SocketBuffer, fd: i32) !usize {
        if (self.head > 0) {
            const unread = self.buf.items[self.head..];
            // Source and destination overlap and the destination starts
            // first, so a forward copy is required. With nothing unread this
            // copies zero bytes and just empties the list.
            std.mem.copyForwards(u8, self.buf.items[0..unread.len], unread);
            self.buf.items.len = unread.len;
            self.head = 0;
        }

        var chunk: [4096]u8 = undefined;
        const count = try posix.read(fd, &chunk);
        if (count == 0) return 0;

        try self.buf.appendSlice(self.alloc, chunk[0..count]);
        return count;
    }

    /// Returns the next complete message, or `null` when the buffered bytes
    /// do not yet contain a whole header plus payload.
    ///
    /// The read position advances past the returned message. The payload
    /// slice points into this buffer's storage: it stays valid across
    /// further `next()` calls but is invalidated by the next `read()` (which
    /// moves and may reallocate the bytes) and by `deinit()`.
    pub fn next(self: *SocketBuffer) ?SocketMsg {
        const pending = self.buf.items[self.head..];
        const frame_len = expectedLength(pending) orelse return null;
        if (frame_len > pending.len) return null;

        const frame = pending[0..frame_len];
        self.head += frame_len;
        return .{
            .header = std.mem.bytesToValue(Header, frame[0..@sizeOf(Header)]),
            .payload = frame[@sizeOf(Header)..],
        };
    }
};
189
/// Errors reported by `connectSession`. Every failure other than a refused
/// connection is folded into `Unexpected`.
const ConnectError = error{
    ConnectionRefused,
    Unexpected,
};
194
/// Liveness check that only connects; it never exchanges an `Info` message.
///
/// Callers that do not need the `Info` payload should prefer this over
/// `probeSession`, so they keep working if the shape of `Info` changes.
///
/// Returns the connected descriptor. A refused connection is reported as
/// `error.ConnectionRefused`; every other failure from
/// `socket.sessionConnect` becomes `error.Unexpected`.
pub fn connectSession(socket_path: []const u8) ConnectError!i32 {
    const fd = socket.sessionConnect(socket_path) catch |err| {
        if (err == error.ConnectionRefused) return error.ConnectionRefused;
        return error.Unexpected;
    };
    return fd;
}
203
/// Errors reported by `probeSession`.
const SessionProbeError = error{
    // No reply became readable within the probe's timeout.
    Timeout,
    ConnectionRefused,
    // Any other connect, send, poll, read or allocation failure, or EOF.
    Unexpected,
    // An `Info` reply arrived but its payload is not `@sizeOf(Info)` bytes.
    InfoSizeMismatch,
};
210
/// Successful outcome of `probeSession`.
const SessionProbeResult = struct {
    // Connected descriptor; `probeSession` leaves it open on success.
    fd: i32,
    // Copy of the `Info` payload received from the peer.
    info: Info,
};
215
/// Connects to the session at `socket_path`, sends an empty `Info` request
/// and waits up to one second in total for the `Info` reply.
///
/// The reply can arrive in pieces: `send` writes the header and the payload
/// with separate `write` calls, so the first readable event may deliver only
/// part of the message. Bytes are therefore accumulated across reads until a
/// complete `Info` message is buffered or the deadline passes. Complete
/// messages with any other tag that arrive first are skipped.
///
/// On success the returned `fd` is still open and owned by the caller. On
/// every error path the descriptor is closed before returning.
///
/// Errors:
/// - `ConnectionRefused`, `Unexpected`: connecting failed (`connectSession`).
/// - `Timeout`: no complete `Info` reply arrived before the deadline.
/// - `InfoSizeMismatch`: the reply payload is not `@sizeOf(Info)` bytes.
/// - `Unexpected`: sending, polling, reading or allocating failed, or the
///   peer closed the connection before replying.
pub fn probeSession(
    alloc: std.mem.Allocator,
    socket_path: []const u8,
) SessionProbeError!SessionProbeResult {
    const timeout_ms: i64 = 1000;
    const fd = try connectSession(socket_path);
    errdefer posix.close(fd);

    send(fd, .Info, "") catch return error.Unexpected;

    var sb = SocketBuffer.init(alloc) catch return error.Unexpected;
    defer sb.deinit();

    const deadline_ms = std.time.milliTimestamp() + timeout_ms;
    while (true) {
        // Clamp to the full budget so a wall-clock step backwards cannot
        // stretch a single wait beyond `timeout_ms`.
        const remaining_ms = @min(deadline_ms - std.time.milliTimestamp(), timeout_ms);
        if (remaining_ms <= 0) return error.Timeout;

        var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
        const ready = posix.poll(&poll_fds, @intCast(remaining_ms)) catch return error.Unexpected;
        if (ready == 0) return error.Timeout;

        const n = sb.read(fd) catch return error.Unexpected;
        // EOF: the peer hung up without sending a complete reply.
        if (n == 0) return error.Unexpected;

        while (sb.next()) |msg| {
            if (msg.header.tag != .Info) continue;
            if (msg.payload.len != @sizeOf(Info)) return error.InfoSizeMismatch;
            return .{
                .fd = fd,
                // Copy the payload out: `sb` is released when this returns.
                .info = std.mem.bytesToValue(Info, msg.payload[0..@sizeOf(Info)]),
            };
        }
        // Only a partial message (or unrelated messages) so far; wait for more.
    }
}
249
250//  WIRE PROTOCOL FREEZE — read before "fixing" any test below.
251//
252//  Changing these constants does not fix the test; it breaks every
253//  running daemon for every user until they `pkill -f zmx`.
254//
255//  Need a new field?   → add a new `Tag` value (next free integer).
256//  Need to remove one? → don't. Reserve the integer, stop sending it.
test "Info wire size is frozen" {
    const testing = std.testing;

    const info_size: usize = @sizeOf(Info);
    try testing.expectEqual(@as(usize, 552), info_size);

    // packed struct{u8,u32} backs to u40 → @sizeOf rounds to 8, not 5.
    const header_size: usize = @sizeOf(Header);
    try testing.expectEqual(@as(usize, 8), header_size);
}
262
test "Tag wire values are frozen" {
    // One row per named tag: the member and the integer it must encode to.
    const Case = struct { tag: Tag, wire: u8 };
    const cases = [_]Case{
        .{ .tag = .Input, .wire = 0 },
        .{ .tag = .Output, .wire = 1 },
        .{ .tag = .Resize, .wire = 2 },
        .{ .tag = .Detach, .wire = 3 },
        .{ .tag = .DetachAll, .wire = 4 },
        .{ .tag = .Kill, .wire = 5 },
        .{ .tag = .Info, .wire = 6 },
        .{ .tag = .Init, .wire = 7 },
        .{ .tag = .History, .wire = 8 },
        .{ .tag = .Run, .wire = 9 },
        .{ .tag = .Ack, .wire = 10 },
        .{ .tag = .Switch, .wire = 11 },
        .{ .tag = .Write, .wire = 12 },
        .{ .tag = .TaskComplete, .wire = 13 },
    };
    for (cases) |case| {
        try std.testing.expectEqual(case.wire, @intFromEnum(case.tag));
    }
}
272
test "zeroed Info has no stack garbage in wire bytes" {
    var info = std.mem.zeroes(Info);
    info.clients_len = 3;
    info.pid = 999;
    info.task_exit_code = 7;

    // `asBytes` exposes the whole struct, tail padding included, so every
    // byte after the last field must still be zero.
    const wire = std.mem.asBytes(&info);
    const tail_start = @offsetOf(Info, "task_exit_code") + @sizeOf(u8);
    const tail = wire[tail_start..];
    for (tail) |byte| {
        try std.testing.expectEqual(@as(u8, 0), byte);
    }
}