外观
取消机制
Client 端取消
使用 AbortController
typescript
// Client-side cancellation: the controller's signal is handed to the call so this
// caller can abort it later.
const controller = new AbortController()
const stream = await client.call('task', params, { signal: controller.signal })

// Abort the call five seconds from now.
// NOTE(review): the timer is never cleared, so it still fires if the stream
// finishes earlier — confirm that is acceptable for this example.
setTimeout(() => {
  controller.abort()
}, 5 * 1000)

// Print progress text and the final payload as messages arrive.
for await (const message of stream) {
  switch (message.type) {
    case 'progress':
      console.log(message.text)
      break
    case 'done':
      console.log(message.data)
      break
  }
}
// 取消后循环正常退出
使用 stream.cancel()
typescript
// Start the call without passing an AbortSignal.
const stream = await client.call('task', params)

// Cancel through the method exposed on the returned stream.
stream.cancel()
Server 端处理取消
检查信号状态
typescript
/**
 * Processes `params.items` one after another and reports progress on `ctx.stream`.
 *
 * `ctx.signal.aborted` is checked before each item; once it is set the handler
 * returns without processing the remaining items and without sending `done`.
 *
 * @param params - Holds the list of items to process.
 * @param ctx - Handler context; this function reads `ctx.signal` and writes to `ctx.stream`.
 */
async function batchHandler(params: { items: string[] }, ctx: Context) {
  const { items } = params

  for (const current of items) {
    // Bail out before starting more work once the caller has cancelled.
    if (ctx.signal.aborted) {
      return
    }

    await process(current)

    const progressText = `处理: ${current}`
    ctx.stream.send({ type: 'progress', text: progressText })
  }

  // Reached only when the loop ran to the end of the list.
  ctx.stream.send({ type: 'done', text: '完成' })
}
传递信号给下游
typescript
/**
 * Shows how to forward the handler's cancellation signal to downstream calls.
 *
 * The same `ctx.signal` is passed to `fetch` and to a call on another agent's
 * client, so both receive the caller's abort.
 *
 * @param params - Holds the URL to fetch; the whole object is also forwarded to the `skill` call.
 * @param ctx - Handler context; only `ctx.signal` is read here.
 */
async function fetchHandler(params: { url: string }, ctx: Context) {
  const { signal } = ctx

  // Forward the signal to fetch
  const res = await fetch(params.url, { signal })

  // Forward the signal to another agent
  const stream = await otherClient.call('skill', params, { signal })

  // ...
}
ctx.signal
将 ctx.signal 传递给 LLM 和下游调用,用户取消时立即中断,避免不必要的资源消耗。
监听取消事件
typescript
/**
 * Runs `longTask()` and performs cleanup if the caller cancels while it is in flight.
 *
 * An `abort` listener is attached to `ctx.signal` for the duration of the task and
 * detached afterwards, so it cannot linger on the signal or run `cleanup()` for a
 * task that has already finished.
 *
 * @param params - Not used by this handler.
 * @param ctx - Handler context; this function reads `ctx.signal` and writes to `ctx.stream`.
 */
async function taskHandler(params: any, ctx: Context) {
  const onAbort = () => {
    console.log('收到取消,执行清理...')
    cleanup()
  }

  // `once` drops the listener after it fires.
  // NOTE(review): a signal that is already aborted at this point does not emit
  // `abort` again, so cleanup is skipped in that case — confirm whether an upfront
  // `ctx.signal.aborted` check is wanted.
  ctx.signal.addEventListener('abort', onAbort, { once: true })

  try {
    await longTask()
  } finally {
    // Detach whether the task succeeded or threw, so the listener does not outlive it.
    ctx.signal.removeEventListener('abort', onAbort)
  }

  ctx.stream.send({ type: 'done', text: '完成' })
}
超时取消
typescript
/**
 * Runs `fn` with an AbortSignal that is aborted once `ms` milliseconds have elapsed.
 *
 * The timer is cancelled as soon as the promise returned by `fn` settles, whether it
 * fulfils or rejects. Timing out only aborts the signal; this helper does not reject
 * on its own, so `fn` has to react to the signal.
 *
 * @param fn - Operation to run; receives the signal tied to the timeout.
 * @param ms - Delay in milliseconds before the signal is aborted.
 * @returns The value `fn` resolves to.
 */
const withTimeout = async <T>(
  fn: (signal: AbortSignal) => Promise<T>,
  ms: number
): Promise<T> => {
  const abortSource = new AbortController()
  const timerHandle = setTimeout(() => {
    abortSource.abort()
  }, ms)

  try {
    const outcome = await fn(abortSource.signal)
    return outcome
  } finally {
    // Always release the timer so it cannot fire after `fn` has settled.
    clearTimeout(timerHandle)
  }
}
// 30-second timeout
// NOTE(review): withTimeout clears its timer as soon as client.call resolves with
// the stream, so this limit appears to cover only starting the call, not consuming
// the stream — confirm that this is the intended scope.
const startTask = (signal: AbortSignal) => client.call('task', params, { signal })
const stream = await withTimeout(startTask, 30 * 1000)
取消时保存进度
typescript
/**
 * Processes `params.items` sequentially and keeps the partial results if cancelled.
 *
 * Before each item the handler checks `ctx.signal.aborted`. If it is set, the
 * results collected so far are passed to `saveProgress` and the handler returns
 * without sending `done`. Otherwise the full result list is sent with `done`.
 *
 * @param params - Holds the list of items to process.
 * @param ctx - Handler context; this function reads `ctx.signal` and writes to `ctx.stream`.
 */
async function batchHandler(params: { items: string[] }, ctx: Context) {
  const completed: string[] = []

  for (const entry of params.items) {
    if (!ctx.signal.aborted) {
      completed.push(await process(entry))
      continue
    }

    // Cancelled: persist what has been processed so far, then stop.
    await saveProgress(completed)
    return
  }

  ctx.stream.send({ type: 'done', text: '完成', data: { results: completed } })
}
取消信号链
多 Agent 场景中传播取消信号:
typescript
/**
 * Calls the `chat` skill on the chat agent and relays its messages to this
 * handler's own stream.
 *
 * The incoming `ctx.signal` and the `x-trace-id` metadata value are forwarded with
 * the downstream call, so the chat agent's call is tied to the same signal.
 *
 * @param params - Passed through unchanged to the downstream `chat` call.
 * @param ctx - Handler context; this function reads `ctx.signal`, calls
 *   `ctx.getMetadata`, and writes to `ctx.stream`.
 */
async function orchestrateHandler(params: any, ctx: Context) {
  const traceHeader = 'x-trace-id'

  const chatClient = createAgentClient({
    agentId: 'chat-agent',
    address: 'a2a://localhost:50054',
  })

  // Hand our own signal (and trace id) to the downstream call.
  const downstream = await chatClient.call('chat', params, {
    signal: ctx.signal,
    metadata: { [traceHeader]: ctx.getMetadata(traceHeader) },
  })

  // Relay every downstream message to our caller as it arrives.
  for await (const message of downstream) {
    ctx.stream.send(message)
  }
}