
Stream

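The gRPCity library provides full stream-handling support. Streaming RPCs suit business scenarios such as file uploads, event mechanisms, task monitoring, and metrics replay. There are currently three kinds of streaming RPC. In client - server terms, they are:

  • client stream - server
  • client - server stream
  • client stream - server stream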

This guide handles streams with callbacks and events, using the methods under call on the client proxy.

For the async stream usage, see the Async Stream guide.

Prerequisites

Each service rpc in the proto file must explicitly mark where stream is used, as shown below:

stream.proto
syntax="proto3";
 
package stream;
 
service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
 
message Message {
  string message = 1;
}
  • ClientStreamHello: only the client uses a stream;
  • ServerStreamHello: only the server uses a stream;
  • MutualStreamHello: both the client and the server use a stream.
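
The three combinations can also be told apart programmatically. The following is a minimal sketch under stated assumptions: @grpc/grpc-js and protobufjs describe each rpc with requestStream and responseStream booleans, but whether gRPCity exposes definitions in exactly this shape is an assumption. streamKind is a hypothetical helper and not part of gRPCity.

```javascript
// Hypothetical helper: classify an rpc by which side streams.
// Assumes a definition object with `requestStream` / `responseStream`
// booleans, as found in @grpc/grpc-js method definitions.
function streamKind ({ requestStream = false, responseStream = false }) {
  if (requestStream && responseStream) return 'client stream - server stream'
  if (requestStream) return 'client stream - server'
  if (responseStream) return 'client - server stream'
  return 'client - server' // plain unary call, no stream
}

// The three rpcs from stream.proto, written out by hand for illustration
const rpcs = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, def] of Object.entries(rpcs)) {
  console.log(name, '->', streamKind(def))
}
```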

Write loader.js to load the proto file.

./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
 
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
 
export default new ProtoLoader({
    location: path.resolve(__dirname, './'),
    files: ['stream.proto']
})

Server

Write the Stream class, which provides the server-side handler for each of the three kinds of stream: clientStreamHello(), serverStreamHello(), and mutualStreamHello().

./server.js
// Default export of ./loader.js (the ProtoLoader instance)
import loader from './loader.js'
 
class Stream {
  constructor () {
    this.count = 0
  }
 
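  // client stream - server
  clientStreamHello (call, callback) {
    // Each message the client writes arrives as a 'data' event
    call.on('data', (chunk) => {
      console.log(chunk.message)
    })
    // 'end' fires with no arguments once the client stops writing; reply once
    call.on('end', () => {
      callback(null, { message: "Hello! I'm fine, thank you!" })
    })
  }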
 
  // client - server stream
  serverStreamHello (call) {
    console.log(call.request.message)
    call.write({ message: 'Hello! I got you message.' })
    call.write({ message: "I'm fine, thank you" })
    call.end()
  }
 
  // client stream - server stream
  mutualStreamHello (call) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
      if (chunk.message === 'Hello!') {
        call.write({ message: 'Hello!' })
      } else if (chunk.message === 'How are you?') {
        call.write({ message: "I'm fine, thank you" })
      } else {
        call.write({ message: 'pardon?' })
      }
    })
    // 'end' carries no payload; close the server side once the client is done
    call.on('end', () => {
      call.end()
    })
  }
}
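
Because the handlers rely only on call.on(), call.write(), call.end(), and the callback, their logic can be exercised without a network connection. The following is a minimal sketch, not gRPCity's API: makeFakeCall is a hypothetical stand-in for the real call object, and the handler is a variation of clientStreamHello() that collects the incoming messages instead of logging them.

```javascript
// Hypothetical stand-in for the call object. It supports only on() and
// emit(), which is all this handler needs. A real call comes from the
// gRPC runtime.
function makeFakeCall () {
  const handlers = {}
  return {
    on (event, fn) {
      handlers[event] = handlers[event] || []
      handlers[event].push(fn)
      return this
    },
    emit (event, payload) {
      for (const fn of handlers[event] || []) fn(payload)
    }
  }
}

// Same shape as Stream#clientStreamHello, but it remembers what it received
function clientStreamHello (call, callback) {
  const received = []
  call.on('data', (chunk) => {
    received.push(chunk.message)
  })
  call.on('end', () => {
    callback(null, {
      message: `Got ${received.length} message(s): ${received.join(' | ')}`
    })
  })
}

// Drive the handler the way a client would: two writes, then end
const call = makeFakeCall()
let reply = null
clientStreamHello(call, (err, response) => {
  if (err) throw err
  reply = response
})
call.emit('data', { message: 'Hello!' })
call.emit('data', { message: 'How are you?' })
call.emit('end')
console.log(reply.message)
```

The callback runs only after the end event, so the reply can summarize everything the client sent.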

Next, initialize and start the service.

./server.js
async function start (addr) {
  await loader.init()
 
  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())
 
  await server.listen(addr)
  console.log('start:', addr)
}
 
start('localhost:9097')

Client

To call the server with streams, the client must use the functions under .call. For example:

./client.js
// Default export of ./loader.js (the ProtoLoader instance)
import loader from './loader.js'
 
async function start (addr) {
    await loader.init()
 
    const clients = await loader.initClients({
        services: {
            'stream.Hellor': addr
        }
    })
 
    const client = clients.get('stream.Hellor')
 
    const meta = loader.makeMetadata({
        'x-cache-control': 'max-age=100',
        'x-business-id': ['grpcity', 'testing'],
        'x-timestamp-client': 'begin=' + new Date().toISOString()
    })
 
    // stream client to server
    const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
        if (err) {
            console.log(err)
        } else {
            console.log(response)
        }
    })
    clientStreamHelloCall.write({ message: 'Hello!' })
    clientStreamHelloCall.write({ message: 'How are you?' })
    clientStreamHelloCall.end()
 
    // client to stream server
    const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
    serverStreamHelloCall.on('data', (chunk) => {
        console.log(chunk.message)
    })
    serverStreamHelloCall.on('end', () => {
        console.log('server call end.')
    })
 
    // stream client to stream server
    const mutualStreamHelloCall = client.call.mutualStreamHello()
    mutualStreamHelloCall.on('data', (chunk) => {
        console.log(chunk.message)
    })
    mutualStreamHelloCall.on('end', () => {
        console.log('server call end.')
    })
    mutualStreamHelloCall.write({ message: 'Hello!' })
    mutualStreamHelloCall.write({ message: 'How are you?' })
    mutualStreamHelloCall.end()
}
 
start('localhost:9097')
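
The server-stream example above listens for data and end only. Node.js streams can also emit error, and a failure that nobody listens for is easy to lose. The following is a minimal sketch of a helper that gathers a server stream into a Promise. collectStream and makeFakeCall are hypothetical names, not gRPCity functions. The fake call stands in for the object returned by client.call.serverStreamHello(), and the sketch assumes that object emits error in addition to data and end.

```javascript
// Hypothetical helper: resolve with every chunk once the stream ends,
// or reject as soon as the stream reports an error.
function collectStream (call) {
  return new Promise((resolve, reject) => {
    const chunks = []
    call.on('data', (chunk) => chunks.push(chunk))
    call.on('end', () => resolve(chunks))
    call.on('error', reject)
  })
}

// Hypothetical stand-in for a server-stream call (on() and emit() only)
function makeFakeCall () {
  const handlers = {}
  return {
    on (event, fn) {
      handlers[event] = handlers[event] || []
      handlers[event].push(fn)
      return this
    },
    emit (event, payload) {
      for (const fn of handlers[event] || []) fn(payload)
    }
  }
}

// Simulate the server writing two messages and then ending the stream
const fakeCall = makeFakeCall()
collectStream(fakeCall).then((chunks) => {
  for (const chunk of chunks) console.log(chunk.message)
})
fakeCall.emit('data', { message: 'Hello! I got you message.' })
fakeCall.emit('data', { message: "I'm fine, thank you" })
fakeCall.emit('end')
```

With a real client, the same helper would be used as `const chunks = await collectStream(client.call.serverStreamHello({ message: 'Hello! How are you?' }))`, under the assumption stated above.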

Joint debugging

client stream - server: run only the client's clientStreamHello(). The output is:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
{ message: "Hello! I'm fine, thank you!" }

client - server stream: run only the client's serverStreamHello(). The output is:

Server:

Terminal
Hello! How are you?

Client:

Terminal
Hello! I got you message.
I'm fine, thank you
server call end.

client stream - server stream: run only the client's mutualStreamHello(). The output is:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
Hello!
I'm fine, thank you
server call end.
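
The client output in the bidirectional case follows directly from the branching in mutualStreamHello(). Isolated as a pure function, that rule is easy to check on its own. replyFor is a hypothetical name used only for this sketch.

```javascript
// Hypothetical helper mirroring the if/else chain in mutualStreamHello()
function replyFor (message) {
  if (message === 'Hello!') return 'Hello!'
  if (message === 'How are you?') return "I'm fine, thank you"
  return 'pardon?' // anything the server does not recognize
}

for (const message of ['Hello!', 'How are you?', 'Goodbye']) {
  console.log(`${message} -> ${replyFor(message)}`)
}
```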