Stream
gRPC streams unlock cases the unary form can’t cover well: file uploads, event feeds, progress reporting, metric streams. gRPCity supports all three flavours — client stream, server stream, and bidi — and exposes them in two API styles.
This page covers the callback / event style accessed via proxy.call. For the more idiomatic for await form, see the Async Stream guide.
Prerequisites
In the proto file, every streaming RPC must mark its request type, its response type, or both with the stream keyword. Here is an example:
syntax = "proto3";
package stream;
service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
message Message {
  string message = 1;
}
ClientStreamHello: only the client sends a stream.
ServerStreamHello: only the server sends a stream.
MutualStreamHello: both the client and the server send a stream.
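The three declarations differ only in where the stream keyword appears. As a rough illustration, the sketch below names the flavour of a method from two boolean flags. The flag names requestStream and responseStream mirror the fields that @grpc/grpc-js uses on its method definitions; the streamKind helper and the hand-written methods table are hypothetical and are not part of gRPCity.

```javascript
// Hypothetical helper: name the streaming flavour of an RPC from two flags.
function streamKind ({ requestStream, responseStream }) {
  if (requestStream && responseStream) return 'bidi'
  if (requestStream) return 'client stream'
  if (responseStream) return 'server stream'
  return 'unary'
}

// Hand-written table mirroring stream.proto (not loaded from the file).
const methods = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, flags] of Object.entries(methods)) {
  console.log(`${name}: ${streamKind(flags)}`)
}
```

The flavour decides which side may call write() more than once, which is why the handlers and client calls later on this page have different shapes.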
Write loader.js to complete proto loading:
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})
Server
Write the Stream class, which provides one handler per streaming RPC: clientStreamHello(), serverStreamHello(), and mutualStreamHello().
// Assumes this file sits next to the loader.js written above; used by start() below
import loader from './loader.js'
class Stream {
  constructor () {
    this.count = 0
  }

  // client stream to server: many requests in, one response via callback
  clientStreamHello (call, callback) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
    })
    // 'end' carries no payload; reply once the client has finished writing
    call.on('end', () => {
      callback(null, { message: "Hello! I'm fine, thank you!" })
    })
  }

  // client to server stream: one request in call.request, many responses out
  serverStreamHello (call) {
    console.log(call.request.message)
    call.write({ message: 'Hello! I got your message.' })
    call.write({ message: "I'm fine, thank you" })
    call.end()
  }

  // client stream to server stream: reply to each incoming message
  mutualStreamHello (call) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
      if (chunk.message === 'Hello!') {
        call.write({ message: 'Hello!' })
      } else if (chunk.message === 'How are you?') {
        call.write({ message: "I'm fine, thank you" })
      } else {
        call.write({ message: 'Pardon?' })
      }
    })
    // close the server side once the client has finished writing
    call.on('end', () => {
      call.end()
    })
  }
}
Continue with service initialization and startup:
async function start (addr) {
  await loader.init()
  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())
  await server.listen(addr)
  console.log('start:', addr)
}
start('localhost:9097')
Client
To call the streaming RPCs from the client, use the methods under client.call. Here’s an example:
// Assumes this file sits next to the loader.js written above
import loader from './loader.js'
async function start (addr) {
  await loader.init()
  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })
  const client = clients.get('stream.Hellor')
  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })

  // client stream to server: write requests, then read the single response in the callback
  const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
    if (err) {
      console.log(err)
    } else {
      console.log(response)
    }
  })
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })
  clientStreamHelloCall.end()

  // client to server stream: send one request, then listen for each response
  const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
  serverStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  serverStreamHelloCall.on('end', () => {
    console.log('Server call end.')
  })

  // client stream to server stream: register listeners first, then write
  const mutualStreamHelloCall = client.call.mutualStreamHello()
  mutualStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  mutualStreamHelloCall.on('end', () => {
    console.log('Server call end.')
  })
  mutualStreamHelloCall.write({ message: 'Hello!' })
  mutualStreamHelloCall.write({ message: 'How are you?' })
  mutualStreamHelloCall.end()
}
start('localhost:9097')
Debug
client stream to server: Running only the client’s clientStreamHello() call produces the following output:
Server:
Hello!
How are you?
Client:
{ message: "Hello! I'm fine, thank you!" }
client to server stream: Running only the client’s serverStreamHello() call produces the following output:
Server:
Hello! How are you?
Client:
Hello! I got your message.
I'm fine, thank you
Server call end.
client stream to server stream: Running only the client’s mutualStreamHello() call produces the following output:
Server:
Hello!
How are you?
Client:
Hello!
I'm fine, thank you
Server call end.
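The replies in the client stream to server stream case can also be reproduced without starting a server, which can help when debugging handler logic on its own. The following is a minimal sketch, assuming the handler only touches call.on, call.write, and call.end as in the Stream class. FakeCall is a hypothetical stand-in written for this example; it is not a gRPCity or gRPC API, and the handler body is copied here so the sketch runs on its own.

```javascript
// Hypothetical test double: records writes and lets the caller push events.
class FakeCall {
  constructor () {
    this.handlers = {}
    this.written = []
    this.ended = false
  }

  // Register a listener, like call.on() on a real stream.
  on (event, handler) {
    if (!this.handlers[event]) this.handlers[event] = []
    this.handlers[event].push(handler)
    return this
  }

  // Deliver an event to every registered listener.
  emit (event, payload) {
    for (const handler of this.handlers[event] || []) handler(payload)
  }

  write (message) { this.written.push(message) }
  end () { this.ended = true }
}

// Same logic as Stream#mutualStreamHello, minus the logging.
function mutualStreamHello (call) {
  call.on('data', (chunk) => {
    if (chunk.message === 'Hello!') {
      call.write({ message: 'Hello!' })
    } else if (chunk.message === 'How are you?') {
      call.write({ message: "I'm fine, thank you" })
    } else {
      call.write({ message: 'Pardon?' })
    }
  })
  call.on('end', () => call.end())
}

// Replay what the client sends in the example above.
const call = new FakeCall()
mutualStreamHello(call)
call.emit('data', { message: 'Hello!' })
call.emit('data', { message: 'How are you?' })
call.emit('end')

for (const reply of call.written) console.log(reply.message)
```

Because the fake records every write, the same approach can check the 'Pardon?' branch, which the client example on this page never triggers.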