Skip to content
文档
Async Stream

Async Stream

下面我们将会展示如何使用async stream的能力。在这里也建议先看一遍 Stream,该文会有一些相似的地方在本文出现。

定义 proto

proto 定义了三个 rpc,正好对应了客户端与服务端的三种流对接方式,分别是流对点、点对流和流对流。

stream.proto
syntax="proto3";
 
package stream;
 
service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
 
message Message {
  string message = 1;
}

加载 proto

./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
 
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
 
export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})

客户端

客户端的一些前期工作,如加载 proto,初始化客户端,创建新的 metadata.

./client.js
import loader from "./loader.js"
 
const start = async (addr) => {
  await loader.init()
 
  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })
 
  const client = clients.get('stream.Hellor')
 
  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })
}
 
start('localhost:9097')

流对点

流对点,指client stream request -> server response

我们继续在start方法里流对点的流程:

const start = async (addr) => {
  // ...
 
  // stream client to server
  const clientStreamHelloCall = await client.clientStreamHello(meta)
 
  // send stream message to server
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })
 
  // end to send message and get server response
  const writeResult = await clientStreamHelloCall.writeEnd()
  console.log(writeResult)
 
  // ...
}

运行结果如下,我们看到writeEnd()返回了这次调用服务端的返回结果{ status, metadata, response },同时也把stream message发送到了服务端。

Terminal
// client
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  response: { message: "Hello! I'm fine, thank you!" },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}
 
// server
{ message: 'Hello!' }
{ message: 'How are you?' }

点对流

点对流,指client request -> server stream response

我们继续在start方法里点对流的流程:

const start = async (addr) => {
  // ...
 
  const serverStreamHelloCall = await client.serverStreamHello({ message: 'Hello! How are you?' }, meta)
  const serverReadAllResult = serverStreamHelloCall.readAll()
  for await (const data of serverReadAllResult) {
      console.log(data)
  }
  const serverReadEndResult = await serverStreamHelloCall.readEnd()
  console.log(serverReadEndResult)
 
  // ...
}

运行结果如下:

我们看到readAll()返回了一个asyncIterator,需要我们使用for await获取服务端返回的逐条stream message结果。 而readEnd()返回了{ status, metadata },同时也看到服务端的打印记录,也接收到了客户端发送的message

Terminal
// client
{ message: 'Hello! I got you message.' }
{ message: "I'm fine, thank you" }
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}
 
// server
{ message: 'Hello! How are you?' }

流对流

流对流,指client stream request -> server stream response

我们继续在start方法里流对流的流程。我们先调用writeAll()发送3条消息,然后再调用write()发送1条消息,最后调用readAll()获取服务端返回的结果。

const start = async (addr) => {
  // ...
 
  const mutualStreamHelloCall = await client.mutualStreamHello(meta)
  mutualStreamHelloCall.writeAll([
    { message: 'Hello!' },
    { message: 'How are you?' },
    { message: 'other thing x' }
  ])
  mutualStreamHelloCall.write({ message: 'maybe' })
 
  const mutualReadAllResult = mutualStreamHelloCall.readAll()
  for await (const data of mutualReadAllResult) {
    if (data.message === 'delay 1s') {
      mutualStreamHelloCall.write({ message: 'ok, I known you delay 1s' })
      mutualStreamHelloCall.writeEnd()
    }
    console.log(data)
  }
 
  const mutualReadEndResult = await mutualStreamHelloCall.readEnd()
  console.log(mutualReadEndResult)
 
  // ..
}

运行结果如下,我们看到我们是先客户端发送消息完给服务端后,调用readAll(),获取asyncIterator,需要我们使用for await获取服务端返回的逐条stream message结果。 最后,调用readEnd()获取服务端结束流信息返回的{ status, metadata }

Terminal
// client
{ message: 'emmm...' }
{ message: 'Hello too.' }
{ message: "I'm fine, thank you" }
{ message: 'pardon?' }
{ message: 'pardon?' }
{ message: 'delay 1s' }
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}
 
// server
Hello!
How are you?
other thing x
maybe
client call end.

服务端

我们在这里依次实现服务端 stream 的交互功能。下面是我们的服务端启动函数,我们只需要使用 gRPCity 提供了新能力实现 Stream 类的方法即可。

./server.js
import loader from "./loader.js"
 
const start = async (addr) => {
  await loader.init()
 
  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())
 
  await server.listen(addr)
  console.log('start:', addr)
}
 
start('localhost:9097')

流对点

客户端是流信息发送,服务端只需要正常读,处理完后返回结果。 服务端读流信息的接口是call.readAll(),返回的是一个asyncIterator对象,需要我们使用for await逐条读取。

class Stream {
  // ...
  async clientStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)
 
    for await (const data of call.readAll()) {
      console.log(data)
    }
    return { message: "Hello! I'm fine, thank you!" }
  }
  // ...
}

点对流

服务端接收客户端发送的请求后,然后不断发送流信息给客户端直至函数处理完成。 call.write()call.writeAll()call.end() 提供了服务端完成流信息发送的功能。

  • call.write(): 发送一条信息到客户端;
  • call.writeAll(): 发送多条信息到客户端;
  • call.end(): 服务端发送信息已结束,并通知客户端;
class Stream {
  // ...
  async serverStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)
 
    console.log(call.request.message)
    call.write({ message: 'Hello! I got you message.' })
    call.write({ message: "I'm fine, thank you" })
    call.writeAll([
      { message: 'other thing x' },
      { message: 'other thing y' }
    ])
    call.end()
  }
  // ...
}

流对流

客户端和服务端都采用流的方式进行交互。 call.write()call.writeAll()call.readAll()call.end() 提供了服务端完成流信息发送的功能。

  • call.write(): 发送一条信息到客户端;
  • call.writeAll(): 发送多条信息到客户端;
  • call.readAll(): 返回一个asyncIterator对象,需要我们使用for await逐条读取客户端返回的信息;
  • call.end(): 服务端发送信息已结束,并通知客户端;
class Stream {
  // ...
  async mutualStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)
 
    call.write({ message: 'emmm...' })
 
    for await (const data of call.readAll()) {
      console.log(data.message)
      if (data.message === 'Hello!') {
        call.write({ message: 'Hello too.' })
      } else if (data.message === 'How are you?') {
        call.write({ message: 'I\'m fine, thank you' })
        await timeout(1000)
        call.write({ message: 'delay 1s' })
        call.writeAll([
          { message: 'emm... ' },
          { message: 'emm......' }
        ])
      } else {
        call.write({ message: 'pardon?' })
      }
    }
 
    call.end()
  }
  // ...
}

注意:

  • async stream 支持中间件;
  • call proxy 中的 readAllwriteAllwriteEnd 需要是 async func;
  • 要注意区分是 流对点、点对流、还是流对流;