Eric Bower
·
2026-04-26
ipc.zig
1const std = @import("std");
2const posix = std.posix;
3const cross = @import("cross.zig");
4const socket = @import("socket.zig");
5
/// Message kind carried in every IPC frame header.
///
/// The integer values are the wire encoding, so existing entries must keep
/// their numbers; new kinds take the next free integer.
pub const Tag = enum(u8) {
    Input = 0,
    Output = 1,
    Resize = 2,
    Detach = 3,
    DetachAll = 4,
    Kill = 5,
    Info = 6,
    Init = 7,
    History = 8,
    Run = 9,
    Ack = 10,
    Switch = 11,
    Write = 12,
    TaskComplete = 13,
    // Deliberately non-exhaustive. Values of this type are produced from raw
    // wire bytes (bytesToValue / @enumFromInt), so 14-255 must be
    // representable instead of being undefined behavior. Every switch over
    // `Tag` therefore needs a `_` arm for unknown tags.
    _,
};
26
// Compile-time guard: fail the build if someone removes the `_` member from
// `Tag` and thereby makes the enum exhaustive.
comptime {
    const tag_info = @typeInfo(Tag).@"enum";
    if (tag_info.is_exhaustive) {
        @compileError("ipc.Tag must stay non-exhaustive — old daemons rely on `_` to ignore unknown tags");
    }
}
32
/// Fixed-size prefix of every IPC frame: the message kind followed by the
/// payload length in bytes.
///
/// The 8-bit tag and 32-bit length pack into a 40-bit backing integer, which
/// is spelled out here so an accidental field change fails to compile.
/// `@sizeOf(Header)` is 8 (not 5), and that size is what goes on the wire.
pub const Header = packed struct(u40) {
    /// Message kind.
    tag: Tag,
    /// Number of payload bytes that follow the header.
    len: u32,
};
37
/// Terminal dimensions in character cells, as produced by `getTerminalSize`.
///
/// Two 16-bit fields pack into a 32-bit backing integer, stated explicitly so
/// the layout cannot drift silently.
pub const Resize = packed struct(u32) {
    /// Number of rows.
    rows: u16,
    /// Number of columns.
    cols: u16,
};
42
/// Queries the window size of `fd` with the `TIOCGWINSZ` ioctl.
///
/// Returns 24 rows by 80 columns when the ioctl fails or when it reports a
/// zero row or column count.
pub fn getTerminalSize(fd: i32) Resize {
    const fallback: Resize = .{ .rows = 24, .cols = 80 };

    // Filled in by the ioctl; only read after the call reports success.
    var winsize: cross.c.struct_winsize = undefined;
    if (cross.c.ioctl(fd, cross.c.TIOCGWINSZ, &winsize) != 0) return fallback;
    if (winsize.ws_row == 0 or winsize.ws_col == 0) return fallback;

    return .{ .rows = winsize.ws_row, .cols = winsize.ws_col };
}
50
/// Size in bytes of the fixed `cmd` array inside `Info`.
pub const MAX_CMD_LEN = 256;
/// Size in bytes of the fixed `cwd` array inside `Info`.
pub const MAX_CWD_LEN = 256;
53
/// Payload of an `Info` message, copied to and from the wire as raw bytes.
///
/// Frozen wire shape. Do NOT add, remove, or reorder fields: new stats belong
/// in new `Tag` values, so that old daemons (whose `_` arm ignores unknown
/// tags) stay reachable. Changing `@sizeOf(Info)` breaks `zmx list` against
/// running daemons.
pub const Info = extern struct {
    // NOTE(review): presumably the number of attached clients — confirm
    // against the daemon.
    clients_len: u64,
    // NOTE(review): presumably the process id of the session — confirm.
    pid: i32,
    // NOTE(review): presumably the count of meaningful bytes in `cmd`.
    cmd_len: u16,
    // NOTE(review): presumably the count of meaningful bytes in `cwd`.
    cwd_len: u16,
    /// Fixed-capacity byte storage of `MAX_CMD_LEN` bytes.
    cmd: [MAX_CMD_LEN]u8,
    /// Fixed-capacity byte storage of `MAX_CWD_LEN` bytes.
    cwd: [MAX_CWD_LEN]u8,
    // NOTE(review): timestamp; unit and epoch are not visible here — confirm.
    created_at: u64,
    // NOTE(review): timestamp; unit and "not ended" encoding not visible here.
    task_ended_at: u64,
    // Last field; the struct has tail padding after it, which is sent too.
    task_exit_code: u8,
};
68
/// Computes the total frame size (header plus payload) announced by the
/// header at the start of `data`.
///
/// Returns `null` when `data` is too short to contain a complete header.
/// The result may exceed `data.len`; the caller decides whether the whole
/// frame has arrived.
pub fn expectedLength(data: []const u8) ?usize {
    const header_size: usize = @sizeOf(Header);
    if (data.len < header_size) return null;

    const header = std.mem.bytesToValue(Header, data[0..@sizeOf(Header)]);

    // `header.len` is untrusted wire input. Widen it to usize before the
    // addition so a value near the u32 maximum cannot wrap (panic in safe
    // mode, UB in release).
    const payload_size: usize = header.len;
    return header_size + payload_size;
}
76
/// Writes one framed message (header, then payload) to `fd`.
///
/// The header and the payload are written with separate `writeAll` calls; an
/// empty payload produces a header-only frame.
///
/// Errors:
/// - `error.MessageTooBig` when `data.len` does not fit the 32-bit length
///   field of `Header`. Nothing is written in that case.
/// - any error reported by `writeAll`.
pub fn send(fd: i32, tag: Tag, data: []const u8) !void {
    // Checked narrowing: an unchecked @intCast here is illegal behavior for
    // payloads larger than the u32 maximum (panic in safe builds, a silently
    // truncated length field in ReleaseFast).
    const payload_len = std.math.cast(u32, data.len) orelse return error.MessageTooBig;

    const header = Header{ .tag = tag, .len = payload_len };
    try writeAll(fd, std.mem.asBytes(&header));
    if (data.len > 0) try writeAll(fd, data);
}
88
/// Appends one framed message (header, then payload) to `list` and logs the
/// tag at info level.
///
/// The append is all-or-nothing: room for the whole frame is reserved before
/// any byte is added, so an allocation failure never leaves a header without
/// its payload in `list`.
///
/// Errors:
/// - `error.MessageTooBig` when `data.len` does not fit the 32-bit length
///   field of `Header`; `list` is left untouched.
/// - `error.OutOfMemory` when `list` cannot grow; `list` is left untouched.
pub fn appendMessage(
    alloc: std.mem.Allocator,
    list: *std.ArrayList(u8),
    tag: Tag,
    data: []const u8,
) !void {
    // Checked narrowing instead of @intCast, which is illegal behavior when
    // the payload is larger than the u32 maximum.
    const payload_len = std.math.cast(u32, data.len) orelse return error.MessageTooBig;

    // `Tag` is non-exhaustive, and @tagName is illegal behavior for a value
    // without a named member, so look the name up safely.
    if (std.enums.tagName(Tag, tag)) |name| {
        std.log.info("sending ipc message tag={s}", .{name});
    } else {
        std.log.info("sending ipc message tag=unknown({d})", .{@intFromEnum(tag)});
    }

    const header = Header{ .tag = tag, .len = payload_len };
    const header_bytes = std.mem.asBytes(&header);

    // Reserve space for the entire frame first; the appends below cannot fail.
    try list.ensureUnusedCapacity(alloc, header_bytes.len + data.len);
    list.appendSliceAssumeCapacity(header_bytes);
    list.appendSliceAssumeCapacity(data);
}
105
/// Writes all of `data` to `fd`, retrying after short writes.
///
/// A write that reports zero bytes while data is still pending is surfaced as
/// `error.DiskQuota`; every other error comes straight from `posix.write`.
fn writeAll(fd: i32, data: []const u8) !void {
    var pending = data;
    while (pending.len > 0) {
        const written = try posix.write(fd, pending);
        // No progress: bail out instead of spinning forever.
        if (written == 0) return error.DiskQuota;
        pending = pending[written..];
    }
}
114
/// A message whose payload is a separately allocated, mutable byte slice.
pub const Message = struct {
    /// Message kind.
    tag: Tag,
    /// Payload bytes; released by `deinit` unless empty.
    data: []u8,

    /// Frees `data` with `alloc`. An empty payload is treated as not
    /// allocated and is skipped.
    pub fn deinit(self: Message, alloc: std.mem.Allocator) void {
        if (self.data.len == 0) return;
        alloc.free(self.data);
    }
};
125
/// One decoded frame as handed out by `SocketBuffer.next`.
pub const SocketMsg = struct {
    /// Copy of the frame header.
    header: Header,
    /// Read-only view of the payload bytes; this struct does not own them.
    payload: []const u8,
};
130
/// Accumulates bytes read from a file descriptor and splits them into frames.
///
/// `buf.items[head..]` holds the bytes not yet handed out by `next`;
/// everything before `head` has been consumed and is dropped by the next
/// `read`.
pub const SocketBuffer = struct {
    buf: std.ArrayList(u8),
    alloc: std.mem.Allocator,
    head: usize,

    /// Creates an empty buffer with 4096 bytes of capacity pre-allocated.
    pub fn init(alloc: std.mem.Allocator) !SocketBuffer {
        const storage = try std.ArrayList(u8).initCapacity(alloc, 4096);
        return .{ .buf = storage, .alloc = alloc, .head = 0 };
    }

    /// Releases the backing storage.
    pub fn deinit(self: *SocketBuffer) void {
        self.buf.deinit(self.alloc);
    }

    /// Performs one `posix.read` on `fd` (at most 4096 bytes) and appends the
    /// result to the buffer.
    ///
    /// Returns the number of bytes read; 0 means EOF. `error.WouldBlock` and
    /// every other read or allocation error is passed on to the caller.
    ///
    /// Before reading, consumed bytes are discarded by moving the unread tail
    /// to the front. That move, and any growth of the buffer, invalidates
    /// payload slices previously returned by `next`.
    pub fn read(self: *SocketBuffer, fd: i32) !usize {
        if (self.head != 0) {
            const unread = self.buf.items[self.head..];
            // Overlapping move towards the front; a no-op when nothing is left.
            std.mem.copyForwards(u8, self.buf.items[0..unread.len], unread);
            self.buf.shrinkRetainingCapacity(unread.len);
            self.head = 0;
        }

        var scratch: [4096]u8 = undefined;
        const count = try posix.read(fd, &scratch);
        if (count == 0) return 0;

        try self.buf.appendSlice(self.alloc, scratch[0..count]);
        return count;
    }

    /// Hands out the next complete frame, or `null` when the buffered bytes
    /// do not yet contain a whole header plus payload.
    ///
    /// The read position advances past the returned frame. The payload slice
    /// points into the internal buffer, so use it before calling `read` or
    /// `deinit`.
    pub fn next(self: *SocketBuffer) ?SocketMsg {
        const pending = self.buf.items[self.head..];
        const frame_len = expectedLength(pending) orelse return null;
        if (frame_len > pending.len) return null;

        const frame = pending[0..frame_len];
        self.head += frame_len;
        return .{
            .header = std.mem.bytesToValue(Header, frame[0..@sizeOf(Header)]),
            .payload = frame[@sizeOf(Header)..],
        };
    }
};
187
/// Errors `connectSession` can return.
const ConnectError = error{
    /// The underlying connect attempt reported a refused connection.
    ConnectionRefused,
    /// Any other connect failure, collapsed into one value.
    Unexpected,
};
192
/// Liveness check that only connects; no message is exchanged.
///
/// Callers that never look at `Info` should prefer this over `probeSession`,
/// because it keeps working if the `Info` shape ever changes.
///
/// Returns the connected socket fd. A refused connection is reported as
/// `error.ConnectionRefused`; every other failure becomes `error.Unexpected`.
pub fn connectSession(socket_path: []const u8) ConnectError!i32 {
    if (socket.sessionConnect(socket_path)) |fd| {
        return fd;
    } else |err| switch (err) {
        error.ConnectionRefused => return error.ConnectionRefused,
        else => return error.Unexpected,
    }
}
201
/// Errors `probeSession` can return.
const SessionProbeError = error{
    /// No reply arrived within the probe's time budget.
    Timeout,
    /// The connection to the session socket was refused.
    ConnectionRefused,
    /// Any other failure while connecting, sending, polling, or reading.
    Unexpected,
    /// An `Info` reply arrived, but its payload is not `@sizeOf(Info)` bytes.
    InfoSizeMismatch,
};
208
/// Successful outcome of `probeSession`.
const SessionProbeResult = struct {
    /// The socket that was used for the probe; it is still open.
    fd: i32,
    /// The `Info` payload decoded from the reply.
    info: Info,
};
213
/// Connects to the session at `socket_path`, sends an `Info` request, and
/// waits up to one second in total for the `Info` reply.
///
/// The socket is a byte stream, so the reply may arrive split across several
/// reads or behind other frames. Frames are therefore reassembled across as
/// many poll/read rounds as the time budget allows, and frames with other
/// tags are skipped.
///
/// On success the returned `fd` is still open and belongs to the caller. On
/// every error path the fd is closed before returning.
///
/// Errors:
/// - `error.ConnectionRefused`: the connection was refused.
/// - `error.Timeout`: no `Info` reply arrived within the time budget.
/// - `error.InfoSizeMismatch`: the reply payload is not `@sizeOf(Info)` bytes.
/// - `error.Unexpected`: any other failure, including the peer closing the
///   connection before replying.
pub fn probeSession(
    alloc: std.mem.Allocator,
    socket_path: []const u8,
) SessionProbeError!SessionProbeResult {
    const timeout_ms: u64 = 1000;
    const fd = try connectSession(socket_path);
    errdefer posix.close(fd);

    send(fd, .Info, "") catch return error.Unexpected;

    var sb = SocketBuffer.init(alloc) catch return error.Unexpected;
    defer sb.deinit();

    // Monotonic clock so the overall budget holds across several reads.
    var timer = std.time.Timer.start() catch return error.Unexpected;

    while (true) {
        const elapsed_ms = timer.read() / std.time.ns_per_ms;
        if (elapsed_ms >= timeout_ms) return error.Timeout;
        // At most `timeout_ms`, so the narrowing to poll's i32 cannot fail.
        const remaining_ms: i32 = @intCast(timeout_ms - elapsed_ms);

        var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
        const ready = posix.poll(&poll_fds, remaining_ms) catch return error.Unexpected;
        if (ready == 0) return error.Timeout;

        const n = sb.read(fd) catch return error.Unexpected;
        // EOF before a complete Info frame: the peer hung up on us.
        if (n == 0) return error.Unexpected;

        while (sb.next()) |msg| {
            if (msg.header.tag != .Info) continue;
            if (msg.payload.len != @sizeOf(Info)) return error.InfoSizeMismatch;
            return .{
                .fd = fd,
                .info = std.mem.bytesToValue(Info, msg.payload[0..@sizeOf(Info)]),
            };
        }
        // No complete Info frame yet; poll again with the remaining budget.
    }
}
247
248// WIRE PROTOCOL FREEZE — read before "fixing" any test below.
249//
250// Changing these constants does not fix the test; it breaks every
251// running daemon for every user until they `pkill -f zmx`.
252//
253// Need a new field? → add a new `Tag` value (next free integer).
254// Need to remove one? → don't. Reserve the integer, stop sending it.
test "Info wire size is frozen" {
    const frozen_info_size: usize = 552;
    // packed struct{u8,u32} is backed by a u40, and @sizeOf rounds that up
    // to 8 bytes rather than 5.
    const frozen_header_size: usize = 8;

    try std.testing.expectEqual(frozen_info_size, @sizeOf(Info));
    try std.testing.expectEqual(frozen_header_size, @sizeOf(Header));
}
260
test "Tag wire values are frozen" {
    // One row per named tag: the enum member and the byte it must encode to.
    const Pin = struct { tag: Tag, wire: u8 };
    const pins = [_]Pin{
        .{ .tag = .Input, .wire = 0 },
        .{ .tag = .Output, .wire = 1 },
        .{ .tag = .Resize, .wire = 2 },
        .{ .tag = .Detach, .wire = 3 },
        .{ .tag = .DetachAll, .wire = 4 },
        .{ .tag = .Kill, .wire = 5 },
        .{ .tag = .Info, .wire = 6 },
        .{ .tag = .Init, .wire = 7 },
        .{ .tag = .History, .wire = 8 },
        .{ .tag = .Run, .wire = 9 },
        .{ .tag = .Ack, .wire = 10 },
        .{ .tag = .Switch, .wire = 11 },
        .{ .tag = .Write, .wire = 12 },
        .{ .tag = .TaskComplete, .wire = 13 },
    };
    for (pins) |pin| {
        try std.testing.expectEqual(pin.wire, @intFromEnum(pin.tag));
    }
}
270
test "zeroed Info has no stack garbage in wire bytes" {
    var snapshot = std.mem.zeroes(Info);
    snapshot.clients_len = 3;
    snapshot.pid = 999;
    snapshot.task_exit_code = 7;

    // asBytes exposes the struct's tail padding too, so the bytes after the
    // last field must still be zero after the assignments above.
    const wire = std.mem.asBytes(&snapshot);
    const padding_start = @offsetOf(Info, "task_exit_code") + @sizeOf(u8);
    for (wire[padding_start..]) |byte| {
        try std.testing.expectEqual(@as(u8, 0), byte);
    }
}