repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
ceb43cc
parent
9e8f01e
author
Eric Bower
date
2025-12-24 00:06:32 -0500 EST
refactor: make client socket write non-blocking
1 files changed,  +42, -37
M src/main.zig
+42, -37
  1@@ -616,11 +616,19 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
  2 
  3     setupSigwinchHandler();
  4 
  5-    // Send init message with terminal size
  6+    // Make socket non-blocking to avoid blocking on writes
  7+    const sock_flags = try posix.fcntl(client_sock_fd, posix.F.GETFL, 0);
  8+    _ = try posix.fcntl(client_sock_fd, posix.F.SETFL, sock_flags | posix.SOCK.NONBLOCK);
  9+
 10+    // Buffer for outgoing socket writes
 11+    var sock_write_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
 12+    defer sock_write_buf.deinit(alloc);
 13+
 14+    // Send init message with terminal size (buffered)
 15     const size = getTerminalSize(posix.STDOUT_FILENO);
 16-    ipc.send(client_sock_fd, .Init, std.mem.asBytes(&size)) catch {};
 17+    try ipc.appendMessage(alloc, &sock_write_buf, .Init, std.mem.asBytes(&size));
 18 
 19-    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 2);
 20+    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
 21     defer poll_fds.deinit(alloc);
 22 
 23     var read_buf = try ipc.SocketBuffer.init(alloc);
 24@@ -642,10 +650,7 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
 25         // Check for pending SIGWINCH
 26         if (sigwinch_received.swap(false, .acq_rel)) {
 27             const next_size = getTerminalSize(posix.STDOUT_FILENO);
 28-            ipc.send(client_sock_fd, .Resize, std.mem.asBytes(&next_size)) catch |err| switch (err) {
 29-                error.BrokenPipe, error.ConnectionResetByPeer => return,
 30-                else => return err,
 31-            };
 32+            try ipc.appendMessage(alloc, &sock_write_buf, .Resize, std.mem.asBytes(&next_size));
 33         }
 34 
 35         poll_fds.clearRetainingCapacity();
 36@@ -656,9 +661,14 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
 37             .revents = 0,
 38         });
 39 
 40+        // Poll socket for read, and also for write if we have pending data
 41+        var sock_events: i16 = posix.POLL.IN;
 42+        if (sock_write_buf.items.len > 0) {
 43+            sock_events |= posix.POLL.OUT;
 44+        }
 45         try poll_fds.append(alloc, .{
 46             .fd = client_sock_fd,
 47-            .events = posix.POLL.IN,
 48+            .events = sock_events,
 49             .revents = 0,
 50         });
 51 
 52@@ -688,17 +698,11 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
 53                     // Check for Kitty keyboard protocol sequences first
 54                     switch (parseStdinInput(buf[0..n], &prefix_active)) {
 55                         .detach => {
 56-                            ipc.send(client_sock_fd, .Detach, "") catch |err| switch (err) {
 57-                                error.BrokenPipe, error.ConnectionResetByPeer => return,
 58-                                else => return err,
 59-                            };
 60+                            try ipc.appendMessage(alloc, &sock_write_buf, .Detach, "");
 61                             continue;
 62                         },
 63                         .send => |data| {
 64-                            ipc.send(client_sock_fd, .Input, data) catch |err| switch (err) {
 65-                                error.BrokenPipe, error.ConnectionResetByPeer => return,
 66-                                else => return err,
 67-                            };
 68+                            try ipc.appendMessage(alloc, &sock_write_buf, .Input, data);
 69                             continue;
 70                         },
 71                         .activate_prefix => continue,
 72@@ -711,17 +715,11 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
 73                         const action = parseStdinByte(buf[i], prefix_active);
 74                         switch (action) {
 75                             .detach => {
 76-                                ipc.send(client_sock_fd, .Detach, "") catch |err| switch (err) {
 77-                                    error.BrokenPipe, error.ConnectionResetByPeer => return,
 78-                                    else => return err,
 79-                                };
 80+                                try ipc.appendMessage(alloc, &sock_write_buf, .Detach, "");
 81                                 prefix_active = false;
 82                             },
 83                             .send => |data| {
 84-                                ipc.send(client_sock_fd, .Input, data) catch |err| switch (err) {
 85-                                    error.BrokenPipe, error.ConnectionResetByPeer => return,
 86-                                    else => return err,
 87-                                };
 88+                                try ipc.appendMessage(alloc, &sock_write_buf, .Input, data);
 89                                 prefix_active = false;
 90                             },
 91                             .activate_prefix => {
 92@@ -730,20 +728,11 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
 93                             .none => {
 94                                 if (prefix_active) {
 95                                     // Unknown prefix command, forward both ctrl+b and this key
 96-                                    ipc.send(client_sock_fd, .Input, &[_]u8{0x02}) catch |err| switch (err) {
 97-                                        error.BrokenPipe, error.ConnectionResetByPeer => return,
 98-                                        else => return err,
 99-                                    };
100-                                    ipc.send(client_sock_fd, .Input, buf[i .. i + 1]) catch |err| switch (err) {
101-                                        error.BrokenPipe, error.ConnectionResetByPeer => return,
102-                                        else => return err,
103-                                    };
104+                                    try ipc.appendMessage(alloc, &sock_write_buf, .Input, &[_]u8{0x02});
105+                                    try ipc.appendMessage(alloc, &sock_write_buf, .Input, buf[i .. i + 1]);
106                                     prefix_active = false;
107                                 } else {
108-                                    ipc.send(client_sock_fd, .Input, buf[i .. i + 1]) catch |err| switch (err) {
109-                                        error.BrokenPipe, error.ConnectionResetByPeer => return,
110-                                        else => return err,
111-                                    };
112+                                    try ipc.appendMessage(alloc, &sock_write_buf, .Input, buf[i .. i + 1]);
113                                 }
114                             },
115                         }
116@@ -755,7 +744,7 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
117             }
118         }
119 
120-        // Handle socket -> stdout (Output)
121+        // Handle socket read (incoming Output messages from daemon)
122         if (poll_fds.items[1].revents & posix.POLL.IN != 0) {
123             const n = read_buf.read(client_sock_fd) catch |err| {
124                 if (err == error.WouldBlock) continue;
125@@ -781,6 +770,22 @@ fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
126             }
127         }
128 
129+        // Handle socket write (flush buffered messages to daemon)
130+        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
131+            if (sock_write_buf.items.len > 0) {
132+                const n = posix.write(client_sock_fd, sock_write_buf.items) catch |err| blk: {
133+                    if (err == error.WouldBlock) break :blk 0;
134+                    if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
135+                        return;
136+                    }
137+                    return err;
138+                };
139+                if (n > 0) {
140+                    try sock_write_buf.replaceRange(alloc, 0, n, &[_]u8{});
141+                }
142+            }
143+        }
144+
145         if (stdout_buf.items.len > 0) {
146             const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
147                 if (err == error.WouldBlock) break :blk 0;