Async Stream

This guide shows how to use async streams. We recommend reading the Stream guide first, since the two share many concepts.

Define proto

This proto defines three RPC methods, one for each way of streaming between the client and the server: client-stream, server-stream, and duplex-stream.

stream.proto
syntax="proto3";
 
package stream;
 
service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
 
message Message {
  string message = 1;
}
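The three methods differ only in which side uses the `stream` keyword. As a rough sketch of that rule, the helper below classifies a method from two boolean flags. The names `streamKind`, `requestStream`, and `responseStream` are assumptions made for this illustration, and the `methods` table is written by hand rather than produced by gRPCity.

```javascript
// Hypothetical helper: derive the stream kind from which side uses `stream` in the proto.
function streamKind ({ requestStream, responseStream }) {
  if (requestStream && responseStream) return 'duplex-stream'
  if (requestStream) return 'client-stream'
  if (responseStream) return 'server-stream'
  return 'unary'
}

// The three methods of stream.Hellor, described by hand for this sketch.
const methods = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, flags] of Object.entries(methods)) {
  console.log(name, '->', streamKind(flags))
}
```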

Load proto

./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
 
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
 
export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})

Client

The client needs some setup first: loading the proto, initializing the client, and creating the metadata.

./client.js
import loader from "./loader.js"
 
const start = async (addr) => {
  await loader.init()
 
  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })
 
  const client = clients.get('stream.Hellor')
 
  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })
}
 
start('localhost:9097')

Client-Stream

Client-stream means the client sends a stream of requests and the server returns a single response.

Continue in the start method with the client-stream call:

const start = async (addr) => {
  // ...
 
  // stream client to server
  const clientStreamHelloCall = await client.clientStreamHello(meta)
 
  // send stream message to server
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })
 
  // end to send message and get server response
  const writeResult = await clientStreamHelloCall.writeEnd()
  console.log(writeResult)
 
  // ...
}

Running this code produces the output below. writeEnd() returns the server’s response to the call as { status, metadata, response }, and the server log shows that it received both stream messages.

Terminal
// client
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  response: { message: "Hello! I'm fine, thank you!" },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}
 
// server
{ message: 'Hello!' }
{ message: 'How are you?' }
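One way to consume the { status, metadata, response } result is to check the status code before using the response. The sketch below assumes a hypothetical helper named `unwrapResult`; the sample object is hand-written, with a plain Map standing in for the Metadata object and a placeholder timestamp. Status code 0 is gRPC's OK.

```javascript
// Hand-written sample shaped like the client output above.
// A plain Map stands in for Metadata, and the timestamp is a placeholder.
const sampleResult = {
  metadata: new Map([['x-timestamp-server', ['received=2024-01-01T00:00:00.000Z']]]),
  response: { message: "Hello! I'm fine, thank you!" },
  status: { code: 0, details: 'OK' }
}

// Hypothetical helper: return the response on success, throw otherwise.
function unwrapResult ({ status, response }) {
  if (status.code !== 0) {
    throw new Error(`call failed with code ${status.code}: ${status.details}`)
  }
  return response
}

console.log(unwrapResult(sampleResult).message)
```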

Server-Stream

Server-stream means the client sends a single request and the server returns a stream of responses.

Continue in the start method with the server-stream call:

const start = async (addr) => {
  // ...
 
  const serverStreamHelloCall = await client.serverStreamHello({ message: 'Hello! How are you?' }, meta)
  const serverReadAllResult = serverStreamHelloCall.readAll()
  for await (const data of serverReadAllResult) {
    console.log(data)
  }
  const serverReadEndResult = await serverStreamHelloCall.readEnd()
  console.log(serverReadEndResult)
 
  // ...
}

The result of running this code is as follows:

readAll() returns an asyncIterator, so we use for await to read each message streamed by the server. readEnd() returns { status, metadata }. The server log shows that it received the request sent by the client.

Terminal
// client
{ message: 'Hello! I got you message.' }
{ message: "I'm fine, thank you" }
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}
 
// server
{ message: 'Hello! How are you?' }
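The for await pattern used with readAll() works with any async iterator. The sketch below imitates it without a network: `makeFakeCall` and `collect` are hypothetical names, and implementing readAll() as an async generator is an assumption made for illustration, not a description of gRPCity's internals.

```javascript
// Stand-in for a server-stream call. readAll() is an async generator here,
// which is an assumption for this sketch only.
function makeFakeCall (messages) {
  return {
    async * readAll () {
      for (const message of messages) {
        // yield to the event loop to imitate messages arriving over time
        await new Promise((resolve) => setTimeout(resolve, 0))
        yield { message }
      }
    }
  }
}

// Collect every streamed message into an array.
async function collect (call) {
  const received = []
  for await (const data of call.readAll()) {
    received.push(data.message)
  }
  return received
}

collect(makeFakeCall(['first', 'second'])).then((received) => console.log(received))
```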

Duplex-Stream

Duplex-stream means both sides stream: the client sends a stream of requests and the server returns a stream of responses.

Let’s continue the duplex-stream process in the start method. We first call writeAll() to send 3 messages, then call write() to send 1 message, and finally call readAll() to get the server’s response.

const start = async (addr) => {
  // ...
 
  const mutualStreamHelloCall = await client.mutualStreamHello(meta)
  mutualStreamHelloCall.writeAll([
    { message: 'Hello!' },
    { message: 'How are you?' },
    { message: 'other thing x' }
  ])
  mutualStreamHelloCall.write({ message: 'maybe' })
 
  const mutualReadAllResult = mutualStreamHelloCall.readAll()
  for await (const data of mutualReadAllResult) {
    if (data.message === 'delay 1s') {
      mutualStreamHelloCall.write({ message: 'ok, I known you delay 1s' })
      mutualStreamHelloCall.writeEnd()
    }
    console.log(data)
  }
 
  const mutualReadEndResult = await mutualStreamHelloCall.readEnd()
  console.log(mutualReadEndResult)
 
  // ...
}

Running this code produces the output below. The client first sends its messages, then calls readAll() to get the asyncIterator and uses for await to read each message streamed by the server. When the 'delay 1s' message arrives, the client writes one more message and calls writeEnd() to close its side of the stream. Finally, readEnd() returns the { status, metadata } from the server.

Terminal
// client
{ message: 'emmm...' }
{ message: 'Hello too.' }
{ message: "I'm fine, thank you" }
{ message: 'pardon?' }
{ message: 'pardon?' }
{ message: 'delay 1s' }
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}
 
// server
Hello!
How are you?
other thing x
maybe
client call end.
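The key point of the duplex flow is that the client can keep writing while it is reading, and that the read loop only finishes once the stream is closed. The sketch below reproduces that ordering in memory. `makeFakeDuplexCall` and `chat` are hypothetical, and the fake only imitates the write()/writeEnd()/readAll() shape used in this guide; it is not how gRPCity is implemented.

```javascript
// In-memory stand-in for a duplex call (hypothetical, for illustration only).
function makeFakeDuplexCall (reply) {
  const queue = [] // messages "sent by the server", waiting to be read
  let ended = false
  return {
    write (data) {
      // the fake server answers each client message immediately
      queue.push(reply(data))
    },
    writeEnd () {
      ended = true
    },
    async * readAll () {
      while (true) {
        if (queue.length > 0) {
          yield queue.shift()
        } else if (ended) {
          return
        } else {
          // nothing to read yet: wait a tick and check again
          await new Promise((resolve) => setTimeout(resolve, 0))
        }
      }
    }
  }
}

async function chat () {
  const call = makeFakeDuplexCall((data) =>
    data.message === 'ping' ? { message: 'pong' } : { message: 'ack: ' + data.message }
  )
  const received = []
  call.write({ message: 'ping' })
  for await (const data of call.readAll()) {
    received.push(data.message)
    if (data.message === 'pong') {
      // react to a server message, then close the client side
      call.write({ message: 'bye' })
      call.writeEnd()
    }
  }
  return received
}

chat().then((received) => console.log(received))
```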

Server

Here we implement the server side of each stream type in turn. The server startup function is shown below. All that remains is to implement the methods of the Stream class using the call interface that gRPCity provides.

./server.js
import loader from "./loader.js"
 
const start = async (addr) => {
  await loader.init()
 
  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())
 
  await server.listen(addr)
  console.log('start:', addr)
}
 
start('localhost:9097')

Client-Stream

For client-stream, the server reads the incoming messages, processes them, and returns the result once the stream ends. call.readAll() returns an asyncIterator object, so we use for await to read the stream messages one by one.

class Stream {
  // ...
  async clientStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)
 
    for await (const data of call.readAll()) {
      console.log(data)
    }
    return { message: "Hello! I'm fine, thank you!" }
  }
  // ...
}
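Because the handler only depends on call.readAll() and its return value, it can be exercised without starting a server. The sketch below passes a hand-made fake call to a handler that counts the messages it reads. `Counter` and `fakeCall` are hypothetical names, and the fake covers only the part of the call object this handler touches.

```javascript
// Hypothetical handler class: counts the streamed messages and returns a summary.
class Counter {
  async clientStreamHello (call) {
    let count = 0
    for await (const data of call.readAll()) {
      if (data.message) count += 1
    }
    // the returned object becomes the single response of the call
    return { message: `received ${count} messages` }
  }
}

// Minimal fake call: only readAll() is provided, as an async generator.
const fakeCall = {
  async * readAll () {
    yield { message: 'Hello!' }
    yield { message: 'How are you?' }
  }
}

new Counter().clientStreamHello(fakeCall).then((response) => console.log(response.message))
```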

Server-Stream

For server-stream, the server receives the client’s request, then keeps sending stream messages to the client until the function completes. call.write(), call.writeAll(), and call.end() let the server send stream messages and finish the stream.

  • call.write(): Sends a message to the client;
  • call.writeAll(): Sends multiple messages to the client;
  • call.end(): Indicates that the server has finished sending messages and notifies the client.

class Stream {
  // ...
  async serverStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)
 
    console.log(call.request.message)
    call.write({ message: 'Hello! I got you message.' })
    call.write({ message: "I'm fine, thank you" })
    call.writeAll([
      { message: 'other thing x' },
      { message: 'other thing y' }
    ])
    call.end()
  }
  // ...
}
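To see how write(), writeAll(), and end() fit together, the sketch below runs a server-stream style handler against a recording fake. `makeRecordingCall` is hypothetical, and treating writeAll() as a series of write() calls is an assumption of this fake, not a statement about gRPCity.

```javascript
// Hypothetical recording fake: stores what the handler sends instead of using the network.
function makeRecordingCall (request) {
  const sent = []
  let ended = false
  return {
    request,
    sent,
    write (message) {
      if (ended) throw new Error('write after end')
      sent.push(message)
    },
    writeAll (messages) {
      // assumption of this fake: writeAll() behaves like write() for each item
      for (const message of messages) this.write(message)
    },
    end () {
      ended = true
    },
    get ended () {
      return ended
    }
  }
}

// Handler written in the same style as the server-stream method of this guide.
async function serverStreamHello (call) {
  call.write({ message: 'got: ' + call.request.message })
  call.writeAll([{ message: 'part 1' }, { message: 'part 2' }])
  call.end()
}

const recordingCall = makeRecordingCall({ message: 'Hello!' })
serverStreamHello(recordingCall).then(() => {
  console.log(recordingCall.sent.length, 'messages sent, ended:', recordingCall.ended)
})
```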

Duplex-Stream

For duplex-stream, both the client and the server communicate through streams. call.write(), call.writeAll(), call.readAll(), and call.end() let the server send and receive stream messages.

  • call.write(): Sends a message to the client;
  • call.writeAll(): Sends multiple messages to the client;
  • call.readAll(): Returns an asyncIterator object, and we need to use for await to read the messages sent by the client;
  • call.end(): Indicates that the server has finished sending messages and notifies the client.

class Stream {
  // ...
  async mutualStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)
 
    call.write({ message: 'emmm...' })
 
    for await (const data of call.readAll()) {
      console.log(data.message)
      if (data.message === 'Hello!') {
        call.write({ message: 'Hello too.' })
      } else if (data.message === 'How are you?') {
        call.write({ message: 'I\'m fine, thank you' })
        // wait 1s before sending the next message
        await new Promise((resolve) => setTimeout(resolve, 1000))
        call.write({ message: 'delay 1s' })
        call.writeAll([
          { message: 'emm... ' },
          { message: 'emm......' }
        ])
      } else {
        call.write({ message: 'pardon?' })
      }
    }
 
    call.end()
  }
  // ...
}

Note:

  • Async stream supports middleware;
  • The call proxy’s readAll(), writeAll(), and writeEnd() need to be async functions;
  • Check whether each method is client-stream, server-stream, or duplex-stream, since each type uses a different set of call methods.