[Proposal] Dispatching to custom remote functions based on the message type #3670

Open
Bhashinee opened this issue Nov 18, 2022 · 12 comments
Labels
module/websocket Points/3 Status/Implemented Implemented proposals Team/PCM Protocol connector packages related issues Type/NewFeature Type/Proposal


@Bhashinee
Member

Bhashinee commented Nov 18, 2022

Summary

Dispatching messages to custom remote functions based on the message type (declared by a field in the received message), with the end goal of generating meaningful AsyncAPI specifications.

Goals

Generating meaningful AsyncAPIs and improving the readability of the code.

Motivation

With AsyncAPI gaining popularity through the increased use of event-driven microservices, it is worthwhile to think of ways to generate AsyncAPI specifications from WebSocket service code. The motivation is to make the service code more understandable, so that the maximum detail can be retrieved to generate meaningful AsyncAPI specifications, and to improve the readability of the code.

Description

In most real-world use cases, the WebSocket protocol is used along with a sub-protocol. Most of the time, the messages of those sub-protocols are differentiated by a dedicated field that contains the type of the message. For example, in the Kraken API the type of the message is identified by the event field.

{"event": "ping"}
{"event": "subscribe", "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}}
{"event": "heartbeat"}

Another example is the GraphQL over WebSocket Protocol.

The WebSocket sub-protocol for the above specification is graphql-transport-ws. The type of the message is identified by the value of its type field.

{"type": "ping"}
{"type": "subscribe", "id":"1", "payload":{"query": "{ __schema { types { name } } }"}}

As of now, when using the Ballerina WebSocket service, all these messages are dispatched to the generic onMessage remote function. When the user writes logic based on the received message, everything has to be handled inside onMessage using an if/else ladder or similar. This reduces the readability of the code.

Also, if we want to generate an AsyncAPI specification from the service code, it is not possible to capture all the details, such as the response message for a particular type of message.

Ex:

Following is a part of the Kraken AsyncAPI specification describing the types of messages and their responses.

  messages:
    ping:
      summary: Ping server to determine whether connection is alive
      description: Client can ping server to determine whether connection is alive, server responds with pong. This is an application level ping as opposed to default ping in websockets standard which is server initiated
      payload:
        $ref: '#/components/schemas/ping'
      x-response:
        $ref: '#/components/messages/pong'
        
    unsubscribe:
      description: Unsubscribe, can specify a channelID or multiple currency pairs.
      payload:
        $ref: '#/components/schemas/subscribe'
      examples:
        - payload:
            event: unsubscribe
            pair:
              - XBT/EUR
              - XBT/USD
            subscription:
              name: ticker
        - payload:
            event: unsubscribe
            subscription:
              name: ownTrades
              token: WW91ciBhdXRoZW50aWNhdGlvbiB0b2tlbiBnb2VzIGhlcmUu
      x-response:
        $ref: '#/components/messages/subscriptionStatus'    

The above AsyncAPI specification defines the messages ping and unsubscribe. Their response messages are given by the x-response field.

If this part is written using existing WebSocket service functionalities, it would look like the following.

service class MyService {
    *websocket:Service;

    remote function onMessage(websocket:Caller caller, Ping|UnSubscribe message) returns Pong|SubscriptionStatus {
        if message is Ping {
            return {'type: WS_PONG};
        } else {
            return {'type: WS_UNSUBSCRIBE, id: "5"};
        }
    }
}

Therefore, if all the messages are dispatched to a single onMessage remote function, it is difficult to differentiate the response for the ping message from the response for the unsubscribe operation.

As a solution, the idea is to have custom remote functions based on the message type within the WebSocket service. For example, if the message is {"type": "ping"}, it will get dispatched to the onPing remote function. Similarly,

| Message | Remote function |
| {"event": "ping"} | onPing |
| {"event": "subscribe", "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}} | onSubscribe |
| {"event": "heartbeat"} | onHeartbeat |

Dispatching rules

  1. The user can configure the field name (key) used to identify the messages and the allowed values as message types.

The dispatcherKey names the field whose value identifies the event type of the incoming message. The default value is 'type.

Ex:
incoming message = {"event": "ping"}
dispatcherKey = "event"
event/message type = "ping"
dispatching to remote function = "onPing"

@websocket:ServiceConfig {
    dispatcherKey: "event"
}
service / on new websocket:Listener(9090) {}
  2. Naming of the remote function.
  • If the message type contains spaces or underscores, they are removed and the result is converted to camel case ("un subscribe" -> "onUnSubscribe").
  • The word 'on' is added as a prefix and the remote function name is in camel case ("ping" -> "onPing").
  3. If a message type is received for which no matching remote function is implemented in the WebSocket service, the message is dispatched to the default onMessage remote function if it is implemented. Otherwise, it is ignored.
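
The naming and fallback rules above can be sketched as two plain functions. This is only an illustration of the rules as worded in this proposal, not the websocket module's implementation: `toRemoteFunctionName`, `resolveRemoteFunction` and the `implemented` list are hypothetical names introduced for the example.

```ballerina
import ballerina/io;

// Hypothetical helper: derives the remote function name for a message type
// by dropping spaces and underscores, capitalizing each word and prefixing "on".
function toRemoteFunctionName(string messageType) returns string {
    string name = "on";
    boolean capitalizeNext = true;
    foreach string ch in messageType {
        if ch == " " || ch == "_" {
            capitalizeNext = true;
        } else if capitalizeNext {
            name += ch.toUpperAscii();
            capitalizeNext = false;
        } else {
            name += ch;
        }
    }
    return name;
}

// Hypothetical helper: picks the remote function to dispatch to, or returns ()
// when the message would be ignored. `implemented` lists the remote functions
// declared by the service.
function resolveRemoteFunction(string messageType, string[] implemented) returns string? {
    string candidate = toRemoteFunctionName(messageType);
    if implemented.indexOf(candidate) != () {
        return candidate;
    }
    if implemented.indexOf("onMessage") != () {
        return "onMessage";
    }
    return ();
}

public function main() {
    string[] implemented = ["onPing", "onSubscribe", "onMessage"];
    io:println(toRemoteFunctionName("un subscribe")); // onUnSubscribe
    io:println(resolveRemoteFunction("ping", implemented)); // onPing
    io:println(resolveRemoteFunction("heartbeat", implemented)); // onMessage
}
```

Returning `()` for the "ignored" case is a choice made for this sketch; the proposal only states that such messages are ignored.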

Example code for the Kraken API with the proposed changes:

@websocket:ServiceConfig {
    dispatcherKey: "'type"
}
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new MyService();
    }
}

service class MyService {
    *websocket:Service;

    remote function onPing(Ping message) returns Pong {
        return {'type: WS_PONG};
    }

    remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onHeartbeat(Heartbeat message) {
        io:println(message);
    }
}
@Bhashinee Bhashinee self-assigned this Nov 18, 2022
@Bhashinee Bhashinee added the Team/PCM Protocol connector packages related issues label Nov 18, 2022
@Bhashinee Bhashinee added the Status/Active Proposals that are under review label Jan 13, 2023
@shafreenAnfar
Contributor

We can get it to work even without allowedValues, right?

Let's make the key's default value type.

@shafreenAnfar
Contributor

If we generate a client using the generated AsyncAPI, what would it look like for the below?

@websocket:ServiceConfig {
    discriminator: {
        key: "event",
        allowedValues: ["ping", "heartbeat", "subscribe"]
    }
}
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new MyService();
    }
}

service class MyService {
    *websocket:Service;

    remote function onPing(Ping message) returns Pong {
        return {'type: WS_PONG};
    }

    remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onHeartbeat(Heartbeat message) {
        io:println(message);
    }
}

@shafreenAnfar
Contributor

I prefer dispatcher over discriminator and if allowedValues are not needed, then maybe we can use dispatcherKey.

@Bhashinee
Member Author

> We can get it to work even without allowedValues, right?
>
> Let's make the key's default value type.

Updated the proposal by removing the discriminator record and adding the dispatcher as suggested.

@Bhashinee
Member Author

Bhashinee commented Jan 19, 2023

> If we generate a client using the generated AsyncAPI, what would it look like for the below?

How about generated client code like the following?

public function main() returns error? {
    // Initiate an asyncapi WebSocket client by passing the server to connect. Individual clients will be generated for each `channel`.
    // This will connect to the given server (this client is generated for the `root` channel) and subscribe to that.
    // Name of the client will be `AsyncAPI title` + `<channel>` + `Client` ignoring the spaces, underscores etc.
    // Note:- We might need to discuss and come up with a proper naming convention.
    KrakenWebSocketApiRootClient rootClient = new(KrakenWebSocketApiClient.Server_Public);

    // Publish messages to the server. The message types are retrieved from the response types of the `publish` operation.
    Ping pingMessage = {'type: PING};
    check rootClient->ping(pingMessage);

    Subscribe subscribeMessage = {'type: SUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}};
    check rootClient->subscribe(subscribeMessage);

    UnSubscribe unSubscribeMessage = {'type: UNSUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"]};
    check rootClient->unsubscribe(unSubscribeMessage);

    // Then the client can listen to the server publishing messages. The message types are retrieved from the response types
    // of the `subscribe` operation.
    // Can be done in a separate strand in a loop.
    Pong|Heartbeat|SystemStatus|SubscriptionStatus message = check rootClient->listen();
}

@shafreenAnfar
Contributor

shafreenAnfar commented Jan 20, 2023

Looks good, except we don't need the listen() function. I was thinking of something like the below.

public function main() returns error? {
    // Initiate an asyncapi WebSocket client by passing the server to connect. Individual clients will be generated for each `channel`.
    // This will connect to the given server (this client is generated for the `root` channel) and subscribe to that.
    // Name of the client will be `AsyncAPI title` + `<channel>` + `Client` ignoring the spaces, underscores etc.
    // Note:- We might need to discuss and come up with a proper naming convention.
    KrakenWebSocketApiRootClient rootClient = new(KrakenWebSocketApiClient.Server_Public);

    worker A {
        // Publish messages to the server. The message types are retrieved from the response types of the `publish` operation.
        while true {
            Ping pingMessage = {'type: PING};
            Pong pongMessage = rootClient->ping(pingMessage);
        }
    }

    worker B returns error? {
        Subscribe subscribeMessage = {'type: SUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}};
        stream<SubscriptionStatus> statusStream = check rootClient->subscribe(subscribeMessage);
        // loop stream
    }

    UnSubscribe unSubscribeMessage = {'type: UNSUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"]};
    check rootClient->unsubscribe(unSubscribeMessage);
}

@shafreenAnfar
Contributor

shafreenAnfar commented Jun 8, 2023

Reopened the proposal as it does not discuss error handling for each custom remote method.

At the moment we have onError for onMessage. Similarly, I think we need onXXXError for each custom remote method. For example, see the below code.

@websocket:ServiceConfig {
    dispatcherKey: "event"
}
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new MyService();
    }
}

service class MyService {
    *websocket:Service;

    remote function onPing(Ping message) returns Pong {
        return {'type: WS_PONG};
    }

    remote function onPingError(error err) {
        io:println(err);
    }

    remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onSubscribeError(error err) returns ErrorMessage {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onHeartbeat(Heartbeat message) {
        io:println(message);
    }

    remote function onHeartbeatError(error err) {
        io:println(err);
    }
}

@shafreenAnfar
Contributor

If there is no matching custom onXXXError method, the error should be dispatched to onError, just like the default branch of a switch/case. This is the same approach we used for onMessage.
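
A minimal sketch of that fallback, written as a plain function rather than as the module's actual dispatcher. `resolveErrorHandler` and the `implemented` list are hypothetical, and returning `()` when neither handler exists is an assumption, since the thread does not say what happens in that case.

```ballerina
import ballerina/io;

// Hypothetical helper: given the custom remote function that failed, returns
// the error handler to dispatch to. Prefers `<remoteFunction>Error` and falls
// back to `onError`. Returns () if neither is declared (an assumption).
function resolveErrorHandler(string remoteFunction, string[] implemented) returns string? {
    string candidate = remoteFunction + "Error";
    if implemented.indexOf(candidate) != () {
        return candidate;
    }
    if implemented.indexOf("onError") != () {
        return "onError";
    }
    return ();
}

public function main() {
    string[] implemented = ["onPing", "onPingError", "onSubscribe", "onError"];
    io:println(resolveErrorHandler("onPing", implemented)); // onPingError
    io:println(resolveErrorHandler("onSubscribe", implemented)); // onError
}
```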

@shafreenAnfar
Contributor

In addition to dispatchingKey, dispatchingStreamId is needed to allow multiplexing. If the same message is returned from two operations without the streamId, the compiler plugin should detect it and report an error. Unlike dispatchingKey, dispatchingStreamId is mandatory only conditionally.
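
The ambiguity described here can be illustrated with a small stand-alone check. This is a sketch under assumptions, not the compiler plugin's logic: the map of remote function names to response type names and the function `findAmbiguousResponses` are hypothetical and exist only for this example.

```ballerina
import ballerina/io;

// Hypothetical model of the check: keys are remote function names, values are
// the names of the message types they return. The result groups the functions
// that share a response type, which is the case that would need a stream id.
function findAmbiguousResponses(map<string> responseTypeByFunction) returns map<string[]> {
    map<string[]> functionsByResponse = {};
    foreach [string, string] [functionName, responseType] in responseTypeByFunction.entries() {
        string[] names = functionsByResponse[responseType] ?: [];
        names.push(functionName);
        functionsByResponse[responseType] = names;
    }
    // Keep only response types returned by more than one remote function.
    return functionsByResponse.filter(names => names.length() > 1);
}

public function main() {
    map<string> responses = {
        onSubscribe: "SubscriptionStatus",
        onUnsubscribe: "SubscriptionStatus",
        onPing: "Pong"
    };
    io:println(findAmbiguousResponses(responses));
}
```

Here `onSubscribe` and `onUnsubscribe` both return `SubscriptionStatus`, so a client could not tell which operation a given response belongs to without a correlating id.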

@shafreenAnfar
Contributor

shafreenAnfar commented Jul 1, 2023

dispatchingStreamId is always generated on the client side. The server side only does the correlation.

@shafreenAnfar
Contributor

It seems there is a shortcoming in this proposal.

It basically can't do something like the below,

remote function onMessage(websocket:Caller caller, string chatMessage) returns error? {
    return caller->close(4408, "Connection initialisation timeout");
}

in a custom remote function such as the one below.

remote function onConnectionInit(string chatMessage) returns ConnectionAck|error {
    ConnectionAck connAck = { event: "connection_ack", payload: "{}" };
    return connAck;
}

Ideally, we should be able to create a subtype of error and return it.

@shafreenAnfar
Contributor

Additionally, I believe we need something like the below to handle timeouts.

remote function onConnectionInit(string chatMessage) returns ConnectionAck|error {
    ConnectionAck connAck = { event: "connection_ack", payload: "{}" };
    return connAck;
}
remote function onConnectionInitIdleTimeout(string chatMessage) returns ConnectionAck|error {
    ConnectionAck connAck = { event: "connection_ack", payload: "{}" };
    return connAck;
}
