diff --git a/src/index.ts b/src/index.ts index d7fab2d..dabc7fd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -74,53 +74,71 @@ export default function (pi: ExtensionAPI) { /** Maps agent IDs to task IDs for O(1) completion lookup. */ const agentTaskMap = new Map(); - // ── Subagent extension presence detection ── - // Two paths: (1) listen for ready broadcast (subagents loads first), - // (2) send ping on our init (tasks loads first). - let subagentsAvailable = false; + // ── Subagent RPC helpers ── - // Ping subagents extension — scoped reply channel, no filtering needed - const pingId = randomUUID(); - const unsubPing = pi.events.on(`subagents:rpc:ping:reply:${pingId}`, () => { - subagentsAvailable = true; - unsubPing(); - }); - pi.events.emit("subagents:rpc:ping", { requestId: pingId }); + /** RPC reply envelope — matches pi-mono's RpcResponse shape. */ + type RpcReply = + | { success: true; data?: T } + | { success: false; error: string }; - // Also listen for ready broadcast (covers: subagents loads after us) - pi.events.on("subagents:ready", () => { - subagentsAvailable = true; - unsubPing(); // clean up ping listener if still pending - }); + /** Call a subagents RPC method: emit request, wait for scoped reply, unwrap envelope. */ + function rpcCall(channel: string, params: Record, timeoutMs: number): Promise { + const requestId = randomUUID(); + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { unsub(); reject(new Error(`${channel} timeout`)); }, timeoutMs); + const unsub = pi.events.on(`${channel}:reply:${requestId}`, (raw: unknown) => { + unsub(); clearTimeout(timer); + const reply = raw as RpcReply; + if (reply.success) resolve(reply.data as T); + else reject(new Error(reply.error)); + }); + pi.events.emit(channel, { requestId, ...params }); + }); + } /** Spawn a subagent via pi.events RPC (requires @tintinweb/pi-subagents extension). */ function spawnSubagent(type: string, prompt: string, options?: any): Promise { - const requestId = randomUUID(); - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { unsub(); reject(new Error("subagents:rpc:spawn timeout")); }, 30000); - const unsub = pi.events.on(`subagents:rpc:spawn:reply:${requestId}`, (p: unknown) => { - const { id, error } = p as { id?: string; error?: string }; - unsub(); clearTimeout(timer); - if (error) reject(new Error(error)); - else resolve(id!); - }); - pi.events.emit("subagents:rpc:spawn", { requestId, type, prompt, options }); - }); + return rpcCall<{ id: string }>("subagents:rpc:spawn", { type, prompt, options }, 30_000).then(d => d.id); } /** Stop a subagent via pi.events RPC (requires @tintinweb/pi-subagents extension). */ - function stopSubagent(agentId: string): Promise { - const requestId = randomUUID(); - return new Promise((resolve) => { - const timer = setTimeout(() => { unsub(); resolve(false); }, 10000); - const unsub = pi.events.on(`subagents:rpc:stop:reply:${requestId}`, (p: unknown) => { - unsub(); clearTimeout(timer); - resolve((p as any).success ?? false); - }); - pi.events.emit("subagents:rpc:stop", { requestId, agentId }); - }); + function stopSubagent(agentId: string): Promise { + return rpcCall("subagents:rpc:stop", { agentId }, 10_000).catch(() => {}); } + // ── Subagent extension presence & version detection ── + const PROTOCOL_VERSION = 2; + let subagentsAvailable = false; + let pendingWarning: string | undefined; + + /** Ping subagents and check protocol version. Works with any handler version. */ + function checkSubagentsVersion() { + const requestId = randomUUID(); + const timer = setTimeout(() => { unsub(); }, 5_000); + const unsub = pi.events.on(`subagents:rpc:ping:reply:${requestId}`, (raw: unknown) => { + unsub(); clearTimeout(timer); + const remoteVersion = (raw as any)?.data?.version as number | undefined; + if (remoteVersion === undefined) { + pendingWarning = + "@tintinweb/pi-subagents is outdated — please update for task execution support."; + } else if (remoteVersion > PROTOCOL_VERSION) { + pendingWarning = + `@tintinweb/pi-tasks is outdated (protocol v${PROTOCOL_VERSION}, ` + + `pi-subagents has v${remoteVersion}) — please update for task execution support.`; + } else if (remoteVersion < PROTOCOL_VERSION) { + pendingWarning = + `@tintinweb/pi-subagents is outdated (protocol v${remoteVersion}, ` + + `pi-tasks has v${PROTOCOL_VERSION}) — please update for task execution support.`; + } else { + subagentsAvailable = true; + } + }); + pi.events.emit("subagents:rpc:ping", { requestId }); + } + + checkSubagentsVersion(); + pi.events.on("subagents:ready", () => checkSubagentsVersion()); + /** Build a prompt for a task being executed by a subagent. */ function buildTaskPrompt(task: { id: string; subject: string; description: string }, additionalContext?: string): string { let prompt = `You are executing task #${task.id}: "${task.subject}"\n\n${task.description}`; @@ -283,6 +301,10 @@ export default function (pi: ExtensionAPI) { widget.setUICtx(ctx.ui as UICtx); upgradeStoreIfNeeded(ctx); showPersistedTasks(); + if (pendingWarning) { + ctx.ui.notify(pendingWarning, "warning"); + pendingWarning = undefined; + } }); // session_switch fires on resume (reason: "resume") — reload persisted tasks. diff --git a/test/subagent-integration.test.ts b/test/subagent-integration.test.ts index be7485f..0f85fc7 100644 --- a/test/subagent-integration.test.ts +++ b/test/subagent-integration.test.ts @@ -76,6 +76,7 @@ function mockCtx() { ui: { setWidget: vi.fn(), setStatus: vi.fn(), + notify: vi.fn(), }, }; } @@ -83,14 +84,15 @@ function mockCtx() { // ---- Mock subagents extension (RPC responders) ---- /** Simulates the @tintinweb/pi-subagents extension: responds to ping + spawn RPCs and emits ready. */ -function installSubagentsMock(pi: { events: { on: Function; emit: Function } }) { +function installSubagentsMock(pi: { events: { on: Function; emit: Function } }, opts?: { spawnError?: string }) { let idCounter = 0; const spawned: Array<{ id: string; type: string; prompt: string; options: any }> = []; + const stopped: string[] = []; // Respond to ping — reply on scoped channel const unsubPing = pi.events.on("subagents:rpc:ping", (data: unknown) => { const { requestId } = data as { requestId: string }; - pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, {}); + pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, { success: true, data: { version: 2 } }); }); // Respond to spawn — reply on scoped channel @@ -98,9 +100,25 @@ function installSubagentsMock(pi: { events: { on: Function; emit: Function } }) const { requestId, type, prompt, options } = data as { requestId: string; type: string; prompt: string; options?: any; }; + if (opts?.spawnError) { + pi.events.emit(`subagents:rpc:spawn:reply:${requestId}`, { success: false, error: opts.spawnError }); + return; + } const id = `agent-${++idCounter}`; spawned.push({ id, type, prompt, options }); - pi.events.emit(`subagents:rpc:spawn:reply:${requestId}`, { id }); + pi.events.emit(`subagents:rpc:spawn:reply:${requestId}`, { success: true, data: { id } }); + }); + + // Respond to stop — reply on scoped channel + const unsubStop = pi.events.on("subagents:rpc:stop", (data: unknown) => { + const { requestId, agentId } = data as { requestId: string; agentId: string }; + const known = spawned.some(s => s.id === agentId); + if (known) { + stopped.push(agentId); + pi.events.emit(`subagents:rpc:stop:reply:${requestId}`, { success: true }); + } else { + pi.events.emit(`subagents:rpc:stop:reply:${requestId}`, { success: false, error: "Agent not found" }); + } }); // Broadcast readiness @@ -108,7 +126,8 @@ function installSubagentsMock(pi: { events: { on: Function; emit: Function } }) return { spawned, - unsub() { unsubPing(); unsubSpawn(); }, + stopped, + unsub() { unsubPing(); unsubSpawn(); unsubStop(); }, }; } @@ -556,12 +575,10 @@ describe("RPC protocol correctness", () => { it("spawn RPC rejects on timeout when no responder exists", async () => { const mock = mockPi(); + // Install ping handler (for version check) but no spawn handler + installVersionedMock(mock.pi, 2); initExtension(mock.pi as any); - // Emit ready AFTER init so the listener is registered — marks subagents - // as available, but there's no spawn handler installed - mock.pi.events.emit("subagents:ready", {}); - await mock.executeTool("TaskCreate", { subject: "Timeout test", description: "desc", @@ -603,6 +620,176 @@ describe("RPC protocol correctness", () => { rpc.unsub(); }); + + it("spawn RPC rejects with error message from server", async () => { + const mock = mockPi(); + installSubagentsMock(mock.pi, { spawnError: "No active session" }); + initExtension(mock.pi as any); + + await mock.executeTool("TaskCreate", { + subject: "Err test", + description: "desc", + agentType: "general-purpose", + }); + + const result = await mock.executeTool("TaskExecute", { task_ids: ["1"] }); + expect(result.content[0].text).toContain("No active session"); + }); + + it("stop RPC resolves on success", async () => { + const mock = mockPi(); + const rpc = installSubagentsMock(mock.pi); + initExtension(mock.pi as any); + + // Spawn a task so we have an agent to stop + await mock.executeTool("TaskCreate", { + subject: "Stoppable", + description: "desc", + agentType: "general-purpose", + }); + await mock.executeTool("TaskExecute", { task_ids: ["1"] }); + expect(rpc.spawned).toHaveLength(1); + + const result = await mock.executeTool("TaskStop", { task_id: "1" }); + expect(result.content[0].text).toContain("stopped successfully"); + expect(rpc.stopped).toContain("agent-1"); + + rpc.unsub(); + }); + + it("stop RPC returns false on error (agent not found) without throwing", async () => { + const mock = mockPi(); + const rpc = installSubagentsMock(mock.pi); + initExtension(mock.pi as any); + + // Create and execute a task, then simulate agent already gone + await mock.executeTool("TaskCreate", { + subject: "Ghost", + description: "desc", + agentType: "general-purpose", + }); + await mock.executeTool("TaskExecute", { task_ids: ["1"] }); + + // Clear spawned list so the mock's stop handler won't find the agent + rpc.spawned.length = 0; + + // TaskStop should still succeed (stopSubagent catches the error) + const result = await mock.executeTool("TaskStop", { task_id: "1" }); + expect(result.content[0].text).toContain("stopped successfully"); + + rpc.unsub(); + }); + + it("stop RPC returns false on timeout without throwing", async () => { + const mock = mockPi(); + initExtension(mock.pi as any); + + // Mark subagents as available via ready broadcast, but no stop handler installed + mock.pi.events.emit("subagents:ready", {}); + + await mock.executeTool("TaskCreate", { + subject: "Timeout stop", + description: "desc", + agentType: "general-purpose", + }); + // Manually set task as in_progress with an agentId (no spawn handler) + await mock.executeTool("TaskUpdate", { + taskId: "1", + status: "in_progress", + metadata: { agentType: "general-purpose", agentId: "ghost-agent" }, + }); + + vi.useFakeTimers(); + const stopPromise = mock.executeTool("TaskStop", { task_id: "1" }); + await vi.advanceTimersByTimeAsync(11000); + + // Should resolve (not throw) — stopSubagent catches timeout + const result = await stopPromise; + expect(result.content[0].text).toContain("stopped successfully"); + + vi.useRealTimers(); + }); +}); + +/** Install a ping-only mock with a specific protocol version (or no version for v1). */ +function installVersionedMock(pi: { events: { on: Function; emit: Function } }, version?: number) { + const unsubPing = pi.events.on("subagents:rpc:ping", (data: unknown) => { + const { requestId } = data as { requestId: string }; + if (version !== undefined) { + pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, { success: true, data: { version } }); + } else { + // v1 handler — no envelope, no version + pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, {}); + } + }); + pi.events.emit("subagents:ready", {}); + return { unsub() { unsubPing(); } }; +} + +describe("Protocol version mismatch", () => { + it("matching version — no warning", async () => { + const mock = mockPi(); + installVersionedMock(mock.pi, 2); + initExtension(mock.pi as any); + + // No warning on before_agent_start + const ctx = mockCtx(); + await mock.fireLifecycle("before_agent_start", {}, ctx); + expect(ctx.ui.notify).not.toHaveBeenCalled(); + }); + + it("old handler (no version) — warns about pi-subagents", async () => { + const mock = mockPi(); + installVersionedMock(mock.pi); // no version = v1 + initExtension(mock.pi as any); + + const ctx = mockCtx(); + await mock.fireLifecycle("before_agent_start", {}, ctx); + expect(ctx.ui.notify).toHaveBeenCalledWith( + expect.stringContaining("pi-subagents is outdated"), + "warning", + ); + }); + + it("handler ahead (v3) — warns about pi-tasks", async () => { + const mock = mockPi(); + installVersionedMock(mock.pi, 3); + initExtension(mock.pi as any); + + const ctx = mockCtx(); + await mock.fireLifecycle("before_agent_start", {}, ctx); + expect(ctx.ui.notify).toHaveBeenCalledWith( + expect.stringContaining("pi-tasks is outdated"), + "warning", + ); + }); + + it("handler behind (v1) — warns about pi-subagents", async () => { + const mock = mockPi(); + installVersionedMock(mock.pi, 1); + initExtension(mock.pi as any); + + const ctx = mockCtx(); + await mock.fireLifecycle("before_agent_start", {}, ctx); + expect(ctx.ui.notify).toHaveBeenCalledWith( + expect.stringContaining("pi-subagents is outdated"), + "warning", + ); + }); + + it("warning shown only once", async () => { + const mock = mockPi(); + installVersionedMock(mock.pi); // v1 — triggers warning + initExtension(mock.pi as any); + + const ctx1 = mockCtx(); + await mock.fireLifecycle("before_agent_start", {}, ctx1); + expect(ctx1.ui.notify).toHaveBeenCalledOnce(); + + const ctx2 = mockCtx(); + await mock.fireLifecycle("before_agent_start", {}, ctx2); + expect(ctx2.ui.notify).not.toHaveBeenCalled(); + }); }); describe("Widget agent ID display", () => {