repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
2724be4
parent
ed69a16
author
Eric Bower
date
2025-10-12 22:37:14 -0400 EDT
refactor: use binary format for pty_in and pty_out
2 files changed,  +76, -66
M src/attach.zig
+46, -12
 1@@ -24,6 +24,8 @@ const Context = struct {
 2     read_completion: ?*xev.Completion = null,
 3     read_ctx: ?*ReadContext = null,
 4     config: config_mod.Config,
 5+    frame_buffer: std.ArrayList(u8),
 6+    frame_expecting_bytes: usize = 0,
 7 };
 8 
 9 const params = clap.parseParamsComptime(
10@@ -97,7 +99,10 @@ pub fn main(config: config_mod.Config, iter: *std.process.ArgIterator) !void {
11         .loop = &loop,
12         .session_name = session_name,
13         .config = config,
14+        .frame_buffer = std.ArrayList(u8){},
15     };
16+    ctx.frame_buffer = std.ArrayList(u8).initCapacity(allocator, 4096) catch unreachable;
17+    defer ctx.frame_buffer.deinit(allocator);
18 
19     // Get terminal size
20     var ws: c.struct_winsize = undefined;
21@@ -190,14 +195,52 @@ fn readCallback(
22 
23         const data = read_buffer.slice[0..len];
24 
25-        // Find newline to get complete message
26+        // Check if this is a binary frame (starts with FrameHeader)
27+        if (data.len >= @sizeOf(protocol.FrameHeader)) {
28+            const potential_header = data[0..@sizeOf(protocol.FrameHeader)];
29+            const header: *const protocol.FrameHeader = @ptrCast(@alignCast(potential_header));
30+
31+            if (header.frame_type == @intFromEnum(protocol.FrameType.pty_binary)) {
32+                // This is a binary PTY frame
33+                const expected_total = @sizeOf(protocol.FrameHeader) + header.length;
34+                if (data.len >= expected_total) {
35+                    // We have the complete frame
36+                    const payload = data[@sizeOf(protocol.FrameHeader)..expected_total];
37+                    _ = posix.write(posix.STDOUT_FILENO, payload) catch {};
38+                    return .rearm;
39+                } else {
40+                    // Partial frame, buffer it
41+                    ctx.frame_buffer.appendSlice(ctx.allocator, data) catch {};
42+                    ctx.frame_expecting_bytes = expected_total - data.len;
43+                    return .rearm;
44+                }
45+            }
46+        }
47+
48+        // If we're expecting more frame bytes, accumulate them
49+        if (ctx.frame_expecting_bytes > 0) {
50+            ctx.frame_buffer.appendSlice(ctx.allocator, data) catch {};
51+            if (ctx.frame_buffer.items.len >= @sizeOf(protocol.FrameHeader)) {
52+                const header: *const protocol.FrameHeader = @ptrCast(@alignCast(ctx.frame_buffer.items[0..@sizeOf(protocol.FrameHeader)]));
53+                const expected_total = @sizeOf(protocol.FrameHeader) + header.length;
54+
55+                if (ctx.frame_buffer.items.len >= expected_total) {
56+                    // Complete frame received
57+                    const payload = ctx.frame_buffer.items[@sizeOf(protocol.FrameHeader)..expected_total];
58+                    _ = posix.write(posix.STDOUT_FILENO, payload) catch {};
59+                    ctx.frame_buffer.clearRetainingCapacity();
60+                    ctx.frame_expecting_bytes = 0;
61+                }
62+            }
63+            return .rearm;
64+        }
65+
66+        // Otherwise parse as JSON control message
67         const newline_idx = std.mem.indexOf(u8, data, "\n") orelse {
68-            // std.debug.print("No newline found in {d} bytes, waiting for more data\n", .{len});
69             return .rearm;
70         };
71 
72         const msg_line = data[0..newline_idx];
73-        // std.debug.print("Parsing message ({d} bytes): {s}\n", .{msg_line.len, msg_line});
74 
75         const msg_type_parsed = protocol.parseMessageType(ctx.allocator, msg_line) catch |err| {
76             std.debug.print("JSON parse error: {s}\n", .{@errorName(err)});
77@@ -280,15 +323,6 @@ fn readCallback(
78                 _ = posix.write(posix.STDERR_FILENO, "\r\nSession killed\r\n") catch {};
79                 return cleanup(ctx, completion);
80             },
81-            .pty_out => {
82-                const parsed = protocol.parseMessage(protocol.PtyOutput, ctx.allocator, msg_line) catch |err| {
83-                    std.debug.print("Failed to parse pty output: {s}\n", .{@errorName(err)});
84-                    return .rearm;
85-                };
86-                defer parsed.deinit();
87-
88-                _ = posix.write(posix.STDOUT_FILENO, parsed.value.payload.text) catch {};
89-            },
90             else => {
91                 std.debug.print("Unexpected message type in attach client: {s}\n", .{msg_type.toString()});
92             },
M src/daemon.zig
+30, -54
  1@@ -590,31 +590,13 @@ fn handleAttachSession(ctx: *ServerContext, client: *Client, session_name: []con
  2         });
  3     }
  4 
  5-    // If reattaching, send the scrollback buffer (raw PTY output with colors)
  6-    // Limit to last 64KB to avoid huge JSON messages
  7+    // If reattaching, send the scrollback buffer as binary frame
  8     if (is_reattach and session.buffer.items.len > 0) {
  9         const buffer_slice = try session.vt.plainStringUnwrapped(client.allocator);
 10-        try protocol.writeJson(ctx.allocator, client.fd, .pty_out, protocol.PtyOutput{
 11-            .text = buffer_slice,
 12-        });
 13-        std.debug.print("Sent scrollback buffer to client fd={d}\n", .{client.fd});
 14         defer client.allocator.free(buffer_slice);
 15-        //
 16-        // std.debug.print("Sending scrollback buffer: {d} bytes total\n", .{session.buffer.items.len});
 17-        //
 18-        // const max_buffer_size = 64 * 1024;
 19-        // const buffer_start = if (session.buffer.items.len > max_buffer_size)
 20-        //     session.buffer.items.len - max_buffer_size
 21-        // else
 22-        //     0;
 23-        // const buffer_slice = session.buffer.items[buffer_start..];
 24-        //
 25-        // std.debug.print("Sending slice: {d} bytes (from offset {d})\n", .{ buffer_slice.len, buffer_start });
 26-        //
 27-        // try protocol.writeJson(ctx.allocator, client.fd, .pty_out, protocol.PtyOutput{
 28-        //     .text = buffer_slice,
 29-        // });
 30-        // std.debug.print("Sent scrollback buffer to client fd={d}\n", .{client.fd});
 31+
 32+        try protocol.writeBinaryFrame(client.fd, .pty_binary, buffer_slice);
 33+        std.debug.print("Sent scrollback buffer to client fd={d} ({d} bytes)\n", .{ client.fd, buffer_slice.len });
 34     }
 35 }
 36 
 37@@ -861,6 +843,10 @@ fn readPtyCallback(
 38             std.debug.print("Buffer append error: {s}\n", .{@errorName(err)});
 39         };
 40 
 41+        // Build a sanitized buffer that only includes bytes we can safely send
 42+        var sanitized_buf = std.ArrayList(u8).initCapacity(session.allocator, valid_len) catch return .disarm;
 43+        defer sanitized_buf.deinit(session.allocator);
 44+
 45         // Parse through libghostty-vt byte-by-byte to handle invalid data
 46         // This is necessary because binary data (like /dev/urandom) can cause
 47         // panics in @enumFromInt when high bytes appear during escape sequences
 48@@ -876,56 +862,46 @@ fn readPtyCallback(
 49                 std.debug.print("VT parse error at byte 0x{x}: {s}\n", .{ byte, @errorName(err) });
 50                 // Reset to ground state on any error
 51                 session.vt_stream.parser.state = .ground;
 52+                continue;
 53             };
 54+            // Only add to sanitized buffer if we successfully parsed it
 55+            sanitized_buf.append(session.allocator, byte) catch {};
 56         }
 57 
 58         // Only proxy to clients if someone is attached
 59-        if (session.attached_clients.count() > 0 and valid_len > 0) {
 60-            // Build JSON response with properly escaped text
 61-            var response_buf = std.ArrayList(u8).initCapacity(session.allocator, 4096) catch return .disarm;
 62-            defer response_buf.deinit(session.allocator);
 63-
 64-            response_buf.appendSlice(session.allocator, "{\"type\":\"pty_out\",\"payload\":{\"text\":\"") catch return .disarm;
 65-
 66-            // Escape JSON special characters while preserving UTF-8 sequences
 67-            for (valid_data) |byte| {
 68-                switch (byte) {
 69-                    '"' => response_buf.appendSlice(session.allocator, "\\\"") catch return .disarm,
 70-                    '\\' => response_buf.appendSlice(session.allocator, "\\\\") catch return .disarm,
 71-                    '\n' => response_buf.appendSlice(session.allocator, "\\n") catch return .disarm,
 72-                    '\r' => response_buf.appendSlice(session.allocator, "\\r") catch return .disarm,
 73-                    '\t' => response_buf.appendSlice(session.allocator, "\\t") catch return .disarm,
 74-                    0x08 => response_buf.appendSlice(session.allocator, "\\b") catch return .disarm,
 75-                    0x0C => response_buf.appendSlice(session.allocator, "\\f") catch return .disarm,
 76-                    0x00...0x07, 0x0B, 0x0E...0x1F, 0x7F => {
 77-                        const escaped = std.fmt.allocPrint(session.allocator, "\\u{x:0>4}", .{byte}) catch return .disarm;
 78-                        defer session.allocator.free(escaped);
 79-                        response_buf.appendSlice(session.allocator, escaped) catch return .disarm;
 80-                    },
 81-                    else => response_buf.append(session.allocator, byte) catch return .disarm,
 82-                }
 83-            }
 84+        if (session.attached_clients.count() > 0 and sanitized_buf.items.len > 0) {
 85+            // Send PTY output as binary frame to avoid JSON escaping issues
 86+            // Frame format: [4-byte length][2-byte type][payload]
 87+            const header = protocol.FrameHeader{
 88+                .length = @intCast(sanitized_buf.items.len),
 89+                .frame_type = @intFromEnum(protocol.FrameType.pty_binary),
 90+            };
 91+
 92+            // Build complete frame with header + payload
 93+            var frame_buf = std.ArrayList(u8).initCapacity(session.allocator, @sizeOf(protocol.FrameHeader) + sanitized_buf.items.len) catch return .disarm;
 94+            defer frame_buf.deinit(session.allocator);
 95 
 96-            response_buf.appendSlice(session.allocator, "\"}}\n") catch return .disarm;
 97-            const response = response_buf.items;
 98+            const header_bytes = std.mem.asBytes(&header);
 99+            frame_buf.appendSlice(session.allocator, header_bytes) catch return .disarm;
100+            frame_buf.appendSlice(session.allocator, sanitized_buf.items) catch return .disarm;
101 
102             // Send to all attached clients using async write
103             var it = session.attached_clients.keyIterator();
104             while (it.next()) |client_fd| {
105                 const attached_client = ctx.clients.get(client_fd.*) orelse continue;
106-                const owned_response = session.allocator.dupe(u8, response) catch continue;
107+                const owned_frame = session.allocator.dupe(u8, frame_buf.items) catch continue;
108 
109                 const write_ctx = session.allocator.create(PtyWriteContext) catch {
110-                    session.allocator.free(owned_response);
111+                    session.allocator.free(owned_frame);
112                     continue;
113                 };
114                 write_ctx.* = .{
115                     .allocator = session.allocator,
116-                    .message = owned_response,
117+                    .message = owned_frame,
118                 };
119 
120                 const write_completion = session.allocator.create(xev.Completion) catch {
121-                    session.allocator.free(owned_response);
122+                    session.allocator.free(owned_frame);
123                     session.allocator.destroy(write_ctx);
124                     continue;
125                 };
126@@ -933,7 +909,7 @@ fn readPtyCallback(
127                 attached_client.stream.write(
128                     loop,
129                     write_completion,
130-                    .{ .slice = owned_response },
131+                    .{ .slice = owned_frame },
132                     PtyWriteContext,
133                     write_ctx,
134                     ptyWriteCallback,