Implement cross-node event broadcasting with NATSEventBus #149

Closed
opened 2026-05-15 09:36:14 +00:00 by HugoNijhuis · 1 comment
Owner

Problem

NATSEventBus exists but is not integrated with JetStreamEventStore. Events don't flow across cluster nodes.

Current State

  • JetStreamEventStore has a broadcaster field, but most code calls NewJetStreamEventStore, which leaves it unset
  • NewJetStreamEventStoreWithBroadcaster exists but is rarely used
  • NATSEventBus can receive events, but nothing subscribes to it by default

Required Implementation

1. Default Broadcaster Configuration

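// Create NATS event bus (second return value assumed to be an error)
natsBus, err := aether.NewNATSEventBus(natsConn)
if err != nil {
    return err
}

// Create event store with broadcaster
store, err := store.NewJetStreamEventStoreWithBroadcaster(
    natsConn, "events", natsBus, "tenant-abc",
)
if err != nil {
    return err
}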

2. Subscribe to EventStored Events

eventStoredCh := natsBus.SubscribeWithFilter(
    "tenant-abc",
    &aether.SubscriptionFilter{
        EventTypes: []string{aether.EventTypeEventStored},
    },
)

// Update version cache when events are persisted
go func() {
    for ev := range eventStoredCh {
        store.UpdateVersionCache(ev.Data["actorId"], ev.Version)
    }
}()
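
Because EventStored notifications from other nodes can arrive late or out of order, the cache update probably needs to be safe for concurrent use and should never move a version backwards. The following is a minimal sketch of that idea, not the actual UpdateVersionCache implementation: the versionCache type, its methods, and the int64 version type are all hypothetical and use only the standard library.

```go
package main

import (
	"fmt"
	"sync"
)

// versionCache is a hypothetical stand-in for the cache kept inside
// JetStreamEventStore. It maps actor IDs to the highest version seen.
type versionCache struct {
	mu       sync.Mutex
	versions map[string]int64
}

func newVersionCache() *versionCache {
	return &versionCache{versions: make(map[string]int64)}
}

// Update stores version for actorID unless an equal or newer version is
// already cached. It reports whether the cache changed.
func (c *versionCache) Update(actorID string, version int64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cur, ok := c.versions[actorID]; ok && cur >= version {
		return false // stale or duplicate notification
	}
	c.versions[actorID] = version
	return true
}

// Get returns the cached version for actorID, if any.
func (c *versionCache) Get(actorID string) (int64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.versions[actorID]
	return v, ok
}

func main() {
	cache := newVersionCache()
	cache.Update("order-1", 3)
	cache.Update("order-1", 2) // arrives late from another node, ignored
	v, _ := cache.Get("order-1")
	fmt.Println(v) // prints 3
}
```

Ignoring stale versions keeps the cache monotonic, so a delayed broadcast cannot make a node believe an actor is at an older version than it already knows about.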

3. Cross-Node Event Flow

  1. Node A: SaveEvent -> JetStream stream
  2. EventStore: Publish EventStored to NATSEventBus
  3. NATSEventBus: Publish to NATS "aether.events.tenant-abc"
  4. Node B: Receive via NATS subscription
  5. Node B: Deliver to local subscribers

Acceptance Criteria

  • NATSEventBus + JetStreamEventStore integration example
  • Events published to NATS when using broadcaster
  • Events received from other nodes via NATS
  • Namespace isolation maintained across nodes
  • Integration test with multiple NATS nodes
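
One way the namespace isolation criterion could be protected is by validating a namespace before turning it into a subject. NATS treats "." as a token separator and "*" and ">" as wildcards, so a namespace containing them could subscribe across tenants. The helper below is a hypothetical sketch and assumes namespaces are single tokens, which the issue does not confirm.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// subjectPrefix follows the subject shown in the issue ("aether.events.tenant-abc").
const subjectPrefix = "aether.events."

var errInvalidNamespace = errors.New("namespace must be a single non-empty token without NATS wildcards or whitespace")

// subjectForNamespace is a hypothetical helper that rejects namespaces which
// would widen a subscription beyond one tenant.
func subjectForNamespace(ns string) (string, error) {
	if ns == "" || strings.ContainsAny(ns, ".*> \t\r\n") {
		return "", errInvalidNamespace
	}
	return subjectPrefix + ns, nil
}

func main() {
	for _, ns := range []string{"tenant-abc", "tenant.*", ">"} {
		subject, err := subjectForNamespace(ns)
		fmt.Printf("%q -> %q, err=%v\n", ns, subject, err)
	}
}
```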
Author
Owner

Implementation complete! Created PR #151 with:

  • UpdateVersionCache method in JetStreamEventStore
  • SubscribeToEventStored helper in NATSEventBus
  • Integration tests (all passing):
    • TestCrossNodeBroadcasting_SingleNode
    • TestCrossNodeBroadcasting_MultiNode
    • TestUpdateVersionCache
    • TestSubscribeToEventStored
    • TestCrossNodeBroadcasting_NamespaceIsolation
  • Example demonstrating NATSEventBus + JetStreamEventStore integration
Reference: flowmade-one/aether#149