Stream

The gRPCity library provides complete stream processing capabilities, which let you implement business scenarios such as file uploads, event mechanisms, task monitoring, and metric reporting. There are currently three types of streaming RPC: client-stream to server, client to server-stream, and client-stream to server-stream.

Here, we’ll demonstrate how to work with streams using callbacks and event mechanisms. We will use the proxy.call methods of the client.

If you need to learn about working with async streams, you can refer to the Async Stream guide.

Prerequisites

In our proto file, the RPCs in the service must be clearly marked as using stream. Here is an example:

stream.proto
syntax="proto3";
 
package stream;
 
service Hellor {
  rpc ClientStreamHello (stream Message) returns (Message) {}
  rpc ServerStreamHello (Message) returns (stream Message) {}
  rpc MutualStreamHello (stream Message) returns (stream Message) {}
}
 
message Message {
  string message = 1;
}

  • ClientStreamHello: only the client sends a stream; the server returns a single response.
  • ServerStreamHello: only the server sends a stream; the client sends a single request.
  • MutualStreamHello: both the client and the server send streams.
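
The three RPC shapes differ only in which side is marked with stream. As a minimal sketch, the helper below classifies an RPC from two boolean flags. The flag names requestStream and responseStream follow the method definitions used by @grpc/grpc-js; the streamKind function and the rpcs table are hypothetical, written by hand to mirror stream.proto, and are not part of the gRPCity API.

```javascript
// Hypothetical helper: names the RPC shape from which side streams.
function streamKind ({ requestStream = false, responseStream = false }) {
  if (requestStream && responseStream) return 'client-stream to server-stream'
  if (requestStream) return 'client-stream to server'
  if (responseStream) return 'client to server-stream'
  return 'unary'
}

// Flags written by hand to match the three RPCs declared in stream.proto.
const rpcs = {
  ClientStreamHello: { requestStream: true, responseStream: false },
  ServerStreamHello: { requestStream: false, responseStream: true },
  MutualStreamHello: { requestStream: true, responseStream: true }
}

for (const [name, flags] of Object.entries(rpcs)) {
  console.log(`${name}: ${streamKind(flags)}`)
}
```

An RPC with neither flag set is an ordinary unary call, which is why the helper falls through to 'unary'.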

Write loader.js to complete proto loading:

./loader.js
import { ProtoLoader } from 'grpcity'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
 
// __dirname for esm
const __dirname = path.dirname(fileURLToPath(import.meta.url))
 
export default new ProtoLoader({
    location: path.resolve(__dirname, './'),
    files: ['stream.proto']
})

Server

Write the Stream class, which implements one handler method for each streaming RPC: clientStreamHello(), serverStreamHello(), and mutualStreamHello().

./server.js
import loader from './loader.js'

class Stream {
  constructor () {
    this.count = 0
  }
 
  // client stream to server
  clientStreamHello (call, callback) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
    })
    // 'end' carries no payload; reply once the client has finished sending
    call.on('end', () => {
      callback(null, { message: "Hello! I'm fine, thank you!" })
    })
  }
 
  // client to server stream
  serverStreamHello (call) {
    console.log(call.request.message)
    call.write({ message: 'Hello! I got your message.' })
    call.write({ message: "I'm fine, thank you" })
    call.end()
  }
 
  // client stream to server stream
  mutualStreamHello (call) {
    call.on('data', (chunk) => {
      console.log(chunk.message)
      if (chunk.message === 'Hello!') {
        call.write({ message: 'Hello!' })
      } else if (chunk.message === 'How are you?') {
        call.write({ message: "I'm fine, thank you" })
      } else {
        call.write({ message: 'Pardon?' })
      }
    })
    // 'end' carries no payload; close our side once the client is done
    call.on('end', () => {
      call.end()
    })
  }
}
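
The if/else chain in mutualStreamHello() can also be expressed as a lookup, which keeps the reply logic testable without a running server. The following is only a sketch: REPLIES and replyFor are hypothetical names introduced here, not part of gRPCity.

```javascript
// Hypothetical lookup table mirroring the if/else chain in mutualStreamHello().
const REPLIES = new Map([
  ['Hello!', 'Hello!'],
  ['How are you?', "I'm fine, thank you"]
])

// Returns the reply payload for one incoming message text.
// Unknown messages fall back to 'Pardon?', as in the original handler.
function replyFor (message) {
  return { message: REPLIES.get(message) ?? 'Pardon?' }
}

// Inside the handler this would be used as: call.write(replyFor(chunk.message))
console.log(replyFor('Hello!').message)
console.log(replyFor('How are you?').message)
console.log(replyFor('Good bye').message)
```

With this shape, adding a new phrase means adding one entry to the table rather than another branch.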

Continue with service initialization and startup:

./server.js
async function start (addr) {
  await loader.init()
 
  const server = await loader.initServer()
  server.add('stream.Hellor', new Stream())
 
  await server.listen(addr)
  console.log('start:', addr)
}
 
start('localhost:9097')

Client

To call the server's streaming RPCs, use the methods exposed on the client's call property (client.call). Here’s an example:

./client.js
import loader from './loader.js'

async function start (addr) {
    await loader.init()
 
    const clients = await loader.initClients({
        services: {
            'stream.Hellor': addr
        }
    })
 
    const client = clients.get('stream.Hellor')
 
    const meta = loader.makeMetadata({
        'x-cache-control': 'max-age=100',
        'x-business-id': ['grpcity', 'testing'],
        'x-timestamp-client': 'begin=' + new Date().toISOString()
    })
 
    // client stream to server
    const clientStreamHelloCall = client.call.clientStreamHello(meta, (err, response) => {
        if (err) {
            console.log(err)
        } else {
            console.log(response)
        }
    })
    clientStreamHelloCall.write({ message: 'Hello!' })
    clientStreamHelloCall.write({ message: 'How are you?' })
    clientStreamHelloCall.end()
 
    // client to server stream
    const serverStreamHelloCall = client.call.serverStreamHello({ message: 'Hello! How are you?' })
    serverStreamHelloCall.on('data', (chunk) => {
        console.log(chunk.message)
    })
    serverStreamHelloCall.on('end', () => {
        console.log('Server call end.')
    })
 
    // client stream to server stream
    const mutualStreamHelloCall = client.call.mutualStreamHello()
    mutualStreamHelloCall.on('data', (chunk) => {
        console.log(chunk.message)
    })
    mutualStreamHelloCall.on('end', () => {
        console.log('Server call end.')
    })
    mutualStreamHelloCall.write({ message: 'Hello!' })
    mutualStreamHelloCall.write({ message: 'How are you?' })
    mutualStreamHelloCall.end()
}
 
start('localhost:9097')
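
The client example above listens for 'data' and 'end' only. A readable call can also fail, so it may be worth handling 'error' as well. The sketch below gathers every message and reports the outcome exactly once. It assumes the call object emits 'data', 'end', and 'error' the way Node.js readable streams and @grpc/grpc-js calls do; collectMessages and makeFakeCall are hypothetical helpers, and the fake call exists only so the snippet runs without a server.

```javascript
// Hypothetical helper: gathers all messages from a readable call and
// invokes done(err, messages) exactly once, on 'end' or on 'error'.
function collectMessages (call, done) {
  const messages = []
  let finished = false
  const finish = (err) => {
    if (finished) return
    finished = true
    done(err, messages)
  }
  call.on('data', (chunk) => messages.push(chunk.message))
  call.on('error', (err) => finish(err))
  call.on('end', () => finish(null))
}

// Minimal stand-in for a call object, used only for this demo.
function makeFakeCall () {
  const handlers = {}
  return {
    on (event, fn) { (handlers[event] ??= []).push(fn); return this },
    emit (event, payload) { (handlers[event] ?? []).forEach((fn) => fn(payload)) }
  }
}

const fakeCall = makeFakeCall()
let result = null
collectMessages(fakeCall, (err, messages) => { result = { err, messages } })

// Replay what the server sends in serverStreamHello().
fakeCall.emit('data', { message: 'Hello! I got your message.' })
fakeCall.emit('data', { message: "I'm fine, thank you" })
fakeCall.emit('end')

console.log(result.messages)
```

In the real client, the value returned by client.call.serverStreamHello(...) would be passed in place of fakeCall, assuming it behaves as described above.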

Debug

  • client stream to server: running only the client’s clientStreamHello() call produces the following output:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
{ message: "Hello! I'm fine, thank you!" }

  • client to server stream: running only the client’s serverStreamHello() call produces the following output:

Server:

Terminal
Hello! How are you?

Client:

Terminal
Hello! I got your message.
I'm fine, thank you
Server call end.

  • client stream to server stream: running only the client’s mutualStreamHello() call produces the following output:

Server:

Terminal
Hello!
How are you?

Client:

Terminal
Hello!
I'm fine, thank you
Server call end.