Fix Telegram stop recovery and trace output

This commit is contained in:
wassname
2026-04-24 21:23:50 +08:00
parent 39da73ce3c
commit 14798607c6
10 changed files with 466 additions and 35 deletions
+225 -4
View File
@@ -1097,8 +1097,11 @@ test("Extension runtime polls, pairs, and dispatches an inbound Telegram turn in
await mkdir(agentDir, { recursive: true });
await writeFile(
configPath,
JSON.stringify({ botToken: "123:abc", lastUpdateId: 0 }, null, "\t") +
"\n",
JSON.stringify(
{ botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
null,
"\t",
) + "\n",
"utf8",
);
telegramExtension(pi);
@@ -1122,7 +1125,6 @@ test("Extension runtime polls, pairs, and dispatches an inbound Telegram turn in
const dispatchedContent = await dispatched;
assert.equal(sentMessages.length, 1);
assert.equal(Array.isArray(dispatchedContent), true);
assert.equal(apiCalls.includes("sendMessage"), true);
assert.equal(apiCalls.includes("sendChatAction"), true);
const promptBlocks = dispatchedContent as Array<{
type: string;
@@ -1292,7 +1294,7 @@ test("Extension runtime finalizes a drafted preview into the final Telegram repl
},
ctx,
);
assert.deepEqual(draftTexts, ["Draft preview", "Final answer", ""]);
assert.deepEqual(draftTexts, ["Draft preview", ""]);
assert.equal(sentTexts.length, 1);
assert.match(sentTexts[0] ?? "", /Final <b>answer<\/b>/);
await handlers.get("session_shutdown")?.({}, ctx);
@@ -1545,6 +1547,225 @@ test("Extension runtime carries queued follow-ups into history after an aborted
}
});
test("Extension runtime recovers from a stale aborted Telegram turn on the next message", async () => {
const agentDir = join(homedir(), ".pi", "agent");
const configPath = join(agentDir, "telegram.json");
const previousConfig = await readFile(configPath, "utf8").catch(
() => undefined,
);
const handlers = new Map<
string,
(event: unknown, ctx: unknown) => Promise<unknown>
>();
const commands = new Map<
string,
{ handler: (args: string, ctx: unknown) => Promise<void> }
>();
const sentMessages: Array<string | Array<{ type: string; text?: string }>> =
[];
let firstDispatchResolved = false;
let secondUpdatesResolve: ((value: Response) => void) | undefined;
let thirdUpdatesResolve: ((value: Response) => void) | undefined;
const secondUpdates = new Promise<Response>((resolve) => {
secondUpdatesResolve = resolve;
});
const thirdUpdates = new Promise<Response>((resolve) => {
thirdUpdatesResolve = resolve;
});
const pi = {
on: (
event: string,
handler: (event: unknown, ctx: unknown) => Promise<unknown>,
) => {
handlers.set(event, handler);
},
registerCommand: (
name: string,
definition: { handler: (args: string, ctx: unknown) => Promise<void> },
) => {
commands.set(name, definition);
},
registerTool: () => {},
sendUserMessage: (
content: string | Array<{ type: string; text?: string }>,
) => {
sentMessages.push(content);
firstDispatchResolved = true;
},
getThinkingLevel: () => "medium",
} as never;
const originalFetch = globalThis.fetch;
let getUpdatesCalls = 0;
const sendTexts: string[] = [];
globalThis.fetch = async (input, init) => {
const url = typeof input === "string" ? input : input.toString();
const method = url.split("/").at(-1) ?? "";
const body =
typeof init?.body === "string"
? (JSON.parse(init.body) as Record<string, unknown>)
: undefined;
if (method === "deleteWebhook") {
return { json: async () => ({ ok: true, result: true }) } as Response;
}
if (method === "getUpdates") {
getUpdatesCalls += 1;
if (getUpdatesCalls === 1) {
return {
json: async () => ({
ok: true,
result: [
{
_: "other",
update_id: 1,
message: {
message_id: 20,
chat: { id: 99, type: "private" },
from: { id: 77, is_bot: false, first_name: "Test" },
text: "first request",
},
},
],
}),
} as Response;
}
if (getUpdatesCalls === 2) return secondUpdates;
if (getUpdatesCalls === 3) return thirdUpdates;
throw new DOMException("stop", "AbortError");
}
if (method === "sendMessage") {
sendTexts.push(String(body?.text ?? ""));
return {
json: async () => ({
ok: true,
result: { message_id: 300 + sendTexts.length },
}),
} as Response;
}
if (method === "sendChatAction") {
return { json: async () => ({ ok: true, result: true }) } as Response;
}
throw new Error(`Unexpected Telegram API method: ${method}`);
};
try {
await mkdir(agentDir, { recursive: true });
await writeFile(
configPath,
JSON.stringify(
{ botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
null,
"\t",
) + "\n",
"utf8",
);
telegramExtension(pi);
const baseCtx = {
hasUI: true,
model: undefined,
signal: undefined,
ui: {
theme: {
fg: (_token: string, text: string) => text,
},
setStatus: () => {},
notify: () => {},
},
hasPendingMessages: () => false,
};
let aborted = false;
const idleCtx = {
...baseCtx,
isIdle: () => true,
abort: () => {},
} as never;
const activeCtx = {
...baseCtx,
isIdle: () => false,
abort: () => {
aborted = true;
},
} as never;
await handlers.get("session_start")?.({}, idleCtx);
await commands.get("telegram-connect")?.handler("", idleCtx);
await waitForCondition(() => firstDispatchResolved);
await handlers.get("agent_start")?.({}, activeCtx);
secondUpdatesResolve?.({
json: async () => ({
ok: true,
result: [
{
_: "other",
update_id: 2,
message: {
message_id: 21,
chat: { id: 99, type: "private" },
from: { id: 77, is_bot: false, first_name: "Test" },
text: "/stop",
},
},
],
}),
} as Response);
await waitForCondition(() => aborted);
await new Promise((resolve) =>
setTimeout(resolve, __telegramTestUtils.STALE_ABORT_RECOVERY_GRACE_MS + 50),
);
const dispatchCountBeforeRecovery = sentMessages.length;
thirdUpdatesResolve?.({
json: async () => ({
ok: true,
result: [
{
_: "other",
update_id: 3,
message: {
message_id: 22,
chat: { id: 99, type: "private" },
from: { id: 77, is_bot: false, first_name: "Test" },
text: "after stop",
},
},
],
}),
} as Response);
await waitForCondition(
() => sentMessages.length === dispatchCountBeforeRecovery + 1,
);
const promptBlocks = sentMessages.at(-1) as Array<{
type: string;
text?: string;
}>;
const promptText = promptBlocks[0]?.text ?? "";
assert.match(promptText, /^\[telegram\]/);
assert.equal(promptText, "[telegram] after stop");
assert.equal(
promptText.includes("Earlier Telegram messages arrived after an aborted turn"),
false,
);
assert.equal(sendTexts.includes("Aborted current turn."), true);
await handlers.get("session_shutdown")?.({}, idleCtx);
} finally {
globalThis.fetch = originalFetch;
if (previousConfig === undefined) {
await rm(configPath, { force: true });
} else {
await writeFile(configPath, previousConfig, "utf8");
}
}
});
test("Shell command replies keep long output tails instead of slicing them away", () => {
const reply = __telegramTestUtils.buildShellCommandReply({
shellCmd: "printf x",
stdout: "x".repeat(5000),
stderr: "",
exitCode: 0,
});
assert.match(reply, /\*\*Shell\*\*/);
assert.match(reply, /\*\*stdout\*\*/);
assert.ok(reply.includes("x".repeat(64)));
assert.ok(reply.endsWith("x".repeat(64) + "\n```"));
});
test("Extension runtime runs queued status control before the next queued prompt after agent end", async () => {
const agentDir = join(homedir(), ".pi", "agent");
const configPath = join(agentDir, "telegram.json");
+16 -2
View File
@@ -33,7 +33,8 @@ test("renderBlockMessage truncates thinking in compact mode", () => {
const compact = renderBlockMessage(block, "compact")!;
const full = renderBlockMessage(block, "full")!;
assert.ok(compact.length < full.length);
assert.ok(compact.includes("…"));
assert.match(compact, /\[compact trace truncated; use \/trace for full\]/);
assert.ok(full.includes(longText));
});
test("renderBlockMessage renders tool_call block", () => {
@@ -70,7 +71,20 @@ test("renderBlockMessage truncates tool_result in compact mode", () => {
const compact = renderBlockMessage(block, "compact")!;
const full = renderBlockMessage(block, "full")!;
assert.ok(compact.length < full.length);
assert.ok(compact.includes("…"));
assert.match(compact, /\[compact trace truncated; use \/trace for full\]/);
assert.ok(full.includes("x".repeat(600)));
});
test("renderBlockMessage marks truncated tool_call args in compact mode", () => {
const block = {
type: "tool_call" as const,
name: "write_file",
argsText: "x".repeat(600),
};
const compact = renderBlockMessage(block, "compact")!;
const full = renderBlockMessage(block, "full")!;
assert.match(compact, /\[compact trace truncated; use \/trace for full\]/);
assert.ok(full.includes("x".repeat(600)));
});
test("Nested lists stay out of code blocks", () => {
+3 -3
View File
@@ -130,13 +130,13 @@ test("Reply previews truncate long flush text and compute final text fallback",
buildTelegramPreviewFlushText({
state: {
mode: "message",
pendingText: "abcdef",
pendingText: "abcdefghijklmnopqrstuvwxyz",
lastSentText: "",
},
maxMessageLength: 3,
maxMessageLength: 24,
renderPreviewText: (markdown) => markdown,
}),
"abc",
"abc…\n[preview truncated]",
);
assert.equal(
buildTelegramPreviewFinalText({