Eric Bower
·
2026-06-19
1const std = @import("std");
2const posix = std.posix;
3const build_options = @import("build_options");
4const ghostty_vt = @import("ghostty-vt");
5const ipc = @import("ipc.zig");
6const log = @import("log.zig");
7const completions = @import("completions.zig");
8const util = @import("util.zig");
9const cross = @import("cross.zig");
10const socket = @import("socket.zig");
11
/// zmx version string, re-exported from the `build_options` module.
pub const version = build_options.version;
/// ghostty version string, re-exported from the `build_options` module.
pub const ghostty_version = build_options.ghostty_version;

// Process-wide log sink; `main` (and the daemon after fork) calls init/deinit on it.
var log_system: log.LogSystem = .{};

/// Route every `std.log` call through `zmxLogFn` and keep debug-level messages enabled.
pub const std_options = std.Options{
    .log_level = .debug,
    .logFn = zmxLogFn,
};
21
/// `std.log` hook installed through `std_options.logFn`.
/// Hands the message to the file-level `log_system` without altering it.
fn zmxLogFn(
    comptime level: std.log.Level,
    comptime scope: @Type(.enum_literal),
    comptime format: []const u8,
    args: anytype,
) void {
    return log_system.log(level, scope, format, args);
}
30
/// Self-pipe woken by signal handlers. std.posix.poll retries on .INTR
/// internally (PollError has no Interrupted member), so a signal arriving
/// during poll() is never reported to the caller; instead the handler writes
/// a byte into this pipe and poll() wakes up on POLLIN.
/// Both ends hold -1 until the pipe has been opened.
var sig_pipe = [2]posix.fd_t{ -1, -1 };

// Integer value of the NONBLOCK bit inside the packed `posix.O` flags, for
// use with raw fcntl(F.SETFL) calls.
// https://github.com/ziglang/zig/blob/738d2be9d6b6ef3ff3559130c05159ef53336224/lib/std/posix.zig#L3505
const O_NONBLOCK = @as(usize, 1) << @bitOffsetOf(posix.O, "NONBLOCK");
38
/// A session selector taken from the command line: either an exact session
/// name or, when `is_prefix` is set, a prefix that session names must start with.
const SessionMatch = struct {
    name: []const u8,
    is_prefix: bool,

    /// Reports whether `session_name` is selected by this matcher.
    fn matches(self: SessionMatch, session_name: []const u8) bool {
        return if (self.is_prefix)
            std.mem.startsWith(u8, session_name, self.name)
        else
            std.mem.eql(u8, session_name, self.name);
    }
};
48
/// Turns a raw CLI argument into a `SessionMatch`.
///
/// A trailing `*` marks a prefix matcher and is stripped before the name is
/// passed to `socket.getSeshName`. The returned `name` comes from that call
/// made with `alloc`; the callers in this file release it with `alloc.free`.
fn parseSessionArg(alloc: std.mem.Allocator, raw: []const u8) !SessionMatch {
    const wants_prefix = raw.len > 0 and raw[raw.len - 1] == '*';
    const base = if (wants_prefix) raw[0 .. raw.len - 1] else raw;
    return .{
        .name = try socket.getSeshName(alloc, base),
        .is_prefix = wants_prefix,
    };
}
57
/// Creates the self-pipe and stores both ends in `sig_pipe`.
/// The pipe is opened close-on-exec and non-blocking.
fn openSignalPipe() !void {
    const ends = try posix.pipe2(.{ .CLOEXEC = true, .NONBLOCK = true });
    sig_pipe = ends;
}
61
/// Reads and discards bytes from the read end of `sig_pipe`.
/// Stops at the first read error or when a read returns zero bytes.
fn drainSignalPipe() void {
    var scratch: [16]u8 = undefined;
    var more = true;
    while (more) {
        const count = posix.read(sig_pipe[0], &scratch) catch return;
        more = count != 0;
    }
}
69
/// Reports whether `arg` is byte-equal to any entry of `names`.
fn argIsAnyOf(arg: []const u8, names: []const []const u8) bool {
    for (names) |name| {
        if (std.mem.eql(u8, arg, name)) return true;
    }
    return false;
}

/// Reports whether `arg` is one of the help flags accepted by the subcommands.
fn argIsHelpFlag(arg: []const u8) bool {
    return argIsAnyOf(arg, &.{ "--help", "-h" });
}

/// Outcome of scanning the session arguments of `kill`, `wait` and `tail`.
const SessionArgScan = enum { ok, help_requested };

/// Consumes the remaining CLI arguments and appends one `SessionMatch` per
/// session argument to `matchers`.
///
/// Stops early with `.help_requested` when a help flag is seen. When `force`
/// is non-null, `--force` sets it instead of being parsed as a session name.
/// Fails with `error.SessionNameRequired` if no session argument was given.
/// Entries already appended stay in `matchers`; the caller releases them with
/// `freeSessionMatchers`.
fn collectSessionMatchers(
    alloc: std.mem.Allocator,
    args: *std.process.ArgIterator,
    matchers: *std.ArrayList(SessionMatch),
    force: ?*bool,
) !SessionArgScan {
    while (args.next()) |raw| {
        if (argIsHelpFlag(raw)) return .help_requested;
        if (force) |flag| {
            if (std.mem.eql(u8, raw, "--force")) {
                flag.* = true;
                continue;
            }
        }
        const m = try parseSessionArg(alloc, raw);
        // The list only takes ownership once append succeeds.
        errdefer alloc.free(m.name);
        try matchers.append(alloc, m);
    }
    if (matchers.items.len == 0) return error.SessionNameRequired;
    return .ok;
}

/// Frees every matcher name and then the list itself.
fn freeSessionMatchers(alloc: std.mem.Allocator, matchers: *std.ArrayList(SessionMatch)) void {
    for (matchers.items) |m| {
        alloc.free(m.name);
    }
    matchers.deinit(alloc);
}

/// CLI entry point: sets up config and logging, then dispatches on the first
/// argument (the subcommand). With no subcommand it lists sessions; an unknown
/// subcommand prints help.
pub fn main() !void {
    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
    const alloc = std.heap.c_allocator;

    // Every subcommand may write to a Unix-domain socket; a peer that
    // disappears between probe and send would otherwise kill us before
    // write() can return BrokenPipe. Inherited across fork, so this also
    // covers the daemon.
    ignoreSigpipe();

    var args = try std.process.argsWithAllocator(alloc);
    defer args.deinit();
    _ = args.skip(); // skip program name

    var cfg = try Cfg.init(alloc);
    defer cfg.deinit(alloc);

    const log_path = try std.fs.path.join(alloc, &.{ cfg.log_dir, "zmx.log" });
    defer alloc.free(log_path);
    try log_system.init(alloc, log_path, cfg.log_mode);
    defer log_system.deinit();

    const cmd = args.next() orelse {
        return list(&cfg, false);
    };

    if (argIsAnyOf(cmd, &.{ "version", "v", "-v", "--version" })) {
        return printVersion(&cfg);
    } else if (argIsAnyOf(cmd, &.{ "help", "h", "-h" })) {
        return help();
    } else if (argIsAnyOf(cmd, &.{ "list", "l", "ls" })) {
        var short = false;
        if (args.next()) |arg| {
            if (argIsHelpFlag(arg)) return help();
            short = std.mem.eql(u8, arg, "--short");
        }
        return list(&cfg, short);
    } else if (argIsAnyOf(cmd, &.{ "completions", "c" })) {
        const arg = args.next() orelse return;
        if (argIsHelpFlag(arg)) return help();
        const shell = completions.Shell.fromString(arg) orelse return;
        return printCompletions(shell);
    } else if (argIsAnyOf(cmd, &.{ "detach", "d" })) {
        return detachAll(&cfg);
    } else if (argIsAnyOf(cmd, &.{ "history", "hi" })) {
        var session_name: ?[]const u8 = null;
        var format: util.HistoryFormat = .plain;
        while (args.next()) |arg| {
            if (argIsHelpFlag(arg)) {
                return help();
            } else if (std.mem.eql(u8, arg, "--vt")) {
                format = .vt;
            } else if (std.mem.eql(u8, arg, "--html")) {
                format = .html;
            } else if (session_name == null) {
                // Only the first positional argument is used as the session name.
                session_name = arg;
            }
        }
        const sesh_env = socket.getSeshNameFromEnv();
        const sesh = try socket.getSeshName(alloc, session_name orelse sesh_env);
        defer alloc.free(sesh);
        return history(&cfg, sesh, format);
    } else if (argIsAnyOf(cmd, &.{ "attach", "a" })) {
        const session_name = args.next() orelse "";
        if (argIsHelpFlag(session_name)) return help();

        // Everything after the session name is the command to run in the session.
        var command_args: std.ArrayList([]const u8) = .empty;
        defer command_args.deinit(alloc);
        while (args.next()) |arg| {
            try command_args.append(alloc, arg);
        }

        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
        const command: ?[][]const u8 = if (command_args.items.len > 0) command_args.items else null;

        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
        const cwd = std.posix.getcwd(&cwd_buf) catch "";

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        var daemon = Daemon{
            .running = true,
            .cfg = &cfg,
            .alloc = alloc,
            .clients = clients,
            .session_name = sesh,
            .socket_path = undefined,
            .pid = undefined,
            .command = command,
            .cwd = cwd,
            .created_at = @intCast(std.time.timestamp()),
            .leader_client_fd = null,
        };
        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        std.log.info("socket path={s}", .{daemon.socket_path});
        return attach(&daemon);
    } else if (argIsAnyOf(cmd, &.{ "run", "r" })) {
        const session_name = args.next() orelse "";
        if (argIsHelpFlag(session_name)) return help();

        var cmd_args_raw: std.ArrayList([]const u8) = .empty;
        defer cmd_args_raw.deinit(alloc);
        var detached = false;
        while (args.next()) |arg| {
            // NOTE(review): this is a prefix match, so any argument starting
            // with "-d" (including one meant for the command itself) is
            // consumed as the detach flag -- confirm this is intended.
            if (std.mem.startsWith(u8, arg, "-d")) {
                detached = true;
                continue;
            }
            try cmd_args_raw.append(alloc, arg);
        }
        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);

        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
        const cwd = std.posix.getcwd(&cwd_buf) catch "";

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        var daemon = Daemon{
            .running = true,
            .cfg = &cfg,
            .alloc = alloc,
            .clients = clients,
            .session_name = sesh,
            .socket_path = undefined,
            .pid = undefined,
            .command = null,
            .cwd = cwd,
            .created_at = @intCast(std.time.timestamp()),
            .is_task_mode = true,
            .leader_client_fd = null,
        };
        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        std.log.info("socket path={s}", .{daemon.socket_path});
        return run(&daemon, detached, cmd_args_raw.items);
    } else if (argIsAnyOf(cmd, &.{ "send", "s" })) {
        const session_name = args.next() orelse "";
        if (argIsHelpFlag(session_name)) return help();
        if (session_name.len == 0) return error.SessionNameRequired;

        var text_parts: std.ArrayList([]const u8) = .empty;
        defer text_parts.deinit(alloc);
        while (args.next()) |arg| {
            try text_parts.append(alloc, arg);
        }

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        // The path is allocated with `alloc`; release it once send() is done.
        defer alloc.free(socket_path);
        return send(&cfg, sesh, socket_path, text_parts.items, .Input);
    } else if (argIsAnyOf(cmd, &.{ "print", "p" })) {
        const session_name = args.next() orelse "";
        if (argIsHelpFlag(session_name)) return help();
        if (session_name.len == 0) return error.SessionNameRequired;

        var text_parts: std.ArrayList([]const u8) = .empty;
        defer text_parts.deinit(alloc);
        while (args.next()) |arg| {
            try text_parts.append(alloc, arg);
        }

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        // The path is allocated with `alloc`; release it once send() is done.
        defer alloc.free(socket_path);
        return send(&cfg, sesh, socket_path, text_parts.items, .Output);
    } else if (argIsAnyOf(cmd, &.{ "kill", "k" })) {
        var stderr_buffer: [1024]u8 = undefined;
        var stderr_writer = std.fs.File.stderr().writer(&stderr_buffer);
        const stderr = &stderr_writer.interface;

        var matchers: std.ArrayList(SessionMatch) = .empty;
        defer freeSessionMatchers(alloc, &matchers);
        var force = false;
        const scan = try collectSessionMatchers(alloc, &args, &matchers, &force);
        if (scan == .help_requested) return help();

        var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
        defer {
            for (sessions.items) |session| {
                session.deinit(alloc);
            }
            sessions.deinit(alloc);
        }

        for (sessions.items) |session| {
            for (matchers.items) |m| {
                if (!m.matches(session.name)) continue;

                // A failed kill is reported but does not stop the remaining kills.
                kill(&cfg, session.name, force) catch |err| {
                    try stderr.print(
                        "failed to kill session={s}: {s}\n",
                        .{ session.name, @errorName(err) },
                    );
                    try stderr.flush();
                };
                // First matching matcher wins; never kill the same session twice.
                break;
            }
        }
    } else if (argIsAnyOf(cmd, &.{ "wait", "w" })) {
        var matchers: std.ArrayList(SessionMatch) = .empty;
        defer freeSessionMatchers(alloc, &matchers);
        const scan = try collectSessionMatchers(alloc, &args, &matchers, null);
        if (scan == .help_requested) return help();
        return wait(&cfg, matchers);
    } else if (argIsAnyOf(cmd, &.{ "tail", "t" })) {
        var matchers: std.ArrayList(SessionMatch) = .empty;
        defer freeSessionMatchers(alloc, &matchers);
        const scan = try collectSessionMatchers(alloc, &args, &matchers, null);
        if (scan == .help_requested) return help();

        // Resolve matchers against session list to get actual session names.
        var resolved_names: std.ArrayList([]const u8) = .empty;
        defer {
            for (resolved_names.items) |name| {
                alloc.free(name);
            }
            resolved_names.deinit(alloc);
        }

        const any_prefix = for (matchers.items) |m| {
            if (m.is_prefix) break true;
        } else false;

        // The session list is only needed to expand prefix matchers.
        if (any_prefix) {
            var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
            defer {
                for (sessions.items) |session| {
                    session.deinit(alloc);
                }
                sessions.deinit(alloc);
            }
            for (sessions.items) |session| {
                for (matchers.items) |m| {
                    if (!m.matches(session.name)) continue;
                    const copy = try alloc.dupe(u8, session.name);
                    errdefer alloc.free(copy);
                    try resolved_names.append(alloc, copy);
                    break;
                }
            }
        }
        // Add exact-match names directly.
        for (matchers.items) |m| {
            if (m.is_prefix) continue;
            const copy = try alloc.dupe(u8, m.name);
            errdefer alloc.free(copy);
            try resolved_names.append(alloc, copy);
        }

        var client_socket_fds = try std.ArrayList(i32).initCapacity(alloc, resolved_names.items.len);
        defer {
            for (client_socket_fds.items) |client_fd| {
                posix.close(client_fd);
            }
            client_socket_fds.deinit(alloc);
        }

        for (resolved_names.items) |session_name| {
            const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
                error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
                error.OutOfMemory => return err,
            };
            // Only needed for the connect call; free it every iteration.
            defer alloc.free(socket_path);
            const client_sock = try socket.sessionConnect(socket_path);
            // Capacity was reserved for one fd per resolved name, so this
            // cannot fail and the freshly opened fd cannot leak.
            client_socket_fds.appendAssumeCapacity(client_sock);
        }
        _ = try tail(client_socket_fds, false, false);
    } else if (argIsAnyOf(cmd, &.{ "write", "wr" })) {
        const session_name = args.next() orelse "";
        if (argIsHelpFlag(session_name)) return help();
        if (session_name.len == 0) return error.SessionNameRequired;
        const file_path = args.next() orelse "";
        if (argIsHelpFlag(file_path)) return help();
        if (file_path.len == 0) return error.FilePathRequired;

        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
        const cwd = std.posix.getcwd(&cwd_buf) catch "";
        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        var daemon = Daemon{
            .running = true,
            .cfg = &cfg,
            .alloc = alloc,
            .clients = clients,
            .session_name = sesh,
            .socket_path = undefined,
            .pid = undefined,
            .command = null,
            .cwd = cwd,
            .created_at = @intCast(std.time.timestamp()),
            .is_task_mode = true,
            .leader_client_fd = null,
        };
        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        std.log.info("socket path={s}", .{daemon.socket_path});
        try writeFile(&daemon, file_path);
    } else {
        return help();
    }
}
449
/// One terminal connected to a session.
///
/// A session can have several Clients attached at the same time.
const Client = struct {
    alloc: std.mem.Allocator,
    socket_fd: i32,
    has_pending_output: bool = false,
    read_buf: ipc.SocketBuffer,
    write_buf: std.ArrayList(u8),

    /// Closes the client's socket and releases both buffers.
    /// Does not free the `Client` allocation itself.
    pub fn deinit(self: *Client) void {
        const fd = self.socket_fd;
        posix.close(fd);
        self.read_buf.deinit();
        self.write_buf.deinit(self.alloc);
    }
};
466
/// Cfg is zmx's configuration container.
///
/// The purpose of this container is to hold anything that can be modified by the user.
/// `socket_dir` and `log_dir` are allocated by `init` and released by `deinit`.
const Cfg = struct {
    socket_dir: []const u8,
    log_dir: []const u8,
    max_scrollback: usize = 10_000_000,
    dir_mode: u32 = 0o750,
    log_mode: u32 = 0o640,

    /// Largest value made only of permission bits (rwx for user/group/other
    /// plus setuid/setgid/sticky).
    const max_mode: u32 = 0o7777;

    /// Builds the configuration from the environment and creates the socket
    /// and log directories if they do not exist yet.
    ///
    /// Environment variables read: `ZMX_DIR`, `XDG_RUNTIME_DIR`, `TMPDIR`,
    /// `ZMX_DIR_MODE`, `ZMX_LOG_MODE`. Nothing allocated here is leaked when
    /// an error is returned.
    pub fn init(alloc: std.mem.Allocator) !Cfg {
        const socket_dir = try socketDir(alloc);
        // Without this, socket_dir leaked whenever a later step failed.
        errdefer alloc.free(socket_dir);
        const log_dir = try std.fmt.allocPrint(alloc, "{s}/logs", .{socket_dir});
        errdefer alloc.free(log_dir);

        var cfg = Cfg{
            .socket_dir = socket_dir,
            .log_dir = log_dir,
            .dir_mode = modeFromEnv("ZMX_DIR_MODE", 0o750),
            .log_mode = modeFromEnv("ZMX_LOG_MODE", 0o640),
        };

        try cfg.mkdir();

        return cfg;
    }

    /// Reads an octal file mode from environment variable `key`.
    /// Returns `fallback` when the variable is unset, is not valid octal, or
    /// has bits outside `max_mode` (such a value may not fit `posix.mode_t`).
    fn modeFromEnv(key: []const u8, fallback: u32) u32 {
        const raw = posix.getenv(key) orelse return fallback;
        const parsed = std.fmt.parseInt(u32, raw, 8) catch return fallback;
        return if (parsed <= max_mode) parsed else fallback;
    }

    /// Picks the socket directory: `$ZMX_DIR`, else `$XDG_RUNTIME_DIR/zmx`,
    /// else `<TMPDIR or /tmp>/zmx-<uid>`. Caller owns the returned slice.
    fn socketDir(alloc: std.mem.Allocator) ![]const u8 {
        if (posix.getenv("ZMX_DIR")) |zmxdir| {
            return alloc.dupe(u8, zmxdir);
        }
        if (posix.getenv("XDG_RUNTIME_DIR")) |xdg_runtime| {
            return std.fmt.allocPrint(alloc, "{s}/zmx", .{xdg_runtime});
        }
        // Trailing slashes are trimmed so the path never contains "//".
        const tmpdir = std.mem.trimRight(u8, posix.getenv("TMPDIR") orelse "/tmp", "/");
        return std.fmt.allocPrint(alloc, "{s}/zmx-{d}", .{ tmpdir, posix.getuid() });
    }

    /// Frees the directory strings allocated by `init`.
    pub fn deinit(self: *Cfg, alloc: std.mem.Allocator) void {
        if (self.socket_dir.len > 0) alloc.free(self.socket_dir);
        if (self.log_dir.len > 0) alloc.free(self.log_dir);
    }

    /// Creates `socket_dir` and then `log_dir` with `dir_mode`.
    /// A directory that already exists is not an error. Parent directories
    /// are not created.
    pub fn mkdir(self: *Cfg) !void {
        const mode: posix.mode_t = @intCast(self.dir_mode);
        // Order matters: log_dir is derived from socket_dir in init().
        for ([_][]const u8{ self.socket_dir, self.log_dir }) |path| {
            posix.mkdirat(posix.AT.FDCWD, path, mode) catch |err| switch (err) {
                error.PathAlreadyExists => {},
                else => return err,
            };
        }
    }
};
536
test "Cfg.init uses default modes when env vars are not set" {
    // Clear both overrides so Cfg.init has nothing to read from the environment.
    inline for (.{ "ZMX_DIR_MODE", "ZMX_LOG_MODE" }) |key| {
        _ = cross.c.unsetenv(key);
    }

    const gpa = std.testing.allocator;
    var cfg = try Cfg.init(gpa);
    defer cfg.deinit(gpa);

    const expected_dir_mode: u32 = 0o750;
    const expected_log_mode: u32 = 0o640;
    try std.testing.expectEqual(expected_dir_mode, cfg.dir_mode);
    try std.testing.expectEqual(expected_log_mode, cfg.log_mode);
}
550
test "Cfg.init uses custom modes from env vars" {
    // Override both modes with octal strings and remove them again on exit
    // so the environment is left without them.
    _ = cross.c.setenv("ZMX_DIR_MODE", "770", 1);
    defer _ = cross.c.unsetenv("ZMX_DIR_MODE");
    _ = cross.c.setenv("ZMX_LOG_MODE", "660", 1);
    defer _ = cross.c.unsetenv("ZMX_LOG_MODE");

    const gpa = std.testing.allocator;
    var cfg = try Cfg.init(gpa);
    defer cfg.deinit(gpa);

    const expected_dir_mode: u32 = 0o770;
    const expected_log_mode: u32 = 0o660;
    try std.testing.expectEqual(expected_dir_mode, cfg.dir_mode);
    try std.testing.expectEqual(expected_log_mode, cfg.log_mode);
}
568
569/// Daemon is responsible for managing a zmx session.
570///
571/// It holds all the state for a running session. Instead of a single daemon for all sessions, we
572/// create a daemon for every session. This has some benefits. The ipc communication between
573/// session clients and the daemon doesn't need to be tagged with the session name. If a daemon
574/// crashes for one session, it won't crash all the other sessions.
575///
576/// Conceptually it's also much simpler to reason about.
577const Daemon = struct {
578 cfg: *Cfg,
579 alloc: std.mem.Allocator,
580 clients: std.ArrayList(*Client),
581 // This controls which client is the leader. The leader controls terminal state and
582 // cols/rows of session.
583 leader_client_fd: ?i32,
584 session_name: []const u8,
585 socket_path: []const u8,
586 running: bool,
587 pid: i32,
588 command: ?[]const []const u8 = null,
589 cwd: []const u8 = "",
590 has_pty_output: bool = false,
591 has_had_client: bool = false,
592 has_terminal_client: bool = false, // true only after a real attach (.Init received)
593 created_at: u64, // unix timestamp (seconds)
594 is_task_mode: bool = false, // flag for when session is run as a task
595 task_exit_code: ?u8 = null, // null = running or n/a, set when task completes
596 task_ended_at: ?u64 = null, // timestamp when task exited
597 pty_fd: i32 = -1, // set by daemonLoop so handleRun can probe the foreground process
598 pty_write_buf: std.ArrayList(u8) = .empty,
599
/// What `ensureSession` did, as seen by the process it returns in.
const EnsureSessionResult = struct {
    /// True when this call created the session instead of finding a live one.
    created: bool,
    /// True only in the forked daemon process, once its loop has finished.
    is_daemon: bool,
};
604
/// Releases the client list, the pending PTY input buffer and the socket
/// path string. The `Client` objects themselves are not destroyed here.
pub fn deinit(self: *Daemon) void {
    const gpa = self.alloc;
    self.clients.deinit(gpa);
    self.pty_write_buf.deinit(gpa);
    gpa.free(self.socket_path);
}
610
/// Marks the daemon as no longer running and disconnects every client.
/// The client list keeps its capacity but ends up empty.
pub fn shutdown(self: *Daemon) void {
    std.log.info("shutting down daemon session={s}", .{self.session_name});
    self.running = false;

    // Runs after the loop below, once every client has been destroyed.
    defer self.clients.clearRetainingCapacity();
    for (self.clients.items) |doomed| {
        doomed.deinit();
        self.alloc.destroy(doomed);
    }
}
621
/// Disconnects `client`, which must be the entry at index `i` of `clients`.
///
/// Clears the leader reference if this client was the leader. Returns true
/// when `shutdown_on_last` is set and this was the last client, in which case
/// the daemon has been shut down; returns false otherwise.
pub fn closeClient(self: *Daemon, client: *Client, i: usize, shutdown_on_last: bool) bool {
    const closed_fd = client.socket_fd;
    // leader is disconnected, remove ref and let another client claim leader on input
    if (self.leader_client_fd) |leader_fd| {
        if (leader_fd == closed_fd) {
            std.log.info(
                "unsetting leader session={s} fd={d}",
                .{ self.session_name, closed_fd },
            );
            self.leader_client_fd = null;
        }
    }

    client.deinit();
    self.alloc.destroy(client);
    _ = self.clients.orderedRemove(i);

    const remaining = self.clients.items.len;
    std.log.info("client disconnected fd={d} remaining={d}", .{ closed_fd, remaining });

    if (!shutdown_on_last or remaining != 0) return false;
    self.shutdown();
    return true;
}
642
/// Makes `client` the session leader and queues an empty `.Resize` message
/// for it. Fails if the message cannot be appended to the client's write buffer.
fn setLeader(self: *Daemon, client: *Client) !void {
    const fd = client.socket_fd;
    std.log.info("setting new leader client_fd={d}", .{fd});
    self.leader_client_fd = fd;
    // The Resize message asks the client to report its window size back so
    // the pty and the ghostty state can be resized to match.
    try ipc.appendMessage(self.alloc, &client.write_buf, .Resize, "");
    client.has_pending_output = true;
}
651
/// Runs in the forked child. It either replaces the process image via exec
/// or exits/returns an error; it never returns normally. The caller must exit
/// on error, because returning would fall through to parent code.
fn execChild(self: *Daemon) !noreturn {
    const alloc = std.heap.c_allocator;

    // main() set SIGPIPE to SIG_IGN, which (unlike handlers) survives
    // exec. Restore the default so the shell and its children behave
    // normally (e.g. `yes | head` should exit 141 via SIGPIPE).
    const default_action: posix.Sigaction = .{
        .handler = .{ .handler = posix.SIG.DFL },
        .mask = posix.sigemptyset(),
        .flags = 0,
    };
    posix.sigaction(posix.SIG.PIPE, &default_action, null);

    // Export the session name to whatever gets exec'd below.
    const session_env = try std.fmt.allocPrintSentinel(
        alloc,
        "ZMX_SESSION={s}",
        .{self.session_name},
        0,
    );
    _ = cross.c.putenv(session_env.ptr);

    const cmd_args = self.command orelse {
        // No explicit command: start a shell. Task mode always uses bash.
        const shell: [:0]const u8 = if (self.is_task_mode) "bash" else util.detectShell();
        // Use "-shellname" as argv[0] to signal login shell (traditional method)
        const login_shell = try std.fmt.allocPrintSentinel(
            alloc,
            "-{s}",
            .{std.fs.path.basename(shell)},
            0,
        );
        const shell_argv = [_:null]?[*:0]const u8{ login_shell, null };
        const exec_err = std.posix.execvpeZ(shell, &shell_argv, std.c.environ);
        std.log.err("execvpe failed: shell={s} err={s}", .{ shell, @errorName(exec_err) });
        std.posix.exit(1);
    };

    // Explicit command: build a null-terminated argv of C strings.
    const argv = try alloc.allocSentinel(?[*:0]const u8, cmd_args.len, null);
    for (argv, cmd_args) |*slot, arg| {
        slot.* = try alloc.dupeZ(u8, arg);
    }
    const err = std.posix.execvpeZ(argv[0].?, argv.ptr, std.c.environ);
    std.log.err("execvpe failed: cmd={s} err={s}", .{ cmd_args[0], @errorName(err) });
    std.posix.exit(1);
}
698
/// spawnPty calls forkpty() and, in the child, executes the shell or the
/// command configured on the daemon. In the parent it records the child pid,
/// switches the PTY master to non-blocking mode and returns its fd.
fn spawnPty(self: *Daemon) !c_int {
    // The initial window size is taken from whatever stdout currently is.
    const term_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
    var ws: cross.c.struct_winsize = .{
        .ws_row = term_size.rows,
        .ws_col = term_size.cols,
        .ws_xpixel = 0,
        .ws_ypixel = 0,
    };

    var master_fd: c_int = undefined;
    const pid = cross.forkpty(&master_fd, null, null, &ws);
    if (pid < 0) return error.ForkPtyFailed;

    if (pid == 0) { // child pid code path
        // In the forked child, ANY error must exit rather than propagate:
        // a returned error falls through to the parent code path below,
        // running a second daemon on the same socket (or worse, hitting
        // errdefers that delete the parent's socket file).
        self.execChild() catch |err| {
            std.log.err("child setup failed: {s}", .{@errorName(err)});
            std.posix.exit(1);
        };
        unreachable; // execChild either execs or exits, never returns ok
    }

    // master pid code path
    self.pid = pid;
    std.log.info("pty spawned session={s} pid={d}", .{ self.session_name, pid });

    // make pty non-blocking
    const current_flags = try posix.fcntl(master_fd, posix.F.GETFL, 0);
    _ = try posix.fcntl(master_fd, posix.F.SETFL, current_flags | O_NONBLOCK);
    return master_fd;
}
735
/// ensureSession "upserts" a session: it checks whether the session's unix
/// socket already exists and, if not (or if the socket is stale), creates the
/// socket and forks the daemon.
///
/// Returns in the calling process with `is_daemon = false`. In the forked
/// daemon it only returns (with `is_daemon = true`) after the daemon loop has
/// ended and cleanup has run.
fn ensureSession(self: *Daemon) !EnsureSessionResult {
    var dir = try std.fs.openDirAbsolute(self.cfg.socket_dir, .{});
    defer dir.close();

    const socket_present = try socket.sessionExists(dir, self.session_name);
    var must_spawn = !socket_present;

    if (socket_present) {
        if (ipc.connectSession(self.socket_path)) |probe_fd| {
            // A daemon answered; the probe connection itself is not needed.
            posix.close(probe_fd);
            if (self.command != null) {
                std.log.warn(
                    "session already exists, ignoring command session={s}",
                    .{self.session_name},
                );
            }
        } else |err| switch (err) {
            // Daemon is definitively gone: safe to replace.
            error.ConnectionRefused => {
                socket.cleanupStaleSocket(dir, self.session_name);
                must_spawn = true;
            },
            // Connect failed for an unusual reason. The check is only to
            // decide create-vs-attach; the socket file exists, so proceed
            // to attach rather than fail or orphan.
            else => {
                std.log.warn(
                    "connect failed ({s}), proceeding to attach session={s}",
                    .{ @errorName(err), self.session_name },
                );
            },
        }
    }

    if (!must_spawn) return .{ .created = false, .is_daemon = false };

    std.log.info("creating session={s}", .{self.session_name});
    const server_sock_fd = try socket.createSocket(self.socket_path);

    // creates the daemon
    const child_pid = try posix.fork();
    if (child_pid != 0) {
        // Parent: the listening socket now belongs to the daemon.
        posix.close(server_sock_fd);
        std.Thread.sleep(10 * std.time.ns_per_ms);
        return .{ .created = true, .is_daemon = false };
    }

    // ---- child (daemon) from here on ----

    // becomes the session leader and detaches process from its controlling terminal
    _ = try posix.setsid();

    log_system.deinit();

    // Redirect stdin/stdout/stderr to /dev/null. The daemon
    // communicates via its unix socket, not stdio. Without
    // this, any pipe on FDs 0-2 (e.g. from bats' `run`
    // keyword) stays open for the daemon's lifetime, causing
    // the caller to hang waiting for EOF.
    {
        const null_fd = std.posix.open(
            "/dev/null",
            .{ .ACCMODE = .RDWR },
            0,
        ) catch |err| {
            std.log.warn("failed to open /dev/null: {s}", .{@errorName(err)});
            return err;
        };
        inline for (.{ posix.STDIN_FILENO, posix.STDOUT_FILENO, posix.STDERR_FILENO }) |std_fd| {
            _ = posix.dup2(null_fd, std_fd) catch |err| {
                std.log.warn("dup2 /dev/null -> {d}: {s}", .{ std_fd, @errorName(err) });
                return err;
            };
        }
        if (null_fd > 2) posix.close(null_fd);
    }

    // Close file descriptors inherited from the parent that the
    // daemon doesn't need. This prevents test harnesses (like
    // bats) from hanging -- they wait for their internal FDs (3+)
    // to close before exiting.
    //
    // Must run BEFORE log_system.init() otherwise the new log
    // FD gets closed, and spawnPty() reuses that FD number for
    // the PTY master, causing log writes to leak into the terminal.
    //
    // Skip server_sock_fd (needed for IPC) and dir.fd (needed to
    // delete the socket file on shutdown).
    {
        const dir_fd: i32 = @intCast(dir.fd);
        for (3..64) |n| {
            const fd: i32 = @intCast(n);
            if (fd == server_sock_fd or fd == dir_fd) continue;
            _ = std.c.close(fd);
        }
    }

    // Reopen logging on a per-session log file.
    const session_log_name = try std.fmt.allocPrint(
        self.alloc,
        "{s}.log",
        .{self.session_name},
    );
    defer self.alloc.free(session_log_name);
    const session_log_path = try std.fs.path.join(
        self.alloc,
        &.{ self.cfg.log_dir, session_log_name },
    );
    defer self.alloc.free(session_log_path);
    try log_system.init(self.alloc, session_log_path, self.cfg.log_mode);

    // If spawnPty fails, clean up here. Once it succeeds,
    // the defer below takes ownership of cleanup to
    // avoid double-closing server_sock_fd on daemonLoop error.
    const pty_fd = self.spawnPty() catch |err| {
        posix.close(server_sock_fd);
        dir.deleteFile(self.session_name) catch {};
        return err;
    };

    defer {
        self.handleKill();
        self.deinit();
        posix.close(pty_fd);
        _ = posix.waitpid(self.pid, 0);
        posix.close(server_sock_fd);
        std.log.info("deleting socket file session={s}", .{self.session_name});
        dir.deleteFile(self.session_name) catch |err| {
            std.log.warn("failed to delete socket file err={s}", .{@errorName(err)});
        };
    }

    try daemonLoop(self, server_sock_fd, pty_fd);
    return .{ .created = true, .is_daemon = true };
}
871
/// Upper bound, in bytes, on input queued for the PTY (256 KiB).
const PTY_WRITE_BUF_MAX = 256 * 1024;

/// Queues bytes destined for the PTY's stdin; daemonLoop flushes them on
/// POLLOUT. If accepting `data` would push the queue past
/// `PTY_WRITE_BUF_MAX`, the whole payload is dropped -- the same failure mode
/// as the old direct-write ptyWrite (drop on EAGAIN), only at a 64x higher
/// threshold. The cap avoids OOM when the shell stops reading; dropping the
/// new bytes (not the old ones) avoids tearing a partially-accepted sequence.
/// Allocation failure also drops the payload and is only logged.
fn queuePtyInput(self: *Daemon, data: []const u8) void {
    if (data.len == 0) return;

    const already_queued = self.pty_write_buf.items.len;
    const over_cap = already_queued + data.len > PTY_WRITE_BUF_MAX;
    if (over_cap) {
        std.log.warn(
            "pty input dropped {d} bytes (buffer full, shell not reading)",
            .{data.len},
        );
        return;
    }

    std.log.debug("buffering pty input data={x}", .{data});
    self.pty_write_buf.appendSlice(self.alloc, data) catch |err| {
        std.log.warn(
            "pty input dropped {d} bytes: {s}",
            .{ data.len, @errorName(err) },
        );
    };
}
896
/// Handles an input payload received from `client`.
///
/// Input from the leader is always queued for the PTY (ansi escape codes and
/// text alike). Input from any other client is queued only when
/// `util.isUserInput` accepts it, and that client becomes the new leader
/// first; otherwise the payload is ignored.
pub fn handleInput(self: *Daemon, client: *Client, payload: []const u8) !void {
    std.log.debug("buffering pty input data={x}", .{payload});

    const from_leader = self.leader_client_fd == client.socket_fd;
    if (!from_leader) {
        // check if leader needs to be updated by detecting any user input
        if (!util.isUserInput(payload)) return;
        try self.setLeader(client);
    }
    self.queuePtyInput(payload);
}
911
/// Queues a `.Switch` message carrying `session_name` for the leader client.
///
/// Returns `error.NoLeaderFound` when no leader is set or the leader fd does
/// not belong to any connected client. Failure to buffer the message is
/// best-effort: it is logged and not returned to the caller.
pub fn handleSwitch(self: *Daemon, session_name: []const u8) !void {
    const leader_fd = self.leader_client_fd orelse return error.NoLeaderFound;

    for (self.clients.items) |client| {
        if (client.socket_fd != leader_fd) continue;

        ipc.appendMessage(
            self.alloc,
            &client.write_buf,
            .Switch,
            session_name,
        ) catch |err| {
            // The old text ("terminal state") was copied from the re-attach
            // path and mislabelled this failure in the logs.
            std.log.warn(
                "failed to buffer switch message for client err={s}",
                .{@errorName(err)},
            );
        };
        client.has_pending_output = true;
        return;
    }
    return error.NoLeaderFound;
}
932
/// Handles a client's Init message, whose payload is a single `ipc.Resize`.
///
/// On re-attach the current terminal state is queued for the client, then a
/// leader is chosen if none exists, and finally (leader only) the PTY and
/// the daemon's terminal are resized. Payloads of any other size are ignored.
pub fn handleInit(
    self: *Daemon,
    client: *Client,
    pty_fd: i32,
    term: *ghostty_vt.Terminal,
    payload: []const u8,
) !void {
    if (payload.len != @sizeOf(ipc.Resize)) return;

    // Snapshot BEFORE resizing so the cursor position is the pre-reflow one:
    // resizing can move the cursor, and the shell's SIGWINCH redraw runs
    // after the snapshot has been sent. First attach is skipped so shell
    // initialization (DA1 queries, etc.) is not disturbed.
    const is_reattach = self.has_pty_output and self.has_had_client;
    if (is_reattach) snapshot: {
        const cur = &term.screens.active.cursor;
        std.log.debug(
            "cursor before serialize: x={d} y={d} pending_wrap={}",
            .{ cur.x, cur.y, cur.pending_wrap },
        );

        const term_output = util.serializeTerminalState(self.alloc, term) orelse break :snapshot;
        defer self.alloc.free(term_output);
        std.log.debug("serialize terminal state", .{});

        // OSC 133;A is rewritten to carry redraw=0 so the outer terminal
        // does not clear prompt lines on resize (issue #111). When the
        // rewrite yields nothing the raw snapshot is sent instead.
        const restore_data = util.rewritePromptRedraw(self.alloc, term_output) orelse term_output;
        defer if (restore_data.ptr != term_output.ptr) self.alloc.free(restore_data);

        ipc.appendMessage(self.alloc, &client.write_buf, .Output, restore_data) catch |err| {
            std.log.warn(
                "failed to buffer terminal state for client err={s}",
                .{@errorName(err)},
            );
        };
        client.has_pending_output = true;
    }

    // With no leader yet, the initializing client becomes the leader.
    if (self.leader_client_fd == null) try self.setLeader(client);

    // Everything below is leader-only.
    if (self.leader_client_fd != client.socket_fd) return;

    const size = std.mem.bytesToValue(ipc.Resize, payload);
    var winsize: cross.c.struct_winsize = .{
        .ws_row = size.rows,
        .ws_col = size.cols,
        .ws_xpixel = 0,
        .ws_ypixel = 0,
    };
    _ = cross.c.ioctl(pty_fd, cross.c.TIOCSWINSZ, &winsize);

    // Turn prompt redraw off for the duration of the resize. The daemon's
    // terminal would otherwise clear prompt lines expecting a shell redraw,
    // but that redraw goes to the PTY (forwarded to clients), not to this
    // terminal, which would corrupt the daemon's snapshot state.
    const prior_redraw = term.flags.shell_redraws_prompt;
    term.flags.shell_redraws_prompt = .false;
    defer term.flags.shell_redraws_prompt = prior_redraw;
    try term.resize(self.alloc, size.cols, size.rows);

    // Remember that a client has initialized so later clients get a snapshot.
    self.has_had_client = true;
    self.has_terminal_client = true;

    std.log.debug("init resize rows={d} cols={d}", .{ size.rows, size.cols });
}
1001
/// Applies a Resize message (payload is a single `ipc.Resize`) from the
/// leader to both the PTY and the daemon's terminal.
///
/// A client becomes leader if there is none. Requests from non-leaders and
/// payloads of the wrong size are ignored.
pub fn handleResize(
    self: *Daemon,
    client: *Client,
    pty_fd: i32,
    term: *ghostty_vt.Terminal,
    payload: []const u8,
) !void {
    if (payload.len != @sizeOf(ipc.Resize)) return;

    if (self.leader_client_fd == null) try self.setLeader(client);

    const sender_is_leader = self.leader_client_fd == client.socket_fd;
    if (sender_is_leader) {
        const size = std.mem.bytesToValue(ipc.Resize, payload);
        var winsize: cross.c.struct_winsize = .{
            .ws_row = size.rows,
            .ws_col = size.cols,
            .ws_xpixel = 0,
            .ws_ypixel = 0,
        };
        _ = cross.c.ioctl(pty_fd, cross.c.TIOCSWINSZ, &winsize);

        // Prompt redraw is switched off while the daemon's terminal is
        // resized and restored afterwards, even if the resize fails.
        const prior_redraw = term.flags.shell_redraws_prompt;
        term.flags.shell_redraws_prompt = .false;
        defer term.flags.shell_redraws_prompt = prior_redraw;

        try term.resize(self.alloc, size.cols, size.rows);
        std.log.debug("resize rows={d} cols={d}", .{ size.rows, size.cols });
    }
}
1031
/// Logs the detach and closes `client`, which sits at index `i`, via
/// `closeClient`. The result of `closeClient` is discarded.
pub fn handleDetach(self: *Daemon, client: *Client, i: usize) void {
    const fd = client.socket_fd;
    std.log.info("client detach session={s} fd={d}", .{ self.session_name, fd });
    _ = self.closeClient(client, i, false);
}
1036
/// Tears down every attached client: each is deinitialized and destroyed,
/// then the client list is emptied while keeping its capacity.
pub fn handleDetachAll(self: *Daemon) void {
    const attached = self.clients.items;
    std.log.info("detach all clients={d}", .{attached.len});

    var idx: usize = 0;
    while (idx < attached.len) : (idx += 1) {
        const doomed = attached[idx];
        doomed.deinit();
        self.alloc.destroy(doomed);
    }

    self.clients.clearRetainingCapacity();
}
1045
/// Shuts the daemon down and terminates the PTY child.
///
/// SIGHUP is sent first, then SIGKILL after a 500 ms pause. Signal delivery
/// failures are logged and otherwise ignored.
pub fn handleKill(self: *Daemon) void {
    std.log.info("kill received session={s}", .{self.session_name});
    self.shutdown();

    // A negative pid targets the process and its children.
    const target = -self.pid;

    // Shells tend to ignore SIGTERM, so SIGHUP is the graceful request:
    // https://www.gnu.org/software/bash/manual/html_node/Signals.html
    std.log.info("sending SIGHUP session={s} pid={d}", .{ self.session_name, self.pid });
    if (posix.kill(target, posix.SIG.HUP)) |_| {} else |err| {
        std.log.warn("failed to send SIGHUP to pty child err={s}", .{@errorName(err)});
    }

    // Grace period before the forced kill.
    std.Thread.sleep(500 * std.time.ns_per_ms);

    if (posix.kill(target, posix.SIG.KILL)) |_| {} else |err| {
        std.log.warn("failed to send SIGKILL to pty child err={s}", .{@errorName(err)});
    }
}
1062
/// Queues an `.Info` message describing this session for `client`.
///
/// The message carries the client count, child pid, creation time, task
/// end time / exit code (0 when unset), the command line, and the cwd.
/// Command and cwd are truncated to `ipc.MAX_CMD_LEN` / `ipc.MAX_CWD_LEN`.
pub fn handleInfo(self: *Daemon, client: *Client) !void {
    // Start from an all-zero struct so asBytes() does not ship struct
    // padding or unused cmd/cwd tail bytes (daemon stack contents).
    var info = std.mem.zeroes(ipc.Info);
    info.pid = self.pid;
    info.created_at = self.created_at;
    info.task_ended_at = self.task_ended_at orelse 0;
    info.task_exit_code = self.task_exit_code orelse 0;
    // NOTE(review): the -1 presumably excludes the requesting client; confirm.
    info.clients_len = self.clients.items.len - 1;

    // Join the args with single spaces, quoting any arg that contains
    // shell-special characters so the result can be copy-pasted.
    if (self.command) |argv| {
        for (argv, 0..) |arg, idx| {
            // A failed quote attempt falls back to the raw arg.
            const quoted = if (!util.shellNeedsQuoting(arg))
                null
            else
                util.shellQuote(self.alloc, arg) catch null;
            defer if (quoted) |q| self.alloc.free(q);
            const text = quoted orelse arg;

            const sep_len: usize = if (idx > 0) 1 else 0;
            const need = text.len + sep_len;
            if (info.cmd_len + need > ipc.MAX_CMD_LEN) {
                // Out of room: mark the truncation if the marker itself fits.
                const ellipsis = "...";
                if (info.cmd_len + ellipsis.len <= ipc.MAX_CMD_LEN) {
                    @memcpy(info.cmd[info.cmd_len..][0..ellipsis.len], ellipsis);
                    info.cmd_len += ellipsis.len;
                }
                break;
            }

            if (idx > 0) {
                info.cmd[info.cmd_len] = ' ';
                info.cmd_len += 1;
            }
            @memcpy(info.cmd[info.cmd_len..][0..text.len], text);
            info.cmd_len += @intCast(text.len);
        }
    }

    info.cwd_len = @intCast(@min(self.cwd.len, ipc.MAX_CWD_LEN));
    @memcpy(info.cwd[0..info.cwd_len], self.cwd[0..info.cwd_len]);

    try ipc.appendMessage(self.alloc, &client.write_buf, .Info, std.mem.asBytes(&info));
    client.has_pending_output = true;
}
1110
/// Queues a `.History` message containing the serialized terminal.
///
/// The first payload byte selects the `util.HistoryFormat`; a missing or
/// unrecognized byte means `.plain`. When serialization yields nothing an
/// empty History message is sent so the client still gets a reply.
pub fn handleHistory(
    self: *Daemon,
    client: *Client,
    term: *ghostty_vt.Terminal,
    payload: []const u8,
) !void {
    var format: util.HistoryFormat = .plain;
    if (payload.len > 0) {
        format = std.meta.intToEnum(util.HistoryFormat, payload[0]) catch .plain;
    }

    const rendered = util.serializeTerminal(self.alloc, term, format);
    defer if (rendered) |bytes| self.alloc.free(bytes);

    const body: []const u8 = if (rendered) |bytes| bytes else "";
    try ipc.appendMessage(self.alloc, &client.write_buf, .History, body);
    client.has_pending_output = true;
}
1130
/// Queues a command for the shell followed by an exit-code marker, then
/// acknowledges the client.
///
/// Task tracking is reset first (even for an empty payload, which is then
/// ignored without an Ack). One trailing `\r` on the command is dropped
/// because the marker supplies the final carriage return.
pub fn handleRun(self: *Daemon, client: *Client, payload: []const u8) !void {
    // Clear the previous task's result so this command's exit marker is
    // detected; otherwise a second `zmx run` on the same session is ignored
    // because task_exit_code is still set from the first run.
    self.task_ended_at = null;
    self.task_exit_code = null;
    self.is_task_mode = true;

    if (payload.len == 0) return;

    const ends_with_cr = payload[payload.len - 1] == '\r';
    const command = if (ends_with_cr) payload[0 .. payload.len - 1] else payload;
    self.queuePtyInput(command);

    // Normally the marker is chained with `;` on the same line; `$?` then
    // holds the command's exit code. A heredoc (`<<`) needs its delimiter
    // alone on a line, so in that case the marker goes on the next line.
    const has_heredoc = std.mem.indexOf(u8, payload, "<<") != null;
    const marker: []const u8 = if (has_heredoc)
        "\r\necho ZMX_TASK_COMPLETED:$?\r"
    else
        "; echo ZMX_TASK_COMPLETED:$?\r";
    self.queuePtyInput(marker);

    try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
    client.has_pending_output = true;
    self.has_had_client = true;
    std.log.debug("run command len={d}", .{payload.len});
}
1163
/// Feeds `payload` into `vt_stream`, records that PTY output has been seen,
/// and queues the same bytes as an `.Output` message for every client.
///
/// When at least one client is attached, SIGWINCH is then sent to
/// `self.pid`; a failure to signal is only logged.
/// NOTE(review): the SIGWINCH is presumably meant to trigger a redraw in
/// the child; confirm against the caller.
pub fn handleOutput(self: *Daemon, payload: []const u8, vt_stream: anytype) !void {
    vt_stream.nextSlice(payload);
    self.has_pty_output = true;

    const recipients = self.clients.items;
    for (recipients) |recipient| {
        try ipc.appendMessage(self.alloc, &recipient.write_buf, .Output, payload);
        recipient.has_pending_output = true;
    }

    if (recipients.len == 0) return;

    posix.kill(self.pid, posix.SIG.WINCH) catch |err| {
        std.log.warn("failed to send SIGWINCH err={s}", .{@errorName(err)});
    };
}
1177
/// Writes a file inside the session by typing shell commands into the PTY.
///
/// Wire format of `payload`: [u32 path len][path bytes][file content].
/// The content is base64-encoded in chunks and each chunk is injected as
/// `printf '%s' '<b64>' | base64 -d > '<path>'` (`>>` for later chunks), so
/// it also works over SSH. An `.Ack` is queued for `client` afterwards.
///
/// Returns `error.InvalidPayload` when the payload is shorter than its
/// header or than the declared path length.
///
/// NOTE: `file_path` is interpolated between single quotes without
/// escaping, so it must not contain a single quote.
pub fn handleWrite(self: *Daemon, client: *Client, payload: []const u8) !void {
    const header_len = @sizeOf(u32);
    if (payload.len < header_len) return error.InvalidPayload;

    // Widen to usize and compare against the bytes that remain after the
    // header. Adding the header size to the untrusted u32 instead would
    // overflow for values near maxInt(u32).
    const path_len: usize = std.mem.bytesToValue(u32, payload[0..header_len]);
    if (path_len > payload.len - header_len) return error.InvalidPayload;

    const file_path = payload[header_len..][0..path_len];
    const file_content = payload[header_len + path_len ..];

    // Chunk large files to stay under command-line length limits.
    // 48000 is divisible by 3 (clean base64 boundaries) and encodes to
    // ~64KB, well under typical ARG_MAX.
    const chunk_size = 48000;
    var offset: usize = 0;
    // Forces one pass for empty content so the file is still created.
    var is_first = true;

    while (offset < file_content.len or is_first) {
        const end = @min(offset + chunk_size, file_content.len);
        const chunk = file_content[offset..end];

        const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
        const encoded = try self.alloc.alloc(u8, encoded_len);
        defer self.alloc.free(encoded);
        _ = std.base64.standard.Encoder.encode(encoded, chunk);

        self.queuePtyInput("printf '%s' '");
        self.queuePtyInput(encoded);
        // First chunk truncates/creates the file, later chunks append.
        if (is_first) {
            self.queuePtyInput("' | base64 -d > '");
        } else {
            self.queuePtyInput("' | base64 -d >> '");
        }
        self.queuePtyInput(file_path);
        self.queuePtyInput("'");
        self.queuePtyInput("\r");

        offset = end;
        is_first = false;
    }

    try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
    client.has_pending_output = true;
    self.has_had_client = true;
    std.log.debug(
        "write command len={d} file_path={s}",
        .{ file_content.len, file_path },
    );
}
1227};
1228
/// Prints the zmx version, the ghostty_vt version, and the configured
/// socket and log directories to stdout, one tab-separated pair per line.
fn printVersion(cfg: *Cfg) !void {
    var out_buf: [256]u8 = undefined;
    var out_file = std.fs.File.stdout().writer(&out_buf);
    const out = &out_file.interface;

    try out.print(
        "zmx\t\t{s}\nghostty_vt\t{s}\nsocket_dir\t{s}\nlog_dir\t\t{s}\n",
        .{ version, ghostty_version, cfg.socket_dir, cfg.log_dir },
    );
    try out.flush();
}
1238
/// Prints the completion script for `shell` to stdout, followed by a
/// newline.
fn printCompletions(shell: completions.Shell) !void {
    var out_buf: [8192]u8 = undefined;
    var out_file = std.fs.File.stdout().writer(&out_buf);
    const out = &out_file.interface;

    try out.print("{s}\n", .{shell.getCompletionScript()});
    try out.flush();
}
1246
/// Prints the full usage text to stdout.
fn help() !void {
    var out_buf: [8192]u8 = undefined;
    var out_file = std.fs.File.stdout().writer(&out_buf);
    const out = &out_file.interface;

    // The text contains no format placeholders, so it is written verbatim.
    try out.writeAll(
        \\zmx - session persistence for terminal processes
        \\
        \\Usage: zmx <command> [args...]
        \\
        \\Commands:
        \\ [a]ttach <name> [command...] Attach to session, creating if needed
        \\ [r]un <name> [-d] [command...] Send command without attaching
        \\ [s]end <name> <text...> Send raw input to session PTY
        \\ [p]rint <name> <text...> Inject text into session display
        \\ [wr]ite <name> <file_path> Write stdin to file_path through the session
        \\ [d]etach Detach all clients (ctrl+\\ for current client)
        \\ [l]ist|ls [--short] List active sessions
        \\ [k]ill <name>... [--force] Kill session and all attached clients
        \\ [hi]story <name> [--vt|--html] Output session scrollback
        \\ [w]ait <name>... Wait for session tasks to complete
        \\ [t]ail <name>... Follow session output
        \\ [c]ompletions <shell> Shell completions (bash, zsh, fish, nu)
        \\ [v]ersion Show version and metadata (socket dir, log dir)
        \\ [h]elp Show this help
        \\
        \\Attach:
        \\ This will spawn a login $SHELL with a PTY. You can provide a
        \\ command instead of creating a shell.
        \\
        \\ Examples:
        \\ zmx attach dev
        \\ zmx attach dev vim
        \\
        \\History:
        \\ This should generally be used with `tail` to print the last lines
        \\ of the session's scrollback history.
        \\
        \\ Examples:
        \\ zmx history <session> | tail -100
        \\
        \\Run:
        \\ Commands run inside a PTY using bash
        \\ Commands are passed as-is: do not wrap in quotes.
        \\ Commands run sequentially: do not send multiple in parallel.
        \\ Stdin is redirected from /dev/null to prevent interactive programs
        \\ (pagers, editors, prompts) from blocking. Use `zmx send` for
        \\ commands that need user input, or pipe data directly:
        \\ echo "data" | zmx run dev cat
        \\
        \\ `-d` will detach from the calling terminal. Use `wait` to track
        \\ its status.
        \\
        \\ Examples:
        \\ zmx run dev ls
        \\ zmx run dev zig build
        \\ zmx run dev grep -r TODO src
        \\ zmx run dev git log --oneline # pager won't block
        \\ echo "hello" | zmx run dev cat # piped stdin still works
        \\
        \\ # heredoc
        \\ printf "cat << 'EOF'\r\nHello $USER\r\nToday is $(date).\r\nEOF" | zmx run dev
        \\
        \\ # non-blocking
        \\ zmx run dev -d sleep 10
        \\ zmx wait dev
        \\
        \\Send:
        \\ Sends raw text to the session's PTY input (fire-and-forget).
        \\ Unlike `run`, no completion marker is appended and no exit code
        \\ is tracked. Useful for TUI applications, interactive prompts,
        \\ or any program that reads stdin directly.
        \\
        \\ Text is sent byte-for-byte with no automatic carriage return.
        \\ Append \r yourself when you want the shell to execute a command.
        \\
        \\ Text can also be piped via stdin:
        \\ printf 'ls -la\r' | zmx send dev
        \\
        \\ Examples:
        \\ printf 'echo hello\r' | zmx send dev
        \\ zmx send dev $(printf '\x03')
        \\ zmx send dev /compact
        \\
        \\Print:
        \\ Injects text directly into the session display and scrollback.
        \\ Never touches the PTY input -- the shell sees nothing.
        \\ Caller is responsible for newlines (\\r\\n).
        \\
        \\ Examples:
        \\ printf '\\r\\nhello\\r\\n' | zmx print dev
        \\ zmx print dev "$(printf '\\r\\nalert\\r\\n')"
        \\
        \\Write:
        \\ Writes stdin to file_path inside the session. Works over SSH.
        \\ file_path can be absolute or relative to the session shell's cwd.
        \\ Requires base64 and printf in the remote environment.
        \\ Large files are chunked automatically (~48KB per chunk).
        \\ File path must not contain single quotes.
        \\
        \\ Examples:
        \\ echo "hello" | zmx write dev /tmp/hello.txt
        \\ cat main.zig | zmx write dev src/main.zig
        \\
        \\Wait:
        \\ Used with a detached run task to track its status. Multiple
        \\ sessions can be provided.
        \\
        \\ Examples:
        \\ zmx run -d dev sleep 10
        \\ zmx wait dev
        \\ zmx wait dev other
        \\
        \\Environment variables:
        \\ SHELL Default shell for new sessions
        \\ ZMX_DIR Socket directory (priority 1)
        \\ XDG_RUNTIME_DIR Socket directory (priority 2)
        \\ TMPDIR Socket directory (priority 3)
        \\ ZMX_SESSION Session name (injected automatically)
        \\ ZMX_SESSION_PREFIX Prefix added to all session names
        \\ ZMX_DIR_MODE Sets mode for socket and log directories (octal, defaults to 0750)
        \\ ZMX_LOG_MODE Sets mode for log files (octal, defaults to 0640)
        \\
    );
    try out.flush();
}
1372
/// Follows the output of one or more session sockets and copies it, with
/// ANSI escape sequences stripped, to stdout.
///
/// - `client_socket_fds`: connected daemon sockets to read from.
/// - `detached`: return 0 as soon as the daemon acknowledges the command.
/// - `is_run_cmd`: together with `!detached`, drop the first output line
///   (the command echo).
///
/// Returns the task's exit code once a TaskComplete message has been seen
/// and all buffered output has been written, 0 when a socket is closed or
/// hangs up, and 1 when the connection is reset.
fn tail(client_socket_fds: std.ArrayList(i32), detached: bool, is_run_cmd: bool) !u8 {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
    defer poll_fds.deinit(alloc);

    var read_buf = try ipc.SocketBuffer.init(alloc);
    defer read_buf.deinit();

    // Plain-text output waiting to be written to stdout.
    var stdout_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer stdout_buf.deinit(alloc);

    var is_first_line = true;
    var task_complete_code: ?u8 = null;

    while (true) {
        poll_fds.clearRetainingCapacity();

        // Poll every session socket for readability.
        for (client_socket_fds.items) |client_sock_fd| {
            try poll_fds.append(alloc, .{
                .fd = client_sock_fd,
                .events = posix.POLL.IN,
                .revents = 0,
            });
        }

        // Poll stdout for writability only while output is pending.
        if (stdout_buf.items.len > 0) {
            try poll_fds.append(alloc, .{
                .fd = posix.STDOUT_FILENO,
                .events = posix.POLL.OUT,
                .revents = 0,
            });
        }

        _ = posix.poll(poll_fds.items, -1) catch |err| {
            if (err == error.Interrupted) continue; // EINTR from signal, loop again
            return err;
        };

        // Drain readable sockets (incoming messages from the daemon).
        for (poll_fds.items) |*poll_fd| {
            if (poll_fd.revents & posix.POLL.IN == 0) continue;

            const n = read_buf.read(poll_fd.fd) catch |err| {
                if (err == error.WouldBlock) continue;
                if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                    return 1;
                }
                std.log.err("daemon read err={s}", .{@errorName(err)});
                return err;
            };
            // Server closed connection.
            if (n == 0) return 0;

            while (read_buf.next()) |msg| {
                switch (msg.header.tag) {
                    .Ack => {
                        if (detached) {
                            _ = posix.write(posix.STDOUT_FILENO, "command sent!\n") catch |err| blk: {
                                if (err == error.WouldBlock) break :blk 0;
                                return err;
                            };
                            return 0;
                        }
                    },
                    .Output => {
                        var payload = msg.payload;
                        if (payload.len == 0) continue;

                        // Strip the first line (command echo) for run mode.
                        // Without a newline the whole payload is the echo.
                        if (!detached and is_run_cmd and is_first_line) {
                            is_first_line = false;
                            payload = if (std.mem.indexOfScalar(u8, payload, '\n')) |nl|
                                payload[nl + 1 ..]
                            else
                                payload[payload.len..];
                        }
                        if (payload.len == 0) continue;

                        // Strip ANSI escape sequences so prompts, colors,
                        // cursor movements and other VT sequences do not
                        // corrupt the caller's terminal.
                        const plain = util.stripAnsi(alloc, payload) catch |err| {
                            std.log.warn("stripAnsi failed: {s}", .{@errorName(err)});
                            continue;
                        };
                        defer alloc.free(plain);
                        if (plain.len > 0) {
                            try stdout_buf.appendSlice(alloc, plain);
                        }
                    },
                    .TaskComplete => {
                        task_complete_code = if (msg.payload.len > 0) msg.payload[0] else 0;
                    },
                    else => {},
                }
            }
        }

        // Write as much pending output as stdout accepts right now.
        if (stdout_buf.items.len > 0) {
            const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
                if (err == error.WouldBlock) break :blk 0;
                return err;
            };
            if (n > 0) {
                try stdout_buf.replaceRange(alloc, 0, n, &[_]u8{});
            }
        }

        // Report completion only after every buffered byte has been written.
        // This check is independent of whether output was pending this
        // round: a TaskComplete that arrives with an empty buffer must still
        // end the loop, and a short write must not drop the remainder.
        if (task_complete_code) |exit_code| {
            if (stdout_buf.items.len == 0) return exit_code;
        }

        // Check for HUP/ERR on any polled fd.
        for (poll_fds.items) |poll_fd| {
            if (poll_fd.revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
                return 0;
            }
        }
    }
}
1502
/// Polls the socket directory once per second until every session matched
/// by `matchers` has finished its task, then prints a summary (plus the
/// last history lines of each failed session) and exits the process.
///
/// Exit status: the last non-zero task exit code seen (1 for unreachable
/// sessions), 0 when everything succeeded, 1 when a tracked session
/// disappears, 2 when no session matches after three polls.
/// This function only returns to its caller on error; every other path
/// ends in `std.process.exit`.
fn wait(cfg: *Cfg, matchers: std.ArrayList(SessionMatch)) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var stdout_buffer: [1024]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&stdout_buffer);
    const stdout = &stdout_writer.interface;

    var stderr_buffer: [1024]u8 = undefined;
    var stderr_writer = std.fs.File.stderr().writer(&stderr_buffer);
    const stderr = &stderr_writer.interface;

    // Highest match count seen so far. Lets us distinguish "sessions haven't
    // appeared yet" (keep polling) from "sessions we were tracking
    // disappeared" (fail -- daemon crashed or was killed).
    var max_seen: i32 = 0;
    var zero_match_iters: u32 = 0;

    var agg_exit_code: u8 = 0;
    var last_print: i64 = 0;
    var prev_done: i32 = 0;
    while (true) {
        agg_exit_code = 0;
        var total: i32 = 0;
        var done: i32 = 0;

        {
            var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
            // Freed on every exit from this scope, including write errors.
            defer {
                for (sessions.items) |session| session.deinit(alloc);
                sessions.deinit(alloc);
            }

            for (sessions.items) |session| {
                const tracked = for (matchers.items) |m| {
                    if (m.matches(session.name)) break true;
                } else false;
                if (!tracked) continue;

                total += 1;
                if (session.is_error) {
                    // Daemon unreachable (probe timed out). On Timeout the
                    // socket is no longer deleted, so this session would
                    // otherwise stay "still waiting" forever. Count it as
                    // done+failed so wait terminates.
                    try stderr.print(
                        "[{d}] task unreachable: {s} ({s})\n",
                        .{ std.time.timestamp(), session.name, session.error_name orelse "unknown" },
                    );
                    try stderr.flush();
                    agg_exit_code = 1;
                    done += 1;
                    continue;
                }

                // A missing end time is treated like 0: the task is still running.
                const ended_at = session.task_ended_at orelse 0;
                if (ended_at == 0) {
                    // Progress line, rate-limited to one every 5 seconds.
                    const now = std.time.timestamp();
                    if (now - last_print >= 5) {
                        try stdout.print(
                            "[{d}] waiting task={s}\n",
                            .{ now, session.name },
                        );
                        try stdout.flush();
                        last_print = now;
                    }
                    continue;
                }

                // A missing exit code is reported as 0 instead of being
                // force-unwrapped, and never resets an earlier failure.
                const exit_code = session.task_exit_code orelse 0;
                if (done >= prev_done) {
                    // Newly completed: print immediately.
                    try stdout.print(
                        "[{d}] completed task={s} exit_code={d}\n",
                        .{ ended_at, session.name, exit_code },
                    );
                    try stdout.flush();
                }
                if (exit_code != 0) {
                    agg_exit_code = exit_code;
                }
                done += 1;
            }
        }

        // Check disappearance BEFORE completion: if one of N sessions
        // crashed and the remaining N-1 happen to be done, total==done
        // would be a false success.
        if (total < max_seen) {
            try stderr.print(
                "error: {d} session(s) disappeared before completing\n",
                .{max_seen - total},
            );
            try stderr.flush();
            std.process.exit(1);
        }
        max_seen = total;

        if (total > 0 and total == done) {
            break;
        }

        if (max_seen == 0) {
            // `zmx run foo && zmx wait foo` is essentially sequential, so
            // matching sessions should be visible from the first poll. If
            // nothing appears after a few iterations it's almost certainly a
            // typo, not a slow start.
            zero_match_iters += 1;
            if (zero_match_iters >= 3) {
                try stderr.print("error: no matching sessions found\n", .{});
                try stderr.flush();
                std.process.exit(2);
            }
        }

        prev_done = done;
        std.Thread.sleep(1000 * std.time.ns_per_ms);
    }

    if (agg_exit_code == 0) {
        try stdout.print("task(s) completed!\n", .{});
    } else {
        try stdout.print("task(s) failed!\n", .{});
    }
    try stdout.flush();

    // Failure report: re-read the sessions and dump details for each
    // tracked session whose task exited non-zero.
    {
        var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
        defer {
            for (sessions.items) |session| session.deinit(alloc);
            sessions.deinit(alloc);
        }

        for (sessions.items) |session| {
            const tracked = for (matchers.items) |m| {
                if (m.matches(session.name)) break true;
            } else false;
            if (!tracked) continue;

            // Unreachable sessions were counted as done above and may carry
            // no exit code; skip them rather than unwrapping null.
            const exit_code = session.task_exit_code orelse 0;
            if (exit_code == 0) continue;

            try stdout.print("---\n", .{});
            try stdout.print("[{d}] failed task={s} exit_status={d}\n", .{
                session.task_ended_at orelse 0,
                session.name,
                exit_code,
            });

            // Fetch and print the last 20 lines of history for debugging.
            const history_lines: usize = 20;
            const history_text = fetchHistory(alloc, cfg, session.name) catch null;
            if (history_text) |text| {
                defer alloc.free(text);
                try stdout.print("\nLast {d} lines of {s} history:\n", .{ history_lines, session.name });

                // First pass counts lines, second pass prints the tail.
                var total_lines: usize = 0;
                var it = std.mem.splitScalar(u8, text, '\n');
                while (it.next()) |_| {
                    total_lines += 1;
                }

                const skip = if (total_lines > history_lines) total_lines - history_lines else 0;
                var current: usize = 0;
                it = std.mem.splitScalar(u8, text, '\n');
                while (it.next()) |line| {
                    if (current >= skip) {
                        try stdout.print("{s}\n", .{line});
                    }
                    current += 1;
                }
            }

            try stdout.print("\nSee the logs:\nzmx history {s}\nzmx attach {s}\n", .{ session.name, session.name });
            try stdout.flush();
        }
    }

    std.process.exit(agg_exit_code);
}
1684
/// Prints one line per session found in the socket directory, sorted with
/// `util.SessionEntry.lessThan`.
///
/// With no sessions, a notice goes to stderr unless `short` is set, in
/// which case nothing is printed.
fn list(cfg: *Cfg, short: bool) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const current_session = socket.getSeshNameFromEnv();

    var entries = try util.get_session_entries(alloc, cfg.socket_dir);
    defer {
        for (entries.items) |entry| entry.deinit(alloc);
        entries.deinit(alloc);
    }

    if (entries.items.len == 0) {
        if (!short) {
            var err_buf: [4096]u8 = undefined;
            var err_file = std.fs.File.stderr().writer(&err_buf);
            const err_out = &err_file.interface;
            try err_out.print("no sessions found in {s}\n", .{cfg.socket_dir});
            try err_out.flush();
        }
        return;
    }

    std.mem.sort(util.SessionEntry, entries.items, {}, util.SessionEntry.lessThan);

    var out_buf: [4096]u8 = undefined;
    var out_file = std.fs.File.stdout().writer(&out_buf);
    const out = &out_file.interface;

    // Flushed per session so each line appears as soon as it is formatted.
    for (entries.items) |entry| {
        try util.writeSessionLine(out, entry, short, current_session);
        try out.flush();
    }
}
1718
/// Sends a `.DetachAll` request to the daemon of the session named by the
/// environment (see `socket.getSeshNameFromEnv`).
///
/// Logs an error and returns when no session name is set or the daemon
/// cannot be reached; a refused connection also cleans up the stale socket.
/// A peer that goes away while sending is not treated as an error.
fn detachAll(cfg: *Cfg) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const name = socket.getSeshNameFromEnv();
    if (name.len == 0) {
        std.log.err("ZMX_SESSION env var not found: are you inside a zmx session?", .{});
        return;
    }

    var socket_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer socket_dir.close();

    const path = socket.getSocketPath(alloc, cfg.socket_dir, name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(path);

    const conn = ipc.connectSession(path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(socket_dir, name);
        return;
    };
    defer posix.close(conn);

    ipc.send(conn, .DetachAll, "") catch |err| {
        // The daemon hanging up mid-send is fine: there is nothing left to detach.
        if (err == error.BrokenPipe or err == error.ConnectionResetByPeer) return;
        return err;
    };
}
1748
/// Asks the daemon of `session_name` to kill its session.
///
/// Returns `error.SessionNotFound` (after printing to stderr) when the
/// session does not exist. When the daemon cannot be reached, the stale
/// socket is removed if the connection was refused or `force` is set;
/// otherwise a hint is printed. Both of those outcomes return normally.
fn kill(cfg: *Cfg, session_name: []const u8, force: bool) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(path);

    var socket_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer socket_dir.close();

    if (!try socket.sessionExists(socket_dir, session_name)) {
        var err_buf: [4096]u8 = undefined;
        var err_file = std.fs.File.stderr().writer(&err_buf);
        const err_out = &err_file.interface;
        err_out.print("error: session \"{s}\" does not exist\n", .{session_name}) catch {};
        err_out.flush() catch {};
        return error.SessionNotFound;
    }

    const conn = ipc.connectSession(path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});

        var msg_buf: [4096]u8 = undefined;
        var msg_file = std.fs.File.stdout().writer(&msg_buf);
        const msg_out = &msg_file.interface;

        // A refused connection means nobody is listening; --force removes
        // the socket regardless of the failure reason.
        const remove_socket = force or err == error.ConnectionRefused;
        if (!remove_socket) {
            msg_out.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again, add `--force` flag, or kill the process directly\n",
                .{ session_name, @errorName(err) },
            ) catch {};
        } else {
            socket.cleanupStaleSocket(socket_dir, session_name);
            msg_out.print("cleaned up stale session {s}\n", .{session_name}) catch {};
        }
        msg_out.flush() catch {};
        return;
    };
    defer posix.close(conn);

    ipc.send(conn, .Kill, "") catch |err| {
        // The daemon going away while we ask it to die is not a failure.
        if (err == error.BrokenPipe or err == error.ConnectionResetByPeer) return;
        return err;
    };

    var out_buf: [100]u8 = undefined;
    var out_file = std.fs.File.stdout().writer(&out_buf);
    const out = &out_file.interface;
    try out.print("killed session {s}\n", .{session_name});
    try out.flush();
}
1799
/// Requests the plain-text history of `session_name` from its daemon and
/// returns the payload of the first `.History` reply as a newly allocated
/// slice. Caller owns the returned memory and must free it with `alloc`.
///
/// Errors include `error.NameTooLong`, `error.SessionNotFound`,
/// `error.SessionUnresponsive`, `error.Timeout` (no data within 5 seconds
/// or a poll failure), `error.ReadFailed`, and `error.NoHistoryResponse`
/// (the daemon closed the connection without replying).
fn fetchHistory(
    alloc: std.mem.Allocator,
    cfg: *Cfg,
    session_name: []const u8,
) ![]const u8 {
    const path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => {
            socket.printSessionNameTooLong(session_name, cfg.socket_dir);
            return error.NameTooLong;
        },
        error.OutOfMemory => return err,
    };
    defer alloc.free(path);

    var socket_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer socket_dir.close();

    if (!try socket.sessionExists(socket_dir, session_name)) {
        return error.SessionNotFound;
    }

    const conn = ipc.connectSession(path) catch |err| {
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(socket_dir, session_name);
        return err;
    };
    defer posix.close(conn);

    // The request body is a single byte selecting the plain format.
    const request = [_]u8{@intFromEnum(util.HistoryFormat.plain)};
    ipc.send(conn, .History, &request) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return error.SessionUnresponsive,
        else => return err,
    };

    var incoming = try ipc.SocketBuffer.init(alloc);
    defer incoming.deinit();

    var collected = std.ArrayList(u8).initCapacity(alloc, 4096) catch return error.OutOfMemory;
    errdefer collected.deinit(alloc);

    while (true) {
        // Wait up to 5 seconds for more data from the daemon.
        var watch = [_]posix.pollfd{.{ .fd = conn, .events = posix.POLL.IN, .revents = 0 }};
        const ready = posix.poll(&watch, 5000) catch return error.Timeout;
        if (ready == 0) return error.Timeout;

        const got = incoming.read(conn) catch return error.ReadFailed;
        if (got == 0) break;

        // Messages with any other tag are skipped.
        while (incoming.next()) |msg| {
            if (msg.header.tag != .History) continue;
            try collected.appendSlice(alloc, msg.payload);
            return collected.toOwnedSlice(alloc);
        }
    }

    return error.NoHistoryResponse;
}
1863
/// Request the terminal history of `session_name` in `format` and write the
/// reply payload to stdout.
///
/// Returns `error.SessionNotFound` (after printing a message to stderr) when
/// the session does not exist. Connection failures, a dropped connection, a
/// 5000 ms poll timeout and read failures end the function without an error;
/// connection failures and timeouts are logged first.
fn history(cfg: *Cfg, session_name: []const u8, format: util.HistoryFormat) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer dir.close();

    const exists = try socket.sessionExists(dir, session_name);
    if (!exists) {
        var buf: [4096]u8 = undefined;
        var w = std.fs.File.stderr().writer(&buf);
        w.interface.print("error: session \"{s}\" does not exist\n", .{session_name}) catch {};
        w.interface.flush() catch {};
        return error.SessionNotFound;
    }
    const fd = ipc.connectSession(socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        // A refused connection means the socket file has no listener behind it.
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(dir, session_name);
        return;
    };
    defer posix.close(fd);

    const format_byte = [_]u8{@intFromEnum(format)};
    ipc.send(fd, .History, &format_byte) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var sb = try ipc.SocketBuffer.init(alloc);
    defer sb.deinit();

    while (true) {
        var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
        const poll_result = posix.poll(&poll_fds, 5000) catch return;
        if (poll_result == 0) {
            std.log.err("timeout waiting for history response", .{});
            return;
        }

        const n = sb.read(fd) catch return;
        if (n == 0) return; // peer closed the connection

        while (sb.next()) |msg| {
            if (msg.header.tag == .History) {
                // write() may accept fewer bytes than requested (pipes,
                // terminals), so keep writing until the whole payload is out;
                // a single call would silently truncate a large history.
                var remaining: []const u8 = msg.payload;
                while (remaining.len > 0) {
                    const written = posix.write(posix.STDOUT_FILENO, remaining) catch return;
                    if (written == 0) return;
                    remaining = remaining[written..];
                }
                return;
            }
        }
    }
}
1921
/// Ask the daemon of `current_sesh` to switch its client over to
/// `daemon.session_name` by sending it a `.Switch` message.
///
/// Returns `error.SessionNotFound` (after printing to stderr) when
/// `current_sesh` does not exist. A failed connection is logged and a dropped
/// connection during the send is ignored; both return without an error.
fn switchSesh(daemon: *Daemon, current_sesh: []const u8) !void {
    const socket_dir = daemon.cfg.socket_dir;

    const current_path = socket.getSocketPath(daemon.alloc, socket_dir, current_sesh) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(current_sesh, socket_dir),
        error.OutOfMemory => return err,
    };
    defer daemon.alloc.free(current_path);

    var sock_dir = try std.fs.openDirAbsolute(socket_dir, .{});
    defer sock_dir.close();

    if (!(try socket.sessionExists(sock_dir, current_sesh))) {
        var msg_buf: [4096]u8 = undefined;
        var stderr_writer = std.fs.File.stderr().writer(&msg_buf);
        stderr_writer.interface.print("error: session \"{s}\" does not exist\n", .{current_sesh}) catch {};
        stderr_writer.interface.flush() catch {};
        return error.SessionNotFound;
    }

    const conn = ipc.connectSession(current_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(sock_dir, current_sesh);
        return;
    };
    defer posix.close(conn);

    // The payload is daemon.session_name: the name the user passed to
    // `zmx attach`, not the name of the session they are currently inside of.
    ipc.send(conn, .Switch, daemon.session_name) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };
}
1956
/// Attach the current terminal to the daemon's session.
///
/// When `socket.getSeshNameFromEnv` yields a non-empty name, the call is
/// delegated to `switchSesh` instead. Otherwise the session is ensured, stdin
/// is put into raw mode (only if it is a TTY), the screen is cleared and
/// `clientLoop` runs until it reports a detach or a session switch. On a
/// switch, a new `Daemon` value is built for the target session and `attach`
/// recurses into it. Terminal settings are restored on every exit path.
fn attach(daemon: *Daemon) !void {
    const env_sesh = socket.getSeshNameFromEnv();
    if (env_sesh.len > 0) return switchSesh(daemon, env_sesh);

    const ensured = try daemon.ensureSession();
    if (ensured.is_daemon) return;

    const client_sock = try socket.sessionConnect(daemon.socket_path);
    std.log.info("attached session={s}", .{daemon.session_name});

    // Snapshot the current terminal settings with tcgetattr() so they can be
    // put back with tcsetattr() on exit; modifying a copy of the snapshot
    // preserves every setting we do not explicitly touch.
    //
    // tcgetattr fails when stdin is not a TTY (e.g. piped). In that case the
    // snapshot stays undefined, so terminal setup and restore are both skipped
    // rather than applying undefined stack bytes via tcsetattr.
    var saved_termios: cross.c.termios = undefined;
    const stdin_is_tty = cross.c.tcgetattr(posix.STDIN_FILENO, &saved_termios) == 0;

    defer {
        // TCSAFLUSH discards any unread input, preventing stale input after detach.
        if (stdin_is_tty) {
            _ = cross.c.tcsetattr(posix.STDIN_FILENO, cross.c.TCSAFLUSH, &saved_termios);
        }
        // Reset terminal modes on detach.
        _ = posix.write(posix.STDOUT_FILENO, "\x1bc") catch {};
    }

    if (stdin_is_tty) {
        // Raw mode: no canonical line buffering, no echo, no signal generation
        // from control characters (like Ctrl+C), no flow control.
        var raw = saved_termios;
        cross.c.cfmakeraw(&raw);

        // Extra per-character settings (matches what abduco and shpool do).
        raw.c_cc[cross.c.VLNEXT] = cross.c._POSIX_VDISABLE; // no literal-next (Ctrl-V)
        // Ctrl+\ must reach us as a byte so it can serve as the detach key,
        // so SIGQUIT generation is disabled.
        raw.c_cc[cross.c.VQUIT] = cross.c._POSIX_VDISABLE;
        raw.c_cc[cross.c.VMIN] = 1; // read returns after at least 1 byte
        raw.c_cc[cross.c.VTIME] = 0; // no read timeout

        _ = cross.c.tcsetattr(posix.STDIN_FILENO, cross.c.TCSANOW, &raw);
    }

    // Start from a blank screen before the session restore.
    _ = try posix.write(posix.STDOUT_FILENO, "\x1b[2J\x1b[H");

    const outcome = try clientLoop(client_sock);
    if (outcome.kind != .switch_session) return;
    const next_name = outcome.session_name orelse return;

    var cwd_storage: [std.fs.max_path_bytes]u8 = undefined;
    const cwd = std.posix.getcwd(&cwd_storage) catch "";

    const next_path = socket.getSocketPath(
        daemon.alloc,
        daemon.cfg.socket_dir,
        next_name,
    ) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(
            next_name,
            daemon.cfg.socket_dir,
        ),
        error.OutOfMemory => return err,
    };

    const client_list = try std.ArrayList(*Client).initCapacity(daemon.alloc, 10);
    var next_daemon = Daemon{
        .running = true,
        .cfg = daemon.cfg,
        .alloc = daemon.alloc,
        .clients = client_list,
        .session_name = next_name,
        .socket_path = next_path,
        .pid = undefined,
        .cwd = cwd,
        .created_at = @intCast(std.time.timestamp()),
        .leader_client_fd = null,
    };
    return attach(&next_daemon);
}
2051
/// Read all of stdin and ask the session daemon to write it to `file_path`.
///
/// Ensures the session exists (announcing it on stdout when newly created),
/// sends a `.Write` message whose payload is
/// `[u32 path length][path bytes][file content]`, then performs one read and
/// looks for an `.Ack` in it.
///
/// Errors produced here: `error.ReadFailed` when the reply cannot be read,
/// `error.ConnectionClosed` when the peer closes before replying, and
/// `error.NoAckReceived` when that read contains no `.Ack`. An unresponsive
/// session is reported on stdout and the function returns without an error.
fn writeFile(daemon: *Daemon, file_path: []const u8) !void {
    var buf: [4096]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);
    const sesh_result = try daemon.ensureSession();
    if (sesh_result.is_daemon) return;

    if (sesh_result.created) {
        try w.interface.print("session \"{s}\" created\n", .{daemon.session_name});
        try w.interface.flush();
    }
    const stdin_fd = posix.STDIN_FILENO;
    var stdin_buf = try std.ArrayList(u8).initCapacity(daemon.alloc, 4096);
    defer stdin_buf.deinit(daemon.alloc);

    // Slurp stdin until EOF (or until a non-blocking stdin has nothing more).
    while (true) {
        var tmp: [4096]u8 = undefined;
        const n = posix.read(stdin_fd, &tmp) catch |err| {
            if (err == error.WouldBlock) break;
            return err;
        };
        if (n == 0) break;
        try stdin_buf.appendSlice(daemon.alloc, tmp[0..n]);
    }

    const socket_path = socket.getSocketPath(
        daemon.alloc,
        daemon.cfg.socket_dir,
        daemon.session_name,
    ) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(
            daemon.session_name,
            daemon.cfg.socket_dir,
        ),
        error.OutOfMemory => return err,
    };
    // The path is allocated for this call only; release it on every exit path.
    defer daemon.alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(daemon.cfg.socket_dir, .{});
    defer dir.close();

    const result = ipc.probeSession(daemon.alloc, socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) {
            socket.cleanupStaleSocket(dir, daemon.session_name);
            w.interface.print("cleaned up stale session {s}\n", .{daemon.session_name}) catch {};
        } else {
            w.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again\n",
                .{ daemon.session_name, @errorName(err) },
            ) catch {};
        }
        w.interface.flush() catch {};
        return;
    };

    defer posix.close(result.fd);

    // Build wire payload: [u32 path len][path bytes][file content]
    var wire_buf = try std.ArrayList(u8).initCapacity(
        daemon.alloc,
        @sizeOf(u32) + file_path.len + stdin_buf.items.len,
    );
    defer wire_buf.deinit(daemon.alloc);
    const path_len: u32 = @intCast(file_path.len);
    try wire_buf.appendSlice(daemon.alloc, std.mem.asBytes(&path_len));
    try wire_buf.appendSlice(daemon.alloc, file_path);
    try wire_buf.appendSlice(daemon.alloc, stdin_buf.items);

    ipc.send(result.fd, .Write, wire_buf.items) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var sb = try ipc.SocketBuffer.init(daemon.alloc);
    defer sb.deinit();

    // NOTE(review): only one read is performed; if other messages can arrive
    // ahead of the Ack this reports NoAckReceived -- confirm against the daemon.
    const n = sb.read(result.fd) catch return error.ReadFailed;
    if (n == 0) return error.ConnectionClosed;

    while (sb.next()) |msg| {
        if (msg.header.tag == .Ack) {
            try w.interface.print("file created {s}\n", .{file_path});
            try w.interface.flush();
            return;
        }
    }

    return error.NoAckReceived;
}
2139
/// Send text to a session as a single IPC message tagged `tag`.
///
/// The payload is `text_parts` joined with single spaces. When no parts are
/// given and stdin is not a TTY, the payload is everything read from stdin,
/// minus one trailing newline unless `tag` is `.Output`.
///
/// Returns `error.TextRequired` when the payload ends up empty. An
/// unresponsive session is reported on stdout and a connection dropped during
/// the send is ignored; both return without an error.
fn send(cfg: *Cfg, session_name: []const u8, socket_path: []const u8, text_parts: [][]const u8, tag: ipc.Tag) !void {
    const alloc = std.heap.c_allocator;
    var out_buf: [4096]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&out_buf);

    var payload = std.ArrayList(u8).empty;
    defer payload.deinit(alloc);

    for (text_parts, 0..) |part, idx| {
        if (idx > 0) try payload.append(alloc, ' ');
        try payload.appendSlice(alloc, part);
    }

    // Fall back to piped stdin only when no text arguments were provided.
    if (text_parts.len == 0 and !std.posix.isatty(posix.STDIN_FILENO)) {
        var chunk: [4096]u8 = undefined;
        while (true) {
            const got = posix.read(posix.STDIN_FILENO, &chunk) catch |err| {
                if (err == error.WouldBlock) break;
                return err;
            };
            if (got == 0) break;
            try payload.appendSlice(alloc, chunk[0..got]);
        }
        // Drop one trailing newline from piped input; the caller is
        // responsible for including \r when submission is desired.
        // For .Output the caller controls exact bytes, so nothing is stripped.
        if (tag != .Output and std.mem.endsWith(u8, payload.items, "\n")) {
            payload.shrinkRetainingCapacity(payload.items.len - 1);
        }
    }

    if (payload.items.len == 0) return error.TextRequired;

    var sock_dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer sock_dir.close();

    const probe = ipc.probeSession(alloc, socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) {
            socket.cleanupStaleSocket(sock_dir, session_name);
            try stdout_writer.interface.print("cleaned up stale session {s}\n", .{session_name});
        } else {
            try stdout_writer.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again\n",
                .{ session_name, @errorName(err) },
            );
        }
        try stdout_writer.interface.flush();
        return;
    };
    defer posix.close(probe.fd);

    ipc.send(probe.fd, tag, payload.items) catch |err| switch (err) {
        error.ConnectionResetByPeer, error.BrokenPipe => return,
        else => return err,
    };
}
2201
/// Run a command inside the daemon's session and exit with its exit code.
///
/// The command is `command_args` joined with spaces (arguments are
/// shell-quoted when `util.shellNeedsQuoting` says so), or, when no arguments
/// are given and stdin is not a TTY, the contents of stdin. In both cases the
/// command is terminated with a carriage return. It is sent as a `.Run`
/// message, after which `tail` is called and the process exits with the code
/// it returns.
///
/// Returns `error.CommandRequired` when there is nothing to run and
/// `error.SessionNotReady` when the session socket cannot be connected.
fn run(daemon: *Daemon, detached: bool, command_args: [][]const u8) !void {
    const alloc = daemon.alloc;
    var buf: [4096]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);

    // The single owned command buffer; freed on every non-exit return path.
    var command: ?[]u8 = null;
    defer if (command) |cmd| alloc.free(cmd);

    const result = try daemon.ensureSession();
    if (result.is_daemon) return;

    if (result.created) {
        try w.interface.print("session \"{s}\" created\n", .{daemon.session_name});
        try w.interface.flush();
    }

    if (command_args.len > 0) {
        var cmd_list = std.ArrayList(u8).empty;
        defer cmd_list.deinit(alloc);

        for (command_args, 0..) |arg, i| {
            if (i > 0) try cmd_list.append(alloc, ' ');
            if (util.shellNeedsQuoting(arg)) {
                const quoted = try util.shellQuote(alloc, arg);
                defer alloc.free(quoted);
                try cmd_list.appendSlice(alloc, quoted);
            } else {
                try cmd_list.appendSlice(alloc, arg);
            }
        }

        // \r, not \n: once the shell is at the readline prompt the PTY is in
        // raw mode; readline's accept-line binds to CR. The first-ever run
        // works with \n only because it arrives during shell startup while
        // the line discipline is still canonical.
        try cmd_list.append(alloc, '\r');

        command = try cmd_list.toOwnedSlice(alloc);
    } else if (!std.posix.isatty(posix.STDIN_FILENO)) {
        var stdin_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
        defer stdin_buf.deinit(alloc);

        while (true) {
            var tmp: [4096]u8 = undefined;
            const n = posix.read(posix.STDIN_FILENO, &tmp) catch |err| {
                if (err == error.WouldBlock) break;
                return err;
            };
            if (n == 0) break;
            try stdin_buf.appendSlice(alloc, tmp[0..n]);
        }

        if (stdin_buf.items.len > 0) {
            // Normalize any trailing newline to CR so readline (raw mode)
            // accepts each line.
            if (stdin_buf.items[stdin_buf.items.len - 1] == '\n') {
                stdin_buf.items[stdin_buf.items.len - 1] = '\r';
            } else {
                try stdin_buf.append(alloc, '\r');
            }

            // Transfers ownership out of the list; the deferred deinit then
            // has nothing left to free.
            command = try stdin_buf.toOwnedSlice(alloc);
        }
    }

    const cmd_to_send = command orelse return error.CommandRequired;

    const client_sock = ipc.connectSession(daemon.socket_path) catch |err| {
        std.log.err("session not ready: {s}", .{@errorName(err)});
        return error.SessionNotReady;
    };
    defer posix.close(client_sock);

    var fds = try std.ArrayList(i32).initCapacity(alloc, 1);
    defer fds.deinit(alloc);
    try fds.append(alloc, client_sock);

    ipc.send(client_sock, .Run, cmd_to_send) catch |err| switch (err) {
        error.ConnectionResetByPeer, error.BrokenPipe => return,
        else => return err,
    };

    const exit_code = try tail(fds, detached, true);
    // posix.exit does not return, so the defers above do not run on this path.
    posix.exit(exit_code);
}
2295
/// Outcome of `clientLoop`: why the client stopped and, for a session switch,
/// which session to attach to next.
const ClientResult = struct {
    /// Reason the loop ended.
    kind: enum {
        /// The client detached (stdin EOF, or the daemon connection ended).
        detach,
        /// The daemon sent a `.Switch` message.
        switch_session,
    },
    /// Target session name for `switch_session` (a copy of the `.Switch`
    /// payload); null for `detach`.
    session_name: ?[]const u8,
};
2303
/// clientLoop drives an attached client until it detaches or the daemon asks
/// it to switch sessions.
///
/// poll() multiplexes stdin, the daemon socket, the signal self-pipe and (when
/// output is pending) stdout. Bytes read from stdin go to the daemon as
/// `.Input` (or `.Detach` for the detach key), `.Output` payloads from the
/// daemon are written to stdout, and a wake-up on the self-pipe sends a
/// `.Resize` carrying the current terminal size. `client_sock_fd` is closed
/// on return.
fn clientLoop(client_sock_fd: i32) !ClientResult {
    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
    const alloc = std.heap.c_allocator;
    defer posix.close(client_sock_fd);

    try openSignalPipe();
    installWakeHandler(posix.SIG.WINCH);

    // Non-blocking socket so a slow daemon can never stall us inside write().
    const sock_flags = try posix.fcntl(client_sock_fd, posix.F.GETFL, 0);
    _ = try posix.fcntl(client_sock_fd, posix.F.SETFL, sock_flags | O_NONBLOCK);

    // Messages queued for the daemon; flushed when the socket is writable.
    var outbound = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer outbound.deinit(alloc);

    // The first queued message is Init, carrying the terminal size.
    const initial_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
    try ipc.appendMessage(alloc, &outbound, .Init, std.mem.asBytes(&initial_size));

    var pollset = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
    defer pollset.deinit(alloc);

    var inbound = try ipc.SocketBuffer.init(alloc);
    defer inbound.deinit();

    // Output received from the daemon but not yet written to stdout.
    var pending_stdout = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer pending_stdout.deinit(alloc);

    const stdin_fd = posix.STDIN_FILENO;

    // Make stdin non-blocking. O_NONBLOCK is set on the open file description,
    // which is shared with the parent shell; restore on exit to avoid
    // corrupting the parent's stdin.
    const stdin_orig_flags = try posix.fcntl(stdin_fd, posix.F.GETFL, 0);
    _ = try posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags | O_NONBLOCK);
    defer _ = posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags) catch {};

    // Fixed positions of the always-present entries in the poll set.
    const stdin_slot = 0;
    const sock_slot = 1;
    const sig_slot = 2;

    const failure_mask = posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL;
    const detached: ClientResult = .{ .kind = .detach, .session_name = null };

    while (true) {
        pollset.clearRetainingCapacity();

        try pollset.append(alloc, .{ .fd = stdin_fd, .events = posix.POLL.IN, .revents = 0 });

        // Always watch the socket for reads; watch for writes only while
        // there is something queued.
        const sock_events: i16 = if (outbound.items.len > 0)
            posix.POLL.IN | posix.POLL.OUT
        else
            posix.POLL.IN;
        try pollset.append(alloc, .{ .fd = client_sock_fd, .events = sock_events, .revents = 0 });

        try pollset.append(alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });

        if (pending_stdout.items.len > 0) {
            try pollset.append(alloc, .{
                .fd = posix.STDOUT_FILENO,
                .events = posix.POLL.OUT,
                .revents = 0,
            });
        }

        _ = try posix.poll(pollset.items, -1);

        const stdin_revents = pollset.items[stdin_slot].revents;
        const sock_revents = pollset.items[sock_slot].revents;
        const sig_revents = pollset.items[sig_slot].revents;

        // The wake handler fired (installed above for SIGWINCH): report the new size.
        if (sig_revents & posix.POLL.IN != 0) {
            drainSignalPipe();
            const resized = ipc.getTerminalSize(posix.STDOUT_FILENO);
            try ipc.appendMessage(alloc, &outbound, .Resize, std.mem.asBytes(&resized));
        }

        // stdin -> socket (Input)
        if (stdin_revents & (posix.POLL.IN | failure_mask) != 0) {
            var chunk: [4096]u8 = undefined;
            const maybe_len: ?usize = posix.read(stdin_fd, &chunk) catch |err| blk: {
                if (err == error.WouldBlock) break :blk null;
                return err;
            };

            if (maybe_len) |len| {
                // EOF on stdin
                if (len == 0) return detached;

                const typed = chunk[0..len];
                // Detach sequences: ctrl+\ as first byte or Kitty escape sequence.
                if (util.isCtrlBackslash(typed)) {
                    try ipc.appendMessage(alloc, &outbound, .Detach, "");
                } else {
                    try ipc.appendMessage(alloc, &outbound, .Input, typed);
                }
            }
        }

        // socket -> client (messages from the daemon)
        if (sock_revents & posix.POLL.IN != 0) {
            const got = inbound.read(client_sock_fd) catch |err| {
                if (err == error.WouldBlock) continue;
                if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                    return detached;
                }
                std.log.err("daemon read err={s}", .{@errorName(err)});
                return err;
            };
            // Server closed connection
            if (got == 0) return detached;

            while (inbound.next()) |msg| {
                switch (msg.header.tag) {
                    .Output => {
                        if (msg.payload.len > 0) {
                            try pending_stdout.appendSlice(alloc, msg.payload);
                        }
                    },
                    .Resize => {
                        // daemon is asking for the client's window size usually in response
                        // to this client being set as leader.
                        const current_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
                        try ipc.appendMessage(
                            alloc,
                            &outbound,
                            .Resize,
                            std.mem.asBytes(&current_size),
                        );
                    },
                    .Switch => {
                        // The payload lives in the socket buffer, so hand back a copy.
                        const target = try alloc.dupe(u8, msg.payload);
                        return ClientResult{ .kind = .switch_session, .session_name = target };
                    },
                    else => {},
                }
            }
        }

        // client -> socket (flush queued messages to the daemon)
        if (sock_revents & posix.POLL.OUT != 0 and outbound.items.len > 0) {
            const sent = posix.write(client_sock_fd, outbound.items) catch |err| blk: {
                if (err == error.WouldBlock) break :blk 0;
                if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                    return detached;
                }
                return err;
            };
            if (sent > 0) {
                try outbound.replaceRange(alloc, 0, sent, &[_]u8{});
            }
        }

        // Pending output is written whenever there is any; the stdout poll
        // entry only keeps poll() from sleeping while output is buffered.
        if (pending_stdout.items.len > 0) {
            const shown = posix.write(posix.STDOUT_FILENO, pending_stdout.items) catch |err| blk: {
                if (err == error.WouldBlock) break :blk 0;
                return err;
            };
            if (shown > 0) {
                try pending_stdout.replaceRange(alloc, 0, shown, &[_]u8{});
            }
        }

        // Checked last so data that arrived together with the hang-up is
        // still processed above.
        if (sock_revents & failure_mask != 0) {
            return detached;
        }
    }
}
2479
/// daemonLoop is what the daemon runs to send and receive ipc commands from its corresponding
/// clients. It uses poll() as its non-blocking mechanism.
///
/// Each iteration polls the listening socket, the PTY, the signal self-pipe
/// and every connected client. PTY output is fed to the terminal emulator and
/// broadcast to all clients as `.Output`; client messages are dispatched to
/// the matching `Daemon` handler. The loop ends when `daemon.running` is
/// cleared, the wake handler fires (installed here for SIGTERM), the listening
/// socket reports an error, the PTY read yields EOF or fails, a `.Kill`
/// message arrives, or `daemon.closeClient` reports that the last client went
/// away.
fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
    std.log.info("daemon started session={s} pty_fd={d}", .{ daemon.session_name, pty_fd });
    daemon.pty_fd = pty_fd;
    try openSignalPipe();
    installWakeHandler(posix.SIG.TERM);
    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(daemon.alloc, 8);
    defer poll_fds.deinit(daemon.alloc);

    const init_size = ipc.getTerminalSize(pty_fd);
    var term = try ghostty_vt.Terminal.init(daemon.alloc, .{
        .cols = init_size.cols,
        .rows = init_size.rows,
        .max_scrollback = daemon.cfg.max_scrollback,
    });
    defer term.deinit(daemon.alloc);
    var vt_stream = term.vtStream();
    defer vt_stream.deinit();

    daemon_loop: while (daemon.running) {
        poll_fds.clearRetainingCapacity();

        // Poll set layout: [server, pty, sig_pipe, client0, client1, ...]
        try poll_fds.append(daemon.alloc, .{
            .fd = server_sock_fd,
            .events = posix.POLL.IN,
            .revents = 0,
        });

        var pty_events: i16 = posix.POLL.IN;
        if (daemon.pty_write_buf.items.len > 0) {
            pty_events |= posix.POLL.OUT;
        }
        try poll_fds.append(daemon.alloc, .{
            .fd = pty_fd,
            .events = pty_events,
            .revents = 0,
        });

        try poll_fds.append(daemon.alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });

        for (daemon.clients.items) |client| {
            var events: i16 = posix.POLL.IN;
            if (client.has_pending_output) {
                events |= posix.POLL.OUT;
            }
            try poll_fds.append(daemon.alloc, .{
                .fd = client.socket_fd,
                .events = events,
                .revents = 0,
            });
        }

        _ = try posix.poll(poll_fds.items, -1);

        if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
            drainSignalPipe();
            std.log.info(
                "SIGTERM received, shutting down gracefully session={s}",
                .{daemon.session_name},
            );
            break :daemon_loop;
        }

        if (poll_fds.items[0].revents & (posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL) != 0) {
            std.log.err("server socket error revents={d}", .{poll_fds.items[0].revents});
            break :daemon_loop;
        } else if (poll_fds.items[0].revents & posix.POLL.IN != 0) accept_client: {
            const client_fd = posix.accept(
                server_sock_fd,
                null,
                null,
                posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC,
            ) catch |err| switch (err) {
                // The pending connection vanished between poll() and accept()
                // (peer gave up, or a spurious wake-up). That is not a reason
                // to take the whole session down: skip it and keep serving.
                error.WouldBlock, error.ConnectionAborted => {
                    std.log.warn("accept skipped: {s}", .{@errorName(err)});
                    break :accept_client;
                },
                else => return err,
            };
            const client = try daemon.alloc.create(Client);
            client.* = Client{
                .alloc = daemon.alloc,
                .socket_fd = client_fd,
                .read_buf = try ipc.SocketBuffer.init(daemon.alloc),
                .write_buf = undefined,
            };
            // 64KB initial capacity lets ~15 broadcast cycles (N_TTY_BUF_SIZE reads
            // * header) accumulate before the first ArrayList growth. The write
            // buffer is userspace-only: it drains via POLLOUT to the client socket,
            // which has no corresponding kernel-imposed per-write limit.
            client.write_buf = try std.ArrayList(u8).initCapacity(client.alloc, 65536);
            try daemon.clients.append(daemon.alloc, client);
            std.log.info(
                "client connected fd={d} total={d}",
                .{ client_fd, daemon.clients.items.len },
            );
        }

        const inp_flags = posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL;
        if (poll_fds.items[1].revents & inp_flags != 0) {
            // Read from PTY. Buffer is sized to N_TTY_BUF_SIZE (4096): the hard
            // kernel limit for the N_TTY line discipline. A larger buffer doesn't
            // help: each read() from a PTY master returns at most 4096 bytes
            // regardless of the userspace buffer size.
            var buf: [4096]u8 = undefined;
            // WouldBlock means "nothing to read right now"; any other read
            // error is treated the same as EOF.
            const n_opt: ?usize = posix.read(pty_fd, &buf) catch |err| blk: {
                if (err == error.WouldBlock) break :blk null;
                break :blk 0;
            };

            if (n_opt) |n| {
                if (n == 0) {
                    // EOF: Shell exited
                    std.log.info("shell exited pty_fd={d}", .{pty_fd});
                    break :daemon_loop;
                } else {
                    // Feed PTY output to terminal emulator for state tracking
                    vt_stream.nextSlice(buf[0..n]);
                    daemon.has_pty_output = true;

                    // When no real terminal client has attached yet, respond to
                    // terminal queries (e.g. DA1/DA2) on behalf of the terminal.
                    // This prevents fish from waiting 10s for unanswered queries.
                    // `has_terminal_client` is only set when a client sends .Init
                    // (a real zmx attach), not when a `zmx run` tail-only client
                    // connects.
                    if (!daemon.has_terminal_client and
                        daemon.pty_write_buf.items.len < Daemon.PTY_WRITE_BUF_MAX)
                    {
                        util.respondToDeviceAttributes(daemon.alloc, &daemon.pty_write_buf, buf[0..n]);
                    }

                    // In run mode, scan output for exit code marker
                    if (daemon.is_task_mode and daemon.task_exit_code == null) {
                        if (util.findTaskExitMarker(buf[0..n])) |exit_code| {
                            daemon.task_exit_code = exit_code;
                            daemon.task_ended_at = @intCast(std.time.timestamp());

                            std.log.info("task completed exit_code={d}", .{exit_code});

                            // Notify connected clients
                            for (daemon.clients.items) |c| {
                                ipc.appendMessage(daemon.alloc, &c.write_buf, .TaskComplete, &[_]u8{exit_code}) catch {};
                                c.has_pending_output = true;
                            }
                        }
                    }

                    // Broadcast data to all clients.
                    // Rewrite OSC 133;A to include redraw=0 so the outer terminal
                    // does not clear prompt lines on resize (issue #111).
                    const broadcast_data = util.rewritePromptRedraw(daemon.alloc, buf[0..n]) orelse buf[0..n];
                    // Free only when the rewrite produced a new buffer.
                    defer if (broadcast_data.ptr != buf[0..n].ptr) daemon.alloc.free(broadcast_data);
                    for (daemon.clients.items) |client| {
                        ipc.appendMessage(daemon.alloc, &client.write_buf, .Output, broadcast_data) catch |err| {
                            std.log.warn(
                                "failed to buffer output for client err={s}",
                                .{@errorName(err)},
                            );
                            continue;
                        };
                        client.has_pending_output = true;
                    }
                }
            }
        }

        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
            // Drain as much buffered PTY input as the PTY accepts right now.
            while (daemon.pty_write_buf.items.len > 0) {
                const n = posix.write(pty_fd, daemon.pty_write_buf.items) catch |err| {
                    if (err != error.WouldBlock) {
                        std.log.warn("pty write failed: {s}", .{@errorName(err)});
                        daemon.pty_write_buf.clearRetainingCapacity();
                    }
                    break;
                };
                if (n == 0) break;
                daemon.pty_write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
            }
        }

        var i: usize = daemon.clients.items.len;
        // Only iterate over clients that were present when poll_fds was constructed
        // poll_fds contains [server, pty, sig_pipe, client0, client1, ...]
        // So number of clients in poll_fds is poll_fds.items.len - 3
        const num_polled_clients = poll_fds.items.len - 3;
        if (i > num_polled_clients) {
            // If we have more clients than polled (i.e. we just accepted one), start from the
            // polled ones
            i = num_polled_clients;
        }

        // Walk the clients from last to first.
        clients_loop: while (i > 0) {
            i -= 1;
            const client = daemon.clients.items[i];
            const revents = poll_fds.items[i + 3].revents;

            if (revents & posix.POLL.IN != 0) {
                const n = client.read_buf.read(client.socket_fd) catch |err| {
                    if (err == error.WouldBlock) continue;
                    std.log.debug(
                        "client read err={s} fd={d}",
                        .{ @errorName(err), client.socket_fd },
                    );
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                };

                if (n == 0) {
                    // Client closed connection
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                }

                while (client.read_buf.next()) |msg| {
                    switch (msg.header.tag) {
                        .Input => try daemon.handleInput(client, msg.payload),
                        .Output => try daemon.handleOutput(msg.payload, &vt_stream),
                        .Init => try daemon.handleInit(client, pty_fd, &term, msg.payload),
                        .Switch => try daemon.handleSwitch(msg.payload),
                        .Resize => try daemon.handleResize(client, pty_fd, &term, msg.payload),
                        .Detach => {
                            daemon.handleDetach(client, i);
                            break :clients_loop;
                        },
                        .DetachAll => {
                            daemon.handleDetachAll();
                            break :clients_loop;
                        },
                        .Kill => {
                            break :daemon_loop;
                        },
                        .Info => try daemon.handleInfo(client),
                        .History => try daemon.handleHistory(client, &term, msg.payload),
                        .Run => try daemon.handleRun(client, msg.payload),
                        .Ack, .TaskComplete => {},
                        .Write => try daemon.handleWrite(client, msg.payload),
                        _ => std.log.warn(
                            "ignoring unknown IPC tag={d}",
                            .{@intFromEnum(msg.header.tag)},
                        ),
                    }
                }
            }

            if (revents & posix.POLL.OUT != 0) {
                // Flush pending output buffers
                const n = posix.write(client.socket_fd, client.write_buf.items) catch |err| blk: {
                    if (err == error.WouldBlock) break :blk 0;
                    // Error on write, close client
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                };

                if (n > 0) {
                    client.write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
                }

                if (client.write_buf.items.len == 0) {
                    client.has_pending_output = false;
                }
            }

            if (revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
                const last = daemon.closeClient(client, i, false);
                if (last) break :daemon_loop;
            }
        }
    }
}
2748
/// Signal handler: writes one byte to the write end of `sig_pipe` so that a
/// poll() watching the read end wakes up. errno is saved before the write and
/// restored afterwards, so the interrupted code never sees it change.
fn wakeSignalPipe(_: i32, _: *const posix.siginfo_t, _: ?*anyopaque) callconv(.c) void {
    const errno_slot = std.c._errno();
    const saved_errno = errno_slot.*;
    defer errno_slot.* = saved_errno;
    _ = std.c.write(sig_pipe[1], "x", 1);
}
2754
// Route `sig` to wakeSignalPipe. std.posix.poll retries EINTR internally, so
// SA_RESTART is moot -- with or without it, a signal alone never wakes the
// loop. The handler writes to sig_pipe instead, and poll() wakes on the
// pipe's read end.
fn installWakeHandler(sig: u6) void {
    posix.sigaction(sig, &posix.Sigaction{
        .handler = .{ .sigaction = wakeSignalPipe },
        .mask = posix.sigemptyset(),
        .flags = posix.SA.SIGINFO,
    }, null);
}
2766
/// Set the disposition of SIGPIPE to "ignore" for this process.
fn ignoreSigpipe() void {
    posix.sigaction(posix.SIG.PIPE, &posix.Sigaction{
        .handler = .{ .handler = posix.SIG.IGN },
        .mask = posix.sigemptyset(),
        .flags = 0,
    }, null);
}