外观
双向流通信
通信流程
Server 端发送消息
typescript
async function taskHandler(params: any, ctx: Context) {
// 发送进度
ctx.stream.send({
type: 'progress',
text: '处理中...',
data: { percent: 50 },
})
// 发送问题(等待用户回答)
ctx.stream.send({
type: 'question',
text: '选择选项',
data: { questionId: 'q1', options: ['A', 'B', 'C'] },
})
// 等待回答
for await (const msg of ctx.stream) {
if (msg.type === 'answer' && msg.data.questionId === 'q1') {
// 处理回答...
break
}
}
// 返回结果
ctx.stream.send({ type: 'done', text: '完成', data: { result: 'success' } })
}Client 端接收和发送
typescript
const stream = await client.call('task', params)
for await (const msg of stream) {
if (msg.type === 'progress') {
console.log('进度:', msg.text)
}
if (msg.type === 'question') {
stream.send({
type: 'answer',
data: { questionId: msg.data.questionId, answer: 'A' },
})
}
if (msg.type === 'done') {
console.log('结果:', msg.data)
}
}流结束机制
Server Handler 执行完毕后,框架自动结束流,Client 的 for await 循环自动退出,无需手动 break。
流式 LLM 输出
Server 端
typescript
async function chatHandler(params: { message: string }, ctx: Context) {
let response = ''
for await (const chunk of llm.stream(params.message, { signal: ctx.signal })) {
response += chunk
ctx.stream.send({ type: 'progress', text: chunk })
}
ctx.stream.send({ type: 'done', text: '完成', data: { response } })
}Client 端
typescript
const stream = await client.call('chat', { message: 'Hello' })
let response = ''
for await (const msg of stream) {
if (msg.type === 'progress') {
response += msg.text
}
if (msg.type === 'done') {
console.log('完整响应:', response)
}
}多轮问答
typescript
async function wizardHandler(params: any, ctx: Context) {
// 第一个问题
ctx.stream.send({
type: 'question',
text: '项目名称?',
data: { questionId: 'name' },
})
let name = ''
for await (const msg of ctx.stream) {
if (msg.type === 'answer' && msg.data.questionId === 'name') {
name = msg.data.answer
break
}
}
// 第二个问题
ctx.stream.send({
type: 'question',
text: '选择模板',
data: { questionId: 'tpl', options: ['react', 'vue'] },
})
let template = ''
for await (const msg of ctx.stream) {
if (msg.type === 'answer' && msg.data.questionId === 'tpl') {
template = msg.data.answer
break
}
}
ctx.stream.send({ type: 'done', text: '完成', data: { name, template } })
}进度追踪
typescript
async function batchHandler(params: { items: string[] }, ctx: Context) {
const items = params.items
const total = items.length
for (let i = 0; i < total; i++) {
ctx.stream.send({
type: 'progress',
text: `处理 ${i + 1}/${total}`,
data: { current: i + 1, total, percent: Math.round(((i + 1) / total) * 100) },
})
await process(items[i])
}
ctx.stream.send({ type: 'done', text: '完成', data: { processed: total } })
}消息类型
| 类型 | 方向 | 用途 |
|---|---|---|
progress | Server → Client | 进度/流式输出 |
question | Server → Client | 向用户提问 |
answer | Client → Server | 回答问题 |
done | Server → Client | 任务完成 |
error | Server → Client | 发生错误 |