Async Stream
This guide shows how to use async stream. We recommend reading the Stream guide first, because the two share some similarities.
Define proto
This proto defines three rpc methods, one for each way of connecting streams between the client and the server: client-stream, server-stream, and duplex-stream.
syntax = "proto3";

package stream;

service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}

message Message {
  string message = 1;
}
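To make the mapping concrete, here is a small sketch that classifies each method of the proto above by where the stream keyword appears. The streamKind helper and the requestStream / responseStream flag names are illustrative assumptions for this example, not part of the gRPCity API.

```javascript
// Illustrative only: classify an rpc by which side is declared with `stream`.
// The flag names are assumptions made for this sketch.
function streamKind ({ requestStream, responseStream }) {
  if (requestStream && responseStream) return 'duplex-stream'
  if (requestStream) return 'client-stream'
  if (responseStream) return 'server-stream'
  return 'unary'
}

// The three methods of the Hellor service, described by hand.
const methods = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, flags] of Object.entries(methods)) {
  console.log(name, '->', streamKind(flags))
}
```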
Load proto
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'

// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))

export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})
Client
The client side needs some preliminary work: loading the proto, initializing the client, and creating new metadata.
import loader from "./loader.js"

const start = async (addr) => {
  await loader.init()

  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })

  const client = clients.get('stream.Hellor')

  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })
}

start('localhost:9097')
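The metadata object above mixes single string values with an array under x-business-id, since gRPC metadata allows several values per key. The sketch below shows one way such input could be normalized into a key-to-array map. toMultiValueMap is a hypothetical helper written for this example, and it makes no claim about how makeMetadata works internally.

```javascript
// Illustrative only: normalize header-like input into a multi-value map.
// `toMultiValueMap` is a hypothetical helper, not a gRPCity function.
function toMultiValueMap (headers) {
  const map = new Map()
  for (const [key, value] of Object.entries(headers)) {
    // Wrap single values so every key maps to an array of values.
    map.set(key.toLowerCase(), Array.isArray(value) ? [...value] : [value])
  }
  return map
}

const normalized = toMultiValueMap({
  'x-cache-control': 'max-age=100',
  'x-business-id': ['grpcity', 'testing']
})

console.log(normalized.get('x-cache-control'))
console.log(normalized.get('x-business-id'))
```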
Client-Stream
Client-stream means client stream request -> server response.
Let’s continue the client-side process for client-stream in the start method:
const start = async (addr) => {
  // ...

  // stream client to server
  const clientStreamHelloCall = await client.clientStreamHello(meta)

  // send stream message to server
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })

  // end to send message and get server response
  const writeResult = await clientStreamHelloCall.writeEnd()
  console.log(writeResult)

  // ...
}
Running this code gives the following result. writeEnd() returns the server’s response to this call as { status, metadata, response }, and the server output shows that the stream messages were received.
// client
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  response: { message: "Hello! I'm fine, thank you!" },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}

// server
{ message: 'Hello!' }
{ message: 'How are you?' }
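The client-stream shape (many writes, one response delivered when the request stream is closed) can be sketched without a network. The in-memory makeClientStreamCall below is a hypothetical stand-in written for illustration. It is not gRPCity's implementation, and its result object only mimics the { status, response } fields shown above.

```javascript
// Illustrative in-memory stand-in for a client-stream call.
// `makeClientStreamCall` and `handler` are hypothetical names.
function makeClientStreamCall (handler) {
  const sent = []
  let ended = false
  return {
    write (message) {
      if (ended) throw new Error('write after end')
      sent.push(message)
    },
    // Close the request stream, then resolve with the single server response.
    async writeEnd () {
      ended = true
      const response = await handler(sent)
      return { status: { code: 0, details: 'OK' }, response }
    }
  }
}

async function demoClientStream () {
  // The fake "server" sees all messages at once and answers a single time.
  const call = makeClientStreamCall(async (messages) => ({
    message: `received ${messages.length} messages`
  }))
  call.write({ message: 'Hello!' })
  call.write({ message: 'How are you?' })
  return call.writeEnd()
}

demoClientStream().then((result) => console.log(result))
```

The point of the sketch is the ordering: nothing comes back until the request side is closed, which is why the response is tied to writeEnd() rather than to write().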
Server-Stream
Server-stream means client request -> server stream response.
Let’s continue the server-stream process in the start method:
const start = async (addr) => {
  // ...

  const serverStreamHelloCall = await client.serverStreamHello({ message: 'Hello! How are you?' }, meta)

  const serverReadAllResult = serverStreamHelloCall.readAll()
  for await (const data of serverReadAllResult) {
    console.log(data)
  }

  const serverReadEndResult = await serverStreamHelloCall.readEnd()
  console.log(serverReadEndResult)

  // ...
}
Running this code gives the following result. readAll() returns an asyncIterator, so we use for await to read each stream message the server sends. readEnd() returns { status, metadata }. The server’s print record shows that it received the message sent by the client.
// client
{ message: 'Hello! I got you message.' }
{ message: "I'm fine, thank you" }
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}

// server
{ message: 'Hello! How are you?' }
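The for await pattern used with readAll() works with any async iterator. As a minimal sketch, the async generator readAllSketch below stands in for a server stream. It is a hypothetical name used only to show how messages arrive one at a time and how the loop ends once the stream is finished.

```javascript
// Illustrative stand-in for readAll(): an async generator that yields
// messages one by one. `readAllSketch` is a hypothetical name.
async function * readAllSketch (messages) {
  for (const message of messages) {
    // Simulate each message arriving asynchronously.
    await new Promise((resolve) => setTimeout(resolve, 10))
    yield message
  }
}

async function collectServerStream () {
  const received = []
  for await (const data of readAllSketch([
    { message: 'first' },
    { message: 'second' }
  ])) {
    received.push(data.message)
  }
  // The loop exits only after the iterator reports it is done,
  // which corresponds to the stream having ended.
  return received
}

collectServerStream().then((received) => console.log(received))
```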
Duplex-Stream
Duplex-stream means client stream request -> server stream response.
Let’s continue the duplex-stream process in the start method. We first call writeAll() to send 3 messages, then call write() to send 1 message, and finally call readAll() to get the server’s response.
const start = async (addr) => {
  // ...

  const mutualStreamHelloCall = await client.mutualStreamHello(meta)

  mutualStreamHelloCall.writeAll([
    { message: 'Hello!' },
    { message: 'How are you?' },
    { message: 'other thing x' }
  ])
  mutualStreamHelloCall.write({ message: 'maybe' })

  const mutualReadAllResult = mutualStreamHelloCall.readAll()
  for await (const data of mutualReadAllResult) {
    if (data.message === 'delay 1s') {
      mutualStreamHelloCall.write({ message: 'ok, I known you delay 1s' })
      mutualStreamHelloCall.writeEnd()
    }
    console.log(data)
  }

  const mutualReadEndResult = await mutualStreamHelloCall.readEnd()
  console.log(mutualReadEndResult)

  // ...
}
Running this code gives the following result. The client first sends its messages to the server, then calls readAll() to get the asyncIterator and uses for await to read each stream message the server sends back. Finally, it calls readEnd() to get the { status, metadata } from the server.
// client
{ message: 'emmm...' }
{ message: 'Hello too.' }
{ message: "I'm fine, thank you" }
{ message: 'pardon?' }
{ message: 'pardon?' }
{ message: 'delay 1s' }
{
  metadata: Metadata {
    internalRepr: Map(8) {
      'content-type' => [Array],
      'x-cache-control' => [Array],
      'x-business-id' => [Array],
      'x-timestamp-client' => [Array],
      'x-client-hostname' => [Array],
      'user-agent' => [Array],
      'x-timestamp-server' => [Array],
      'date' => [Array]
    },
    options: {}
  },
  status: {
    code: 0,
    details: 'OK',
    metadata: Metadata { internalRepr: Map(0) {}, options: {} }
  }
}

// server
Hello!
How are you?
other thing x
maybe
client call end.
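In the client code above, writeEnd() is called from inside the read loop. A plausible reason is that each side's read loop only finishes once the other side closes its stream, so one side must close while it is still reading. The sketch below reproduces that interplay with two in-memory queues. AsyncQueue and demoDuplex are hypothetical helpers written for this illustration, not gRPCity classes.

```javascript
// Illustrative minimal async queue: push() adds an item, close() ends the
// stream, and the queue can be consumed with `for await`.
class AsyncQueue {
  constructor () {
    this.items = []
    this.waiters = []
    this.closed = false
  }

  push (item) {
    if (this.closed) throw new Error('push after close')
    const waiter = this.waiters.shift()
    if (waiter) waiter({ value: item, done: false })
    else this.items.push(item)
  }

  close () {
    this.closed = true
    for (const waiter of this.waiters.splice(0)) waiter({ value: undefined, done: true })
  }

  next () {
    if (this.items.length > 0) return Promise.resolve({ value: this.items.shift(), done: false })
    if (this.closed) return Promise.resolve({ value: undefined, done: true })
    return new Promise((resolve) => this.waiters.push(resolve))
  }

  [Symbol.asyncIterator] () {
    return this
  }
}

async function demoDuplex () {
  const toServer = new AsyncQueue()
  const toClient = new AsyncQueue()

  // "Server": answer every message, and close its side only after the
  // client's stream has ended.
  const server = (async () => {
    for await (const data of toServer) {
      toClient.push({ message: 'echo: ' + data.message })
    }
    toClient.close()
  })()

  // "Client": write first, then read, and end the request stream from
  // inside the read loop.
  toServer.push({ message: 'Hello!' })
  const seen = []
  for await (const data of toClient) {
    seen.push(data.message)
    if (data.message === 'echo: Hello!') {
      toServer.push({ message: 'bye' })
      toServer.close()
    }
  }
  await server
  return seen
}

demoDuplex().then((seen) => console.log(seen))
```

If the client in this sketch never closed toServer, both loops would wait on each other forever, which is the situation the in-loop writeEnd() call avoids.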
Server
Here we implement the server’s stream handlers one by one. Below is the server startup function. We only need to use the new capabilities provided by gRPCity to implement the methods of the Stream class.
import loader from "./loader.js"

const start = async (addr) => {
  await loader.init()

  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())

  await server.listen(addr)
  console.log('start:', addr)
}

start('localhost:9097')
Client-Stream
For client-stream, the server reads the incoming messages, processes the data, and returns the result once the client has finished sending.
The server reads the stream with call.readAll(), which returns an asyncIterator object, so we use for await to read the stream messages one by one.
class Stream {
  // ...

  async clientStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)

    for await (const data of call.readAll()) {
      console.log(data)
    }

    return { message: "Hello! I'm fine, thank you!" }
  }

  // ...
}
Server-Stream
For server-stream, the server receives the client’s request, then keeps sending stream messages to the client until the function completes.
call.write(), call.writeAll(), and call.end() let the server send its stream messages:
- call.write(): Sends a message to the client;
- call.writeAll(): Sends multiple messages to the client;
- call.end(): Indicates that the server has finished sending messages and notifies the client;
class Stream {
  // ...

  async serverStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)

    console.log(call.request.message)

    call.write({ message: 'Hello! I got you message.' })
    call.write({ message: "I'm fine, thank you" })
    call.writeAll([
      { message: 'other thing x' },
      { message: 'other thing y' }
    ])

    call.end()
  }

  // ...
}
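The relationship between write(), writeAll(), and end() can be sketched with a stand-in call object that only records what it is given. This sketch assumes that writeAll() behaves like calling write() once per message and that writing after end() is an error. Neither assumption is confirmed by the text above, and makeServerCallSketch is a hypothetical name.

```javascript
// Illustrative stand-in for a server-side stream call; it only records messages.
// Assumption: writeAll() is equivalent to calling write() for each message.
function makeServerCallSketch () {
  const sent = []
  let ended = false
  return {
    sent,
    write (message) {
      if (ended) throw new Error('write after end')
      sent.push(message)
    },
    writeAll (messages) {
      for (const message of messages) this.write(message)
    },
    end () {
      ended = true
    },
    get ended () {
      return ended
    }
  }
}

const sketchCall = makeServerCallSketch()
sketchCall.write({ message: 'first' })
sketchCall.writeAll([{ message: 'second' }, { message: 'third' }])
sketchCall.end()

console.log(sketchCall.sent.length, sketchCall.ended)
```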
Duplex-Stream
For duplex-stream, both the client and the server use stream communication.
call.write(), call.writeAll(), call.readAll(), and call.end() let the server send and receive stream messages:
- call.write(): Sends a message to the client;
- call.writeAll(): Sends multiple messages to the client;
- call.readAll(): Returns an asyncIterator object, and we need to use for await to read the messages sent by the client;
- call.end(): Indicates that the server has finished sending messages and notifies the client;
// promise-based timer, used below as timeout(ms)
import { setTimeout as timeout } from 'node:timers/promises'

class Stream {
  // ...

  async mutualStreamHello (call) {
    const metadata = call.metadata.clone()
    metadata.add('x-timestamp-server', 'received=' + new Date().toISOString())
    call.sendMetadata(metadata)

    call.write({ message: 'emmm...' })

    // reply to each client message as it arrives
    for await (const data of call.readAll()) {
      console.log(data.message)
      if (data.message === 'Hello!') {
        call.write({ message: 'Hello too.' })
      } else if (data.message === 'How are you?') {
        call.write({ message: 'I\'m fine, thank you' })
        await timeout(1000)
        call.write({ message: 'delay 1s' })
        call.writeAll([
          { message: 'emm... ' },
          { message: 'emm......' }
        ])
      } else {
        call.write({ message: 'pardon?' })
      }
    }

    // the client has ended its stream; finish the server side too
    call.end()
  }

  // ...
}
Note:
- async stream supports middleware;
- call proxy’s readAll(), writeAll(), and writeEnd() need to be async func;
- Pay attention to whether it is client-stream, server-stream, or duplex-stream.