Skip to Content
🎉 gRPCity 3.0 is released. Read more →
文档使用指南Async Stream

Async Stream

本页讲 gRPCity 推荐的流式 API:客户端、服务端都用 promise + for await 消费数据。如果需要老的 callback 形式,请先看 Stream 一节——两页共用 proto 与大部分 setup。

定义 proto

proto 定义了三种流式 RPC——client-stream、server-stream、bidi:

stream.proto
syntax="proto3"; package stream; service Hellor { rpc ClientStreamHello (stream Message) returns (Message) {} rpc ServerStreamHello (Message) returns (stream Message) {} rpc MutualStreamHello (stream Message) returns (stream Message) {} } message Message { string message = 1; }

加载 proto

./loader.js
import { ProtoLoader } from 'grpcity' import path from 'node:path' import { fileURLToPath } from 'node:url' // __dirname for esm const __dirname = path.dirname(fileURLToPath(import.meta.url)) export default new ProtoLoader({ location: path.resolve(__dirname, './'), files: ['stream.proto'] })

客户端

客户端的一些前期工作,如加载 proto,初始化客户端,创建新的 metadata.

./client.js
import loader from "./loader.js" const start = async (addr) => { await loader.init() const clients = await loader.initClients({ services: { 'stream.Hellor': addr } }) const client = clients.get('stream.Hellor') const meta = loader.makeMetadata({ 'x-cache-control': 'max-age=100', 'x-business-id': ['grpcity', 'testing'], 'x-timestamp-client': 'begin=' + new Date().toISOString() }) } start('localhost:9097')

流对点

流对点,指client stream request -> server response

我们继续在start方法里流对点的流程:

const start = async (addr) => { // ... // stream client to server const clientStreamHelloCall = await client.clientStreamHello(meta) // send stream message to server clientStreamHelloCall.write({ message: 'Hello!' }) clientStreamHelloCall.write({ message: 'How are you?' }) // end to send message and get server response const writeResult = await clientStreamHelloCall.writeEnd() console.log(writeResult) // ... }

运行结果如下,我们看到writeEnd()返回了这次调用服务端的返回结果{ status, metadata, response },同时也把stream message发送到了服务端。

Terminal
// client { metadata: Metadata { internalRepr: Map(8) { 'content-type' => [Array], 'x-cache-control' => [Array], 'x-business-id' => [Array], 'x-timestamp-client' => [Array], 'x-client-hostname' => [Array], 'user-agent' => [Array], 'x-timestamp-server' => [Array], 'date' => [Array] }, options: {} }, response: { message: "Hello! I'm fine, thank you!" }, status: { code: 0, details: 'OK', metadata: Metadata { internalRepr: Map(0) {}, options: {} } } } // server { message: 'Hello!' } { message: 'How are you?' }

点对流

点对流,指client request -> server stream response

我们继续在start方法里点对流的流程:

const start = async (addr) => { // ... const serverStreamHelloCall = await client.serverStreamHello({ message: 'Hello! How are you?' }, meta) const serverReadAllResult = serverStreamHelloCall.readAll() for await (const data of serverReadAllResult) { console.log(data) } const serverReadEndResult = await serverStreamHelloCall.readEnd() console.log(serverReadEndResult) // ... }

运行结果如下:

我们看到readAll()返回了一个asyncIterator,需要我们使用for await获取服务端返回的逐条stream message结果。 而readEnd()返回了{ status, metadata },同时也看到服务端的打印记录,也接收到了客户端发送的message

Terminal
// client { message: 'Hello! I got you message.' } { message: "I'm fine, thank you" } { metadata: Metadata { internalRepr: Map(8) { 'content-type' => [Array], 'x-cache-control' => [Array], 'x-business-id' => [Array], 'x-timestamp-client' => [Array], 'x-client-hostname' => [Array], 'user-agent' => [Array], 'x-timestamp-server' => [Array], 'date' => [Array] }, options: {} }, status: { code: 0, details: 'OK', metadata: Metadata { internalRepr: Map(0) {}, options: {} } } } // server { message: 'Hello! How are you?' }

流对流

流对流,指client stream request -> server stream response

我们继续在start方法里流对流的流程。我们先调用writeAll()发送3条消息,然后再调用write()发送1条消息,最后调用readAll()获取服务端返回的结果。

const start = async (addr) => { // ... const mutualStreamHelloCall = await client.mutualStreamHello(meta) mutualStreamHelloCall.writeAll([ { message: 'Hello!' }, { message: 'How are you?' }, { message: 'other thing x' } ]) mutualStreamHelloCall.write({ message: 'maybe' }) const mutualReadAllResult = mutualStreamHelloCall.readAll() for await (const data of mutualReadAllResult) { if (data.message === 'delay 1s') { mutualStreamHelloCall.write({ message: 'ok, I known you delay 1s' }) mutualStreamHelloCall.writeEnd() } console.log(data) } const mutualReadEndResult = await mutualStreamHelloCall.readEnd() console.log(mutualReadEndResult) // .. }

运行结果如下,我们看到我们是先客户端发送消息完给服务端后,调用readAll(),获取asyncIterator,需要我们使用for await获取服务端返回的逐条stream message结果。 最后,调用readEnd()获取服务端结束流信息返回的{ status, metadata }

Terminal
// client { message: 'emmm...' } { message: 'Hello too.' } { message: "I'm fine, thank you" } { message: 'pardon?' } { message: 'pardon?' } { message: 'delay 1s' } { metadata: Metadata { internalRepr: Map(8) { 'content-type' => [Array], 'x-cache-control' => [Array], 'x-business-id' => [Array], 'x-timestamp-client' => [Array], 'x-client-hostname' => [Array], 'user-agent' => [Array], 'x-timestamp-server' => [Array], 'date' => [Array] }, options: {} }, status: { code: 0, details: 'OK', metadata: Metadata { internalRepr: Map(0) {}, options: {} } } } // server Hello! How are you? other thing x maybe client call end.

服务端

我们在这里依次实现服务端 stream 的交互功能。下面是我们的服务端启动函数,我们只需要使用 gRPCity 提供了新能力实现 Stream 类的方法即可。

./server.js
import loader from "./loader.js" const start = async (addr) => { await loader.init() const server = await loader.initServer() server.add('stream.Hellor', new Stream()) await server.listen(addr) console.log('start:', addr) } start('localhost:9097')

流对点

客户端是流信息发送,服务端只需要正常读,处理完后返回结果。 服务端读流信息的接口是call.readAll(),返回的是一个asyncIterator对象,需要我们使用for await逐条读取。

class Stream { // ... async clientStreamHello (call) { const metadata = call.metadata.clone() metadata.add('x-timestamp-server', 'received=' + new Date().toISOString()) call.sendMetadata(metadata) for await (const data of call.readAll()) { console.log(data) } return { message: "Hello! I'm fine, thank you!" } } // ... }

点对流

服务端接收客户端发送的请求后,然后不断发送流信息给客户端直至函数处理完成。 call.write()call.writeAll()call.end() 提供了服务端完成流信息发送的功能。

  • call.write(): 发送一条信息到客户端;
  • call.writeAll(): 发送多条信息到客户端;
  • call.end(): 服务端发送信息已结束,并通知客户端;
class Stream { // ... async serverStreamHello (call) { const metadata = call.metadata.clone() metadata.add('x-timestamp-server', 'received=' + new Date().toISOString()) call.sendMetadata(metadata) console.log(call.request.message) call.write({ message: 'Hello! I got you message.' }) call.write({ message: "I'm fine, thank you" }) call.writeAll([ { message: 'other thing x' }, { message: 'other thing y' } ]) call.end() } // ... }

流对流

客户端和服务端都采用流的方式进行交互。 call.write()call.writeAll()call.readAll()call.end() 提供了服务端完成流信息发送的功能。

  • call.write(): 发送一条信息到客户端;
  • call.writeAll(): 发送多条信息到客户端;
  • call.readAll(): 返回一个asyncIterator对象,需要我们使用for await逐条读取客户端返回的信息;
  • call.end(): 服务端发送信息已结束,并通知客户端;
class Stream { // ... async mutualStreamHello (call) { const metadata = call.metadata.clone() metadata.add('x-timestamp-server', 'received=' + new Date().toISOString()) call.sendMetadata(metadata) call.write({ message: 'emmm...' }) for await (const data of call.readAll()) { console.log(data.message) if (data.message === 'Hello!') { call.write({ message: 'Hello too.' }) } else if (data.message === 'How are you?') { call.write({ message: 'I\'m fine, thank you' }) await timeout(1000) call.write({ message: 'delay 1s' }) call.writeAll([ { message: 'emm... ' }, { message: 'emm......' } ]) } else { call.write({ message: 'pardon?' }) } } call.end() } // ... }

注意:

  • async stream 支持中间件;
  • call proxy 中的 readAllwriteAllwriteEnd 需要是 async func;
  • 要注意区分是 流对点、点对流、还是流对流;
Last updated on