repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
d48b642
parent
77d03a5
author
Eric Bower
date
2026-04-03 11:26:35 -0400 EDT
feat: new client leader policy

The last client to send user input bytes (non-ansi escape codes) becomes the leader. The client
leader controls resizing and any other terminal state changes. Non-leader clients are read-only
until they send user input bytes and takeover leadership.

Closes: https://github.com/neurosnap/zmx/issues/73 
2 files changed,  +81, -7
M CHANGELOG.md
+6, -0
 1@@ -4,6 +4,12 @@ Use spec: https://common-changelog.org/
 2 
 3 ## Staged
 4 
 5+### Added
 6+
 7+- New client leader policy: last client to send user input bytes (non-ansi escape codes) becomes the leader
 8+  - The client leader controls resizing and any other terminal state changes
 9+  - Non-leader clients are read-only until they send user input bytes and takeover leadership
10+
11 ### Changed
12 
13 - `zmx kill` now supports multiple args and it will kill sessions that match a prefix
M src/main.zig
+75, -7
  1@@ -122,6 +122,7 @@ pub fn main() !void {
  2             .command = command,
  3             .cwd = cwd,
  4             .created_at = @intCast(std.time.timestamp()),
  5+            .leader_client_fd = null,
  6         };
  7         daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
  8             error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
  9@@ -157,6 +158,7 @@ pub fn main() !void {
 10             .created_at = @intCast(std.time.timestamp()),
 11             .is_task_mode = true,
 12             .task_command = cmd_args_raw.items,
 13+            .leader_client_fd = undefined,
 14         };
 15         daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
 16             error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
 17@@ -334,6 +336,7 @@ const Daemon = struct {
 18     cfg: *Cfg,
 19     alloc: std.mem.Allocator,
 20     clients: std.ArrayList(*Client),
 21+    leader_client_fd: ?i32,
 22     session_name: []const u8,
 23     socket_path: []const u8,
 24     running: bool,
 25@@ -356,7 +359,7 @@ const Daemon = struct {
 26     }
 27 
 28     pub fn shutdown(self: *Daemon) void {
 29-        std.log.info("shutting down daemon session_name={s}", .{self.session_name});
 30+        std.log.info("shutting down daemon session={s}", .{self.session_name});
 31         self.running = false;
 32 
 33         for (self.clients.items) |client| {
 34@@ -538,7 +541,7 @@ const Daemon = struct {
 35                     posix.close(pty_fd);
 36                     _ = posix.waitpid(self.pid, 0);
 37                     posix.close(server_sock_fd);
 38-                    std.log.info("deleting socket file session_name={s}", .{self.session_name});
 39+                    std.log.info("deleting socket file session={s}", .{self.session_name});
 40                     dir.deleteFile(self.session_name) catch |err| {
 41                         std.log.warn("failed to delete socket file err={s}", .{@errorName(err)});
 42                     };
 43@@ -571,6 +574,7 @@ const Daemon = struct {
 44             );
 45             return;
 46         }
 47+        std.log.debug("buffering pty input data={x}", .{data});
 48         self.pty_write_buf.appendSlice(self.alloc, data) catch |err| {
 49             std.log.warn(
 50                 "pty input dropped {d} bytes: {s}",
 51@@ -579,8 +583,50 @@ const Daemon = struct {
 52         };
 53     }
 54 
 55-    pub fn handleInput(self: *Daemon, payload: []const u8) void {
 56-        self.queuePtyInput(payload);
 57+    pub fn handleInput(self: *Daemon, client: *Client, payload: []const u8) !void {
 58+        // client is leader, send entire payload (ansi escape codes + text)
 59+        if (self.leader_client_fd == client.socket_fd) {
 60+            self.queuePtyInput(payload);
 61+            return;
 62+        }
 63+
 64+        // quick check to see if a newline happened so we can set that client to leader
 65+        // without creating a ghostty vt
 66+        if (std.mem.indexOfScalar(u8, payload, '\r')) |_| {
 67+            std.log.info(
 68+                "setting new leader session={s} client_fd={d}",
 69+                .{ self.session_name, client.socket_fd },
 70+            );
 71+            self.leader_client_fd = client.socket_fd;
 72+            self.queuePtyInput(payload);
 73+            return;
 74+        }
 75+
 76+        // check if leader needs to be updated
 77+        // this is probably really ineffecient but it was the easiest and most robust way
 78+        // to strip ansi escape codes and only detect plain text to determine if we need
 79+        // to set a new leader
 80+        var termx = try ghostty_vt.Terminal.init(client.alloc, .{
 81+            .cols = 80,
 82+            .rows = 24,
 83+        });
 84+        defer termx.deinit(client.alloc);
 85+        var vt_stream = termx.vtStream();
 86+        defer vt_stream.deinit();
 87+        try vt_stream.nextSlice(payload);
 88+        if (util.serializeTerminal(client.alloc, &termx, .plain)) |output| {
 89+            defer client.alloc.free(output);
 90+            // if there's no text output then this client is effectively read-only until they type
 91+            if (output.len > 0) {
 92+                std.log.info(
 93+                    "setting new leader session={s} client_fd={d}",
 94+                    .{ self.session_name, client.socket_fd },
 95+                );
 96+                self.leader_client_fd = client.socket_fd;
 97+                // new leader is set to this client so send *entire* payload
 98+                self.queuePtyInput(payload);
 99+            }
100+        }
101     }
102 
103     pub fn handleInit(
104@@ -591,6 +637,10 @@ const Daemon = struct {
105         payload: []const u8,
106     ) !void {
107         if (payload.len != @sizeOf(ipc.Resize)) return;
108+        // no leader is set so set one
109+        if (self.leader_client_fd == null) {
110+            self.leader_client_fd = client.socket_fd;
111+        }
112 
113         const resize = std.mem.bytesToValue(ipc.Resize, payload);
114 
115@@ -635,11 +685,21 @@ const Daemon = struct {
116 
117     pub fn handleResize(
118         self: *Daemon,
119+        client: *Client,
120         pty_fd: i32,
121         term: *ghostty_vt.Terminal,
122         payload: []const u8,
123     ) !void {
124         if (payload.len != @sizeOf(ipc.Resize)) return;
125+        if (self.leader_client_fd == null) {
126+            std.log.info(
127+                "setting new leader session={s} client_fd={d}",
128+                .{ self.session_name, client.socket_fd },
129+            );
130+            self.leader_client_fd = client.socket_fd;
131+        }
132+        // only leader can resize
133+        if (self.leader_client_fd != client.socket_fd) return;
134 
135         const resize = std.mem.bytesToValue(ipc.Resize, payload);
136         var ws: cross.c.struct_winsize = .{
137@@ -654,7 +714,15 @@ const Daemon = struct {
138     }
139 
140     pub fn handleDetach(self: *Daemon, client: *Client, i: usize) void {
141-        std.log.info("client detach fd={d}", .{client.socket_fd});
142+        std.log.info("client detach session={s} fd={d}", .{ self.session_name, client.socket_fd });
143+        // leader is trying to disconnect, remove ref and let another client claim leader on input
144+        if (self.leader_client_fd == client.socket_fd) {
145+            std.log.info(
146+                "unsetting leader session={s} fd={d}",
147+                .{ self.session_name, client.socket_fd },
148+            );
149+            self.leader_client_fd = null;
150+        }
151         _ = self.closeClient(client, i, false);
152     }
153 
154@@ -1680,9 +1748,9 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
155 
156                 while (client.read_buf.next()) |msg| {
157                     switch (msg.header.tag) {
158-                        .Input => daemon.handleInput(msg.payload),
159+                        .Input => try daemon.handleInput(client, msg.payload),
160                         .Init => try daemon.handleInit(client, pty_fd, &term, msg.payload),
161-                        .Resize => try daemon.handleResize(pty_fd, &term, msg.payload),
162+                        .Resize => try daemon.handleResize(client, pty_fd, &term, msg.payload),
163                         .Detach => {
164                             daemon.handleDetach(client, i);
165                             break :clients_loop;