repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
3477fb2
parent
98fa966
author
Ian Tay
date
2026-03-18 15:54:12 -0400 EDT
feat(daemon): buffer PTY stdin writes and flush via POLLOUT

Replaces the best-effort ptyWrite (drop on EAGAIN) with a buffered
queue flushed by the daemon's poll loop. Mirrors the pattern already
used for client-socket writes.

- pty_write_buf on Daemon, capped at 256KB (drop new payload on
  overflow — same failure mode as before, 64x higher threshold)
- POLLOUT registered on pty_fd when buffer non-empty; flush handler
  loops until EAGAIN
- handleInput/handleRun queue instead of writing directly
- respondToDeviceAttributes routes through the same buffer so DA
  responses can't interleave with a draining .Run payload after
  client disconnect

Follow-up to #82.
2 files changed, +60, -43
M src/main.zig
+52, -40
  1@@ -341,9 +341,11 @@ const Daemon = struct {
  2     task_exit_code: ?u8 = null, // null = running or n/a, set when task completes
  3     task_ended_at: ?u64 = null, // timestamp when task exited
  4     task_command: ?[]const []const u8 = null,
  5+    pty_write_buf: std.ArrayList(u8) = .empty,
  6 
  7     pub fn deinit(self: *Daemon) void {
  8         self.clients.deinit(self.alloc);
  9+        self.pty_write_buf.deinit(self.alloc);
 10         self.alloc.free(self.socket_path);
 11     }
 12 
 13@@ -547,40 +549,32 @@ const Daemon = struct {
 14         return .{ .created = false, .is_daemon = false };
 15     }
 16 
 17-    /// Best-effort write to the (non-blocking) PTY fd. Retries short writes
 18-    /// until complete, but on WouldBlock (kernel buffer full) gives up and
 19-    /// drops the remainder — the daemon is single-threaded, so blocking here
 20-    /// to wait for POLLOUT would deadlock against a shell that's itself
 21-    /// blocked writing echo to a full PTY output buffer that we're not
 22-    /// draining. Dropping is the same trade-off the old code made implicitly
 23-    /// (short writes were silently truncated), just without the crash.
 24-    fn ptyWrite(pty_fd: i32, data: []const u8) void {
 25-        var remaining = data;
 26-        while (remaining.len > 0) {
 27-            const n = posix.write(pty_fd, remaining) catch |err| {
 28-                if (err == error.WouldBlock) {
 29-                    std.log.warn(
 30-                        "pty write dropped {d}/{d} bytes (buffer full)",
 31-                        .{ remaining.len, data.len },
 32-                    );
 33-                } else {
 34-                    std.log.warn(
 35-                        "pty write failed, {d} bytes lost: {s}",
 36-                        .{ remaining.len, @errorName(err) },
 37-                    );
 38-                }
 39-                return;
 40-            };
 41-            if (n == 0) return;
 42-            remaining = remaining[n..];
 43+    const PTY_WRITE_BUF_MAX = 256 * 1024;
 44+
 45+    /// Queue bytes for the PTY's stdin. Flushed by daemonLoop on POLLOUT.
 46+    /// Drops the payload if the buffer is over cap -- same failure mode as
 47+    /// the old direct-write ptyWrite (drop on EAGAIN), just at a 64x higher
 48+    /// threshold. Capping avoids OOM when the shell stops reading; dropping
 49+    /// new (not old) bytes avoids tearing a partially-accepted sequence.
 50+    fn queuePtyInput(self: *Daemon, data: []const u8) void {
 51+        if (data.len == 0) return;
 52+        if (self.pty_write_buf.items.len + data.len > PTY_WRITE_BUF_MAX) {
 53+            std.log.warn(
 54+                "pty input dropped {d} bytes (buffer full, shell not reading)",
 55+                .{data.len},
 56+            );
 57+            return;
 58         }
 59+        self.pty_write_buf.appendSlice(self.alloc, data) catch |err| {
 60+            std.log.warn(
 61+                "pty input dropped {d} bytes: {s}",
 62+                .{ data.len, @errorName(err) },
 63+            );
 64+        };
 65     }
 66 
 67-    pub fn handleInput(self: *Daemon, pty_fd: i32, payload: []const u8) void {
 68-        _ = self;
 69-        if (payload.len > 0) {
 70-            ptyWrite(pty_fd, payload);
 71-        }
 72+    pub fn handleInput(self: *Daemon, payload: []const u8) void {
 73+        self.queuePtyInput(payload);
 74     }
 75 
 76     pub fn handleInit(
 77@@ -760,7 +754,7 @@ const Daemon = struct {
 78         }
 79     }
 80 
 81-    pub fn handleRun(self: *Daemon, client: *Client, pty_fd: i32, payload: []const u8) !void {
 82+    pub fn handleRun(self: *Daemon, client: *Client, payload: []const u8) !void {
 83         // Reset task tracking so the new command's exit marker is detected.
 84         // Without this, a second `zmx run` on the same session is ignored
 85         // because task_exit_code is still set from the first run.
 86@@ -768,9 +762,7 @@ const Daemon = struct {
 87         self.task_ended_at = null;
 88         self.is_task_mode = true;
 89 
 90-        if (payload.len > 0) {
 91-            ptyWrite(pty_fd, payload);
 92-        }
 93+        self.queuePtyInput(payload);
 94         try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
 95         client.has_pending_output = true;
 96         self.has_had_client = true;
 97@@ -1523,9 +1515,13 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
 98             .revents = 0,
 99         });
100 
101+        var pty_events: i16 = posix.POLL.IN;
102+        if (daemon.pty_write_buf.items.len > 0) {
103+            pty_events |= posix.POLL.OUT;
104+        }
105         try poll_fds.append(daemon.alloc, .{
106             .fd = pty_fd,
107-            .events = posix.POLL.IN,
108+            .events = pty_events,
109             .revents = 0,
110         });
111 
112@@ -1595,8 +1591,10 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
113                     // This prevents shells like from fish from waiting 2s
114                     // and then sending a no DA query response warning because
115                     // there's no client terminal to respond to the query.
116-                    if (daemon.clients.items.len == 0) {
117-                        util.respondToDeviceAttributes(pty_fd, buf[0..n]);
118+                    if (daemon.clients.items.len == 0 and
119+                        daemon.pty_write_buf.items.len < Daemon.PTY_WRITE_BUF_MAX)
120+                    {
121+                        util.respondToDeviceAttributes(daemon.alloc, &daemon.pty_write_buf, buf[0..n]);
122                     }
123 
124                     // In run mode, scan output for exit code marker
125@@ -1625,6 +1623,20 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
126             }
127         }
128 
129+        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
130+            while (daemon.pty_write_buf.items.len > 0) {
131+                const n = posix.write(pty_fd, daemon.pty_write_buf.items) catch |err| {
132+                    if (err != error.WouldBlock) {
133+                        std.log.warn("pty write failed: {s}", .{@errorName(err)});
134+                        daemon.pty_write_buf.clearRetainingCapacity();
135+                    }
136+                    break;
137+                };
138+                if (n == 0) break;
139+                daemon.pty_write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
140+            }
141+        }
142+
143         var i: usize = daemon.clients.items.len;
144         // Only iterate over clients that were present when poll_fds was constructed
145         // poll_fds contains [server, pty, client0, client1, ...]
146@@ -1662,7 +1674,7 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
147 
148                 while (client.read_buf.next()) |msg| {
149                     switch (msg.header.tag) {
150-                        .Input => daemon.handleInput(pty_fd, msg.payload),
151+                        .Input => daemon.handleInput(msg.payload),
152                         .Init => try daemon.handleInit(client, pty_fd, &term, msg.payload),
153                         .Resize => try daemon.handleResize(pty_fd, &term, msg.payload),
154                         .Detach => {
155@@ -1678,7 +1690,7 @@ fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
156                         },
157                         .Info => try daemon.handleInfo(client),
158                         .History => try daemon.handleHistory(client, &term, msg.payload),
159-                        .Run => try daemon.handleRun(client, pty_fd, msg.payload),
160+                        .Run => try daemon.handleRun(client, msg.payload),
161                         .Output, .Ack => {},
162                         _ => std.log.warn(
163                             "ignoring unknown IPC tag={d}",
M src/util.zig
+8, -3
 1@@ -145,11 +145,16 @@ const DA2_QUERY_EXPLICIT = "\x1b[>0c";
 2 const DA1_RESPONSE = "\x1b[?62;22c";
 3 const DA2_RESPONSE = "\x1b[>1;10;0c";
 4 
 5-pub fn respondToDeviceAttributes(pty_fd: i32, data: []const u8) void {
 6+pub fn respondToDeviceAttributes(alloc: std.mem.Allocator, buf: *std.ArrayList(u8), data: []const u8) void {
 7     // Scan for DA queries in PTY output and respond on behalf of the terminal.
 8     // This handles the case where no client is attached (e.g. zmx run)
 9     // and the shell (e.g. fish) sends a DA query that would otherwise go unanswered.
10     //
11+    // Responses are queued into the daemon's pty_write_buf (not written
12+    // directly) so they don't interleave with any already-buffered input —
13+    // e.g. a large `zmx run` payload still draining after the client
14+    // disconnected.
15+    //
16     // DA1 query: ESC [ c  or  ESC [ 0 c
17     // DA2 query: ESC [ > c  or  ESC [ > 0 c
18     // DA1 response (from terminal): ESC [ ? ... c  (has '?' after '[')
19@@ -164,9 +169,9 @@ pub fn respondToDeviceAttributes(pty_fd: i32, data: []const u8) void {
20                 continue;
21             }
22             if (matchSeq(data[i..], DA2_QUERY) or matchSeq(data[i..], DA2_QUERY_EXPLICIT)) {
23-                _ = posix.write(pty_fd, DA2_RESPONSE) catch {};
24+                buf.appendSlice(alloc, DA2_RESPONSE) catch {};
25             } else if (matchSeq(data[i..], DA1_QUERY) or matchSeq(data[i..], DA1_QUERY_EXPLICIT)) {
26-                _ = posix.write(pty_fd, DA1_RESPONSE) catch {};
27+                buf.appendSlice(alloc, DA1_RESPONSE) catch {};
28             }
29         }
30         i += 1;