
Bidirectional Stream Communication

Communication Flow

[Figure: sequence diagram of bidirectional stream communication]

Sending Messages from the Server

typescript
async function taskHandler(params: any, ctx: Context) {
  // Report progress
  ctx.stream.send({
    type: 'progress',
    text: 'Processing...',
    data: { percent: 50 },
  })

  // Send a question, then wait for the user's answer
  ctx.stream.send({
    type: 'question',
    text: 'Choose an option',
    data: { questionId: 'q1', options: ['A', 'B', 'C'] },
  })

  // Wait for the answer that matches the question ID
  for await (const msg of ctx.stream) {
    if (msg.type === 'answer' && msg.data.questionId === 'q1') {
      // Handle the answer...
      break
    }
  }

  // Return the result
  ctx.stream.send({ type: 'done', text: 'Done', data: { result: 'success' } })
}

Receiving and Sending on the Client

typescript
const stream = await client.call('task', params)

for await (const msg of stream) {
  if (msg.type === 'progress') {
    console.log('Progress:', msg.text)
  }
  if (msg.type === 'question') {
    // Reply with the same questionId so the server can match the answer
    stream.send({
      type: 'answer',
      data: { questionId: msg.data.questionId, answer: 'A' },
    })
  }
  if (msg.type === 'done') {
    console.log('Result:', msg.data)
  }
}

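How the Stream Ends

Once the server handler finishes executing, the framework ends the stream automatically. The client's for await loop then exits on its own, so no manual break is needed.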
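
The framework's internals are not shown on this page, so the following is only a minimal sketch of why closing a stream lets a for await loop exit by itself. MessageQueue, runHandler, and collect are hypothetical names made up for illustration and are not part of the framework's API.

```typescript
// Hypothetical in-memory stream (an assumption, not the framework's implementation).
class MessageQueue<T> implements AsyncIterable<T> {
  private buffer: T[] = []
  private waiters: Array<(r: IteratorResult<T>) => void> = []
  private closed = false

  send(msg: T): void {
    if (this.closed) throw new Error('stream is closed')
    const waiter = this.waiters.shift()
    if (waiter) waiter({ value: msg, done: false }) // hand over to a pending reader
    else this.buffer.push(msg) // otherwise keep it until someone reads
  }

  close(): void {
    this.closed = true
    // Wake every pending reader with done: true so its loop can finish
    for (const waiter of this.waiters.splice(0)) {
      waiter({ value: undefined, done: true })
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: async (): Promise<IteratorResult<T>> => {
        if (this.buffer.length > 0) {
          return { value: this.buffer.shift() as T, done: false }
        }
        if (this.closed) {
          return { value: undefined, done: true }
        }
        return new Promise<IteratorResult<T>>((resolve) => {
          this.waiters.push(resolve)
        })
      },
    }
  }
}

// Runs a handler and closes the stream when it returns or throws,
// mirroring the "framework ends the stream automatically" behavior.
async function runHandler<T>(
  handler: (out: MessageQueue<T>) => Promise<void>,
  out: MessageQueue<T>,
): Promise<void> {
  try {
    await handler(out)
  } finally {
    out.close()
  }
}

// Reads until the stream reports done: true; there is no break in the loop.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const seen: T[] = []
  for await (const msg of source) seen.push(msg)
  return seen
}
```

The key point is that the iterator's next() resolves with done: true after close(), which is the signal for await uses to leave the loop.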

Streaming LLM Output

Server Side

typescript
async function chatHandler(params: { message: string }, ctx: Context) {
  let response = ''
  // Pass ctx.signal to the LLM call and forward each chunk as it arrives
  for await (const chunk of llm.stream(params.message, { signal: ctx.signal })) {
    response += chunk
    ctx.stream.send({ type: 'progress', text: chunk })
  }
  // Send the full response once streaming has finished
  ctx.stream.send({ type: 'done', text: 'Done', data: { response } })
}

Client Side

typescript
const stream = await client.call('chat', { message: 'Hello' })

let response = ''
for await (const msg of stream) {
  if (msg.type === 'progress') {
    // Each progress message carries one chunk of the reply
    response += msg.text
  }
  if (msg.type === 'done') {
    console.log('Full response:', response)
  }
}

Multi-Round Q&A

typescript
async function wizardHandler(params: any, ctx: Context) {
  // First question
  ctx.stream.send({
    type: 'question',
    text: 'Project name?',
    data: { questionId: 'name' },
  })

  let name = ''
  for await (const msg of ctx.stream) {
    if (msg.type === 'answer' && msg.data.questionId === 'name') {
      name = msg.data.answer
      break
    }
  }

  // Second question
  ctx.stream.send({
    type: 'question',
    text: 'Choose a template',
    data: { questionId: 'tpl', options: ['react', 'vue'] },
  })

  let template = ''
  for await (const msg of ctx.stream) {
    if (msg.type === 'answer' && msg.data.questionId === 'tpl') {
      template = msg.data.answer
      break
    }
  }

  ctx.stream.send({ type: 'done', text: 'Done', data: { name, template } })
}
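
The send-then-wait pattern repeats for every question, so it may be worth factoring into a helper. The sketch below is one possible shape, not something the framework provides: Message, DuplexStream, ask, and wizard are hypothetical names, and it assumes (as the handler above does) that the stream can be iterated again after an earlier loop has exited.

```typescript
// Hypothetical message and stream shapes, inferred from the examples on this page.
interface Message {
  type: string
  text?: string
  data?: any
}

interface DuplexStream extends AsyncIterable<Message> {
  send(msg: Message): void
}

// Sends one question and resolves with the matching answer.
// Unlike the inline loops, it throws if the stream ends before an answer
// arrives instead of silently continuing with an empty string.
async function ask(
  stream: DuplexStream,
  questionId: string,
  text: string,
  options?: string[],
): Promise<string> {
  stream.send({ type: 'question', text, data: { questionId, options } })
  for await (const msg of stream) {
    // Ignore anything that is not the answer to this particular question
    if (msg.type === 'answer' && msg.data?.questionId === questionId) {
      return msg.data.answer
    }
  }
  throw new Error(`stream ended before question "${questionId}" was answered`)
}

// The wizard flow expressed with the helper.
async function wizard(stream: DuplexStream): Promise<void> {
  const name = await ask(stream, 'name', 'Project name?')
  const template = await ask(stream, 'tpl', 'Choose a template', ['react', 'vue'])
  stream.send({ type: 'done', text: 'Done', data: { name, template } })
}
```

Inside a real handler the call would presumably look like `await ask(ctx.stream, 'name', 'Project name?')`, provided ctx.stream matches the assumed DuplexStream shape.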

Progress Tracking

typescript
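async function batchHandler(params: { items: string[] }, ctx: Context) {
  const items = params.items
  const total = items.length

  for (let i = 0; i < total; i++) {
    // Announce the item about to be processed, with a rounded percentage
    ctx.stream.send({
      type: 'progress',
      text: `Processing ${i + 1}/${total}`,
      data: { current: i + 1, total, percent: Math.round(((i + 1) / total) * 100) },
    })
    // processItem is a placeholder for your own per-item work;
    // avoid naming it `process`, which collides with the Node.js global
    await processItem(items[i])
  }

  ctx.stream.send({ type: 'done', text: 'Done', data: { processed: total } })
}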

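Message Types

| Type | Direction | Purpose |
| --- | --- | --- |
| progress | Server → Client | Progress updates / streamed output |
| question | Server → Client | Ask the user a question |
| answer | Client → Server | Answer a question |
| done | Server → Client | Task completed |
| error | Server → Client | An error occurred |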
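
The message types above can be modeled as a discriminated union so that the compiler checks every branch. This is only a sketch: the field shapes are inferred from the examples on this page, the fields of an error message in particular are an assumption, and ServerMessage, ClientMessage, and summarize are hypothetical names rather than types exported by the framework.

```typescript
// Messages the server sends to the client (shapes assumed from the examples).
type ServerMessage =
  | { type: 'progress'; text: string; data?: unknown }
  | { type: 'question'; text: string; data: { questionId: string; options?: string[] } }
  | { type: 'done'; text?: string; data?: unknown }
  | { type: 'error'; text: string; data?: unknown } // error fields are an assumption

// Messages the client sends to the server.
type ClientMessage = {
  type: 'answer'
  data: { questionId: string; answer: string }
}

// Builds the answer for a given question, keeping the questionId in sync.
function answerTo(
  question: Extract<ServerMessage, { type: 'question' }>,
  answer: string,
): ClientMessage {
  return { type: 'answer', data: { questionId: question.data.questionId, answer } }
}

// Turns a server message into a one-line description.
// The never assignment makes the compiler flag any unhandled message type.
function summarize(msg: ServerMessage): string {
  switch (msg.type) {
    case 'progress':
      return `progress: ${msg.text}`
    case 'question':
      return `question ${msg.data.questionId}: ${msg.text}`
    case 'done':
      return 'done'
    case 'error':
      return `error: ${msg.text}`
    default: {
      const unreachable: never = msg
      return unreachable
    }
  }
}
```

A client loop could switch on msg.type in the same way, which also gives it a natural place to handle error messages that the earlier examples leave out.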

MIT Licensed