Async Stream
下面我们将会展示如何使用async stream
的能力。在这里也建议先看一遍 Stream,该文会有一些相似的地方在本文出现。
定义 proto
该 proto
定义了三个 rpc
,正好对应了客户端与服务端的三种流对接方式,分别是流对点、点对流和流对流。
syntax="proto3";
package stream;
service Hellor {
rpc ClientStreamHello (stream Message) returns (Message) {}
rpc ServerStreamHello (Message) returns (stream Message) {}
rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
message Message {
string message = 1;
}
加载 proto
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
export default new ProtoLoader({
location: path.resolve(__dirname, './'),
files: ['stream.proto']
})
客户端
客户端的一些前期工作,如加载 proto
,初始化客户端,创建新的 metadata
.
import loader from "./loader.js"
const start = async (addr) => {
await loader.init()
const clients = await loader.initClients({
services: {
'stream.Hellor': addr
}
})
const client = clients.get('stream.Hellor')
const meta = loader.makeMetadata({
'x-cache-control': 'max-age=100',
'x-business-id': ['grpcity', 'testing'],
'x-timestamp-client': 'begin=' + new Date().toISOString()
})
}
start('localhost:9097')
流对点
流对点,指client stream request
-> server response
。
我们继续在start
方法里流对点的流程:
const start = async (addr) => {
// ...
// stream client to server
const clientStreamHelloCall = await client.clientStreamHello(meta)
// send stream message to server
clientStreamHelloCall.write({ message: 'Hello!' })
clientStreamHelloCall.write({ message: 'How are you?' })
// end to send message and get server response
const writeResult = await clientStreamHelloCall.writeEnd()
console.log(writeResult)
// ...
}
运行结果如下,我们看到writeEnd()
返回了这次调用服务端的返回结果{ status, metadata, response }
,同时也把stream message
发送到了服务端。
// client
{
metadata: Metadata {
internalRepr: Map(8) {
'content-type' => [Array],
'x-cache-control' => [Array],
'x-business-id' => [Array],
'x-timestamp-client' => [Array],
'x-client-hostname' => [Array],
'user-agent' => [Array],
'x-timestamp-server' => [Array],
'date' => [Array]
},
options: {}
},
response: { message: "Hello! I'm fine, thank you!" },
status: {
code: 0,
details: 'OK',
metadata: Metadata { internalRepr: Map(0) {}, options: {} }
}
}
// server
{ message: 'Hello!' }
{ message: 'How are you?' }
点对流
点对流,指client request -> server stream response
。
我们继续在start
方法里点对流的流程:
const start = async (addr) => {
// ...
const serverStreamHelloCall = await client.serverStreamHello({ message: 'Hello! How are you?' }, meta)
const serverReadAllResult = serverStreamHelloCall.readAll()
for await (const data of serverReadAllResult) {
console.log(data)
}
const serverReadEndResult = await serverStreamHelloCall.readEnd()
console.log(serverReadEndResult)
// ...
}
运行结果如下:
我们看到readAll()
返回了一个asyncIterator
,需要我们使用for await
获取服务端返回的逐条stream message
结果。
而readEnd()
返回了{ status, metadata }
,同时也看到服务端的打印记录,也接收到了客户端发送的message
。
// client
{ message: 'Hello! I got you message.' }
{ message: "I'm fine, thank you" }
{
metadata: Metadata {
internalRepr: Map(8) {
'content-type' => [Array],
'x-cache-control' => [Array],
'x-business-id' => [Array],
'x-timestamp-client' => [Array],
'x-client-hostname' => [Array],
'user-agent' => [Array],
'x-timestamp-server' => [Array],
'date' => [Array]
},
options: {}
},
status: {
code: 0,
details: 'OK',
metadata: Metadata { internalRepr: Map(0) {}, options: {} }
}
}
// server
{ message: 'Hello! How are you?' }
流对流
流对流,指client stream request -> server stream response
。
我们继续在start
方法里流对流的流程。我们先调用writeAll()
发送3条消息,然后再调用write()
发送1条消息,最后调用readAll()
获取服务端返回的结果。
const start = async (addr) => {
// ...
const mutualStreamHelloCall = await client.mutualStreamHello(meta)
mutualStreamHelloCall.writeAll([
{ message: 'Hello!' },
{ message: 'How are you?' },
{ message: 'other thing x' }
])
mutualStreamHelloCall.write({ message: 'maybe' })
const mutualReadAllResult = mutualStreamHelloCall.readAll()
for await (const data of mutualReadAllResult) {
if (data.message === 'delay 1s') {
mutualStreamHelloCall.write({ message: 'ok, I known you delay 1s' })
mutualStreamHelloCall.writeEnd()
}
console.log(data)
}
const mutualReadEndResult = await mutualStreamHelloCall.readEnd()
console.log(mutualReadEndResult)
// ..
}
运行结果如下,我们看到我们是先客户端发送消息完给服务端后,调用readAll()
,获取asyncIterator
,需要我们使用for await
获取服务端返回的逐条stream message
结果。
最后,调用readEnd()
获取服务端结束流信息返回的{ status, metadata }
。
// client
{ message: 'emmm...' }
{ message: 'Hello too.' }
{ message: "I'm fine, thank you" }
{ message: 'pardon?' }
{ message: 'pardon?' }
{ message: 'delay 1s' }
{
metadata: Metadata {
internalRepr: Map(8) {
'content-type' => [Array],
'x-cache-control' => [Array],
'x-business-id' => [Array],
'x-timestamp-client' => [Array],
'x-client-hostname' => [Array],
'user-agent' => [Array],
'x-timestamp-server' => [Array],
'date' => [Array]
},
options: {}
},
status: {
code: 0,
details: 'OK',
metadata: Metadata { internalRepr: Map(0) {}, options: {} }
}
}
// server
Hello!
How are you?
other thing x
maybe
client call end.
服务端
我们在这里依次实现服务端 stream
的交互功能。下面是我们的服务端启动函数,我们只需要使用 gRPCity
提供了新能力实现 Stream
类的方法即可。
import loader from "./loader.js"
const start = async (addr) => {
await loader.init()
const server = await loader.initServer()
server.add('stream.Hellor', new Stream())
await server.listen(addr)
console.log('start:', addr)
}
start('localhost:9097')
流对点
客户端是流信息发送,服务端只需要正常读,处理完后返回结果。
服务端读流信息的接口是call.readAll()
,返回的是一个asyncIterator
对象,需要我们使用for await
逐条读取。
class Stream {
// ...
async clientStreamHello (call) {
const metadata = call.metadata.clone()
metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
call.sendMetadata(metadata)
for await (const data of call.readAll()) {
console.log(data)
}
return { message: "Hello! I'm fine, thank you!" }
}
// ...
}
点对流
服务端接收客户端发送的请求后,然后不断发送流信息给客户端直至函数处理完成。
call.write()
、call.writeAll()
和 call.end()
提供了服务端完成流信息发送的功能。
call.write()
: 发送一条信息到客户端;call.writeAll()
: 发送多条信息到客户端;call.end()
: 服务端发送信息已结束,并通知客户端;
class Stream {
// ...
async serverStreamHello (call) {
const metadata = call.metadata.clone()
metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
call.sendMetadata(metadata)
console.log(call.request.message)
call.write({ message: 'Hello! I got you message.' })
call.write({ message: "I'm fine, thank you" })
call.writeAll([
{ message: 'other thing x' },
{ message: 'other thing y' }
])
call.end()
}
// ...
}
流对流
客户端和服务端都采用流的方式进行交互。
call.write()
、call.writeAll()
、call.readAll()
和 call.end()
提供了服务端完成流信息发送的功能。
call.write()
: 发送一条信息到客户端;call.writeAll()
: 发送多条信息到客户端;call.readAll()
: 返回一个asyncIterator
对象,需要我们使用for await
逐条读取客户端返回的信息;call.end()
: 服务端发送信息已结束,并通知客户端;
class Stream {
// ...
async mutualStreamHello (call) {
const metadata = call.metadata.clone()
metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
call.sendMetadata(metadata)
call.write({ message: 'emmm...' })
for await (const data of call.readAll()) {
console.log(data.message)
if (data.message === 'Hello!') {
call.write({ message: 'Hello too.' })
} else if (data.message === 'How are you?') {
call.write({ message: 'I\'m fine, thank you' })
await timeout(1000)
call.write({ message: 'delay 1s' })
call.writeAll([
{ message: 'emm... ' },
{ message: 'emm......' }
])
} else {
call.write({ message: 'pardon?' })
}
}
call.end()
}
// ...
}
注意:
async stream
支持中间件;call proxy
中的readAll
、writeAll
、writeEnd
需要是async func
;- 要注意区分是 流对点、点对流、还是流对流;