repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
ef41342
parent
1c8d6c0
author
Eric Bower
date
2025-10-13 15:11:16 -0400 EDT
chore: remove pty json handling
3 files changed,  +66, -58
M src/attach.zig
+4, -30
 1@@ -18,6 +18,7 @@ const Context = struct {
 2     stream: xev.Stream,
 3     stdin_stream: xev.Stream,
 4     stdout_stream: xev.Stream,
 5+    socket_fd: std.posix.fd_t,
 6     allocator: std.mem.Allocator,
 7     loop: *xev.Loop,
 8     session_name: []const u8,
 9@@ -100,6 +101,7 @@ pub fn main(config: config_mod.Config, iter: *std.process.ArgIterator) !void {
10         .stream = xev.Stream.initFd(socket_fd),
11         .stdin_stream = xev.Stream.initFd(posix.STDIN_FILENO),
12         .stdout_stream = xev.Stream.initFd(posix.STDOUT_FILENO),
13+        .socket_fd = socket_fd,
14         .allocator = allocator,
15         .loop = &loop,
16         .session_name = session_name,
17@@ -328,15 +330,7 @@ fn readCallback(
18                 _ = posix.write(posix.STDERR_FILENO, "\r\nSession killed\r\n") catch {};
19                 return cleanup(ctx, completion);
20             },
21-            .pty_out => {
22-                const parsed = protocol.parseMessage(protocol.PtyOutput, ctx.allocator, msg_line) catch |err| {
23-                    std.debug.print("Failed to parse pty_out: {s}\r\n", .{@errorName(err)});
24-                    return .rearm;
25-                };
26-                defer parsed.deinit();
27 
28-                writeToStdout(ctx, parsed.value.payload.text);
29-            },
30             else => {
31                 std.debug.print("Unexpected message type in attach client: {s}\r\n", .{msg_type.toString()});
32             },
33@@ -414,29 +408,9 @@ fn sendDetachRequest(ctx: *Context) void {
34 }
35 
36 fn sendPtyInput(ctx: *Context, data: []const u8) void {
37-    const request_payload = protocol.PtyInput{ .text = data };
38-    var out: std.io.Writer.Allocating = .init(ctx.allocator);
39-    defer out.deinit();
40-
41-    const msg = protocol.Message(@TypeOf(request_payload)){
42-        .type = protocol.MessageType.pty_in.toString(),
43-        .payload = request_payload,
44+    protocol.writeBinaryFrame(ctx.socket_fd, .pty_binary, data) catch |err| {
45+        std.debug.print("Failed to send pty input: {s}\r\n", .{@errorName(err)});
46     };
47-
48-    var stringify: std.json.Stringify = .{ .writer = &out.writer };
49-    stringify.write(msg) catch return;
50-    out.writer.writeByte('\n') catch return;
51-
52-    const owned_message = ctx.allocator.dupe(u8, out.written()) catch return;
53-
54-    const write_ctx = ctx.allocator.create(StdinWriteContext) catch return;
55-    write_ctx.* = .{
56-        .allocator = ctx.allocator,
57-        .message = owned_message,
58-    };
59-
60-    const write_completion = ctx.allocator.create(xev.Completion) catch return;
61-    ctx.stream.write(ctx.loop, write_completion, .{ .slice = owned_message }, StdinWriteContext, write_ctx, stdinWriteCallback);
62 }
63 
64 // Context for async write operations to daemon socket.
M src/daemon.zig
+62, -14
  1@@ -272,7 +272,41 @@ fn readCallback(
  2             return closeClient(client, completion);
  3         };
  4 
  5-        // Process complete messages (delimited by newline)
  6+        // Check for binary frames first
  7+        while (client.message_buffer.items.len >= @sizeOf(protocol.FrameHeader)) {
  8+            const header: *const protocol.FrameHeader = @ptrCast(@alignCast(client.message_buffer.items.ptr));
  9+
 10+            if (header.frame_type == @intFromEnum(protocol.FrameType.pty_binary)) {
 11+                const expected_total = @sizeOf(protocol.FrameHeader) + header.length;
 12+                if (client.message_buffer.items.len >= expected_total) {
 13+                    const payload = client.message_buffer.items[@sizeOf(protocol.FrameHeader)..expected_total];
 14+                    handleBinaryFrame(client, payload) catch |err| {
 15+                        std.debug.print("handleBinaryFrame failed: {s}\n", .{@errorName(err)});
 16+                        return closeClient(client, completion);
 17+                    };
 18+
 19+                    // Remove processed frame from buffer
 20+                    const remaining = client.message_buffer.items[expected_total..];
 21+                    const remaining_copy = client.allocator.dupe(u8, remaining) catch {
 22+                        return closeClient(client, completion);
 23+                    };
 24+                    client.message_buffer.clearRetainingCapacity();
 25+                    client.message_buffer.appendSlice(client.allocator, remaining_copy) catch {
 26+                        client.allocator.free(remaining_copy);
 27+                        return closeClient(client, completion);
 28+                    };
 29+                    client.allocator.free(remaining_copy);
 30+                } else {
 31+                    // Incomplete frame, wait for more data
 32+                    break;
 33+                }
 34+            } else {
 35+                // Not a binary frame, try JSON
 36+                break;
 37+            }
 38+        }
 39+
 40+        // Process complete JSON messages (delimited by newline)
 41         while (std.mem.indexOf(u8, client.message_buffer.items, "\n")) |newline_pos| {
 42             const message = client.message_buffer.items[0..newline_pos];
 43             handleMessage(client, message) catch |err| {
 44@@ -303,6 +337,10 @@ fn readCallback(
 45     }
 46 }
 47 
 48+fn handleBinaryFrame(client: *Client, payload: []const u8) !void {
 49+    try handlePtyInput(client, payload);
 50+}
 51+
 52 fn handleMessage(client: *Client, data: []const u8) !void {
 53     std.debug.print("Received message from client fd={d}: {s}\n", .{ client.fd, data });
 54 
 55@@ -338,11 +376,6 @@ fn handleMessage(client: *Client, data: []const u8) !void {
 56             std.debug.print("Handling list sessions request\n", .{});
 57             try handleListSessions(client.server_ctx, client);
 58         },
 59-        .pty_in => {
 60-            const parsed = try protocol.parseMessage(protocol.PtyInput, client.allocator, data);
 61-            defer parsed.deinit();
 62-            try handlePtyInput(client, parsed.value.payload.text);
 63-        },
 64         .window_resize => {
 65             const parsed = try protocol.parseMessage(protocol.WindowResize, client.allocator, data);
 66             defer parsed.deinit();
 67@@ -593,9 +626,7 @@ fn handleAttachSession(ctx: *ServerContext, client: *Client, session_name: []con
 68 
 69         // For first attach to new session, clear the client's terminal
 70         if (!is_reattach) {
 71-            try protocol.writeJson(ctx.allocator, client.fd, .pty_out, protocol.PtyOutput{
 72-                .text = "\x1b[2J\x1b[H", // Clear screen and move cursor to home
 73-            });
 74+            try protocol.writeBinaryFrame(client.fd, .pty_binary, "\x1b[2J\x1b[H");
 75         }
 76     } else {
 77         // Send attach success response for additional clients
 78@@ -749,12 +780,11 @@ fn renderTerminalSnapshot(session: *Session, allocator: std.mem.Allocator) ![]u8
 79 }
 80 
 81 fn notifyAttachedClientsAndCleanup(session: *Session, ctx: *ServerContext, reason: []const u8) void {
 82-    std.debug.print("Session '{s}' ending: {s}\n", .{ session.name, reason });
 83-
 84-    // Copy the session name before cleanup since HashMap key points to session.name
 85+    // Copy the session name FIRST before doing anything else, including printing
 86+    // This protects against any potential memory corruption
 87     const session_name = ctx.allocator.dupe(u8, session.name) catch {
 88-        // Fallback: just use the existing name and skip removal if allocation fails
 89-        std.debug.print("Failed to allocate session name copy\n", .{});
 90+        // Fallback: skip notification and just cleanup
 91+        std.debug.print("Failed to allocate session name copy during cleanup\n", .{});
 92         posix.close(session.pty_master_fd);
 93         session.deinit();
 94         ctx.allocator.destroy(session);
 95@@ -762,6 +792,8 @@ fn notifyAttachedClientsAndCleanup(session: *Session, ctx: *ServerContext, reaso
 96     };
 97     defer ctx.allocator.free(session_name);
 98 
 99+    std.debug.print("Session '{s}' ending: {s}\n", .{ session_name, reason });
100+
101     // Notify all attached clients
102     var it = session.attached_clients.keyIterator();
103     while (it.next()) |client_fd| {
104@@ -801,6 +833,22 @@ fn readPtyCallback(
105     const session = pty_ctx.session;
106     const ctx = pty_ctx.server_ctx;
107 
108+    // Check if session still exists (might have been killed by another client)
109+    const session_exists = blk: {
110+        var it = ctx.sessions.valueIterator();
111+        while (it.next()) |s| {
112+            if (s.* == session) break :blk true;
113+        }
114+        break :blk false;
115+    };
116+
117+    if (!session_exists) {
118+        // Session was already cleaned up, just free our context
119+        ctx.allocator.destroy(pty_ctx);
120+        ctx.allocator.destroy(completion);
121+        return .disarm;
122+    }
123+
124     if (read_result) |bytes_read| {
125         if (bytes_read == 0) {
126             std.debug.print("PTY closed (EOF)\n", .{});
M src/protocol.zig
+0, -14
 1@@ -8,7 +8,6 @@ pub const MessageType = enum {
 2     detach_session_request,
 3     kill_session_request,
 4     list_sessions_request,
 5-    pty_in,
 6     window_resize,
 7 
 8     // Daemon -> Client responses
 9@@ -16,7 +15,6 @@ pub const MessageType = enum {
10     detach_session_response,
11     kill_session_response,
12     list_sessions_response,
13-    pty_out,
14 
15     // Daemon -> Client notifications
16     detach_notification,
17@@ -28,13 +26,11 @@ pub const MessageType = enum {
18             .detach_session_request => "detach_session_request",
19             .kill_session_request => "kill_session_request",
20             .list_sessions_request => "list_sessions_request",
21-            .pty_in => "pty_in",
22             .window_resize => "window_resize",
23             .attach_session_response => "attach_session_response",
24             .detach_session_response => "detach_session_response",
25             .kill_session_response => "kill_session_response",
26             .list_sessions_response => "list_sessions_response",
27-            .pty_out => "pty_out",
28             .detach_notification => "detach_notification",
29             .kill_notification => "kill_notification",
30         };
31@@ -46,13 +42,11 @@ pub const MessageType = enum {
32             .{ "detach_session_request", .detach_session_request },
33             .{ "kill_session_request", .kill_session_request },
34             .{ "list_sessions_request", .list_sessions_request },
35-            .{ "pty_in", .pty_in },
36             .{ "window_resize", .window_resize },
37             .{ "attach_session_response", .attach_session_response },
38             .{ "detach_session_response", .detach_session_response },
39             .{ "kill_session_response", .kill_session_response },
40             .{ "list_sessions_response", .list_sessions_response },
41-            .{ "pty_out", .pty_out },
42             .{ "detach_notification", .detach_notification },
43             .{ "kill_notification", .kill_notification },
44         });
45@@ -78,10 +72,6 @@ pub const KillSessionRequest = struct {
46 
47 pub const ListSessionsRequest = struct {};
48 
49-pub const PtyInput = struct {
50-    text: []const u8,
51-};
52-
53 pub const WindowResize = struct {
54     rows: u16,
55     cols: u16,
56@@ -117,10 +107,6 @@ pub const ListSessionsResponse = struct {
57     error_message: ?[]const u8 = null,
58 };
59 
60-pub const PtyOutput = struct {
61-    text: []const u8,
62-};
63-
64 pub const DetachNotification = struct {
65     session_name: []const u8,
66 };