First: who am I ? 🧐

Bertrand, software engineer at Hyli

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain ⛓️

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure 💰

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️
  • Proved by Zero Knowledge Proofs (ZKP) 🔐

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️
  • Proved by Zero Knowledge Proofs (ZKP) 🔐
  • Blockchain optimized for ZKP verification ⚡

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️
  • Proved by Zero Knowledge Proofs (ZKP) 🔐
  • Blockchain optimized for ZKP verification ⚡

Allows to preserve privacy 👻, while being decentralized

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️
  • Proved by Zero Knowledge Proofs (ZKP) 🔐
  • Blockchain optimized for ZKP verification ⚡

Allows to preserve privacy 👻, while being decentralized

Well,

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️
  • Proved by Zero Knowledge Proofs (ZKP) 🔐
  • Blockchain optimized for ZKP verification ⚡

Allows to preserve privacy 👻, while being decentralized

Well,

I do rust.

First: who am I ? 🧐

Bertrand, software engineer at Hyli, building a blockchain as a financial settlement infrastructure for European regulated actors. 🇪🇺

  • Offchain computation 🖥️
  • Proved by Zero Knowledge Proofs (ZKP) 🔐
  • Blockchain optimized for ZKP verification ⚡

Allows to preserve privacy 👻, while being decentralized

Well,

I do rust. (Or, lately, code-reviews of claude's generated code)

One bus to message them all: async Rust for modular monoliths

History 👴🏻

Once upon a time, at Hyli, we needed:

One bus to message them all: async Rust for modular monoliths

History 👴🏻

Once upon a time, at Hyli, we needed:


⚙️ Modularity

One bus to message them all: async Rust for modular monoliths

History 👴🏻

Once upon a time, at Hyli, we needed:


⚙️ Modularity

📦 Simple operations

One bus to message them all: async Rust for modular monoliths

History 👴🏻

Once upon a time, at Hyli, we needed:


⚙️ Modularity

📦 Simple operations

🔒 Compile-time safety

One bus to message them all: async Rust for modular monoliths

History 👴🏻

Once upon a time, at Hyli, we needed:


⚙️ Modularity ➡️ Micro-services

📦 Simple operations ➡️ Monolith

🔒 Compile-time safety ➡️ Monolith


❌ Protobuf or similar
One bus to message them all: async Rust for modular monoliths

History 👴🏻

Once upon a time, at Hyli, we needed:


⚙️ Modularity ➡️ Micro-services

📦 Simple operations ➡️ Monolith

🔒 Compile-time safety ➡️ Monolith

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

Service ➡️ trait Module

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

Service ➡️ trait Module

Http Layer ➡️ shared bus

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

Service ➡️ trait Module

Http Layer ➡️ shared bus

One shared handle SharedMessageBus
Arc<Mutex<Map<broadcast::Sender<M>>>>

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

Service ➡️ trait Module

Http Layer ➡️ shared bus

One shared handle SharedMessageBus
Set of mpmc channels
🗼 tokyo::sync::broadcast

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

Service ➡️ trait Module

Http Layer ➡️ shared bus

One shared handle SharedMessageBus
Set of mpmc channels
🗼 tokyo::sync::broadcast

Http API ➡️ Contract defined with macro module_bus_client!

1) How we did it? 🤔

Micro-services ➡️ Hyli-bus

Service ➡️ trait Module

Http Layer ➡️ shared bus

One shared handle SharedMessageBus
Set of mpmc channels
🗼 tokyo::sync::broadcast

Http API ➡️ Contract defined with macro module_bus_client!


Let's go for an example ! ⬇️

1) How we did it? 🤔

Example: Mempool contract 📜





module_bus_client! {
    struct MempoolBusClient {






    }
}
1) How we did it? 🤔

Example: Mempool contract 📜

enum ConsensusEvent { ... }



module_bus_client! {
    struct MempoolBusClient {


        receiver(ConsensusEvent),



    }
}
1) How we did it? 🤔

Example: Mempool contract 📜

enum ConsensusEvent { ... }
struct NodeStateEvent { ... }


module_bus_client! {
    struct MempoolBusClient {


        receiver(ConsensusEvent),
        receiver(RestApiMessage),


    }
}
1) How we did it? 🤔

Example: Mempool contract 📜

enum ConsensusEvent { ... }
struct NodeStateEvent { ... }


module_bus_client! {
    struct MempoolBusClient {


        receiver(ConsensusEvent), 
        receiver(RestApiMessage),

        query(QueryNewCut, Cut),
    }
}
1) How we did it? 🤔

Example: Mempool contract 📜

enum ConsensusEvent { ... }
struct NodeStateEvent { ... }
struct MempoolStatusEvent{ ... }

module_bus_client! {
    struct MempoolBusClient {
        sender(MempoolStatusEvent),

        receiver(ConsensusEvent),
        receiver(RestApiMessage),

        query(QueryNewCut, Cut),
    }
}
1) How we did it? 🤔

Example: Consensus contract 📜

module_bus_client! {
    struct ConsensusBusClient {
        sender(ConsensusEvent), 
        sender(Query<QueryNewCut, Cut>),

        receiver(ConsensusNetMessage),

        ...
    }
}
1) How we did it? 🤔

Example: Mempool event loop ➿

async fn run(&mut self) -> Result<()> {                                       
    module_handle_messages! { // basically a wrapper around tokio::select!
        on_self self,







    }
}
1) How we did it? 🤔

Example: Mempool event loop ➿

async fn run(&mut self) -> Result<()> {                                       
    module_handle_messages! { // basically a wrapper around tokio::select!
        on_self self,

        listen<ConsensusEvent> ev => { self.on_consensus(ev).await?; }        





    }
}
1) How we did it? 🤔

Example: Mempool event loop ➿

async fn run(&mut self) -> Result<()> {
    module_handle_messages! { // basically a wrapper around tokio::select!
        on_self self,

        listen<ConsensusEvent> ev => { self.on_consensus(ev).await?; }        
        listen<RestApiMessage> ev => { self.on_api_message(ev)?; }




    }
}
1) How we did it? 🤔

Example: Mempool event loop ➿

async fn run(&mut self) -> Result<()> {
    module_handle_messages! { // basically a wrapper around tokio::select!
        on_self self,

        listen<ConsensusEvent> ev => { self.on_consensus(ev).await?; }       
        listen<RestApiMessage> ev => { self.on_api_message(ev)?; }

        command_response<QueryNewCut, Cut> q => { self.handle_querynewcut(q) }


    }
}
1) How we did it? 🤔

Example: Mempool event loop ➿

async fn run(&mut self) -> Result<()> {
    module_handle_messages! { // basically a wrapper around tokio::select!
        on_self self,

        listen<ConsensusEvent> ev => { self.on_consensus(ev).await?; }
        listen<RestApiMessage> ev => { self.on_api_message(ev)?; }

        command_response<QueryNewCut, Cut> q => { self.handle_querynewcut(q) }

        _ = tick.tick() => { self.on_tick()?; }
    }
}
1) How we did it? 🤔

Example: Consensus event loop ➿

async fn run(&mut self) -> Result<()> {
    module_handle_messages! {
        on_self self,

        listen<ConsensusNetMessage> msg => { self.on_net_message(msg).await?; }

        ...
    }
}
1) How we did it? 🤔

Example: Mempool module 📦

struct Mempool {
    bus: MempoolBusClient,
    transactions: BtreeMap<TxHash, BlobTransaction>
}

impl Module for Mempool {
    type Context = (); // build-time configuration

    async fn build(bus: SharedMessageBus, _ctx: ()) -> anyhow::Result<Self> {
        Ok(Self { bus: MempoolBusClient::new_from_bus(bus).await })
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        // [...]
        Ok(())
    }









}
1) How we did it? 🤔

Example: Mempool module 📦

struct Mempool {
    bus: MempoolBusClient,
    transactions: BtreeMap<TxHash, BlobTransaction>
}

impl Module for Mempool {
    type Context = (); // build-time configuration

    async fn build(bus: SharedMessageBus, _ctx: ()) -> anyhow::Result<Self> {
        Ok(Self { bus: MempoolBusClient::new_from_bus(bus).await })
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        module_handle_messages! {
            on_self self,

            listen<ConsensusEvent> ev => { self.on_consensus(ev).await?; }
            listen<RestApiMessage> ev => { self.on_api_message(ev)?; }

            command_response<QueryNewCut, Cut> q => { self.handle_querynewcut(q) }

            _ = tick.tick() => { self.on_tick()?; }
        }
        Ok(())
    }
}
1) How we did it? 🤔
1) How we did it? 🤔
1) How we did it? 🤔
1) How we did it? 🤔
1) How we did it? 🤔
1) How we did it? 🤔
1) How we did it? 🤔

Benefits so far 💰

Typed request/response between modules

#[derive(Clone)]
pub struct QueryNewCut(LaneId);

#[derive(Clone)]
pub type Cut = Vec<(LaneId, DataProposalHash, LaneBytesSize, PoDA)>;

Compile-time contracts reduce integration bugs 🐛

1) How we did it? 🤔

Benefits so far 💰

Typed request/response between modules

#[derive(Clone)]
pub struct QueryNewCut(LaneId);

#[derive(Clone)]
pub type Cut = Vec<(LaneId, DataProposalHash, LaneBytesSize, PoDA)>;

Compile-time contracts reduce integration bugs 🐛



Part 2: how do we run it ? ⬇️

🚰

2) Day to day usage 🖥️

2.1) Runtime composition 🏃‍➡️

async fn main() {









}

2) Day to day usage 🖥️

2.1) Runtime composition 🏃‍➡️

async fn main() {
    let bus = SharedMessageBus::new();                              








}

2) Day to day usage 🖥️

2.1) Runtime composition 🏃‍➡️

async fn main() {
    let bus = SharedMessageBus::new();
    let mut handler = ModulesHandler::new(&bus, ...);







}

2) Day to day usage 🖥️

2.1) Runtime composition 🏃‍➡️

async fn main() {
    let bus = SharedMessageBus::new();
    let mut handler = ModulesHandler::new(&bus, ...);

    handler.build_module::<RestAPI>(...).await?;
    handler.build_module::<Mempool>(...).await?;
    handler.build_module::<Consensus>(...).await?;
    handler.build_module::<P2P>(...).await?;


}

2) Day to day usage 🖥️

2.1) Runtime composition 🏃‍➡️

async fn main() {
    let bus = SharedMessageBus::new();
    let mut handler = ModulesHandler::new(&bus, ...);

    handler.build_module::<RestAPI>(...).await?;
    handler.build_module::<Mempool>(...).await?;
    handler.build_module::<Consensus>(...).await?;
    handler.build_module::<P2P>(...).await?;

    handler.start_modules().await?;
}
2) Day to day usage 🖥️

2.2) Testing 🧪

2) Day to day usage 🖥️

2.2) Testing 🧪


🐒 Easy mocks

2) Day to day usage 🖥️

2.2) Testing 🧪


🐒 Easy mocks

🧩 Start only the module(s) you need

2) Day to day usage 🖥️

2.2) Testing 🧪


🐒 Easy mocks

🧩 Start only the module(s) you need

💉 Inject typed events directly

2) Day to day usage 🖥️

2.2) Testing 🧪


🐒 Easy mocks

🧩 Start only the module(s) you need

💉 Inject typed events directly

✅ Assert typed outputs deterministically

2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {




















    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage), receiver(MempoolStatusEvent) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;















    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage), receiver(MempoolStatusEvent) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    // 🧩 Start only the module under test
    let mut mempool = Mempool::build(bus.new_handle(), ctx).await?;
    tokio::spawn(async move { mempool.run().await });











    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage), receiver(MempoolStatusEvent) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    // 🧩 Start only the module under test
    let mut mempool = Mempool::build(bus.new_handle(), ctx).await?;
    tokio::spawn(async move { mempool.run().await });

    // 💉 Inject a typed event
    client.send(RestApiMessage::NewTx { tx: my_tx() })?;








    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage), receiver(MempoolStatusEvent) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    // 🧩 Start only the module under test
    let mut mempool = Mempool::build(bus.new_handle(), ctx).await?;
    tokio::spawn(async move { mempool.run().await });

    // 💉 Inject a typed event
    client.send(RestApiMessage::NewTx { tx: my_tx() })?;

    // ✅ Assert typed output deterministically
    handle_messages! {
        on_bus bus,
        listen<MempoolStatusEvent> events => {
            assert_eq!(events.len(), 1);
        }
    }
    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

















    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;













    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    let mut mempool_events = get_receiver::<MempoolStatusEvent>(&bus).await;











    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    let mut mempool_events = get_receiver::<MempoolStatusEvent>(&bus).await;

    // 🧩 Start only the module under test
    let mut mempool = Mempool::build(bus.new_handle(), ctx).await?;
    tokio::spawn(async move { mempool.run().await });







    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    let mut mempool_events = get_receiver::<MempoolStatusEvent>(&bus).await;

    // 🧩 Start only the module under test
    let mut mempool = Mempool::build(bus.new_handle(), ctx).await?;
    tokio::spawn(async move { mempool.run().await });

    // 💉 Inject a typed event
    client.send(RestApiMessage::NewTx { tx: my_tx() })?;




    Ok(())
}
2) Day to day usage 🖥️
#[tokio::test]
async fn mempool_handle_new_tx() -> Result<()> {
    let bus = SharedMessageBus::new();

    // 🐒 Mock the RestAPI Module
    bus_client! { struct TestClient { sender(RestApiMessage) } }
    let mut client = TestClient::new_from_bus(bus.new_handle()).await;

    let mut mempool_events = get_receiver::<MempoolStatusEvent>(&bus).await;

    // 🧩 Start only the module under test
    let mut mempool = Mempool::build(bus.new_handle(), ctx).await?;
    tokio::spawn(async move { mempool.run().await });

    // 💉 Inject a typed event
    client.send(RestApiMessage::NewTx { tx: my_tx() })?;

    // ✅ Assert typed output deterministically
    let events = mempool_events.recv().await?.into_message();
    assert_eq!(events.len(), 1);
    Ok(())
}
2) Day to day usage 🖥️

2.3) Zero latency ⚡

2) Day to day usage 🖥️

2.3) Zero latency ⚡


⚡ In-process communication

2) Day to day usage 🖥️

2.3) Zero latency ⚡


⚡ In-process communication

📦️ No serialisation

2) Day to day usage 🖥️

2.3) Zero latency ⚡


⚡ In-process communication

📦️ No serialisation

🔍 No service discovery overhead

2) Day to day usage 🖥️

2.3) Zero latency ⚡


⚡ In-process communication

📦️ No serialisation

🔍 No service discovery overhead




🚰

This is the end 🎶

Key takeaways


⚙️ Modularity

📦 Simple operations

🔒 Compile-time safety

This is the end 🎶

Key takeaways


⚙️ Modularity ➡️ shared bus + module event loop ✅

📦 Simple operations

🔒 Compile-time safety

This is the end 🎶

Key takeaways


⚙️ Modularity ➡️ shared bus + module event loop ✅

📦 Simple operations ➡️ Single binary ✅

🔒 Compile-time safety

This is the end 🎶

Key takeaways


⚙️ Modularity ➡️ shared bus + module event loop ✅

📦 Simple operations ➡️ Single binary ✅

🔒 Compile-time safety ➡️ Contracts defined by macro module_bus_client! ✅

This is the end 🎶

Key takeaways


⚙️ Modularity ➡️ shared bus + module event loop ✅

📦 Simple operations ➡️ Single binary ✅

🔒 Compile-time safety ➡️ Contracts defined by macro module_bus_client! ✅


And more: observability, metrics, tracing, module persistance on shutdown...

docs.rs/hyli_bus

This is the end 🎶

Drawbacks

No listen<T> on a receiver ? Ram explodes 💥

module_bus_client! {
    struct ExplodingBusClient {
        receiver(SomeEvent),
    }
}

async fn run(&mut self) -> Result<()> {
    module_handle_messages! {
        on_self self,

        // listen<SomeEvent> ev => {}
    }
}
This is the end 🎶

Thank you for listening 🙏

Hyli

🌐 hyli.org
𝕏 @hyli_org
in Hyli-org

Myself 👋

Linkedin Bertrand Darbon
📧 bertrand@hyli.org

QR code — LinkedIn Linkedin 👋
QR code — github.com/hyli-org/hyli github.com/hyli-org/hyli

Articuler, faire des pauses, mettre un timer

More like KAFKA topics than http -> sender= broadcast

More like KAFKA topics than http -> sender= broadcast

Now we have the explicit contract of the module, what gets in & out -> Kafka / topic -> sender = broadcast PAUSE

Building before starting -> all receivers setup -> nothing lost

A bit heavy to start a tokio::select! loop in a test, so, we can do it a bit differently

(no internal network hop)

(no internal network hop)

(no internal network hop)

(no internal network hop)

Can be detected through metrics, logs...