mirror of
https://github.com/wassname/pi-lgtm.git
synced 2026-06-27 16:46:17 +08:00
standardize RPC envelope, add rpcCall helper, version mismatch detection
This commit is contained in:
+59
-37
@@ -74,53 +74,71 @@ export default function (pi: ExtensionAPI) {
|
|||||||
/** Maps agent IDs to task IDs for O(1) completion lookup. */
|
/** Maps agent IDs to task IDs for O(1) completion lookup. */
|
||||||
const agentTaskMap = new Map<string, string>();
|
const agentTaskMap = new Map<string, string>();
|
||||||
|
|
||||||
// ── Subagent extension presence detection ──
|
// ── Subagent RPC helpers ──
|
||||||
// Two paths: (1) listen for ready broadcast (subagents loads first),
|
|
||||||
// (2) send ping on our init (tasks loads first).
|
|
||||||
let subagentsAvailable = false;
|
|
||||||
|
|
||||||
// Ping subagents extension — scoped reply channel, no filtering needed
|
/** RPC reply envelope — matches pi-mono's RpcResponse shape. */
|
||||||
const pingId = randomUUID();
|
type RpcReply<T = void> =
|
||||||
const unsubPing = pi.events.on(`subagents:rpc:ping:reply:${pingId}`, () => {
|
| { success: true; data?: T }
|
||||||
subagentsAvailable = true;
|
| { success: false; error: string };
|
||||||
unsubPing();
|
|
||||||
});
|
|
||||||
pi.events.emit("subagents:rpc:ping", { requestId: pingId });
|
|
||||||
|
|
||||||
// Also listen for ready broadcast (covers: subagents loads after us)
|
/** Call a subagents RPC method: emit request, wait for scoped reply, unwrap envelope. */
|
||||||
pi.events.on("subagents:ready", () => {
|
function rpcCall<T>(channel: string, params: Record<string, unknown>, timeoutMs: number): Promise<T> {
|
||||||
subagentsAvailable = true;
|
const requestId = randomUUID();
|
||||||
unsubPing(); // clean up ping listener if still pending
|
return new Promise<T>((resolve, reject) => {
|
||||||
});
|
const timer = setTimeout(() => { unsub(); reject(new Error(`${channel} timeout`)); }, timeoutMs);
|
||||||
|
const unsub = pi.events.on(`${channel}:reply:${requestId}`, (raw: unknown) => {
|
||||||
|
unsub(); clearTimeout(timer);
|
||||||
|
const reply = raw as RpcReply<T>;
|
||||||
|
if (reply.success) resolve(reply.data as T);
|
||||||
|
else reject(new Error(reply.error));
|
||||||
|
});
|
||||||
|
pi.events.emit(channel, { requestId, ...params });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/** Spawn a subagent via pi.events RPC (requires @tintinweb/pi-subagents extension). */
|
/** Spawn a subagent via pi.events RPC (requires @tintinweb/pi-subagents extension). */
|
||||||
function spawnSubagent(type: string, prompt: string, options?: any): Promise<string> {
|
function spawnSubagent(type: string, prompt: string, options?: any): Promise<string> {
|
||||||
const requestId = randomUUID();
|
return rpcCall<{ id: string }>("subagents:rpc:spawn", { type, prompt, options }, 30_000).then(d => d.id);
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
const timer = setTimeout(() => { unsub(); reject(new Error("subagents:rpc:spawn timeout")); }, 30000);
|
|
||||||
const unsub = pi.events.on(`subagents:rpc:spawn:reply:${requestId}`, (p: unknown) => {
|
|
||||||
const { id, error } = p as { id?: string; error?: string };
|
|
||||||
unsub(); clearTimeout(timer);
|
|
||||||
if (error) reject(new Error(error));
|
|
||||||
else resolve(id!);
|
|
||||||
});
|
|
||||||
pi.events.emit("subagents:rpc:spawn", { requestId, type, prompt, options });
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Stop a subagent via pi.events RPC (requires @tintinweb/pi-subagents extension). */
|
/** Stop a subagent via pi.events RPC (requires @tintinweb/pi-subagents extension). */
|
||||||
function stopSubagent(agentId: string): Promise<boolean> {
|
function stopSubagent(agentId: string): Promise<void> {
|
||||||
const requestId = randomUUID();
|
return rpcCall<void>("subagents:rpc:stop", { agentId }, 10_000).catch(() => {});
|
||||||
return new Promise((resolve) => {
|
|
||||||
const timer = setTimeout(() => { unsub(); resolve(false); }, 10000);
|
|
||||||
const unsub = pi.events.on(`subagents:rpc:stop:reply:${requestId}`, (p: unknown) => {
|
|
||||||
unsub(); clearTimeout(timer);
|
|
||||||
resolve((p as any).success ?? false);
|
|
||||||
});
|
|
||||||
pi.events.emit("subagents:rpc:stop", { requestId, agentId });
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Subagent extension presence & version detection ──
|
||||||
|
const PROTOCOL_VERSION = 2;
|
||||||
|
let subagentsAvailable = false;
|
||||||
|
let pendingWarning: string | undefined;
|
||||||
|
|
||||||
|
/** Ping subagents and check protocol version. Works with any handler version. */
|
||||||
|
function checkSubagentsVersion() {
|
||||||
|
const requestId = randomUUID();
|
||||||
|
const timer = setTimeout(() => { unsub(); }, 5_000);
|
||||||
|
const unsub = pi.events.on(`subagents:rpc:ping:reply:${requestId}`, (raw: unknown) => {
|
||||||
|
unsub(); clearTimeout(timer);
|
||||||
|
const remoteVersion = (raw as any)?.data?.version as number | undefined;
|
||||||
|
if (remoteVersion === undefined) {
|
||||||
|
pendingWarning =
|
||||||
|
"@tintinweb/pi-subagents is outdated — please update for task execution support.";
|
||||||
|
} else if (remoteVersion > PROTOCOL_VERSION) {
|
||||||
|
pendingWarning =
|
||||||
|
`@tintinweb/pi-tasks is outdated (protocol v${PROTOCOL_VERSION}, ` +
|
||||||
|
`pi-subagents has v${remoteVersion}) — please update for task execution support.`;
|
||||||
|
} else if (remoteVersion < PROTOCOL_VERSION) {
|
||||||
|
pendingWarning =
|
||||||
|
`@tintinweb/pi-subagents is outdated (protocol v${remoteVersion}, ` +
|
||||||
|
`pi-tasks has v${PROTOCOL_VERSION}) — please update for task execution support.`;
|
||||||
|
} else {
|
||||||
|
subagentsAvailable = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
pi.events.emit("subagents:rpc:ping", { requestId });
|
||||||
|
}
|
||||||
|
|
||||||
|
checkSubagentsVersion();
|
||||||
|
pi.events.on("subagents:ready", () => checkSubagentsVersion());
|
||||||
|
|
||||||
/** Build a prompt for a task being executed by a subagent. */
|
/** Build a prompt for a task being executed by a subagent. */
|
||||||
function buildTaskPrompt(task: { id: string; subject: string; description: string }, additionalContext?: string): string {
|
function buildTaskPrompt(task: { id: string; subject: string; description: string }, additionalContext?: string): string {
|
||||||
let prompt = `You are executing task #${task.id}: "${task.subject}"\n\n${task.description}`;
|
let prompt = `You are executing task #${task.id}: "${task.subject}"\n\n${task.description}`;
|
||||||
@@ -283,6 +301,10 @@ export default function (pi: ExtensionAPI) {
|
|||||||
widget.setUICtx(ctx.ui as UICtx);
|
widget.setUICtx(ctx.ui as UICtx);
|
||||||
upgradeStoreIfNeeded(ctx);
|
upgradeStoreIfNeeded(ctx);
|
||||||
showPersistedTasks();
|
showPersistedTasks();
|
||||||
|
if (pendingWarning) {
|
||||||
|
ctx.ui.notify(pendingWarning, "warning");
|
||||||
|
pendingWarning = undefined;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// session_switch fires on resume (reason: "resume") — reload persisted tasks.
|
// session_switch fires on resume (reason: "resume") — reload persisted tasks.
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ function mockCtx() {
|
|||||||
ui: {
|
ui: {
|
||||||
setWidget: vi.fn(),
|
setWidget: vi.fn(),
|
||||||
setStatus: vi.fn(),
|
setStatus: vi.fn(),
|
||||||
|
notify: vi.fn(),
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -83,14 +84,15 @@ function mockCtx() {
|
|||||||
// ---- Mock subagents extension (RPC responders) ----
|
// ---- Mock subagents extension (RPC responders) ----
|
||||||
|
|
||||||
/** Simulates the @tintinweb/pi-subagents extension: responds to ping + spawn RPCs and emits ready. */
|
/** Simulates the @tintinweb/pi-subagents extension: responds to ping + spawn RPCs and emits ready. */
|
||||||
function installSubagentsMock(pi: { events: { on: Function; emit: Function } }) {
|
function installSubagentsMock(pi: { events: { on: Function; emit: Function } }, opts?: { spawnError?: string }) {
|
||||||
let idCounter = 0;
|
let idCounter = 0;
|
||||||
const spawned: Array<{ id: string; type: string; prompt: string; options: any }> = [];
|
const spawned: Array<{ id: string; type: string; prompt: string; options: any }> = [];
|
||||||
|
const stopped: string[] = [];
|
||||||
|
|
||||||
// Respond to ping — reply on scoped channel
|
// Respond to ping — reply on scoped channel
|
||||||
const unsubPing = pi.events.on("subagents:rpc:ping", (data: unknown) => {
|
const unsubPing = pi.events.on("subagents:rpc:ping", (data: unknown) => {
|
||||||
const { requestId } = data as { requestId: string };
|
const { requestId } = data as { requestId: string };
|
||||||
pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, {});
|
pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, { success: true, data: { version: 2 } });
|
||||||
});
|
});
|
||||||
|
|
||||||
// Respond to spawn — reply on scoped channel
|
// Respond to spawn — reply on scoped channel
|
||||||
@@ -98,9 +100,25 @@ function installSubagentsMock(pi: { events: { on: Function; emit: Function } })
|
|||||||
const { requestId, type, prompt, options } = data as {
|
const { requestId, type, prompt, options } = data as {
|
||||||
requestId: string; type: string; prompt: string; options?: any;
|
requestId: string; type: string; prompt: string; options?: any;
|
||||||
};
|
};
|
||||||
|
if (opts?.spawnError) {
|
||||||
|
pi.events.emit(`subagents:rpc:spawn:reply:${requestId}`, { success: false, error: opts.spawnError });
|
||||||
|
return;
|
||||||
|
}
|
||||||
const id = `agent-${++idCounter}`;
|
const id = `agent-${++idCounter}`;
|
||||||
spawned.push({ id, type, prompt, options });
|
spawned.push({ id, type, prompt, options });
|
||||||
pi.events.emit(`subagents:rpc:spawn:reply:${requestId}`, { id });
|
pi.events.emit(`subagents:rpc:spawn:reply:${requestId}`, { success: true, data: { id } });
|
||||||
|
});
|
||||||
|
|
||||||
|
// Respond to stop — reply on scoped channel
|
||||||
|
const unsubStop = pi.events.on("subagents:rpc:stop", (data: unknown) => {
|
||||||
|
const { requestId, agentId } = data as { requestId: string; agentId: string };
|
||||||
|
const known = spawned.some(s => s.id === agentId);
|
||||||
|
if (known) {
|
||||||
|
stopped.push(agentId);
|
||||||
|
pi.events.emit(`subagents:rpc:stop:reply:${requestId}`, { success: true });
|
||||||
|
} else {
|
||||||
|
pi.events.emit(`subagents:rpc:stop:reply:${requestId}`, { success: false, error: "Agent not found" });
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Broadcast readiness
|
// Broadcast readiness
|
||||||
@@ -108,7 +126,8 @@ function installSubagentsMock(pi: { events: { on: Function; emit: Function } })
|
|||||||
|
|
||||||
return {
|
return {
|
||||||
spawned,
|
spawned,
|
||||||
unsub() { unsubPing(); unsubSpawn(); },
|
stopped,
|
||||||
|
unsub() { unsubPing(); unsubSpawn(); unsubStop(); },
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -556,12 +575,10 @@ describe("RPC protocol correctness", () => {
|
|||||||
|
|
||||||
it("spawn RPC rejects on timeout when no responder exists", async () => {
|
it("spawn RPC rejects on timeout when no responder exists", async () => {
|
||||||
const mock = mockPi();
|
const mock = mockPi();
|
||||||
|
// Install ping handler (for version check) but no spawn handler
|
||||||
|
installVersionedMock(mock.pi, 2);
|
||||||
initExtension(mock.pi as any);
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
// Emit ready AFTER init so the listener is registered — marks subagents
|
|
||||||
// as available, but there's no spawn handler installed
|
|
||||||
mock.pi.events.emit("subagents:ready", {});
|
|
||||||
|
|
||||||
await mock.executeTool("TaskCreate", {
|
await mock.executeTool("TaskCreate", {
|
||||||
subject: "Timeout test",
|
subject: "Timeout test",
|
||||||
description: "desc",
|
description: "desc",
|
||||||
@@ -603,6 +620,176 @@ describe("RPC protocol correctness", () => {
|
|||||||
|
|
||||||
rpc.unsub();
|
rpc.unsub();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("spawn RPC rejects with error message from server", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
installSubagentsMock(mock.pi, { spawnError: "No active session" });
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
await mock.executeTool("TaskCreate", {
|
||||||
|
subject: "Err test",
|
||||||
|
description: "desc",
|
||||||
|
agentType: "general-purpose",
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await mock.executeTool("TaskExecute", { task_ids: ["1"] });
|
||||||
|
expect(result.content[0].text).toContain("No active session");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stop RPC resolves on success", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
const rpc = installSubagentsMock(mock.pi);
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
// Spawn a task so we have an agent to stop
|
||||||
|
await mock.executeTool("TaskCreate", {
|
||||||
|
subject: "Stoppable",
|
||||||
|
description: "desc",
|
||||||
|
agentType: "general-purpose",
|
||||||
|
});
|
||||||
|
await mock.executeTool("TaskExecute", { task_ids: ["1"] });
|
||||||
|
expect(rpc.spawned).toHaveLength(1);
|
||||||
|
|
||||||
|
const result = await mock.executeTool("TaskStop", { task_id: "1" });
|
||||||
|
expect(result.content[0].text).toContain("stopped successfully");
|
||||||
|
expect(rpc.stopped).toContain("agent-1");
|
||||||
|
|
||||||
|
rpc.unsub();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stop RPC returns false on error (agent not found) without throwing", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
const rpc = installSubagentsMock(mock.pi);
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
// Create and execute a task, then simulate agent already gone
|
||||||
|
await mock.executeTool("TaskCreate", {
|
||||||
|
subject: "Ghost",
|
||||||
|
description: "desc",
|
||||||
|
agentType: "general-purpose",
|
||||||
|
});
|
||||||
|
await mock.executeTool("TaskExecute", { task_ids: ["1"] });
|
||||||
|
|
||||||
|
// Clear spawned list so the mock's stop handler won't find the agent
|
||||||
|
rpc.spawned.length = 0;
|
||||||
|
|
||||||
|
// TaskStop should still succeed (stopSubagent catches the error)
|
||||||
|
const result = await mock.executeTool("TaskStop", { task_id: "1" });
|
||||||
|
expect(result.content[0].text).toContain("stopped successfully");
|
||||||
|
|
||||||
|
rpc.unsub();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stop RPC returns false on timeout without throwing", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
// Mark subagents as available via ready broadcast, but no stop handler installed
|
||||||
|
mock.pi.events.emit("subagents:ready", {});
|
||||||
|
|
||||||
|
await mock.executeTool("TaskCreate", {
|
||||||
|
subject: "Timeout stop",
|
||||||
|
description: "desc",
|
||||||
|
agentType: "general-purpose",
|
||||||
|
});
|
||||||
|
// Manually set task as in_progress with an agentId (no spawn handler)
|
||||||
|
await mock.executeTool("TaskUpdate", {
|
||||||
|
taskId: "1",
|
||||||
|
status: "in_progress",
|
||||||
|
metadata: { agentType: "general-purpose", agentId: "ghost-agent" },
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const stopPromise = mock.executeTool("TaskStop", { task_id: "1" });
|
||||||
|
await vi.advanceTimersByTimeAsync(11000);
|
||||||
|
|
||||||
|
// Should resolve (not throw) — stopSubagent catches timeout
|
||||||
|
const result = await stopPromise;
|
||||||
|
expect(result.content[0].text).toContain("stopped successfully");
|
||||||
|
|
||||||
|
vi.useRealTimers();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/** Install a ping-only mock with a specific protocol version (or no version for v1). */
|
||||||
|
function installVersionedMock(pi: { events: { on: Function; emit: Function } }, version?: number) {
|
||||||
|
const unsubPing = pi.events.on("subagents:rpc:ping", (data: unknown) => {
|
||||||
|
const { requestId } = data as { requestId: string };
|
||||||
|
if (version !== undefined) {
|
||||||
|
pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, { success: true, data: { version } });
|
||||||
|
} else {
|
||||||
|
// v1 handler — no envelope, no version
|
||||||
|
pi.events.emit(`subagents:rpc:ping:reply:${requestId}`, {});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
pi.events.emit("subagents:ready", {});
|
||||||
|
return { unsub() { unsubPing(); } };
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("Protocol version mismatch", () => {
|
||||||
|
it("matching version — no warning", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
installVersionedMock(mock.pi, 2);
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
// No warning on before_agent_start
|
||||||
|
const ctx = mockCtx();
|
||||||
|
await mock.fireLifecycle("before_agent_start", {}, ctx);
|
||||||
|
expect(ctx.ui.notify).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("old handler (no version) — warns about pi-subagents", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
installVersionedMock(mock.pi); // no version = v1
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
const ctx = mockCtx();
|
||||||
|
await mock.fireLifecycle("before_agent_start", {}, ctx);
|
||||||
|
expect(ctx.ui.notify).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("pi-subagents is outdated"),
|
||||||
|
"warning",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handler ahead (v3) — warns about pi-tasks", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
installVersionedMock(mock.pi, 3);
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
const ctx = mockCtx();
|
||||||
|
await mock.fireLifecycle("before_agent_start", {}, ctx);
|
||||||
|
expect(ctx.ui.notify).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("pi-tasks is outdated"),
|
||||||
|
"warning",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handler behind (v1) — warns about pi-subagents", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
installVersionedMock(mock.pi, 1);
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
const ctx = mockCtx();
|
||||||
|
await mock.fireLifecycle("before_agent_start", {}, ctx);
|
||||||
|
expect(ctx.ui.notify).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("pi-subagents is outdated"),
|
||||||
|
"warning",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("warning shown only once", async () => {
|
||||||
|
const mock = mockPi();
|
||||||
|
installVersionedMock(mock.pi); // v1 — triggers warning
|
||||||
|
initExtension(mock.pi as any);
|
||||||
|
|
||||||
|
const ctx1 = mockCtx();
|
||||||
|
await mock.fireLifecycle("before_agent_start", {}, ctx1);
|
||||||
|
expect(ctx1.ui.notify).toHaveBeenCalledOnce();
|
||||||
|
|
||||||
|
const ctx2 = mockCtx();
|
||||||
|
await mock.fireLifecycle("before_agent_start", {}, ctx2);
|
||||||
|
expect(ctx2.ui.notify).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("Widget agent ID display", () => {
|
describe("Widget agent ID display", () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user