mirror of
https://github.com/wassname/pi-telegram.git
synced 2026-06-27 19:48:13 +08:00
2983 lines
86 KiB
TypeScript
2983 lines
86 KiB
TypeScript
/**
 * Regression tests for Telegram queue and runtime decision helpers
 * Exercises queue ordering, mutation, dispatch planning, lifecycle plans, and model-switch guard behavior
 */
|
|
|
|
import assert from "node:assert/strict";
|
|
import { mkdir, readFile, rm, writeFile } from "node:fs/promises";
|
|
import { homedir } from "node:os";
|
|
import { join } from "node:path";
|
|
import test from "node:test";
|
|
|
|
import telegramExtension, { __telegramTestUtils } from "../index.ts";
|
|
import {
|
|
buildTelegramAgentStartPlan,
|
|
buildTelegramSessionShutdownState,
|
|
buildTelegramSessionStartState,
|
|
executeTelegramControlItemRuntime,
|
|
executeTelegramQueueDispatchPlan,
|
|
getNextTelegramToolExecutionCount,
|
|
shouldStartTelegramPolling,
|
|
} from "../lib/queue.ts";
|
|
|
|
/**
 * Polls `predicate` every 10 ms until it returns true or `timeoutMs` elapses.
 *
 * The predicate is always evaluated at least once, and once more after the
 * deadline has passed, so a condition that becomes true during the final
 * sleep (or a zero/negative timeout) is not misreported as a timeout.
 *
 * @param predicate Synchronous check evaluated on every poll.
 * @param timeoutMs Maximum time to wait, in milliseconds (default 2000).
 * @returns Resolves as soon as the predicate returns true.
 * @throws Error "Timed out waiting for condition" when the predicate is
 *   still false once the deadline has passed.
 */
async function waitForCondition(
  predicate: () => boolean,
  timeoutMs = 2000,
): Promise<void> {
  const pollIntervalMs = 10;
  const deadline = Date.now() + timeoutMs;
  // Check first, then decide whether to keep waiting: this guarantees a final
  // predicate evaluation after the last sleep instead of throwing blindly.
  while (!predicate()) {
    if (Date.now() >= deadline) {
      throw new Error("Timed out waiting for condition");
    }
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
  }
}
|
|
|
|
// Sorting with compareTelegramQueueItems must yield control, then priority, then default items.
test("Control-lane items sort before priority and default prompt items", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.compareTelegramQueueItems
  >[0];
  const lowLane: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 1,
    sourceMessageIds: [1],
    queueOrder: 10,
    queueLane: "default",
    laneOrder: 10,
    queuedAttachments: [],
    content: [{ type: "text", text: "default" }],
    historyText: "default",
    statusSummary: "default",
  };
  const midLane: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 11,
    queueLane: "priority",
    laneOrder: 0,
    queuedAttachments: [],
    content: [{ type: "text", text: "priority" }],
    historyText: "priority",
    statusSummary: "priority",
  };
  const topLane: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 3,
    queueOrder: 12,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };
  // Deliberately shuffled input so the comparator has to do the ordering.
  const ordered = [lowLane, topLane, midLane];
  ordered.sort(__telegramTestUtils.compareTelegramQueueItems);
  const summaries = ordered.map((entry) => entry.statusSummary);
  assert.deepEqual(summaries, ["control", "priority", "default"]);
});
|
|
|
|
// Removing by message id 12 must drop the prompt that lists it as a source and keep the control item.
test("Queue mutation helpers remove prompt items by Telegram message id", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.removeTelegramQueueItemsByMessageIds
  >[0][number];
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 1,
    sourceMessageIds: [11, 12],
    queueOrder: 1,
    queueLane: "default",
    laneOrder: 1,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const queuedControl: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 2,
    queueOrder: 2,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };
  const outcome = __telegramTestUtils.removeTelegramQueueItemsByMessageIds(
    [queuedPrompt, queuedControl],
    [12],
  );
  const survivors = outcome.items.map((entry) => entry.statusSummary);
  assert.equal(outcome.removedCount, 1);
  assert.deepEqual(survivors, ["control"]);
});
|
|
|
|
// Prioritizing and then clearing message 11 must move only the prompt between lanes.
test("Queue mutation helpers apply and clear prompt priority without touching control items", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.prioritizeTelegramQueuePrompt
  >[0][number];
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 1,
    sourceMessageIds: [11],
    queueOrder: 4,
    queueLane: "default",
    laneOrder: 4,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const queuedControl: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 2,
    queueOrder: 5,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };

  // Step 1: promote the prompt.
  const afterPromote = __telegramTestUtils.prioritizeTelegramQueuePrompt(
    [queuedPrompt, queuedControl],
    11,
    0,
  );
  assert.equal(afterPromote.changed, true);
  assert.equal(afterPromote.items[0]?.queueLane, "priority");

  // Step 2: demote it again; the control item must stay in its own lane.
  const afterClear = __telegramTestUtils.clearTelegramQueuePromptPriority(
    afterPromote.items,
    11,
  );
  assert.equal(afterClear.changed, true);
  assert.equal(afterClear.items[0]?.queueLane, "default");
  assert.equal(afterClear.items[1]?.queueLane, "control");
});
|
|
|
|
// Partitioning for history must return the prompt as a history turn and leave the control item queued.
test("History partition keeps control items queued and extracts prompt items", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.partitionTelegramQueueItemsForHistory
  >[0][number];
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 1,
    sourceMessageIds: [1],
    queueOrder: 1,
    queueLane: "default",
    laneOrder: 1,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const queuedControl: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 2,
    queueOrder: 2,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };
  const partition = __telegramTestUtils.partitionTelegramQueueItemsForHistory(
    [queuedPrompt, queuedControl],
  );
  const historySummaries = partition.historyTurns.map(
    (entry) => entry.statusSummary,
  );
  const remainingSummaries = partition.remainingItems.map(
    (entry) => entry.statusSummary,
  );
  assert.deepEqual(historySummaries, ["prompt"]);
  assert.deepEqual(remainingSummaries, ["control"]);
});
|
|
|
|
// With dispatch allowed and the prompt first in the list, the plan selects the prompt
// and reports both items as remaining.
test("Dispatch planning returns the prompt item when dispatch is allowed", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.planNextTelegramQueueAction
  >[0][number];
  const queuedControl: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 1,
    queueOrder: 1,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 2,
    queueLane: "default",
    laneOrder: 2,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const next = __telegramTestUtils.planNextTelegramQueueAction(
    [queuedPrompt, queuedControl],
    true,
  );
  assert.equal(next.kind, "prompt");
  const chosenSummary = next.kind === "prompt" ? next.item.statusSummary : "";
  assert.equal(chosenSummary, "prompt");
  const remainingSummaries = next.remainingItems.map(
    (entry) => entry.statusSummary,
  );
  assert.deepEqual(remainingSummaries, ["prompt", "control"]);
});
|
|
|
|
// With a control item first in the list, the plan selects it and leaves only the prompt remaining.
test("Dispatch planning runs control items before normal prompts", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.planNextTelegramQueueAction
  >[0][number];
  const queuedControl: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 1,
    queueOrder: 1,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 2,
    queueLane: "default",
    laneOrder: 2,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const next = __telegramTestUtils.planNextTelegramQueueAction(
    [queuedControl, queuedPrompt],
    true,
  );
  assert.equal(next.kind, "control");
  const chosenSummary = next.kind === "control" ? next.item.statusSummary : "";
  assert.equal(chosenSummary, "control");
  const remainingSummaries = next.remainingItems.map(
    (entry) => entry.statusSummary,
  );
  assert.deepEqual(remainingSummaries, ["prompt"]);
});
|
|
|
|
// With dispatch disallowed, the plan is "none" and the prompt stays in the remaining items.
test("Dispatch planning returns none when dispatch is blocked", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.planNextTelegramQueueAction
  >[0][number];
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 2,
    queueLane: "default",
    laneOrder: 2,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const next = __telegramTestUtils.planNextTelegramQueueAction(
    [queuedPrompt],
    false,
  );
  const remainingSummaries = next.remainingItems.map(
    (entry) => entry.statusSummary,
  );
  assert.equal(next.kind, "none");
  assert.deepEqual(remainingSummaries, ["prompt"]);
});
|
|
|
|
// Planning twice in a row: the control item is selected first, then the prompt from what remains.
test("Control-item dispatch sequencing hands off to the next prompt", () => {
  type QueueItem = Parameters<
    typeof __telegramTestUtils.planNextTelegramQueueAction
  >[0][number];
  const queuedControl: QueueItem = {
    kind: "control",
    controlType: "status",
    chatId: 1,
    replyToMessageId: 1,
    queueOrder: 1,
    queueLane: "control",
    laneOrder: 0,
    statusSummary: "control",
    execute: async () => {},
  };
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 2,
    queueLane: "default",
    laneOrder: 2,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };

  const controlStep = __telegramTestUtils.planNextTelegramQueueAction(
    [queuedControl, queuedPrompt],
    true,
  );
  assert.equal(controlStep.kind, "control");

  // Feed the leftovers of the first plan straight into the second one.
  const promptStep = __telegramTestUtils.planNextTelegramQueueAction(
    controlStep.remainingItems,
    true,
  );
  assert.equal(promptStep.kind, "prompt");
  const chosenSummary =
    promptStep.kind === "prompt" ? promptStep.item.statusSummary : "";
  assert.equal(chosenSummary, "prompt");
});
|
|
|
|
// An aborted turn with preserved history must yield a "do not dispatch" decision, and feeding
// that decision into the planner must leave the queued prompt untouched.
test("Preserved abort leaves queued prompts waiting for explicit continuation", () => {
  type AgentEndInput = Parameters<
    typeof __telegramTestUtils.shouldDispatchAfterTelegramAgentEnd
  >[0];
  type QueueItem = Parameters<
    typeof __telegramTestUtils.planNextTelegramQueueAction
  >[0][number];
  const preservedAbort: AgentEndInput = {
    hasTurn: true,
    stopReason: "aborted",
    preserveQueuedTurnsAsHistory: true,
  };
  const directDecision =
    __telegramTestUtils.shouldDispatchAfterTelegramAgentEnd(preservedAbort);
  assert.equal(directDecision, false);

  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 2,
    queueLane: "default",
    laneOrder: 2,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const plan = __telegramTestUtils.planNextTelegramQueueAction(
    [queuedPrompt],
    __telegramTestUtils.shouldDispatchAfterTelegramAgentEnd(preservedAbort),
  );
  const remainingSummaries = plan.remainingItems.map(
    (entry) => entry.statusSummary,
  );
  assert.equal(plan.kind, "none");
  assert.deepEqual(remainingSummaries, ["prompt"]);
});
|
|
|
|
// Table of agent-end states and the dispatch decision expected for each.
test("Agent end dispatch policy resumes after success and error, but not preserved aborts", () => {
  type AgentEndInput = Parameters<
    typeof __telegramTestUtils.shouldDispatchAfterTelegramAgentEnd
  >[0];
  const scenarios: Array<[AgentEndInput, boolean]> = [
    [{ hasTurn: false, preserveQueuedTurnsAsHistory: false }, true],
    [
      {
        hasTurn: true,
        stopReason: "error",
        preserveQueuedTurnsAsHistory: false,
      },
      true,
    ],
    [
      {
        hasTurn: true,
        stopReason: "aborted",
        preserveQueuedTurnsAsHistory: false,
      },
      true,
    ],
    [
      {
        hasTurn: true,
        stopReason: "aborted",
        preserveQueuedTurnsAsHistory: true,
      },
      false,
    ],
  ];
  for (const [input, expected] of scenarios) {
    const decision =
      __telegramTestUtils.shouldDispatchAfterTelegramAgentEnd(input);
    assert.equal(decision, expected);
  }
});
|
|
|
|
// Builds one agent-end plan per outcome and checks its kind plus the flag specific to that outcome.
test("Agent end plan classifies turn outcomes correctly", () => {
  type PlanInput = Parameters<
    typeof __telegramTestUtils.buildTelegramAgentEndPlan
  >[0];
  const planFor = (input: PlanInput) =>
    __telegramTestUtils.buildTelegramAgentEndPlan(input);

  const idle = planFor({
    hasTurn: false,
    preserveQueuedTurnsAsHistory: false,
    hasFinalText: false,
    hasQueuedAttachments: false,
  });
  assert.equal(idle.kind, "no-turn");
  assert.equal(idle.shouldDispatchNext, true);

  const aborted = planFor({
    hasTurn: true,
    stopReason: "aborted",
    preserveQueuedTurnsAsHistory: true,
    hasFinalText: false,
    hasQueuedAttachments: true,
  });
  assert.equal(aborted.kind, "aborted");
  assert.equal(aborted.shouldClearPreview, true);
  assert.equal(aborted.shouldDispatchNext, false);

  const failed = planFor({
    hasTurn: true,
    stopReason: "error",
    preserveQueuedTurnsAsHistory: false,
    hasFinalText: false,
    hasQueuedAttachments: false,
  });
  assert.equal(failed.kind, "error");
  assert.equal(failed.shouldSendErrorMessage, true);

  const attachmentsOnly = planFor({
    hasTurn: true,
    preserveQueuedTurnsAsHistory: false,
    hasFinalText: false,
    hasQueuedAttachments: true,
  });
  assert.equal(attachmentsOnly.kind, "attachments-only");
  assert.equal(attachmentsOnly.shouldSendAttachmentNotice, true);

  const withText = planFor({
    hasTurn: true,
    preserveQueuedTurnsAsHistory: false,
    hasFinalText: true,
    hasQueuedAttachments: false,
  });
  assert.equal(withText.kind, "text");
  assert.equal(withText.shouldClearPreview, false);
});
|
|
|
|
// With a pending dispatch and no active turn, the start plan must adopt the queued prompt as the
// active turn, empty the queue, and request all three resets.
test("Agent start plan consumes a dispatched prompt and resets transient flags", () => {
  type QueueItem = Parameters<
    typeof buildTelegramAgentStartPlan
  >[0]["queuedItems"][number];
  const queuedPrompt: QueueItem = {
    kind: "prompt",
    chatId: 1,
    replyToMessageId: 2,
    sourceMessageIds: [2],
    queueOrder: 2,
    queueLane: "default",
    laneOrder: 2,
    queuedAttachments: [],
    content: [{ type: "text", text: "prompt" }],
    historyText: "prompt history",
    statusSummary: "prompt",
  };
  const startPlan = buildTelegramAgentStartPlan({
    queuedItems: [queuedPrompt],
    hasPendingDispatch: true,
    hasActiveTurn: false,
  });
  assert.equal(startPlan.activeTurn?.statusSummary, "prompt");
  assert.equal(startPlan.shouldClearDispatchPending, true);
  assert.equal(startPlan.shouldResetPendingModelSwitch, true);
  assert.equal(startPlan.shouldResetToolExecutions, true);
  assert.deepEqual(startPlan.remainingItems, []);
});
|
|
|
|
// Table of counter transitions: start increments, end decrements, and the count is
// left unchanged when no turn is active.
test("Tool execution count helper respects active-turn presence", () => {
  type CountInput = Parameters<typeof getNextTelegramToolExecutionCount>[0];
  const transitions: Array<[CountInput, number]> = [
    [{ hasActiveTurn: true, currentCount: 0, event: "start" }, 1],
    [{ hasActiveTurn: true, currentCount: 1, event: "end" }, 0],
    [{ hasActiveTurn: false, currentCount: 3, event: "end" }, 3],
  ];
  for (const [input, expectedCount] of transitions) {
    assert.equal(getNextTelegramToolExecutionCount(input), expectedCount);
  }
});
|
|
|
|
// Baseline: nothing in progress, pi idle, no pending messages -> dispatch allowed.
test("Dispatch is allowed only when every guard is clear", () => {
  const allClear = {
    compactionInProgress: false,
    hasActiveTelegramTurn: false,
    hasPendingTelegramDispatch: false,
    isIdle: true,
    hasPendingMessages: false,
  };
  const verdict = __telegramTestUtils.canDispatchTelegramTurnState(allClear);
  assert.equal(verdict, true);
});
|
|
|
|
// Same as the all-clear state except compaction is running -> dispatch refused.
test("Dispatch is blocked during compaction", () => {
  const allClear = {
    compactionInProgress: false,
    hasActiveTelegramTurn: false,
    hasPendingTelegramDispatch: false,
    isIdle: true,
    hasPendingMessages: false,
  };
  const verdict = __telegramTestUtils.canDispatchTelegramTurnState({
    ...allClear,
    compactionInProgress: true,
  });
  assert.equal(verdict, false);
});
|
|
|
|
// Flips one Telegram-side guard at a time (active turn, then pending dispatch); each must block.
test("Dispatch is blocked while a Telegram turn is active or pending", () => {
  const allClear = {
    compactionInProgress: false,
    hasActiveTelegramTurn: false,
    hasPendingTelegramDispatch: false,
    isIdle: true,
    hasPendingMessages: false,
  };
  const blockers: Array<Partial<typeof allClear>> = [
    { hasActiveTelegramTurn: true },
    { hasPendingTelegramDispatch: true },
  ];
  for (const blocker of blockers) {
    const verdict = __telegramTestUtils.canDispatchTelegramTurnState({
      ...allClear,
      ...blocker,
    });
    assert.equal(verdict, false);
  }
});
|
|
|
|
// Flips one pi-side guard at a time (not idle, then pending messages); each must block.
test("Dispatch is blocked when pi is busy or has pending messages", () => {
  const allClear = {
    compactionInProgress: false,
    hasActiveTelegramTurn: false,
    hasPendingTelegramDispatch: false,
    isIdle: true,
    hasPendingMessages: false,
  };
  const blockers: Array<Partial<typeof allClear>> = [
    { isIdle: false },
    { hasPendingMessages: true },
  ];
  for (const blocker of blockers) {
    const verdict = __telegramTestUtils.canDispatchTelegramTurnState({
      ...allClear,
      ...blocker,
    });
    assert.equal(verdict, false);
  }
});
|
|
|
|
// Restart is permitted only for the busy + active turn + abort handler combination;
// changing any single input must flip the answer to false.
test("In-flight model switch is allowed only for active Telegram turns with abort support", () => {
  const restartable = {
    isIdle: false,
    hasActiveTelegramTurn: true,
    hasAbortHandler: true,
  };
  const scenarios: Array<[Partial<typeof restartable>, boolean]> = [
    [{}, true],
    [{ isIdle: true }, false],
    [{ hasActiveTelegramTurn: false }, false],
    [{ hasAbortHandler: false }, false],
  ];
  for (const [override, expected] of scenarios) {
    const verdict = __telegramTestUtils.canRestartTelegramTurnForModelSwitch({
      ...restartable,
      ...override,
    });
    assert.equal(verdict, expected);
  }
});
|
|
|
|
// The abort fires only when a switch is pending and zero tool executions are in flight.
test("Pending model switch abort waits until no tool executions remain", () => {
  const readyToAbort = {
    hasPendingModelSwitch: true,
    hasActiveTelegramTurn: true,
    hasAbortHandler: true,
    activeToolExecutions: 0,
  };
  const scenarios: Array<[Partial<typeof readyToAbort>, boolean]> = [
    [{}, true],
    [{ activeToolExecutions: 1 }, false],
    [{ hasPendingModelSwitch: false }, false],
  ];
  for (const [override, expected] of scenarios) {
    const verdict =
      __telegramTestUtils.shouldTriggerPendingTelegramModelSwitchAbort({
        ...readyToAbort,
        ...override,
      });
    assert.equal(verdict, expected);
  }
});
|
|
|
|
// With an active turn the continuation is queued before abort is called; without one,
// the helper returns false and queueContinuation is not invoked.
test("Model-switch continuation restart queues before abort when state is present", () => {
  const callOrder: string[] = [];

  const restarted = __telegramTestUtils.restartTelegramModelSwitchContinuation({
    activeTurn: { id: 1 },
    abort: () => {
      callOrder.push("abort");
    },
    selection: { model: { provider: "openai", id: "gpt-5" } },
    queueContinuation: (turn, selection) => {
      callOrder.push(`queue:${turn.id}:${selection.model.id}`);
    },
  });
  assert.equal(restarted, true);
  assert.deepEqual(callOrder, ["queue:1:gpt-5", "abort"]);

  const skipped = __telegramTestUtils.restartTelegramModelSwitchContinuation({
    activeTurn: undefined,
    abort: () => {},
    selection: { model: { provider: "openai", id: "gpt-5" } },
    queueContinuation: () => {
      callOrder.push("unexpected");
    },
  });
  assert.equal(skipped, false);
});
|
|
|
|
// The generated continuation text must carry the Telegram prefix, the resume instruction,
// the provider/model id, and the thinking level.
test("Continuation prompt stays Telegram-scoped and resume-oriented", () => {
  const continuation =
    __telegramTestUtils.buildTelegramModelSwitchContinuationText(
      { provider: "openai", id: "gpt-5" },
      "high",
    );
  const requiredPatterns = [
    /^\[telegram\]/,
    /Continue the interrupted previous Telegram request/,
    /openai\/gpt-5/,
    /thinking level \(high\)/,
  ];
  for (const pattern of requiredPatterns) {
    assert.match(continuation, pattern);
  }
});
|
|
|
|
// Success path: the item's execute runs, no reply is sent, and onSettled fires afterwards.
test("Control runtime runs the control item and always settles", async () => {
  const trace: string[] = [];
  const record = (label: string) => {
    trace.push(label);
  };
  await executeTelegramControlItemRuntime(
    {
      kind: "control",
      controlType: "status",
      chatId: 1,
      replyToMessageId: 2,
      queueOrder: 1,
      queueLane: "control",
      laneOrder: 0,
      statusSummary: "status",
      execute: async () => {
        record("execute");
      },
    },
    {
      ctx: {} as never,
      sendTextReply: async () => {
        record("reply");
        return undefined;
      },
      onSettled: () => {
        record("settled");
      },
    },
  );
  assert.deepEqual(trace, ["execute", "settled"]);
});
|
|
|
|
// Failure path: a throwing execute produces a failure reply first, then onSettled.
test("Control runtime reports failures before settling", async () => {
  const trace: string[] = [];
  await executeTelegramControlItemRuntime(
    {
      kind: "control",
      controlType: "model",
      chatId: 3,
      replyToMessageId: 4,
      queueOrder: 2,
      queueLane: "control",
      laneOrder: 1,
      statusSummary: "model",
      execute: async () => {
        throw new Error("boom");
      },
    },
    {
      ctx: {} as never,
      // Only the reply text is recorded; chat and message ids are ignored here.
      sendTextReply: async (_chatId, _replyToMessageId, replyText) => {
        trace.push(replyText);
        return undefined;
      },
      onSettled: () => {
        trace.push("settled");
      },
    },
  );
  const expectedTrace = ["Telegram control action failed: boom", "settled"];
  assert.deepEqual(trace, expectedTrace);
});
|
|
|
|
// A "none" plan must only trigger onIdle; a "control" plan must only trigger executeControlItem.
test("Dispatch runtime idles on none and executes control items directly", () => {
  type DispatchDeps = Parameters<typeof executeTelegramQueueDispatchPlan>[1];
  const trace: string[] = [];
  // Fresh recording callbacks for every call, mirroring two independent dependency objects.
  const recordingDeps = (): DispatchDeps => ({
    executeControlItem: () => {
      trace.push("control");
    },
    onPromptDispatchStart: () => {
      trace.push("prompt-start");
    },
    sendUserMessage: () => {
      trace.push("prompt");
    },
    onPromptDispatchFailure: (message) => {
      trace.push(`error:${message}`);
    },
    onIdle: () => {
      trace.push("idle");
    },
  });

  executeTelegramQueueDispatchPlan(
    { kind: "none", remainingItems: [] },
    recordingDeps(),
  );
  executeTelegramQueueDispatchPlan(
    {
      kind: "control",
      item: {
        kind: "control",
        controlType: "status",
        chatId: 1,
        replyToMessageId: 1,
        queueOrder: 1,
        queueLane: "control",
        laneOrder: 0,
        statusSummary: "status",
        execute: async () => {},
      },
      remainingItems: [],
    },
    recordingDeps(),
  );

  assert.deepEqual(trace, ["idle", "control"]);
});
|
|
|
|
// When sendUserMessage throws, the start hook has already run and the failure hook
// receives the error message.
test("Dispatch runtime reports prompt dispatch failures after starting", () => {
  const trace: string[] = [];
  const record = (label: string) => {
    trace.push(label);
  };
  executeTelegramQueueDispatchPlan(
    {
      kind: "prompt",
      item: {
        kind: "prompt",
        chatId: 2,
        replyToMessageId: 3,
        sourceMessageIds: [3],
        queueOrder: 2,
        queueLane: "default",
        laneOrder: 2,
        queuedAttachments: [],
        content: [{ type: "text", text: "prompt" }],
        historyText: "prompt",
        statusSummary: "prompt",
      },
      remainingItems: [],
    },
    {
      executeControlItem: () => {
        record("control");
      },
      onPromptDispatchStart: (chatId) => {
        record(`start:${chatId}`);
      },
      sendUserMessage: () => {
        throw new Error("boom");
      },
      onPromptDispatchFailure: (message) => {
        record(`error:${message}`);
      },
      onIdle: () => {
        record("idle");
      },
    },
  );
  assert.deepEqual(trace, ["start:2", "error:boom"]);
});
|
|
|
|
// Polling starts only with a bot token present and no polling promise already running.
test("Session runtime helper starts polling only when a bot token exists and polling is idle", () => {
  const scenarios: Array<
    [{ hasBotToken: boolean; hasPollingPromise: boolean }, boolean]
  > = [
    [{ hasBotToken: true, hasPollingPromise: false }, true],
    [{ hasBotToken: false, hasPollingPromise: false }, false],
    [{ hasBotToken: true, hasPollingPromise: true }, false],
  ];
  for (const [input, expected] of scenarios) {
    assert.equal(shouldStartTelegramPolling(input), expected);
  }
});
|
|
|
|
// The start state must carry the given model by reference and zero/false for every counter and flag.
test("Session runtime helper resets session start state", () => {
  const currentModel = { provider: "openai", id: "gpt-5" } as const;
  const startState = buildTelegramSessionStartState(currentModel as never);
  assert.equal(startState.currentTelegramModel, currentModel);
  const resetFields = [
    ["activeTelegramToolExecutions", 0],
    ["nextQueuedTelegramItemOrder", 0],
    ["nextQueuedTelegramControlOrder", 0],
    ["telegramTurnDispatchPending", false],
    ["compactionInProgress", false],
  ] as const;
  for (const [field, expected] of resetFields) {
    assert.equal(startState[field], expected);
  }
});
|
|
|
|
// The shutdown state must have an empty queue, no model, and zero/false for every counter and flag.
test("Session runtime helper clears shutdown state", () => {
  const shutdownState = buildTelegramSessionShutdownState<string>();
  assert.deepEqual(shutdownState.queuedTelegramItems, []);
  const clearedFields = [
    ["nextQueuedTelegramItemOrder", 0],
    ["nextQueuedTelegramControlOrder", 0],
    ["nextPriorityReactionOrder", 0],
    ["currentTelegramModel", undefined],
    ["activeTelegramToolExecutions", 0],
    ["telegramTurnDispatchPending", false],
    ["compactionInProgress", false],
    ["preserveQueuedTurnsAsHistory", false],
  ] as const;
  for (const [field, expected] of clearedFields) {
    assert.equal(shutdownState[field], expected);
  }
});
|
|
|
|
// End-to-end check of the extension against a mocked Telegram Bot API:
//  1. writes a temporary telegram.json (bot token only) under ~/.pi/agent,
//  2. replaces globalThis.fetch with a stub that serves one inbound message from getUpdates
//     and then throws an AbortError on subsequent polls,
//  3. fires session_start and the telegram-connect command,
//  4. waits for the extension to forward the message through pi.sendUserMessage.
// Cleanup (session_shutdown, fetch restore, config restore) always runs, even when an
// assertion fails, so a failing run does not leave the extension running against the
// real fetch or leave the test config on disk.
test("Extension runtime polls, pairs, and dispatches an inbound Telegram turn into pi", async () => {
  type PromptContent = string | Array<{ type: string; text?: string }>;
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Remember any real user config so it can be put back afterwards.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const sentMessages: PromptContent[] = [];
  let resolveDispatch: ((value: PromptContent) => void) | undefined;
  const dispatched = new Promise<PromptContent>((resolve) => {
    resolveDispatch = resolve;
  });

  // Minimal stand-in for the pi extension API: records registrations and outbound prompts.
  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: PromptContent) => {
      sentMessages.push(content);
      resolveDispatch?.(content);
    },
    getThinkingLevel: () => "medium",
  } as never;

  // Stand-in for the extension context handed to handlers and commands.
  const ctx = {
    hasUI: true,
    model: undefined,
    signal: undefined,
    ui: {
      theme: {
        fg: (_token: string, text: string) => text,
      },
      setStatus: () => {},
      notify: () => {},
    },
    isIdle: () => true,
    hasPendingMessages: () => false,
    abort: () => {},
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  const apiCalls: string[] = [];
  const jsonResponse = (payload: unknown) =>
    ({ json: async () => payload }) as Response;

  globalThis.fetch = async (input) => {
    // Request objects stringify to "[object Request]", so read their url explicitly.
    const url = input instanceof Request ? input.url : String(input);
    // The Bot API method name is the last path segment of the URL.
    const method = url.split("/").at(-1) ?? "";
    apiCalls.push(method);
    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return jsonResponse({ ok: true, result: true });
      case "getUpdates": {
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return jsonResponse({
            ok: true,
            result: [
              {
                _: "other",
                update_id: 1,
                message: {
                  message_id: 42,
                  chat: { id: 99, type: "private" },
                  from: { id: 77, is_bot: false, first_name: "Test" },
                  text: "hello from telegram",
                },
              },
            ],
          });
        }
        // Every poll after the first fails with an AbortError.
        throw new DOMException("stop", "AbortError");
      }
      case "sendMessage":
        return jsonResponse({ ok: true, result: { message_id: 100 } });
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    await writeFile(
      configPath,
      JSON.stringify({ botToken: "123:abc", lastUpdateId: 0 }, null, "\t") +
        "\n",
      "utf8",
    );
    telegramExtension(pi);
    await handlers.get("session_start")?.({}, ctx);
    await commands.get("telegram-connect")?.handler("", ctx);

    const dispatchedContent = await dispatched;
    assert.equal(sentMessages.length, 1);
    assert.equal(Array.isArray(dispatchedContent), true);
    assert.equal(apiCalls.includes("sendMessage"), true);
    assert.equal(apiCalls.includes("sendChatAction"), true);
    const promptBlocks = dispatchedContent as Array<{
      type: string;
      text?: string;
    }>;
    assert.equal(promptBlocks[0]?.type, "text");
    assert.match(
      promptBlocks[0]?.text ?? "",
      /^\[telegram\] hello from telegram$/,
    );
  } finally {
    try {
      // Shut the extension down while the fetch stub is still installed.
      await handlers.get("session_shutdown")?.({}, ctx);
    } finally {
      globalThis.fetch = originalFetch;
      if (previousConfig === undefined) {
        await rm(configPath, { force: true });
      } else {
        await writeFile(configPath, previousConfig, "utf8");
      }
    }
  }
});
|
|
|
|
/**
 * Runtime regression: a streamed assistant draft is pushed through
 * `sendMessageDraft`, and `agent_end` turns it into exactly one `sendMessage`.
 *
 * The test drives the extension through a stubbed `pi` host and a stubbed
 * `globalThis.fetch` that plays the Telegram Bot API. It temporarily overwrites
 * `~/.pi/agent/telegram.json`; the `finally` block restores the previous file
 * (or removes it when none existed) and reinstates the real `fetch`.
 */
test("Extension runtime finalizes a drafted preview into the final Telegram reply on agent end", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot any existing config so the finally block can put it back.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const draftTexts: string[] = [];
  const sentTexts: string[] = [];
  // Flipped by the stubbed host once the extension hands a prompt to the agent.
  let promptDispatched = false;
  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: () => {
      promptDispatched = true;
    },
    getThinkingLevel: () => "medium",
  } as never;

  // Minimal stand-in for a Telegram API response. The result is produced
  // lazily, at the moment the caller reads the JSON body.
  const okResponse = (makeResult: () => unknown) =>
    ({
      json: async () => ({ ok: true, result: makeResult() }),
    }) as Response;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the request URL.
    const method = url.split("/").at(-1) ?? "";
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
      case "editMessageText":
        return okResponse(() => true);
      case "getUpdates":
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return okResponse(() => [
            {
              _: "other",
              update_id: 1,
              message: {
                message_id: 7,
                chat: { id: 99, type: "private" },
                from: { id: 77, is_bot: false, first_name: "Test" },
                text: "please answer",
              },
            },
          ]);
        }
        // Every later poll rejects with an AbortError.
        // NOTE(review): presumably this ends the polling loop — confirm.
        throw new DOMException("stop", "AbortError");
      case "sendMessageDraft":
        draftTexts.push(String(body?.text ?? ""));
        return okResponse(() => true);
      case "sendMessage":
        sentTexts.push(String(body?.text ?? ""));
        return okResponse(() => ({ message_id: 100 + sentTexts.length }));
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configText =
      JSON.stringify(
        { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
        null,
        "\t",
      ) + "\n";
    await writeFile(configPath, configText, "utf8");
    telegramExtension(pi);
    const ctx = {
      hasUI: true,
      model: undefined,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      isIdle: () => true,
      hasPendingMessages: () => false,
      abort: () => {},
    } as never;

    await handlers.get("session_start")?.({}, ctx);
    await commands.get("telegram-connect")?.handler("", ctx);
    // Bounded wait: if the prompt is never dispatched the test fails with a
    // timeout error instead of hanging the runner on an unresolved promise.
    await waitForCondition(() => promptDispatched);

    await handlers.get("agent_start")?.({}, ctx);
    await handlers.get("message_update")?.(
      {
        message: {
          role: "assistant",
          content: [{ type: "text", text: "Draft **preview**" }],
        },
      },
      ctx,
    );
    // NOTE(review): fixed pause, presumably long enough for the draft preview
    // to be flushed before the turn ends — confirm against the preview throttle.
    await new Promise((resolve) => setTimeout(resolve, 850));
    await handlers.get("agent_end")?.(
      {
        messages: [
          {
            role: "assistant",
            content: [{ type: "text", text: "Final **answer**" }],
          },
        ],
      },
      ctx,
    );

    // Drafts carry plain text; the trailing empty draft is expected last.
    assert.deepEqual(draftTexts, ["Draft preview", "Final answer", ""]);
    // Exactly one real message is sent, rendered as HTML.
    assert.equal(sentTexts.length, 1);
    assert.match(sentTexts[0] ?? "", /Final <b>answer<\/b>/);
    await handlers.get("session_shutdown")?.({}, ctx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Runtime regression: a message queued during a turn that is then aborted via
 * `/stop` must show up as history inside the next dispatched prompt.
 *
 * Polls two to four are deferred promises, so the test decides when each
 * Telegram update is delivered. The real `~/.pi/agent/telegram.json` is
 * overwritten for the duration of the test and restored in `finally`.
 */
test("Extension runtime carries queued follow-ups into history after an aborted turn", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptBlocks = Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot any existing config so the finally block can put it back.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const sentMessages: Array<string | PromptBlocks> = [];
  let firstDispatchSeen = false;

  // A getUpdates reply the test can release at a moment of its choosing.
  const createDeferredResponse = () => {
    let release: ((value: Response) => void) | undefined;
    const promise = new Promise<Response>((resolve) => {
      release = resolve;
    });
    return { promise, resolve: (value: Response) => release?.(value) };
  };
  const secondPoll = createDeferredResponse();
  const thirdPoll = createDeferredResponse();
  const fourthPoll = createDeferredResponse();
  const deferredPolls = new Map<number, Promise<Response>>([
    [2, secondPoll.promise],
    [3, thirdPoll.promise],
    [4, fourthPoll.promise],
  ]);

  // Minimal stand-in for a Telegram API response; result is built lazily.
  const okResponse = (makeResult: () => unknown) =>
    ({
      json: async () => ({ ok: true, result: makeResult() }),
    }) as Response;
  // A getUpdates reply holding one private-chat text message from user 77.
  const textUpdateResponse = (
    updateId: number,
    messageId: number,
    text: string,
  ) =>
    okResponse(() => [
      {
        _: "other",
        update_id: updateId,
        message: {
          message_id: messageId,
          chat: { id: 99, type: "private" },
          from: { id: 77, is_bot: false, first_name: "Test" },
          text,
        },
      },
    ]);

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: string | PromptBlocks) => {
      sentMessages.push(content);
      firstDispatchSeen = true;
    },
    getThinkingLevel: () => "medium",
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  const sendTexts: string[] = [];
  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the request URL.
    const method = url.split("/").at(-1) ?? "";
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return okResponse(() => true);
      case "getUpdates": {
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return textUpdateResponse(1, 10, "first request");
        }
        const deferred = deferredPolls.get(getUpdatesCalls);
        if (deferred) return deferred;
        // Polls past the fourth reject with an AbortError.
        throw new DOMException("stop", "AbortError");
      }
      case "sendMessage":
        sendTexts.push(String(body?.text ?? ""));
        return okResponse(() => ({ message_id: 100 + sendTexts.length }));
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configText =
      JSON.stringify(
        { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
        null,
        "\t",
      ) + "\n";
    await writeFile(configPath, configText, "utf8");
    telegramExtension(pi);

    const baseCtx = {
      hasUI: true,
      model: undefined,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      hasPendingMessages: () => false,
    };
    const idleCtx = {
      ...baseCtx,
      isIdle: () => true,
      abort: () => {},
    } as never;
    // The busy context records whether the extension asked for an abort.
    let aborted = false;
    const activeCtx = {
      ...baseCtx,
      isIdle: () => false,
      abort: () => {
        aborted = true;
      },
    } as never;

    await handlers.get("session_start")?.({}, idleCtx);
    await commands.get("telegram-connect")?.handler("", idleCtx);
    await waitForCondition(() => firstDispatchSeen);
    await handlers.get("agent_start")?.({}, activeCtx);

    // Deliver a follow-up while the turn is running, then wait for the next poll.
    secondPoll.resolve(textUpdateResponse(2, 11, "follow up"));
    await waitForCondition(() => getUpdatesCalls >= 3);

    // Ask for the running turn to be stopped and wait for the abort callback.
    thirdPoll.resolve(textUpdateResponse(3, 12, "/stop"));
    await waitForCondition(() => aborted);
    await handlers.get("agent_end")?.(
      {
        messages: [
          {
            role: "assistant",
            stopReason: "aborted",
            content: [{ type: "text", text: "" }],
          },
        ],
      },
      idleCtx,
    );

    // A fresh message after the abort must trigger exactly one new dispatch.
    const dispatchCountBeforeNextTurn = sentMessages.length;
    fourthPoll.resolve(textUpdateResponse(4, 13, "new request"));
    await waitForCondition(
      () => sentMessages.length === dispatchCountBeforeNextTurn + 1,
    );

    const latestPrompt = sentMessages.at(-1) as PromptBlocks;
    const promptText = latestPrompt[0]?.text ?? "";
    assert.match(promptText, /^\[telegram\]/);
    assert.match(
      promptText,
      /Earlier Telegram messages arrived after an aborted turn/,
    );
    assert.match(promptText, /1\. follow up/);
    assert.match(promptText, /Current Telegram message:\nnew request/);
    assert.equal(sendTexts.includes("Aborted current turn."), true);
    await handlers.get("session_shutdown")?.({}, idleCtx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Runtime regression: a `/status` command and a later prompt both arrive while
 * a turn is running; after `agent_end` the status reply must be sent before the
 * queued prompt is dispatched.
 *
 * `runtimeEvents` records host dispatches (`dispatch:…`) and Telegram
 * `sendMessage` calls (`send:…`) in the order they happen. The real
 * `~/.pi/agent/telegram.json` is overwritten for the duration of the test and
 * restored in `finally`.
 */
test("Extension runtime runs queued status control before the next queued prompt after agent end", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptBlocks = Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot any existing config so the finally block can put it back.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];
  let firstDispatchSeen = false;

  // A getUpdates reply the test can release at a moment of its choosing.
  const createDeferredResponse = () => {
    let release: ((value: Response) => void) | undefined;
    const promise = new Promise<Response>((resolve) => {
      release = resolve;
    });
    return { promise, resolve: (value: Response) => release?.(value) };
  };
  const secondPoll = createDeferredResponse();
  const thirdPoll = createDeferredResponse();

  // Minimal stand-in for a Telegram API response; result is built lazily.
  const okResponse = (makeResult: () => unknown) =>
    ({
      json: async () => ({ ok: true, result: makeResult() }),
    }) as Response;
  // A getUpdates reply holding one private-chat text message from user 77.
  const textUpdateResponse = (
    updateId: number,
    messageId: number,
    text: string,
  ) =>
    okResponse(() => [
      {
        _: "other",
        update_id: updateId,
        message: {
          message_id: messageId,
          chat: { id: 99, type: "private" },
          from: { id: 77, is_bot: false, first_name: "Test" },
          text,
        },
      },
    ]);

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: string | PromptBlocks) => {
      const firstBlockText = (content as PromptBlocks)[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${firstBlockText}`);
      firstDispatchSeen = true;
    },
    getThinkingLevel: () => "medium",
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the request URL.
    const method = url.split("/").at(-1) ?? "";
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return okResponse(() => true);
      case "getUpdates":
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return textUpdateResponse(1, 20, "first request");
        }
        if (getUpdatesCalls === 2) return secondPoll.promise;
        if (getUpdatesCalls === 3) return thirdPoll.promise;
        // Polls past the third reject with an AbortError.
        throw new DOMException("stop", "AbortError");
      case "sendMessage":
        runtimeEvents.push(`send:${String(body?.text ?? "")}`);
        return okResponse(() => ({ message_id: 100 + runtimeEvents.length }));
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configText =
      JSON.stringify(
        { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
        null,
        "\t",
      ) + "\n";
    await writeFile(configPath, configText, "utf8");
    telegramExtension(pi);

    const baseCtx = {
      hasUI: true,
      cwd: process.cwd(),
      model: undefined,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      sessionManager: {
        getEntries: () => [],
      },
      modelRegistry: {
        refresh: () => {},
        getAvailable: () => [],
        isUsingOAuth: () => false,
      },
      getContextUsage: () => undefined,
      hasPendingMessages: () => false,
      abort: () => {},
    };
    const idleCtx = {
      ...baseCtx,
      isIdle: () => true,
    } as never;
    const activeCtx = {
      ...baseCtx,
      isIdle: () => false,
    } as never;

    await handlers.get("session_start")?.({}, idleCtx);
    await commands.get("telegram-connect")?.handler("", idleCtx);
    await waitForCondition(() => firstDispatchSeen);
    await handlers.get("agent_start")?.({}, activeCtx);

    // Deliver the control command while the turn is still running.
    secondPoll.resolve(textUpdateResponse(2, 21, "/status"));
    await waitForCondition(() => getUpdatesCalls >= 3);

    // Deliver the follow-up prompt, also while the turn is still running.
    thirdPoll.resolve(textUpdateResponse(3, 22, "follow up after status"));
    // Wait until the poller asks for a fourth batch, i.e. it has consumed the
    // follow-up update. Checking `runtimeEvents.length >= 1` here would pass
    // immediately (the first dispatch is already recorded) and let agent_end
    // race the follow-up.
    await waitForCondition(() => getUpdatesCalls >= 4);

    await handlers.get("agent_end")?.(
      {
        messages: [
          {
            role: "assistant",
            content: [{ type: "text", text: "" }],
          },
        ],
      },
      idleCtx,
    );
    await waitForCondition(() => runtimeEvents.length >= 3);

    // Expected order: first prompt, status reply, then the queued prompt.
    assert.equal(runtimeEvents[0], "dispatch:[telegram] first request");
    assert.match(runtimeEvents[1] ?? "", /^send:<b>Context:<\/b>/);
    assert.equal(
      runtimeEvents[2],
      "dispatch:[telegram] follow up after status",
    );
    await handlers.get("session_shutdown")?.({}, idleCtx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Runtime regression: a `/model` command and a later prompt both arrive while
 * a turn is running; after `agent_end` the model picker must be sent before
 * the queued prompt is dispatched.
 *
 * `runtimeEvents` records host dispatches (`dispatch:…`) and Telegram
 * `sendMessage` calls (`send:…`) in the order they happen. The real
 * `~/.pi/agent/telegram.json` is overwritten for the duration of the test and
 * restored in `finally`.
 */
test("Extension runtime runs queued model control before the next queued prompt after agent end", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptBlocks = Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot any existing config so the finally block can put it back.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];
  // Two fake models exposed through the stubbed model registry.
  const modelA = {
    provider: "openai",
    id: "gpt-a",
    reasoning: true,
  } as const;
  const modelB = {
    provider: "anthropic",
    id: "claude-b",
    reasoning: false,
  } as const;
  let firstDispatchSeen = false;

  // A getUpdates reply the test can release at a moment of its choosing.
  const createDeferredResponse = () => {
    let release: ((value: Response) => void) | undefined;
    const promise = new Promise<Response>((resolve) => {
      release = resolve;
    });
    return { promise, resolve: (value: Response) => release?.(value) };
  };
  const secondPoll = createDeferredResponse();
  const thirdPoll = createDeferredResponse();

  // Minimal stand-in for a Telegram API response; result is built lazily.
  const okResponse = (makeResult: () => unknown) =>
    ({
      json: async () => ({ ok: true, result: makeResult() }),
    }) as Response;
  // A getUpdates reply holding one private-chat text message from user 77.
  const textUpdateResponse = (
    updateId: number,
    messageId: number,
    text: string,
  ) =>
    okResponse(() => [
      {
        _: "other",
        update_id: updateId,
        message: {
          message_id: messageId,
          chat: { id: 99, type: "private" },
          from: { id: 77, is_bot: false, first_name: "Test" },
          text,
        },
      },
    ]);

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: string | PromptBlocks) => {
      const firstBlockText = (content as PromptBlocks)[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${firstBlockText}`);
      firstDispatchSeen = true;
    },
    getThinkingLevel: () => "medium",
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the request URL.
    const method = url.split("/").at(-1) ?? "";
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return okResponse(() => true);
      case "getUpdates":
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return textUpdateResponse(1, 23, "first request");
        }
        if (getUpdatesCalls === 2) return secondPoll.promise;
        if (getUpdatesCalls === 3) return thirdPoll.promise;
        // Polls past the third reject with an AbortError.
        throw new DOMException("stop", "AbortError");
      case "sendMessage":
        runtimeEvents.push(`send:${String(body?.text ?? "")}`);
        return okResponse(() => ({ message_id: 100 + runtimeEvents.length }));
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configText =
      JSON.stringify(
        { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
        null,
        "\t",
      ) + "\n";
    await writeFile(configPath, configText, "utf8");
    telegramExtension(pi);

    const baseCtx = {
      hasUI: true,
      cwd: process.cwd(),
      model: modelA,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      sessionManager: {
        getEntries: () => [],
      },
      modelRegistry: {
        refresh: () => {},
        getAvailable: () => [modelA, modelB],
        isUsingOAuth: () => false,
      },
      getContextUsage: () => undefined,
      hasPendingMessages: () => false,
      abort: () => {},
    };
    const idleCtx = {
      ...baseCtx,
      isIdle: () => true,
    } as never;
    const activeCtx = {
      ...baseCtx,
      isIdle: () => false,
    } as never;

    await handlers.get("session_start")?.({}, idleCtx);
    await commands.get("telegram-connect")?.handler("", idleCtx);
    await waitForCondition(() => firstDispatchSeen);
    await handlers.get("agent_start")?.({}, activeCtx);

    // Deliver the control command while the turn is still running.
    secondPoll.resolve(textUpdateResponse(2, 24, "/model"));
    await waitForCondition(() => getUpdatesCalls >= 3);

    // Deliver the follow-up prompt, also while the turn is still running.
    thirdPoll.resolve(textUpdateResponse(3, 25, "follow up after model"));
    // Wait until the poller asks for a fourth batch, i.e. it has consumed the
    // follow-up update. Checking `runtimeEvents.length >= 1` here would pass
    // immediately (the first dispatch is already recorded) and let agent_end
    // race the follow-up.
    await waitForCondition(() => getUpdatesCalls >= 4);

    await handlers.get("agent_end")?.(
      {
        messages: [
          {
            role: "assistant",
            content: [{ type: "text", text: "" }],
          },
        ],
      },
      idleCtx,
    );
    await waitForCondition(() => runtimeEvents.length >= 3);

    // Expected order: first prompt, model picker, then the queued prompt.
    assert.equal(runtimeEvents[0], "dispatch:[telegram] first request");
    assert.equal(runtimeEvents[1], "send:<b>Choose a model:</b>");
    assert.equal(runtimeEvents[2], "dispatch:[telegram] follow up after model");
    await handlers.get("session_shutdown")?.({}, idleCtx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Runtime regression: a prompt that arrives while `/compact` is in progress is
 * held back and only dispatched once the compaction `onComplete` hook fires.
 *
 * The stubbed `ctx.compact` captures the hooks it is given so the test controls
 * when compaction "finishes". The real `~/.pi/agent/telegram.json` is
 * overwritten for the duration of the test and restored in `finally`.
 */
test("Extension runtime keeps queued turns blocked until compaction completes", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptBlocks = Array<{ type: string; text?: string }>;
  type CompactHooks = {
    onComplete: () => void;
    onError: (error: unknown) => void;
  };

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot any existing config so the finally block can put it back.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];
  const followUpDispatch = "dispatch:[telegram] follow up after compaction";
  let compactHooks: CompactHooks | undefined;

  // The second getUpdates reply is held until the test releases it.
  let releaseSecondPoll: ((value: Response) => void) | undefined;
  const secondPoll = new Promise<Response>((resolve) => {
    releaseSecondPoll = resolve;
  });

  // Minimal stand-in for a Telegram API response; result is built lazily.
  const okResponse = (makeResult: () => unknown) =>
    ({
      json: async () => ({ ok: true, result: makeResult() }),
    }) as Response;
  // A getUpdates reply holding one private-chat text message from user 77.
  const textUpdateResponse = (
    updateId: number,
    messageId: number,
    text: string,
  ) =>
    okResponse(() => [
      {
        _: "other",
        update_id: updateId,
        message: {
          message_id: messageId,
          chat: { id: 99, type: "private" },
          from: { id: 77, is_bot: false, first_name: "Test" },
          text,
        },
      },
    ]);

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: string | PromptBlocks) => {
      const firstBlockText = (content as PromptBlocks)[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${firstBlockText}`);
    },
    getThinkingLevel: () => "medium",
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the request URL.
    const method = url.split("/").at(-1) ?? "";
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    switch (method) {
      case "deleteWebhook":
        return okResponse(() => true);
      case "getUpdates":
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return textUpdateResponse(1, 30, "/compact");
        }
        if (getUpdatesCalls === 2) {
          return secondPoll;
        }
        // Polls past the second reject with an AbortError.
        throw new DOMException("stop", "AbortError");
      case "sendMessage":
        runtimeEvents.push(`send:${String(body?.text ?? "")}`);
        return okResponse(() => ({ message_id: 100 + runtimeEvents.length }));
      default:
        // Any other method (including sendChatAction) fails the test.
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configText =
      JSON.stringify(
        { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
        null,
        "\t",
      ) + "\n";
    await writeFile(configPath, configText, "utf8");
    telegramExtension(pi);

    const ctx = {
      hasUI: true,
      model: undefined,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      isIdle: () => true,
      hasPendingMessages: () => false,
      abort: () => {},
      // Capture the hooks instead of compacting; the test fires them later.
      compact: (hooks: CompactHooks) => {
        compactHooks = hooks;
        runtimeEvents.push("compact:start");
      },
    } as never;

    await handlers.get("session_start")?.({}, ctx);
    await commands.get("telegram-connect")?.handler("", ctx);
    await waitForCondition(() => runtimeEvents.includes("compact:start"));
    assert.equal(runtimeEvents.includes("send:Compaction started."), true);

    // Deliver a prompt while compaction is still pending.
    releaseSecondPoll?.(
      textUpdateResponse(2, 31, "follow up after compaction"),
    );
    await waitForCondition(() => getUpdatesCalls >= 3);
    // The prompt must not have been dispatched yet.
    assert.equal(runtimeEvents.includes(followUpDispatch), false);

    // Finish compaction: the held prompt and the completion notice follow.
    compactHooks?.onComplete();
    await waitForCondition(() => runtimeEvents.includes(followUpDispatch));
    await waitForCondition(() =>
      runtimeEvents.includes("send:Compaction completed."),
    );
    await handlers.get("session_shutdown")?.({}, ctx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Runtime regression: two updates sharing one `media_group_id` produce a
 * single dispatch whose text joins both captions, and that dispatch does not
 * happen within the first 300 ms.
 *
 * The real `~/.pi/agent/telegram.json` is overwritten for the duration of the
 * test and restored in `finally`.
 */
test("Extension runtime coalesces media-group updates into one delayed dispatch", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptBlocks = Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot any existing config so the finally block can put it back.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );
  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];

  // One captioned album item from user 77 in private chat 99.
  const albumUpdate = (
    updateId: number,
    messageId: number,
    caption: string,
  ) => ({
    _: "other",
    update_id: updateId,
    message: {
      message_id: messageId,
      media_group_id: "album-1",
      chat: { id: 99, type: "private" },
      from: { id: 77, is_bot: false, first_name: "Test" },
      caption,
    },
  });

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: string | PromptBlocks) => {
      const firstBlockText = (content as PromptBlocks)[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${firstBlockText}`);
    },
    getThinkingLevel: () => "medium",
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  globalThis.fetch = async (input) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the request URL.
    const method = url.split("/").at(-1) ?? "";
    switch (method) {
      case "deleteWebhook":
        return { json: async () => ({ ok: true, result: true }) } as Response;
      case "getUpdates":
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          // Both album items arrive together in the first poll.
          return {
            json: async () => ({
              ok: true,
              result: [
                albumUpdate(1, 40, "first caption"),
                albumUpdate(2, 41, "second caption"),
              ],
            }),
          } as Response;
        }
        // Every later poll rejects with an AbortError.
        throw new DOMException("stop", "AbortError");
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configText =
      JSON.stringify(
        { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
        null,
        "\t",
      ) + "\n";
    await writeFile(configPath, configText, "utf8");
    telegramExtension(pi);

    const ctx = {
      hasUI: true,
      model: undefined,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      isIdle: () => true,
      hasPendingMessages: () => false,
      abort: () => {},
    } as never;

    await handlers.get("session_start")?.({}, ctx);
    await commands.get("telegram-connect")?.handler("", ctx);

    // After 300 ms nothing may have been dispatched yet.
    await new Promise((resolve) => setTimeout(resolve, 300));
    assert.equal(runtimeEvents.length, 0);

    // Within 2.5 s exactly one combined prompt must have been dispatched.
    await waitForCondition(() => runtimeEvents.length === 1, 2500);
    assert.equal(
      runtimeEvents[0],
      "dispatch:[telegram] first caption\n\nsecond caption",
    );
    await handlers.get("session_shutdown")?.({}, ctx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Drives the extension through a stubbed Telegram long-poll session:
 * one prompt is dispatched, two more arrive while the agent is busy, and
 * then a 👍 reaction lands on the newer message and a 👎 on the older one.
 * The assertions expect the 👍-reacted message to be the next dispatch and
 * the 👎-reacted message to never be dispatched at all.
 */
test("Extension runtime applies reaction priority and removal before the next dispatch", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptContent = string | Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot whatever config is already on disk so it can be put back later.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );

  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];
  let firstDispatchResolved = false;

  // Minimal Response stand-in; the payload is rebuilt on every json() call.
  const reply = (build: () => unknown): Response =>
    ({ json: async () => ({ ok: true, result: build() }) }) as Response;

  // A getUpdates answer that stays pending until the test settles it.
  const deferResponse = () => {
    let settle!: (value: Response) => void;
    const promise = new Promise<Response>((resolve) => {
      settle = resolve;
    });
    return { promise, settle };
  };

  const textUpdate = (updateId: number, messageId: number, text: string) => ({
    _: "other",
    update_id: updateId,
    message: {
      message_id: messageId,
      chat: { id: 99, type: "private" },
      from: { id: 77, is_bot: false, first_name: "Test" },
      text,
    },
  });

  const reactionUpdate = (
    updateId: number,
    messageId: number,
    emoji: string,
    date: number,
  ) => ({
    _: "other",
    update_id: updateId,
    message_reaction: {
      chat: { id: 99, type: "private" },
      message_id: messageId,
      user: { id: 77, is_bot: false, first_name: "Test" },
      old_reaction: [],
      new_reaction: [{ type: "emoji", emoji }],
      date,
    },
  });

  // Fresh agent_end payload per call: one assistant message with empty text.
  const emptyAssistantTurn = () => ({
    messages: [
      {
        role: "assistant",
        content: [{ type: "text", text: "" }],
      },
    ],
  });

  // Polls 2..5 are answered by hand so the test controls arrival order.
  const secondPoll = deferResponse();
  const thirdPoll = deferResponse();
  const fourthPoll = deferResponse();
  const fifthPoll = deferResponse();
  const pendingPolls = [secondPoll, thirdPoll, fourthPoll, fifthPoll];

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: PromptContent) => {
      const blocks = content as Array<{ type: string; text?: string }>;
      const leadingText = blocks[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${leadingText}`);
      firstDispatchResolved = true;
    },
    getThinkingLevel: () => "medium",
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  globalThis.fetch = async (input) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the URL.
    const method = url.slice(url.lastIndexOf("/") + 1);
    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return reply(() => true);
      case "getUpdates": {
        getUpdatesCalls += 1;
        if (getUpdatesCalls === 1) {
          return reply(() => [textUpdate(1, 30, "first request")]);
        }
        const queued = pendingPolls[getUpdatesCalls - 2];
        if (queued !== undefined) return queued.promise;
        // Sixth and later polls reject with an AbortError.
        throw new DOMException("stop", "AbortError");
      }
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configJson = JSON.stringify(
      { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
      null,
      "\t",
    );
    await writeFile(configPath, `${configJson}\n`, "utf8");
    telegramExtension(pi);

    const baseCtx = {
      hasUI: true,
      model: undefined,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      hasPendingMessages: () => false,
      abort: () => {},
    };
    const ctxReporting = (isIdleNow: boolean) =>
      ({ ...baseCtx, isIdle: () => isIdleNow }) as never;
    const idleCtx = ctxReporting(true);
    const activeCtx = ctxReporting(false);

    await handlers.get("session_start")?.({}, idleCtx);
    await commands.get("telegram-connect")?.handler("", idleCtx);
    await waitForCondition(() => firstDispatchResolved);

    // From here the agent reports itself busy while more updates arrive.
    await handlers.get("agent_start")?.({}, activeCtx);

    secondPoll.settle(reply(() => [textUpdate(2, 31, "older waiting")]));
    await waitForCondition(() => getUpdatesCalls >= 3);

    thirdPoll.settle(reply(() => [textUpdate(3, 32, "newer waiting")]));
    await waitForCondition(() => getUpdatesCalls >= 4);

    // 👍 on message 32 ("newer waiting").
    fourthPoll.settle(reply(() => [reactionUpdate(4, 32, "👍", 1)]));
    await waitForCondition(() => getUpdatesCalls >= 5);

    // 👎 on message 31 ("older waiting").
    fifthPoll.settle(reply(() => [reactionUpdate(5, 31, "👎", 2)]));
    await waitForCondition(() => getUpdatesCalls >= 6);

    await handlers.get("agent_end")?.(emptyAssistantTurn(), idleCtx);
    await waitForCondition(() => runtimeEvents.length === 2);
    assert.equal(runtimeEvents[0], "dispatch:[telegram] first request");
    assert.equal(runtimeEvents[1], "dispatch:[telegram] newer waiting");

    // A second full turn must not produce any further dispatch.
    await handlers.get("agent_start")?.({}, activeCtx);
    await handlers.get("agent_end")?.(emptyAssistantTurn(), idleCtx);
    await new Promise((resolve) => setTimeout(resolve, 50));
    assert.deepEqual(runtimeEvents, [
      "dispatch:[telegram] first request",
      "dispatch:[telegram] newer waiting",
    ]);

    await handlers.get("session_shutdown")?.({}, idleCtx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Stubbed long-poll session covering an in-flight model switch: a "/model"
 * message opens the picker, a prompt is dispatched, and a "model:pick:1"
 * callback arrives while the agent is busy. The test expects setModel to be
 * called with anthropic/claude-b, ctx.abort() to fire, and — after agent_end
 * reports an aborted turn — a continuation prompt naming the new model.
 */
test("Extension runtime switches model in flight and dispatches a continuation turn after abort", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptContent = string | Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot whatever config is already on disk so it can be put back later.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );

  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];
  const modelA = {
    provider: "openai",
    id: "gpt-a",
    reasoning: true,
  } as const;
  const modelB = {
    provider: "anthropic",
    id: "claude-b",
    reasoning: false,
  } as const;
  let idle = true;
  let aborted = false;
  const setModels: Array<string> = [];

  // Minimal Response stand-in; the payload is rebuilt on every json() call.
  const reply = (build: () => unknown): Response =>
    ({ json: async () => ({ ok: true, result: build() }) }) as Response;

  // A getUpdates answer that stays pending until the test settles it.
  const deferResponse = () => {
    let settle!: (value: Response) => void;
    const promise = new Promise<Response>((resolve) => {
      settle = resolve;
    });
    return { promise, settle };
  };

  const textUpdate = (updateId: number, messageId: number, text: string) => ({
    _: "other",
    update_id: updateId,
    message: {
      message_id: messageId,
      chat: { id: 99, type: "private" },
      from: { id: 77, is_bot: false, first_name: "Test" },
      text,
    },
  });

  const secondPoll = deferResponse();
  const thirdPoll = deferResponse();

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: PromptContent) => {
      const blocks = content as Array<{ type: string; text?: string }>;
      const leadingText = blocks[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${leadingText}`);
    },
    getThinkingLevel: () => "medium",
    setModel: async (model: { provider: string; id: string }) => {
      setModels.push(`${model.provider}/${model.id}`);
      return true;
    },
    setThinkingLevel: () => {},
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  let nextMessageId = 100;
  const callbackAnswers: string[] = [];

  // Answers the n-th getUpdates poll; anything past the third is an abort.
  const pollUpdates = (): Response | Promise<Response> => {
    getUpdatesCalls += 1;
    switch (getUpdatesCalls) {
      case 1:
        return reply(() => [textUpdate(1, 40, "/model")]);
      case 2:
        return secondPoll.promise;
      case 3:
        return thirdPoll.promise;
      default:
        throw new DOMException("stop", "AbortError");
    }
  };

  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the URL.
    const method = url.slice(url.lastIndexOf("/") + 1);
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    const bodyText = () => String(body?.text ?? "");

    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return reply(() => true);
      case "getUpdates":
        return pollUpdates();
      case "sendMessage":
        runtimeEvents.push(`send:${bodyText()}`);
        // The id is only consumed when the caller actually reads the JSON.
        return reply(() => ({ message_id: nextMessageId++ }));
      case "editMessageText":
        runtimeEvents.push(`edit:${bodyText()}`);
        return reply(() => true);
      case "answerCallbackQuery":
        callbackAnswers.push(bodyText());
        return reply(() => true);
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configJson = JSON.stringify(
      { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
      null,
      "\t",
    );
    await writeFile(configPath, `${configJson}\n`, "utf8");
    telegramExtension(pi);

    const ctx = {
      hasUI: true,
      cwd: process.cwd(),
      model: modelA,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      sessionManager: {
        getEntries: () => [],
      },
      modelRegistry: {
        refresh: () => {},
        getAvailable: () => [modelA, modelB],
        isUsingOAuth: () => false,
      },
      getContextUsage: () => undefined,
      hasPendingMessages: () => false,
      // Idle state is driven by the test through the `idle` flag.
      isIdle: () => idle,
      abort: () => {
        aborted = true;
      },
    } as never;

    await handlers.get("session_start")?.({}, ctx);
    await commands.get("telegram-connect")?.handler("", ctx);
    await waitForCondition(() =>
      runtimeEvents.includes("send:<b>Choose a model:</b>"),
    );

    secondPoll.settle(reply(() => [textUpdate(2, 41, "first request")]));
    await waitForCondition(() =>
      runtimeEvents.includes("dispatch:[telegram] first request"),
    );

    // Mark the agent busy before the picker callback is delivered.
    idle = false;
    await handlers.get("agent_start")?.({}, ctx);

    thirdPoll.settle(
      reply(() => [
        {
          _: "other",
          update_id: 3,
          callback_query: {
            id: "cb-1",
            from: { id: 77, is_bot: false, first_name: "Test" },
            data: "model:pick:1",
            message: {
              // Matches the first id the sendMessage stub hands out.
              message_id: 100,
              chat: { id: 99, type: "private" },
            },
          },
        },
      ]),
    );
    await waitForCondition(() => aborted);
    assert.deepEqual(setModels, ["anthropic/claude-b"]);
    assert.equal(
      callbackAnswers.includes("Switching to claude-b and continuing…"),
      true,
    );

    idle = true;
    await handlers.get("agent_end")?.(
      {
        messages: [
          {
            role: "assistant",
            stopReason: "aborted",
            content: [{ type: "text", text: "" }],
          },
        ],
      },
      ctx,
    );
    await waitForCondition(() =>
      runtimeEvents.some((entry) =>
        entry.includes(
          "Continue the interrupted previous Telegram request using the newly selected model (anthropic/claude-b)",
        ),
      ),
    );

    const dispatchedOriginal = runtimeEvents.includes(
      "dispatch:[telegram] first request",
    );
    assert.equal(dispatchedOriginal, true);
    const dispatchedContinuation = runtimeEvents.some((entry) =>
      entry.includes(
        "dispatch:[telegram] Continue the interrupted previous Telegram request using the newly selected model (anthropic/claude-b)",
      ),
    );
    assert.equal(dispatchedContinuation, true);

    await handlers.get("session_shutdown")?.({}, ctx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|
|
|
|
/**
 * Same model-switch scenario as an in-flight pick, but with a tool running:
 * tool_execution_start is emitted before the "model:pick:1" callback arrives.
 * The test expects setModel to be called right away while ctx.abort() is
 * held back until tool_execution_end, after which the aborted turn is
 * followed by a continuation prompt naming the new model.
 */
test("Extension runtime delays model-switch abort until the active tool finishes", async () => {
  type EventHandler = (event: unknown, ctx: unknown) => Promise<unknown>;
  type CommandDefinition = {
    handler: (args: string, ctx: unknown) => Promise<void>;
  };
  type PromptContent = string | Array<{ type: string; text?: string }>;

  const agentDir = join(homedir(), ".pi", "agent");
  const configPath = join(agentDir, "telegram.json");
  // Snapshot whatever config is already on disk so it can be put back later.
  const previousConfig = await readFile(configPath, "utf8").catch(
    () => undefined,
  );

  const handlers = new Map<string, EventHandler>();
  const commands = new Map<string, CommandDefinition>();
  const runtimeEvents: string[] = [];
  const modelA = {
    provider: "openai",
    id: "gpt-a",
    reasoning: true,
  } as const;
  const modelB = {
    provider: "anthropic",
    id: "claude-b",
    reasoning: false,
  } as const;
  let idle = true;
  let aborted = false;
  const setModels: Array<string> = [];

  // Minimal Response stand-in; the payload is rebuilt on every json() call.
  const reply = (build: () => unknown): Response =>
    ({ json: async () => ({ ok: true, result: build() }) }) as Response;

  // A getUpdates answer that stays pending until the test settles it.
  const deferResponse = () => {
    let settle!: (value: Response) => void;
    const promise = new Promise<Response>((resolve) => {
      settle = resolve;
    });
    return { promise, settle };
  };

  const textUpdate = (updateId: number, messageId: number, text: string) => ({
    _: "other",
    update_id: updateId,
    message: {
      message_id: messageId,
      chat: { id: 99, type: "private" },
      from: { id: 77, is_bot: false, first_name: "Test" },
      text,
    },
  });

  const secondPoll = deferResponse();
  const thirdPoll = deferResponse();

  const pi = {
    on: (event: string, handler: EventHandler) => {
      handlers.set(event, handler);
    },
    registerCommand: (name: string, definition: CommandDefinition) => {
      commands.set(name, definition);
    },
    registerTool: () => {},
    sendUserMessage: (content: PromptContent) => {
      const blocks = content as Array<{ type: string; text?: string }>;
      const leadingText = blocks[0]?.text ?? "";
      runtimeEvents.push(`dispatch:${leadingText}`);
    },
    getThinkingLevel: () => "medium",
    setModel: async (model: { provider: string; id: string }) => {
      setModels.push(`${model.provider}/${model.id}`);
      return true;
    },
    setThinkingLevel: () => {},
  } as never;

  const originalFetch = globalThis.fetch;
  let getUpdatesCalls = 0;
  let nextMessageId = 100;
  const callbackAnswers: string[] = [];

  // Answers the n-th getUpdates poll; anything past the third is an abort.
  const pollUpdates = (): Response | Promise<Response> => {
    getUpdatesCalls += 1;
    switch (getUpdatesCalls) {
      case 1:
        return reply(() => [textUpdate(1, 50, "/model")]);
      case 2:
        return secondPoll.promise;
      case 3:
        return thirdPoll.promise;
      default:
        throw new DOMException("stop", "AbortError");
    }
  };

  globalThis.fetch = async (input, init) => {
    const url = typeof input === "string" ? input : input.toString();
    // The Bot API method name is the last path segment of the URL.
    const method = url.slice(url.lastIndexOf("/") + 1);
    const body =
      typeof init?.body === "string"
        ? (JSON.parse(init.body) as Record<string, unknown>)
        : undefined;
    const bodyText = () => String(body?.text ?? "");

    switch (method) {
      case "deleteWebhook":
      case "sendChatAction":
        return reply(() => true);
      case "getUpdates":
        return pollUpdates();
      case "sendMessage":
        runtimeEvents.push(`send:${bodyText()}`);
        // The id is only consumed when the caller actually reads the JSON.
        return reply(() => ({ message_id: nextMessageId++ }));
      case "editMessageText":
        runtimeEvents.push(`edit:${bodyText()}`);
        return reply(() => true);
      case "answerCallbackQuery":
        callbackAnswers.push(bodyText());
        return reply(() => true);
      default:
        throw new Error(`Unexpected Telegram API method: ${method}`);
    }
  };

  try {
    await mkdir(agentDir, { recursive: true });
    const configJson = JSON.stringify(
      { botToken: "123:abc", allowedUserId: 77, lastUpdateId: 0 },
      null,
      "\t",
    );
    await writeFile(configPath, `${configJson}\n`, "utf8");
    telegramExtension(pi);

    const ctx = {
      hasUI: true,
      cwd: process.cwd(),
      model: modelA,
      signal: undefined,
      ui: {
        theme: {
          fg: (_token: string, text: string) => text,
        },
        setStatus: () => {},
        notify: () => {},
      },
      sessionManager: {
        getEntries: () => [],
      },
      modelRegistry: {
        refresh: () => {},
        getAvailable: () => [modelA, modelB],
        isUsingOAuth: () => false,
      },
      getContextUsage: () => undefined,
      hasPendingMessages: () => false,
      // Idle state is driven by the test through the `idle` flag.
      isIdle: () => idle,
      abort: () => {
        aborted = true;
      },
    } as never;

    await handlers.get("session_start")?.({}, ctx);
    await commands.get("telegram-connect")?.handler("", ctx);
    await waitForCondition(() =>
      runtimeEvents.includes("send:<b>Choose a model:</b>"),
    );

    secondPoll.settle(reply(() => [textUpdate(2, 51, "first request")]));
    await waitForCondition(() =>
      runtimeEvents.includes("dispatch:[telegram] first request"),
    );

    // Agent is busy AND a tool is running when the picker callback lands.
    idle = false;
    await handlers.get("agent_start")?.({}, ctx);
    await handlers.get("tool_execution_start")?.({}, ctx);

    thirdPoll.settle(
      reply(() => [
        {
          _: "other",
          update_id: 3,
          callback_query: {
            id: "cb-2",
            from: { id: 77, is_bot: false, first_name: "Test" },
            data: "model:pick:1",
            message: {
              // Matches the first id the sendMessage stub hands out.
              message_id: 100,
              chat: { id: 99, type: "private" },
            },
          },
        },
      ]),
    );
    await waitForCondition(() =>
      callbackAnswers.includes(
        "Switched to claude-b. Restarting after the current tool finishes…",
      ),
    );
    // The model is switched immediately, but the abort must still be pending.
    assert.deepEqual(setModels, ["anthropic/claude-b"]);
    assert.equal(aborted, false);

    await handlers.get("tool_execution_end")?.({}, ctx);
    await waitForCondition(() => aborted);

    idle = true;
    await handlers.get("agent_end")?.(
      {
        messages: [
          {
            role: "assistant",
            stopReason: "aborted",
            content: [{ type: "text", text: "" }],
          },
        ],
      },
      ctx,
    );
    await waitForCondition(() =>
      runtimeEvents.some((entry) =>
        entry.includes(
          "dispatch:[telegram] Continue the interrupted previous Telegram request using the newly selected model (anthropic/claude-b)",
        ),
      ),
    );

    await handlers.get("session_shutdown")?.({}, ctx);
  } finally {
    globalThis.fetch = originalFetch;
    if (previousConfig !== undefined) {
      await writeFile(configPath, previousConfig, "utf8");
    } else {
      await rm(configPath, { force: true });
    }
  }
});
|