Skip to content

取消机制

Client 端取消

使用 AbortController

typescript
const controller = new AbortController()

const stream = await client.call('task', params, {
  signal: controller.signal,
})

// 5 秒后取消
setTimeout(() => controller.abort(), 5000)

for await (const msg of stream) {
  if (msg.type === 'progress') console.log(msg.text)
  if (msg.type === 'done') console.log(msg.data)
}
// 取消后循环正常退出

使用 stream.cancel()

typescript
const stream = await client.call('task', params)

// 取消
stream.cancel()

Server 端处理取消

检查信号状态

typescript
async function batchHandler(params: { items: string[] }, ctx: Context) {
  for (const item of params.items) {
    if (ctx.signal.aborted) return  // 提前退出
    await process(item)
    ctx.stream.send({ type: 'progress', text: `处理: ${item}` })
  }
  ctx.stream.send({ type: 'done', text: '完成' })
}

传递信号给下游

typescript
async function fetchHandler(params: { url: string }, ctx: Context) {
  // 传递给 fetch
  const res = await fetch(params.url, { signal: ctx.signal })

  // 传递给其他 Agent
  const stream = await otherClient.call('skill', params, {
    signal: ctx.signal,
  })

  // ...
}

ctx.signal

ctx.signal 传递给 LLM 和下游调用,用户取消时立即中断,避免不必要的资源消耗。

监听取消事件

typescript
async function taskHandler(params: any, ctx: Context) {
  ctx.signal.addEventListener('abort', () => {
    console.log('收到取消,执行清理...')
    cleanup()
  })

  await longTask()
  ctx.stream.send({ type: 'done', text: '完成' })
}

超时取消

typescript
const withTimeout = async <T>(
  fn: (signal: AbortSignal) => Promise<T>,
  ms: number
): Promise<T> => {
  const controller = new AbortController()
  const timeout = setTimeout(() => controller.abort(), ms)
  try {
    return await fn(controller.signal)
  } finally {
    clearTimeout(timeout)
  }
}

// 30 秒超时
const stream = await withTimeout(
  signal => client.call('task', params, { signal }),
  30000
)

取消时保存进度

typescript
async function batchHandler(params: { items: string[] }, ctx: Context) {
  const results: string[] = []

  for (const item of params.items) {
    if (ctx.signal.aborted) {
      await saveProgress(results)  // 保存已处理的结果
      return
    }
    const result = await process(item)
    results.push(result)
  }

  ctx.stream.send({ type: 'done', text: '完成', data: { results } })
}

取消信号链

多 Agent 场景中传播取消信号:

typescript
async function orchestrateHandler(params: any, ctx: Context) {
  const chatClient = createAgentClient({
    agentId: 'chat-agent',
    address: 'a2a://localhost:50054',
  })

  // 传递信号到下游
  const stream = await chatClient.call('chat', params, {
    signal: ctx.signal,
    metadata: { 'x-trace-id': ctx.getMetadata('x-trace-id') },
  })

  for await (const msg of stream) {
    ctx.stream.send(msg)  // 转发消息
  }
}

MIT Licensed