Stream

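gRPC streaming RPCs suit the scenarios that unary calls handle poorly: file uploads, event streams, progress reporting, and metrics reporting. gRPCity supports three streaming modes (client stream, server stream, and bidi) and provides two API styles.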

This page covers the callback / event style built on proxy.call. For the more convenient for await form, see the Async Stream guide.

Prerequisites

In the proto file, each rpc of the service must declare explicitly which side uses stream, as shown below:

stream.proto
syntax = "proto3";

package stream;

service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}

message Message {
  string message = 1;
}

  • ClientStreamHello: only the client streams;
  • ServerStreamHello: only the server streams;
  • MutualStreamHello: both the client and the server stream.
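
The position of the stream keyword in the rpc signature decides the mode. As a minimal sketch of that rule (streamKind is a hypothetical helper written only for this illustration, not part of gRPCity, and its regular expression only handles simple one-line rpc definitions like the ones above):

```javascript
// Hypothetical helper: classify an rpc signature by where `stream` appears.
function streamKind (rpcLine) {
  const pattern = /rpc\s+(\w+)\s*\(\s*(stream\s+)?[\w.]+\s*\)\s*returns\s*\(\s*(stream\s+)?[\w.]+\s*\)/
  const match = pattern.exec(rpcLine)
  if (!match) throw new Error('not an rpc definition: ' + rpcLine)

  const [, name, clientStream, serverStream] = match
  let kind = 'unary'
  if (clientStream && serverStream) kind = 'bidi'
  else if (clientStream) kind = 'client stream'
  else if (serverStream) kind = 'server stream'
  return { name, kind }
}

// The three rpc lines from stream.proto.
const rpcs = [
  'rpc ClientStreamHello (stream Message) returns (Message) {}',
  'rpc ServerStreamHello (Message) returns (stream Message) {}',
  'rpc MutualStreamHello (stream Message) returns (stream Message) {}'
]

const kinds = rpcs.map(streamKind)
for (const { name, kind } of kinds) {
  console.log(name + ': ' + kind)
}
```

A signature with no stream keyword on either side falls through to unary, which is the case this page does not cover.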

Write loader.js to load the proto.

./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'

// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))

export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})

Server

Write the Stream class, which provides the three server-side streaming handlers: clientStreamHello(), serverStreamHello(), and mutualStreamHello().

./server.js
class Stream {
  constructor () {
    this.count = 0
  }

  // client stream - server
  clientStreamHello (call, callback) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
    })
    call.on('end', () => {
      callback(null, { message: "Hello! I'm fine, thank you!" })
    })
  }

  // client - server stream
  serverStreamHello (call) {
    console.log(call.request.message)
    call.write({ message: 'Hello! I got you message.' })
    call.write({ message: "I'm fine, thank you" })
    call.end()
  }

  // client stream - server stream
  mutualStreamHello (call) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
      if (chunk.message === 'Hello!') {
        call.write({ message: 'Hello!' })
      } else if (chunk.message === 'How are you?') {
        call.write({ message: "I'm fine, thank you" })
      } else {
        call.write({ message: 'pardon?' })
      }
    })
    call.on('end', () => {
      call.end()
    })
  }
}
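
To see the event order a client-stream handler relies on without starting a server, the handler can be driven by hand. The sketch below is only an illustration under stated assumptions: FakeCall is a hypothetical stand-in that implements just on() and emit(), not the real call object, and this clientStreamHello is a variation that counts the chunks instead of replying with a fixed sentence.

```javascript
// Hypothetical stand-in for the server-side `call` object (on/emit only).
class FakeCall {
  constructor () {
    this.handlers = {}
  }

  on (event, handler) {
    if (!this.handlers[event]) this.handlers[event] = []
    this.handlers[event].push(handler)
    return this
  }

  emit (event, payload) {
    for (const handler of this.handlers[event] || []) handler(payload)
  }
}

// Variation of the handler above: collect every chunk, reply once on 'end'.
function clientStreamHello (call, callback) {
  const received = []
  call.on('data', (chunk) => {
    received.push(chunk.message)
  })
  call.on('end', () => {
    callback(null, { message: 'received ' + received.length + ' messages' })
  })
}

// Drive the handler the way a client stream would: data, data, end.
const call = new FakeCall()
let reply = null
clientStreamHello(call, (err, response) => {
  if (err) throw err
  reply = response
})
call.emit('data', { message: 'Hello!' })
call.emit('data', { message: 'How are you?' })
call.emit('end')
console.log(reply)
```

The single response is produced only after 'end' fires, which is why a client that never calls end() would not receive a reply from a handler written this way.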

Next, initialize and start the server.

./server.js
// loader.js exports the ProtoLoader instance as its default export
import loader from './loader.js'

async function start (addr) {
  await loader.init()

  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())

  await server.listen(addr)
  console.log('start:', addr)
}

start('localhost:9097')

Client

To call the server over a stream, the client must go through the functions under .call. For example:

./client.js
// loader.js exports the ProtoLoader instance as its default export
import loader from './loader.js'

async function start (addr) {
  await loader.init()

  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })
  const client = clients.get('stream.Hellor')

  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })

  // stream client to server
  const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
    if (err) {
      console.log(err)
    } else {
      console.log(response)
    }
  })
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })
  clientStreamHelloCall.end()

  // client to stream server
  const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
  serverStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  serverStreamHelloCall.on('end', () => {
    console.log('server call end.')
  })

  // stream client to stream server
  const mutualStreamHelloCall = client.call.mutualStreamHello()
  mutualStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  mutualStreamHelloCall.on('end', () => {
    console.log('server call end.')
  })
  mutualStreamHelloCall.write({ message: 'Hello!' })
  mutualStreamHelloCall.write({ message: 'How are you?' })
  mutualStreamHelloCall.end()
}

start('localhost:9097')
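
When the callback style is awkward to compose, the client-stream call can be wrapped in a Promise. This is a rough sketch, not a gRPCity API: sendAll and openCall are hypothetical names, openCall is assumed to be an adapter such as (cb) => client.call.clientStreamHello(meta, cb), and fakeOpenCall only imitates a call so the example runs without a server. It also ignores backpressure from write().

```javascript
// Hypothetical helper: write every message, end the stream, and settle
// the Promise with the single response (or the error) from the callback.
function sendAll (openCall, messages) {
  return new Promise((resolve, reject) => {
    const call = openCall((err, response) => {
      if (err) reject(err)
      else resolve(response)
    })
    for (const message of messages) call.write(message)
    call.end()
  })
}

// Stand-in for a real client-stream call; it records what was written
// and answers as soon as end() is called.
const sent = []
function fakeOpenCall (callback) {
  return {
    write (message) {
      sent.push(message.message)
    },
    end () {
      callback(null, { message: 'received ' + sent.length + ' messages' })
    }
  }
}

const pending = sendAll(fakeOpenCall, [
  { message: 'Hello!' },
  { message: 'How are you?' }
])
pending.then((response) => console.log(response.message))
```

With a real client, the same helper would be called as sendAll((cb) => client.call.clientStreamHello(meta, cb), messages), assuming the call object behaves as in the example above.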

End-to-end run

client stream - server: running only the client's clientStreamHello() produces the following output:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
{ message: "Hello! I'm fine, thank you!" }

client - server stream: running only the client's serverStreamHello() produces the following output:

Server:

Terminal
Hello! How are you?

Client:

Terminal
Hello! I got you message.
I'm fine, thank you
server call end.

client stream - server stream: running only the client's mutualStreamHello() produces the following output:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
Hello!
I'm fine, thank you
server call end.
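
The first two client lines in the bidi run follow directly from the branches in mutualStreamHello. As a small sketch, the same branching can be pulled into a pure function (replyFor is a hypothetical name used only here) and applied to the two messages the client writes:

```javascript
// Same branching as mutualStreamHello, extracted as a pure function.
function replyFor (message) {
  if (message === 'Hello!') return 'Hello!'
  if (message === 'How are you?') return "I'm fine, thank you"
  return 'pardon?'
}

// The two messages written by the client in client.js.
const replies = ['Hello!', 'How are you?'].map(replyFor)
console.log(replies.join('\n'))
```

Any other message would be answered with pardon?, and the final server call end. line comes from the client's own 'end' handler rather than from the server.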