repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

zmx / src
Eric Bower  ·  2025-12-15

main.zig

   1const std = @import("std");
   2const posix = std.posix;
   3const builtin = @import("builtin");
   4const build_options = @import("build_options");
   5const ghostty_vt = @import("ghostty-vt");
   6const ipc = @import("ipc.zig");
   7const log = @import("log.zig");
   8
/// zmx version, as provided by the `build_options` module.
pub const version = build_options.version;
/// ghostty-vt version, as provided by the `build_options` module.
pub const ghostty_version = build_options.ghostty_version;

/// File-level logging backend. `zmxLogFn` forwards every `std.log` call to it;
/// `main` calls `init`/`deinit` on it.
var log_system = log.LogSystem{};

/// Root-level `std.Options` override: routes `std.log` through `zmxLogFn`
/// and sets the log level to `.debug`.
pub const std_options: std.Options = .{
    .logFn = zmxLogFn,
    .log_level = .debug,
};
  18
/// `std.log` hook installed through `std_options.logFn`.
/// Forwards the level, scope, format string and arguments unchanged to
/// the file-level `log_system`.
fn zmxLogFn(
    comptime level: std.log.Level,
    comptime scope: @Type(.enum_literal),
    comptime format: []const u8,
    args: anytype,
) void {
    log_system.log(level, scope, format, args);
}
  27
  28const c = switch (builtin.os.tag) {
  29    .macos => @cImport({
  30        @cInclude("sys/ioctl.h"); // ioctl and constants
  31        @cInclude("termios.h");
  32        @cInclude("stdlib.h");
  33        @cInclude("unistd.h");
  34    }),
  35    .freebsd => @cImport({
  36        @cInclude("termios.h"); // ioctl and constants
  37        @cInclude("libutil.h"); // openpty()
  38        @cInclude("stdlib.h");
  39        @cInclude("unistd.h");
  40    }),
  41    else => @cImport({
  42        @cInclude("sys/ioctl.h"); // ioctl and constants
  43        @cInclude("pty.h");
  44        @cInclude("stdlib.h");
  45        @cInclude("unistd.h");
  46    }),
  47};
  48
// Manually declare forkpty for macOS since util.h is not available during cross-compilation.
// On every other target the `forkpty` exposed by the `c` cImport is used instead.
const forkpty = if (builtin.os.tag == .macos)
    struct {
        extern "c" fn forkpty(master_fd: *c_int, name: ?[*:0]u8, termp: ?*const c.struct_termios, winp: ?*const c.struct_winsize) c_int;
    }.forkpty
else
    c.forkpty;
/// Atomic "terminal was resized" flag, initially false. `clientLoop` swaps it
/// back to false at the top of each iteration and sends a Resize message when
/// it was set. NOTE(review): presumably set by the handler that
/// `setupSigwinchHandler` installs — confirm against that function.
var sigwinch_received: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);
  58
/// State kept for one connected client socket.
const Client = struct {
    /// Allocator used to release `write_buf` in `deinit`.
    alloc: std.mem.Allocator,
    /// Connected socket for this client; closed by `deinit`.
    socket_fd: i32,
    /// When true, the daemon loop also polls this client's socket for writability.
    has_pending_output: bool = false,
    /// Buffer that incoming IPC messages are read into and parsed from.
    read_buf: ipc.SocketBuffer,
    /// Outgoing bytes queued for this client.
    write_buf: std.ArrayList(u8),

    /// Close the socket and release both buffers.
    /// Does not free the `Client` allocation itself; callers `destroy` it separately.
    pub fn deinit(self: *Client) void {
        posix.close(self.socket_fd);
        self.read_buf.deinit();
        self.write_buf.deinit(self.alloc);
    }
};
  72
  73const Cfg = struct {
  74    socket_dir: []const u8,
  75    log_dir: []const u8,
  76    max_scrollback: usize = 10_000_000,
  77
  78    pub fn init(alloc: std.mem.Allocator) !Cfg {
  79        const tmpdir = posix.getenv("TMPDIR") orelse "/tmp";
  80        const uid = posix.getuid();
  81
  82        var socket_dir: []const u8 = "";
  83        if (posix.getenv("ZMX_DIR")) |zmxdir| {
  84            socket_dir = try alloc.dupe(u8, zmxdir);
  85        } else {
  86            socket_dir = try std.fmt.allocPrint(alloc, "{s}/zmx-{d}", .{ tmpdir, uid });
  87        }
  88        errdefer alloc.free(socket_dir);
  89
  90        const log_dir = try std.fmt.allocPrint(alloc, "{s}/logs", .{socket_dir});
  91        errdefer alloc.free(log_dir);
  92
  93        var cfg = Cfg{
  94            .socket_dir = socket_dir,
  95            .log_dir = log_dir,
  96        };
  97
  98        try cfg.mkdir();
  99
 100        return cfg;
 101    }
 102
 103    pub fn deinit(self: *Cfg, alloc: std.mem.Allocator) void {
 104        if (self.socket_dir.len > 0) alloc.free(self.socket_dir);
 105        if (self.log_dir.len > 0) alloc.free(self.log_dir);
 106    }
 107
 108    pub fn mkdir(self: *Cfg) !void {
 109        posix.mkdirat(posix.AT.FDCWD, self.socket_dir, 0o750) catch |err| switch (err) {
 110            error.PathAlreadyExists => {},
 111            else => return err,
 112        };
 113
 114        posix.mkdirat(posix.AT.FDCWD, self.log_dir, 0o750) catch |err| switch (err) {
 115            error.PathAlreadyExists => {},
 116            else => return err,
 117        };
 118    }
 119};
 120
 121const Daemon = struct {
 122    cfg: *Cfg,
 123    alloc: std.mem.Allocator,
 124    clients: std.ArrayList(*Client),
 125    session_name: []const u8,
 126    socket_path: []const u8,
 127    running: bool,
 128    pid: i32,
 129    command: ?[]const []const u8 = null,
 130    has_pty_output: bool = false,
 131
 132    pub fn deinit(self: *Daemon) void {
 133        self.clients.deinit(self.alloc);
 134        self.alloc.free(self.socket_path);
 135    }
 136
 137    pub fn shutdown(self: *Daemon) void {
 138        std.log.info("shutting down daemon session_name={s}", .{self.session_name});
 139        self.running = false;
 140
 141        for (self.clients.items) |client| {
 142            client.deinit();
 143            self.alloc.destroy(client);
 144        }
 145        self.clients.clearRetainingCapacity();
 146    }
 147
 148    pub fn closeClient(self: *Daemon, client: *Client, i: usize, shutdown_on_last: bool) bool {
 149        const fd = client.socket_fd;
 150        client.deinit();
 151        self.alloc.destroy(client);
 152        _ = self.clients.orderedRemove(i);
 153        std.log.info("client disconnected fd={d} remaining={d}", .{ fd, self.clients.items.len });
 154        if (shutdown_on_last and self.clients.items.len == 0) {
 155            self.shutdown();
 156            return true;
 157        }
 158        return false;
 159    }
 160};
 161
 162pub fn main() !void {
 163    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
 164    const alloc = std.heap.c_allocator;
 165
 166    var args = try std.process.argsWithAllocator(alloc);
 167    defer args.deinit();
 168    _ = args.skip(); // skip program name
 169
 170    var cfg = try Cfg.init(alloc);
 171    defer cfg.deinit(alloc);
 172
 173    const log_path = try std.fs.path.join(alloc, &.{ cfg.log_dir, "zmx.log" });
 174    defer alloc.free(log_path);
 175    try log_system.init(alloc, log_path);
 176    defer log_system.deinit();
 177
 178    const cmd = args.next() orelse {
 179        return list(&cfg);
 180    };
 181
 182    if (std.mem.eql(u8, cmd, "version") or std.mem.eql(u8, cmd, "v") or std.mem.eql(u8, cmd, "-v") or std.mem.eql(u8, cmd, "--version")) {
 183        return printVersion();
 184    } else if (std.mem.eql(u8, cmd, "help") or std.mem.eql(u8, cmd, "h") or std.mem.eql(u8, cmd, "-h")) {
 185        return help();
 186    } else if (std.mem.eql(u8, cmd, "list") or std.mem.eql(u8, cmd, "l")) {
 187        return list(&cfg);
 188    } else if (std.mem.eql(u8, cmd, "detach") or std.mem.eql(u8, cmd, "d")) {
 189        return detachAll(&cfg);
 190    } else if (std.mem.eql(u8, cmd, "kill") or std.mem.eql(u8, cmd, "k")) {
 191        const session_name = args.next() orelse {
 192            return error.SessionNameRequired;
 193        };
 194        return kill(&cfg, session_name);
 195    } else if (std.mem.eql(u8, cmd, "attach") or std.mem.eql(u8, cmd, "a")) {
 196        const session_name = args.next() orelse {
 197            return error.SessionNameRequired;
 198        };
 199
 200        var command_args: std.ArrayList([]const u8) = .empty;
 201        defer command_args.deinit(alloc);
 202        while (args.next()) |arg| {
 203            try command_args.append(alloc, arg);
 204        }
 205
 206        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
 207        var command: ?[][]const u8 = null;
 208        if (command_args.items.len > 0) {
 209            command = command_args.items;
 210        }
 211        var daemon = Daemon{
 212            .running = true,
 213            .cfg = &cfg,
 214            .alloc = alloc,
 215            .clients = clients,
 216            .session_name = session_name,
 217            .socket_path = undefined,
 218            .pid = undefined,
 219            .command = command,
 220        };
 221        daemon.socket_path = try getSocketPath(alloc, cfg.socket_dir, session_name);
 222        std.log.info("socket path={s}", .{daemon.socket_path});
 223        return attach(&daemon);
 224    } else {
 225        return help();
 226    }
 227}
 228
 229fn printVersion() !void {
 230    var buf: [256]u8 = undefined;
 231    var w = std.fs.File.stdout().writer(&buf);
 232    try w.interface.print("zmx {s}\nghostty-vt {s}\n", .{ version, ghostty_version });
 233    try w.interface.flush();
 234}
 235
 236fn help() !void {
 237    const help_text =
 238        \\zmx - session persistence for terminal processes
 239        \\
 240        \\Usage: zmx <command> [args]
 241        \\
 242        \\Commands:
 243        \\  [a]ttach <name> [command...]  Create or attach to a session
 244        \\  [d]etach                      Detach all clients from current session (ctrl+\ for current client)
 245        \\  [l]ist                        List active sessions
 246        \\  [k]ill <name>                 Kill a session and all attached clients
 247        \\  [v]ersion                     Show version information
 248        \\  [h]elp                        Show this help message
 249        \\
 250    ;
 251    var buf: [4096]u8 = undefined;
 252    var w = std.fs.File.stdout().writer(&buf);
 253    try w.interface.print(help_text, .{});
 254    try w.interface.flush();
 255}
 256
 257const SessionEntry = struct {
 258    name: []const u8,
 259    pid: ?i32,
 260    clients_len: ?usize,
 261    is_error: bool,
 262    error_name: ?[]const u8,
 263
 264    fn lessThan(_: void, a: SessionEntry, b: SessionEntry) bool {
 265        return std.mem.order(u8, a.name, b.name) == .lt;
 266    }
 267};
 268
 269fn list(cfg: *Cfg) !void {
 270    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
 271    defer _ = gpa.deinit();
 272    const alloc = gpa.allocator();
 273
 274    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{ .iterate = true });
 275    defer dir.close();
 276    var iter = dir.iterate();
 277    var buf: [4096]u8 = undefined;
 278    var w = std.fs.File.stdout().writer(&buf);
 279
 280    var sessions = try std.ArrayList(SessionEntry).initCapacity(alloc, 16);
 281    defer {
 282        for (sessions.items) |session| {
 283            alloc.free(session.name);
 284        }
 285        sessions.deinit(alloc);
 286    }
 287
 288    while (try iter.next()) |entry| {
 289        const exists = sessionExists(dir, entry.name) catch continue;
 290        if (exists) {
 291            const name = try alloc.dupe(u8, entry.name);
 292            errdefer alloc.free(name);
 293
 294            const socket_path = try getSocketPath(alloc, cfg.socket_dir, entry.name);
 295            defer alloc.free(socket_path);
 296
 297            const result = probeSession(alloc, socket_path) catch |err| {
 298                try sessions.append(alloc, .{
 299                    .name = name,
 300                    .pid = null,
 301                    .clients_len = null,
 302                    .is_error = true,
 303                    .error_name = @errorName(err),
 304                });
 305                cleanupStaleSocket(dir, entry.name);
 306                continue;
 307            };
 308            posix.close(result.fd);
 309
 310            try sessions.append(alloc, .{
 311                .name = name,
 312                .pid = result.info.pid,
 313                .clients_len = result.info.clients_len,
 314                .is_error = false,
 315                .error_name = null,
 316            });
 317        }
 318    }
 319
 320    if (sessions.items.len == 0) {
 321        try w.interface.print("no sessions found in {s}\n", .{cfg.socket_dir});
 322        try w.interface.flush();
 323        return;
 324    }
 325
 326    std.mem.sort(SessionEntry, sessions.items, {}, SessionEntry.lessThan);
 327
 328    for (sessions.items) |session| {
 329        if (session.is_error) {
 330            try w.interface.print("session_name={s}\tstatus={s}\t(cleaning up)\n", .{ session.name, session.error_name.? });
 331        } else {
 332            try w.interface.print("session_name={s}\tpid={d}\tclients={d}\n", .{ session.name, session.pid.?, session.clients_len.? });
 333        }
 334        try w.interface.flush();
 335    }
 336}
 337
 338fn detachAll(cfg: *Cfg) !void {
 339    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
 340    defer _ = gpa.deinit();
 341    const alloc = gpa.allocator();
 342    const session_name = std.process.getEnvVarOwned(alloc, "ZMX_SESSION") catch |err| switch (err) {
 343        error.EnvironmentVariableNotFound => {
 344            std.log.err("ZMX_SESSION env var not found: are you inside a zmx session?", .{});
 345            return;
 346        },
 347        else => return err,
 348    };
 349    defer alloc.free(session_name);
 350
 351    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
 352    defer dir.close();
 353
 354    const socket_path = try getSocketPath(alloc, cfg.socket_dir, session_name);
 355    defer alloc.free(socket_path);
 356    const result = probeSession(alloc, socket_path) catch |err| {
 357        std.log.err("session unresponsive: {s}", .{@errorName(err)});
 358        cleanupStaleSocket(dir, session_name);
 359        return;
 360    };
 361    defer posix.close(result.fd);
 362    ipc.send(result.fd, .DetachAll, "") catch |err| switch (err) {
 363        error.BrokenPipe, error.ConnectionResetByPeer => return,
 364        else => return err,
 365    };
 366}
 367
 368fn kill(cfg: *Cfg, session_name: []const u8) !void {
 369    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
 370    defer _ = gpa.deinit();
 371    const alloc = gpa.allocator();
 372
 373    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
 374    defer dir.close();
 375
 376    const exists = try sessionExists(dir, session_name);
 377    if (!exists) {
 378        std.log.err("cannot kill session because it does not exist session_name={s}", .{session_name});
 379        return;
 380    }
 381
 382    const socket_path = try getSocketPath(alloc, cfg.socket_dir, session_name);
 383    defer alloc.free(socket_path);
 384    const result = probeSession(alloc, socket_path) catch |err| {
 385        std.log.err("session unresponsive: {s}", .{@errorName(err)});
 386        cleanupStaleSocket(dir, session_name);
 387        var buf: [4096]u8 = undefined;
 388        var w = std.fs.File.stdout().writer(&buf);
 389        w.interface.print("cleaned up stale session {s}\n", .{session_name}) catch {};
 390        w.interface.flush() catch {};
 391        return;
 392    };
 393    defer posix.close(result.fd);
 394    ipc.send(result.fd, .Kill, "") catch |err| switch (err) {
 395        error.BrokenPipe, error.ConnectionResetByPeer => return,
 396        else => return err,
 397    };
 398
 399    var buf: [4096]u8 = undefined;
 400    var w = std.fs.File.stdout().writer(&buf);
 401    try w.interface.print("killed session {s}\n", .{session_name});
 402    try w.interface.flush();
 403}
 404
 405fn attach(daemon: *Daemon) !void {
 406    if (std.posix.getenv("ZMX_SESSION")) |_| {
 407        return error.CannotAttachToSessionInSession;
 408    }
 409
 410    var dir = try std.fs.openDirAbsolute(daemon.cfg.socket_dir, .{});
 411    defer dir.close();
 412
 413    const exists = try sessionExists(dir, daemon.session_name);
 414    var should_create = !exists;
 415
 416    if (exists) {
 417        if (probeSession(daemon.alloc, daemon.socket_path)) |result| {
 418            posix.close(result.fd);
 419            if (daemon.command != null) {
 420                std.log.warn("session already exists, ignoring command session={s}", .{daemon.session_name});
 421            }
 422        } else |_| {
 423            cleanupStaleSocket(dir, daemon.session_name);
 424            should_create = true;
 425        }
 426    }
 427
 428    if (should_create) {
 429        std.log.info("creating session={s}", .{daemon.session_name});
 430        const server_sock_fd = try createSocket(daemon.socket_path);
 431
 432        const pid = try posix.fork();
 433        if (pid == 0) { // child
 434            _ = try posix.setsid();
 435
 436            log_system.deinit();
 437            const session_log_name = try std.fmt.allocPrint(daemon.alloc, "{s}.log", .{daemon.session_name});
 438            defer daemon.alloc.free(session_log_name);
 439            const session_log_path = try std.fs.path.join(daemon.alloc, &.{ daemon.cfg.log_dir, session_log_name });
 440            defer daemon.alloc.free(session_log_path);
 441            try log_system.init(daemon.alloc, session_log_path);
 442
 443            errdefer {
 444                posix.close(server_sock_fd);
 445                dir.deleteFile(daemon.session_name) catch {};
 446            }
 447            const pty_fd = try spawnPty(daemon);
 448            defer {
 449                posix.close(pty_fd);
 450                posix.close(server_sock_fd);
 451                std.log.info("deleting socket file session_name={s}", .{daemon.session_name});
 452                dir.deleteFile(daemon.session_name) catch |err| {
 453                    std.log.warn("failed to delete socket file err={s}", .{@errorName(err)});
 454                };
 455            }
 456            try daemonLoop(daemon, server_sock_fd, pty_fd);
 457            // Reap PTY child to prevent zombie
 458            _ = posix.waitpid(daemon.pid, 0);
 459            daemon.deinit();
 460            return;
 461        }
 462        posix.close(server_sock_fd);
 463        std.Thread.sleep(10 * std.time.ns_per_ms);
 464    }
 465
 466    const client_sock = try sessionConnect(daemon.socket_path);
 467    std.log.info("attached session={s}", .{daemon.session_name});
 468    //  this is typically used with tcsetattr() to modify terminal settings.
 469    //      - you first get the current settings with tcgetattr()
 470    //      - modify the desired attributes in the termios structure
 471    //      - then apply the changes with tcsetattr().
 472    //  This prevents unintended side effects by preserving other settings.
 473    var orig_termios: c.termios = undefined;
 474    _ = c.tcgetattr(posix.STDIN_FILENO, &orig_termios);
 475
 476    // restore stdin fd to its original state after exiting.
 477    // Use TCSAFLUSH to discard any unread input, preventing stale input after detach.
 478    defer {
 479        _ = c.tcsetattr(posix.STDIN_FILENO, c.TCSAFLUSH, &orig_termios);
 480        // Clear screen and show cursor on detach
 481        const restore_seq = "\x1b[?25h\x1b[2J\x1b[H";
 482        _ = posix.write(posix.STDOUT_FILENO, restore_seq) catch {};
 483    }
 484
 485    var raw_termios = orig_termios;
 486    //  set raw mode after successful connection.
 487    //      disables canonical mode (line buffering), input echoing, signal generation from
 488    //      control characters (like Ctrl+C), and flow control.
 489    c.cfmakeraw(&raw_termios);
 490
 491    // Additional granular raw mode settings for precise control
 492    // (matches what abduco and shpool do)
 493    raw_termios.c_cc[c.VLNEXT] = c._POSIX_VDISABLE; // Disable literal-next (Ctrl-V)
 494    // We want to intercept Ctrl+\ (SIGQUIT) so we can use it as a detach key
 495    raw_termios.c_cc[c.VQUIT] = c._POSIX_VDISABLE; // Disable SIGQUIT (Ctrl+\)
 496    raw_termios.c_cc[c.VMIN] = 1; // Minimum chars to read: return after 1 byte
 497    raw_termios.c_cc[c.VTIME] = 0; // Read timeout: no timeout, return immediately
 498
 499    _ = c.tcsetattr(posix.STDIN_FILENO, c.TCSANOW, &raw_termios);
 500
 501    // Clear screen and move cursor to home before attaching
 502    const clear_seq = "\x1b[2J\x1b[H";
 503    _ = try posix.write(posix.STDOUT_FILENO, clear_seq);
 504
 505    try clientLoop(daemon.cfg, client_sock);
 506}
 507
 508fn clientLoop(_: *Cfg, client_sock_fd: i32) !void {
 509    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
 510    const alloc = std.heap.c_allocator;
 511    defer posix.close(client_sock_fd);
 512
 513    setupSigwinchHandler();
 514
 515    // Send init message with terminal size
 516    const size = getTerminalSize(posix.STDOUT_FILENO);
 517    ipc.send(client_sock_fd, .Init, std.mem.asBytes(&size)) catch {};
 518
 519    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 2);
 520    defer poll_fds.deinit(alloc);
 521
 522    var read_buf = try ipc.SocketBuffer.init(alloc);
 523    defer read_buf.deinit();
 524
 525    var stdout_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
 526    defer stdout_buf.deinit(alloc);
 527
 528    const stdin_fd = posix.STDIN_FILENO;
 529
 530    // Make stdin non-blocking
 531    const flags = try posix.fcntl(stdin_fd, posix.F.GETFL, 0);
 532    _ = try posix.fcntl(stdin_fd, posix.F.SETFL, flags | posix.SOCK.NONBLOCK);
 533
 534    while (true) {
 535        // Check for pending SIGWINCH
 536        if (sigwinch_received.swap(false, .acq_rel)) {
 537            const next_size = getTerminalSize(posix.STDOUT_FILENO);
 538            ipc.send(client_sock_fd, .Resize, std.mem.asBytes(&next_size)) catch |err| switch (err) {
 539                error.BrokenPipe, error.ConnectionResetByPeer => return,
 540                else => return err,
 541            };
 542        }
 543
 544        poll_fds.clearRetainingCapacity();
 545
 546        try poll_fds.append(alloc, .{
 547            .fd = stdin_fd,
 548            .events = posix.POLL.IN,
 549            .revents = 0,
 550        });
 551
 552        try poll_fds.append(alloc, .{
 553            .fd = client_sock_fd,
 554            .events = posix.POLL.IN,
 555            .revents = 0,
 556        });
 557
 558        if (stdout_buf.items.len > 0) {
 559            try poll_fds.append(alloc, .{
 560                .fd = posix.STDOUT_FILENO,
 561                .events = posix.POLL.OUT,
 562                .revents = 0,
 563            });
 564        }
 565
 566        _ = posix.poll(poll_fds.items, -1) catch |err| {
 567            if (err == error.Interrupted) continue; // EINTR from signal, loop again
 568            return err;
 569        };
 570
 571        // Handle stdin -> socket (Input)
 572        if (poll_fds.items[0].revents & (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
 573            var buf: [4096]u8 = undefined;
 574            const n_opt: ?usize = posix.read(stdin_fd, &buf) catch |err| blk: {
 575                if (err == error.WouldBlock) break :blk null;
 576                return err;
 577            };
 578
 579            if (n_opt) |n| {
 580                if (n > 0) {
 581                    // Check for Kitty keyboard protocol escape sequence for Ctrl+\
 582                    // Format: CSI 92 ; <modifiers> u  where modifiers has Ctrl bit (bit 2) set
 583                    // Examples: \e[92;5u (basic), \e[92;133u (with event flags)
 584                    if (isKittyCtrlBackslash(buf[0..n])) {
 585                        ipc.send(client_sock_fd, .Detach, "") catch |err| switch (err) {
 586                            error.BrokenPipe, error.ConnectionResetByPeer => return,
 587                            else => return err,
 588                        };
 589                        continue;
 590                    }
 591
 592                    var i: usize = 0;
 593                    while (i < n) : (i += 1) {
 594                        if (buf[i] == 0x1C) { // Ctrl+\ (File Separator)
 595                            ipc.send(client_sock_fd, .Detach, "") catch |err| switch (err) {
 596                                error.BrokenPipe, error.ConnectionResetByPeer => return,
 597                                else => return err,
 598                            };
 599                        } else {
 600                            const payload = buf[i .. i + 1];
 601                            ipc.send(client_sock_fd, .Input, payload) catch |err| switch (err) {
 602                                error.BrokenPipe, error.ConnectionResetByPeer => return,
 603                                else => return err,
 604                            };
 605                        }
 606                    }
 607                } else {
 608                    // EOF on stdin
 609                    return;
 610                }
 611            }
 612        }
 613
 614        // Handle socket -> stdout (Output)
 615        if (poll_fds.items[1].revents & posix.POLL.IN != 0) {
 616            const n = read_buf.read(client_sock_fd) catch |err| {
 617                if (err == error.WouldBlock) continue;
 618                if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
 619                    return;
 620                }
 621                std.log.err("daemon read err={s}", .{@errorName(err)});
 622                return err;
 623            };
 624            if (n == 0) {
 625                return; // Server closed connection
 626            }
 627
 628            while (read_buf.next()) |msg| {
 629                switch (msg.header.tag) {
 630                    .Output => {
 631                        if (msg.payload.len > 0) {
 632                            try stdout_buf.appendSlice(alloc, msg.payload);
 633                        }
 634                    },
 635                    else => {},
 636                }
 637            }
 638        }
 639
 640        if (stdout_buf.items.len > 0) {
 641            const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
 642                if (err == error.WouldBlock) break :blk 0;
 643                return err;
 644            };
 645            if (n > 0) {
 646                try stdout_buf.replaceRange(alloc, 0, n, &[_]u8{});
 647            }
 648        }
 649
 650        if (poll_fds.items[1].revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
 651            return;
 652        }
 653    }
 654}
 655
 656fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
 657    std.log.info("daemon started session={s} pty_fd={d}", .{ daemon.session_name, pty_fd });
 658    var should_exit = false;
 659    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(daemon.alloc, 8);
 660    defer poll_fds.deinit(daemon.alloc);
 661
 662    const init_size = getTerminalSize(pty_fd);
 663    var term = try ghostty_vt.Terminal.init(daemon.alloc, .{
 664        .cols = init_size.cols,
 665        .rows = init_size.rows,
 666        .max_scrollback = daemon.cfg.max_scrollback,
 667    });
 668    defer term.deinit(daemon.alloc);
 669    var vt_stream = term.vtStream();
 670    defer vt_stream.deinit();
 671
 672    while (!should_exit and daemon.running) {
 673        poll_fds.clearRetainingCapacity();
 674
 675        try poll_fds.append(daemon.alloc, .{
 676            .fd = server_sock_fd,
 677            .events = posix.POLL.IN,
 678            .revents = 0,
 679        });
 680
 681        try poll_fds.append(daemon.alloc, .{
 682            .fd = pty_fd,
 683            .events = posix.POLL.IN,
 684            .revents = 0,
 685        });
 686
 687        for (daemon.clients.items) |client| {
 688            var events: i16 = posix.POLL.IN;
 689            if (client.has_pending_output) {
 690                events |= posix.POLL.OUT;
 691            }
 692            try poll_fds.append(daemon.alloc, .{
 693                .fd = client.socket_fd,
 694                .events = events,
 695                .revents = 0,
 696            });
 697        }
 698
 699        _ = posix.poll(poll_fds.items, -1) catch |err| {
 700            return err;
 701        };
 702
 703        if (poll_fds.items[0].revents & (posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL) != 0) {
 704            std.log.err("server socket error revents={d}", .{poll_fds.items[0].revents});
 705            should_exit = true;
 706        } else if (poll_fds.items[0].revents & posix.POLL.IN != 0) {
 707            const client_fd = try posix.accept(server_sock_fd, null, null, posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC);
 708            const client = try daemon.alloc.create(Client);
 709            client.* = Client{
 710                .alloc = daemon.alloc,
 711                .socket_fd = client_fd,
 712                .read_buf = try ipc.SocketBuffer.init(daemon.alloc),
 713                .write_buf = undefined,
 714            };
 715            client.write_buf = try std.ArrayList(u8).initCapacity(client.alloc, 4096);
 716            try daemon.clients.append(daemon.alloc, client);
 717            std.log.info("client connected fd={d} total={d}", .{ client_fd, daemon.clients.items.len });
 718        }
 719
 720        if (poll_fds.items[1].revents & (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
 721            // Read from PTY
 722            var buf: [4096]u8 = undefined;
 723            const n_opt: ?usize = posix.read(pty_fd, &buf) catch |err| blk: {
 724                if (err == error.WouldBlock) break :blk null;
 725                break :blk 0;
 726            };
 727
 728            if (n_opt) |n| {
 729                if (n == 0) {
 730                    // EOF: Shell exited
 731                    std.log.info("shell exited pty_fd={d}", .{pty_fd});
 732                    should_exit = true;
 733                } else {
 734                    // Feed PTY output to terminal emulator for state tracking
 735                    try vt_stream.nextSlice(buf[0..n]);
 736                    daemon.has_pty_output = true;
 737
 738                    // Broadcast data to all clients
 739                    for (daemon.clients.items) |client| {
 740                        ipc.appendMessage(daemon.alloc, &client.write_buf, .Output, buf[0..n]) catch |err| {
 741                            std.log.warn("failed to buffer output for client err={s}", .{@errorName(err)});
 742                            continue;
 743                        };
 744                        client.has_pending_output = true;
 745                    }
 746                }
 747            }
 748        }
 749
 750        var i: usize = daemon.clients.items.len;
 751        // Only iterate over clients that were present when poll_fds was constructed
 752        // poll_fds contains [server, pty, client0, client1, ...]
 753        // So number of clients in poll_fds is poll_fds.items.len - 2
 754        const num_polled_clients = poll_fds.items.len - 2;
 755        if (i > num_polled_clients) {
 756            // If we have more clients than polled (i.e. we just accepted one), start from the polled ones
 757            i = num_polled_clients;
 758        }
 759
 760        clients_loop: while (i > 0) {
 761            i -= 1;
 762            const client = daemon.clients.items[i];
 763            const revents = poll_fds.items[i + 2].revents;
 764
 765            if (revents & posix.POLL.IN != 0) {
 766                const n = client.read_buf.read(client.socket_fd) catch |err| {
 767                    if (err == error.WouldBlock) continue;
 768                    std.log.debug("client read err={s} fd={d}", .{ @errorName(err), client.socket_fd });
 769                    const last = daemon.closeClient(client, i, false);
 770                    if (last) should_exit = true;
 771                    continue;
 772                };
 773
 774                if (n == 0) {
 775                    // Client closed connection
 776                    const last = daemon.closeClient(client, i, false);
 777                    if (last) should_exit = true;
 778                    continue;
 779                }
 780
 781                while (client.read_buf.next()) |msg| {
 782                    switch (msg.header.tag) {
 783                        .Input => {
 784                            if (msg.payload.len > 0) {
 785                                _ = try posix.write(pty_fd, msg.payload);
 786                            }
 787                        },
 788                        .Init => {
 789                            if (msg.payload.len == @sizeOf(ipc.Resize)) {
 790                                const resize = std.mem.bytesToValue(ipc.Resize, msg.payload);
 791                                var ws: c.struct_winsize = .{
 792                                    .ws_row = resize.rows,
 793                                    .ws_col = resize.cols,
 794                                    .ws_xpixel = 0,
 795                                    .ws_ypixel = 0,
 796                                };
 797                                _ = c.ioctl(pty_fd, c.TIOCSWINSZ, &ws);
 798                                try term.resize(daemon.alloc, resize.cols, resize.rows);
 799
 800                                // Force SIGWINCH to PTY foreground process group on re-attach
 801                                var pgrp: posix.pid_t = 0;
 802                                if (c.ioctl(pty_fd, c.TIOCGPGRP, &pgrp) == 0 and pgrp > 0) {
 803                                    posix.kill(-pgrp, posix.SIG.WINCH) catch {};
 804                                }
 805                                std.log.debug("init resize rows={d} cols={d}", .{ resize.rows, resize.cols });
 806
 807                                // Only send terminal state if there's been PTY output (skip on first attach)
 808                                if (daemon.has_pty_output) {
 809                                    var builder: std.Io.Writer.Allocating = .init(daemon.alloc);
 810                                    defer builder.deinit();
 811                                    var term_formatter = ghostty_vt.formatter.TerminalFormatter.init(&term, .vt);
 812                                    term_formatter.content = .{ .selection = null };
 813                                    term_formatter.extra = .{
 814                                        .palette = false, // Don't override host terminal's palette
 815                                        .modes = true,
 816                                        .scrolling_region = true,
 817                                        .tabstops = true,
 818                                        .pwd = true,
 819                                        .keyboard = true,
 820                                        .screen = .all,
 821                                    };
 822                                    term_formatter.format(&builder.writer) catch |err| {
 823                                        std.log.warn("failed to format terminal state err={s}", .{@errorName(err)});
 824                                    };
 825                                    const term_output = builder.writer.buffered();
 826                                    if (term_output.len > 0) {
 827                                        ipc.appendMessage(daemon.alloc, &client.write_buf, .Output, term_output) catch |err| {
 828                                            std.log.warn("failed to buffer terminal state for client err={s}", .{@errorName(err)});
 829                                        };
 830                                        client.has_pending_output = true;
 831                                    }
 832                                }
 833                            }
 834                        },
 835                        .Resize => {
 836                            if (msg.payload.len == @sizeOf(ipc.Resize)) {
 837                                const resize = std.mem.bytesToValue(ipc.Resize, msg.payload);
 838                                var ws: c.struct_winsize = .{
 839                                    .ws_row = resize.rows,
 840                                    .ws_col = resize.cols,
 841                                    .ws_xpixel = 0,
 842                                    .ws_ypixel = 0,
 843                                };
 844                                _ = c.ioctl(pty_fd, c.TIOCSWINSZ, &ws);
 845                                try term.resize(daemon.alloc, resize.cols, resize.rows);
 846                                std.log.debug("resize rows={d} cols={d}", .{ resize.rows, resize.cols });
 847                            }
 848                        },
 849                        .Detach => {
 850                            std.log.info("client detach fd={d}", .{client.socket_fd});
 851                            _ = daemon.closeClient(client, i, false);
 852                            break :clients_loop;
 853                        },
 854                        .DetachAll => {
 855                            std.log.info("detach all clients={d}", .{daemon.clients.items.len});
 856                            for (daemon.clients.items) |client_to_close| {
 857                                client_to_close.deinit();
 858                                daemon.alloc.destroy(client_to_close);
 859                            }
 860                            daemon.clients.clearRetainingCapacity();
 861                            break :clients_loop;
 862                        },
 863                        .Kill => {
 864                            std.log.info("kill received session={s}", .{daemon.session_name});
 865                            posix.kill(daemon.pid, posix.SIG.TERM) catch |err| {
 866                                std.log.warn("failed to send SIGTERM to pty child err={s}", .{@errorName(err)});
 867                            };
 868                            daemon.shutdown();
 869                            should_exit = true;
 870                            break :clients_loop;
 871                        },
 872                        .Info => {
 873                            // subtract current client since it's just fetching info
 874                            const clients_len = daemon.clients.items.len - 1;
 875                            const info = ipc.Info{
 876                                .clients_len = clients_len,
 877                                .pid = daemon.pid,
 878                            };
 879                            try ipc.appendMessage(daemon.alloc, &client.write_buf, .Info, std.mem.asBytes(&info));
 880                            client.has_pending_output = true;
 881                        },
 882                        .Output => {}, // Clients shouldn't send output
 883                    }
 884                }
 885            }
 886
 887            if (revents & posix.POLL.OUT != 0) {
 888                // Flush pending output buffers
 889                const n = posix.write(client.socket_fd, client.write_buf.items) catch |err| blk: {
 890                    if (err == error.WouldBlock) break :blk 0;
 891                    // Error on write, close client
 892                    const last = daemon.closeClient(client, i, false);
 893                    if (last) should_exit = true;
 894                    continue;
 895                };
 896
 897                if (n > 0) {
 898                    client.write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
 899                }
 900
 901                if (client.write_buf.items.len == 0) {
 902                    client.has_pending_output = false;
 903                }
 904            }
 905
 906            if (revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
 907                const last = daemon.closeClient(client, i, false);
 908                if (last) should_exit = true;
 909            }
 910        }
 911    }
 912}
 913
 /// Forks a child attached to a new pseudo-terminal and returns the PTY master fd.
 ///
 /// The PTY is created with the window size that `getTerminalSize` reports for
 /// stdout. In the child, `ZMX_SESSION=<session_name>` is exported and then
 /// either `daemon.command` is exec'd (via `execvpeZ`) or, when no command was
 /// given, `$SHELL` (falling back to `/bin/sh`). The child never returns from
 /// this function: every failure path in the child ends in `exit(1)`.
 ///
 /// In the parent, the child's pid is stored in `daemon.pid` and the master fd
 /// is switched to non-blocking mode before being returned. The caller owns the
 /// returned fd.
 ///
 /// Returns `error.ForkPtyFailed` if `forkpty` fails, or any error from `fcntl`
 /// (in which case the master fd is closed before returning).
 fn spawnPty(daemon: *Daemon) !c_int {
     const size = getTerminalSize(posix.STDOUT_FILENO);
     var ws: c.struct_winsize = .{
         .ws_row = size.rows,
         .ws_col = size.cols,
         .ws_xpixel = 0,
         .ws_ypixel = 0,
     };

     var master_fd: c_int = undefined;
     const pid = forkpty(&master_fd, null, null, &ws);
     if (pid < 0) {
         return error.ForkPtyFailed;
     }

     if (pid == 0) { // child pid code path
         // Never `try` in the child: returning an error here would unwind into
         // the daemon's own code path inside the forked process.
         const session_env = std.fmt.allocPrint(daemon.alloc, "ZMX_SESSION={s}\x00", .{daemon.session_name}) catch {
             std.posix.exit(1);
         };
         // putenv keeps a reference to the string, so it is intentionally not freed.
         _ = c.putenv(@ptrCast(session_env.ptr));

         if (daemon.command) |cmd_args| {
             const alloc = std.heap.c_allocator;
             var argv_buf: [64:null]?[*:0]const u8 = undefined;
             // Reject an empty command (nothing to exec) and one that would
             // overflow the fixed-size argv buffer.
             if (cmd_args.len == 0 or cmd_args.len > argv_buf.len) {
                 std.log.err("invalid command: argc={d} max={d}", .{ cmd_args.len, argv_buf.len });
                 std.posix.exit(1);
             }
             for (cmd_args, 0..) |arg, i| {
                 argv_buf[i] = alloc.dupeZ(u8, arg) catch {
                     std.posix.exit(1);
                 };
             }
             argv_buf[cmd_args.len] = null;
             const argv: [*:null]const ?[*:0]const u8 = &argv_buf;
             const err = std.posix.execvpeZ(argv_buf[0].?, argv, std.c.environ);
             std.log.err("execvpe failed: cmd={s} err={s}", .{ cmd_args[0], @errorName(err) });
             std.posix.exit(1);
         } else {
             const shell = std.posix.getenv("SHELL") orelse "/bin/sh";
             const argv = [_:null]?[*:0]const u8{ shell, null };
             const err = std.posix.execveZ(shell, &argv, std.c.environ);
             std.log.err("execve failed: err={s}", .{@errorName(err)});
             std.posix.exit(1);
         }
     }
     // master pid code path
     daemon.pid = pid;
     std.log.info("pty spawned session={s} pid={d}", .{ daemon.session_name, pid });

     // Don't leak the master fd if switching it to non-blocking fails.
     errdefer posix.close(master_fd);

     // make pty non-blocking; derive the O_NONBLOCK bit from the target's
     // `posix.O` layout instead of hard-coding the Linux value (0o4000), which
     // means something else on macOS and FreeBSD.
     const flags = try posix.fcntl(master_fd, posix.F.GETFL, 0);
     const nonblock_bit = @as(usize, 1) << @bitOffsetOf(posix.O, "NONBLOCK");
     _ = try posix.fcntl(master_fd, posix.F.SETFL, flags | nonblock_bit);
     return master_fd;
 }
 963
 /// Opens a close-on-exec Unix stream socket and connects it to the socket
 /// path `fname`.
 ///
 /// Returns the connected fd; the caller is responsible for closing it. If
 /// `connect` fails the socket is closed before the error is returned.
 fn sessionConnect(fname: []const u8) !i32 {
     const addr = try std.net.Address.initUnix(fname);

     const sock_type: u32 = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
     const fd = try posix.socket(posix.AF.UNIX, sock_type, 0);
     errdefer posix.close(fd);

     try posix.connect(fd, &addr.any, addr.getOsSockLen());
     return fd;
 }
 971
 /// Failure modes reported by `probeSession`.
 const SessionProbeError = error{
     /// No reply became readable within the probe's poll timeout.
     Timeout,
     /// Connecting to the session socket was refused.
     ConnectionRefused,
     /// Any other failure while connecting, sending, polling or reading,
     /// including a reply that carries no well-formed `Info` message.
     Unexpected,
 };
 977
 /// Successful outcome of `probeSession`.
 const SessionProbeResult = struct {
     /// Socket connected to the probed session; still open when returned.
     fd: i32,
     /// `Info` payload decoded from the reply read on `fd`.
     info: ipc.Info,
 };
 982
 /// Connects to the session socket at `socket_path`, sends an empty `Info`
 /// request, and waits up to one second for data to become readable.
 ///
 /// On success the still-open socket is returned together with the decoded
 /// `ipc.Info`; the caller owns `fd`. On every error path after the connect
 /// succeeded, the socket is closed before returning.
 ///
 /// Errors:
 /// - `ConnectionRefused`: the connect attempt was refused.
 /// - `Timeout`: `poll` reported nothing readable within the timeout.
 /// - `Unexpected`: any other failure, a zero-byte read, or a reply that holds
 ///   no `Info` message of exactly `@sizeOf(ipc.Info)` bytes.
 fn probeSession(alloc: std.mem.Allocator, socket_path: []const u8) SessionProbeError!SessionProbeResult {
     const reply_timeout_ms = 1000;

     const fd = sessionConnect(socket_path) catch |err| {
         if (err == error.ConnectionRefused) return error.ConnectionRefused;
         return error.Unexpected;
     };
     errdefer posix.close(fd);

     ipc.send(fd, .Info, "") catch return error.Unexpected;

     // Wait for the reply; zero ready descriptors means the timeout elapsed.
     var watched = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
     const ready_count = posix.poll(&watched, reply_timeout_ms) catch return error.Unexpected;
     if (ready_count == 0) return error.Timeout;

     var reply_buf = ipc.SocketBuffer.init(alloc) catch return error.Unexpected;
     defer reply_buf.deinit();

     const bytes_read = reply_buf.read(fd) catch return error.Unexpected;
     if (bytes_read == 0) return error.Unexpected;

     // Skip anything that is not a correctly sized Info message.
     while (reply_buf.next()) |msg| {
         if (msg.header.tag != .Info) continue;
         if (msg.payload.len != @sizeOf(ipc.Info)) continue;

         const info = std.mem.bytesToValue(ipc.Info, msg.payload[0..@sizeOf(ipc.Info)]);
         return .{ .fd = fd, .info = info };
     }
     return error.Unexpected;
 }
1017
/// Logs that the socket for `session_name` is stale and deletes the entry of
/// that name from `dir`.
///
/// Best-effort: a failed delete is logged as a warning and otherwise ignored.
fn cleanupStaleSocket(dir: std.fs.Dir, session_name: []const u8) void {
    std.log.warn("stale socket found, cleaning up session={s}", .{session_name});

    if (dir.deleteFile(session_name)) |_| {
        // Deleted; nothing more to do.
    } else |err| {
        std.log.warn("failed to delete stale socket err={s}", .{@errorName(err)});
    }
}
1024
/// Reports whether `dir` contains a Unix domain socket named `name`.
///
/// Returns `false` when no such entry exists, `error.FileNotUnixSocket` when
/// the entry exists but is some other kind of file, and forwards any other
/// `statFile` error unchanged.
fn sessionExists(dir: std.fs.Dir, name: []const u8) !bool {
    const entry = dir.statFile(name) catch |err| {
        if (err == error.FileNotFound) return false;
        return err;
    };

    if (entry.kind == .unix_domain_socket) return true;
    return error.FileNotUnixSocket;
}
1035
/// Creates a non-blocking, close-on-exec Unix stream socket, binds it to the
/// path `fname`, and starts listening with a backlog of 128.
///
/// Returns the listening fd; the caller closes it. If any step after socket
/// creation fails, the fd is closed before the error is returned.
fn createSocket(fname: []const u8) !i32 {
    // STREAM:   reliable, bidirectional byte stream for local IPC with clients
    // NONBLOCK: socket operations never block
    // CLOEXEC:  fd is closed automatically across exec
    const sock_type: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC;
    const listen_fd = try posix.socket(posix.AF.UNIX, sock_type, 0);
    errdefer posix.close(listen_fd);

    const addr = try std.net.Address.initUnix(fname);
    try posix.bind(listen_fd, &addr.any, addr.getOsSockLen());
    try posix.listen(listen_fd, 128);

    return listen_fd;
}
1048
/// Builds `"<socket_dir>/<session_name>"` in a freshly allocated buffer.
///
/// The two parts are joined with a single `/` and no normalization is done.
/// Caller owns the returned slice and must free it with `alloc`.
pub fn getSocketPath(alloc: std.mem.Allocator, socket_dir: []const u8, session_name: []const u8) ![]const u8 {
    const parts = [_][]const u8{ socket_dir, "/", session_name };
    return std.mem.concat(alloc, u8, &parts);
}
1057
/// Signal handler registered for SIGWINCH: records that the signal arrived by
/// setting the `sigwinch_received` flag. All three handler arguments are ignored.
fn handleSigwinch(sig: i32, info: *const posix.siginfo_t, ctx: ?*anyopaque) callconv(.c) void {
    _ = sig;
    _ = info;
    _ = ctx;
    sigwinch_received.store(true, .release);
}
1061
/// Installs `handleSigwinch` as the SIGWINCH handler, with an empty signal
/// mask and the `SA.SIGINFO` flag. The previous disposition is not saved.
fn setupSigwinchHandler() void {
    const winch_action = posix.Sigaction{
        .handler = .{ .sigaction = handleSigwinch },
        .mask = posix.sigemptyset(),
        .flags = posix.SA.SIGINFO,
    };
    posix.sigaction(posix.SIG.WINCH, &winch_action, null);
}
1070
/// Queries the window size of the terminal behind `fd` via `TIOCGWINSZ`.
///
/// Falls back to 24 rows by 80 columns when the ioctl fails or reports a zero
/// row or column count.
fn getTerminalSize(fd: i32) ipc.Resize {
    const fallback: ipc.Resize = .{ .rows = 24, .cols = 80 };

    var winsize: c.struct_winsize = undefined;
    // `winsize` is only read once the ioctl has succeeded and filled it in.
    if (c.ioctl(fd, c.TIOCGWINSZ, &winsize) != 0) return fallback;
    if (winsize.ws_row == 0 or winsize.ws_col == 0) return fallback;

    return .{ .rows = winsize.ws_row, .cols = winsize.ws_col };
}
1078
/// Reports whether `buf` contains a Kitty keyboard protocol escape sequence
/// for Ctrl+\ anywhere inside it.
///
/// Exactly two encodings are recognized: `\e[92;5u` and `\e[92;133u`.
/// NOTE(review): 133 looks like Ctrl combined with a lock-state modifier bit
/// in the Kitty encoding — confirm against the protocol spec.
fn isKittyCtrlBackslash(buf: []const u8) bool {
    const sequences = [_][]const u8{
        "\x1b[92;5u",
        "\x1b[92;133u",
    };
    for (sequences) |seq| {
        if (std.mem.indexOf(u8, buf, seq) != null) return true;
    }
    return false;
}
1085
test "isKittyCtrlBackslash" {
    // Table of inputs and whether each should be recognized as Ctrl+\.
    const Case = struct { input: []const u8, expected: bool };
    const cases = [_]Case{
        .{ .input = "\x1b[92;5u", .expected = true },
        .{ .input = "\x1b[92;133u", .expected = true },
        .{ .input = "\x1b[92;1u", .expected = false },
        .{ .input = "\x1b[93;5u", .expected = false },
        .{ .input = "garbage", .expected = false },
    };
    for (cases) |case| {
        try std.testing.expectEqual(case.expected, isKittyCtrlBackslash(case.input));
    }
}