import ballerina/io;
import ballerina/grpc;
import ballerina/log;
endpoint grpc:Service ep {
host:"localhost",
port:9090
};@grpc:serviceConfig {rpcEndpoint:"chat",
clientStreaming:true,
serverStreaming:true,
generateClientConnector:false}
service<grpc:Listener> Chat bind ep {
map<grpc:Service> consMap = {"_":new}; onOpen (endpoint client) {
var connID = client -> getID();
consMap[<string> connID] = client;
} onMessage (endpoint client, ChatMessage chatMsg) {
endpoint grpc:Service con;
string msg = string `{{chatMsg.name}}: {{chatMsg.message}}`;
io:println(msg);
string[] conKeys = consMap.keys();
int len = lengthof conKeys;
int i = 1;
while (i < len) {
con = <grpc:Service>consMap[conKeys[i]];
grpc:ConnectorError err = con -> send(msg);
if (err != ()) {
io:println("Error at onMessage : " + err.message);
}
i = i + 1;
}
} onError (endpoint client, grpc:ServerError err) {
if (err != ()) {
io:println("Something unexpected happens at server : " + err.message);
}
} onComplete (endpoint client) {
endpoint grpc:Service con;
var connID = client -> getID();
string msg = string `{{connID}} left the chat`;
io:println(msg);
var v = consMap.remove(<string>connID);
string[] conKeys = consMap.keys();
int len = lengthof conKeys;
int i = 1;
while (i < len) {
con = <grpc:Service>consMap[conKeys[i]];
grpc:ConnectorError err = con -> send(msg);
if (err != ()) {
io:println("Error at onComplete send message : " + err.message);
}
i = i + 1;
}
}
}type ChatMessage {
string name;
string message;
};
gRPC Bidirectional StreamingThe gRPC Server Connector can be used to expose service gRPC service over http2. This sample is to demonstrate gRPC client streaming service interacting with gRPC non-blocking client. |
|
import ballerina/io;
import ballerina/grpc;
import ballerina/log;
|
This is server implementation for bidirectional streaming scenario |
endpoint grpc:Service ep {
host:"localhost",
port:9090
};
|
Server endpoint configuration |
@grpc:serviceConfig {rpcEndpoint:"chat",
clientStreaming:true,
serverStreaming:true,
generateClientConnector:false}
service<grpc:Listener> Chat bind ep {
|
|
map<grpc:Service> consMap = {"_":new};
|
TODO: remove temp initialization when map issue is fixed. |
onOpen (endpoint client) {
var connID = client -> getID();
consMap[<string> connID] = client;
}
|
|
onMessage (endpoint client, ChatMessage chatMsg) {
endpoint grpc:Service con;
string msg = string `{{chatMsg.name}}: {{chatMsg.message}}`;
io:println(msg);
string[] conKeys = consMap.keys();
int len = lengthof conKeys;
|
|
int i = 1;
while (i < len) {
con = <grpc:Service>consMap[conKeys[i]];
grpc:ConnectorError err = con -> send(msg);
if (err != ()) {
io:println("Error at onMessage : " + err.message);
}
i = i + 1;
}
}
|
TODO: set i to zero when map issue is fixed. |
onError (endpoint client, grpc:ServerError err) {
if (err != ()) {
io:println("Something unexpected happens at server : " + err.message);
}
}
|
|
onComplete (endpoint client) {
endpoint grpc:Service con;
var connID = client -> getID();
string msg = string `{{connID}} left the chat`;
io:println(msg);
var v = consMap.remove(<string>connID);
string[] conKeys = consMap.keys();
int len = lengthof conKeys;
|
|
int i = 1;
while (i < len) {
con = <grpc:Service>consMap[conKeys[i]];
grpc:ConnectorError err = con -> send(msg);
if (err != ()) {
io:println("Error at onComplete send message : " + err.message);
}
i = i + 1;
}
}
}
|
TODO: set i to zero when map issue is fixed. |
type ChatMessage {
string name;
string message;
};
|
|
$ ballerina run grpc-client-streaming.bal
ballerina: initiating service(s) in 'grpc-client-streaming.bal'
|
Run the service |
ballerina run client
|
Run client connector |