Stream
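The gRPCity library provides full Stream handling. Streaming rpc fits scenarios such as file upload, event mechanisms, task monitoring, and metric replay.
Streaming rpc currently comes in three forms. Using the client - server model as the example, they are client stream - server, client - server stream, and client stream - server stream.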
This page handles streams with callbacks and events, through the methods under call on the client proxy.
For async stream usage, see the Async Stream guide.
Prerequisites
In the proto file, each rpc of a service that uses streaming must be marked explicitly with the stream keyword, as shown below:
stream.proto
syntax = "proto3";

package stream;

service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}

message Message {
  string message = 1;
}
ClientStreamHello: only the client uses a Stream.
ServerStreamHello: only the server uses a Stream.
MutualStreamHello: both the client and the server use a Stream.
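The three rpc shapes differ only in which side carries the stream keyword. As a rough illustration, the sketch below names the shape of a method from two boolean flags. The flag names requestStream and responseStream follow the method-definition fields used by @grpc/grpc-js; whether gRPCity exposes them in the same form is an assumption, and streamKind is a hypothetical helper, not part of the library.

```javascript
// Hypothetical helper: name the streaming shape of an rpc from two flags.
// requestStream / responseStream mirror the stream keyword on the request
// and response side of the rpc declaration in stream.proto.
function streamKind ({ requestStream, responseStream }) {
  if (requestStream && responseStream) return 'client stream - server stream'
  if (requestStream) return 'client stream - server'
  if (responseStream) return 'client - server stream'
  return 'client - server' // plain unary call, no streaming
}

// The three rpcs declared in stream.proto, written out by hand.
const methods = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, flags] of Object.entries(methods)) {
  console.log(name, '=>', streamKind(flags))
}
```

The mapping is purely descriptive; it is only meant to make the naming used on this page easier to follow.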
Write loader.js to load the proto file.
./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'

// __dirname for ESM
const __dirname = path.dirname(fileURLToPath(import.meta.url))

export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})
Server
Write the Stream class. It provides the three server-side handlers for streaming: clientStreamHello(), serverStreamHello(), and mutualStreamHello().
./server.js
class Stream {
  constructor () {
    this.count = 0
  }

  // client stream - server: read every chunk, reply once when the client ends
  clientStreamHello (call, callback) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
    })
    // 'end' carries no payload, so the listener takes no argument
    call.on('end', () => {
      callback(null, { message: "Hello! I'm fine, thank you!" })
    })
  }

  // client - server stream: one request, several writes, then end the stream
  serverStreamHello (call) {
    console.log(call.request.message)
    call.write({ message: 'Hello! I got you message.' })
    call.write({ message: "I'm fine, thank you" })
    call.end()
  }

  // client stream - server stream: answer each incoming chunk as it arrives
  mutualStreamHello (call) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
      if (chunk.message === 'Hello!') {
        call.write({ message: 'Hello!' })
      } else if (chunk.message === 'How are you?') {
        call.write({ message: "I'm fine, thank you" })
      } else {
        call.write({ message: 'pardon?' })
      }
    })
    // close the server side once the client has finished writing
    call.on('end', () => {
      call.end()
    })
  }
}
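For a scenario such as file upload, a client-stream handler usually accumulates something across chunks before replying once. Below is a minimal sketch of that pattern, using the same call.on('data') / call.on('end') / callback shape as clientStreamHello(). The name countingHello and the reply format are made up for illustration and are not part of the original example.

```javascript
// Hypothetical handler: count the chunks of a client stream, reply once.
function countingHello (call, callback) {
  let received = 0
  const messages = []

  // Each chunk written by the client arrives as one 'data' event.
  call.on('data', (chunk) => {
    received += 1
    messages.push(chunk.message)
  })

  // 'end' fires after the client calls end(); send the single response here.
  call.on('end', () => {
    callback(null, { message: `received ${received}: ${messages.join(' | ')}` })
  })
}
```

To try it, its body could stand in for clientStreamHello() in the Stream class, since the handler name has to match the rpc it serves.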
Next, initialize and start the server.
./server.js
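import loader from './loader.js'

async function start (addr) {
  await loader.init()
  const server = await loader.initServer()
  // register the Stream handlers under the service name from stream.proto
  server.add('stream.Hellor', new Stream())
  await server.listen(addr)
  console.log('start:', addr)
}

start('localhost:9097')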
Client
To call the server with a Stream, the client must use the functions under .call. For example:
./client.js
import loader from './loader.js'

async function start (addr) {
  await loader.init()
  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })
  const client = clients.get('stream.Hellor')

  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })

  // stream client to server: write chunks, the callback gets the single reply
  const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
    if (err) {
      console.log(err)
    } else {
      console.log(response)
    }
  })
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })
  clientStreamHelloCall.end()

  // client to stream server: one request, replies arrive as 'data' events
  const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
  serverStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  serverStreamHelloCall.on('end', () => {
    console.log('server call end.')
  })

  // stream client to stream server: register listeners, then write and end
  const mutualStreamHelloCall = client.call.mutualStreamHello()
  mutualStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  mutualStreamHelloCall.on('end', () => {
    console.log('server call end.')
  })
  mutualStreamHelloCall.write({ message: 'Hello!' })
  mutualStreamHelloCall.write({ message: 'How are you?' })
  mutualStreamHelloCall.end()
}

start('localhost:9097')
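The server-stream call in the client reports its results through events. When the caller only needs the full list of messages, those events can be wrapped in a Promise. The sketch below assumes the object returned by client.call.serverStreamHello() emits 'data', 'end', and 'error' the way Node.js readable streams do; the 'error' event is not shown on this page, and collectStream is a hypothetical helper rather than a gRPCity API.

```javascript
// Hypothetical helper: gather every chunk of a server stream into an array.
function collectStream (call) {
  return new Promise((resolve, reject) => {
    const chunks = []
    call.on('data', (chunk) => chunks.push(chunk))
    call.on('end', () => resolve(chunks))
    // Assumption: failures surface as an 'error' event on the call.
    call.on('error', reject)
  })
}

// Possible use inside start() from ./client.js (not executed here):
//   const replies = await collectStream(
//     client.call.serverStreamHello({ message: 'Hello! How are you?' })
//   )
//   replies.forEach((r) => console.log(r.message))
```

This trades incremental processing for convenience, so it only suits streams short enough to hold in memory.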
Integration testing
client stream - server: run only the client's clientStreamHello(). The output is:
Server:
Terminal
Hello!
How are you?
Client:
Terminal
{ message: "Hello! I'm fine, thank you!" }
client - server stream: run only the client's serverStreamHello(). The output is:
Server:
Terminal
Hello! How are you?
Client:
Terminal
Hello! I got you message.
I'm fine, thank you
server call end.
client stream - server stream: run only the client's mutualStreamHello(). The output is:
Server:
Terminal
Hello!
How are you?
Client:
Terminal
Hello!
I'm fine, thank you
server call end.
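The replies in the mutual-stream run follow directly from the branch inside mutualStreamHello(). As an optional refactor, that branch can be pulled into a pure function so the reply rules are checkable without starting a server. The names replyFor and replies are illustrative and not part of the original example.

```javascript
// Reply rules copied from the if/else chain in mutualStreamHello().
const replies = new Map([
  ['Hello!', 'Hello!'],
  ['How are you?', "I'm fine, thank you"]
])

// Hypothetical helper: pick the reply for one incoming message,
// falling back to 'pardon?' for anything not in the table.
function replyFor (message) {
  return replies.get(message) ?? 'pardon?'
}

// Inside the handler this would become:
//   call.on('data', (chunk) => call.write({ message: replyFor(chunk.message) }))
console.log(replyFor('Hello!'))
console.log(replyFor('How are you?'))
console.log(replyFor('Good night'))
```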