Eric Bower
·
2026-04-29
main.zig
1const std = @import("std");
2const posix = std.posix;
3const builtin = @import("builtin");
4const build_options = @import("build_options");
5const ghostty_vt = @import("ghostty-vt");
6const ipc = @import("ipc.zig");
7const log = @import("log.zig");
8const completions = @import("completions.zig");
9const util = @import("util.zig");
10const cross = @import("cross.zig");
11const socket = @import("socket.zig");
12
// Build-time metadata injected through build_options (see build.zig).
pub const version = build_options.version;
pub const git_sha = build_options.git_sha;
pub const ghostty_version = build_options.ghostty_version;

// Global log sink; installed as the std.log backend via std_options below
// and initialized with a file path in main().
var log_system = log.LogSystem{};

// Route all std.log output through zmxLogFn; level filtering/writing is
// handled by LogSystem, so the compile-time level stays at .debug.
pub const std_options: std.Options = .{
    .logFn = zmxLogFn,
    .log_level = .debug,
};
23
/// std.log backend: forwards every log record to the global LogSystem,
/// which writes it to the log file configured in main().
fn zmxLogFn(
    comptime level: std.log.Level,
    comptime scope: @Type(.enum_literal),
    comptime format: []const u8,
    args: anytype,
) void {
    log_system.log(level, scope, format, args);
}
32
/// Self-pipe woken by signal handlers. std.posix.poll loops on .INTR internally
/// (PollError has no Interrupted member), so a signal that lands during poll()
/// never surfaces; the handler writes a byte here and poll() wakes on POLLIN.
var sig_pipe: [2]posix.fd_t = .{ -1, -1 };

// Integer bit value of O_NONBLOCK, derived from the packed std.posix.O struct
// because the numeric constant differs per platform.
// https://github.com/ziglang/zig/blob/738d2be9d6b6ef3ff3559130c05159ef53336224/lib/std/posix.zig#L3505
const O_NONBLOCK: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK");
40
/// A session-name pattern from the command line: either an exact name or,
/// when the argument ended in '*', a prefix match.
const SessionMatch = struct {
    name: []const u8,
    is_prefix: bool,

    /// True when `session_name` is matched by this pattern.
    fn matches(self: SessionMatch, session_name: []const u8) bool {
        return if (self.is_prefix)
            std.mem.startsWith(u8, session_name, self.name)
        else
            std.mem.eql(u8, session_name, self.name);
    }
};
50
/// Parse a raw session argument into a SessionMatch. A trailing '*' marks a
/// prefix match and is stripped before name resolution. Caller owns the
/// returned name and must free it.
fn parseSessionArg(alloc: std.mem.Allocator, raw: []const u8) !SessionMatch {
    const is_prefix = raw.len > 0 and raw[raw.len - 1] == '*';
    const base = if (is_prefix) raw[0 .. raw.len - 1] else raw;
    return .{
        .name = try socket.getSeshName(alloc, base),
        .is_prefix = is_prefix,
    };
}
59
/// Create the self-pipe (close-on-exec, non-blocking) used to wake poll()
/// from signal handlers. Must run before those handlers are installed.
fn openSignalPipe() !void {
    sig_pipe = try posix.pipe2(.{ .CLOEXEC = true, .NONBLOCK = true });
}
63
/// Consume all pending wake-up bytes so poll() can block again. The read end
/// is non-blocking, so the loop ends on error.WouldBlock (or any other read
/// error) as well as on EOF.
fn drainSignalPipe() void {
    var scratch: [16]u8 = undefined;
    while (posix.read(sig_pipe[0], &scratch)) |n| {
        if (n == 0) return;
    } else |_| {
        return;
    }
}
71
/// Entry point: parses the subcommand and dispatches. With no arguments at
/// all, prints the full session list; an unknown subcommand falls through
/// to help().
pub fn main() !void {
    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
    const alloc = std.heap.c_allocator;

    // Every subcommand may write to a Unix-domain socket; a peer that
    // disappears between probe and send would otherwise kill us before
    // write() can return BrokenPipe. Inherited across fork, so this also
    // covers the daemon.
    ignoreSigpipe();

    var args = try std.process.argsWithAllocator(alloc);
    defer args.deinit();
    _ = args.skip(); // skip program name

    var cfg = try Cfg.init(alloc);
    defer cfg.deinit(alloc);

    // CLI-side log file; daemons re-init with a per-session log later.
    const log_path = try std.fs.path.join(alloc, &.{ cfg.log_dir, "zmx.log" });
    defer alloc.free(log_path);
    try log_system.init(alloc, log_path, cfg.log_mode);
    defer log_system.deinit();

    // No subcommand: default to a full session listing.
    const cmd = args.next() orelse {
        return list(&cfg, false);
    };

    if (std.mem.eql(u8, cmd, "version") or std.mem.eql(u8, cmd, "v") or std.mem.eql(u8, cmd, "-v") or std.mem.eql(u8, cmd, "--version")) {
        return printVersion(&cfg);
    } else if (std.mem.eql(u8, cmd, "help") or std.mem.eql(u8, cmd, "h") or std.mem.eql(u8, cmd, "-h")) {
        return help();
    } else if (std.mem.eql(u8, cmd, "list") or std.mem.eql(u8, cmd, "l") or std.mem.eql(u8, cmd, "ls")) {
        const short = if (args.next()) |arg| std.mem.eql(u8, arg, "--short") else false;
        return list(&cfg, short);
    } else if (std.mem.eql(u8, cmd, "completions") or std.mem.eql(u8, cmd, "c")) {
        // Silently exits when the shell argument is missing or unrecognized.
        const arg = args.next() orelse return;
        const shell = completions.Shell.fromString(arg) orelse return;
        return printCompletions(shell);
    } else if (std.mem.eql(u8, cmd, "detach") or std.mem.eql(u8, cmd, "d")) {
        return detachAll(&cfg);
    } else if (std.mem.eql(u8, cmd, "history") or std.mem.eql(u8, cmd, "hi")) {
        // Flags may appear in any position; the first non-flag argument is
        // the session name, falling back to $ZMX_SESSION.
        var session_name: ?[]const u8 = null;
        var format: util.HistoryFormat = .plain;
        while (args.next()) |arg| {
            if (std.mem.eql(u8, arg, "--vt")) {
                format = .vt;
            } else if (std.mem.eql(u8, arg, "--html")) {
                format = .html;
            } else if (session_name == null) {
                session_name = arg;
            }
        }
        const sesh_env = socket.getSeshNameFromEnv();
        const sesh = try socket.getSeshName(alloc, session_name orelse sesh_env);
        defer alloc.free(sesh);
        return history(&cfg, sesh, format);
    } else if (std.mem.eql(u8, cmd, "attach") or std.mem.eql(u8, cmd, "a")) {
        const session_name = args.next() orelse "";

        // Remaining arguments form the command run inside a new session
        // (ignored when the session already exists -- see ensureSession).
        var command_args: std.ArrayList([]const u8) = .empty;
        defer command_args.deinit(alloc);
        while (args.next()) |arg| {
            try command_args.append(alloc, arg);
        }

        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
        var command: ?[][]const u8 = null;
        if (command_args.items.len > 0) {
            command = command_args.items;
        }

        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
        const cwd = std.posix.getcwd(&cwd_buf) catch "";

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        var daemon = Daemon{
            .running = true,
            .cfg = &cfg,
            .alloc = alloc,
            .clients = clients,
            .session_name = sesh,
            .socket_path = undefined,
            .pid = undefined,
            .command = command,
            .cwd = cwd,
            .created_at = @intCast(std.time.timestamp()),
            .leader_client_fd = null,
        };
        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        std.log.info("socket path={s}", .{daemon.socket_path});
        return attach(&daemon);
    } else if (std.mem.eql(u8, cmd, "run") or std.mem.eql(u8, cmd, "r")) {
        const session_name = args.next() orelse "";

        // Collect the task command; "-d" anywhere means run detached.
        var cmd_args_raw: std.ArrayList([]const u8) = .empty;
        defer cmd_args_raw.deinit(alloc);
        var detached = false;
        while (args.next()) |arg| {
            if (std.mem.startsWith(u8, arg, "-d")) {
                detached = true;
                continue;
            }
            try cmd_args_raw.append(alloc, arg);
        }
        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);

        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
        const cwd = std.posix.getcwd(&cwd_buf) catch "";

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        // .command stays null here; the task command is handed to run()
        // directly as cmd_args_raw below.
        var daemon = Daemon{
            .running = true,
            .cfg = &cfg,
            .alloc = alloc,
            .clients = clients,
            .session_name = sesh,
            .socket_path = undefined,
            .pid = undefined,
            .command = null,
            .cwd = cwd,
            .created_at = @intCast(std.time.timestamp()),
            .is_task_mode = true,
            .leader_client_fd = null,
        };
        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        std.log.info("socket path={s}", .{daemon.socket_path});
        return run(&daemon, detached, cmd_args_raw.items);
    } else if (std.mem.eql(u8, cmd, "send") or std.mem.eql(u8, cmd, "s")) {
        const session_name = args.next() orelse "";
        if (session_name.len == 0) return error.SessionNameRequired;

        var text_parts: std.ArrayList([]const u8) = .empty;
        defer text_parts.deinit(alloc);
        while (args.next()) |arg| {
            try text_parts.append(alloc, arg);
        }

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        // NOTE(review): socket_path is not freed before return; acceptable
        // for a short-lived CLI process, but worth confirming.
        const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        return send(&cfg, sesh, socket_path, text_parts.items, .Input);
    } else if (std.mem.eql(u8, cmd, "print") or std.mem.eql(u8, cmd, "p")) {
        // Same shape as "send", but delivers the text as .Output (display
        // only) instead of .Input (typed into the pty).
        const session_name = args.next() orelse "";
        if (session_name.len == 0) return error.SessionNameRequired;

        var text_parts: std.ArrayList([]const u8) = .empty;
        defer text_parts.deinit(alloc);
        while (args.next()) |arg| {
            try text_parts.append(alloc, arg);
        }

        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        return send(&cfg, sesh, socket_path, text_parts.items, .Output);
    } else if (std.mem.eql(u8, cmd, "kill") or std.mem.eql(u8, cmd, "k")) {
        var stderr_buffer: [1024]u8 = undefined;
        var stderr_writer = std.fs.File.stderr().writer(&stderr_buffer);
        const stderr = &stderr_writer.interface;

        var matchers: std.ArrayList(SessionMatch) = .empty;
        defer {
            for (matchers.items) |m| {
                alloc.free(m.name);
            }
            matchers.deinit(alloc);
        }
        var force = false;
        while (args.next()) |session_name| {
            if (std.mem.eql(u8, session_name, "--force")) {
                force = true;
                continue;
            }
            const m = try parseSessionArg(alloc, session_name);
            try matchers.append(alloc, m);
        }
        if (matchers.items.len == 0) {
            return error.SessionNameRequired;
        }
        var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
        defer {
            for (sessions.items) |session| {
                session.deinit(alloc);
            }
            sessions.deinit(alloc);
        }

        // Kill each live session matched by any pattern; a failed kill is
        // reported but does not abort the remaining kills.
        for (sessions.items) |session| {
            for (matchers.items) |m| {
                if (!m.matches(session.name)) {
                    continue;
                }

                kill(&cfg, session.name, force) catch |err| {
                    try stderr.print(
                        "failed to kill session={s}: {s}\n",
                        .{ session.name, @errorName(err) },
                    );
                    try stderr.flush();
                };
                break;
            }
        }
    } else if (std.mem.eql(u8, cmd, "wait") or std.mem.eql(u8, cmd, "w")) {
        var matchers: std.ArrayList(SessionMatch) = .empty;
        defer {
            for (matchers.items) |m| {
                alloc.free(m.name);
            }
            matchers.deinit(alloc);
        }
        while (args.next()) |session_name| {
            const m = try parseSessionArg(alloc, session_name);
            try matchers.append(alloc, m);
        }
        if (matchers.items.len == 0) {
            return error.SessionNameRequired;
        }
        return wait(&cfg, matchers);
    } else if (std.mem.eql(u8, cmd, "tail") or std.mem.eql(u8, cmd, "t")) {
        var matchers: std.ArrayList(SessionMatch) = .empty;
        defer {
            for (matchers.items) |m| {
                alloc.free(m.name);
            }
            matchers.deinit(alloc);
        }
        while (args.next()) |session_name| {
            const m = try parseSessionArg(alloc, session_name);
            try matchers.append(alloc, m);
        }
        if (matchers.items.len == 0) {
            return error.SessionNameRequired;
        }

        // Resolve matchers against session list to get actual session names.
        var resolved_names: std.ArrayList([]const u8) = .empty;
        defer {
            for (resolved_names.items) |name| {
                alloc.free(name);
            }
            resolved_names.deinit(alloc);
        }

        var any_prefix = false;
        for (matchers.items) |m| {
            if (m.is_prefix) {
                any_prefix = true;
                break;
            }
        }

        // Prefix patterns need the live session list to expand; exact names
        // are connected to directly without checking existence first.
        if (any_prefix) {
            var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
            defer {
                for (sessions.items) |session| {
                    session.deinit(alloc);
                }
                sessions.deinit(alloc);
            }
            for (sessions.items) |session| {
                for (matchers.items) |m| {
                    if (m.matches(session.name)) {
                        try resolved_names.append(alloc, try alloc.dupe(u8, session.name));
                        break;
                    }
                }
            }
        }
        // Add exact-match names directly.
        for (matchers.items) |m| {
            if (!m.is_prefix) {
                try resolved_names.append(alloc, try alloc.dupe(u8, m.name));
            }
        }

        var client_socket_fds = try std.ArrayList(i32).initCapacity(alloc, resolved_names.items.len);
        defer {
            for (client_socket_fds.items) |client_fd| {
                posix.close(client_fd);
            }
            client_socket_fds.deinit(alloc);
        }

        for (resolved_names.items) |session_name| {
            const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
                error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
                error.OutOfMemory => return err,
            };
            const client_sock = try socket.sessionConnect(socket_path);
            try client_socket_fds.append(alloc, client_sock);
        }
        _ = try tail(client_socket_fds, false, false);
    } else if (std.mem.eql(u8, cmd, "write") or std.mem.eql(u8, cmd, "wr")) {
        const session_name = args.next() orelse "";
        if (session_name.len == 0) return error.SessionNameRequired;
        const file_path = args.next() orelse "";
        if (file_path.len == 0) return error.FilePathRequired;

        var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
        const cwd = std.posix.getcwd(&cwd_buf) catch "";
        const clients = try std.ArrayList(*Client).initCapacity(alloc, 10);
        const sesh = try socket.getSeshName(alloc, session_name);
        defer alloc.free(sesh);
        var daemon = Daemon{
            .running = true,
            .cfg = &cfg,
            .alloc = alloc,
            .clients = clients,
            .session_name = sesh,
            .socket_path = undefined,
            .pid = undefined,
            .command = null,
            .cwd = cwd,
            .created_at = @intCast(std.time.timestamp()),
            .is_task_mode = true,
            .leader_client_fd = null,
        };
        daemon.socket_path = socket.getSocketPath(alloc, cfg.socket_dir, sesh) catch |err| switch (err) {
            error.NameTooLong => return socket.printSessionNameTooLong(sesh, cfg.socket_dir),
            error.OutOfMemory => return err,
        };
        std.log.info("socket path={s}", .{daemon.socket_path});
        try writeFile(&daemon, file_path);
    } else {
        return help();
    }
}
413
/// Client represents each terminal that has connected to a session.
///
/// Multiple Clients can connect to a single session.
const Client = struct {
    alloc: std.mem.Allocator,
    // Connected unix-socket fd for this client; closed in deinit().
    socket_fd: i32,
    // Set when write_buf holds data waiting to be flushed on POLLOUT.
    has_pending_output: bool = false,
    // Incoming IPC message framing buffer.
    read_buf: ipc.SocketBuffer,
    // Outgoing bytes not yet written to the socket.
    write_buf: std.ArrayList(u8),

    // Close the socket and free both buffers. The Client struct itself is
    // destroyed by the owner (see Daemon.closeClient / Daemon.shutdown).
    pub fn deinit(self: *Client) void {
        posix.close(self.socket_fd);
        self.read_buf.deinit();
        self.write_buf.deinit(self.alloc);
    }
};
430
/// Cfg is zmx's configuration container.
///
/// The purpose of this container is to hold anything that can be modified by the user.
const Cfg = struct {
    socket_dir: []const u8,
    log_dir: []const u8,
    max_scrollback: usize = 10_000_000,
    dir_mode: u32 = 0o750,
    log_mode: u32 = 0o640,

    /// Build a Cfg from the environment (ZMX_DIR, ZMX_DIR_MODE, ZMX_LOG_MODE,
    /// XDG_RUNTIME_DIR, TMPDIR) and create the socket/log directories.
    /// Caller owns the result and must call deinit() with the same allocator.
    pub fn init(alloc: std.mem.Allocator) !Cfg {
        const socket_dir = try socketDir(alloc);
        // Fix: without this errdefer, socket_dir leaked whenever the log_dir
        // allocation or cfg.mkdir() below failed (only log_dir was covered).
        errdefer alloc.free(socket_dir);
        const log_dir = try std.fmt.allocPrint(alloc, "{s}/logs", .{socket_dir});
        errdefer alloc.free(log_dir);

        // Invalid octal in the env vars silently falls back to the default.
        const dir_mode = if (std.posix.getenv("ZMX_DIR_MODE")) |m|
            std.fmt.parseInt(u32, m, 8) catch 0o750
        else
            0o750;

        const log_mode = if (std.posix.getenv("ZMX_LOG_MODE")) |m|
            std.fmt.parseInt(u32, m, 8) catch 0o640
        else
            0o640;

        var cfg = Cfg{
            .socket_dir = socket_dir,
            .log_dir = log_dir,
            .dir_mode = dir_mode,
            .log_mode = log_mode,
        };

        try cfg.mkdir();

        return cfg;
    }

    /// Resolve the base directory for sockets: ZMX_DIR wins, then
    /// $XDG_RUNTIME_DIR/zmx, then $TMPDIR/zmx-<uid>. Caller frees the result.
    fn socketDir(alloc: std.mem.Allocator) ![]const u8 {
        const tmpdir = std.mem.trimRight(u8, posix.getenv("TMPDIR") orelse "/tmp", "/");
        const uid = posix.getuid();

        // Return directly: the previous `errdefer alloc.free(...)` after this
        // expression was dead code (nothing fallible followed it).
        return if (posix.getenv("ZMX_DIR")) |zmxdir|
            try alloc.dupe(u8, zmxdir)
        else if (posix.getenv("XDG_RUNTIME_DIR")) |xdg_runtime|
            try std.fmt.allocPrint(alloc, "{s}/zmx", .{xdg_runtime})
        else
            try std.fmt.allocPrint(alloc, "{s}/zmx-{d}", .{ tmpdir, uid });
    }

    pub fn deinit(self: *Cfg, alloc: std.mem.Allocator) void {
        if (self.socket_dir.len > 0) alloc.free(self.socket_dir);
        if (self.log_dir.len > 0) alloc.free(self.log_dir);
    }

    /// Create socket and log directories with dir_mode; already-existing
    /// directories are fine, any other error propagates.
    pub fn mkdir(self: *Cfg) !void {
        posix.mkdirat(posix.AT.FDCWD, self.socket_dir, @intCast(self.dir_mode)) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };

        posix.mkdirat(posix.AT.FDCWD, self.log_dir, @intCast(self.dir_mode)) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };
    }
};
500
test "Cfg.init uses default modes when env vars are not set" {
    const alloc = std.testing.allocator;

    // Ensure they are not set
    _ = cross.c.unsetenv("ZMX_DIR_MODE");
    _ = cross.c.unsetenv("ZMX_LOG_MODE");

    var cfg = try Cfg.init(alloc);
    defer cfg.deinit(alloc);

    // Defaults documented on the Cfg struct fields.
    try std.testing.expectEqual(@as(u32, 0o750), cfg.dir_mode);
    try std.testing.expectEqual(@as(u32, 0o640), cfg.log_mode);
}
514
test "Cfg.init uses custom modes from env vars" {
    const alloc = std.testing.allocator;

    // Set custom octal values (parsed base-8 by Cfg.init).
    _ = cross.c.setenv("ZMX_DIR_MODE", "770", 1);
    _ = cross.c.setenv("ZMX_LOG_MODE", "660", 1);
    defer {
        _ = cross.c.unsetenv("ZMX_DIR_MODE");
        _ = cross.c.unsetenv("ZMX_LOG_MODE");
    }

    var cfg = try Cfg.init(alloc);
    defer cfg.deinit(alloc);

    try std.testing.expectEqual(@as(u32, 0o770), cfg.dir_mode);
    try std.testing.expectEqual(@as(u32, 0o660), cfg.log_mode);
}
532
/// Daemon is responsible for managing a zmx session.
///
/// It holds all the state for a running session. Instead of a single daemon for all sessions, we
/// create a daemon for every session. This has some benefits. The ipc communication between
/// session clients and the daemon doesn't need to be tagged with the session name. If a daemon
/// crashes for one session won't crash all the other sessions.
///
/// Conceptually it's also much simpler to reason about.
const Daemon = struct {
    cfg: *Cfg,
    alloc: std.mem.Allocator,
    // Connected clients; the daemon owns each *Client allocation.
    clients: std.ArrayList(*Client),
    // This control which client is the leader. The leader controls terminal state and
    // cols/rows of session.
    leader_client_fd: ?i32,
    session_name: []const u8,
    // Unix-domain socket path for this session; owned, freed in deinit().
    socket_path: []const u8,
    // Main-loop flag; cleared by shutdown().
    running: bool,
    // PID of the forkpty'd child (shell or command); set in spawnPty().
    pid: i32,
    // Command to exec in a newly-created session; null means login shell.
    command: ?[]const []const u8 = null,
    cwd: []const u8 = "",
    has_pty_output: bool = false,
    has_had_client: bool = false,
    has_terminal_client: bool = false, // true only after a real attach (.Init received)
    created_at: u64, // unix timestamp in seconds (set from std.time.timestamp() in main)
    is_task_mode: bool = false, // flag for when session is run as a task
    task_exit_code: ?u8 = null, // null = running or n/a, set when task completes
    task_ended_at: ?u64 = null, // timestamp when task exited
    is_fish: bool = false, // true if session shell is fish (affects exit code variable)
    pty_fd: i32 = -1, // set by daemonLoop so handleRun can probe the foreground process
    pty_write_buf: std.ArrayList(u8) = .empty,

    // Result of ensureSession(): whether a new session was created, and
    // whether this process is the forked daemon for it.
    const EnsureSessionResult = struct {
        created: bool,
        is_daemon: bool,
    };
569
    /// Free daemon-owned memory. Individual clients are torn down in
    /// shutdown()/closeClient(); this only releases the containers and path.
    pub fn deinit(self: *Daemon) void {
        self.clients.deinit(self.alloc);
        self.pty_write_buf.deinit(self.alloc);
        self.alloc.free(self.socket_path);
    }
575
576 pub fn shutdown(self: *Daemon) void {
577 std.log.info("shutting down daemon session={s}", .{self.session_name});
578 self.running = false;
579
580 for (self.clients.items) |client| {
581 client.deinit();
582 self.alloc.destroy(client);
583 }
584 self.clients.clearRetainingCapacity();
585 }
586
    /// Disconnect and destroy the client at index i in self.clients.
    /// Returns true when this was the last client and shutdown_on_last
    /// caused the daemon to shut down.
    pub fn closeClient(self: *Daemon, client: *Client, i: usize, shutdown_on_last: bool) bool {
        const fd = client.socket_fd;
        // leader is disconnected, remove ref and let another client claim leader on input
        if (self.leader_client_fd == client.socket_fd) {
            std.log.info(
                "unsetting leader session={s} fd={d}",
                .{ self.session_name, client.socket_fd },
            );
            self.leader_client_fd = null;
        }
        client.deinit();
        self.alloc.destroy(client);
        _ = self.clients.orderedRemove(i);
        std.log.info("client disconnected fd={d} remaining={d}", .{ fd, self.clients.items.len });
        if (shutdown_on_last and self.clients.items.len == 0) {
            self.shutdown();
            return true;
        }
        return false;
    }
607
    /// Promote client to session leader. The leader alone drives the pty's
    /// window size and terminal state.
    fn setLeader(self: *Daemon, client: *Client) !void {
        std.log.info("setting new leader client_fd={d}", .{client.socket_fd});
        self.leader_client_fd = client.socket_fd;
        // Send a resize message to the client so it can send us back their window size
        // so we can resize the pty and ghostty state.
        try ipc.appendMessage(self.alloc, &client.write_buf, .Resize, "");
        client.has_pending_output = true;
    }
616
    /// Runs in the forked child. Either execs or returns an error (caller
    /// must exit on error -- returning would fall through to parent code).
    fn execChild(self: *Daemon) !noreturn {
        // Allocations here are never freed: the process image is replaced
        // by exec (or the process exits) immediately after.
        const alloc = std.heap.c_allocator;

        // main() set SIGPIPE to SIG_IGN, which (unlike handlers) survives
        // exec. Restore the default so the shell and its children behave
        // normally (e.g. `yes | head` should exit 141 via SIGPIPE).
        const dfl: posix.Sigaction = .{
            .handler = .{ .handler = posix.SIG.DFL },
            .mask = posix.sigemptyset(),
            .flags = 0,
        };
        posix.sigaction(posix.SIG.PIPE, &dfl, null);

        // Expose the session name to the child via $ZMX_SESSION.
        const session_env = try std.fmt.allocPrintSentinel(
            alloc,
            "ZMX_SESSION={s}",
            .{self.session_name},
            0,
        );
        _ = cross.c.putenv(session_env.ptr);

        // Explicit command: exec it directly (PATH lookup via execvpe).
        if (self.command) |cmd_args| {
            const argv = try alloc.allocSentinel(?[*:0]const u8, cmd_args.len, null);
            for (cmd_args, 0..) |arg, i| {
                argv[i] = try alloc.dupeZ(u8, arg);
            }
            const err = std.posix.execvpeZ(argv[0].?, argv.ptr, std.c.environ);
            std.log.err("execvpe failed: cmd={s} err={s}", .{ cmd_args[0], @errorName(err) });
            std.posix.exit(1);
        }

        const shell = util.detectShell();
        // Use "-shellname" as argv[0] to signal login shell (traditional method)
        const login_shell = try std.fmt.allocPrintSentinel(
            alloc,
            "-{s}",
            .{std.fs.path.basename(shell)},
            0,
        );
        const argv = [_:null]?[*:0]const u8{ login_shell, null };
        const err = std.posix.execveZ(shell, &argv, std.c.environ);
        std.log.err("execve failed: err={s}", .{@errorName(err)});
        std.posix.exit(1);
    }
663
    /// spawnPty runs forkpty() and executes the shell or shell command the user provides.
    /// Returns the non-blocking pty master fd; sets self.pid to the child.
    fn spawnPty(self: *Daemon) !c_int {
        // Seed the pty size from the daemon's own stdout; a leader's .Init
        // resize follows once a client attaches (see handleInit).
        const size = ipc.getTerminalSize(posix.STDOUT_FILENO);
        var ws: cross.c.struct_winsize = .{
            .ws_row = size.rows,
            .ws_col = size.cols,
            .ws_xpixel = 0,
            .ws_ypixel = 0,
        };

        var master_fd: c_int = undefined;
        const pid = cross.forkpty(&master_fd, null, null, &ws);
        if (pid < 0) {
            return error.ForkPtyFailed;
        }

        if (pid == 0) { // child pid code path
            // In the forked child, ANY error must exit rather than propagate:
            // a returned error falls through to the parent code path below,
            // running a second daemon on the same socket (or worse, hitting
            // errdefers that delete the parent's socket file).
            execChild(self) catch |err| {
                std.log.err("child setup failed: {s}", .{@errorName(err)});
                std.posix.exit(1);
            };
            unreachable; // execChild either execs or exits, never returns ok
        }
        // master pid code path
        self.pid = pid;
        std.log.info("pty spawned session={s} pid={d}", .{ self.session_name, pid });

        // make pty non-blocking
        const flags = try posix.fcntl(master_fd, posix.F.GETFL, 0);
        _ = try posix.fcntl(master_fd, posix.F.SETFL, flags | O_NONBLOCK);
        return master_fd;
    }
700
    /// ensureSession "upserts" a session by checking if the unix socket exists already.
    /// If not it creates one and spawns the daemon.
    ///
    /// In the created case this forks: the child becomes the session daemon
    /// (runs daemonLoop and only returns when the session ends); the parent
    /// returns immediately so the caller can attach as a client.
    fn ensureSession(self: *Daemon) !EnsureSessionResult {
        var dir = try std.fs.openDirAbsolute(self.cfg.socket_dir, .{});
        defer dir.close();

        const exists = try socket.sessionExists(dir, self.session_name);
        var should_create = !exists;

        // The socket file existing is not proof the daemon is alive; probe
        // with a connect to distinguish live from stale.
        if (exists) {
            if (ipc.connectSession(self.socket_path)) |fd| {
                posix.close(fd);
                if (self.command != null) {
                    std.log.warn(
                        "session already exists, ignoring command session={s}",
                        .{self.session_name},
                    );
                }
            } else |err| switch (err) {
                // Daemon is definitively gone: safe to replace.
                error.ConnectionRefused => {
                    socket.cleanupStaleSocket(dir, self.session_name);
                    should_create = true;
                },
                // Connect failed for an unusual reason. The check is only to
                // decide create-vs-attach; the socket file exists, so proceed
                // to attach rather than fail or orphan.
                else => {
                    std.log.warn(
                        "connect failed ({s}), proceeding to attach session={s}",
                        .{ @errorName(err), self.session_name },
                    );
                },
            }
        }

        if (should_create) {
            std.log.info("creating session={s}", .{self.session_name});
            const server_sock_fd = try socket.createSocket(self.socket_path);

            // creates the daemon
            const pid = try posix.fork();
            if (pid == 0) { // child (daemon)
                // becomes the session leader and detaches process from its controlling terminal
                _ = try posix.setsid();

                log_system.deinit();

                // Redirect stdin/stdout/stderr to /dev/null. The daemon
                // communicates via its unix socket, not stdio. Without
                // this, any pipe on FDs 0-2 (e.g. from bats' `run`
                // keyword) stays open for the daemon's lifetime, causing
                // the caller to hang waiting for EOF.
                {
                    const devnull = std.posix.open(
                        "/dev/null",
                        .{ .ACCMODE = .RDWR },
                        0,
                    ) catch |err| {
                        std.log.warn("failed to open /dev/null: {s}", .{@errorName(err)});
                        return err;
                    };
                    inline for (.{ posix.STDIN_FILENO, posix.STDOUT_FILENO, posix.STDERR_FILENO }) |fd| {
                        _ = posix.dup2(devnull, fd) catch |err| {
                            std.log.warn("dup2 /dev/null -> {d}: {s}", .{ fd, @errorName(err) });
                            return err;
                        };
                    }
                    if (devnull > 2) posix.close(devnull);
                }

                // Close file descriptors inherited from the parent that the
                // daemon doesn't need. This prevents test harnesses (like
                // bats) from hanging -- they wait for their internal FDs (3+)
                // to close before exiting.
                //
                // Must run BEFORE log_system.init() otherwise the new log
                // FD gets closed, and spawnPty() reuses that FD number for
                // the PTY master, causing log writes to leak into the terminal.
                //
                // Skip server_sock_fd (needed for IPC) and dir.fd (needed to
                // delete the socket file on shutdown).
                {
                    const dir_fd = @as(i32, @intCast(dir.fd));
                    var fd: i32 = 3;
                    while (fd < 64) : (fd += 1) {
                        if (fd == server_sock_fd or fd == dir_fd) continue;
                        _ = std.c.close(fd);
                    }
                }

                // Per-session log file: <log_dir>/<session>.log.
                const session_log_name = try std.fmt.allocPrint(
                    self.alloc,
                    "{s}.log",
                    .{self.session_name},
                );
                defer self.alloc.free(session_log_name);
                const session_log_path = try std.fs.path.join(
                    self.alloc,
                    &.{ self.cfg.log_dir, session_log_name },
                );
                defer self.alloc.free(session_log_path);
                try log_system.init(self.alloc, session_log_path, self.cfg.log_mode);

                // If spawnPty fails, clean up here. Once it succeeds,
                // the inner block's defer takes ownership of cleanup to
                // avoid double-closing server_sock_fd on daemonLoop error.
                const pty_fd = self.spawnPty() catch |err| {
                    posix.close(server_sock_fd);
                    dir.deleteFile(self.session_name) catch {};
                    return err;
                };

                defer {
                    self.handleKill();
                    self.deinit();
                    posix.close(pty_fd);
                    _ = posix.waitpid(self.pid, 0);
                    posix.close(server_sock_fd);
                    std.log.info("deleting socket file session={s}", .{self.session_name});
                    dir.deleteFile(self.session_name) catch |err| {
                        std.log.warn("failed to delete socket file err={s}", .{@errorName(err)});
                    };
                }

                try daemonLoop(self, server_sock_fd, pty_fd);
                return .{ .created = true, .is_daemon = true };
            }
            // Parent: the daemon child owns the listening socket now. Brief
            // sleep gives the daemon a chance to start accepting connections.
            posix.close(server_sock_fd);
            std.Thread.sleep(10 * std.time.ns_per_ms);
            return .{ .created = true, .is_daemon = false };
        }

        return .{ .created = false, .is_daemon = false };
    }
836
    // Upper bound on buffered, unflushed pty input (bytes).
    const PTY_WRITE_BUF_MAX = 256 * 1024;

    /// Queue bytes for the PTY's stdin. Flushed by daemonLoop on POLLOUT.
    /// Drops the payload if the buffer is over cap -- same failure mode as
    /// the old direct-write ptyWrite (drop on EAGAIN), just at a 64x higher
    /// threshold. Capping avoids OOM when the shell stops reading; dropping
    /// new (not old) bytes avoids tearing a partially-accepted sequence.
    fn queuePtyInput(self: *Daemon, data: []const u8) void {
        if (data.len == 0) return;
        if (self.pty_write_buf.items.len + data.len > PTY_WRITE_BUF_MAX) {
            std.log.warn(
                "pty input dropped {d} bytes (buffer full, shell not reading)",
                .{data.len},
            );
            return;
        }
        std.log.debug("buffering pty input data={x}", .{data});
        // Allocation failure is treated like a full buffer: warn and drop.
        self.pty_write_buf.appendSlice(self.alloc, data) catch |err| {
            std.log.warn(
                "pty input dropped {d} bytes: {s}",
                .{ data.len, @errorName(err) },
            );
        };
    }
861
862 pub fn handleInput(self: *Daemon, client: *Client, payload: []const u8) !void {
863 std.log.debug("buffering pty input data={x}", .{payload});
864 // client is leader, send entire payload (ansi escape codes + text)
865 if (self.leader_client_fd == client.socket_fd) {
866 self.queuePtyInput(payload);
867 return;
868 }
869
870 // check if leader needs to be updated by detecting any user input
871 if (util.isUserInput(payload)) {
872 try self.setLeader(client);
873 self.queuePtyInput(payload);
874 }
875 }
876
877 pub fn handleSwitch(self: *Daemon, session_name: []const u8) !void {
878 for (self.clients.items) |client| {
879 if (self.leader_client_fd == client.socket_fd) {
880 ipc.appendMessage(
881 self.alloc,
882 &client.write_buf,
883 .Switch,
884 session_name,
885 ) catch |err| {
886 std.log.warn(
887 "failed to buffer terminal state for client err={s}",
888 .{@errorName(err)},
889 );
890 };
891 client.has_pending_output = true;
892 return;
893 }
894 }
895 return error.NoLeaderFound;
896 }
897
    /// Handle a client's .Init message (payload is an ipc.Resize struct):
    /// optionally replay serialized terminal state to the new client, claim
    /// leadership if vacant, and (leader only) resize the pty and the
    /// daemon's internal ghostty terminal. Silently ignores malformed payloads.
    pub fn handleInit(
        self: *Daemon,
        client: *Client,
        pty_fd: i32,
        term: *ghostty_vt.Terminal,
        payload: []const u8,
    ) !void {
        if (payload.len != @sizeOf(ipc.Resize)) return;

        // Serialize terminal state BEFORE resize to capture correct cursor position.
        // Resizing triggers reflow which can move the cursor, and the shell's
        // SIGWINCH-triggered redraw will run after our snapshot is sent.
        // Only serialize on re-attach (has_had_client), not first attach, to avoid
        // interfering with shell initialization (DA1 queries, etc.)
        if (self.has_pty_output and self.has_had_client) {
            const cursor = &term.screens.active.cursor;
            std.log.debug(
                "cursor before serialize: x={d} y={d} pending_wrap={}",
                .{ cursor.x, cursor.y, cursor.pending_wrap },
            );
            if (util.serializeTerminalState(self.alloc, term)) |term_output| {
                std.log.debug("serialize terminal state", .{});
                // Rewrite OSC 133;A to include redraw=0 so the outer terminal
                // does not clear prompt lines on resize (issue #111).
                const restore_data = util.rewritePromptRedraw(self.alloc, term_output) orelse term_output;
                defer self.alloc.free(term_output);
                defer if (restore_data.ptr != term_output.ptr) self.alloc.free(restore_data);
                ipc.appendMessage(self.alloc, &client.write_buf, .Output, restore_data) catch |err| {
                    std.log.warn(
                        "failed to buffer terminal state for client err={s}",
                        .{@errorName(err)},
                    );
                };
                client.has_pending_output = true;
            }
        }

        // no leader is set so set one
        if (self.leader_client_fd == null) {
            try self.setLeader(client);
        }

        // only resize if leader
        if (self.leader_client_fd == client.socket_fd) {
            const resize = std.mem.bytesToValue(ipc.Resize, payload);
            var ws: cross.c.struct_winsize = .{
                .ws_row = resize.rows,
                .ws_col = resize.cols,
                .ws_xpixel = 0,
                .ws_ypixel = 0,
            };
            _ = cross.c.ioctl(pty_fd, cross.c.TIOCSWINSZ, &ws);
            // Disable prompt_redraw before resize. The daemon's internal terminal
            // would otherwise clear prompt lines expecting the shell to redraw them,
            // but the shell's redraw goes to the PTY (forwarded to clients), not to
            // this daemon terminal. The clearing corrupts the daemon's snapshot state.
            const saved_prompt_redraw = term.flags.shell_redraws_prompt;
            term.flags.shell_redraws_prompt = .false;
            defer term.flags.shell_redraws_prompt = saved_prompt_redraw;
            try term.resize(self.alloc, resize.cols, resize.rows);

            // Mark that we've had a client init, so subsequent clients get terminal state
            self.has_had_client = true;
            self.has_terminal_client = true;

            std.log.debug("init resize rows={d} cols={d}", .{ resize.rows, resize.cols });
        }
    }
966
967 pub fn handleResize(
968 self: *Daemon,
969 client: *Client,
970 pty_fd: i32,
971 term: *ghostty_vt.Terminal,
972 payload: []const u8,
973 ) !void {
974 if (payload.len != @sizeOf(ipc.Resize)) return;
975 if (self.leader_client_fd == null) {
976 try self.setLeader(client);
977 }
978 // only leader can resize
979 if (self.leader_client_fd != client.socket_fd) return;
980
981 const resize = std.mem.bytesToValue(ipc.Resize, payload);
982 var ws: cross.c.struct_winsize = .{
983 .ws_row = resize.rows,
984 .ws_col = resize.cols,
985 .ws_xpixel = 0,
986 .ws_ypixel = 0,
987 };
988 _ = cross.c.ioctl(pty_fd, cross.c.TIOCSWINSZ, &ws);
989 // Disable prompt_redraw before resize (same rationale as handleInit).
990 const saved_prompt_redraw = term.flags.shell_redraws_prompt;
991 term.flags.shell_redraws_prompt = .false;
992 defer term.flags.shell_redraws_prompt = saved_prompt_redraw;
993 try term.resize(self.alloc, resize.cols, resize.rows);
994 std.log.debug("resize rows={d} cols={d}", .{ resize.rows, resize.cols });
995 }
996
997 pub fn handleDetach(self: *Daemon, client: *Client, i: usize) void {
998 std.log.info("client detach session={s} fd={d}", .{ self.session_name, client.socket_fd });
999 _ = self.closeClient(client, i, false);
1000 }
1001
1002 pub fn handleDetachAll(self: *Daemon) void {
1003 std.log.info("detach all clients={d}", .{self.clients.items.len});
1004 for (self.clients.items) |client_to_close| {
1005 client_to_close.deinit();
1006 self.alloc.destroy(client_to_close);
1007 }
1008 self.clients.clearRetainingCapacity();
1009 }
1010
1011 pub fn handleKill(self: *Daemon) void {
1012 std.log.info("kill received session={s}", .{self.session_name});
1013 self.shutdown();
1014 // gracefully shutdown shell processes, shells tend to ignore SIGTERM so we send SIGHUP
1015 // instead
1016 // https://www.gnu.org/software/bash/manual/html_node/Signals.html
1017 // negative pid means kill process and children
1018 std.log.info("sending SIGHUP session={s} pid={d}", .{ self.session_name, self.pid });
1019 posix.kill(-self.pid, posix.SIG.HUP) catch |err| {
1020 std.log.warn("failed to send SIGHUP to pty child err={s}", .{@errorName(err)});
1021 };
1022 std.Thread.sleep(500 * std.time.ns_per_ms);
1023 posix.kill(-self.pid, posix.SIG.KILL) catch |err| {
1024 std.log.warn("failed to send SIGKILL to pty child err={s}", .{@errorName(err)});
1025 };
1026 }
1027
    /// Reply to an Info request with a snapshot of session metadata:
    /// client count, pid, timestamps, task status, command line, and cwd,
    /// packed into the fixed-size ipc.Info wire struct.
    pub fn handleInfo(self: *Daemon, client: *Client) !void {
        // zeroes() so asBytes() doesn't ship struct padding + unused cmd/cwd
        // tail bytes (daemon stack contents) to clients.
        var info = std.mem.zeroes(ipc.Info);
        // len - 1: presumably excludes the requesting client from the count
        // -- TODO confirm against how callers display this.
        info.clients_len = self.clients.items.len - 1;
        info.pid = self.pid;
        info.created_at = self.created_at;
        info.task_ended_at = self.task_ended_at orelse 0;
        info.task_exit_code = self.task_exit_code orelse 0;

        // Build command string from args, re-quoting args that contain
        // shell-special characters so the displayed command is copy-pasteable.
        const cur_cmd = self.command;
        if (cur_cmd) |args| {
            for (args, 0..) |arg, i| {
                // Quote only when needed; quoting failure falls back to the raw arg.
                const quoted = if (util.shellNeedsQuoting(arg))
                    util.shellQuote(self.alloc, arg) catch null
                else
                    null;
                defer if (quoted) |q| self.alloc.free(q);
                const src = quoted orelse arg;

                // +1 accounts for the space separator before every arg but the first.
                const need = src.len + @as(usize, if (i > 0) 1 else 0);
                if (info.cmd_len + need > ipc.MAX_CMD_LEN) {
                    // Buffer full: truncate with an ellipsis if it still fits.
                    const ellipsis = "...";
                    if (info.cmd_len + ellipsis.len <= ipc.MAX_CMD_LEN) {
                        @memcpy(info.cmd[info.cmd_len..][0..ellipsis.len], ellipsis);
                        info.cmd_len += ellipsis.len;
                    }
                    break;
                }

                if (i > 0) {
                    info.cmd[info.cmd_len] = ' ';
                    info.cmd_len += 1;
                }
                @memcpy(info.cmd[info.cmd_len..][0..src.len], src);
                info.cmd_len += @intCast(src.len);
            }
        }

        // cwd is clamped to the fixed wire-format buffer size.
        info.cwd_len = @intCast(@min(self.cwd.len, ipc.MAX_CWD_LEN));
        @memcpy(info.cwd[0..info.cwd_len], self.cwd[0..info.cwd_len]);

        try ipc.appendMessage(self.alloc, &client.write_buf, .Info, std.mem.asBytes(&info));
        client.has_pending_output = true;
    }
1075
1076 pub fn handleHistory(
1077 self: *Daemon,
1078 client: *Client,
1079 term: *ghostty_vt.Terminal,
1080 payload: []const u8,
1081 ) !void {
1082 const format: util.HistoryFormat = if (payload.len > 0)
1083 std.meta.intToEnum(util.HistoryFormat, payload[0]) catch .plain
1084 else
1085 .plain;
1086 if (util.serializeTerminal(self.alloc, term, format)) |output| {
1087 defer self.alloc.free(output);
1088 try ipc.appendMessage(self.alloc, &client.write_buf, .History, output);
1089 client.has_pending_output = true;
1090 } else {
1091 try ipc.appendMessage(self.alloc, &client.write_buf, .History, "");
1092 client.has_pending_output = true;
1093 }
1094 }
1095
1096 pub fn handleRun(self: *Daemon, client: *Client, payload: []const u8) !void {
1097 // Reset task tracking so the new command's exit marker is detected.
1098 // Without this, a second `zmx run` on the same session is ignored
1099 // because task_exit_code is still set from the first run.
1100 self.task_exit_code = null;
1101 self.task_ended_at = null;
1102 self.is_task_mode = true;
1103
1104 if (payload.len == 0) return;
1105
1106 // Auto-detect the foreground process on the PTY to determine shell type.
1107 if (self.pty_fd >= 0) {
1108 var name_buf: [64]u8 = undefined;
1109 if (cross.getForegroundProcessName(self.pty_fd, &name_buf)) |name| {
1110 self.is_fish = std.mem.eql(u8, name, "fish");
1111 std.log.debug("foreground process={s} is_fish={}", .{ name, self.is_fish });
1112 }
1113 }
1114 const cmd = payload;
1115
1116 // Daemon appends the task marker so the client never injects
1117 // shell-specific syntax, keeping Ctrl-C recovery clean.
1118 const marker = if (self.is_fish)
1119 "; echo ZMX_TASK_COMPLETED:$status"
1120 else
1121 "; echo ZMX_TASK_COMPLETED:$?";
1122
1123 if (cmd.len > 0 and cmd[cmd.len - 1] == '\r') {
1124 self.queuePtyInput(cmd[0 .. cmd.len - 1]);
1125 } else {
1126 self.queuePtyInput(cmd);
1127 }
1128 self.queuePtyInput(marker);
1129 self.queuePtyInput("\r");
1130
1131 try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
1132 client.has_pending_output = true;
1133 self.has_had_client = true;
1134 std.log.debug("run command len={d}", .{payload.len});
1135 }
1136
1137 pub fn handleOutput(self: *Daemon, payload: []const u8, vt_stream: anytype) !void {
1138 vt_stream.nextSlice(payload);
1139 self.has_pty_output = true;
1140 for (self.clients.items) |client| {
1141 try ipc.appendMessage(self.alloc, &client.write_buf, .Output, payload);
1142 client.has_pending_output = true;
1143 }
1144 if (self.clients.items.len > 0) {
1145 posix.kill(self.pid, posix.SIG.WINCH) catch |err| {
1146 std.log.warn("failed to send SIGWINCH err={s}", .{@errorName(err)});
1147 };
1148 }
1149 }
1150
    /// Handle a Write request. Payload wire format:
    /// [u32 path len][path bytes][file content]. The file is created by
    /// injecting base64 chunks through the PTY (printf | base64 -d), so it
    /// also works over SSH. Acks the client once all chunks are queued.
    /// Returns error.InvalidPayload on a truncated payload.
    pub fn handleWrite(self: *Daemon, client: *Client, payload: []const u8) !void {
        // Wire format: [u32 path len][path bytes][file content]
        if (payload.len < @sizeOf(u32)) return error.InvalidPayload;
        const path_len = std.mem.bytesToValue(u32, payload[0..@sizeOf(u32)]);
        if (payload.len < @sizeOf(u32) + path_len) return error.InvalidPayload;
        const file_path = payload[@sizeOf(u32)..][0..path_len];
        const file_content = payload[@sizeOf(u32) + path_len ..];

        // Inject file creation through the PTY so it works over SSH.
        // Base64-encode content and pipe through printf | base64 -d > file.
        // Chunk large files to stay under command-line length limits.
        // 48000 is divisible by 3 (clean base64 boundaries) and encodes
        // to ~64KB, well under typical ARG_MAX.
        const chunk_size = 48000;
        var offset: usize = 0;
        var is_first = true;

        // `or is_first` guarantees one iteration even for empty content, so
        // an empty stdin still creates/truncates the target file.
        while (offset < file_content.len or is_first) {
            const end = @min(offset + chunk_size, file_content.len);
            const chunk = file_content[offset..end];

            const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
            const encoded = try self.alloc.alloc(u8, encoded_len);
            defer self.alloc.free(encoded);
            _ = std.base64.standard.Encoder.encode(encoded, chunk);

            // First chunk truncates (>), subsequent chunks append (>>).
            self.queuePtyInput("printf '%s' '");
            self.queuePtyInput(encoded);
            if (is_first) {
                self.queuePtyInput("' | base64 -d > '");
            } else {
                self.queuePtyInput("' | base64 -d >> '");
            }
            // NOTE(review): file_path is spliced between single quotes, so a
            // path containing ' breaks the command (documented limitation in
            // the CLI help) -- confirm clients validate this.
            self.queuePtyInput(file_path);
            self.queuePtyInput("'");
            self.queuePtyInput("\r");

            offset = end;
            is_first = false;
        }

        try ipc.appendMessage(self.alloc, &client.write_buf, .Ack, "");
        client.has_pending_output = true;
        self.has_had_client = true;
        std.log.debug(
            "write command len={d} file_path={s}",
            .{ file_content.len, file_path },
        );
    }
1200};
1201
/// Print zmx and ghostty-vt versions plus the configured socket/log
/// directories to stdout. Debug builds report the git sha in place of the
/// release version string.
fn printVersion(cfg: *Cfg) !void {
    var out_buf: [256]u8 = undefined;
    var writer = std.fs.File.stdout().writer(&out_buf);
    // builtin.mode is comptime-known, so this selects at compile time.
    const ver = if (builtin.mode == .Debug) git_sha else version;
    try writer.interface.print(
        "zmx\t\t{s}\nghostty_vt\t{s}\nsocket_dir\t{s}\nlog_dir\t\t{s}\n",
        .{ ver, ghostty_version, cfg.socket_dir, cfg.log_dir },
    );
    try writer.interface.flush();
}
1215
/// Emit the completion script for the requested shell to stdout.
fn printCompletions(shell: completions.Shell) !void {
    var out_buf: [8192]u8 = undefined;
    var writer = std.fs.File.stdout().writer(&out_buf);
    const script = shell.getCompletionScript();
    try writer.interface.print("{s}\n", .{script});
    try writer.interface.flush();
}
1223
/// Print the full CLI help text (commands, per-command usage notes, and
/// environment variables) to stdout via a fixed-buffer writer.
fn help() !void {
    const help_text =
        \\zmx - session persistence for terminal processes
        \\
        \\Usage: zmx <command> [args...]
        \\
        \\Commands:
        \\  [a]ttach <name> [command...]    Attach to session, creating if needed
        \\  [r]un <name> [-d] [command...]  Send command without attaching
        \\  [s]end <name> <text...>         Send raw input to session PTY
        \\  [p]rint <name> <text...>        Inject text into session display
        \\  [wr]ite <name> <file_path>      Write stdin to file_path through the session
        \\  [d]etach                        Detach all clients (ctrl+\\ for current client)
        \\  [l]ist|ls [--short]             List active sessions
        \\  [k]ill <name>... [--force]      Kill session and all attached clients
        \\  [hi]story <name> [--vt|--html]  Output session scrollback
        \\  [w]ait <name>...                Wait for session tasks to complete
        \\  [t]ail <name>...                Follow session output
        \\  [c]ompletions <shell>           Shell completions (bash, zsh, fish)
        \\  [v]ersion                       Show version
        \\  [h]elp                          Show this help
        \\
        \\Attach:
        \\  This will spawn a login $SHELL with a PTY.  You can provide a
        \\  command instead of creating a shell.
        \\
        \\  Examples:
        \\    zmx attach dev
        \\    zmx attach dev vim
        \\
        \\History:
        \\  This should generally be used with `tail` to print the last lines
        \\  of the session's scrollback history.
        \\
        \\  Examples:
        \\    zmx history <session> | tail -100
        \\
        \\Run:
        \\  Commands are passed as-is: do not wrap in quotes.
        \\  Commands run sequentially: do not send multiple in parallel.
        \\  Avoid interactive programs (pagers, editors, prompts): they hang.
        \\
        \\  If the command hangs, send Ctrl+C to recover:
        \\    zmx run <session> $(printf '\x03')
        \\
        \\  If the command hangs, print the history to see the error:
        \\    zmx history <session> | tail -100
        \\
        \\  `-d` will detach from the calling terminal.  Use `wait` to track
        \\  its status.
        \\
        \\  Examples:
        \\    zmx run dev ls
        \\    zmx run dev zig build
        \\    zmx run dev grep -r TODO src
        \\    zmx run dev git -c core.pager=cat diff
        \\
        \\Send:
        \\  Sends raw text to the session's PTY input (fire-and-forget).
        \\  Unlike `run`, no completion marker is appended and no exit code
        \\  is tracked.  Useful for TUI applications, interactive prompts,
        \\  or any program that reads stdin directly.
        \\
        \\  Text is sent byte-for-byte with no automatic carriage return.
        \\  Append \r yourself when you want the shell to execute a command.
        \\
        \\  Text can also be piped via stdin:
        \\    printf 'ls -la\r' | zmx send dev
        \\
        \\  Examples:
        \\    printf 'echo hello\r' | zmx send dev
        \\    zmx send dev $(printf '\x03')
        \\    zmx send dev /compact
        \\
        \\Print:
        \\  Injects text directly into the session display and scrollback.
        \\  Never touches the PTY input -- the shell sees nothing.
        \\  Caller is responsible for newlines (\\r\\n).
        \\
        \\  Examples:
        \\    printf '\\r\\nhello\\r\\n' | zmx print dev
        \\    zmx print dev "$(printf '\\r\\nalert\\r\\n')"
        \\
        \\Write:
        \\  Writes stdin to file_path inside the session.  Works over SSH.
        \\  file_path can be absolute or relative to the session shell's cwd.
        \\  Requires base64 and printf in the remote environment.
        \\  Large files are chunked automatically (~48KB per chunk).
        \\  File path must not contain single quotes.
        \\
        \\  Examples:
        \\    echo "hello" | zmx write dev /tmp/hello.txt
        \\    cat main.zig | zmx write dev src/main.zig
        \\
        \\Wait:
        \\  Used with a detached run task to track its status.  Multiple
        \\  sessions can be provided.
        \\
        \\  Examples:
        \\    zmx run -d dev sleep 10
        \\    zmx wait dev
        \\    zmx wait dev other
        \\
        \\Environment variables:
        \\  SHELL               Default shell for new sessions
        \\  ZMX_DIR             Socket directory (priority 1)
        \\  XDG_RUNTIME_DIR     Socket directory (priority 2)
        \\  TMPDIR              Socket directory (priority 3)
        \\  ZMX_SESSION         Session name (injected automatically)
        \\  ZMX_SESSION_PREFIX  Prefix added to all session names
        \\  ZMX_DIR_MODE        Sets mode for socket and log directories (octal, defaults to 0750)
        \\  ZMX_LOG_MODE        Sets mode for log files (octal, defaults to 0640)
        \\
    ;
    var buf: [8192]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);
    try w.interface.print(help_text, .{});
    try w.interface.flush();
}
1343
/// Stream Output messages from one or more session daemons to stdout.
/// Returns the task's exit code when a TaskComplete message arrives, 0 when
/// a daemon closes the connection or hangs up, 1 on connection reset.
/// `detached`: print an ack after the daemon Acks and exit immediately.
/// `is_run_cmd`: strip the first output line (the echo of the command).
fn tail(client_socket_fds: std.ArrayList(i32), detached: bool, is_run_cmd: bool) !u8 {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
    defer poll_fds.deinit(alloc);

    var read_buf = try ipc.SocketBuffer.init(alloc);
    defer read_buf.deinit();

    // Output is staged here and only written when stdout polls writable.
    var stdout_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer stdout_buf.deinit(alloc);

    var is_first_line = true;
    var task_complete_code: ?u8 = null;

    while (true) {
        // Rebuild the poll set every iteration: all daemon sockets for read,
        // plus stdout for write only when there is pending data.
        poll_fds.clearRetainingCapacity();

        // Poll socket for read
        for (client_socket_fds.items) |client_sock_fd| {
            try poll_fds.append(alloc, .{
                .fd = client_sock_fd,
                .events = posix.POLL.IN,
                .revents = 0,
            });
        }

        // Poll for write if we have pending data
        if (stdout_buf.items.len > 0) {
            try poll_fds.append(alloc, .{
                .fd = posix.STDOUT_FILENO,
                .events = posix.POLL.OUT,
                .revents = 0,
            });
        }

        _ = posix.poll(poll_fds.items, -1) catch |err| {
            if (err == error.Interrupted) continue; // EINTR from signal, loop again
            return err;
        };

        // Handle socket read (incoming Output messages from daemon)
        for (poll_fds.items) |*poll_fd| {
            if (poll_fd.revents & posix.POLL.IN != 0) {
                const n = read_buf.read(poll_fd.fd) catch |err| {
                    if (err == error.WouldBlock) continue;
                    if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                        return 1;
                    }
                    std.log.err("daemon read err={s}", .{@errorName(err)});
                    return err;
                };
                if (n == 0) {
                    // Server closed connection
                    return 0;
                }

                // Drain every complete message currently buffered.
                while (read_buf.next()) |msg| {
                    switch (msg.header.tag) {
                        .Ack => {
                            if (detached) {
                                _ = posix.write(posix.STDOUT_FILENO, "command sent!\n") catch |err| blk: {
                                    if (err == error.WouldBlock) break :blk 0;
                                    return err;
                                };
                                return 0;
                            }
                        },
                        .Output => {
                            if (msg.payload.len > 0) {
                                // strip the first line since it is an echo of
                                // the command.
                                if (!detached and is_run_cmd and is_first_line) {
                                    if (std.mem.indexOfScalar(u8, msg.payload, '\n')) |nl| {
                                        is_first_line = false;
                                        if (nl + 1 < msg.payload.len) {
                                            try stdout_buf.appendSlice(alloc, msg.payload[nl + 1 ..]);
                                        }
                                    }
                                } else {
                                    try stdout_buf.appendSlice(alloc, msg.payload);
                                }
                            }
                        },
                        .TaskComplete => {
                            // Payload byte carries the exit status; empty payload means 0.
                            task_complete_code = if (msg.payload.len > 0) msg.payload[0] else 0;
                        },
                        else => {},
                    }
                }
            }
        }

        if (stdout_buf.items.len > 0) {
            const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
                if (err == error.WouldBlock) break :blk 0;
                return err;
            };
            if (task_complete_code) |exit_code| {
                // NOTE(review): returns after a single write attempt; if that
                // write was partial, the rest of stdout_buf is dropped --
                // confirm this is acceptable on task completion.
                return exit_code;
            }
            if (n > 0) {
                // Drop the bytes that made it out; keep any unwritten tail.
                try stdout_buf.replaceRange(alloc, 0, n, &[_]u8{});
            }
        }

        // Check for HUP/ERR on any socket
        for (poll_fds.items) |poll_fd| {
            if (poll_fd.revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
                return 0;
            }
        }
    }
}
1460
/// Poll the socket directory until every session matched by `matchers` has
/// a completed (or unreachable) task, then exit the process with the
/// aggregated exit code. Exits 1 when tracked sessions disappear before
/// completing, 2 when no session ever matches.
fn wait(cfg: *Cfg, matchers: std.ArrayList(SessionMatch)) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var stdout_buffer: [1024]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&stdout_buffer);
    const stdout = &stdout_writer.interface;

    var stderr_buffer: [1024]u8 = undefined;
    var stderr_writer = std.fs.File.stderr().writer(&stderr_buffer);
    const stderr = &stderr_writer.interface;

    // Highest match count seen so far. Lets us distinguish "sessions haven't
    // appeared yet" (keep polling) from "sessions we were tracking
    // disappeared" (fail -- daemon crashed or was killed).
    var max_seen: i32 = 0;
    var zero_match_iters: u32 = 0;

    var agg_exit_code: u8 = 0;
    while (true) {
        // Recomputed from scratch each poll; the last iteration's value is
        // what the final report below uses.
        agg_exit_code = 0;
        var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
        var total: i32 = 0;
        var done: i32 = 0;

        for (sessions.items) |session| {
            // Skip sessions no matcher cares about.
            var found = false;
            for (matchers.items) |m| {
                if (m.matches(session.name)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                continue;
            }

            total += 1;
            if (session.is_error) {
                // Daemon unreachable (probe timed out). On Timeout the socket
                // is no longer deleted, so this session would otherwise
                // persist as task_ended_at==0 forever → infinite "still
                // waiting". Count it as done+failed so wait terminates.
                try stderr.print(
                    "[{d}] task unreachable: {s} ({s})\n",
                    .{ std.time.timestamp(), session.name, session.error_name orelse "unknown" },
                );
                try stderr.flush();
                agg_exit_code = 1;
                done += 1;
                continue;
            }
            if (session.task_ended_at == 0) {
                try stdout.print(
                    "[{d}] waiting task={s}\n",
                    .{ std.time.timestamp(), session.name },
                );
                try stdout.flush();
                continue;
            }
            try stdout.print(
                "[{d}] completed task={s} exit_code={d}\n",
                .{ session.task_ended_at.?, session.name, session.task_exit_code.? },
            );
            try stdout.flush();
            if (session.task_exit_code != 0) {
                agg_exit_code = session.task_exit_code orelse 0;
            }
            done += 1;
        }

        for (sessions.items) |session| {
            session.deinit(alloc);
        }
        sessions.deinit(alloc);

        // Check disappearance BEFORE completion: if one of N sessions
        // crashed and the remaining N-1 happen to be done, total==done
        // would be a false success.
        if (total < max_seen) {
            try stderr.print(
                "error: {d} session(s) disappeared before completing\n",
                .{max_seen - total},
            );
            try stderr.flush();
            std.process.exit(1);
            return;
        }
        max_seen = total;

        if (total > 0 and total == done) {
            break;
        }

        if (max_seen == 0) {
            // `zmx run foo && zmx wait foo` is essentially sequential, so
            // matching sessions should be visible from the first poll. If
            // nothing appears after a few iterations it's almost certainly a
            // typo, not a slow start.
            zero_match_iters += 1;
            if (zero_match_iters >= 3) {
                try stderr.print("error: no matching sessions found\n", .{});
                try stderr.flush();
                std.process.exit(2);
                return;
            }
        }

        std.Thread.sleep(1000 * std.time.ns_per_ms);
    }

    if (agg_exit_code == 0) {
        try stdout.print("task(s) completed!\n", .{});
    } else {
        try stdout.print("task(s) failed!\n", .{});
    }
    try stdout.flush();

    // Final pass: re-fetch sessions to print details for any failed tasks.
    const sessions = try util.get_session_entries(alloc, cfg.socket_dir);
    for (sessions.items) |session| {
        var found = false;
        for (matchers.items) |m| {
            if (m.matches(session.name)) {
                found = true;
                break;
            }
        }
        if (!found) {
            continue;
        }
        // NOTE(review): .? assumes every matched session now has an exit
        // code; a session that appeared between the loop above and this
        // re-fetch could still be null -- confirm.
        if (session.task_exit_code.? > 0) {
            try stdout.print("---\n", .{});
            try stdout.print("[{d}] failed task={s} exit_status={d}\n\n", .{
                session.task_ended_at.?,
                session.name,
                session.task_exit_code.?,
            });
            try stdout.print("See the logs:\nzmx history {s}\nzmx attach {s}\n", .{ session.name, session.name });
            try stdout.flush();
        }
    }

    std.process.exit(agg_exit_code);
}
1606
/// Print one line per active session to stdout, sorted via
/// util.SessionEntry.lessThan. With `short`, an empty result prints
/// nothing instead of the stderr "no sessions" notice.
fn list(cfg: *Cfg, short: bool) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const current_session = socket.getSeshNameFromEnv();
    var out_buf: [4096]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&out_buf);

    var sessions = try util.get_session_entries(alloc, cfg.socket_dir);
    defer {
        for (sessions.items) |entry| entry.deinit(alloc);
        sessions.deinit(alloc);
    }

    if (sessions.items.len == 0) {
        if (short) return;
        var err_buf: [4096]u8 = undefined;
        var stderr_writer = std.fs.File.stderr().writer(&err_buf);
        try stderr_writer.interface.print("no sessions found in {s}\n", .{cfg.socket_dir});
        try stderr_writer.interface.flush();
        return;
    }

    std.mem.sort(util.SessionEntry, sessions.items, {}, util.SessionEntry.lessThan);

    // Flush per line so output appears promptly even through a pipe.
    for (sessions.items) |entry| {
        try util.writeSessionLine(&stdout_writer.interface, entry, short, current_session);
        try stdout_writer.interface.flush();
    }
}
1640
/// Ask the current session's daemon (found via ZMX_SESSION) to drop every
/// attached client. Best-effort: an unreachable daemon is logged (and its
/// stale socket cleaned up on ConnectionRefused) rather than propagated.
fn detachAll(cfg: *Cfg) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const session_name = socket.getSeshNameFromEnv();
    if (session_name.len == 0) {
        std.log.err("ZMX_SESSION env var not found: are you inside a zmx session?", .{});
        return;
    }

    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer dir.close();

    const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |e| switch (e) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return e,
    };
    defer alloc.free(socket_path);

    const fd = ipc.connectSession(socket_path) catch |e| {
        std.log.err("session unresponsive: {s}", .{@errorName(e)});
        if (e == error.ConnectionRefused) socket.cleanupStaleSocket(dir, session_name);
        return;
    };
    defer posix.close(fd);

    // A daemon that died mid-send is equivalent to "already detached".
    ipc.send(fd, .DetachAll, "") catch |e| switch (e) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return e,
    };
}
1670
/// Kill one named session: connect to its daemon and send Kill. An
/// unresponsive daemon is cleaned up when `force` is set or the socket
/// refuses connections; otherwise the user is told to retry / use --force.
/// Returns error.SessionNotFound when no such session socket exists.
fn kill(cfg: *Cfg, session_name: []const u8, force: bool) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer dir.close();

    const exists = try socket.sessionExists(dir, session_name);
    if (!exists) {
        var buf: [4096]u8 = undefined;
        var w = std.fs.File.stderr().writer(&buf);
        w.interface.print("error: session \"{s}\" does not exist\n", .{session_name}) catch {};
        w.interface.flush() catch {};
        return error.SessionNotFound;
    }
    const fd = ipc.connectSession(socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        var buf: [4096]u8 = undefined;
        var w = std.fs.File.stdout().writer(&buf);
        // ConnectionRefused means the daemon is gone: the socket is stale and
        // safe to remove even without --force.
        if (force or err == error.ConnectionRefused) {
            socket.cleanupStaleSocket(dir, session_name);
            w.interface.print("cleaned up stale session {s}\n", .{session_name}) catch {};
        } else {
            w.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again, add `--force` flag, or kill the process directly\n",
                .{ session_name, @errorName(err) },
            ) catch {};
        }
        w.interface.flush() catch {};
        return;
    };

    defer posix.close(fd);
    // A daemon that dies mid-send already achieved the goal; swallow it.
    ipc.send(fd, .Kill, "") catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var buf: [100]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);
    try w.interface.print("killed session {s}\n", .{session_name});
    try w.interface.flush();
}
1721
/// Request the session's scrollback in `format` and write the first
/// History reply payload to stdout. Gives the daemon up to 5s per poll to
/// respond; connection problems are logged and swallowed.
/// Returns error.SessionNotFound when no such session socket exists.
fn history(cfg: *Cfg, session_name: []const u8, format: util.HistoryFormat) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    const socket_path = socket.getSocketPath(alloc, cfg.socket_dir, session_name) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(session_name, cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer dir.close();

    const exists = try socket.sessionExists(dir, session_name);
    if (!exists) {
        var buf: [4096]u8 = undefined;
        var w = std.fs.File.stderr().writer(&buf);
        w.interface.print("error: session \"{s}\" does not exist\n", .{session_name}) catch {};
        w.interface.flush() catch {};
        return error.SessionNotFound;
    }
    const fd = ipc.connectSession(socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(dir, session_name);
        return;
    };
    defer posix.close(fd);

    // The daemon expects the format as a single payload byte (see handleHistory).
    const format_byte = [_]u8{@intFromEnum(format)};
    ipc.send(fd, .History, &format_byte) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    var sb = try ipc.SocketBuffer.init(alloc);
    defer sb.deinit();

    // Read until a complete History message arrives; each poll waits up to 5s.
    while (true) {
        var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
        const poll_result = posix.poll(&poll_fds, 5000) catch return;
        if (poll_result == 0) {
            std.log.err("timeout waiting for history response", .{});
            return;
        }

        const n = sb.read(fd) catch return;
        // n == 0: daemon closed the connection without replying.
        if (n == 0) return;

        while (sb.next()) |msg| {
            if (msg.header.tag == .History) {
                _ = posix.write(posix.STDOUT_FILENO, msg.payload) catch return;
                return;
            }
        }
    }
}
1779
/// Tell the daemon of the session we are currently inside (`current_sesh`)
/// to switch its leader client over to `daemon.session_name` -- the name
/// the user passed to `zmx attach` while already inside a session.
/// Returns error.SessionNotFound when the current session's socket is gone.
fn switchSesh(daemon: *Daemon, current_sesh: []const u8) !void {
    // we want daemon.session_name because that's the session name the user provided during zmx attach
    // instead of the name of the session they are currently inside of.
    const next_session = daemon.session_name;

    const socket_path = socket.getSocketPath(daemon.alloc, daemon.cfg.socket_dir, current_sesh) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(current_sesh, daemon.cfg.socket_dir),
        error.OutOfMemory => return err,
    };
    defer daemon.alloc.free(socket_path);

    var dir = try std.fs.openDirAbsolute(daemon.cfg.socket_dir, .{});
    defer dir.close();

    const exists = try socket.sessionExists(dir, current_sesh);
    if (!exists) {
        var buf: [4096]u8 = undefined;
        var w = std.fs.File.stderr().writer(&buf);
        w.interface.print("error: session \"{s}\" does not exist\n", .{current_sesh}) catch {};
        w.interface.flush() catch {};
        return error.SessionNotFound;
    }
    const fd = ipc.connectSession(socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) socket.cleanupStaleSocket(dir, current_sesh);
        return;
    };
    defer posix.close(fd);

    // A dead daemon is treated as "nothing to switch"; other send errors propagate.
    ipc.send(fd, .Switch, next_session) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };
}
1814
/// Attach the calling terminal to a session daemon: connect to its unix
/// socket, put stdin into raw mode, and run clientLoop until the user
/// detaches or requests a switch to another session (handled by recursing
/// with a fresh Daemon pointed at the target session).
fn attach(daemon: *Daemon) !void {
    // Already inside a session (env var set): ask the current session to
    // switch instead of nesting attaches.
    const sesh = socket.getSeshNameFromEnv();
    if (sesh.len > 0) {
        return switchSesh(daemon, sesh);
    }

    const result = try daemon.ensureSession();
    // The forked daemon side returns immediately; only the client continues.
    if (result.is_daemon) return;

    const client_sock = try socket.sessionConnect(daemon.socket_path);
    std.log.info("attached session={s}", .{daemon.session_name});
    // This is typically used with tcsetattr() to modify terminal settings.
    // - you first get the current settings with tcgetattr()
    // - modify the desired attributes in the termios structure
    // - then apply the changes with tcsetattr().
    // This prevents unintended side effects by preserving other settings.
    // restore stdin fd to its original state after exiting.
    // Use TCSAFLUSH to discard any unread input, preventing stale input after detach.
    //
    // tcgetattr fails when stdin is not a TTY (e.g. piped). In that case,
    // skip terminal setup entirely rather than applying undefined stack bytes
    // via tcsetattr.
    var orig_termios: cross.c.termios = undefined;
    const stdin_is_tty = cross.c.tcgetattr(posix.STDIN_FILENO, &orig_termios) == 0;

    defer {
        if (stdin_is_tty) {
            _ = cross.c.tcsetattr(posix.STDIN_FILENO, cross.c.TCSAFLUSH, &orig_termios);
        }
        // Reset terminal modes on detach: ESC c (RIS) asks the outer
        // terminal to return to its initial state.
        const restore_seq = "\x1bc";
        _ = posix.write(posix.STDOUT_FILENO, restore_seq) catch {};
    }

    if (stdin_is_tty) {
        var raw_termios = orig_termios;
        // set raw mode after successful connection.
        // disables canonical mode (line buffering), input echoing, signal generation from
        // control characters (like Ctrl+C), and flow control.
        cross.c.cfmakeraw(&raw_termios);

        // Additional granular raw mode settings for precise control
        // (matches what abduco and shpool do)
        raw_termios.c_cc[cross.c.VLNEXT] = cross.c._POSIX_VDISABLE; // Disable literal-next (Ctrl-V)
        // We want to intercept Ctrl+\ (SIGQUIT) so we can use it as a detach key
        raw_termios.c_cc[cross.c.VQUIT] = cross.c._POSIX_VDISABLE; // Disable SIGQUIT (Ctrl+\)
        raw_termios.c_cc[cross.c.VMIN] = 1; // Minimum chars to read: return after 1 byte
        raw_termios.c_cc[cross.c.VTIME] = 0; // Read timeout: no timeout, return immediately

        _ = cross.c.tcsetattr(posix.STDIN_FILENO, cross.c.TCSANOW, &raw_termios);
    }

    // Clear screen before attaching. This provides a clean slate before
    // the session restore.
    const clear_seq = "\x1b[2J\x1b[H";
    _ = try posix.write(posix.STDOUT_FILENO, clear_seq);

    const looper = try clientLoop(client_sock);
    switch (looper.kind) {
        .detach => return,
        .switch_session => {
            if (looper.session_name) |session_name| {
                // cwd_buf is stack-local; target_daemon.cwd points into it
                // and remains valid only because the recursive attach() call
                // keeps this frame alive.
                var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
                const cwd = std.posix.getcwd(&cwd_buf) catch "";
                const target_path = socket.getSocketPath(
                    daemon.alloc,
                    daemon.cfg.socket_dir,
                    session_name,
                ) catch |err| switch (err) {
                    error.NameTooLong => return socket.printSessionNameTooLong(
                        session_name,
                        daemon.cfg.socket_dir,
                    ),
                    error.OutOfMemory => return err,
                };

                const clients = try std.ArrayList(*Client).initCapacity(daemon.alloc, 10);
                var target_daemon = Daemon{
                    .running = true,
                    .cfg = daemon.cfg,
                    .alloc = daemon.alloc,
                    .clients = clients,
                    .session_name = session_name,
                    .socket_path = target_path,
                    // NOTE(review): .pid is left undefined; assumes the attach
                    // path never reads it before ensureSession fills it in --
                    // confirm.
                    .pid = undefined,
                    .cwd = cwd,
                    .created_at = @intCast(std.time.timestamp()),
                    .leader_client_fd = null,
                };
                return attach(&target_daemon);
            }
        },
    }
}
1909
/// Create a file inside the session via IPC: slurps the file content from
/// stdin, prefixes it with the target path, sends it as a .Write message,
/// and waits for a single .Ack reply from the daemon.
fn writeFile(daemon: *Daemon, file_path: []const u8) !void {
    var buf: [4096]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);
    const sesh_result = try daemon.ensureSession();
    // The forked daemon side returns immediately; only the client continues.
    if (sesh_result.is_daemon) return;

    if (sesh_result.created) {
        try w.interface.print("session \"{s}\" created\n", .{daemon.session_name});
        try w.interface.flush();
    }
    // Read all of stdin into memory as the file content.
    // NOTE(review): unlike send()/run(), there is no isatty() guard here, so
    // an interactive invocation blocks on the terminal until EOF (Ctrl-D) --
    // confirm this is intended.
    const stdin_fd = posix.STDIN_FILENO;
    var stdin_buf = try std.ArrayList(u8).initCapacity(daemon.alloc, 4096);
    defer stdin_buf.deinit(daemon.alloc);

    while (true) {
        var tmp: [4096]u8 = undefined;
        const n = posix.read(stdin_fd, &tmp) catch |err| {
            if (err == error.WouldBlock) break;
            return err;
        };
        if (n == 0) break; // EOF
        try stdin_buf.appendSlice(daemon.alloc, tmp[0..n]);
    }

    const socket_path = socket.getSocketPath(
        daemon.alloc,
        daemon.cfg.socket_dir,
        daemon.session_name,
    ) catch |err| switch (err) {
        error.NameTooLong => return socket.printSessionNameTooLong(
            daemon.session_name,
            daemon.cfg.socket_dir,
        ),
        error.OutOfMemory => return err,
    };
    var dir = try std.fs.openDirAbsolute(daemon.cfg.socket_dir, .{});
    defer dir.close();

    // Probe first so a dead daemon's stale socket is cleaned up instead of
    // failing the write with a confusing error.
    const result = ipc.probeSession(daemon.alloc, socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) {
            socket.cleanupStaleSocket(dir, daemon.session_name);
            w.interface.print("cleaned up stale session {s}\n", .{daemon.session_name}) catch {};
        } else {
            w.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again\n",
                .{ daemon.session_name, @errorName(err) },
            ) catch {};
        }
        w.interface.flush() catch {};
        return;
    };

    defer posix.close(result.fd);

    // Build wire payload: [u32 path len][path bytes][file content]
    // The length prefix is native-endian (std.mem.asBytes); the daemon on the
    // same host decodes it identically.
    var wire_buf = try std.ArrayList(u8).initCapacity(
        daemon.alloc,
        @sizeOf(u32) + file_path.len + stdin_buf.items.len,
    );
    defer wire_buf.deinit(daemon.alloc);
    const path_len: u32 = @intCast(file_path.len);
    try wire_buf.appendSlice(daemon.alloc, std.mem.asBytes(&path_len));
    try wire_buf.appendSlice(daemon.alloc, file_path);
    try wire_buf.appendSlice(daemon.alloc, stdin_buf.items);

    ipc.send(result.fd, .Write, wire_buf.items) catch |err| switch (err) {
        error.BrokenPipe, error.ConnectionResetByPeer => return,
        else => return err,
    };

    // Wait for the daemon's acknowledgment.
    var sb = try ipc.SocketBuffer.init(daemon.alloc);
    defer sb.deinit();

    const n = sb.read(result.fd) catch return error.ReadFailed;
    if (n == 0) return error.ConnectionClosed;

    // NOTE(review): only one read() is performed; if the .Ack arrives split
    // across reads this reports NoAckReceived -- confirm SocketBuffer yields
    // a full message per read.
    while (sb.next()) |msg| {
        if (msg.header.tag == .Ack) {
            try w.interface.print("file created {s}\n", .{file_path});
            try w.interface.flush();
            return;
        }
    }

    return error.NoAckReceived;
}
1997
/// Deliver `text_parts` (joined with single spaces) -- or piped stdin when no
/// parts are given -- to the session daemon at `socket_path` as one IPC
/// message tagged `tag`. Probes the session first and cleans up a stale
/// socket on connection-refused.
fn send(cfg: *Cfg, session_name: []const u8, socket_path: []const u8, text_parts: [][]const u8, tag: ipc.Tag) !void {
    const alloc = std.heap.c_allocator;
    var out_buf: [4096]u8 = undefined;
    var out = std.fs.File.stdout().writer(&out_buf);

    var body = std.ArrayList(u8).empty;
    defer body.deinit(alloc);

    if (text_parts.len == 0) {
        // No text arguments: fall back to piped stdin (skipped when stdin is
        // an interactive terminal).
        const stdin_fd = posix.STDIN_FILENO;
        if (!std.posix.isatty(stdin_fd)) {
            read_loop: while (true) {
                var chunk: [4096]u8 = undefined;
                const n = posix.read(stdin_fd, &chunk) catch |err| switch (err) {
                    error.WouldBlock => break :read_loop,
                    else => return err,
                };
                if (n == 0) break :read_loop; // EOF
                try body.appendSlice(alloc, chunk[0..n]);
            }
            // Strip trailing newline from piped input; the caller is
            // responsible for including \r when submission is desired.
            // For .Output the caller controls exact bytes, so don't strip.
            const should_strip = tag != .Output and body.items.len > 0 and body.items[body.items.len - 1] == '\n';
            if (should_strip) _ = body.pop();
        }
    } else {
        // Join argument parts with single spaces.
        var first = true;
        for (text_parts) |part| {
            if (!first) try body.append(alloc, ' ');
            try body.appendSlice(alloc, part);
            first = false;
        }
    }

    if (body.items.len == 0) return error.TextRequired;

    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
    defer dir.close();

    // Probe first so a dead daemon's stale socket gets cleaned up instead of
    // surfacing a confusing send failure.
    const probe = ipc.probeSession(alloc, socket_path) catch |err| {
        std.log.err("session unresponsive: {s}", .{@errorName(err)});
        if (err == error.ConnectionRefused) {
            socket.cleanupStaleSocket(dir, session_name);
            try out.interface.print("cleaned up stale session {s}\n", .{session_name});
        } else {
            try out.interface.print(
                "session {s} is unresponsive ({s})\ndaemon may be busy: try again\n",
                .{ session_name, @errorName(err) },
            );
        }
        try out.interface.flush();
        return;
    };
    defer posix.close(probe.fd);

    // Peer hangup mid-send is not an error for a fire-and-forget message.
    ipc.send(probe.fd, tag, body.items) catch |err| switch (err) {
        error.ConnectionResetByPeer, error.BrokenPipe => return,
        else => return err,
    };
}
2059
/// Run a command inside the session and exit the process with the command's
/// exit code. The command comes from `command_args` (shell-quoted and joined
/// with spaces) or, when empty, from piped stdin; `detached` is forwarded to
/// tail() which streams the output. Returns error.CommandRequired when no
/// command could be assembled.
fn run(daemon: *Daemon, detached: bool, command_args: [][]const u8) !void {
    const alloc = daemon.alloc;
    var buf: [4096]u8 = undefined;
    var w = std.fs.File.stdout().writer(&buf);

    // allocated_cmd owns the command bytes (freed on every exit path);
    // cmd_to_send is a read-only view of the same slice. Assigning the
    // mutable slice to both avoids the previous @constCast round-trip.
    var cmd_to_send: ?[]const u8 = null;
    var allocated_cmd: ?[]u8 = null;
    defer if (allocated_cmd) |cmd| alloc.free(cmd);

    const result = try daemon.ensureSession();
    // The forked daemon side returns immediately; only the client continues.
    if (result.is_daemon) return;

    if (result.created) {
        try w.interface.print("session \"{s}\" created\n", .{daemon.session_name});
        try w.interface.flush();
    }

    if (command_args.len > 0) {
        var cmd_list = std.ArrayList(u8).empty;
        defer cmd_list.deinit(alloc);

        // Join arguments with spaces, shell-quoting any that need it.
        for (command_args, 0..) |arg, i| {
            if (i > 0) try cmd_list.append(alloc, ' ');
            if (util.shellNeedsQuoting(arg)) {
                const quoted = try util.shellQuote(alloc, arg);
                defer alloc.free(quoted);
                try cmd_list.appendSlice(alloc, quoted);
            } else {
                try cmd_list.appendSlice(alloc, arg);
            }
        }

        // \r, not \n: once the shell is at the readline prompt the PTY is in
        // raw mode; readline's accept-line binds to CR. The first-ever run
        // works with \n only because it arrives during shell startup while
        // the line discipline is still canonical.
        try cmd_list.append(alloc, '\r');

        const owned = try cmd_list.toOwnedSlice(alloc);
        allocated_cmd = owned;
        cmd_to_send = owned;
    } else {
        // No args: read the command from piped stdin (skip when interactive).
        const stdin_fd = posix.STDIN_FILENO;
        if (!std.posix.isatty(stdin_fd)) {
            var stdin_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
            defer stdin_buf.deinit(alloc);

            while (true) {
                var tmp: [4096]u8 = undefined;
                const n = posix.read(stdin_fd, &tmp) catch |err| {
                    if (err == error.WouldBlock) break;
                    return err;
                };
                if (n == 0) break; // EOF
                try stdin_buf.appendSlice(alloc, tmp[0..n]);
            }

            if (stdin_buf.items.len > 0) {
                // Normalize any trailing newline to CR so readline (raw mode)
                // accepts each line.
                if (stdin_buf.items[stdin_buf.items.len - 1] == '\n') {
                    stdin_buf.items[stdin_buf.items.len - 1] = '\r';
                } else {
                    try stdin_buf.append(alloc, '\r');
                }

                const owned = try alloc.dupe(u8, stdin_buf.items);
                allocated_cmd = owned;
                cmd_to_send = owned;
            }
        }
    }

    const cmd = cmd_to_send orelse return error.CommandRequired;

    const client_sock = ipc.connectSession(daemon.socket_path) catch |err| {
        std.log.err("session not ready: {s}", .{@errorName(err)});
        return error.SessionNotReady;
    };
    defer posix.close(client_sock);

    // tail() takes a list of fds to watch; here it is just our one socket.
    var fds = try std.ArrayList(i32).initCapacity(alloc, 1);
    defer fds.deinit(alloc);
    try fds.append(alloc, client_sock);

    ipc.send(client_sock, .Run, cmd) catch |err| switch (err) {
        error.ConnectionResetByPeer, error.BrokenPipe => return,
        else => return err,
    };

    // Propagate the command's exit code as our own; exit() never returns.
    const exit_code = try tail(fds, detached, true);
    posix.exit(exit_code);
}
2153
/// Why clientLoop stopped pumping bytes between the terminal and the daemon.
const ClientResult = struct {
    kind: enum {
        detach, // user detached, stdin hit EOF, or the connection dropped
        switch_session, // daemon asked this client to attach elsewhere
    },
    // Target session for .switch_session (heap copy owned by the caller);
    // null for .detach.
    session_name: ?[]const u8,
};
2161
/// clientLoop sends ipc commands to its corresponding daemon. It uses poll() as its non-blocking
/// mechanism. It will send stdin to the daemon and receive stdout from the daemon.
/// Returns how the loop ended: user detach vs. a daemon-initiated session
/// switch (in which case session_name carries the target, owned by caller).
fn clientLoop(client_sock_fd: i32) !ClientResult {
    // use c_allocator to avoid "reached unreachable code" panic in DebugAllocator when forking
    const alloc = std.heap.c_allocator;
    defer posix.close(client_sock_fd);

    try openSignalPipe();
    // SIGWINCH (terminal resize) wakes poll() through the self-pipe.
    installWakeHandler(posix.SIG.WINCH);

    // Make socket non-blocking to avoid blocking on writes
    var sock_flags = try posix.fcntl(client_sock_fd, posix.F.GETFL, 0);
    sock_flags |= O_NONBLOCK;
    _ = try posix.fcntl(client_sock_fd, posix.F.SETFL, sock_flags);

    // Buffer for outgoing socket writes
    var sock_write_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer sock_write_buf.deinit(alloc);

    // Send init message with terminal size (buffered)
    const size = ipc.getTerminalSize(posix.STDOUT_FILENO);
    try ipc.appendMessage(alloc, &sock_write_buf, .Init, std.mem.asBytes(&size));

    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(alloc, 4);
    defer poll_fds.deinit(alloc);

    var read_buf = try ipc.SocketBuffer.init(alloc);
    defer read_buf.deinit();

    // Daemon output waiting to be flushed to our stdout.
    var stdout_buf = try std.ArrayList(u8).initCapacity(alloc, 4096);
    defer stdout_buf.deinit(alloc);

    const stdin_fd = posix.STDIN_FILENO;

    // Make stdin non-blocking. O_NONBLOCK is set on the open file description,
    // which is shared with the parent shell; restore on exit to avoid
    // corrupting the parent's stdin.
    const stdin_orig_flags = try posix.fcntl(stdin_fd, posix.F.GETFL, 0);
    _ = try posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags | O_NONBLOCK);
    defer _ = posix.fcntl(stdin_fd, posix.F.SETFL, stdin_orig_flags) catch {};

    while (true) {
        // poll_fds layout (rebuilt each iteration; the fixed indices below
        // depend on this order):
        //   [0] stdin, [1] socket, [2] sig_pipe read end,
        //   [3] stdout -- only appended when stdout_buf has pending bytes.
        poll_fds.clearRetainingCapacity();

        try poll_fds.append(alloc, .{
            .fd = stdin_fd,
            .events = posix.POLL.IN,
            .revents = 0,
        });

        // Poll socket for read, and also for write if we have pending data
        var sock_events: i16 = posix.POLL.IN;
        if (sock_write_buf.items.len > 0) {
            sock_events |= posix.POLL.OUT;
        }
        try poll_fds.append(alloc, .{
            .fd = client_sock_fd,
            .events = sock_events,
            .revents = 0,
        });

        try poll_fds.append(alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });

        if (stdout_buf.items.len > 0) {
            try poll_fds.append(alloc, .{
                .fd = posix.STDOUT_FILENO,
                .events = posix.POLL.OUT,
                .revents = 0,
            });
        }

        _ = try posix.poll(poll_fds.items, -1);

        // SIGWINCH arrived: re-measure the terminal and queue a Resize.
        if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
            drainSignalPipe();
            const next_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
            try ipc.appendMessage(alloc, &sock_write_buf, .Resize, std.mem.asBytes(&next_size));
        }

        // Handle stdin -> socket (Input)
        const inp_flags = (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL);
        if (poll_fds.items[0].revents & inp_flags != 0) {
            var buf: [4096]u8 = undefined;
            // null = nothing to read right now (EAGAIN on non-blocking fd).
            const n_opt: ?usize = posix.read(stdin_fd, &buf) catch |err| blk: {
                if (err == error.WouldBlock) break :blk null;
                return err;
            };

            if (n_opt) |n| {
                if (n > 0) {
                    // Check for detach sequences (ctrl+\ as first byte or Kitty escape sequence)
                    if (util.isCtrlBackslash(buf[0..n])) {
                        try ipc.appendMessage(alloc, &sock_write_buf, .Detach, "");
                    } else {
                        try ipc.appendMessage(alloc, &sock_write_buf, .Input, buf[0..n]);
                    }
                } else {
                    // EOF on stdin
                    return ClientResult{ .kind = .detach, .session_name = null };
                }
            }
        }

        // Handle socket read (incoming Output messages from daemon)
        if (poll_fds.items[1].revents & posix.POLL.IN != 0) {
            const n = read_buf.read(client_sock_fd) catch |err| {
                if (err == error.WouldBlock) continue;
                if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                    return ClientResult{ .kind = .detach, .session_name = null };
                }
                std.log.err("daemon read err={s}", .{@errorName(err)});
                return err;
            };
            if (n == 0) {
                // Server closed connection
                return ClientResult{ .kind = .detach, .session_name = null };
            }

            while (read_buf.next()) |msg| {
                switch (msg.header.tag) {
                    .Output => {
                        if (msg.payload.len > 0) {
                            try stdout_buf.appendSlice(alloc, msg.payload);
                        }
                    },
                    .Resize => {
                        // daemon is asking for the client's window size usually in response
                        // to this client being set as leader.
                        const next_size = ipc.getTerminalSize(posix.STDOUT_FILENO);
                        try ipc.appendMessage(
                            alloc,
                            &sock_write_buf,
                            .Resize,
                            std.mem.asBytes(&next_size),
                        );
                    },
                    .Switch => {
                        // Dupe the payload: it points into read_buf, which is
                        // freed on return. Caller owns the copy (c_allocator).
                        return ClientResult{ .kind = .switch_session, .session_name = try alloc.dupe(u8, msg.payload) };
                    },
                    else => {},
                }
            }
        }

        // Handle socket write (flush buffered messages to daemon)
        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
            if (sock_write_buf.items.len > 0) {
                const n = posix.write(client_sock_fd, sock_write_buf.items) catch |err| blk: {
                    if (err == error.WouldBlock) break :blk 0;
                    if (err == error.ConnectionResetByPeer or err == error.BrokenPipe) {
                        return ClientResult{ .kind = .detach, .session_name = null };
                    }
                    return err;
                };
                if (n > 0) {
                    // Drop the written prefix; keep any short-write tail.
                    try sock_write_buf.replaceRange(alloc, 0, n, &[_]u8{});
                }
            }
        }

        // Flush pending daemon output to stdout (may be a short write).
        if (stdout_buf.items.len > 0) {
            const n = posix.write(posix.STDOUT_FILENO, stdout_buf.items) catch |err| blk: {
                if (err == error.WouldBlock) break :blk 0;
                return err;
            };
            if (n > 0) {
                try stdout_buf.replaceRange(alloc, 0, n, &[_]u8{});
            }
        }

        if (poll_fds.items[1].revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
            return ClientResult{ .kind = .detach, .session_name = null };
        }
    }
}
2337
/// daemonLoop is what the daemon runs to send and receive ipc commands from its corresponding
/// clients. It uses poll() as its non-blocking mechanism.
/// It also owns the PTY: output is fed into a ghostty-vt terminal emulator
/// for state tracking and broadcast to every connected client.
fn daemonLoop(daemon: *Daemon, server_sock_fd: i32, pty_fd: i32) !void {
    std.log.info("daemon started session={s} pty_fd={d}", .{ daemon.session_name, pty_fd });
    daemon.pty_fd = pty_fd;
    try openSignalPipe();
    // SIGTERM wakes poll() via the self-pipe for graceful shutdown.
    installWakeHandler(posix.SIG.TERM);
    var poll_fds = try std.ArrayList(posix.pollfd).initCapacity(daemon.alloc, 8);
    defer poll_fds.deinit(daemon.alloc);

    // In-memory terminal emulator mirroring the PTY so attaching clients can
    // get the current screen state replayed.
    const init_size = ipc.getTerminalSize(pty_fd);
    var term = try ghostty_vt.Terminal.init(daemon.alloc, .{
        .cols = init_size.cols,
        .rows = init_size.rows,
        .max_scrollback = daemon.cfg.max_scrollback,
    });
    defer term.deinit(daemon.alloc);
    var vt_stream = term.vtStream();
    defer vt_stream.deinit();

    daemon_loop: while (daemon.running) {
        // poll_fds layout (rebuilt each iteration; fixed indices below
        // depend on this order):
        //   [0] listening socket, [1] PTY, [2] sig_pipe read end,
        //   [3..] one entry per connected client.
        poll_fds.clearRetainingCapacity();

        try poll_fds.append(daemon.alloc, .{
            .fd = server_sock_fd,
            .events = posix.POLL.IN,
            .revents = 0,
        });

        // Poll the PTY for write only when output is queued for it.
        var pty_events: i16 = posix.POLL.IN;
        if (daemon.pty_write_buf.items.len > 0) {
            pty_events |= posix.POLL.OUT;
        }
        try poll_fds.append(daemon.alloc, .{
            .fd = pty_fd,
            .events = pty_events,
            .revents = 0,
        });

        try poll_fds.append(daemon.alloc, .{ .fd = sig_pipe[0], .events = posix.POLL.IN, .revents = 0 });

        for (daemon.clients.items) |client| {
            var events: i16 = posix.POLL.IN;
            if (client.has_pending_output) {
                events |= posix.POLL.OUT;
            }
            try poll_fds.append(daemon.alloc, .{
                .fd = client.socket_fd,
                .events = events,
                .revents = 0,
            });
        }

        _ = try posix.poll(poll_fds.items, -1);

        if (poll_fds.items[2].revents & posix.POLL.IN != 0) {
            drainSignalPipe();
            std.log.info(
                "SIGTERM received, shutting down gracefully session={s}",
                .{daemon.session_name},
            );
            break :daemon_loop;
        }

        // Listening socket: error means we can no longer accept; readable
        // means a new client is waiting.
        if (poll_fds.items[0].revents & (posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL) != 0) {
            std.log.err("server socket error revents={d}", .{poll_fds.items[0].revents});
            break :daemon_loop;
        } else if (poll_fds.items[0].revents & posix.POLL.IN != 0) {
            const client_fd = try posix.accept(
                server_sock_fd,
                null,
                null,
                posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC,
            );
            const client = try daemon.alloc.create(Client);
            // NOTE(review): if SocketBuffer.init or the write_buf allocation
            // below fails, `client` and `client_fd` leak (no errdefer
            // cleanup) -- consider adding errdefer destroy/close here.
            client.* = Client{
                .alloc = daemon.alloc,
                .socket_fd = client_fd,
                .read_buf = try ipc.SocketBuffer.init(daemon.alloc),
                .write_buf = undefined,
            };
            client.write_buf = try std.ArrayList(u8).initCapacity(client.alloc, 4096);
            try daemon.clients.append(daemon.alloc, client);
            std.log.info(
                "client connected fd={d} total={d}",
                .{ client_fd, daemon.clients.items.len },
            );
        }

        const inp_flags = posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL;
        if (poll_fds.items[1].revents & inp_flags != 0) {
            // Read from PTY
            var buf: [4096]u8 = undefined;
            // null = EAGAIN (nothing to read); any other read error is
            // treated as EOF (0) so the shell-exit path below runs.
            const n_opt: ?usize = posix.read(pty_fd, &buf) catch |err| blk: {
                if (err == error.WouldBlock) break :blk null;
                break :blk 0;
            };

            if (n_opt) |n| {
                if (n == 0) {
                    // EOF: Shell exited
                    std.log.info("shell exited pty_fd={d}", .{pty_fd});
                    break :daemon_loop;
                } else {
                    // Feed PTY output to terminal emulator for state tracking
                    vt_stream.nextSlice(buf[0..n]);
                    daemon.has_pty_output = true;

                    // When no real terminal client has attached yet, respond to
                    // terminal queries (e.g. DA1/DA2) on behalf of the terminal.
                    // This prevents fish from waiting 10s for unanswered queries.
                    // `has_terminal_client` is only set when a client sends .Init
                    // (a real zmx attach), not when a `zmx run` tail-only client
                    // connects.
                    if (!daemon.has_terminal_client and
                        daemon.pty_write_buf.items.len < Daemon.PTY_WRITE_BUF_MAX)
                    {
                        util.respondToDeviceAttributes(daemon.alloc, &daemon.pty_write_buf, buf[0..n]);
                    }

                    // In run mode, scan output for exit code marker
                    if (daemon.is_task_mode and daemon.task_exit_code == null) {
                        if (util.findTaskExitMarker(buf[0..n])) |exit_code| {
                            daemon.task_exit_code = exit_code;
                            daemon.task_ended_at = @intCast(std.time.timestamp());

                            std.log.info("task completed exit_code={d}", .{exit_code});

                            // Notify connected clients
                            for (daemon.clients.items) |c| {
                                ipc.appendMessage(daemon.alloc, &c.write_buf, .TaskComplete, &[_]u8{exit_code}) catch {};
                                c.has_pending_output = true;
                            }
                        }
                    }

                    // Broadcast data to all clients.
                    // Rewrite OSC 133;A to include redraw=0 so the outer terminal
                    // does not clear prompt lines on resize (issue #111).
                    const broadcast_data = util.rewritePromptRedraw(daemon.alloc, buf[0..n]) orelse buf[0..n];
                    defer if (broadcast_data.ptr != buf[0..n].ptr) daemon.alloc.free(broadcast_data);
                    for (daemon.clients.items) |client| {
                        ipc.appendMessage(daemon.alloc, &client.write_buf, .Output, broadcast_data) catch |err| {
                            std.log.warn(
                                "failed to buffer output for client err={s}",
                                .{@errorName(err)},
                            );
                            continue;
                        };
                        client.has_pending_output = true;
                    }
                }
            }
        }

        // Flush queued bytes (client input, query replies) to the PTY.
        if (poll_fds.items[1].revents & posix.POLL.OUT != 0) {
            while (daemon.pty_write_buf.items.len > 0) {
                const n = posix.write(pty_fd, daemon.pty_write_buf.items) catch |err| {
                    if (err != error.WouldBlock) {
                        std.log.warn("pty write failed: {s}", .{@errorName(err)});
                        daemon.pty_write_buf.clearRetainingCapacity();
                    }
                    break;
                };
                if (n == 0) break;
                daemon.pty_write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
            }
        }

        var i: usize = daemon.clients.items.len;
        // Only iterate over clients that were present when poll_fds was constructed
        // poll_fds contains [server, pty, sig_pipe, client0, client1, ...]
        // So number of clients in poll_fds is poll_fds.items.len - 3
        const num_polled_clients = poll_fds.items.len - 3;
        if (i > num_polled_clients) {
            // If we have more clients than polled (i.e. we just accepted one), start from the
            // polled ones
            i = num_polled_clients;
        }

        // Iterate backwards so closeClient's swap-remove does not skip entries.
        clients_loop: while (i > 0) {
            i -= 1;
            const client = daemon.clients.items[i];
            const revents = poll_fds.items[i + 3].revents;

            if (revents & posix.POLL.IN != 0) {
                const n = client.read_buf.read(client.socket_fd) catch |err| {
                    if (err == error.WouldBlock) continue;
                    std.log.debug(
                        "client read err={s} fd={d}",
                        .{ @errorName(err), client.socket_fd },
                    );
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                };

                if (n == 0) {
                    // Client closed connection
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                }

                // Dispatch every complete message buffered so far.
                while (client.read_buf.next()) |msg| {
                    switch (msg.header.tag) {
                        .Input => try daemon.handleInput(client, msg.payload),
                        .Output => try daemon.handleOutput(msg.payload, &vt_stream),
                        .Init => try daemon.handleInit(client, pty_fd, &term, msg.payload),
                        .Switch => try daemon.handleSwitch(msg.payload),
                        .Resize => try daemon.handleResize(client, pty_fd, &term, msg.payload),
                        .Detach => {
                            // Detach mutates the client list; abandon this pass.
                            daemon.handleDetach(client, i);
                            break :clients_loop;
                        },
                        .DetachAll => {
                            daemon.handleDetachAll();
                            break :clients_loop;
                        },
                        .Kill => {
                            break :daemon_loop;
                        },
                        .Info => try daemon.handleInfo(client),
                        .History => try daemon.handleHistory(client, &term, msg.payload),
                        .Run => try daemon.handleRun(client, msg.payload),
                        .Ack, .TaskComplete => {},
                        .Write => try daemon.handleWrite(client, msg.payload),
                        _ => std.log.warn(
                            "ignoring unknown IPC tag={d}",
                            .{@intFromEnum(msg.header.tag)},
                        ),
                    }
                }
            }

            if (revents & posix.POLL.OUT != 0) {
                // Flush pending output buffers
                const n = posix.write(client.socket_fd, client.write_buf.items) catch |err| blk: {
                    if (err == error.WouldBlock) break :blk 0;
                    // Error on write, close client
                    const last = daemon.closeClient(client, i, false);
                    if (last) break :daemon_loop;
                    continue;
                };

                if (n > 0) {
                    client.write_buf.replaceRange(daemon.alloc, 0, n, &[_]u8{}) catch unreachable;
                }

                if (client.write_buf.items.len == 0) {
                    client.has_pending_output = false;
                }
            }

            if (revents & (posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL) != 0) {
                const last = daemon.closeClient(client, i, false);
                if (last) break :daemon_loop;
            }
        }
    }
}
2599
/// Signal handler: wake the poll() loop by writing one byte to the self-pipe.
/// Async-signal-safe by construction: only write(2) plus errno save/restore.
/// The write result is deliberately ignored -- if the pipe is full, a pending
/// byte already guarantees the loop will wake.
fn wakeSignalPipe(_: i32, _: *const posix.siginfo_t, _: ?*anyopaque) callconv(.c) void {
    const saved = std.c._errno().*;
    _ = std.c.write(sig_pipe[1], "x", 1);
    std.c._errno().* = saved;
}
2605
// std.posix.poll retries EINTR internally, so SA_RESTART is moot -- neither
// setting wakes the loop. The handler writes to sig_pipe instead; poll()
// wakes on its read end.
fn installWakeHandler(sig: u6) void {
    // Register wakeSignalPipe (SA_SIGINFO, empty mask); the Sigaction is
    // passed as an in-place anonymous literal since it is used exactly once.
    posix.sigaction(sig, &.{
        .handler = .{ .sigaction = wakeSignalPipe },
        .mask = posix.sigemptyset(),
        .flags = posix.SA.SIGINFO,
    }, null);
}
2617
/// Ignore SIGPIPE process-wide so a write to a closed socket surfaces as
/// error.BrokenPipe from write() instead of terminating the process.
fn ignoreSigpipe() void {
    posix.sigaction(posix.SIG.PIPE, &.{
        .handler = .{ .handler = posix.SIG.IGN },
        .mask = posix.sigemptyset(),
        .flags = 0,
    }, null);
}