Stream

gRPC streams unlock cases the unary form can’t cover well: file uploads, event feeds, progress reporting, metric streams. gRPCity supports all three flavours — client stream, server stream, and bidi — and exposes them in two API styles.

This page covers the callback / event style accessed via proxy.call. For the more idiomatic for await form, see the Async Stream guide.

Prerequisites

In the proto file, each streaming RPC must mark its request type, its response type, or both with the stream keyword. For example:

stream.proto
syntax = "proto3";

package stream;

service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}

message Message {
  string message = 1;
}

  • ClientStreamHello: the client sends a stream of messages and the server replies with a single message.
  • ServerStreamHello: the client sends a single message and the server replies with a stream of messages.
  • MutualStreamHello: both the client and the server send a stream (bidirectional).
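
One way to keep the three shapes apart is to look at which side carries the stream keyword. The sketch below classifies an RPC from two boolean flags. The flag names requestStream and responseStream follow the method-definition fields used by @grpc/grpc-js; whether gRPCity exposes them in the same form is an assumption, and streamKind is a hypothetical helper, not part of gRPCity.

```javascript
// Hypothetical helper: classify an RPC by which side streams.
// requestStream / responseStream mirror the flags on a grpc-js method
// definition; gRPCity exposing them under these names is an assumption.
function streamKind ({ requestStream, responseStream }) {
  if (requestStream && responseStream) return 'bidi stream'
  if (requestStream) return 'client stream'
  if (responseStream) return 'server stream'
  return 'unary'
}

// The three RPCs from stream.proto, described by hand.
const rpcs = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, def] of Object.entries(rpcs)) {
  console.log(`${name}: ${streamKind(def)}`)
}
```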

Write loader.js to complete proto loading:

./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'

// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))

export default new ProtoLoader({
  location: path.resolve(__dirname, './'),
  files: ['stream.proto']
})

Server

Write the Stream class, which provides one handler method per RPC: clientStreamHello(), serverStreamHello(), and mutualStreamHello().

./server.js
import loader from './loader.js'

class Stream {
  constructor () {
    this.count = 0
  }

  // Client stream: the client sends many messages, the server replies once
  clientStreamHello (call, callback) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
    })
    // 'end' carries no payload; reply once the client has finished writing
    call.on('end', () => {
      callback(null, { message: "Hello! I'm fine, thank you!" })
    })
  }

  // Server stream: the client sends one message, the server replies with many
  serverStreamHello (call) {
    console.log(call.request.message)
    call.write({ message: 'Hello! I got your message.' })
    call.write({ message: "I'm fine, thank you" })
    call.end()
  }

  // Bidirectional stream: both sides send many messages
  mutualStreamHello (call) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
      if (chunk.message === 'Hello!') {
        call.write({ message: 'Hello!' })
      } else if (chunk.message === 'How are you?') {
        call.write({ message: "I'm fine, thank you" })
      } else {
        call.write({ message: 'Pardon?' })
      }
    })
    // Close the server side once the client has finished writing
    call.on('end', () => {
      call.end()
    })
  }
}
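
The client-stream handler above logs each chunk and replies with a fixed message. A common variation is to accumulate the chunks and build the reply from them in the end listener. The following is a minimal sketch of that pattern, run against a hand-rolled stand-in for the call object so that it needs no running server. makeFakeCall is hypothetical and models only on() and emit(); the real call object has a larger surface.

```javascript
// Hypothetical stand-in for the call object: only on() and emit() are modelled.
function makeFakeCall () {
  const listeners = {}
  return {
    on (event, fn) {
      listeners[event] = listeners[event] || []
      listeners[event].push(fn)
      return this
    },
    emit (event, payload) {
      for (const fn of listeners[event] || []) fn(payload)
    }
  }
}

// Client-stream handler that collects every chunk before replying once.
function clientStreamHello (call, callback) {
  const received = []
  call.on('data', (chunk) => {
    received.push(chunk.message)
  })
  call.on('end', () => {
    callback(null, {
      message: `Received ${received.length} message(s): ${received.join(' ')}`
    })
  })
}

// Drive the handler the way a client stream would: two chunks, then end.
const fake = makeFakeCall()
let reply = null
clientStreamHello(fake, (err, response) => {
  if (err) throw err
  reply = response
})
fake.emit('data', { message: 'Hello!' })
fake.emit('data', { message: 'How are you?' })
fake.emit('end')
console.log(reply.message)
```

The callback runs only after end, so the reply can depend on everything the client sent.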

Continue with service initialization and startup:

./server.js
async function start (addr) {
  await loader.init()

  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())

  await server.listen(addr)
  console.log('start:', addr)
}

start('localhost:9097')

Client

To call the streaming RPCs from the client, use the methods under client.call. Here’s an example:

./client.js
import loader from './loader.js'

async function start (addr) {
  await loader.init()

  const clients = await loader.initClients({
    services: {
      'stream.Hellor': addr
    }
  })
  const client = clients.get('stream.Hellor')

  const meta = loader.makeMetadata({
    'x-cache-control': 'max-age=100',
    'x-business-id': ['grpcity', 'testing'],
    'x-timestamp-client': 'begin=' + new Date().toISOString()
  })

  // Client stream: write several messages, receive one response in the callback
  const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
    if (err) {
      console.log(err)
    } else {
      console.log(response)
    }
  })
  clientStreamHelloCall.write({ message: 'Hello!' })
  clientStreamHelloCall.write({ message: 'How are you?' })
  clientStreamHelloCall.end()

  // Server stream: send one message, receive several via 'data' events
  const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
  serverStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  serverStreamHelloCall.on('end', () => {
    console.log('Server call end.')
  })

  // Bidirectional stream: write and receive on the same call
  const mutualStreamHelloCall = client.call.mutualStreamHello()
  mutualStreamHelloCall.on('data', (chunk) => {
    console.log(chunk.message)
  })
  mutualStreamHelloCall.on('end', () => {
    console.log('Server call end.')
  })
  mutualStreamHelloCall.write({ message: 'Hello!' })
  mutualStreamHelloCall.write({ message: 'How are you?' })
  mutualStreamHelloCall.end()
}

start('localhost:9097')
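
The client code above registers data and end listeners but no error listener. Assuming the call objects behave like Node.js event emitters (as grpc-js calls do), a failure arrives as an error event, and an error event with no listener is thrown. The sketch below shows one way to gather chunks and report the outcome exactly once. collectStream and makeFakeCall are hypothetical helpers, not gRPCity APIs; the fake call models only on() and emit().

```javascript
// Hypothetical stand-in for a readable call: only on() and emit() are modelled.
function makeFakeCall () {
  const listeners = {}
  return {
    on (event, fn) {
      listeners[event] = listeners[event] || []
      listeners[event].push(fn)
      return this
    },
    emit (event, payload) {
      for (const fn of listeners[event] || []) fn(payload)
    }
  }
}

// Hypothetical helper: gather every chunk, then report once via done(err, chunks).
function collectStream (call, done) {
  const chunks = []
  let finished = false
  const finish = (err) => {
    if (finished) return // never call done twice
    finished = true
    done(err, err ? null : chunks)
  }
  call.on('data', (chunk) => {
    chunks.push(chunk)
  })
  call.on('end', () => finish(null))
  call.on('error', (err) => finish(err))
}

// Successful stream: two chunks, then end.
const ok = makeFakeCall()
let okResult = null
collectStream(ok, (err, chunks) => { okResult = { err, chunks } })
ok.emit('data', { message: 'Hello! I got your message.' })
ok.emit('data', { message: "I'm fine, thank you" })
ok.emit('end')
console.log(okResult.chunks.map((c) => c.message))

// Failing stream: the error wins and the later end is ignored.
const failing = makeFakeCall()
let failResult = null
collectStream(failing, (err, chunks) => { failResult = { err, chunks } })
failing.emit('data', { message: 'partial' })
failing.emit('error', new Error('stream broke'))
failing.emit('end')
console.log(failResult.err.message)
```

With a real call, the same helper would be used as collectStream(client.call.serverStreamHello({ message: 'Hello! How are you?' }), done), under the assumption stated above.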

Debug

  • client stream to server: running only the client’s clientStreamHello() call produces the following output:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
{ message: "Hello! I'm fine, thank you!" }
  • client to server stream: running only the client’s serverStreamHello() call produces the following output:

Server:

Terminal
Hello! How are you?

Client:

Terminal
Hello! I got your message.
I'm fine, thank you
Server call end.
  • client stream to server stream: running only the client’s mutualStreamHello() call produces the following output:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
Hello!
I'm fine, thank you
Server call end.
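
The client output for the bidirectional case follows directly from the reply rules in mutualStreamHello. As a rough sketch, the same rules can be written as a lookup table and replayed without a server; replyFor is a hypothetical helper introduced only for this illustration.

```javascript
// Reply rules copied from mutualStreamHello in server.js, as a lookup table.
const replies = {
  'Hello!': 'Hello!',
  'How are you?': "I'm fine, thank you"
}

// Hypothetical helper: the reply the server writes for one incoming message.
function replyFor (message) {
  return Object.prototype.hasOwnProperty.call(replies, message)
    ? replies[message]
    : 'Pardon?'
}

// Replay what the client writes and print what it would receive.
const sent = ['Hello!', 'How are you?']
const received = sent.map(replyFor)
received.forEach((line) => console.log(line))
console.log('Server call end.')
```

Any message outside the table, for example 'Bye', maps to 'Pardon?', matching the final else branch of the handler.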