repos / zmx

session persistence for terminal processes
git clone https://github.com/neurosnap/zmx.git

commit
7d2b71e
parent
0b2df2b
author
Eric Bower
date
2025-12-03 23:16:20 -0500 EST
refactor: when daemon is unresponsive, remove it

This commit does a better job of removing a unix socket file when
the daemon is unresponsive, which likely indicates that the process has
already been killed.

We accomplish this by creating a probeSession fn that will connect to the
unix socket, send the Info event, and time out after 1 second if no
response arrives.  When the probe fails, we remove the unix socket file.
1 files changed,  +92, -54
M src/main.zig
+92, -54
  1@@ -9,6 +9,7 @@ var log_system = log.LogSystem{};
  2 
  3 pub const std_options: std.Options = .{
  4     .logFn = zmxLogFn,
  5+    .log_level = .debug,
  6 };
  7 
  8 fn zmxLogFn(
  9@@ -210,6 +211,8 @@ fn list(cfg: *Cfg) !void {
 10     defer dir.close();
 11     var iter = dir.iterate();
 12     var hasSessions = false;
 13+    var buf: [4096]u8 = undefined;
 14+    var w = std.fs.File.stdout().writer(&buf);
 15     while (try iter.next()) |entry| {
 16         const exists = sessionExists(dir, entry.name) catch continue;
 17         if (exists) {
 18@@ -217,50 +220,22 @@ fn list(cfg: *Cfg) !void {
 19             const socket_path = try getSocketPath(alloc, cfg.socket_dir, entry.name);
 20             defer alloc.free(socket_path);
 21 
 22-            const fd = sessionConnect(socket_path) catch |err| {
 23-                var msg_buf: [256]u8 = undefined;
 24-                const msg = try std.fmt.bufPrint(&msg_buf, "could not connect to session {s}: {s}\n", .{ entry.name, @errorName(err) });
 25-                try std.fs.File.stdout().writeAll(msg);
 26+            const result = probeSession(alloc, socket_path) catch |err| {
 27+                w.interface.print("session_name={s}\tstatus={s}\t(cleaning up)\n", .{ entry.name, @errorName(err) }) catch {};
 28+                w.interface.flush() catch {};
 29+                cleanupStaleSocket(dir, entry.name);
 30                 continue;
 31             };
 32-            defer posix.close(fd);
 33-
 34-            try ipc.send(fd, .Info, "");
 35+            defer posix.close(result.fd);
 36 
 37-            var sb = try ipc.SocketBuffer.init(alloc);
 38-            defer sb.deinit();
 39-
 40-            var info: ?ipc.Info = null;
 41-            read_loop: while (true) {
 42-                const n = try sb.read(fd);
 43-                if (n == 0) break; // EOF
 44-
 45-                while (sb.next()) |msg| {
 46-                    if (msg.header.tag == .Info) {
 47-                        if (msg.payload.len == @sizeOf(ipc.Info)) {
 48-                            info = std.mem.bytesToValue(ipc.Info, msg.payload[0..@sizeOf(ipc.Info)]);
 49-                            break :read_loop;
 50-                        }
 51-                    }
 52-                }
 53-            }
 54-
 55-            if (info) |i| {
 56-                var msg_buf: [256]u8 = undefined;
 57-                const msg = try std.fmt.bufPrint(&msg_buf, "session_name={s}\tpid={d}\tclients={d}\n", .{ entry.name, i.pid, i.clients_len });
 58-                try std.fs.File.stdout().writeAll(msg);
 59-            } else {
 60-                var msg_buf: [256]u8 = undefined;
 61-                const msg = try std.fmt.bufPrint(&msg_buf, "session_name={s}\tpid=?\tclients=?\n", .{entry.name});
 62-                try std.fs.File.stdout().writeAll(msg);
 63-            }
 64+            try w.interface.print("session_name={s}\tpid={d}\tclients={d}\n", .{ entry.name, result.info.pid, result.info.clients_len });
 65+            try w.interface.flush();
 66         }
 67     }
 68 
 69     if (!hasSessions) {
 70-        var msg_buf: [256]u8 = undefined;
 71-        const msg = try std.fmt.bufPrint(&msg_buf, "no sessions found in {s}\n", .{cfg.socket_dir});
 72-        try std.fs.File.stdout().writeAll(msg);
 73+        try w.interface.print("no sessions found in {s}\n", .{cfg.socket_dir});
 74+        try w.interface.flush();
 75     }
 76 }
 77 
 78@@ -277,11 +252,18 @@ fn detachAll(cfg: *Cfg) !void {
 79     };
 80     defer alloc.free(session_name);
 81 
 82+    var dir = try std.fs.openDirAbsolute(cfg.socket_dir, .{});
 83+    defer dir.close();
 84+
 85     const socket_path = try getSocketPath(alloc, cfg.socket_dir, session_name);
 86     defer alloc.free(socket_path);
 87-    const client_sock_fd = try sessionConnect(socket_path);
 88-    defer posix.close(client_sock_fd);
 89-    ipc.send(client_sock_fd, .DetachAll, "") catch |err| switch (err) {
 90+    const result = probeSession(alloc, socket_path) catch |err| {
 91+        std.log.err("session unresponsive: {s}", .{@errorName(err)});
 92+        cleanupStaleSocket(dir, session_name);
 93+        return;
 94+    };
 95+    defer posix.close(result.fd);
 96+    ipc.send(result.fd, .DetachAll, "") catch |err| switch (err) {
 97         error.BrokenPipe, error.ConnectionResetByPeer => return,
 98         else => return err,
 99     };
100@@ -298,13 +280,22 @@ fn kill(cfg: *Cfg, session_name: []const u8) !void {
101     const exists = try sessionExists(dir, session_name);
102     if (!exists) {
103         std.log.err("cannot kill session because it does not exist session_name={s}", .{session_name});
104+        return;
105     }
106 
107     const socket_path = try getSocketPath(alloc, cfg.socket_dir, session_name);
108     defer alloc.free(socket_path);
109-    const client_sock_fd = try sessionConnect(socket_path);
110-    defer posix.close(client_sock_fd);
111-    ipc.send(client_sock_fd, .Kill, "") catch |err| switch (err) {
112+    const result = probeSession(alloc, socket_path) catch |err| {
113+        std.log.err("session unresponsive: {s}", .{@errorName(err)});
114+        cleanupStaleSocket(dir, session_name);
115+        var buf: [4096]u8 = undefined;
116+        var w = std.fs.File.stdout().writer(&buf);
117+        w.interface.print("cleaned up stale session {s}\n", .{session_name}) catch {};
118+        w.interface.flush() catch {};
119+        return;
120+    };
121+    defer posix.close(result.fd);
122+    ipc.send(result.fd, .Kill, "") catch |err| switch (err) {
123         error.BrokenPipe, error.ConnectionResetByPeer => return,
124         else => return err,
125     };
126@@ -323,20 +314,14 @@ fn attach(daemon: *Daemon) !void {
127     var should_create = !exists;
128 
129     if (exists) {
130-        const fd = sessionConnect(daemon.socket_path) catch |err| switch (err) {
131-            error.ConnectionRefused => blk: {
132-                std.log.warn("stale socket found, cleaning up fname={s}", .{daemon.session_name});
133-                try dir.deleteFile(daemon.session_name);
134-                should_create = true;
135-                break :blk -1;
136-            },
137-            else => return err,
138-        };
139-        if (fd != -1) {
140-            posix.close(fd);
141+        if (probeSession(daemon.alloc, daemon.socket_path)) |result| {
142+            posix.close(result.fd);
143             if (daemon.command != null) {
144                 std.log.warn("session already exists, ignoring command session={s}", .{daemon.session_name});
145             }
146+        } else |_| {
147+            cleanupStaleSocket(dir, daemon.session_name);
148+            should_create = true;
149         }
150     }
151 
152@@ -878,6 +863,59 @@ fn sessionConnect(fname: []const u8) !i32 {
153     return socket_fd;
154 }
155 
156+const SessionProbeError = error{
157+    Timeout,
158+    ConnectionRefused,
159+    Unexpected,
160+};
161+
162+const SessionProbeResult = struct {
163+    fd: i32,
164+    info: ipc.Info,
165+};
166+
167+fn probeSession(alloc: std.mem.Allocator, socket_path: []const u8) SessionProbeError!SessionProbeResult {
168+    const timeout_ms = 1000;
169+    const fd = sessionConnect(socket_path) catch |err| switch (err) {
170+        error.ConnectionRefused => return error.ConnectionRefused,
171+        else => return error.Unexpected,
172+    };
173+    errdefer posix.close(fd);
174+
175+    ipc.send(fd, .Info, "") catch return error.Unexpected;
176+
177+    var poll_fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
178+    const poll_result = posix.poll(&poll_fds, timeout_ms) catch return error.Unexpected;
179+    if (poll_result == 0) {
180+        return error.Timeout;
181+    }
182+
183+    var sb = ipc.SocketBuffer.init(alloc) catch return error.Unexpected;
184+    defer sb.deinit();
185+
186+    const n = sb.read(fd) catch return error.Unexpected;
187+    if (n == 0) return error.Unexpected;
188+
189+    while (sb.next()) |msg| {
190+        if (msg.header.tag == .Info) {
191+            if (msg.payload.len == @sizeOf(ipc.Info)) {
192+                return .{
193+                    .fd = fd,
194+                    .info = std.mem.bytesToValue(ipc.Info, msg.payload[0..@sizeOf(ipc.Info)]),
195+                };
196+            }
197+        }
198+    }
199+    return error.Unexpected;
200+}
201+
202+fn cleanupStaleSocket(dir: std.fs.Dir, session_name: []const u8) void {
203+    std.log.warn("stale socket found, cleaning up session={s}", .{session_name});
204+    dir.deleteFile(session_name) catch |err| {
205+        std.log.warn("failed to delete stale socket err={s}", .{@errorName(err)});
206+    };
207+}
208+
209 fn sessionExists(dir: std.fs.Dir, name: []const u8) !bool {
210     const stat = dir.statFile(name) catch |err| switch (err) {
211         error.FileNotFound => return false,