Files
aether/examples/cross_node_broadcasting.go
Hugo Nijhuis b481dae0b6
All checks were successful
CI / build (push) Successful in 22s
feat: implement cross-node event broadcasting with NATSEventBus (#151)
This PR implements cross-node event broadcasting for aether.

Changes:
- UpdateVersionCache method in JetStreamEventStore
- SubscribeToEventStored helper in NATSEventBus
- Integration tests for cross-node scenarios
- Example code demonstrating NATSEventBus + JetStreamEventStore

Tests: All integration tests passing.
Co-authored-by: Claude Code <noreply@anthropic.com>
Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one>
Reviewed-on: #151
2026-05-17 15:29:52 +00:00

168 lines
3.9 KiB
Go

// Package main demonstrates cross-node event broadcasting using NATSEventBus
// and JetStreamEventStore for cluster synchronization.
//
// This example shows:
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
// main runs the cross-node broadcasting demo.
//
// It connects to the NATS server named by NATS_URL (default
// nats://localhost:4222), builds two event-store/event-bus pairs on that one
// connection to stand in for two cluster nodes, starts a processEvents worker
// per node, publishes one event from each bus, and then exits. It also exits
// early on SIGINT or SIGTERM.
func main() {
	natsURL := getEnv("NATS_URL", "nats://localhost:4222")
	nc, err := nats.Connect(natsURL)
	if err != nil {
		// Fatalf with an explicit verb: log.Fatal would print the error glued
		// to the colon, because Print only inserts a space between operands
		// when neither of them is a string.
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	// "Node 1": its own store and bus on the shared connection.
	store1, err := store.NewJetStreamEventStore(nc, "events")
	if err != nil {
		log.Fatalf("Failed to create event store: %v", err)
	}
	eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
	defer eventBus1.Stop()

	// "Node 2": a second, independent store and bus.
	store2, err := store.NewJetStreamEventStore(nc, "events")
	if err != nil {
		log.Fatalf("Failed to create event store: %v", err)
	}
	eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
	defer eventBus2.Stop()

	// NOTE(review): "*" is presumably a match-all pattern — confirm against
	// the SubscribeToEventStored documentation.
	eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
	eventStoredCh2 := eventBus2.SubscribeToEventStored("*")

	// The context is cancelled when main returns, so the ctx.Done() branch in
	// processEvents is live on the signal path too. Deferred last so that it
	// runs before the buses are stopped.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// done is closed by the demo goroutine once both events have been
	// published; it stops the workers and unblocks main.
	done := make(chan struct{})
	go processEvents(ctx, eventStoredCh1, store1, done)
	go processEvents(ctx, eventStoredCh2, store2, done)

	go func() {
		// Presumably gives the subscriptions time to become active before
		// the first publish — TODO confirm the delay is needed.
		time.Sleep(2 * time.Second)

		actorID := "demo-actor"
		event1 := &aether.Event{
			ID:        uuid.New().String(),
			EventType: "OrderPlaced",
			ActorID:   actorID,
			Version:   1,
			Data: map[string]interface{}{
				"total":  99.99,
				"status": "pending",
			},
			Timestamp: time.Now(),
		}
		log.Printf("Node 1 publishing event: %s", event1.EventType)
		eventBus1.Publish("", event1)

		time.Sleep(500 * time.Millisecond)

		event2 := &aether.Event{
			ID:        uuid.New().String(),
			EventType: "OrderPaid",
			ActorID:   actorID,
			Version:   2,
			Data: map[string]interface{}{
				"total":  99.99,
				"status": "paid",
				"method": "credit_card",
			},
			Timestamp: time.Now(),
		}
		log.Printf("Node 2 publishing event: %s", event2.EventType)
		eventBus2.Publish("", event2)

		// Leave time for the workers to receive and log the notifications.
		time.Sleep(2 * time.Second)

		// Log before closing done: closing it lets main return, and the
		// process may exit before a later log call gets to run.
		log.Println("Cross-node broadcasting demo complete")
		close(done)
	}()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	select {
	case <-sigCh:
		log.Println("Shutting down...")
	case <-done:
	}
}
// processEvents consumes EventStored notifications from eventStoredCh and
// feeds each one into eventStore's version cache.
//
// It returns when done is closed, when ctx is cancelled, or when
// eventStoredCh is closed. Nil events and events of any other type are
// skipped silently; events without a string "actorId" or without a usable
// "version" are skipped with a warning.
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
	for {
		select {
		case <-done:
			return
		case <-ctx.Done():
			return
		case event, ok := <-eventStoredCh:
			if !ok {
				return
			}
			if event == nil || event.EventType != aether.EventTypeEventStored {
				continue
			}

			actorID, ok := event.Data["actorId"].(string)
			if !ok {
				log.Printf("Warning: EventStored missing actorId")
				continue
			}
			version, ok := eventStoredVersion(event.Data["version"])
			if !ok {
				log.Printf("Warning: EventStored missing version")
				continue
			}
			// eventId is used for logging only, so a missing value is tolerated.
			eventID, _ := event.Data["eventId"].(string)
			log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)

			eventStore.UpdateVersionCache(actorID, version)

			// Read the version back so the log shows what the store reports
			// after the update; a failed lookup is logged, not hidden.
			currentVersion, err := eventStore.GetLatestVersion(actorID)
			if err != nil {
				log.Printf("Warning: could not read latest version for %s: %v", actorID, err)
				continue
			}
			log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
		}
	}
}

// eventStoredVersion extracts an int64 version from an EventStored payload
// value. Besides int64 it accepts int, int32 and integral float64 values;
// float64 is what encoding/json produces for numbers decoded into
// interface{}. It reports false for any other type, for non-integral or
// out-of-range floats, and for a missing (nil) value.
func eventStoredVersion(raw interface{}) (int64, bool) {
	switch v := raw.(type) {
	case int64:
		return v, true
	case int:
		return int64(v), true
	case int32:
		return int64(v), true
	case float64:
		// Reject values outside the int64 range before converting.
		if v < -(1<<63) || v >= 1<<63 {
			return 0, false
		}
		n := int64(v)
		// Also rejects fractional values and NaN (NaN never compares equal).
		if float64(n) != v {
			return 0, false
		}
		return n, true
	default:
		return 0, false
	}
}
// getEnv looks up the environment variable key and returns its value.
// When the variable is unset or set to the empty string, defaultValue is
// returned instead.
func getEnv(key, defaultValue string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return defaultValue
	}
	return fromEnv
}