repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

zmx / src
Eric Bower  ·  2026-04-29

main.zig

   1const std = @import("std");
   2const posix = std.posix;
   3const builtin = @import("builtin");
   4const build_options = @import("build_options");
   5const ghostty_vt = @import("ghostty-vt");
   6const ipc = @import("ipc.zig");
   7const log = @import("log.zig");
   8const completions = @import("completions.zig");
   9const util = @import("util.zig");
  10const cross = @import("cross.zig");
  11const socket = @import("socket.zig");
  12
/// zmx version string, re-exported from the `build_options` module.
pub const version = build_options.version;
/// Git revision identifier, re-exported from the `build_options` module.
pub const git_sha = build_options.git_sha;
/// Version of the ghostty dependency, re-exported from the `build_options` module.
pub const ghostty_version = build_options.ghostty_version;

// Process-wide log sink. main() initializes it with the log file path and
// file mode from Cfg; zmxLogFn forwards every std.log call to it.
var log_system = log.LogSystem{};

/// Overrides the std defaults: all `std.log` output is routed through
/// `zmxLogFn`, and every level down to `.debug` is enabled.
pub const std_options: std.Options = .{
    .logFn = zmxLogFn,
    .log_level = .debug,
};
  23
/// Log callback installed through `std_options.logFn`.
///
/// Has the signature `std.log` expects and forwards every argument unchanged
/// to the file-level `log_system`.
fn zmxLogFn(
    comptime level: std.log.Level,
    comptime scope: @Type(.enum_literal),
    comptime format: []const u8,
    args: anytype,
) void {
    log_system.log(level, scope, format, args);
}
  32
/// Self-pipe woken by signal handlers. std.posix.poll loops on .INTR internally
/// (PollError has no Interrupted member), so a signal that lands during poll()
/// never surfaces; the handler writes a byte here and poll() wakes on POLLIN.
///
/// Both entries are -1 until openSignalPipe() runs. Index 0 is the read end
/// (drainSignalPipe reads from it).
var sig_pipe: [2]posix.fd_t = .{ -1, -1 };

// Integer bit mask for O_NONBLOCK, computed from the bit offset of the
// `NONBLOCK` field inside the packed `posix.O` flags struct. Needed because
// fcntl(F.SETFL) takes a plain integer rather than a `posix.O` value.
// https://github.com/ziglang/zig/blob/738d2be9d6b6ef3ff3559130c05159ef53336224/lib/std/posix.zig#L3505
const O_NONBLOCK: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK");
  40
  41const SessionMatch = struct {
  42    name: []const u8,
  43    is_prefix: bool,
  44
  45    fn matches(self: SessionMatch, session_name: []const u8) bool {
  46        if (self.is_prefix) return std.mem.startsWith(u8, session_name, self.name);
  47        return std.mem.eql(u8, session_name, self.name);
  48    }
  49};
  50
  51fn parseSessionArg(alloc: std.mem.Allocator, raw: []const u8) !SessionMatch {
  52    if (raw.len > 0 and raw[raw.len - 1] == '*') {
  53        const name = try socket.getSeshName(alloc, raw[0 .. raw.len - 1]);
  54        return .{ .name = name, .is_prefix = true };
  55    }
  56    const name = try socket.getSeshName(alloc, raw);
  57    return .{ .name = name, .is_prefix = false };
  58}
  59
  60fn openSignalPipe() !void {
  61    sig_pipe = try posix.pipe2(.{ .CLOEXEC = true, .NONBLOCK = true });
  62}
  63
  64fn drainSignalPipe() void {
  65    var b: [16]u8 = undefined;
  66    while (true) {
  67        const n = posix.read(sig_pipe[0], &b) catch return;
  68        if (n == 0) return;
  69    }
  70}
  71
  72pub fn main() !void {
  73    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
  74    const alloc = std.heap.c_allocator;
  75
  76    // Every subcommand may write to a Unix-domain socket; a peer that
  77    // disappears between probe and send would otherwise kill us before
  78    // write() can return BrokenPipe. Inherited across fork, so this also
  79    // covers the daemon.
  80    ignoreSigpipe();
  81
  82    var args = try std.process.argsWithAllocator(alloc);
  83    defer args.deinit();
  84    _ = args.skip(); // skip program name
  85
  86    var cfg = try Cfg.init(alloc);
  87    defer cfg.deinit(alloc);
  88
  89    const log_path = try std.fs.path.join(alloc, &.{ cfg.log_dir, "zmx.log" });
  90    defer alloc.free(log_path);
  91    try log_system.init(alloc, log_path, cfg.log_mode);
  92    defer log_system.deinit();
  93
  94    const cmd = args.next() orelse {
  95        return list(&cfg, false);
  96    };
  97
  98    if (std.mem.eql(u8, cmd, "version") or std.mem.eql(u8, cmd, "v") or std.mem.eql(u8, cmd, "-v") or std.mem.eql(u8, cmd, "--version")) {
  99        return printVersion(&cfg);
 100    } else if (std.mem.eql(u8, cmd, "help") or std.mem.eql(u8, cmd, "h") or std.mem.eql(u8, cmd, "-h")) {
 101        return help();
 102    } else if (std.mem.eql(u8, cmd, "list") or std.mem.eql(u8, cmd, "l") or std.mem.eql(u8, cmd, "ls")) {
 103        const short = if (args.next()) |arg| std.mem.eql(u8, arg, "--short") else false;
 104        return list(&cfg, short);
 105    } else if (std.mem.eql(u8, cmd, "completions") or std.mem.eql(u8, cmd, "c")) {
 106        const arg = args.next() orelse return;
 107        const shell = completions.Shell.fromString(arg) orelse return;
 108        return printCompletions(shell);
 109    } else if (std.mem.eql(u8, cmd, "detach") or std.mem.eql(u8, cmd, "d")) {
 110        return detachAll(&cfg);
 111    } else if (std.mem.eql(u8, cmd, "history") or std.mem.eql(u8, cmd, "hi")) {
 112        var session_name: ?[]const u8 = null;
 113        var format: util.HistoryFormat = .plain;
 114        while (args.next()) |arg| {
 115            if (std.mem.eql(u8, arg, "--vt")) {
 116                format = .vt;
 117            } else if (std.mem.eql(u8, arg, "--html")) {
 118                format = .html;
 119            } else if (session_name == null) {
 120                session_name = arg;
 121            }
 122        }
 123        const sesh_env = socket.getSeshNameFromEnv();
 124        const sesh = try socket.getSeshName(alloc, session_name orelse sesh_env);
 125        defer alloc.free(sesh);
 126        return history(&cfg, sesh, format);
 127    } else if (std.mem.eql(u8, cmd, "attach") or std.mem.eql(u8, cmd, "a")) {
 128        const session_name = args.next() orelse "";
 129
 130        var command_args: std.ArrayList([]const u8) = .empty;
 131        defer command_args.deinit(alloc);
 132        while (args.next()) |arg| {
 133            try command_args.append(alloc, arg);
 134        }
 135
 136        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
 137        var command: ?[][]const u8 = null;
 138        if (command_args.items.len > 0) {
 139            command = command_args.items;
 140        }
 141
 142        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
 143        const cwd = std.posix.getcwd(&cwd_buf) catch "";
 144
 145        const sesh = try socket.getSeshName(alloc, session_name);
 146        defer alloc.free(sesh);
 147        var daemon = Daemon{
 148            .running = true,
 149            .cfg = &cfg,
 150            .alloc = alloc,
 151            .clients = clients,
 152            .session_name = sesh,
 153            .socket_path = undefined,
 154            .pid = undefined,
 155            .command = command,
 156            .cwd = cwd,
 157            .created_at = @intCast(std.time.timestamp()),
 158            .leader_client_fd = null,
 159        };
 160        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
 161            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
 162            error.OutOfMemory => return err,
 163        };
 164        std.log.info("socket path={s}", .{daemon.socket_path});
 165        return attach(&daemon);
 166    } else if (std.mem.eql(u8, cmd, "run") or std.mem.eql(u8, cmd, "r")) {
 167        const session_name = args.next() orelse "";
 168
 169        var cmd_args_raw: std.ArrayList([]const u8) = .empty;
 170        defer cmd_args_raw.deinit(alloc);
 171        var detached = false;
 172        while (args.next()) |arg| {
 173            if (std.mem.startsWith(u8, arg, "-d")) {
 174                detached = true;
 175                continue;
 176            }
 177            try cmd_args_raw.append(alloc, arg);
 178        }
 179        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
 180
 181        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
 182        const cwd = std.posix.getcwd(&cwd_buf) catch "";
 183
 184        const sesh = try socket.getSeshName(alloc, session_name);
 185        defer alloc.free(sesh);
 186        var daemon = Daemon{
 187            .running = true,
 188            .cfg = &cfg,
 189            .alloc = alloc,
 190            .clients = clients,
 191            .session_name = sesh,
 192            .socket_path = undefined,
 193            .pid = undefined,
 194            .command = null,
 195            .cwd = cwd,
 196            .created_at = @intCast(std.time.timestamp()),
 197            .is_task_mode = true,
 198            .leader_client_fd = null,
 199        };
 200        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
 201            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
 202            error.OutOfMemory => return err,
 203        };
 204        std.log.info("socket path={s}", .{daemon.socket_path});
 205        return run(&daemon, detached, cmd_args_raw.items);
 206    } else if (std.mem.eql(u8, cmd, "send") or std.mem.eql(u8, cmd, "s")) {
 207        const session_name = args.next() orelse "";
 208        if (session_name.len == 0) return error.SessionNameRequired;
 209
 210        var text_parts: std.ArrayList([]const u8) = .empty;
 211        defer text_parts.deinit(alloc);
 212        while (args.next()) |arg| {
 213            try text_parts.append(alloc, arg);
 214        }
 215
 216        const sesh = try socket.getSeshName(alloc, session_name);
 217        defer alloc.free(sesh);
 218        const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
 219            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
 220            error.OutOfMemory => return err,
 221        };
 222        return send(&cfg, sesh, socket_path, text_parts.items, .Input);
 223    } else if (std.mem.eql(u8, cmd, "print") or std.mem.eql(u8, cmd, "p")) {
 224        const session_name = args.next() orelse "";
 225        if (session_name.len == 0) return error.SessionNameRequired;
 226
 227        var text_parts: std.ArrayList([]const u8) = .empty;
 228        defer text_parts.deinit(alloc);
 229        while (args.next()) |arg| {
 230            try text_parts.append(alloc, arg);
 231        }
 232
 233        const sesh = try socket.getSeshName(alloc, session_name);
 234        defer alloc.free(sesh);
 235        const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
 236            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
 237            error.OutOfMemory => return err,
 238        };
 239        return send(&cfg, sesh, socket_path, text_parts.items, .Output);
 240    } else if (std.mem.eql(u8, cmd, "kill") or std.mem.eql(u8, cmd, "k")) {
 241        var stderr_buffer: [1024]u8 = undefined;
 242        var stderr_writer = std.fs.File.stderr().writer(&stderr_buffer);
 243        const stderr = &stderr_writer.interface;
 244
 245        var matchers: std.ArrayList(SessionMatch) = .empty;
 246        defer {
 247            for (matchers.items) |m| {
 248                alloc.free(m.name);
 249            }
 250            matchers.deinit(alloc);
 251        }
 252        var force = false;
 253        while (args.next()) |session_name| {
 254            if (std.mem.eql(u8, session_name, "--force")) {
 255                force = true;
 256                continue;
 257            }
 258            const m = try parseSessionArg(alloc, session_name);
 259            try matchers.append(alloc, m);
 260        }
 261        if (matchers.items.len == 0) {
 262            return error.SessionNameRequired;
 263        }
 264        var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
 265        defer {
 266            for (sessions.items) |session| {
 267                session.deinit(alloc);
 268            }
 269            sessions.deinit(alloc);
 270        }
 271
 272        for (sessions.items) |session| {
 273            for (matchers.items) |m| {
 274                if (!m.matches(session.name)) {
 275                    continue;
 276                }
 277
 278                kill(&cfg, session.name, force) catch |err| {
 279                    try stderr.print(
 280                        "failed to kill session={s}: {s}\n",
 281                        .{ session.name, @errorName(err) },
 282                    );
 283                    try stderr.flush();
 284                };
 285                break;
 286            }
 287        }
 288    } else if (std.mem.eql(u8, cmd, "wait") or std.mem.eql(u8, cmd, "w")) {
 289        var matchers: std.ArrayList(SessionMatch) = .empty;
 290        defer {
 291            for (matchers.items) |m| {
 292                alloc.free(m.name);
 293            }
 294            matchers.deinit(alloc);
 295        }
 296        while (args.next()) |session_name| {
 297            const m = try parseSessionArg(alloc, session_name);
 298            try matchers.append(alloc, m);
 299        }
 300        if (matchers.items.len == 0) {
 301            return error.SessionNameRequired;
 302        }
 303        return wait(&cfg, matchers);
 304    } else if (std.mem.eql(u8, cmd, "tail") or std.mem.eql(u8, cmd, "t")) {
 305        var matchers: std.ArrayList(SessionMatch) = .empty;
 306        defer {
 307            for (matchers.items) |m| {
 308                alloc.free(m.name);
 309            }
 310            matchers.deinit(alloc);
 311        }
 312        while (args.next()) |session_name| {
 313            const m = try parseSessionArg(alloc, session_name);
 314            try matchers.append(alloc, m);
 315        }
 316        if (matchers.items.len == 0) {
 317            return error.SessionNameRequired;
 318        }
 319
 320        // Resolve matchers against session list to get actual session names.
 321        var resolved_names: std.ArrayList([]const u8) = .empty;
 322        defer {
 323            for (resolved_names.items) |name| {
 324                alloc.free(name);
 325            }
 326            resolved_names.deinit(alloc);
 327        }
 328
 329        var any_prefix = false;
 330        for (matchers.items) |m| {
 331            if (m.is_prefix) {
 332                any_prefix = true;
 333                break;
 334            }
 335        }
 336
 337        if (any_prefix) {
 338            var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
 339            defer {
 340                for (sessions.items) |session| {
 341                    session.deinit(alloc);
 342                }
 343                sessions.deinit(alloc);
 344            }
 345            for (sessions.items) |session| {
 346                for (matchers.items) |m| {
 347                    if (m.matches(session.name)) {
 348                        try resolved_names.append(alloc, try alloc.dupe(u8, session.name));
 349                        break;
 350                    }
 351                }
 352            }
 353        }
 354        // Add exact-match names directly.
 355        for (matchers.items) |m| {
 356            if (!m.is_prefix) {
 357                try resolved_names.append(alloc, try alloc.dupe(u8, m.name));
 358            }
 359        }
 360
 361        var client_socket_fds = try std.ArrayList(i32).initCapacity(alloc, resolved_names.items.len);
 362        defer {
 363            for (client_socket_fds.items) |client_fd| {
 364                posix.close(client_fd);
 365            }
 366            client_socket_fds.deinit(alloc);
 367        }
 368
 369        for (resolved_names.items) |session_name| {
 370            const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
 371                error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
 372                error.OutOfMemory => return err,
 373            };
 374            const client_sock = try socket.sessionConnect(socket_path);
 375            try client_socket_fds.append(alloc, client_sock);
 376        }
 377        _ = try tail(client_socket_fds, false, false);
 378    } else if (std.mem.eql(u8, cmd, "write") or std.mem.eql(u8, cmd, "wr")) {
 379        const session_name = args.next() orelse "";
 380        if (session_name.len == 0) return error.SessionNameRequired;
 381        const file_path = args.next() orelse "";
 382        if (file_path.len == 0) return error.FilePathRequired;
 383
 384        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
 385        const cwd = std.posix.getcwd(&cwd_buf) catch "";
 386        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
 387        const sesh = try socket.getSeshName(alloc, session_name);
 388        defer alloc.free(sesh);
 389        var daemon = Daemon{
 390            .running = true,
 391            .cfg = &cfg,
 392            .alloc = alloc,
 393            .clients = clients,
 394            .session_name = sesh,
 395            .socket_path = undefined,
 396            .pid = undefined,
 397            .command = null,
 398            .cwd = cwd,
 399            .created_at = @intCast(std.time.timestamp()),
 400            .is_task_mode = true,
 401            .leader_client_fd = null,
 402        };
 403        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
 404            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
 405            error.OutOfMemory => return err,
 406        };
 407        std.log.info("socket path={s}", .{daemon.socket_path});
 408        try writeFile(&daemon, file_path);
 409    } else {
 410        return help();
 411    }
 412}
 413
 414/// Client represents each terminal that has connected to a session.
 415///
 416/// Multiple Clients can connect to a single session.
 417const Client = struct {
 418    alloc: std.mem.Allocator,
 419    socket_fd: i32,
 420    has_pending_output: bool = false,
 421    read_buf: ipc.SocketBuffer,
 422    write_buf: std.ArrayList(u8),
 423
 424    pub fn deinit(self: *Client) void {
 425        posix.close(self.socket_fd);
 426        self.read_buf.deinit();
 427        self.write_buf.deinit(self.alloc);
 428    }
 429};
 430
 431/// Cfg is zmx's configuration container.
 432///
 433/// The purpose of this container is to hold anything that can be modified by the user.
 434const Cfg = struct {
 435    socket_dir: []const u8,
 436    log_dir: []const u8,
 437    max_scrollback: usize = 10_000_000,
 438    dir_mode: u32 = 0o750,
 439    log_mode: u32 = 0o640,
 440
 441    pub fn init(alloc: std.mem.Allocator) !Cfg {
 442        const socket_dir = try socketDir(alloc);
 443        const log_dir = try std.fmt.allocPrint(alloc, "{s}/logs", .{socket_dir});
 444        errdefer alloc.free(log_dir);
 445
 446        const dir_mode = if (std.posix.getenv("ZMX_DIR_MODE")) |m|
 447            std.fmt.parseInt(u32, m, 8) catch 0o750
 448        else
 449            0o750;
 450
 451        const log_mode = if (std.posix.getenv("ZMX_LOG_MODE")) |m|
 452            std.fmt.parseInt(u32, m, 8) catch 0o640
 453        else
 454            0o640;
 455
 456        var cfg = Cfg{
 457            .socket_dir = socket_dir,
 458            .log_dir = log_dir,
 459            .dir_mode = dir_mode,
 460            .log_mode = log_mode,
 461        };
 462
 463        try cfg.mkdir();
 464
 465        return cfg;
 466    }
 467
 468    fn socketDir(alloc: std.mem.Allocator) ![]const u8 {
 469        const tmpdir = std.mem.trimRight(u8, posix.getenv("TMPDIR") orelse "/tmp", "/");
 470        const uid = posix.getuid();
 471
 472        const socket_dir: []const u8 = if (posix.getenv("ZMX_DIR")) |zmxdir|
 473            try alloc.dupe(u8, zmxdir)
 474        else if (posix.getenv("XDG_RUNTIME_DIR")) |xdg_runtime|
 475            try std.fmt.allocPrint(alloc, "{s}/zmx", .{xdg_runtime})
 476        else
 477            try std.fmt.allocPrint(alloc, "{s}/zmx-{d}", .{ tmpdir, uid });
 478        errdefer alloc.free(socket_dir);
 479
 480        return socket_dir;
 481    }
 482
 483    pub fn deinit(self: *Cfg, alloc: std.mem.Allocator) void {
 484        if (self.socket_dir.len > 0) alloc.free(self.socket_dir);
 485        if (self.log_dir.len > 0) alloc.free(self.log_dir);
 486    }
 487
 488    pub fn mkdir(self: *Cfg) !void {
 489        posix.mkdirat(posix.AT.FDCWD, self.socket_dir, @intCast(self.dir_mode)) catch |err| switch (err) {
 490            error.PathAlreadyExists => {},
 491            else => return err,
 492        };
 493
 494        posix.mkdirat(posix.AT.FDCWD, self.log_dir, @intCast(self.dir_mode)) catch |err| switch (err) {
 495            error.PathAlreadyExists => {},
 496            else => return err,
 497        };
 498    }
 499};
 500
 501test "Cfg.init uses default modes when env vars are not set" {
 502    const alloc = std.testing.allocator;
 503
 504    // Ensure they are not set
 505    _ = cross.c.unsetenv("ZMX_DIR_MODE");
 506    _ = cross.c.unsetenv("ZMX_LOG_MODE");
 507
 508    var cfg = try Cfg.init(alloc);
 509    defer cfg.deinit(alloc);
 510
 511    try std.testing.expectEqual(@as(u32, 0o750), cfg.dir_mode);
 512    try std.testing.expectEqual(@as(u32, 0o640), cfg.log_mode);
 513}
 514
 515test "Cfg.init uses custom modes from env vars" {
 516    const alloc = std.testing.allocator;
 517
 518    // Set custom octal values
 519    _ = cross.c.setenv("ZMX_DIR_MODE", "770", 1);
 520    _ = cross.c.setenv("ZMX_LOG_MODE", "660", 1);
 521    defer {
 522        _ = cross.c.unsetenv("ZMX_DIR_MODE");
 523        _ = cross.c.unsetenv("ZMX_LOG_MODE");
 524    }
 525
 526    var cfg = try Cfg.init(alloc);
 527    defer cfg.deinit(alloc);
 528
 529    try std.testing.expectEqual(@as(u32, 0o770), cfg.dir_mode);
 530    try std.testing.expectEqual(@as(u32, 0o660), cfg.log_mode);
 531}
 532
 533/// Daemon is responsible for managing a zmx session.
 534///
/// It holds all the state for a running session.  Instead of a single daemon for all sessions, we
/// create a daemon for every session.  This has some benefits: the ipc communication between
/// session clients and the daemon doesn't need to be tagged with the session name, and a daemon
/// crash in one session won't take down the other sessions.
 539///
 540/// Conceptually it's also much simpler to reason about.
 541const Daemon = struct {
    // Borrowed pointer to the process configuration (main() passes `&cfg`).
    cfg: *Cfg,
    // Allocator used for the client list, the pty write buffer and socket_path.
    alloc: std.mem.Allocator,
    // Connected clients; each entry is destroyed with `alloc` on close/shutdown.
    clients: std.ArrayList(*Client),
    // This controls which client is the leader.  The leader controls terminal state and
    // cols/rows of session.
    leader_client_fd: ?i32,
    // Borrowed; main() frees the session name after the subcommand returns.
    session_name: []const u8,
    // Owned; freed by deinit().
    socket_path: []const u8,
    // Set to false by shutdown().
    running: bool,
    // Pid of the process forked by spawnPty().
    pid: i32,
    // Command to exec in the pty instead of a login shell (see execChild).
    command: ?[]const []const u8 = null,
    cwd: []const u8 = "",
    has_pty_output: bool = false,
    has_had_client: bool = false,
    has_terminal_client: bool = false, // true only after a real attach (.Init received)
    created_at: u64, // unix timestamp in seconds (main() sets it from std.time.timestamp())
    is_task_mode: bool = false, // flag for when session is run as a task
    task_exit_code: ?u8 = null, // null = running or n/a, set when task completes
    task_ended_at: ?u64 = null, // timestamp when task exited
    is_fish: bool = false, // true if session shell is fish (affects exit code variable)
    pty_fd: i32 = -1, // set by daemonLoop so handleRun can probe the foreground process
    pty_write_buf: std.ArrayList(u8) = .empty,

    /// Result type of ensureSession().
    // NOTE(review): field meanings below are inferred from their names; the
    // code that sets them is outside this view — confirm against ensureSession.
    const EnsureSessionResult = struct {
        // presumably true when a new session was created by this call
        created: bool,
        // presumably true in the forked daemon process, false in the caller
        is_daemon: bool,
    };
 569
 570    pub fn deinit(self: *Daemon) void {
 571        self.clients.deinit(self.alloc);
 572        self.pty_write_buf.deinit(self.alloc);
 573        self.alloc.free(self.socket_path);
 574    }
 575
 576    pub fn shutdown(self: *Daemon) void {
 577        std.log.info("shutting down daemon session={s}", .{self.session_name});
 578        self.running = false;
 579
 580        for (self.clients.items) |client| {
 581            client.deinit();
 582            self.alloc.destroy(client);
 583        }
 584        self.clients.clearRetainingCapacity();
 585    }
 586
 587    pub fn closeClient(self: *Daemon, client: *Client, i: usize, shutdown_on_last: bool) bool {
 588        const fd = client.socket_fd;
 589        // leader is disconnected, remove ref and let another client claim leader on input
 590        if (self.leader_client_fd == client.socket_fd) {
 591            std.log.info(
 592                "unsetting leader session={s} fd={d}",
 593                .{ self.session_name, client.socket_fd },
 594            );
 595            self.leader_client_fd = null;
 596        }
 597        client.deinit();
 598        self.alloc.destroy(client);
 599        _ = self.clients.orderedRemove(i);
 600        std.log.info("client disconnected fd={d} remaining={d}", .{ fd, self.clients.items.len });
 601        if (shutdown_on_last and self.clients.items.len == 0) {
 602            self.shutdown();
 603            return true;
 604        }
 605        return false;
 606    }
 607
 608    fn setLeader(self: *Daemon, client: *Client) !void {
 609        std.log.info("setting new leader client_fd={d}", .{client.socket_fd});
 610        self.leader_client_fd = client.socket_fd;
 611        // Send a resize message to the client so it can send us back their window size
 612        // so we can resize the pty and ghostty state.
 613        try ipc.appendMessage(self.alloc, &client.write_buf, .Resize, "");
 614        client.has_pending_output = true;
 615    }
 616
 617    /// Runs in the forked child. Either execs or returns an error (caller
 618    /// must exit on error -- returning would fall through to parent code).
 619    fn execChild(self: *Daemon) !noreturn {
 620        const alloc = std.heap.c_allocator;
 621
 622        // main() set SIGPIPE to SIG_IGN, which (unlike handlers) survives
 623        // exec. Restore the default so the shell and its children behave
 624        // normally (e.g. `yes | head` should exit 141 via SIGPIPE).
 625        const dfl: posix.Sigaction = .{
 626            .handler = .{ .handler = posix.SIG.DFL },
 627            .mask = posix.sigemptyset(),
 628            .flags = 0,
 629        };
 630        posix.sigaction(posix.SIG.PIPE, &dfl, null);
 631
 632        const session_env = try std.fmt.allocPrintSentinel(
 633            alloc,
 634            "ZMX_SESSION={s}",
 635            .{self.session_name},
 636            0,
 637        );
 638        _ = cross.c.putenv(session_env.ptr);
 639
 640        if (self.command) |cmd_args| {
 641            const argv = try alloc.allocSentinel(?[*:0]const u8, cmd_args.len, null);
 642            for (cmd_args, 0..) |arg, i| {
 643                argv[i] = try alloc.dupeZ(u8, arg);
 644            }
 645            const err = std.posix.execvpeZ(argv[0].?, argv.ptr, std.c.environ);
 646            std.log.err("execvpe failed: cmd={s} err={s}", .{ cmd_args[0], @errorName(err) });
 647            std.posix.exit(1);
 648        }
 649
 650        const shell = util.detectShell();
 651        // Use "-shellname" as argv[0] to signal login shell (traditional method)
 652        const login_shell = try std.fmt.allocPrintSentinel(
 653            alloc,
 654            "-{s}",
 655            .{std.fs.path.basename(shell)},
 656            0,
 657        );
 658        const argv = [_:null]?[*:0]const u8{ login_shell, null };
 659        const err = std.posix.execveZ(shell, &argv, std.c.environ);
 660        std.log.err("execve failed: err={s}", .{@errorName(err)});
 661        std.posix.exit(1);
 662    }
 663
 664    /// spawnPty runs forkpty() and executes the shell or shell command the user provides.
 665    fn spawnPty(self: *Daemon) !c_int {
 666        const size = ipc.getTerminalSize(posix.STDOUT_FILENO);
 667        var ws: cross.c.struct_winsize = .{
 668            .ws_row = size.rows,
 669            .ws_col = size.cols,
 670            .ws_xpixel = 0,
 671            .ws_ypixel = 0,
 672        };
 673
 674        var master_fd: c_int = undefined;
 675        const pid = cross.forkpty(&master_fd, null, null, &ws);
 676        if (pid < 0) {
 677            return error.ForkPtyFailed;
 678        }
 679
 680        if (pid == 0) { // child pid code path
 681            // In the forked child, ANY error must exit rather than propagate:
 682            // a returned error falls through to the parent code path below,
 683            // running a second daemon on the same socket (or worse, hitting
 684            // errdefers that delete the parent's socket file).
 685            execChild(self) catch |err| {
 686                std.log.err("child setup failed: {s}", .{@errorName(err)});
 687                std.posix.exit(1);
 688            };
 689            unreachable; // execChild either execs or exits, never returns ok
 690        }
 691        // master pid code path
 692        self.pid = pid;
 693        std.log.info("pty spawned session={s} pid={d}", .{ self.session_name, pid });
 694
 695        // make pty non-blocking
 696        const flags = try posix.fcntl(master_fd, posix.F.GETFL, 0);
 697        _ = try posix.fcntl(master_fd, posix.F.SETFL, flags | O_NONBLOCK);
 698        return master_fd;
 699    }
 700
 701    /// ensureSession "upserts" a session by checking if the unix socket exists already.
 702    /// If not it creates one and spawns the daemon.
 703    fn ensureSession(self: *Daemon) !EnsureSessionResult {
 704        var dir = try std.fs.openDirAbsolute(self.cfg.socket_dir, .{});
 705        defer dir.close();
 706
 707        const exists = try socket.sessionExists(dir, self.session_name);
 708        var should_create = !exists;
 709
 710        if (exists) {
 711            if (ipc.connectSession(self.socket_path)) |fd| {
 712                posix.close(fd);
 713                if (self.command != null) {
 714                    std.log.warn(
 715                        "session already exists, ignoring command session={s}",
 716                        .{self.session_name},
 717                    );
 718                }
 719            } else |err| switch (err) {
 720                // Daemon is definitively gone: safe to replace.
 721                error.ConnectionRefused => {
 722                    socket.cleanupStaleSocket(dir, self.session_name);
 723                    should_create = true;
 724                },
 725                // Connect failed for an unusual reason. The check is only to
 726                // decide create-vs-attach; the socket file exists, so proceed
 727                // to attach rather than fail or orphan.
 728                else => {
 729                    std.log.warn(
 730                        "connect failed ({s}), proceeding to attach session={s}",
 731                        .{ @errorName(err), self.session_name },
 732                    );
 733                },
 734            }
 735        }
 736
 737        if (should_create) {
 738            std.log.info("creating session={s}", .{self.session_name});
 739            const server_sock_fd = try socket.createSocket(self.socket_path);
 740
 741            // creates the daemon
 742            const pid = try posix.fork();
 743            if (pid == 0) { // child (daemon)
 744                // becomes the session leader and detaches process from its controlling terminal
 745                _ = try posix.setsid();
 746
 747                log_system.deinit();
 748
 749                // Redirect stdin/stdout/stderr to /dev/null. The daemon
 750                // communicates via its unix socket, not stdio. Without
 751                // this, any pipe on FDs 0-2 (e.g. from bats' `run`
 752                // keyword) stays open for the daemon's lifetime, causing
 753                // the caller to hang waiting for EOF.
 754                {
 755                    const devnull = std.posix.open(
 756                        "/dev/null",
 757                        .{ .ACCMODE = .RDWR },
 758                        0,
 759                    ) catch |err| {
 760                        std.log.warn("failed to open /dev/null: {s}", .{@errorName(err)});
 761                        return err;
 762                    };
 763                    inline for (.{ posix.STDIN_FILENO, posix.STDOUT_FILENO, posix.STDERR_FILENO }) |fd| {
 764                        _ = posix.dup2(devnull, fd) catch |err| {
 765                            std.log.warn("dup2 /dev/null -> {d}: {s}", .{ fd, @errorName(err) });
 766                            return err;
 767                        };
 768                    }
 769                    if (devnull > 2) posix.close(devnull);
 770                }
 771
 772                // Close file descriptors inherited from the parent that the
 773                // daemon doesn't need. This prevents test harnesses (like
 774                // bats) from hanging -- they wait for their internal FDs (3+)
 775                // to close before exiting.
 776                //
 777                // Must run BEFORE log_system.init() otherwise the new log
 778                // FD gets closed, and spawnPty() reuses that FD number for
 779                // the PTY master, causing log writes to leak into the terminal.
 780                //
 781                // Skip server_sock_fd (needed for IPC) and dir.fd (needed to
 782                // delete the socket file on shutdown).
 783                {
 784                    const dir_fd = @as(i32, @intCast(dir.fd));
 785                    var fd: i32 = 3;
 786                    while (fd < 64) : (fd += 1) {
 787                        if (fd == server_sock_fd or fd == dir_fd) continue;
 788                        _ = std.c.close(fd);
 789                    }
 790                }
 791
 792                const session_log_name = try std.fmt.allocPrint(
 793                    self.alloc,
 794                    "{s}.log",
 795                    .{self.session_name},
 796                );
 797                defer self.alloc.free(session_log_name);
 798                const session_log_path = try std.fs.path.join(
 799                    self.alloc,
 800                    &.{ self.cfg.log_dir, session_log_name },
 801                );
 802                defer self.alloc.free(session_log_path);
 803                try log_system.init(self.alloc, session_log_path, self.cfg.log_mode);
 804
 805                // If spawnPty fails, clean up here. Once it succeeds,
 806                // the inner block's defer takes ownership of cleanup to
 807                // avoid double-closing server_sock_fd on daemonLoop error.
 808                const pty_fd = self.spawnPty() catch |err| {
 809                    posix.close(server_sock_fd);
 810                    dir.deleteFile(self.session_name) catch {};
 811                    return err;
 812                };
 813
 814                defer {
 815                    self.handleKill();
 816                    self.deinit();
 817                    posix.close(pty_fd);
 818                    _ = posix.waitpid(self.pid, 0);
 819                    posix.close(server_sock_fd);
 820                    std.log.info("deleting socket file session={s}", .{self.session_name});
 821                    dir.deleteFile(self.session_name) catch |err| {
 822                        std.log.warn("failed to delete socket file err={s}", .{@errorName(err)});
 823                    };
 824                }
 825
 826                try daemonLoop(self, server_sock_fd, pty_fd);
 827                return .{ .created = true, .is_daemon = true };
 828            }
 829            posix.close(server_sock_fd);
 830            std.Thread.sleep(10 * std.time.ns_per_ms);
 831            return .{ .created = true, .is_daemon = false };
 832        }
 833
 834        return .{ .created = false, .is_daemon = false };
 835    }
 836
 837    const PTY_WRITE_BUF_MAX = 256 * 1024;
 838
 839    /// Queue bytes for the PTY's stdin. Flushed by daemonLoop on POLLOUT.
 840    /// Drops the payload if the buffer is over cap -- same failure mode as
 841    /// the old direct-write ptyWrite (drop on EAGAIN), just at a 64x higher
 842    /// threshold. Capping avoids OOM when the shell stops reading; dropping
 843    /// new (not old) bytes avoids tearing a partially-accepted sequence.
 844    fn queuePtyInput(self: *Daemon, data: []const u8) void {
 845        if (data.len == 0) return;
 846        if (self.pty_write_buf.items.len + data.len > PTY_WRITE_BUF_MAX) {
 847            std.log.warn(
 848                "pty input dropped {d} bytes (buffer full, shell not reading)",
 849                .{data.len},
 850            );
 851            return;
 852        }
 853        std.log.debug("buffering pty input data={x}", .{data});
 854        self.pty_write_buf.appendSlice(self.alloc, data) catch |err| {
 855            std.log.warn(
 856                "pty input dropped {d} bytes: {s}",
 857                .{ data.len, @errorName(err) },
 858            );
 859        };
 860    }
 861
 862    pub fn handleInput(self: *Daemon, client: *Client, payload: []const u8) !void {
 863        std.log.debug("buffering pty input data={x}", .{payload});
 864        // client is leader, send entire payload (ansi escape codes + text)
 865        if (self.leader_client_fd == client.socket_fd) {
 866            self.queuePtyInput(payload);
 867            return;
 868        }
 869
 870        // check if leader needs to be updated by detecting any user input
 871        if (util.isUserInput(payload)) {
 872            try self.setLeader(client);
 873            self.queuePtyInput(payload);
 874        }
 875    }
 876
 877    pub fn handleSwitch(self: *Daemon, session_name: []const u8) !void {
 878        for (self.clients.items) |client| {
 879            if (self.leader_client_fd == client.socket_fd) {
 880                ipc.appendMessage(
 881                    self.alloc,
 882                    &client.write_buf,
 883                    .Switch,
 884                    session_name,
 885                ) catch |err| {
 886                    std.log.warn(
 887                        "failed to buffer terminal state for client err={s}",
 888                        .{@errorName(err)},
 889                    );
 890                };
 891                client.has_pending_output = true;
 892                return;
 893            }
 894        }
 895        return error.NoLeaderFound;
 896    }
 897
    /// Handle a client's initial handshake, whose payload is an `ipc.Resize`.
    ///
    /// Steps, in order:
    ///   1. Ignore the message if the payload is not exactly one `ipc.Resize`.
    ///   2. On re-attach (PTY output seen and a client has initialised before),
    ///      queue a serialized snapshot of `term` to this client as `.Output`.
    ///   3. If there is no leader, make this client the leader.
    ///   4. If this client is the leader, apply the requested size to the PTY
    ///      (`TIOCSWINSZ` on `pty_fd`) and to `term`, then record that a
    ///      client has initialised.
    ///
    /// Returns errors from `setLeader` and `term.resize`. Failure to queue the
    /// snapshot is logged and does not abort the handshake.
    pub fn handleInit(
        self: *Daemon,
        client: *Client,
        pty_fd: i32,
        term: *ghostty_vt.Terminal,
        payload: []const u8,
    ) !void {
        if (payload.len != @sizeOf(ipc.Resize)) return;

        // Serialize terminal state BEFORE resize to capture correct cursor position.
        // Resizing triggers reflow which can move the cursor, and the shell's
        // SIGWINCH-triggered redraw will run after our snapshot is sent.
        // Only serialize on re-attach (has_had_client), not first attach, to avoid
        // interfering with shell initialization (DA1 queries, etc.)
        if (self.has_pty_output and self.has_had_client) {
            const cursor = &term.screens.active.cursor;
            std.log.debug(
                "cursor before serialize: x={d} y={d} pending_wrap={}",
                .{ cursor.x, cursor.y, cursor.pending_wrap },
            );
            if (util.serializeTerminalState(self.alloc, term)) |term_output| {
                std.log.debug("serialize terminal state", .{});
                // Rewrite OSC 133;A to include redraw=0 so the outer terminal
                // does not clear prompt lines on resize (issue #111).
                // Falls back to the unmodified snapshot when the rewrite
                // yields null.
                const restore_data = util.rewritePromptRedraw(self.alloc, term_output) orelse term_output;
                // Defers run in reverse order: the rewritten copy (only if it
                // is a distinct allocation) is freed first, then the snapshot.
                defer self.alloc.free(term_output);
                defer if (restore_data.ptr != term_output.ptr) self.alloc.free(restore_data);
                ipc.appendMessage(self.alloc, &client.write_buf, .Output, restore_data) catch |err| {
                    std.log.warn(
                        "failed to buffer terminal state for client err={s}",
                        .{@errorName(err)},
                    );
                };
                // Set unconditionally, including when the append above failed.
                client.has_pending_output = true;
            }
        }

        // no leader is set so set one
        if (self.leader_client_fd == null) {
            try self.setLeader(client);
        }

        // only resize if leader
        if (self.leader_client_fd == client.socket_fd) {
            const resize = std.mem.bytesToValue(ipc.Resize, payload);
            var ws: cross.c.struct_winsize = .{
                .ws_row = resize.rows,
                .ws_col = resize.cols,
                .ws_xpixel = 0,
                .ws_ypixel = 0,
            };
            // The ioctl result is deliberately discarded; a failed PTY resize
            // does not stop the daemon's own terminal from being resized.
            _ = cross.c.ioctl(pty_fd, cross.c.TIOCSWINSZ, &ws);
            // Disable prompt_redraw before resize. The daemon's internal terminal
            // would otherwise clear prompt lines expecting the shell to redraw them,
            // but the shell's redraw goes to the PTY (forwarded to clients), not to
            // this daemon terminal. The clearing corrupts the daemon's snapshot state.
            const saved_prompt_redraw = term.flags.shell_redraws_prompt;
            term.flags.shell_redraws_prompt = .false;
            // Restored on every exit from this block, including a failed resize.
            defer term.flags.shell_redraws_prompt = saved_prompt_redraw;
            try term.resize(self.alloc, resize.cols, resize.rows);

            // Mark that we've had a client init, so subsequent clients get terminal state
            self.has_had_client = true;
            self.has_terminal_client = true;

            std.log.debug("init resize rows={d} cols={d}", .{ resize.rows, resize.cols });
        }
    }
 966
 967    pub fn handleResize(
 968        self: *Daemon,
 969        client: *Client,
 970        pty_fd: i32,
 971        term: *ghostty_vt.Terminal,
 972        payload: []const u8,
 973    ) !void {
 974        if (payload.len != @sizeOf(ipc.Resize)) return;
 975        if (self.leader_client_fd == null) {
 976            try self.setLeader(client);
 977        }
 978        // only leader can resize
 979        if (self.leader_client_fd != client.socket_fd) return;
 980
 981        const resize = std.mem.bytesToValue(ipc.Resize, payload);
 982        var ws: cross.c.struct_winsize = .{
 983            .ws_row = resize.rows,
 984            .ws_col = resize.cols,
 985            .ws_xpixel = 0,
 986            .ws_ypixel = 0,
 987        };
 988        _ = cross.c.ioctl(pty_fd, cross.c.TIOCSWINSZ, &ws);
 989        // Disable prompt_redraw before resize (same rationale as handleInit).
 990        const saved_prompt_redraw = term.flags.shell_redraws_prompt;
 991        term.flags.shell_redraws_prompt = .false;
 992        defer term.flags.shell_redraws_prompt = saved_prompt_redraw;
 993        try term.resize(self.alloc, resize.cols, resize.rows);
 994        std.log.debug("resize rows={d} cols={d}", .{ resize.rows, resize.cols });
 995    }
 996
 997    pub fn handleDetach(self: *Daemon, client: *Client, i: usize) void {
 998        std.log.info("client detach session={s} fd={d}", .{ self.session_name, client.socket_fd });
 999        _ = self.closeClient(client, i, false);
1000    }
1001
1002    pub fn handleDetachAll(self: *Daemon) void {
1003        std.log.info("detach all clients={d}", .{self.clients.items.len});
1004        for (self.clients.items) |client_to_close| {
1005            client_to_close.deinit();
1006            self.alloc.destroy(client_to_close);
1007        }
1008        self.clients.clearRetainingCapacity();
1009    }
1010
1011    pub fn handleKill(self: *Daemon) void {
1012        std.log.info("kill received session={s}", .{self.session_name});
1013        self.shutdown();
1014        // gracefully shutdown shell processes, shells tend to ignore SIGTERM so we send SIGHUP
1015        // instead
1016        //   https://www.gnu.org/software/bash/manual/html_node/Signals.html
1017        // negative pid means kill process and children
1018        std.log.info("sending SIGHUP session={s} pid={d}", .{ self.session_name, self.pid });
1019        posix.kill(-self.pid, posix.SIG.HUP) catch |err| {
1020            std.log.warn("failed to send SIGHUP to pty child err={s}", .{@errorName(err)});
1021        };
1022        std.Thread.sleep(500 * std.time.ns_per_ms);
1023        posix.kill(-self.pid, posix.SIG.KILL) catch |err| {
1024            std.log.warn("failed to send SIGKILL to pty child err={s}", .{@errorName(err)});
1025        };
1026    }
1027
1028    pub fn handleInfo(self: *Daemon, client: *Client) !void {
1029        // zeroes() so asBytes() doesn't ship struct padding + unused cmd/cwd
1030        // tail bytes (daemon stack contents) to clients.
1031        var info = std.mem.zeroes(ipc.Info);
1032        info.clients_len = self.clients.items.len - 1;
1033        info.pid = self.pid;
1034        info.created_at = self.created_at;
1035        info.task_ended_at = self.task_ended_at orelse 0;
1036        info.task_exit_code = self.task_exit_code orelse 0;
1037
1038        // Build command string from args, re-quoting args that contain
1039        // shell-special characters so the displayed command is copy-pasteable.
1040        const cur_cmd = self.command;
1041        if (cur_cmd) |args| {
1042            for (args, 0..) |arg, i| {
1043                const quoted = if (util.shellNeedsQuoting(arg))
1044                    util.shellQuote(self.alloc, arg) catch null
1045                else
1046                    null;
1047                defer if (quoted) |q| self.alloc.free(q);
1048                const src = quoted orelse arg;
1049
1050                const need = src.len + @as(usize, if (i > 0) 1 else 0);
1051                if (info.cmd_len + need > ipc.MAX_CMD_LEN) {
1052                    const ellipsis = "...";
1053                    if (info.cmd_len + ellipsis.len <= ipc.MAX_CMD_LEN) {
1054                        @memcpy(info.cmd[info.cmd_len..][0..ellipsis.len], ellipsis);
1055                        info.cmd_len += ellipsis.len;
1056                    }
1057                    break;
1058                }
1059
1060                if (i > 0) {
1061                    info.cmd[info.cmd_len] = ' ';
1062                    info.cmd_len += 1;
1063                }
1064                @memcpy(info.cmd[info.cmd_len..][0..src.len], src);
1065                info.cmd_len += @intCast(src.len);
1066            }
1067        }
1068
1069        info.cwd_len = @intCast(@min(self.cwd.len, ipc.MAX_CWD_LEN));
1070        @memcpy(info.cwd[0..info.cwd_len], self.cwd[0..info.cwd_len]);
1071
1072        try ipc.appendMessage(self.alloc, &client.write_buf, .Info, std.mem.asBytes(&info));
1073        client.has_pending_output = true;
1074    }
1075
1076    pub fn handleHistory(
1077        self: *Daemon,
1078        client: *Client,
1079        term: *ghostty_vt.Terminal,
1080        payload: []const u8,
1081    ) !void {
1082        const format: util.HistoryFormat = if (payload.len > 0)
1083            std.meta.intToEnum(util.HistoryFormat, payload[0]) catch .plain
1084        else
1085            .plain;
1086        if (util.serializeTerminal(self.alloc, term, format)) |output| {
1087            defer self.alloc.free(output);
1088            try ipc.appendMessage(self.alloc, &client.write_buf, .History, output);
1089            client.has_pending_output = true;
1090        } else {
1091            try ipc.appendMessage(self.alloc, &client.write_buf, .History, "");
1092            client.has_pending_output = true;
1093        }
1094    }
1095
1096    pub fn handleRun(self: *Daemon, client: *Client, payload: []const u8) !void {
1097        // Reset task tracking so the new command's exit marker is detected.
1098        // Without this, a second `zmx run` on the same session is ignored
1099        // because task_exit_code is still set from the first run.
1100        self.task_exit_code = null;
1101        self.task_ended_at = null;
1102        self.is_task_mode = true;
1103
1104        if (payload.len == 0) return;
1105
1106        // Auto-detect the foreground process on the PTY to determine shell type.
1107        if (self.pty_fd >= 0) {
1108            var name_buf: [64]u8 = undefined;
1109            if (cross.getForegroundProcessName(self.pty_fd, &name_buf)) |name| {
1110                self.is_fish = std.mem.eql(u8, name, "fish");
1111                std.log.debug("foreground process={s} is_fish={}", .{ name, self.is_fish });
1112            }
1113        }
1114        const cmd = payload;
1115
1116        // Daemon appends the task marker so the client never injects
1117        // shell-specific syntax, keeping Ctrl-C recovery clean.
1118        const marker = if (self.is_fish)
1119            "; echo ZMX_TASK_COMPLETED:$status"
1120        else
1121            "; echo ZMX_TASK_COMPLETED:$?";
1122
1123        if (cmd.len > 0 and cmd[cmd.len - 1] == '\r') {
1124            self.queuePtyInput(cmd[0 .. cmd.len - 1]);
1125        } else {
1126            self.queuePtyInput(cmd);
1127        }
1128        self.queuePtyInput(marker);
1129        self.queuePtyInput("\r");
1130
1131        try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
1132        client.has_pending_output = true;
1133        self.has_had_client = true;
1134        std.log.debug("run command len={d}", .{payload.len});
1135    }
1136
1137    pub fn handleOutput(self: *Daemon, payload: []const u8, vt_stream: anytype) !void {
1138        vt_stream.nextSlice(payload);
1139        self.has_pty_output = true;
1140        for (self.clients.items) |client| {
1141            try ipc.appendMessage(self.alloc, &client.write_buf, .Output, payload);
1142            client.has_pending_output = true;
1143        }
1144        if (self.clients.items.len > 0) {
1145            posix.kill(self.pid, posix.SIG.WINCH) catch |err| {
1146                std.log.warn("failed to send SIGWINCH err={s}", .{@errorName(err)});
1147            };
1148        }
1149    }
1150
1151    pub fn handleWrite(self: *Daemon, client: *Client, payload: []const u8) !void {
1152        // Wire format: [u32 path len][path bytes][file content]
1153        if (payload.len < @sizeOf(u32)) return error.InvalidPayload;
1154        const path_len = std.mem.bytesToValue(u32, payload[0..@sizeOf(u32)]);
1155        if (payload.len < @sizeOf(u32) + path_len) return error.InvalidPayload;
1156        const file_path = payload[@sizeOf(u32)..][0..path_len];
1157        const file_content = payload[@sizeOf(u32) + path_len ..];
1158
1159        // Inject file creation through the PTY so it works over SSH.
1160        // Base64-encode content and pipe through printf | base64 -d > file.
1161        // Chunk large files to stay under command-line length limits.
1162        // 48000 is divisible by 3 (clean base64 boundaries) and encodes
1163        // to ~64KB, well under typical ARG_MAX.
1164        const chunk_size = 48000;
1165        var offset: usize = 0;
1166        var is_first = true;
1167
1168        while (offset < file_content.len or is_first) {
1169            const end = @min(offset + chunk_size, file_content.len);
1170            const chunk = file_content[offset..end];
1171
1172            const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
1173            const encoded = try self.alloc.alloc(u8, encoded_len);
1174            defer self.alloc.free(encoded);
1175            _ = std.base64.standard.Encoder.encode(encoded, chunk);
1176
1177            self.queuePtyInput("printf '%s' '");
1178            self.queuePtyInput(encoded);
1179            if (is_first) {
1180                self.queuePtyInput("' | base64 -d > '");
1181            } else {
1182                self.queuePtyInput("' | base64 -d >> '");
1183            }
1184            self.queuePtyInput(file_path);
1185            self.queuePtyInput("'");
1186            self.queuePtyInput("\r");
1187
1188            offset = end;
1189            is_first = false;
1190        }
1191
1192        try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
1193        client.has_pending_output = true;
1194        self.has_had_client = true;
1195        std.log.debug(
1196            "write command len={d} file_path={s}",
1197            .{ file_content.len, file_path },
1198        );
1199    }
1200};
1201
/// Print version information to stdout as tab-separated `key value` lines:
/// the zmx version (the git SHA in Debug builds), the ghostty_vt version,
/// and the configured socket and log directories.
fn printVersion(cfg: *Cfg) !void {
    const shown_version = if (builtin.mode == .Debug) git_sha else version;

    var out_buf: [256]u8 = undefined;
    var out_writer = std.fs.File.stdout().writer(&out_buf);
    const stdout = &out_writer.interface;

    try stdout.print(
        "zmx\t\t{s}\nghostty_vt\t{s}\nsocket_dir\t{s}\nlog_dir\t\t{s}\n",
        .{ shown_version, ghostty_version, cfg.socket_dir, cfg.log_dir },
    );
    try stdout.flush();
}
1215
/// Write the completion script for `shell` to stdout, followed by a newline.
fn printCompletions(shell: completions.Shell) !void {
    var out_buf: [8192]u8 = undefined;
    var out_writer = std.fs.File.stdout().writer(&out_buf);
    const stdout = &out_writer.interface;

    try stdout.print("{s}\n", .{shell.getCompletionScript()});
    try stdout.flush();
}
1223
/// Print the full usage text to stdout.
///
/// The text is a multiline string literal. Such literals perform no escape
/// processing, so every backslash below is printed exactly as written.
fn help() !void {
    const help_text =
        \\zmx - session persistence for terminal processes
        \\
        \\Usage: zmx <command> [args...]
        \\
        \\Commands:
        \\  [a]ttach <name> [command...]             Attach to session, creating if needed
        \\  [r]un <name> [-d] [command...]           Send command without attaching
        \\  [s]end <name> <text...>                  Send raw input to session PTY
        \\  [p]rint <name> <text...>                 Inject text into session display
        \\  [wr]ite <name> <file_path>               Write stdin to file_path through the session
        \\  [d]etach                                 Detach all clients (ctrl+\ for current client)
        \\  [l]ist|ls [--short]                      List active sessions
        \\  [k]ill <name>... [--force]               Kill session and all attached clients
        \\  [hi]story <name> [--vt|--html]           Output session scrollback
        \\  [w]ait <name>...                         Wait for session tasks to complete
        \\  [t]ail <name>...                         Follow session output
        \\  [c]ompletions <shell>                    Shell completions (bash, zsh, fish)
        \\  [v]ersion                                Show version
        \\  [h]elp                                   Show this help
        \\
        \\Attach:
        \\  This will spawn a login $SHELL with a PTY.  You can provide a
        \\  command instead of creating a shell.
        \\
        \\  Examples:
        \\    zmx attach dev
        \\    zmx attach dev vim
        \\
        \\History:
        \\  This should generally be used with `tail` to print the last lines
        \\  of the session's scrollback history.
        \\
        \\  Examples:
        \\    zmx history <session> | tail -100
        \\
        \\Run:
        \\  Commands are passed as-is: do not wrap in quotes.
        \\  Commands run sequentially: do not send multiple in parallel.
        \\  Avoid interactive programs (pagers, editors, prompts): they hang.
        \\
        \\  If the command hangs, send Ctrl+C to recover:
        \\    zmx run <session> $(printf '\x03')
        \\
        \\  If the command hangs, print the history to see the error:
        \\    zmx history <session> | tail -100
        \\
        \\  `-d` will detach from the calling terminal. Use `wait` to track
        \\  its status.
        \\
        \\  Examples:
        \\    zmx run dev ls
        \\    zmx run dev zig build
        \\    zmx run dev grep -r TODO src
        \\    zmx run dev git -c core.pager=cat diff
        \\
        \\Send:
        \\  Sends raw text to the session's PTY input (fire-and-forget).
        \\  Unlike `run`, no completion marker is appended and no exit code
        \\  is tracked.  Useful for TUI applications, interactive prompts,
        \\  or any program that reads stdin directly.
        \\
        \\  Text is sent byte-for-byte with no automatic carriage return.
        \\  Append \r yourself when you want the shell to execute a command.
        \\
        \\  Text can also be piped via stdin:
        \\    printf 'ls -la\r' | zmx send dev
        \\
        \\  Examples:
        \\    printf 'echo hello\r' | zmx send dev
        \\    zmx send dev $(printf '\x03')
        \\    zmx send dev /compact
        \\
        \\Print:
        \\  Injects text directly into the session display and scrollback.
        \\  Never touches the PTY input -- the shell sees nothing.
        \\  Caller is responsible for newlines (\r\n).
        \\
        \\  Examples:
        \\    printf '\r\nhello\r\n' | zmx print dev
        \\    zmx print dev "$(printf '\r\nalert\r\n')"
        \\
        \\Write:
        \\  Writes stdin to file_path inside the session. Works over SSH.
        \\  file_path can be absolute or relative to the session shell's cwd.
        \\  Requires base64 and printf in the remote environment.
        \\  Large files are chunked automatically (~48KB per chunk).
        \\  File path must not contain single quotes.
        \\
        \\  Examples:
        \\    echo "hello" | zmx write dev /tmp/hello.txt
        \\    cat main.zig | zmx write dev src/main.zig
        \\
        \\Wait:
        \\  Used with a detached run task to track its status.  Multiple
        \\  sessions can be provided.
        \\
        \\  Examples:
        \\    zmx run -d dev sleep 10
        \\    zmx wait dev
        \\    zmx wait dev other
        \\
        \\Environment variables:
        \\  SHELL                Default shell for new sessions
        \\  ZMX_DIR              Socket directory (priority 1)
        \\  XDG_RUNTIME_DIR      Socket directory (priority 2)
        \\  TMPDIR               Socket directory (priority 3)
        \\  ZMX_SESSION          Session name (injected automatically)
        \\  ZMX_SESSION_PREFIX   Prefix added to all session names
        \\  ZMX_DIR_MODE         Sets mode for socket and log directories (octal, defaults to 0750)
        \\  ZMX_LOG_MODE         Sets mode for log files (octal, defaults to 0640)
        \\
    ;
    var buf: [8192]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);
    // help_text doubles as the format string; it contains no `{`/`}`.
    try w.interface.print(help_text, .{});
    try w.interface.flush();
}
1343
/// Write all of `pending` to stdout, waiting for writability whenever the
/// descriptor reports WouldBlock. Used by `tail` just before it returns so
/// buffered output is not lost to a short or deferred write.
fn drainTailOutput(pending: []const u8) !void {
    var rest = pending;
    while (rest.len > 0) {
        const written = posix.write(posix.STDOUT_FILENO, rest) catch |err| switch (err) {
            error.WouldBlock => {
                var out_fds = [_]posix.pollfd{.{
                    .fd = posix.STDOUT_FILENO,
                    .events = posix.POLL.OUT,
                    .revents = 0,
                }};
                _ = try posix.poll(&out_fds, -1);
                continue;
            },
            else => return err,
        };
        // A zero-length write for a non-empty buffer makes no progress;
        // give up rather than spin.
        if (written == 0) return;
        rest = rest[written..];
    }
}

/// Follow the output of one or more sessions and copy it to stdout.
///
/// Parameters:
///   client_socket_fds: connected session sockets to read messages from.
///   detached: when true, return 0 as soon as an `.Ack` arrives (after
///     printing "command sent!").
///   is_run_cmd: when true (and not detached), everything up to and
///     including the first newline of output is dropped, since it is the
///     echo of the command.
///
/// Returns the process exit code to use:
///   - the task's exit code once a `.TaskComplete` message has been seen and
///     there is output to flush,
///   - 0 when a socket is closed or reports HUP/ERR/NVAL,
///   - 1 when a read fails with ConnectionResetByPeer or BrokenPipe.
/// Other read, write, poll and allocation errors are returned as errors.
///
/// Output still buffered when the function decides to return 0 or the task
/// exit code is drained to stdout first.
///
/// NOTE(review): the task exit code is only returned while output is
/// pending; a `.TaskComplete` that arrives with an empty buffer waits for
/// the next output or socket close. Confirm this is intended.
fn tail(client_socket_fds: std.ArrayList(i32), detached: bool, is_run_cmd: bool) !u8 {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
    defer poll_fds.deinit(alloc);

    var read_buf = try ipc.SocketBuffer.init(alloc);
    defer read_buf.deinit();

    var stdout_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer stdout_buf.deinit(alloc);

    var is_first_line = true;
    var task_complete_code: ?u8 = null;

    while (true) {
        poll_fds.clearRetainingCapacity();

        // Poll socket for read
        for (client_socket_fds.items) |client_sock_fd| {
            try poll_fds.append(alloc, .{
                .fd = client_sock_fd,
                .events = posix.POLL.IN,
                .revents = 0,
            });
        }

        // Poll for write if we have pending data
        if (stdout_buf.items.len > 0) {
            try poll_fds.append(alloc, .{
                .fd = posix.STDOUT_FILENO,
                .events = posix.POLL.OUT,
                .revents = 0,
            });
        }

        _ = posix.poll(poll_fds.items, -1) catch |err| {
            if (err == error.Interrupted) continue; // EINTR from signal, loop again
            return err;
        };

        // Handle socket read (incoming Output messages from daemon)
        for (poll_fds.items) |*poll_fd| {
            if (poll_fd.revents & posix.POLL.IN != 0) {
                const n = read_buf.read(poll_fd.fd) catch |err| {
                    if (err == error.WouldBlock) continue;
                    if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                        return 1;
                    }
                    std.log.err("daemon read err={s}", .{@errorName(err)});
                    return err;
                };
                if (n == 0) {
                    // Server closed connection. Flush what we already have
                    // so the last output is not lost; the exit code stays 0
                    // even if that flush fails.
                    drainTailOutput(stdout_buf.items) catch |err| {
                        std.log.warn("failed to flush output on close err={s}", .{@errorName(err)});
                    };
                    return 0;
                }

                while (read_buf.next()) |msg| {
                    switch (msg.header.tag) {
                        .Ack => {
                            if (detached) {
                                _ = posix.write(posix.STDOUT_FILENO, "command sent!\n") catch |err| blk: {
                                    if (err == error.WouldBlock) break :blk 0;
                                    return err;
                                };
                                return 0;
                            }
                        },
                        .Output => {
                            if (msg.payload.len > 0) {
                                // strip the first line since it is an echo of
                                // the command.
                                if (!detached and is_run_cmd and is_first_line) {
                                    if (std.mem.indexOfScalar(u8, msg.payload, '\n')) |nl| {
                                        is_first_line = false;
                                        if (nl + 1 < msg.payload.len) {
                                            try stdout_buf.appendSlice(alloc, msg.payload[nl + 1 ..]);
                                        }
                                    }
                                } else {
                                    try stdout_buf.appendSlice(alloc, msg.payload);
                                }
                            }
                        },
                        .TaskComplete => {
                            task_complete_code = if (msg.payload.len > 0) msg.payload[0] else 0;
                        },
                        else => {},
                    }
                }
            }
        }

        if (stdout_buf.items.len > 0) {
            if (task_complete_code) |exit_code| {
                // The task is done: write everything that is buffered before
                // returning. A single write may be short or hit WouldBlock,
                // which would otherwise drop the tail of the output.
                try drainTailOutput(stdout_buf.items);
                return exit_code;
            }
            const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
                if (err == error.WouldBlock) break :blk 0;
                return err;
            };
            if (n > 0) {
                try stdout_buf.replaceRange(alloc, 0, n, &[_]u8{});
            }
        }

        // Check for HUP/ERR on any socket
        for (poll_fds.items) |poll_fd| {
            if (poll_fd.revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
                // Best-effort flush of whatever is still buffered.
                drainTailOutput(stdout_buf.items) catch |err| {
                    std.log.warn("failed to flush output on hangup err={s}", .{@errorName(err)});
                };
                return 0;
            }
        }
    }
}
1460
/// Block until every session matched by `matchers` has finished its task.
///
/// Polls `cfg.socket_dir` once per second and prints one progress line per
/// matching session. Once every match is done it prints a summary of the
/// failed tasks and terminates the process with the aggregated exit code, so
/// this function does not return on the normal path. The process exits with
/// status 1 when a previously seen session disappears and with status 2 when
/// nothing matches after three polls. An error is returned only when listing
/// the sessions or writing to stdout/stderr fails.
fn wait(cfg: *Cfg, matchers: std.ArrayList(SessionMatch)) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var stdout_buffer: [1024]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&stdout_buffer);
    const stdout = &stdout_writer.interface;

    var stderr_buffer: [1024]u8 = undefined;
    var stderr_writer = std.fs.File.stderr().writer(&stderr_buffer);
    const stderr = &stderr_writer.interface;

    // Highest match count seen so far. Lets us distinguish "sessions haven't
    // appeared yet" (keep polling) from "sessions we were tracking
    // disappeared" (fail -- daemon crashed or was killed).
    var max_seen: i32 = 0;
    var zero_match_iters: u32 = 0;

    var agg_exit_code: u8 = 0;
    while (true) {
        agg_exit_code = 0;
        var total: i32 = 0;
        var done: i32 = 0;

        {
            var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
            // Scoped so the entries are released before sleeping and also
            // when one of the prints below fails.
            defer {
                for (sessions.items) |entry| entry.deinit(alloc);
                sessions.deinit(alloc);
            }

            for (sessions.items) |session| {
                const matched = for (matchers.items) |m| {
                    if (m.matches(session.name)) break true;
                } else false;
                if (!matched) continue;

                total += 1;
                if (session.is_error) {
                    // Daemon unreachable (probe timed out). On Timeout the
                    // socket is no longer deleted, so this session would
                    // otherwise persist as task_ended_at==0 forever →
                    // infinite "still waiting". Count it as done+failed so
                    // wait terminates.
                    try stderr.print(
                        "[{d}] task unreachable: {s} ({s})\n",
                        .{ std.time.timestamp(), session.name, session.error_name orelse "unknown" },
                    );
                    try stderr.flush();
                    agg_exit_code = 1;
                    done += 1;
                    continue;
                }
                if (session.task_ended_at == 0) {
                    try stdout.print(
                        "[{d}] waiting task={s}\n",
                        .{ std.time.timestamp(), session.name },
                    );
                    try stdout.flush();
                    continue;
                }

                const exit_code = session.task_exit_code.?;
                try stdout.print(
                    "[{d}] completed task={s} exit_code={d}\n",
                    .{ session.task_ended_at.?, session.name, exit_code },
                );
                try stdout.flush();
                if (exit_code != 0) agg_exit_code = exit_code;
                done += 1;
            }
        }

        // Check disappearance BEFORE completion: if one of N sessions
        // crashed and the remaining N-1 happen to be done, total==done
        // would be a false success.
        if (total < max_seen) {
            try stderr.print(
                "error: {d} session(s) disappeared before completing\n",
                .{max_seen - total},
            );
            try stderr.flush();
            std.process.exit(1);
        }
        max_seen = total;

        if (total > 0 and total == done) break;

        if (max_seen == 0) {
            // `zmx run foo && zmx wait foo` is essentially sequential, so
            // matching sessions should be visible from the first poll. If
            // nothing appears after a few iterations it's almost certainly a
            // typo, not a slow start.
            zero_match_iters += 1;
            if (zero_match_iters >= 3) {
                try stderr.print("error: no matching sessions found\n", .{});
                try stderr.flush();
                std.process.exit(2);
            }
        }

        std.Thread.sleep(1000 * std.time.ns_per_ms);
    }

    if (agg_exit_code == 0) {
        try stdout.print("task(s) completed!\n", .{});
    } else {
        try stdout.print("task(s) failed!\n", .{});
    }
    try stdout.flush();

    {
        // Fresh listing for the failure summary. Entries may differ from the
        // last poll (for example a daemon that just became unreachable), so
        // optional fields are unwrapped defensively instead of with `.?`.
        var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
        defer {
            for (sessions.items) |entry| entry.deinit(alloc);
            sessions.deinit(alloc);
        }

        for (sessions.items) |session| {
            const matched = for (matchers.items) |m| {
                if (m.matches(session.name)) break true;
            } else false;
            if (!matched) continue;

            const exit_status = session.task_exit_code orelse continue;
            if (exit_status > 0) {
                try stdout.print("---\n", .{});
                try stdout.print("[{d}] failed task={s} exit_status={d}\n\n", .{
                    session.task_ended_at orelse 0,
                    session.name,
                    exit_status,
                });
                try stdout.print("See the logs:\nzmx history {s}\nzmx attach {s}\n", .{ session.name, session.name });
                try stdout.flush();
            }
        }
    }

    std.process.exit(agg_exit_code);
}
1606
/// Print one line per session found in `cfg.socket_dir` to stdout, ordered by
/// `util.SessionEntry.lessThan`.
///
/// `short` is forwarded to `util.writeSessionLine`. When no sessions exist a
/// notice is written to stderr, unless `short` is set, in which case nothing
/// is printed at all.
fn list(cfg: *Cfg, short: bool) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const active_name = socket.getSeshNameFromEnv();
    var out_buf: [4096]u8 = undefined;
    var out = std.fs.File.stdout().writer(&out_buf);

    var entries = try util.get_session_entries(alloc, cfg.socket_dir);
    defer {
        for (entries.items) |entry| entry.deinit(alloc);
        entries.deinit(alloc);
    }

    if (entries.items.len == 0) {
        if (!short) {
            var notice_buf: [4096]u8 = undefined;
            var notice = std.fs.File.stderr().writer(&notice_buf);
            try notice.interface.print("no sessions found in {s}\n", .{cfg.socket_dir});
            try notice.interface.flush();
        }
        return;
    }

    std.mem.sort(util.SessionEntry, entries.items, {}, util.SessionEntry.lessThan);

    // Flush after every line so output appears as each entry is written.
    for (entries.items) |entry| {
        try util.writeSessionLine(&out.interface, entry, short, active_name);
        try out.interface.flush();
    }
}
1640
/// Send a `.DetachAll` message to the daemon of the session this process is
/// running inside, as reported by `socket.getSeshNameFromEnv`.
///
/// Logs and returns without error when not inside a session or when the
/// daemon cannot be reached. A refused connection also removes the stale
/// socket. A peer that hangs up during the send is treated as success.
fn detachAll(cfg: *Cfg) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const current = socket.getSeshNameFromEnv();
    if (current.len == 0) {
        std.log.err("ZMX_SESSION env var not found: are you inside a zmx session?", .{});
        return;
    }

    var sock_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer sock_dir.close();

    const path = socket.getSocketPath(alloc, cfg.socket_dir, current) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(current, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(path);

    const conn = ipc.connectSession(path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(sock_dir, current);
        return;
    };
    defer posix.close(conn);

    ipc.send(conn, .DetachAll, "") catch |err| switch (err) {
        // The daemon already went away; nothing left to detach.
        error.BrokenPipe, error.ConnectionResetByPeer => {},
        else => return err,
    };
}
1670
/// Send a `.Kill` message to the daemon of `session_name` and report the
/// result on stdout.
///
/// Returns `error.SessionNotFound` (after printing to stderr) when the
/// session has no socket. When the daemon cannot be reached, the stale socket
/// is removed if the connection was refused or `force` is set; otherwise a
/// hint is printed. Both cases return without an error.
fn kill(cfg: *Cfg, session_name: []const u8, force: bool) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(path);

    var sock_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer sock_dir.close();

    const present = try socket.sessionExists(sock_dir, session_name);
    if (!present) {
        var err_buf: [4096]u8 = undefined;
        var err_out = std.fs.File.stderr().writer(&err_buf);
        err_out.interface.print("error: session \"{s}\" does not exist\n", .{session_name}) catch {};
        err_out.interface.flush() catch {};
        return error.SessionNotFound;
    }

    const conn = ipc.connectSession(path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        var note_buf: [4096]u8 = undefined;
        var note = std.fs.File.stdout().writer(&note_buf);
        const remove_socket = force or err == error.ConnectionRefused;
        if (!remove_socket) {
            note.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again, add `--force` flag, or kill the process directly\n",
                .{ session_name, @errorName(err) },
            ) catch {};
        } else {
            socket.cleanupStaleSocket(sock_dir, session_name);
            note.interface.print("cleaned up stale session {s}\n", .{session_name}) catch {};
        }
        note.interface.flush() catch {};
        return;
    };
    defer posix.close(conn);

    ipc.send(conn, .Kill, "") catch |err| switch (err) {
        // Peer vanished mid-send: return quietly without the confirmation.
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var done_buf: [100]u8 = undefined;
    var done_out = std.fs.File.stdout().writer(&done_buf);
    try done_out.interface.print("killed session {s}\n", .{session_name});
    try done_out.interface.flush();
}
1721
/// Ask the daemon of `session_name` for its history in the given `format` and
/// copy the reply payload to stdout.
///
/// Returns `error.SessionNotFound` (after printing to stderr) when the
/// session has no socket. An unresponsive daemon, a failed poll or read, a
/// closed connection, a stdout write failure, or 5 seconds without a reply
/// all end the call without an error.
fn history(cfg: *Cfg, session_name: []const u8, format: util.HistoryFormat) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer dir.close();

    const exists = try socket.sessionExists(dir, session_name);
    if (!exists) {
        var buf: [4096]u8 = undefined;
        var w = std.fs.File.stderr().writer(&buf);
        w.interface.print("error: session \"{s}\" does not exist\n", .{session_name}) catch {};
        w.interface.flush() catch {};
        return error.SessionNotFound;
    }
    const fd = ipc.connectSession(socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(dir, session_name);
        return;
    };
    defer posix.close(fd);

    // The request payload is a single byte holding the enum value of `format`.
    const format_byte = [_]u8{@intFromEnum(format)};
    ipc.send(fd, .History, &format_byte) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var sb = try ipc.SocketBuffer.init(alloc);
    defer sb.deinit();

    while (true) {
        var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
        const poll_result = posix.poll(&poll_fds, 5000) catch return;
        if (poll_result == 0) {
            std.log.err("timeout waiting for history response", .{});
            return;
        }

        const n = sb.read(fd) catch return;
        if (n == 0) return;

        while (sb.next()) |msg| {
            if (msg.header.tag != .History) continue;

            // write(2) may accept fewer bytes than requested, so keep writing
            // until the whole payload is out instead of truncating the dump.
            var remaining: []const u8 = msg.payload;
            while (remaining.len > 0) {
                const written = posix.write(posix.STDOUT_FILENO, remaining) catch return;
                if (written == 0) return;
                remaining = remaining[written..];
            }
            return;
        }
    }
}
1779
/// Ask the daemon of `current_sesh` (the session this process is running
/// inside) to switch its client over to `daemon.session_name`.
///
/// Returns `error.SessionNotFound` (after printing to stderr) when
/// `current_sesh` has no socket. An unreachable daemon is logged and the
/// call returns without an error; a refused connection also removes the
/// stale socket.
fn switchSesh(daemon: *Daemon, current_sesh: []const u8) !void {
    // daemon.session_name is the name the user passed to `zmx attach`, i.e.
    // the destination, not the session we are currently inside of.
    const target_name = daemon.session_name;
    const alloc = daemon.alloc;
    const socket_dir_path = daemon.cfg.socket_dir;

    const path = socket.getSocketPath(alloc, socket_dir_path, current_sesh) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(current_sesh, socket_dir_path),
        error.OutOfMemory => return err,
    };
    defer alloc.free(path);

    var sock_dir = try std.fs.openDirAbsolute(socket_dir_path, .{});
    defer sock_dir.close();

    const present = try socket.sessionExists(sock_dir, current_sesh);
    if (!present) {
        var err_buf: [4096]u8 = undefined;
        var err_out = std.fs.File.stderr().writer(&err_buf);
        err_out.interface.print("error: session \"{s}\" does not exist\n", .{current_sesh}) catch {};
        err_out.interface.flush() catch {};
        return error.SessionNotFound;
    }

    const conn = ipc.connectSession(path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(sock_dir, current_sesh);
        return;
    };
    defer posix.close(conn);

    ipc.send(conn, .Switch, target_name) catch |err| switch (err) {
        // The daemon hung up first; there is nobody left to notify.
        error.BrokenPipe, error.ConnectionResetByPeer => {},
        else => return err,
    };
}
1814
/// Attach this terminal to the session described by `daemon`.
///
/// When already inside a zmx session (the session name from the environment
/// is non-empty) this only asks that session to switch, via `switchSesh`.
/// Otherwise the session is ensured, stdin is put into raw mode if it is a
/// TTY, and `clientLoop` runs until it reports a detach or a switch. A switch
/// request re-enters `attach` for the requested session. The saved terminal
/// settings are restored when the function unwinds.
fn attach(daemon: *Daemon) !void {
    const enclosing_session = socket.getSeshNameFromEnv();
    if (enclosing_session.len > 0) return switchSesh(daemon, enclosing_session);

    const ensured = try daemon.ensureSession();
    if (ensured.is_daemon) return;

    const sock = try socket.sessionConnect(daemon.socket_path);
    std.log.info("attached session={s}", .{daemon.session_name});

    // Snapshot the current terminal settings so they can be modified and
    // later restored without disturbing unrelated attributes.
    //
    // tcgetattr fails when stdin is not a TTY (e.g. piped). In that case all
    // terminal setup is skipped rather than handing undefined stack bytes to
    // tcsetattr.
    var saved_termios: cross.c.termios = undefined;
    const have_tty = cross.c.tcgetattr(posix.STDIN_FILENO, &saved_termios) == 0;

    defer {
        // TCSAFLUSH discards unread input, preventing stale input after
        // detach.
        if (have_tty) {
            _ = cross.c.tcsetattr(posix.STDIN_FILENO, cross.c.TCSAFLUSH, &saved_termios);
        }
        // Reset terminal modes on detach.
        _ = posix.write(posix.STDOUT_FILENO, "\x1bc") catch {};
    }

    if (have_tty) {
        // Raw mode is applied only after the connection succeeded. It
        // disables canonical mode (line buffering), input echoing, signal
        // generation from control characters (like Ctrl+C), and flow control.
        var raw = saved_termios;
        cross.c.cfmakeraw(&raw);

        // Extra granular settings (matches what abduco and shpool do).
        raw.c_cc[cross.c.VLNEXT] = cross.c._POSIX_VDISABLE; // no literal-next (Ctrl-V)
        // Ctrl+\ must reach us as a byte so it can serve as the detach key.
        raw.c_cc[cross.c.VQUIT] = cross.c._POSIX_VDISABLE; // no SIGQUIT (Ctrl+\)
        raw.c_cc[cross.c.VMIN] = 1; // read returns after at least 1 byte
        raw.c_cc[cross.c.VTIME] = 0; // no read timeout

        _ = cross.c.tcsetattr(posix.STDIN_FILENO, cross.c.TCSANOW, &raw);
    }

    // Clear the screen so the session restore starts from a clean slate.
    _ = try posix.write(posix.STDOUT_FILENO, "\x1b[2J\x1b[H");

    const outcome = try clientLoop(sock);
    const next_name = switch (outcome.kind) {
        .detach => return,
        // A switch request without a target name is treated like a detach.
        .switch_session => outcome.session_name orelse return,
    };

    var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
    const cwd = std.posix.getcwd(&cwd_buf) catch "";
    const next_path = socket.getSocketPath(
        daemon.alloc,
        daemon.cfg.socket_dir,
        next_name,
    ) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(
            next_name,
            daemon.cfg.socket_dir,
        ),
        error.OutOfMemory => return err,
    };

    const client_list = try std.ArrayList(*Client).initCapacity(daemon.alloc, 10);
    var next_daemon = Daemon{
        .running = true,
        .cfg = daemon.cfg,
        .alloc = daemon.alloc,
        .clients = client_list,
        .session_name = next_name,
        .socket_path = next_path,
        .pid = undefined,
        .cwd = cwd,
        .created_at = @intCast(std.time.timestamp()),
        .leader_client_fd = null,
    };
    return attach(&next_daemon);
}
1909
/// Read all of stdin and send it to the session daemon as a `.Write` request
/// for `file_path`, then wait for the daemon's `.Ack`.
///
/// The session is created first if needed. Returns `error.ReadFailed`,
/// `error.ConnectionClosed`, or `error.NoAckReceived` when the single reply
/// read does not yield an `.Ack`. An unresponsive daemon is reported on
/// stdout and the call returns without an error.
fn writeFile(daemon: *Daemon, file_path: []const u8) !void {
    var buf: [4096]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);
    const sesh_result = try daemon.ensureSession();
    if (sesh_result.is_daemon) return;

    if (sesh_result.created) {
        try w.interface.print("session \"{s}\" created\n", .{daemon.session_name});
        try w.interface.flush();
    }
    const stdin_fd = posix.STDIN_FILENO;
    var stdin_buf = try std.ArrayList(u8).initCapacity(daemon.alloc, 4096);
    defer stdin_buf.deinit(daemon.alloc);

    // Slurp stdin until EOF (or until a non-blocking stdin has no more data).
    while (true) {
        var tmp: [4096]u8 = undefined;
        const n = posix.read(stdin_fd, &tmp) catch |err| {
            if (err == error.WouldBlock) break;
            return err;
        };
        if (n == 0) break;
        try stdin_buf.appendSlice(daemon.alloc, tmp[0..n]);
    }

    const socket_path = socket.getSocketPath(
        daemon.alloc,
        daemon.cfg.socket_dir,
        daemon.session_name,
    ) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(
            daemon.session_name,
            daemon.cfg.socket_dir,
        ),
        error.OutOfMemory => return err,
    };
    // getSocketPath hands back an allocation owned by the caller.
    defer daemon.alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(daemon.cfg.socket_dir, .{});
    defer dir.close();

    const result = ipc.probeSession(daemon.alloc, socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) {
            socket.cleanupStaleSocket(dir, daemon.session_name);
            w.interface.print("cleaned up stale session {s}\n", .{daemon.session_name}) catch {};
        } else {
            w.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again\n",
                .{ daemon.session_name, @errorName(err) },
            ) catch {};
        }
        w.interface.flush() catch {};
        return;
    };

    defer posix.close(result.fd);

    // Build wire payload: [u32 path len][path bytes][file content].
    // The length is written in native byte order via std.mem.asBytes.
    var wire_buf = try std.ArrayList(u8).initCapacity(
        daemon.alloc,
        @sizeOf(u32) + file_path.len + stdin_buf.items.len,
    );
    defer wire_buf.deinit(daemon.alloc);
    const path_len: u32 = @intCast(file_path.len);
    try wire_buf.appendSlice(daemon.alloc, std.mem.asBytes(&path_len));
    try wire_buf.appendSlice(daemon.alloc, file_path);
    try wire_buf.appendSlice(daemon.alloc, stdin_buf.items);

    ipc.send(result.fd, .Write, wire_buf.items) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var sb = try ipc.SocketBuffer.init(daemon.alloc);
    defer sb.deinit();

    // A single read is expected to carry the acknowledgement.
    const n = sb.read(result.fd) catch return error.ReadFailed;
    if (n == 0) return error.ConnectionClosed;

    while (sb.next()) |msg| {
        if (msg.header.tag == .Ack) {
            try w.interface.print("file created {s}\n", .{file_path});
            try w.interface.flush();
            return;
        }
    }

    return error.NoAckReceived;
}
1997
/// Send text to the daemon listening on `socket_path` as a message tagged
/// `tag`.
///
/// The payload is `text_parts` joined with single spaces. When no parts are
/// given and stdin is not a TTY, the payload is everything read from stdin.
/// Returns `error.TextRequired` when the payload ends up empty. An
/// unresponsive daemon is reported on stdout and the call returns without an
/// error.
fn send(cfg: *Cfg, session_name: []const u8, socket_path: []const u8, text_parts: [][]const u8, tag: ipc.Tag) !void {
    const alloc = std.heap.c_allocator;
    var out_buf: [4096]u8 = undefined;
    var out = std.fs.File.stdout().writer(&out_buf);

    var payload = std.ArrayList(u8).empty;
    defer payload.deinit(alloc);

    if (text_parts.len == 0) {
        // No text arguments: fall back to piped stdin.
        if (!std.posix.isatty(posix.STDIN_FILENO)) {
            while (true) {
                var chunk: [4096]u8 = undefined;
                const got = posix.read(posix.STDIN_FILENO, &chunk) catch |err| switch (err) {
                    error.WouldBlock => break,
                    else => return err,
                };
                if (got == 0) break;
                try payload.appendSlice(alloc, chunk[0..got]);
            }
            // Drop one trailing newline from piped input; the caller is
            // responsible for including \r when submission is desired.
            // For .Output the caller controls the exact bytes, so keep it.
            if (tag != .Output and std.mem.endsWith(u8, payload.items, "\n")) {
                _ = payload.pop();
            }
        }
    } else {
        for (text_parts, 0..) |part, idx| {
            if (idx != 0) try payload.append(alloc, ' ');
            try payload.appendSlice(alloc, part);
        }
    }

    if (payload.items.len == 0) return error.TextRequired;

    var sock_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer sock_dir.close();

    const probe = ipc.probeSession(alloc, socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err != error.ConnectionRefused) {
            try out.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again\n",
                .{ session_name, @errorName(err) },
            );
        } else {
            socket.cleanupStaleSocket(sock_dir, session_name);
            try out.interface.print("cleaned up stale session {s}\n", .{session_name});
        }
        try out.interface.flush();
        return;
    };
    defer posix.close(probe.fd);

    ipc.send(probe.fd, tag, payload.items) catch |err| switch (err) {
        // The daemon hung up first; treat it as nothing left to do.
        error.ConnectionResetByPeer, error.BrokenPipe => {},
        else => return err,
    };
}
2059
/// Send a command to the session's shell as a `.Run` message, then follow
/// its output with `tail` and exit the process with the code `tail` returns.
///
/// The command is `command_args` joined with spaces (arguments are shell
/// quoted when `util.shellNeedsQuoting` says so) or, when no arguments are
/// given and stdin is not a TTY, the bytes read from stdin. Returns
/// `error.CommandRequired` when neither source yields a command and
/// `error.SessionNotReady` when the daemon cannot be reached.
fn run(daemon: *Daemon, detached: bool, command_args: [][]const u8) !void {
    const alloc = daemon.alloc;
    var buf: [4096]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);

    // Owned command bytes; a single mutable slice removes the need for a
    // second const view plus @constCast to free it.
    var cmd: ?[]u8 = null;
    defer if (cmd) |owned| alloc.free(owned);

    const result = try daemon.ensureSession();
    if (result.is_daemon) return;

    if (result.created) {
        try w.interface.print("session \"{s}\" created\n", .{daemon.session_name});
        try w.interface.flush();
    }

    if (command_args.len > 0) {
        var cmd_list = std.ArrayList(u8).empty;
        defer cmd_list.deinit(alloc);

        for (command_args, 0..) |arg, i| {
            if (i > 0) try cmd_list.append(alloc, ' ');
            if (util.shellNeedsQuoting(arg)) {
                const quoted = try util.shellQuote(alloc, arg);
                defer alloc.free(quoted);
                try cmd_list.appendSlice(alloc, quoted);
            } else {
                try cmd_list.appendSlice(alloc, arg);
            }
        }

        // \r, not \n: once the shell is at the readline prompt the PTY is in
        // raw mode; readline's accept-line binds to CR. The first-ever run
        // works with \n only because it arrives during shell startup while
        // the line discipline is still canonical.
        try cmd_list.append(alloc, '\r');

        cmd = try cmd_list.toOwnedSlice(alloc);
    } else if (!std.posix.isatty(posix.STDIN_FILENO)) {
        var stdin_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
        defer stdin_buf.deinit(alloc);

        while (true) {
            var tmp: [4096]u8 = undefined;
            const n = posix.read(posix.STDIN_FILENO, &tmp) catch |err| {
                if (err == error.WouldBlock) break;
                return err;
            };
            if (n == 0) break;
            try stdin_buf.appendSlice(alloc, tmp[0..n]);
        }

        if (stdin_buf.items.len > 0) {
            // Make the input end in CR so readline (raw mode) accepts it.
            // Only the final byte is inspected: a trailing \n becomes \r,
            // otherwise \r is appended. Interior newlines are sent as-is.
            if (stdin_buf.items[stdin_buf.items.len - 1] == '\n') {
                stdin_buf.items[stdin_buf.items.len - 1] = '\r';
            } else {
                try stdin_buf.append(alloc, '\r');
            }

            cmd = try alloc.dupe(u8, stdin_buf.items);
        }
    }

    const cmd_to_send = cmd orelse return error.CommandRequired;

    const client_sock = ipc.connectSession(daemon.socket_path) catch |err| {
        std.log.err("session not ready: {s}", .{@errorName(err)});
        return error.SessionNotReady;
    };
    defer posix.close(client_sock);

    var fds = try std.ArrayList(i32).initCapacity(alloc, 1);
    defer fds.deinit(alloc);
    try fds.append(alloc, client_sock);

    ipc.send(client_sock, .Run, cmd_to_send) catch |err| switch (err) {
        error.ConnectionResetByPeer, error.BrokenPipe => return,
        else => return err,
    };

    // posix.exit does not return, so the defers above do not run on this
    // path; the OS reclaims the socket and memory.
    const exit_code = try tail(fds, detached, true);
    posix.exit(exit_code);
}
2153
/// Outcome of one `clientLoop` run, telling the caller what to do next.
const ClientResult = struct {
    /// `.detach` ends the attachment; `.switch_session` asks the caller to
    /// attach to `session_name` instead.
    kind: enum { detach, switch_session },
    /// Name of the session to attach to next. `attach` only acts on
    /// `.switch_session` when this is non-null.
    session_name: ?[]const u8,
};
2161
2162/// clientLoop sends ipc commands to its corresponding daemon.  It uses poll() as its non-blocking
2163/// mechanism. It will send stdin to the daemon and receive stdout from the daemon.
2164fn clientLoop(client_sock_fd: i32) !ClientResult {
2165    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
2166    const alloc = std.heap.c_allocator;
2167    defer posix.close(client_sock_fd);
2168
2169    try openSignalPipe();
2170    installWakeHandler(posix.SIG.WINCH);
2171
2172    // Make socket non-blocking to avoid blocking on writes
2173    var sock_flags = try posix.fcntl(client_sock_fd, posix.F.GETFL, 0);
2174    sock_flags |= O_NONBLOCK;
2175    _ = try posix.fcntl(client_sock_fd, posix.F.SETFL, sock_flags);
2176
2177    // Buffer for outgoing socket writes
2178    var sock_write_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
2179    defer sock_write_buf.deinit(alloc);
2180
2181    // Send init message with terminal size (buffered)
2182    const size = ipc.getTerminalSize(posix.STDOUT_FILENO);
2183    try ipc.appendMessage(alloc, &sock_write_buf, .Init, std.mem.asBytes(&size));
2184
2185    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
2186    defer poll_fds.deinit(alloc);
2187
2188    var read_buf = try ipc.SocketBuffer.init(alloc);
2189    defer read_buf.deinit();
2190
2191    var stdout_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
2192    defer stdout_buf.deinit(alloc);
2193
2194    const stdin_fd = posix.STDIN_FILENO;
2195
2196    // Make stdin non-blocking. O_NONBLOCK is set on the open file description,
2197    // which is shared with the parent shell; restore on exit to avoid
2198    // corrupting the parent's stdin.
2199    const stdin_orig_flags = try posix.fcntl(stdin_fd, posix.F.GETFL, 0);
2200    _ = try posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags | O_NONBLOCK);
2201    defer _ = posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags) catch {};
2202
2203    while (true) {
2204        poll_fds.clearRetainingCapacity();
2205
2206        try poll_fds.append(alloc, .{
2207            .fd = stdin_fd,
2208            .events = posix.POLL.IN,
2209            .revents = 0,
2210        });
2211
2212        // Poll socket for read, and also for write if we have pending data
2213        var sock_events: i16 = posix.POLL.IN;
2214        if (sock_write_buf.items.len > 0) {
2215            sock_events |= posix.POLL.OUT;
2216        }
2217        try poll_fds.append(alloc, .{
2218            .fd = client_sock_fd,
2219            .events = sock_events,
2220            .revents = 0,
2221        });
2222
2223        try poll_fds.append(alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });
2224
2225        if (stdout_buf.items.len > 0) {
2226            try poll_fds.append(alloc, .{
2227                .fd = posix.STDOUT_FILENO,
2228                .events = posix.POLL.OUT,
2229                .revents = 0,
2230            });
2231        }
2232
2233        _ = try posix.poll(poll_fds.items, -1);
2234
2235        if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
2236            drainSignalPipe();
2237            const next_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
2238            try ipc.appendMessage(alloc, &sock_write_buf, .Resize, std.mem.asBytes(&next_size));
2239        }
2240
2241        // Handle stdin -> socket (Input)
2242        const inp_flags = (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL);
2243        if (poll_fds.items[0].revents & inp_flags != 0) {
2244            var buf: [4096]u8 = undefined;
2245            const n_opt: ?usize = posix.read(stdin_fd, &buf) catch |err| blk: {
2246                if (err == error.WouldBlock) break :blk null;
2247                return err;
2248            };
2249
2250            if (n_opt) |n| {
2251                if (n > 0) {
2252                    // Check for detach sequences (ctrl+\ as first byte or Kitty escape sequence)
2253                    if (util.isCtrlBackslash(buf[0..n])) {
2254                        try ipc.appendMessage(alloc, &sock_write_buf, .Detach, "");
2255                    } else {
2256                        try ipc.appendMessage(alloc, &sock_write_buf, .Input, buf[0..n]);
2257                    }
2258                } else {
2259                    // EOF on stdin
2260                    return ClientResult{ .kind = .detach, .session_name = null };
2261                }
2262            }
2263        }
2264
2265        // Handle socket read (incoming Output messages from daemon)
2266        if (poll_fds.items[1].revents & posix.POLL.IN != 0) {
2267            const n = read_buf.read(client_sock_fd) catch |err| {
2268                if (err == error.WouldBlock) continue;
2269                if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
2270                    return ClientResult{ .kind = .detach, .session_name = null };
2271                }
2272                std.log.err("daemon read err={s}", .{@errorName(err)});
2273                return err;
2274            };
2275            if (n == 0) {
2276                // Server closed connection
2277                return ClientResult{ .kind = .detach, .session_name = null };
2278            }
2279
2280            while (read_buf.next()) |msg| {
2281                switch (msg.header.tag) {
2282                    .Output => {
2283                        if (msg.payload.len > 0) {
2284                            try stdout_buf.appendSlice(alloc, msg.payload);
2285                        }
2286                    },
2287                    .Resize => {
2288                        // daemon is asking for the client's window size usually in response
2289                        // to this client being set as leader.
2290                        const next_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
2291                        try ipc.appendMessage(
2292                            alloc,
2293                            &sock_write_buf,
2294                            .Resize,
2295                            std.mem.asBytes(&next_size),
2296                        );
2297                    },
2298                    .Switch => {
2299                        return ClientResult{ .kind = .switch_session, .session_name = try alloc.dupe(u8, msg.payload) };
2300                    },
2301                    else => {},
2302                }
2303            }
2304        }
2305
2306        // Handle socket write (flush buffered messages to daemon)
2307        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
2308            if (sock_write_buf.items.len > 0) {
2309                const n = posix.write(client_sock_fd, sock_write_buf.items) catch |err| blk: {
2310                    if (err == error.WouldBlock) break :blk 0;
2311                    if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
2312                        return ClientResult{ .kind = .detach, .session_name = null };
2313                    }
2314                    return err;
2315                };
2316                if (n > 0) {
2317                    try sock_write_buf.replaceRange(alloc, 0, n, &[_]u8{});
2318                }
2319            }
2320        }
2321
2322        if (stdout_buf.items.len > 0) {
2323            const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
2324                if (err == error.WouldBlock) break :blk 0;
2325                return err;
2326            };
2327            if (n > 0) {
2328                try stdout_buf.replaceRange(alloc, 0, n, &[_]u8{});
2329            }
2330        }
2331
2332        if (poll_fds.items[1].revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
2333            return ClientResult{ .kind = .detach, .session_name = null };
2334        }
2335    }
2336}
2337
/// daemonLoop is what the daemon runs to send and receive ipc commands from its corresponding
/// clients.  It uses poll() as its non-blocking mechanism.
///
/// Each iteration rebuilds `poll_fds` with a fixed prefix followed by one entry per client:
///   [0] server_sock_fd (new connections), [1] pty_fd, [2] sig_pipe read end, [3..] clients.
///
/// Parameters:
///   daemon         - session state; `daemon.pty_fd` is set to `pty_fd` on entry.
///   server_sock_fd - listening socket that clients connect to.
///   pty_fd         - PTY the session's shell is attached to.
///
/// The loop ends (returning normally) when `daemon.running` is false, a byte arrives on the
/// signal pipe (a SIGTERM wake handler is installed here), the server socket reports an
/// error, the PTY reaches EOF or fails to read, a client sends `.Kill`, or
/// `daemon.closeClient` returns true. Errors from setup, poll(), non-transient accept()
/// failures, allocation, and the `daemon.handle*` message handlers are propagated.
fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
    std.log.info("daemon started session={s} pty_fd={d}", .{ daemon.session_name, pty_fd });
    daemon.pty_fd = pty_fd;
    try openSignalPipe();
    installWakeHandler(posix.SIG.TERM);
    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(daemon.alloc, 8);
    defer poll_fds.deinit(daemon.alloc);

    // Terminal emulator that mirrors the PTY output so state can be tracked server-side.
    const init_size = ipc.getTerminalSize(pty_fd);
    var term = try ghostty_vt.Terminal.init(daemon.alloc, .{
        .cols = init_size.cols,
        .rows = init_size.rows,
        .max_scrollback = daemon.cfg.max_scrollback,
    });
    defer term.deinit(daemon.alloc);
    var vt_stream = term.vtStream();
    defer vt_stream.deinit();

    daemon_loop: while (daemon.running) {
        poll_fds.clearRetainingCapacity();

        try poll_fds.append(daemon.alloc, .{
            .fd = server_sock_fd,
            .events = posix.POLL.IN,
            .revents = 0,
        });

        // Only ask for writability on the PTY when there is something queued for it.
        var pty_events: i16 = posix.POLL.IN;
        if (daemon.pty_write_buf.items.len > 0) {
            pty_events |= posix.POLL.OUT;
        }
        try poll_fds.append(daemon.alloc, .{
            .fd = pty_fd,
            .events = pty_events,
            .revents = 0,
        });

        try poll_fds.append(daemon.alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });

        for (daemon.clients.items) |client| {
            var events: i16 = posix.POLL.IN;
            if (client.has_pending_output) {
                events |= posix.POLL.OUT;
            }
            try poll_fds.append(daemon.alloc, .{
                .fd = client.socket_fd,
                .events = events,
                .revents = 0,
            });
        }

        _ = try posix.poll(poll_fds.items, -1);

        // The signal handler wrote to the self-pipe: shut down.
        if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
            drainSignalPipe();
            std.log.info(
                "SIGTERM received, shutting down gracefully session={s}",
                .{daemon.session_name},
            );
            break :daemon_loop;
        }

        if (poll_fds.items[0].revents & (posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL) != 0) {
            std.log.err("server socket error revents={d}", .{poll_fds.items[0].revents});
            break :daemon_loop;
        } else if (poll_fds.items[0].revents & posix.POLL.IN != 0) {
            // A peer can disappear between poll() reporting readiness and accept(); that is
            // not a reason to tear down the whole session, so skip this round instead.
            const accepted: ?posix.socket_t = posix.accept(
                server_sock_fd,
                null,
                null,
                posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC,
            ) catch |err| blk: {
                if (err == error.WouldBlock or err == error.ConnectionAborted) {
                    std.log.warn("accept failed err={s}", .{@errorName(err)});
                    break :blk null;
                }
                return err;
            };

            if (accepted) |client_fd| {
                // Until the client is registered in daemon.clients nothing else owns these
                // resources, so release them if any later step in this block fails.
                errdefer posix.close(client_fd);
                const client = try daemon.alloc.create(Client);
                errdefer daemon.alloc.destroy(client);
                client.* = Client{
                    .alloc = daemon.alloc,
                    .socket_fd = client_fd,
                    .read_buf = try ipc.SocketBuffer.init(daemon.alloc),
                    .write_buf = undefined,
                };
                errdefer client.read_buf.deinit();
                client.write_buf = try std.ArrayList(u8).initCapacity(client.alloc, 4096);
                errdefer client.write_buf.deinit(client.alloc);
                try daemon.clients.append(daemon.alloc, client);
                std.log.info(
                    "client connected fd={d} total={d}",
                    .{ client_fd, daemon.clients.items.len },
                );
            }
        }

        const inp_flags = posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL;
        if (poll_fds.items[1].revents & inp_flags != 0) {
            // Read from PTY. WouldBlock means "nothing yet"; any other read error is
            // treated the same as EOF (n == 0) and ends the loop below.
            var buf: [4096]u8 = undefined;
            const n_opt: ?usize = posix.read(pty_fd, &buf) catch |err| blk: {
                if (err == error.WouldBlock) break :blk null;
                break :blk 0;
            };

            if (n_opt) |n| {
                if (n == 0) {
                    // EOF: Shell exited
                    std.log.info("shell exited pty_fd={d}", .{pty_fd});
                    break :daemon_loop;
                } else {
                    // Feed PTY output to terminal emulator for state tracking
                    vt_stream.nextSlice(buf[0..n]);
                    daemon.has_pty_output = true;

                    // When no real terminal client has attached yet, respond to
                    // terminal queries (e.g. DA1/DA2) on behalf of the terminal.
                    // This prevents fish from waiting 10s for unanswered queries.
                    // `has_terminal_client` is only set when a client sends .Init
                    // (a real zmx attach), not when a `zmx run` tail-only client
                    // connects.
                    if (!daemon.has_terminal_client and
                        daemon.pty_write_buf.items.len < Daemon.PTY_WRITE_BUF_MAX)
                    {
                        util.respondToDeviceAttributes(daemon.alloc, &daemon.pty_write_buf, buf[0..n]);
                    }

                    // In run mode, scan output for exit code marker
                    if (daemon.is_task_mode and daemon.task_exit_code == null) {
                        if (util.findTaskExitMarker(buf[0..n])) |exit_code| {
                            daemon.task_exit_code = exit_code;
                            daemon.task_ended_at = @intCast(std.time.timestamp());

                            std.log.info("task completed exit_code={d}", .{exit_code});

                            // Notify connected clients. A client whose buffer cannot grow
                            // is skipped (and logged) rather than failing the daemon.
                            for (daemon.clients.items) |c| {
                                ipc.appendMessage(daemon.alloc, &c.write_buf, .TaskComplete, &[_]u8{exit_code}) catch |err| {
                                    std.log.warn(
                                        "failed to buffer task completion for client err={s}",
                                        .{@errorName(err)},
                                    );
                                    continue;
                                };
                                c.has_pending_output = true;
                            }
                        }
                    }

                    // Broadcast data to all clients.
                    // Rewrite OSC 133;A to include redraw=0 so the outer terminal
                    // does not clear prompt lines on resize (issue #111).
                    const broadcast_data = util.rewritePromptRedraw(daemon.alloc, buf[0..n]) orelse buf[0..n];
                    // Free only when the rewrite produced a new buffer (pointer differs from buf).
                    defer if (broadcast_data.ptr != buf[0..n].ptr) daemon.alloc.free(broadcast_data);
                    for (daemon.clients.items) |client| {
                        ipc.appendMessage(daemon.alloc, &client.write_buf, .Output, broadcast_data) catch |err| {
                            std.log.warn(
                                "failed to buffer output for client err={s}",
                                .{@errorName(err)},
                            );
                            continue;
                        };
                        client.has_pending_output = true;
                    }
                }
            }
        }

        // Flush queued bytes to the PTY until it would block or the queue is empty.
        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
            while (daemon.pty_write_buf.items.len > 0) {
                const n = posix.write(pty_fd, daemon.pty_write_buf.items) catch |err| {
                    if (err != error.WouldBlock) {
                        // Unrecoverable write error: drop what was queued.
                        std.log.warn("pty write failed: {s}", .{@errorName(err)});
                        daemon.pty_write_buf.clearRetainingCapacity();
                    }
                    break;
                };
                if (n == 0) break;
                // Dropping a prefix (empty replacement) only shrinks the list.
                daemon.pty_write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
            }
        }

        var i: usize = daemon.clients.items.len;
        // Only iterate over clients that were present when poll_fds was constructed
        // poll_fds contains [server, pty, sig_pipe, client0, client1, ...]
        // So number of clients in poll_fds is poll_fds.items.len - 3
        const num_polled_clients = poll_fds.items.len - 3;
        if (i > num_polled_clients) {
            // If we have more clients than polled (i.e. we just accepted one), start from the
            // polled ones
            i = num_polled_clients;
        }

        // Walk clients from the highest index down; the index is handed to closeClient /
        // handleDetach, which may remove the entry.
        clients_loop: while (i > 0) {
            i -= 1;
            const client = daemon.clients.items[i];
            const revents = poll_fds.items[i + 3].revents;

            if (revents & posix.POLL.IN != 0) {
                const n = client.read_buf.read(client.socket_fd) catch |err| {
                    if (err == error.WouldBlock) continue;
                    std.log.debug(
                        "client read err={s} fd={d}",
                        .{ @errorName(err), client.socket_fd },
                    );
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                };

                if (n == 0) {
                    // Client closed connection
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                }

                // Dispatch every complete message currently buffered for this client.
                while (client.read_buf.next()) |msg| {
                    switch (msg.header.tag) {
                        .Input => try daemon.handleInput(client, msg.payload),
                        .Output => try daemon.handleOutput(msg.payload, &vt_stream),
                        .Init => try daemon.handleInit(client, pty_fd, &term, msg.payload),
                        .Switch => try daemon.handleSwitch(msg.payload),
                        .Resize => try daemon.handleResize(client, pty_fd, &term, msg.payload),
                        .Detach => {
                            // Stop processing clients for this poll round after a detach.
                            daemon.handleDetach(client, i);
                            break :clients_loop;
                        },
                        .DetachAll => {
                            daemon.handleDetachAll();
                            break :clients_loop;
                        },
                        .Kill => {
                            break :daemon_loop;
                        },
                        .Info => try daemon.handleInfo(client),
                        .History => try daemon.handleHistory(client, &term, msg.payload),
                        .Run => try daemon.handleRun(client, msg.payload),
                        .Ack, .TaskComplete => {},
                        .Write => try daemon.handleWrite(client, msg.payload),
                        _ => std.log.warn(
                            "ignoring unknown IPC tag={d}",
                            .{@intFromEnum(msg.header.tag)},
                        ),
                    }
                }
            }

            if (revents & posix.POLL.OUT != 0) {
                // Flush pending output buffers
                const n = posix.write(client.socket_fd, client.write_buf.items) catch |err| blk: {
                    if (err == error.WouldBlock) break :blk 0;
                    // Error on write, close client
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                };

                if (n > 0) {
                    // Dropping a prefix (empty replacement) only shrinks the list.
                    client.write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
                }

                if (client.write_buf.items.len == 0) {
                    client.has_pending_output = false;
                }
            }

            if (revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
                const last = daemon.closeClient(client, i, false);
                if (last) break :daemon_loop;
            }
        }
    }
}
2599
/// Signal handler that wakes the poll() loop by writing a single byte to the write end of
/// `sig_pipe`. All three handler arguments are ignored.
///
/// errno is captured before the write and put back afterwards, so the handler leaves it as
/// it found it. The result of the write is deliberately discarded.
fn wakeSignalPipe(_: i32, _: *const posix.siginfo_t, _: ?*anyopaque) callconv(.c) void {
    const errno_ptr = std.c._errno();
    const errno_before = errno_ptr.*;
    _ = std.c.write(sig_pipe[1], "x", 1);
    errno_ptr.* = errno_before;
}
2605
/// Register `wakeSignalPipe` as the handler for `sig`, with an empty signal mask and only
/// SA_SIGINFO set.
///
/// SA_RESTART is left out on purpose: std.posix.poll retries EINTR internally, so the
/// flag makes no difference -- with or without it a signal never wakes the loop. The
/// handler writes to sig_pipe instead, and poll() wakes on the pipe's read end.
fn installWakeHandler(sig: u6) void {
    const wake_action = posix.Sigaction{
        .flags = posix.SA.SIGINFO,
        .mask = posix.sigemptyset(),
        .handler = .{ .sigaction = wakeSignalPipe },
    };
    posix.sigaction(sig, &wake_action, null);
}
2617
/// Set the process-wide disposition of SIGPIPE to "ignore" (SIG_IGN), using an empty
/// signal mask and no flags.
// NOTE(review): presumably this lets writes to a disconnected peer surface as
// error.BrokenPipe (which the poll loops in this file handle) instead of killing the
// process -- confirm against the call site.
fn ignoreSigpipe() void {
    const ignore_action = posix.Sigaction{
        .flags = 0,
        .mask = posix.sigemptyset(),
        .handler = .{ .handler = posix.SIG.IGN },
    };
    posix.sigaction(posix.SIG.PIPE, &ignore_action, null);
}