Stream
The gRPCity
library provides complete stream processing capabilities, allowing you to implement business scenarios such as file uploads, event mechanisms, task monitoring, metric reporting and etc.
Currently, there are three types of streaming RPCs: client-stream to server
, client to server-stream
, and client-stream to server-stream
.
Here, we’ll demonstrate how to work with streams using callbacks and event mechanisms. We will use the proxy.call
methods of the client.
If you need to learn about working with async streams
, you can refer to the Async Stream guide.
Prerequisites
In our proto file, the RPCs in the service
must be clearly marked as using stream
. Here is an example:
syntax="proto3";
package stream;
service Hellor {
rpc ClientStreamHello (stream Message) returns (Message) {}
rpc ServerStreamHello (Message) returns (stream Message) {}
rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
message Message {
string message = 1;
}
ClientStreamHello
: Indicates that only the client uses Stream.ServerStreamHello
: Indicates that only the server uses Stream.MutualStreamHello
: Indicates that both the client and the server use Stream.
Write loader.js
to complete proto loading:
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
export default new ProtoLoader({
location: path.resolve(__dirname, './'),
files: ['stream.proto']
})
Server
Write the Stream
class, which provides three server streaming functions: clientStreamHello()
, serverStreamHello()
, and mutualStreamHello()
.
class Stream {
constructor () {
this.count = 0
}
// client stream to server
clientStreamHello (call, callback) {
call.on('data', (chunk) => {
console.log(chunk.message)
})
call.on('end', (chunk) => {
callback(null, { message: "Hello! I'm fine, thank you!" })
})
}
// client to server stream
serverStreamHello (call) {
console.log(call.request.message)
call.write({ message: 'Hello! I got your message.' })
call.write({ message: "I'm fine, thank you" })
call.end()
}
// client stream to server stream
mutualStreamHello (call) {
call.on('data', (chunk) => {
console.log(chunk.message)
if (chunk.message === 'Hello!') {
call.write({ message: 'Hello!' })
} else if (chunk.message === 'How are you?') {
call.write({ message: "I'm fine, thank you" })
} else {
call.write({ message: 'Pardon?' })
}
})
call.on('end', (chunk) => {
call.end()
})
}
}
Continue with service initialization and startup:
async function start (addr) {
await loader.init()
const server = await loader.initServer()
server.add('stream.Hellor', new Stream())
await server.listen(addr)
console.log('start:', addr)
}
start('localhost:9097')
Client
To call the server using the Stream
client, you need to use the functions inside .call.
. Here’s an example:
async function start (addr) {
await loader.init()
const clients = await loader.initClients({
services: {
'stream.Hellor': addr
}
})
const client = clients.get('stream.Hellor')
const meta = loader.makeMetadata({
'x-cache-control': 'max-age=100',
'x-business-id': ['grpcity', 'testing'],
'x-timestamp-client': 'begin=' + new Date().toISOString()
})
// client stream to server
const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
if (err) {
console.log(err)
} else {
console.log(response)
}
})
clientStreamHelloCall.write({ message: 'Hello!' })
clientStreamHelloCall.write({ message: 'How are you?' })
clientStreamHelloCall.end()
// client to server stream
const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
serverStreamHelloCall.on('data', (chunk) => {
console.log(chunk.message)
})
serverStreamHelloCall.on('end', () => {
console.log('Server call end.')
})
// client stream to server stream
const mutualStreamHelloCall = client.call.mutualStreamHello()
mutualStreamHelloCall.on('data', (chunk) => {
console.log(chunk.message)
})
mutualStreamHelloCall.on('end', () => {
console.log('Server call end.')
})
mutualStreamHelloCall.write({ message: 'Hello!' })
mutualStreamHelloCall.write({ message: 'How are you?' })
mutualStreamHelloCall.end()
}
start('localhost:9097')
Debug
client stream to server
: We only execute the client’sclientStreamHello()
function, and the result is as follows:
Server:
Hello!
How are you?
Client:
{ message: "Hello! I'm fine, thank you!" }
client to server stream
: We only execute the client’sserverStreamHello()
, and the result is as follows:
Server:
Hello! How are you?
Client:
Hello! I got your message.
I'm fine, thank you
Server call end.
client stream to server stream
: We only execute the client’smutualStreamHello()
, and the result is as follows:
Server:
Hello!
How are you?
Client:
Hello!
I'm fine, thank you
Server call end.