Olúṣeun

> Image by vectorjuice on Freepik
Event Streaming: Powering Real-Time Communication in Web Systems



Attending a live concert of your favourite artist is an exhilarating experience that can create memories that will last a lifetime. From the opening notes to the final encore, every moment is filled with excitement and anticipation, making it an unforgettable experience. The concert is able to produce such thrills in real-time, through the collaborative efforts of several departments working together and independently. Cues are taken from other departments, triggering other departments to execute an action. Direct communication between them is vital. Your favourite software can replicate these basic concepts to give us the experience you get that makes you come back.

Centralised and Decentralised systems also entail several micro-services all interacting with each other in one way or the other. In this article, we will be exploring how components in the system interact with each other in real-time

Connect

Components in the system, called "Nodes," have at least an identity (which can be an XOR name) and an address (which can be the Socket Address).


 struct Node {
    id: XorName,
    addr: SocketAddr
} 

Nodes can be discovered by their addresses. They can be discovered through their unique addresses and need to maintain a record of other nodes they are connected to. This basically forms the fundamental building blocks for distributed and decentralised systems powering blockchain technologies. To establish connections, nodes can use standard networking protocols such as TCP, UDP, or even the novel QUIC protocol. Once connected, nodes can interact with each other by sending message streams as bytes.

Events Streaming

 enum Events {
    ShowStarts,
    ArtistsIn(String),
    NextSong(String),
    NewRelease(String)
} 

The event's headliners step onto the stage, and instantly the lights come on…

As events (cues) happen and are detected by a node, the node sends messages to relevant node(s) about this event. These messages can be used as triggers. In the context of web systems, a centralised system, this can happen with micro-services as well as components

Discoverability and activation are the only important factors, regardless of the component language it is written in.

Within the system, nodes can act as publishers, subscribers or both. Publishers send messages to nodes subscribed to a particular message type.

 fn sort_message(event: Events) -> String {
    match event {
        Events::ArtistsIn(e) => format!("{e} is now on stage"),
        Events::ShowStarts => "Show starts".to_string(),
        Events::NextSong(e) => format!("Song playing now: {e}"),
        Events::NewRelease(e) => format!("Playing new song released: {e}"),
    }
 } 

There are various platforms available that can offer these services, including open-source platforms. The most notable of these platforms is Kafka, which is pull-based. Other platforms include NATs (a push-based service) and gRPC, among others.

A simple use case. You have a modularised web-based system. It has a back-end server responsible for handling data entry and storing it in a data warehouse. In the context of this, every storage action is an event. Whenever this data storage event occurs, it triggers a call to a mobile app which is a component of the web system.

The server sends a message once it stores the data

 let event = sort_message(event_type);
 nc.publish("december-convert", event)
    .await
    .unwrap();

Data is instantly received by the mobile app component.

 let event = nc.subscribe("december-convert")
    .await
    .unwrap();
 println("{}", event);

 ~ % ShowStarts

Staying Awake

In most cases, your back-end server code runs on the main thread of your program. It is meant to handle incoming requests (GET, POST, etc.) and respond accordingly. This would hinder your node from actively listening for incoming messages.

However, we can improve this process by using background threads. By making our node actively listen for new events, we can run the code on a separate thread, which improves the performance and overall efficiency of the server.

Here's an example of how this can be achieved:


 task::spawn(async move {
    let new_message = nc.subscribe("december-convert").await.unwrap();

    while running.load(Ordering::SeqCst) {
        if let Some(msg) = new_message.try_next() {
            let event = str::from_utf8(&msg.data).unwrap();
            what_just_happened(event)
                .await
                .unwrap();
        }
    }
 }); 

Bammm!

Conclusion

Using message streams can be a game-changer for your projects. It's no wonder that many companies use them in one way or another. Some of the companies that have adopted message streams include CloudFare, Netflix.

Overall, it provides a great user experience which is ultimately the goal. 😀