// Eric Bower · 2026-06-19
1const std = @import("std");
2const posix = std.posix;
3const cross = @import("cross.zig");
4const socket = @import("socket.zig");
5
/// Message kind carried in the first field of every `Header`.
///
/// The integer values are the wire encoding, so each one is spelled out
/// explicitly instead of relying on declaration order.
pub const Tag = enum(u8) {
    Input = 0,
    Output = 1,
    Resize = 2,
    Detach = 3,
    DetachAll = 4,
    Kill = 5,
    Info = 6,
    Init = 7,
    History = 8,
    Run = 9,
    Ack = 10,
    Switch = 11,
    Write = 12,
    TaskComplete = 13,
    // Deliberately non-exhaustive. Tags are decoded from raw bytes
    // (bytesToValue / @enumFromInt), so values 14-255 must be
    // representable instead of being undefined behavior. Any switch over
    // a `Tag` needs a `_` arm for tags it does not know.
    _,
};
26
// Build-time guard: refuse to compile if someone removes the `_` member
// from `Tag` and thereby makes the enum exhaustive.
comptime {
    const tag_info = @typeInfo(Tag).@"enum";
    if (tag_info.is_exhaustive) {
        @compileError(
            "ipc.Tag must stay non-exhaustive — old daemons rely on `_` to ignore unknown tags",
        );
    }
}
32
/// Fixed-size prefix of every message: the message kind followed by the
/// payload length in bytes.
///
/// The backing integer is named explicitly; u8 + u32 packs into a u40,
/// which is the same integer the compiler infers when it is omitted.
pub const Header = packed struct(u40) {
    /// Message kind.
    tag: Tag,
    /// Number of payload bytes that follow the header.
    len: u32,
};
37
/// Terminal dimensions in character cells.
///
/// Two u16 fields pack into a u32; the backing integer is written out
/// explicitly and matches what the compiler infers when it is omitted.
pub const Resize = packed struct(u32) {
    rows: u16,
    cols: u16,
};
42
/// Asks the terminal behind `fd` for its window size via the `TIOCGWINSZ`
/// ioctl.
///
/// Returns 24 rows by 160 columns when the ioctl fails or reports a zero
/// row or column count.
pub fn getTerminalSize(fd: i32) Resize {
    const fallback: Resize = .{ .rows = 24, .cols = 160 };

    var ws: cross.c.struct_winsize = undefined;
    // `ws` is only read once the ioctl has reported success.
    if (cross.c.ioctl(fd, cross.c.TIOCGWINSZ, &ws) != 0) return fallback;

    if (ws.ws_row > 0 and ws.ws_col > 0) {
        return .{ .rows = ws.ws_row, .cols = ws.ws_col };
    }
    return fallback;
}
50
/// Capacity in bytes of the fixed `Info.cmd` array.
pub const MAX_CMD_LEN = 256;
/// Capacity in bytes of the fixed `Info.cwd` array.
pub const MAX_CWD_LEN = 256;

/// Payload of an `Info` message. The wire shape is frozen.
///
/// Do NOT add, remove, reorder or resize fields. New statistics belong in
/// new `Tag` values, so that old daemons (whose `_` arm ignores unknown
/// tags) stay reachable. Any change to `@sizeOf(Info)` breaks `zmx list`
/// against daemons that are already running.
pub const Info = extern struct {
    clients_len: u64,
    pid: i32,
    cmd_len: u16,
    cwd_len: u16,
    cmd: [MAX_CMD_LEN]u8,
    cwd: [MAX_CWD_LEN]u8,
    created_at: u64,
    task_ended_at: u64,
    task_exit_code: u8,
};
68
/// Computes the full size (header plus payload) of the message that starts
/// at `data[0]`.
///
/// Returns `null` when `data` is too short to contain a whole `Header`.
/// The result can exceed `data.len`; the caller decides whether the entire
/// message has arrived yet.
pub fn expectedLength(data: []const u8) ?usize {
    const header_size: usize = @sizeOf(Header);
    if (data.len < header_size) return null;

    const header = std.mem.bytesToValue(Header, data[0..@sizeOf(Header)]);
    // `header.len` is untrusted wire data. Widen it to usize before adding
    // so a value near the u32 maximum cannot overflow u32 arithmetic.
    const payload_size: usize = header.len;
    return header_size + payload_size;
}
76
/// Writes one message to `fd`: a `Header` carrying `tag` and the payload
/// length, followed by the payload bytes when there are any.
///
/// `data.len` must fit in a u32 because it is narrowed with `@intCast`.
/// Header and payload are written with separate `writeAll` calls. Errors
/// from `writeAll` are propagated.
pub fn send(fd: i32, tag: Tag, data: []const u8) !void {
    const payload_len: u32 = @intCast(data.len);
    const header: Header = .{ .tag = tag, .len = payload_len };

    try writeAll(fd, std.mem.asBytes(&header));
    if (data.len == 0) return;
    try writeAll(fd, data);
}
88
/// Appends one encoded message (header, then payload) to `list`.
///
/// `data.len` must fit in a u32 because it is narrowed with `@intCast`.
/// The only failure point is the capacity reservation; on failure `list`
/// has not been appended to.
pub fn appendMessage(
    alloc: std.mem.Allocator,
    list: *std.ArrayList(u8),
    tag: Tag,
    data: []const u8,
) !void {
    const header: Header = .{ .tag = tag, .len = @intCast(data.len) };
    const header_bytes = std.mem.asBytes(&header);

    // Reserve room for header and payload together so the two appends
    // below cannot trigger a reallocation between them.
    const needed = list.items.len + header_bytes.len + data.len;
    try list.ensureTotalCapacity(alloc, needed);

    list.appendSliceAssumeCapacity(header_bytes);
    if (data.len != 0) list.appendSliceAssumeCapacity(data);
}
107
/// Keeps calling `posix.write` until every byte of `data` has been
/// written to `fd`.
///
/// Propagates any error from `posix.write`. A write that reports zero
/// bytes is surfaced as `error.DiskQuota` so the loop cannot spin forever.
fn writeAll(fd: i32, data: []const u8) !void {
    var rest = data;
    while (rest.len > 0) {
        const written = try posix.write(fd, rest);
        if (written == 0) return error.DiskQuota;
        rest = rest[written..];
    }
}
116
/// A message kind paired with a mutable payload slice.
pub const Message = struct {
    tag: Tag,
    data: []u8,

    /// Frees `data` with `alloc`. An empty payload is left untouched.
    pub fn deinit(self: Message, alloc: std.mem.Allocator) void {
        if (self.data.len == 0) return;
        alloc.free(self.data);
    }
};
127
/// One decoded message as returned by `SocketBuffer.next`: the header by
/// value and a read-only view of the payload bytes.
pub const SocketMsg = struct {
    header: Header,
    /// Slice into the `SocketBuffer` storage, not an independent copy.
    payload: []const u8,
};
132
/// Accumulates bytes read from a file descriptor and hands them back one
/// complete message at a time.
///
/// `buf.items[head..]` is the data that `next` has not yet consumed.
pub const SocketBuffer = struct {
    buf: std.ArrayList(u8),
    alloc: std.mem.Allocator,
    head: usize,

    /// Creates a buffer with 4096 bytes of capacity reserved up front.
    pub fn init(alloc: std.mem.Allocator) !SocketBuffer {
        const storage = try std.ArrayList(u8).initCapacity(alloc, 4096);
        return .{
            .buf = storage,
            .alloc = alloc,
            .head = 0,
        };
    }

    /// Releases the backing storage.
    pub fn deinit(self: *SocketBuffer) void {
        self.buf.deinit(self.alloc);
    }

    /// Performs a single `posix.read` of up to 4096 bytes from `fd` and
    /// appends the result to the buffer.
    ///
    /// Returns the number of bytes read; 0 means EOF. `error.WouldBlock`
    /// and every other read or allocation error is passed to the caller.
    /// Consumed bytes are discarded first, so payload slices returned by
    /// earlier `next()` calls must not be used after calling this.
    pub fn read(self: *SocketBuffer, fd: i32) !usize {
        // Drop what `next` already consumed by sliding the unread tail
        // to the front of the buffer.
        if (self.head != 0) {
            const unread = self.buf.items[self.head..];
            if (unread.len == 0) {
                self.buf.clearRetainingCapacity();
            } else {
                std.mem.copyForwards(u8, self.buf.items[0..unread.len], unread);
                self.buf.items.len = unread.len;
            }
            self.head = 0;
        }

        var chunk: [4096]u8 = undefined;
        const got = try posix.read(fd, &chunk);
        if (got == 0) return 0;

        try self.buf.appendSlice(self.alloc, chunk[0..got]);
        return got;
    }

    /// Returns the next complete message, or `null` when the buffered
    /// bytes do not yet hold a whole header plus payload.
    ///
    /// The read position advances past the returned message. The payload
    /// slice points into this buffer and is invalidated by the next
    /// `read()` or by `deinit()`.
    pub fn next(self: *SocketBuffer) ?SocketMsg {
        const pending = self.buf.items[self.head..];
        const frame_len = expectedLength(pending) orelse return null;
        if (frame_len > pending.len) return null;

        const frame = pending[0..frame_len];
        self.head += frame_len;

        return .{
            .header = std.mem.bytesToValue(Header, frame[0..@sizeOf(Header)]),
            .payload = frame[@sizeOf(Header)..],
        };
    }
};
189
/// Everything `connectSession` can fail with.
const ConnectError = error{
    /// The connect attempt was refused.
    ConnectionRefused,
    /// Any other connect failure.
    Unexpected,
};
194
/// Liveness check that only connects to the session socket at
/// `socket_path` and returns the connected descriptor.
///
/// Prefer this over `probeSession` in callers that never read `Info`, so
/// they keep working if the shape of `Info` changes.
/// `error.ConnectionRefused` is passed through unchanged; every other
/// failure is reported as `error.Unexpected`.
pub fn connectSession(socket_path: []const u8) ConnectError!i32 {
    const fd = socket.sessionConnect(socket_path) catch |err| {
        if (err == error.ConnectionRefused) return error.ConnectionRefused;
        return error.Unexpected;
    };
    return fd;
}
203
/// Everything `probeSession` can fail with.
const SessionProbeError = error{
    /// No reply arrived within the probe's time limit.
    Timeout,
    /// The connect attempt was refused.
    ConnectionRefused,
    /// Any other connect, send, poll, read or allocation failure.
    Unexpected,
    /// An Info reply arrived whose payload is not `@sizeOf(Info)` bytes.
    InfoSizeMismatch,
};
210
/// What a successful `probeSession` returns.
const SessionProbeResult = struct {
    /// The connected socket, still open.
    fd: i32,
    /// Copy of the `Info` payload from the reply.
    info: Info,
};
215
/// Connects to the session at `socket_path`, sends an empty `Info` request
/// and waits up to one second in total for the `Info` reply.
///
/// On success the connected `fd` is returned still open alongside a copy
/// of the decoded `Info`. On every error path the descriptor is closed
/// before returning.
///
/// Errors:
/// - `ConnectionRefused`: passed through from `connectSession`.
/// - `Timeout`: no complete `Info` message arrived before the deadline.
/// - `InfoSizeMismatch`: an `Info` message arrived whose payload is not
///   exactly `@sizeOf(Info)` bytes.
/// - `Unexpected`: any other connect, send, poll, read or allocation
///   failure, including EOF before the reply.
pub fn probeSession(
    alloc: std.mem.Allocator,
    socket_path: []const u8,
) SessionProbeError!SessionProbeResult {
    const timeout_ms: i64 = 1000;
    const fd = try connectSession(socket_path);
    errdefer posix.close(fd);

    send(fd, .Info, "") catch return error.Unexpected;

    var sb = SocketBuffer.init(alloc) catch return error.Unexpected;
    defer sb.deinit();

    var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
    const deadline = std.time.milliTimestamp() + timeout_ms;

    // A stream socket may deliver the reply in several pieces; for example
    // `send` in this file writes the header and the payload with separate
    // writes. One read is therefore not guaranteed to contain a complete
    // message, so keep polling and reading until a whole Info frame is
    // buffered or the overall deadline passes.
    while (true) {
        const remaining_ms = deadline - std.time.milliTimestamp();
        if (remaining_ms <= 0) return error.Timeout;

        // Clamp so a wall-clock step backwards can never stretch a single
        // wait beyond the configured timeout.
        const wait_ms: i32 = @intCast(@min(remaining_ms, timeout_ms));
        poll_fds[0].revents = 0;
        const ready = posix.poll(&poll_fds, wait_ms) catch return error.Unexpected;
        if (ready == 0) return error.Timeout;

        const n = sb.read(fd) catch return error.Unexpected;
        // EOF before a complete reply: the peer went away.
        if (n == 0) return error.Unexpected;

        // Drain every complete message buffered so far, skipping any
        // that are not the Info reply.
        while (sb.next()) |msg| {
            if (msg.header.tag != .Info) continue;
            if (msg.payload.len != @sizeOf(Info)) return error.InfoSizeMismatch;
            return .{
                .fd = fd,
                .info = std.mem.bytesToValue(Info, msg.payload[0..@sizeOf(Info)]),
            };
        }
    }
}
249
250// WIRE PROTOCOL FREEZE — read before "fixing" any test below.
251//
252// Changing these constants does not fix the test; it breaks every
253// running daemon for every user until they `pkill -f zmx`.
254//
255// Need a new field? → add a new `Tag` value (next free integer).
256// Need to remove one? → don't. Reserve the integer, stop sending it.
test "Info wire size is frozen" {
    const expectEqual = std.testing.expectEqual;

    try expectEqual(@as(usize, 552), @sizeOf(Info));
    // Header is packed struct{u8,u32}: its backing integer is u40, and
    // @sizeOf rounds that up to 8 bytes rather than 5.
    try expectEqual(@as(usize, 8), @sizeOf(Header));
}
262
test "Tag wire values are frozen" {
    // Each known tag next to the byte value it must keep on the wire.
    const Frozen = struct { tag: Tag, wire: u8 };
    const table = [_]Frozen{
        .{ .tag = .Input, .wire = 0 },
        .{ .tag = .Output, .wire = 1 },
        .{ .tag = .Resize, .wire = 2 },
        .{ .tag = .Detach, .wire = 3 },
        .{ .tag = .DetachAll, .wire = 4 },
        .{ .tag = .Kill, .wire = 5 },
        .{ .tag = .Info, .wire = 6 },
        .{ .tag = .Init, .wire = 7 },
        .{ .tag = .History, .wire = 8 },
        .{ .tag = .Run, .wire = 9 },
        .{ .tag = .Ack, .wire = 10 },
        .{ .tag = .Switch, .wire = 11 },
        .{ .tag = .Write, .wire = 12 },
        .{ .tag = .TaskComplete, .wire = 13 },
    };

    for (table) |entry| {
        try std.testing.expectEqual(entry.wire, @intFromEnum(entry.tag));
    }
}
272
test "zeroed Info has no stack garbage in wire bytes" {
    var info = std.mem.zeroes(Info);
    info.clients_len = 3;
    info.pid = 999;
    info.task_exit_code = 7;

    const wire = std.mem.asBytes(&info);
    // Everything after the last field is tail padding. asBytes ships it
    // as part of the payload, so it has to be zero.
    const tail_start = @offsetOf(Info, "task_exit_code") + @sizeOf(u8);
    var i: usize = tail_start;
    while (i < wire.len) : (i += 1) {
        try std.testing.expectEqual(@as(u8, 0), wire[i]);
    }
}