- commit
- e889a18
- parent
- cd82c84
- author
- Ian Tay
- date
- 2026-04-10 12:06:49 -0400 EDT
fix(daemon): self-pipe signal wakeup + version-tolerant IPC liveness
Self-pipe (idle daemon was deaf to SIGTERM, idle client to SIGWINCH):
std.posix.poll loops on .INTR internally (PollError has no Interrupted
member), so the prior `catch error.Interrupted` was unreachable since
8640be51/690487b — signals only "worked" when unrelated I/O woke poll().
Replace with the self-pipe trick: posix.pipe2(.{CLOEXEC,NONBLOCK}), one
wakeSignalPipe handler with errno save/restore, pipe read-end at fixed
poll_fds[2] in both loops; resize/shutdown act in the drain branch.
Removes the now-redundant sigwinch/sigterm atomic flags and collapses
setupSig*Handler into installWakeHandler(sig).
IPC versioning (Option E — decouple liveness from Info shape):
connectSession() does connect-only liveness; 5 of 6 callers (kill,
detach, history, run, attach) switch to it so they survive Info struct
changes. probeSession keeps the Info round-trip for `list`, which now
reports InfoSizeMismatch instead of Unexpected on version skew.
Hygiene: clients_len usize->u64 (extern struct); handleInfo zero-inits
Info so asBytes() doesn't ship the 7 bytes of tail padding or the unused
tails of the cmd/cwd buffers (previously uninitialized stack contents).
Tests freeze @sizeOf(Info)=552 / @sizeOf(Header)=8 and assert zeroed
Info ships zero padding; wired via test{_=ipc;}.
2 files changed,
+144,
-108
+45,
-12
1@@ -19,7 +19,7 @@ pub const Tag = enum(u8) {
2 Write = 12,
3 TaskComplete = 13,
4 // Non-exhaustive: this enum comes off the wire via bytesToValue and
5- // @enumFromInt, so out-of-range values (11-255) are representable
6+ // @enumFromInt, so out-of-range values (14-255) are representable
7 // rather than UB. Switches must handle `_` (unknown tag).
8 _,
9 };
10@@ -45,8 +45,11 @@ pub fn getTerminalSize(fd: i32) Resize {
11 pub const MAX_CMD_LEN = 256;
12 pub const MAX_CWD_LEN = 256;
13
14+/// Frozen wire shape. Do NOT add fields — new stats go in new `Tag` values
15+/// so old daemons (whose `_` arm ignores unknown tags) stay reachable.
16+/// Changing `@sizeOf(Info)` breaks `zmx list` against running daemons.
17 pub const Info = extern struct {
18- clients_len: usize,
19+ clients_len: u64,
20 pid: i32,
21 cmd_len: u16,
22 cwd_len: u16,
23@@ -176,10 +179,25 @@ pub const SocketBuffer = struct {
24 }
25 };
26
27+const ConnectError = error{
28+ ConnectionRefused,
29+ Unexpected,
30+};
31+
32+/// Unlike `probeSession`, does not round-trip `Info` — kill/detach/history/run
33+/// stay usable against version-skewed daemons.
34+pub fn connectSession(socket_path: []const u8) ConnectError!i32 {
35+ return socket.sessionConnect(socket_path) catch |err| switch (err) {
36+ error.ConnectionRefused => return error.ConnectionRefused,
37+ else => return error.Unexpected,
38+ };
39+}
40+
41 const SessionProbeError = error{
42 Timeout,
43 ConnectionRefused,
44 Unexpected,
45+ InfoSizeMismatch,
46 };
47
48 const SessionProbeResult = struct {
49@@ -192,10 +210,7 @@ pub fn probeSession(
50 socket_path: []const u8,
51 ) SessionProbeError!SessionProbeResult {
52 const timeout_ms = 1000;
53- const fd = socket.sessionConnect(socket_path) catch |err| switch (err) {
54- error.ConnectionRefused => return error.ConnectionRefused,
55- else => return error.Unexpected,
56- };
57+ const fd = try connectSession(socket_path);
58 errdefer posix.close(fd);
59
60 send(fd, .Info, "") catch return error.Unexpected;
61@@ -214,13 +229,31 @@ pub fn probeSession(
62
63 while (sb.next()) |msg| {
64 if (msg.header.tag == .Info) {
65- if (msg.payload.len == @sizeOf(Info)) {
66- return .{
67- .fd = fd,
68- .info = std.mem.bytesToValue(Info, msg.payload[0..@sizeOf(Info)]),
69- };
70- }
71+ if (msg.payload.len != @sizeOf(Info)) return error.InfoSizeMismatch;
72+ return .{
73+ .fd = fd,
74+ .info = std.mem.bytesToValue(Info, msg.payload[0..@sizeOf(Info)]),
75+ };
76 }
77 }
78 return error.Unexpected;
79 }
80+
81+test "Info wire size is frozen" {
82+ // Bumping this means version-skewed `zmx list` breaks. See doc comment
83+ // on `Info` — add a new `Tag` instead of growing this struct.
84+ try std.testing.expectEqual(@as(usize, 552), @sizeOf(Info));
85+ // packed struct{u8,u32} backs to u40 → @sizeOf rounds to 8, not 5.
86+ try std.testing.expectEqual(@as(usize, 8), @sizeOf(Header));
87+}
88+
89+test "zeroed Info has no stack garbage in wire bytes" {
90+ var info = std.mem.zeroes(Info);
91+ info.clients_len = 3;
92+ info.pid = 999;
93+ info.task_exit_code = 7;
94+ const bytes = std.mem.asBytes(&info);
95+ // Tail padding after task_exit_code must be zero (asBytes ships it).
96+ const last_field_end = @offsetOf(Info, "task_exit_code") + @sizeOf(u8);
97+ for (bytes[last_field_end..]) |b| try std.testing.expectEqual(@as(u8, 0), b);
98+}
+99,
-96
1@@ -30,8 +30,10 @@ fn zmxLogFn(
2 log_system.log(level, scope, format, args);
3 }
4
5-var sigwinch_received: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);
6-var sigterm_received: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);
7+/// Self-pipe woken by signal handlers. std.posix.poll loops on .INTR internally
8+/// (PollError has no Interrupted member), so a signal that lands during poll()
9+/// never surfaces; the handler writes a byte here and poll() wakes on POLLIN.
10+var sig_pipe: [2]posix.fd_t = .{ -1, -1 };
11
12 // https://github.com/ziglang/zig/blob/738d2be9d6b6ef3ff3559130c05159ef53336224/lib/std/posix.zig#L3505
13 const O_NONBLOCK: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK");
14@@ -55,6 +57,18 @@ fn parseSessionArg(alloc: std.mem.Allocator, raw: []const u8) !SessionMatch {
15 return .{ .name = name, .is_prefix = false };
16 }
17
18+fn openSignalPipe() !void {
19+ sig_pipe = try posix.pipe2(.{ .CLOEXEC = true, .NONBLOCK = true });
20+}
21+
22+fn drainSignalPipe() void {
23+ var b: [16]u8 = undefined;
24+ while (true) {
25+ const n = posix.read(sig_pipe[0], &b) catch return;
26+ if (n == 0) return;
27+ }
28+}
29+
30 pub fn main() !void {
31 // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
32 const alloc = std.heap.c_allocator;
33@@ -694,8 +708,8 @@ const Daemon = struct {
34 var should_create = !exists;
35
36 if (exists) {
37- if (ipc.probeSession(self.alloc, self.socket_path)) |result| {
38- posix.close(result.fd);
39+ if (ipc.connectSession(self.socket_path)) |fd| {
40+ posix.close(fd);
41 if (self.command != null) {
42 std.log.warn(
43 "session already exists, ignoring command session={s}",
44@@ -708,12 +722,12 @@ const Daemon = struct {
45 socket.cleanupStaleSocket(dir, self.session_name);
46 should_create = true;
47 },
48- // Probe didn't respond in time -- daemon may just be busy.
49- // The probe is only to decide create-vs-attach; the session
50- // exists, so proceed to attach rather than fail or orphan.
51+ // Connect failed for an unusual reason. The check is only to
52+ // decide create-vs-attach; the socket file exists, so proceed
53+ // to attach rather than fail or orphan.
54 else => {
55 std.log.warn(
56- "probe slow ({s}), proceeding to attach session={s}",
57+ "connect failed ({s}), proceeding to attach session={s}",
58 .{ @errorName(err), self.session_name },
59 );
60 },
61@@ -1012,12 +1026,17 @@ const Daemon = struct {
62 }
63
64 pub fn handleInfo(self: *Daemon, client: *Client) !void {
65- const clients_len = self.clients.items.len - 1;
66+ // zeroes() so asBytes() doesn't ship struct padding + unused cmd/cwd
67+ // tail bytes (daemon stack contents) to clients.
68+ var info = std.mem.zeroes(ipc.Info);
69+ info.clients_len = self.clients.items.len - 1;
70+ info.pid = self.pid;
71+ info.created_at = self.created_at;
72+ info.task_ended_at = self.task_ended_at orelse 0;
73+ info.task_exit_code = self.task_exit_code orelse 0;
74
75 // Build command string from args, re-quoting args that contain
76 // shell-special characters so the displayed command is copy-pasteable.
77- var cmd_buf: [ipc.MAX_CMD_LEN]u8 = undefined;
78- var cmd_len: u16 = 0;
79 const cur_cmd = self.command;
80 if (cur_cmd) |args| {
81 for (args, 0..) |arg, i| {
82@@ -1029,40 +1048,27 @@ const Daemon = struct {
83 const src = quoted orelse arg;
84
85 const need = src.len + @as(usize, if (i > 0) 1 else 0);
86- if (cmd_len + need > ipc.MAX_CMD_LEN) {
87+ if (info.cmd_len + need > ipc.MAX_CMD_LEN) {
88 const ellipsis = "...";
89- if (cmd_len + ellipsis.len <= ipc.MAX_CMD_LEN) {
90- @memcpy(cmd_buf[cmd_len..][0..ellipsis.len], ellipsis);
91- cmd_len += ellipsis.len;
92+ if (info.cmd_len + ellipsis.len <= ipc.MAX_CMD_LEN) {
93+ @memcpy(info.cmd[info.cmd_len..][0..ellipsis.len], ellipsis);
94+ info.cmd_len += ellipsis.len;
95 }
96 break;
97 }
98
99 if (i > 0) {
100- cmd_buf[cmd_len] = ' ';
101- cmd_len += 1;
102+ info.cmd[info.cmd_len] = ' ';
103+ info.cmd_len += 1;
104 }
105- @memcpy(cmd_buf[cmd_len..][0..src.len], src);
106- cmd_len += @intCast(src.len);
107+ @memcpy(info.cmd[info.cmd_len..][0..src.len], src);
108+ info.cmd_len += @intCast(src.len);
109 }
110 }
111
112- // Copy cwd
113- var cwd_buf: [ipc.MAX_CWD_LEN]u8 = undefined;
114- const cwd_len: u16 = @intCast(@min(self.cwd.len, ipc.MAX_CWD_LEN));
115- @memcpy(cwd_buf[0..cwd_len], self.cwd[0..cwd_len]);
116-
117- const info = ipc.Info{
118- .clients_len = clients_len,
119- .pid = self.pid,
120- .cmd_len = cmd_len,
121- .cwd_len = cwd_len,
122- .cmd = cmd_buf,
123- .cwd = cwd_buf,
124- .created_at = self.created_at,
125- .task_ended_at = self.task_ended_at orelse 0,
126- .task_exit_code = self.task_exit_code orelse 0,
127- };
128+ info.cwd_len = @intCast(@min(self.cwd.len, ipc.MAX_CWD_LEN));
129+ @memcpy(info.cwd[0..info.cwd_len], self.cwd[0..info.cwd_len]);
130+
131 try ipc.appendMessage(self.alloc, &client.write_buf, .Info, std.mem.asBytes(&info));
132 client.has_pending_output = true;
133 }
134@@ -1617,13 +1623,13 @@ fn detachAll(cfg: *Cfg) !void {
135 error.OutOfMemory => return err,
136 };
137 defer alloc.free(socket_path);
138- const result = ipc.probeSession(alloc, socket_path) catch |err| {
139+ const fd = ipc.connectSession(socket_path) catch |err| {
140 std.log.err("session unresponsive: {s}", .{@errorName(err)});
141 if (err == error.ConnectionRefused) socket.cleanupStaleSocket(dir, session_name);
142 return;
143 };
144- defer posix.close(result.fd);
145- ipc.send(result.fd, .DetachAll, "") catch |err| switch (err) {
146+ defer posix.close(fd);
147+ ipc.send(fd, .DetachAll, "") catch |err| switch (err) {
148 error.BrokenPipe, error.ConnectionResetByPeer => return,
149 else => return err,
150 };
151@@ -1651,7 +1657,7 @@ fn kill(cfg: *Cfg, session_name: []const u8, force: bool) !void {
152 w.interface.flush() catch {};
153 return error.SessionNotFound;
154 }
155- const result = ipc.probeSession(alloc, socket_path) catch |err| {
156+ const fd = ipc.connectSession(socket_path) catch |err| {
157 std.log.err("session unresponsive: {s}", .{@errorName(err)});
158 var buf: [4096]u8 = undefined;
159 var w = std.fs.File.stdout().writer(&buf);
160@@ -1668,8 +1674,8 @@ fn kill(cfg: *Cfg, session_name: []const u8, force: bool) !void {
161 return;
162 };
163
164- defer posix.close(result.fd);
165- ipc.send(result.fd, .Kill, "") catch |err| switch (err) {
166+ defer posix.close(fd);
167+ ipc.send(fd, .Kill, "") catch |err| switch (err) {
168 error.BrokenPipe, error.ConnectionResetByPeer => return,
169 else => return err,
170 };
171@@ -1702,15 +1708,15 @@ fn history(cfg: *Cfg, session_name: []const u8, format: util.HistoryFormat) !voi
172 w.interface.flush() catch {};
173 return error.SessionNotFound;
174 }
175- const result = ipc.probeSession(alloc, socket_path) catch |err| {
176+ const fd = ipc.connectSession(socket_path) catch |err| {
177 std.log.err("session unresponsive: {s}", .{@errorName(err)});
178 if (err == error.ConnectionRefused) socket.cleanupStaleSocket(dir, session_name);
179 return;
180 };
181- defer posix.close(result.fd);
182+ defer posix.close(fd);
183
184 const format_byte = [_]u8{@intFromEnum(format)};
185- ipc.send(result.fd, .History, &format_byte) catch |err| switch (err) {
186+ ipc.send(fd, .History, &format_byte) catch |err| switch (err) {
187 error.BrokenPipe, error.ConnectionResetByPeer => return,
188 else => return err,
189 };
190@@ -1719,14 +1725,14 @@ fn history(cfg: *Cfg, session_name: []const u8, format: util.HistoryFormat) !voi
191 defer sb.deinit();
192
193 while (true) {
194- var poll_fds = [_]posix.pollfd{.{ .fd = result.fd, .events = posix.POLL.IN, .revents = 0 }};
195+ var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
196 const poll_result = posix.poll(&poll_fds, 5000) catch return;
197 if (poll_result == 0) {
198 std.log.err("timeout waiting for history response", .{});
199 return;
200 }
201
202- const n = sb.read(result.fd) catch return;
203+ const n = sb.read(fd) catch return;
204 if (n == 0) return;
205
206 while (sb.next()) |msg| {
207@@ -2103,7 +2109,10 @@ fn run(daemon: *Daemon, detached: bool, command_args: [][]const u8) !void {
208 return error.CommandRequired;
209 }
210
211- const client_sock = try socket.sessionConnect(daemon.socket_path);
212+ const client_sock = ipc.connectSession(daemon.socket_path) catch |err| {
213+ std.log.err("session not ready: {s}", .{@errorName(err)});
214+ return error.SessionNotReady;
215+ };
216 defer posix.close(client_sock);
217
218 var fds = try std.ArrayList(i32).initCapacity(alloc, 1);
219@@ -2134,7 +2143,8 @@ fn clientLoop(client_sock_fd: i32) !ClientResult {
220 const alloc = std.heap.c_allocator;
221 defer posix.close(client_sock_fd);
222
223- setupSigwinchHandler();
224+ try openSignalPipe();
225+ installWakeHandler(posix.SIG.WINCH);
226
227 // Make socket non-blocking to avoid blocking on writes
228 var sock_flags = try posix.fcntl(client_sock_fd, posix.F.GETFL, 0);
229@@ -2168,12 +2178,6 @@ fn clientLoop(client_sock_fd: i32) !ClientResult {
230 defer _ = posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags) catch {};
231
232 while (true) {
233- // Check for pending SIGWINCH
234- if (sigwinch_received.swap(false, .acq_rel)) {
235- const next_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
236- try ipc.appendMessage(alloc, &sock_write_buf, .Resize, std.mem.asBytes(&next_size));
237- }
238-
239 poll_fds.clearRetainingCapacity();
240
241 try poll_fds.append(alloc, .{
242@@ -2193,6 +2197,8 @@ fn clientLoop(client_sock_fd: i32) !ClientResult {
243 .revents = 0,
244 });
245
246+ try poll_fds.append(alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });
247+
248 if (stdout_buf.items.len > 0) {
249 try poll_fds.append(alloc, .{
250 .fd = posix.STDOUT_FILENO,
251@@ -2201,10 +2207,13 @@ fn clientLoop(client_sock_fd: i32) !ClientResult {
252 });
253 }
254
255- _ = posix.poll(poll_fds.items, -1) catch |err| {
256- if (err == error.Interrupted) continue; // EINTR from signal, loop again
257- return err;
258- };
259+ _ = try posix.poll(poll_fds.items, -1);
260+
261+ if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
262+ drainSignalPipe();
263+ const next_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
264+ try ipc.appendMessage(alloc, &sock_write_buf, .Resize, std.mem.asBytes(&next_size));
265+ }
266
267 // Handle stdin -> socket (Input)
268 const inp_flags = (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL);
269@@ -2308,7 +2317,8 @@ fn clientLoop(client_sock_fd: i32) !ClientResult {
270 fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
271 std.log.info("daemon started session={s} pty_fd={d}", .{ daemon.session_name, pty_fd });
272 daemon.pty_fd = pty_fd;
273- setupSigtermHandler();
274+ try openSignalPipe();
275+ installWakeHandler(posix.SIG.TERM);
276 var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(daemon.alloc, 8);
277 defer poll_fds.deinit(daemon.alloc);
278
279@@ -2323,14 +2333,6 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
280 defer vt_stream.deinit();
281
282 daemon_loop: while (daemon.running) {
283- if (sigterm_received.swap(false, .acq_rel)) {
284- std.log.info(
285- "SIGTERM received, shutting down gracefully session={s}",
286- .{daemon.session_name},
287- );
288- break :daemon_loop;
289- }
290-
291 poll_fds.clearRetainingCapacity();
292
293 try poll_fds.append(daemon.alloc, .{
294@@ -2349,6 +2351,8 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
295 .revents = 0,
296 });
297
298+ try poll_fds.append(daemon.alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });
299+
300 for (daemon.clients.items) |client| {
301 var events: i16 = posix.POLL.IN;
302 if (client.has_pending_output) {
303@@ -2361,10 +2365,16 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
304 });
305 }
306
307- _ = posix.poll(poll_fds.items, -1) catch |err| {
308- if (err == error.Interrupted) continue;
309- return err;
310- };
311+ _ = try posix.poll(poll_fds.items, -1);
312+
313+ if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
314+ drainSignalPipe();
315+ std.log.info(
316+ "SIGTERM received, shutting down gracefully session={s}",
317+ .{daemon.session_name},
318+ );
319+ break :daemon_loop;
320+ }
321
322 if (poll_fds.items[0].revents & (posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL) != 0) {
323 std.log.err("server socket error revents={d}", .{poll_fds.items[0].revents});
324@@ -2473,9 +2483,9 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
325
326 var i: usize = daemon.clients.items.len;
327 // Only iterate over clients that were present when poll_fds was constructed
328- // poll_fds contains [server, pty, client0, client1, ...]
329- // So number of clients in poll_fds is poll_fds.items.len - 2
330- const num_polled_clients = poll_fds.items.len - 2;
331+ // poll_fds contains [server, pty, sig_pipe, client0, client1, ...]
332+ // So number of clients in poll_fds is poll_fds.items.len - 3
333+ const num_polled_clients = poll_fds.items.len - 3;
334 if (i > num_polled_clients) {
335 // If we have more clients than polled (i.e. we just accepted one), start from the
336 // polled ones
337@@ -2485,7 +2495,7 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
338 clients_loop: while (i > 0) {
339 i -= 1;
340 const client = daemon.clients.items[i];
341- const revents = poll_fds.items[i + 2].revents;
342+ const revents = poll_fds.items[i + 3].revents;
343
344 if (revents & posix.POLL.IN != 0) {
345 const n = client.read_buf.read(client.socket_fd) catch |err| {
346@@ -2564,33 +2574,22 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
347 }
348 }
349
350-fn handleSigwinch(_: i32, _: *const posix.siginfo_t, _: ?*anyopaque) callconv(.c) void {
351- sigwinch_received.store(true, .release);
352+fn wakeSignalPipe(_: i32, _: *const posix.siginfo_t, _: ?*anyopaque) callconv(.c) void {
353+ const saved = std.c._errno().*;
354+ _ = std.c.write(sig_pipe[1], "x", 1);
355+ std.c._errno().* = saved;
356 }
357
358-fn handleSigterm(_: i32, _: *const posix.siginfo_t, _: ?*anyopaque) callconv(.c) void {
359- sigterm_received.store(true, .release);
360-}
361-
362-// No SA_RESTART: we want the signal to interrupt poll() so the
363-// loop can check the flag. On BSD/macOS, SA_RESTART makes poll restartable,
364-// which would leave an idle daemon deaf to SIGTERM until other I/O wakes it.
365-fn setupSigwinchHandler() void {
366+// std.posix.poll retries EINTR internally, so SA_RESTART is moot — neither
367+// setting wakes the loop. The handler writes to sig_pipe instead; poll()
368+// wakes on its read end.
369+fn installWakeHandler(sig: u6) void {
370 const act: posix.Sigaction = .{
371- .handler = .{ .sigaction = handleSigwinch },
372+ .handler = .{ .sigaction = wakeSignalPipe },
373 .mask = posix.sigemptyset(),
374 .flags = posix.SA.SIGINFO,
375 };
376- posix.sigaction(posix.SIG.WINCH, &act, null);
377-}
378-
379-fn setupSigtermHandler() void {
380- const act: posix.Sigaction = .{
381- .handler = .{ .sigaction = handleSigterm },
382- .mask = posix.sigemptyset(),
383- .flags = posix.SA.SIGINFO,
384- };
385- posix.sigaction(posix.SIG.TERM, &act, null);
386+ posix.sigaction(sig, &act, null);
387 }
388
389 fn ignoreSigpipe() void {
390@@ -2601,3 +2600,7 @@ fn ignoreSigpipe() void {
391 };
392 posix.sigaction(posix.SIG.PIPE, &act, null);
393 }
394+
395+test {
396+ _ = ipc;
397+}