repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
7cdb333
parent
4dff5d4
author
Eric Bower
date
2026-03-23 11:51:56 -0400 EDT
chore: wrap lines where applicable to 100
5 files changed,  +152, -36
M src/ipc.zig
+10, -2
 1@@ -74,7 +74,12 @@ pub fn send(fd: i32, tag: Tag, data: []const u8) !void {
 2     }
 3 }
 4 
 5-pub fn appendMessage(alloc: std.mem.Allocator, list: *std.ArrayList(u8), tag: Tag, data: []const u8) !void {
 6+pub fn appendMessage(
 7+    alloc: std.mem.Allocator,
 8+    list: *std.ArrayList(u8),
 9+    tag: Tag,
10+    data: []const u8,
11+) !void {
12     const header = Header{
13         .tag = tag,
14         .len = @intCast(data.len),
15@@ -178,7 +183,10 @@ const SessionProbeResult = struct {
16     info: Info,
17 };
18 
19-pub fn probeSession(alloc: std.mem.Allocator, socket_path: []const u8) SessionProbeError!SessionProbeResult {
20+pub fn probeSession(
21+    alloc: std.mem.Allocator,
22+    socket_path: []const u8,
23+) SessionProbeError!SessionProbeResult {
24     const timeout_ms = 1000;
25     const fd = socket.sessionConnect(socket_path) catch |err| switch (err) {
26         error.ConnectionRefused => return error.ConnectionRefused,
M src/log.zig
+15, -3
 1@@ -13,7 +13,10 @@ pub const LogSystem = struct {
 2         self.path = try alloc.dupe(u8, path);
 3 
 4         const file = std.fs.openFileAbsolute(path, .{ .mode = .read_write }) catch |err| switch (err) {
 5-            error.FileNotFound => try std.fs.createFileAbsolute(path, .{ .read = true, .mode = 0o640 }),
 6+            error.FileNotFound => try std.fs.createFileAbsolute(
 7+                path,
 8+                .{ .read = true, .mode = 0o640 },
 9+            ),
10             else => return err,
11         };
12 
13@@ -28,7 +31,13 @@ pub const LogSystem = struct {
14         if (self.path.len > 0) self.alloc.free(self.path);
15     }
16 
17-    pub fn log(self: *LogSystem, comptime level: std.log.Level, comptime scope: @Type(.enum_literal), comptime format: []const u8, args: anytype) void {
18+    pub fn log(
19+        self: *LogSystem,
20+        comptime level: std.log.Level,
21+        comptime scope: @Type(.enum_literal),
22+        comptime format: []const u8,
23+        args: anytype,
24+    ) void {
25         self.mutex.lock();
26         defer self.mutex.unlock();
27 
28@@ -82,7 +91,10 @@ pub const LogSystem = struct {
29             else => return err,
30         };
31 
32-        self.file = try std.fs.createFileAbsolute(self.path, .{ .truncate = true, .read = true, .mode = 0o640 });
33+        self.file = try std.fs.createFileAbsolute(
34+            self.path,
35+            .{ .truncate = true, .read = true, .mode = 0o640 },
36+        );
37         self.current_size = 0;
38     }
39 };
M src/main.zig
+100, -25
  1@@ -355,7 +355,12 @@ const Daemon = struct {
  2 
  3         const shell = util.detectShell();
  4         // Use "-shellname" as argv[0] to signal login shell (traditional method)
  5-        const login_shell = try std.fmt.allocPrintSentinel(alloc, "-{s}", .{std.fs.path.basename(shell)}, 0);
  6+        const login_shell = try std.fmt.allocPrintSentinel(
  7+            alloc,
  8+            "-{s}",
  9+            .{std.fs.path.basename(shell)},
 10+            0,
 11+        );
 12         const argv = [_:null]?[*:0]const u8{ login_shell, null };
 13         const err = std.posix.execveZ(shell, &argv, std.c.environ);
 14         std.log.err("execve failed: err={s}", .{@errorName(err)});
 15@@ -412,7 +417,10 @@ const Daemon = struct {
 16             if (ipc.probeSession(self.alloc, self.socket_path)) |result| {
 17                 posix.close(result.fd);
 18                 if (self.command != null) {
 19-                    std.log.warn("session already exists, ignoring command session={s}", .{self.session_name});
 20+                    std.log.warn(
 21+                        "session already exists, ignoring command session={s}",
 22+                        .{self.session_name},
 23+                    );
 24                 }
 25             } else |err| switch (err) {
 26                 // Daemon is definitively gone: safe to replace.
 27@@ -424,7 +432,10 @@ const Daemon = struct {
 28                 // The probe is only to decide create-vs-attach; the session
 29                 // exists, so proceed to attach rather than fail or orphan.
 30                 else => {
 31-                    std.log.warn("probe slow ({s}), proceeding to attach session={s}", .{ @errorName(err), self.session_name });
 32+                    std.log.warn(
 33+                        "probe slow ({s}), proceeding to attach session={s}",
 34+                        .{ @errorName(err), self.session_name },
 35+                    );
 36                 },
 37             }
 38         }
 39@@ -440,9 +451,16 @@ const Daemon = struct {
 40                 _ = try posix.setsid();
 41 
 42                 log_system.deinit();
 43-                const session_log_name = try std.fmt.allocPrint(self.alloc, "{s}.log", .{self.session_name});
 44+                const session_log_name = try std.fmt.allocPrint(
 45+                    self.alloc,
 46+                    "{s}.log",
 47+                    .{self.session_name},
 48+                );
 49                 defer self.alloc.free(session_log_name);
 50-                const session_log_path = try std.fs.path.join(self.alloc, &.{ self.cfg.log_dir, session_log_name });
 51+                const session_log_path = try std.fs.path.join(
 52+                    self.alloc,
 53+                    &.{ self.cfg.log_dir, session_log_name },
 54+                );
 55                 defer self.alloc.free(session_log_path);
 56                 try log_system.init(self.alloc, session_log_path);
 57 
 58@@ -490,9 +508,15 @@ const Daemon = struct {
 59         while (remaining.len > 0) {
 60             const n = posix.write(pty_fd, remaining) catch |err| {
 61                 if (err == error.WouldBlock) {
 62-                    std.log.warn("pty write dropped {d}/{d} bytes (buffer full)", .{ remaining.len, data.len });
 63+                    std.log.warn(
 64+                        "pty write dropped {d}/{d} bytes (buffer full)",
 65+                        .{ remaining.len, data.len },
 66+                    );
 67                 } else {
 68-                    std.log.warn("pty write failed, {d} bytes lost: {s}", .{ remaining.len, @errorName(err) });
 69+                    std.log.warn(
 70+                        "pty write failed, {d} bytes lost: {s}",
 71+                        .{ remaining.len, @errorName(err) },
 72+                    );
 73                 }
 74                 return;
 75             };
 76@@ -526,12 +550,18 @@ const Daemon = struct {
 77         // interfering with shell initialization (DA1 queries, etc.)
 78         if (self.has_pty_output and self.has_had_client) {
 79             const cursor = &term.screens.active.cursor;
 80-            std.log.debug("cursor before serialize: x={d} y={d} pending_wrap={}", .{ cursor.x, cursor.y, cursor.pending_wrap });
 81+            std.log.debug(
 82+                "cursor before serialize: x={d} y={d} pending_wrap={}",
 83+                .{ cursor.x, cursor.y, cursor.pending_wrap },
 84+            );
 85             if (util.serializeTerminalState(self.alloc, term)) |term_output| {
 86                 std.log.debug("serialize terminal state", .{});
 87                 defer self.alloc.free(term_output);
 88                 ipc.appendMessage(self.alloc, &client.write_buf, .Output, term_output) catch |err| {
 89-                    std.log.warn("failed to buffer terminal state for client err={s}", .{@errorName(err)});
 90+                    std.log.warn(
 91+                        "failed to buffer terminal state for client err={s}",
 92+                        .{@errorName(err)},
 93+                    );
 94                 };
 95                 client.has_pending_output = true;
 96             }
 97@@ -552,7 +582,12 @@ const Daemon = struct {
 98         std.log.debug("init resize rows={d} cols={d}", .{ resize.rows, resize.cols });
 99     }
100 
101-    pub fn handleResize(self: *Daemon, pty_fd: i32, term: *ghostty_vt.Terminal, payload: []const u8) !void {
102+    pub fn handleResize(
103+        self: *Daemon,
104+        pty_fd: i32,
105+        term: *ghostty_vt.Terminal,
106+        payload: []const u8,
107+    ) !void {
108         if (payload.len != @sizeOf(ipc.Resize)) return;
109 
110         const resize = std.mem.bytesToValue(ipc.Resize, payload);
111@@ -584,7 +619,8 @@ const Daemon = struct {
112     pub fn handleKill(self: *Daemon) void {
113         std.log.info("kill received session={s}", .{self.session_name});
114         self.shutdown();
115-        // gracefully shutdown shell processes, shells tend to ignore SIGTERM so we send SIGHUP instead
116+        // gracefully shutdown shell processes, shells tend to ignore SIGTERM so we send SIGHUP
117+        // instead
118         //   https://www.gnu.org/software/bash/manual/html_node/Signals.html
119         // negative pid means kill process and children
120         std.log.info("sending SIGHUP session={s} pid={d}", .{ self.session_name, self.pid });
121@@ -653,7 +689,12 @@ const Daemon = struct {
122         client.has_pending_output = true;
123     }
124 
125-    pub fn handleHistory(self: *Daemon, client: *Client, term: *ghostty_vt.Terminal, payload: []const u8) !void {
126+    pub fn handleHistory(
127+        self: *Daemon,
128+        client: *Client,
129+        term: *ghostty_vt.Terminal,
130+        payload: []const u8,
131+    ) !void {
132         const format: util.HistoryFormat = if (payload.len > 0)
133             std.meta.intToEnum(util.HistoryFormat, payload[0]) catch .plain
134         else
135@@ -784,7 +825,10 @@ fn wait(cfg: *Cfg, session_names: std.ArrayList([]const u8)) !void {
136                 // is no longer deleted, so this session would otherwise
137                 // persist as task_ended_at==0 forever → infinite "still
138                 // waiting". Count it as done+failed so wait terminates.
139-                try stderr.print("task unreachable: {s} ({s})\n", .{ session.name, session.error_name orelse "unknown" });
140+                try stderr.print(
141+                    "task unreachable: {s} ({s})\n",
142+                    .{ session.name, session.error_name orelse "unknown" },
143+                );
144                 try stderr.flush();
145                 agg_exit_code = 1;
146                 done += 1;
147@@ -810,7 +854,10 @@ fn wait(cfg: *Cfg, session_names: std.ArrayList([]const u8)) !void {
148         // crashed and the remaining N-1 happen to be done, total==done
149         // would be a false success.
150         if (total < max_seen) {
151-            try stderr.print("error: {d} session(s) disappeared before completing\n", .{max_seen - total});
152+            try stderr.print(
153+                "error: {d} session(s) disappeared before completing\n",
154+                .{max_seen - total},
155+            );
156             try stderr.flush();
157             std.process.exit(1);
158             return;
159@@ -943,7 +990,10 @@ fn kill(cfg: *Cfg, session_name: []const u8) !void {
160             socket.cleanupStaleSocket(dir, session_name);
161             w.interface.print("cleaned up stale session {s}\n", .{session_name}) catch {};
162         } else {
163-            w.interface.print("session {s} is unresponsive ({s}) -- daemon may be busy, try again or kill the process directly\n", .{ session_name, @errorName(err) }) catch {};
164+            w.interface.print(
165+                "session {s} is unresponsive ({s}) -- daemon may be busy, try again or kill the process directly\n",
166+                .{ session_name, @errorName(err) },
167+            ) catch {};
168         }
169         w.interface.flush() catch {};
170         return;
171@@ -1193,7 +1243,9 @@ fn run(daemon: *Daemon, command_args: [][]const u8) !void {
172         else => return err,
173     };
174 
175-    var poll_fds = [_]posix.pollfd{.{ .fd = probe_result.fd, .events = posix.POLL.IN, .revents = 0 }};
176+    var poll_fds = [_]posix.pollfd{
177+        .{ .fd = probe_result.fd, .events = posix.POLL.IN, .revents = 0 },
178+    };
179     const poll_result = posix.poll(&poll_fds, 5000) catch return error.PollFailed;
180     if (poll_result == 0) {
181         std.log.err("timeout waiting for ack", .{});
182@@ -1297,7 +1349,8 @@ fn clientLoop(client_sock_fd: i32) !void {
183         };
184 
185         // Handle stdin -> socket (Input)
186-        if (poll_fds.items[0].revents & (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
187+        const inp_flags = (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL);
188+        if (poll_fds.items[0].revents & inp_flags != 0) {
189             var buf: [4096]u8 = undefined;
190             const n_opt: ?usize = posix.read(stdin_fd, &buf) catch |err| blk: {
191                 if (err == error.WouldBlock) break :blk null;
192@@ -1397,7 +1450,10 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
193 
194     daemon_loop: while (daemon.running) {
195         if (sigterm_received.swap(false, .acq_rel)) {
196-            std.log.info("SIGTERM received, shutting down gracefully session={s}", .{daemon.session_name});
197+            std.log.info(
198+                "SIGTERM received, shutting down gracefully session={s}",
199+                .{daemon.session_name},
200+            );
201             break :daemon_loop;
202         }
203 
204@@ -1436,7 +1492,12 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
205             std.log.err("server socket error revents={d}", .{poll_fds.items[0].revents});
206             break :daemon_loop;
207         } else if (poll_fds.items[0].revents & posix.POLL.IN != 0) {
208-            const client_fd = try posix.accept(server_sock_fd, null, null, posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC);
209+            const client_fd = try posix.accept(
210+                server_sock_fd,
211+                null,
212+                null,
213+                posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC,
214+            );
215             const client = try daemon.alloc.create(Client);
216             client.* = Client{
217                 .alloc = daemon.alloc,
218@@ -1446,10 +1507,14 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
219             };
220             client.write_buf = try std.ArrayList(u8).initCapacity(client.alloc, 4096);
221             try daemon.clients.append(daemon.alloc, client);
222-            std.log.info("client connected fd={d} total={d}", .{ client_fd, daemon.clients.items.len });
223+            std.log.info(
224+                "client connected fd={d} total={d}",
225+                .{ client_fd, daemon.clients.items.len },
226+            );
227         }
228 
229-        if (poll_fds.items[1].revents & (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
230+        const inp_flags = posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL;
231+        if (poll_fds.items[1].revents & inp_flags != 0) {
232             // Read from PTY
233             var buf: [4096]u8 = undefined;
234             const n_opt: ?usize = posix.read(pty_fd, &buf) catch |err| blk: {
235@@ -1490,7 +1555,10 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
236                     // Broadcast data to all clients
237                     for (daemon.clients.items) |client| {
238                         ipc.appendMessage(daemon.alloc, &client.write_buf, .Output, buf[0..n]) catch |err| {
239-                            std.log.warn("failed to buffer output for client err={s}", .{@errorName(err)});
240+                            std.log.warn(
241+                                "failed to buffer output for client err={s}",
242+                                .{@errorName(err)},
243+                            );
244                             continue;
245                         };
246                         client.has_pending_output = true;
247@@ -1505,7 +1573,8 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
248         // So number of clients in poll_fds is poll_fds.items.len - 2
249         const num_polled_clients = poll_fds.items.len - 2;
250         if (i > num_polled_clients) {
251-            // If we have more clients than polled (i.e. we just accepted one), start from the polled ones
252+            // If we have more clients than polled (i.e. we just accepted one), start from the
253+            // polled ones
254             i = num_polled_clients;
255         }
256 
257@@ -1517,7 +1586,10 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
258             if (revents & posix.POLL.IN != 0) {
259                 const n = client.read_buf.read(client.socket_fd) catch |err| {
260                     if (err == error.WouldBlock) continue;
261-                    std.log.debug("client read err={s} fd={d}", .{ @errorName(err), client.socket_fd });
262+                    std.log.debug(
263+                        "client read err={s} fd={d}",
264+                        .{ @errorName(err), client.socket_fd },
265+                    );
266                     const last = daemon.closeClient(client, i, false);
267                     if (last) break :daemon_loop;
268                     continue;
269@@ -1550,7 +1622,10 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
270                         .History => try daemon.handleHistory(client, &term, msg.payload),
271                         .Run => try daemon.handleRun(client, pty_fd, msg.payload),
272                         .Output, .Ack => {},
273-                        _ => std.log.warn("ignoring unknown IPC tag={d}", .{@intFromEnum(msg.header.tag)}),
274+                        _ => std.log.warn(
275+                            "ignoring unknown IPC tag={d}",
276+                            .{@intFromEnum(msg.header.tag)},
277+                        ),
278                     }
279                 }
280             }
M src/socket.zig
+10, -2
 1@@ -58,7 +58,11 @@ pub fn createSocket(fname: []const u8) !i32 {
 2     // AF.UNIX: Unix domain socket for local IPC with client processes
 3     // SOCK.STREAM: Reliable, bidirectional communication
 4     // SOCK.NONBLOCK: Set socket to non-blocking
 5-    const fd = try posix.socket(posix.AF.UNIX, posix.SOCK.STREAM | posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC, 0);
 6+    const fd = try posix.socket(
 7+        posix.AF.UNIX,
 8+        posix.SOCK.STREAM | posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC,
 9+        0,
10+    );
11     errdefer posix.close(fd);
12 
13     var unix_addr = try std.net.Address.initUnix(fname);
14@@ -74,7 +78,11 @@ pub const max_socket_path_len: usize = @typeInfo(
15     @TypeOf(@as(posix.sockaddr.un, undefined).path),
16 ).array.len - 1;
17 
18-pub fn getSocketPath(alloc: std.mem.Allocator, socket_dir: []const u8, session_name: []const u8) error{ NameTooLong, OutOfMemory }![]const u8 {
19+pub fn getSocketPath(
20+    alloc: std.mem.Allocator,
21+    socket_dir: []const u8,
22+    session_name: []const u8,
23+) error{ NameTooLong, OutOfMemory }![]const u8 {
24     const dir = socket_dir;
25     const path_len = dir.len + 1 + session_name.len;
26     if (path_len > max_socket_path_len) return error.NameTooLong;
M src/util.zig
+17, -4
 1@@ -27,7 +27,10 @@ pub const SessionEntry = struct {
 2     }
 3 };
 4 
 5-pub fn get_session_entries(alloc: std.mem.Allocator, socket_dir: []const u8) !std.ArrayList(SessionEntry) {
 6+pub fn get_session_entries(
 7+    alloc: std.mem.Allocator,
 8+    socket_dir: []const u8,
 9+) !std.ArrayList(SessionEntry) {
10     var dir = try std.fs.openDirAbsolute(socket_dir, .{ .iterate = true });
11     defer dir.close();
12     var iter = dir.iterate();
13@@ -102,7 +105,8 @@ pub fn shellNeedsQuoting(arg: []const u8) bool {
14     if (arg.len == 0) return true;
15     for (arg) |ch| {
16         switch (ch) {
17-            ' ', '\t', '"', '\'', '\\', '$', '`', '!', '(', ')', '{', '}', '[', ']', '|', '&', ';', '<', '>', '?', '*', '~', '#', '\n' => return true,
18+            ' ', '\t', '"', '\'', '\\', '$', '`', '!', '(', ')', '{', '}', '[', ']' => return true,
19+            '|', '&', ';', '<', '>', '?', '*', '~', '#', '\n' => return true,
20             else => {},
21         }
22     }
23@@ -333,7 +337,11 @@ pub const HistoryFormat = enum(u8) {
24     html = 2,
25 };
26 
27-pub fn serializeTerminal(alloc: std.mem.Allocator, term: *ghostty_vt.Terminal, format: HistoryFormat) ?[]const u8 {
28+pub fn serializeTerminal(
29+    alloc: std.mem.Allocator,
30+    term: *ghostty_vt.Terminal,
31+    format: HistoryFormat,
32+) ?[]const u8 {
33     var builder: std.Io.Writer.Allocating = .init(alloc);
34     defer builder.deinit();
35 
36@@ -378,7 +386,12 @@ pub fn detectShell() [:0]const u8 {
37 
38 /// Formats a session entry for list output (only the name when `short` is
39 /// true), adding a prefix to indicate the current session, if there is one.
40-pub fn writeSessionLine(writer: *std.Io.Writer, session: SessionEntry, short: bool, current_session: ?[]const u8) !void {
41+pub fn writeSessionLine(
42+    writer: *std.Io.Writer,
43+    session: SessionEntry,
44+    short: bool,
45+    current_session: ?[]const u8,
46+) !void {
47     const current_arrow = "→";
48     const prefix = if (current_session) |current|
49         if (std.mem.eql(u8, current, session.name)) current_arrow ++ " " else "  "