All checks were successful
CI / build (push) Successful in 22s
This PR implements cross-node event broadcasting for aether.

Changes:
- UpdateVersionCache method in JetStreamEventStore
- SubscribeToEventStored helper in NATSEventBus
- Integration tests for cross-node scenarios
- Example code demonstrating NATSEventBus + JetStreamEventStore

Tests: All integration tests passing.

Co-authored-by: Claude Code <noreply@anthropic.com>
Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one>
Reviewed-on: #151
168 lines
3.9 KiB
Go
168 lines
3.9 KiB
Go
// Package main demonstrates cross-node event broadcasting using NATSEventBus
|
|
// and JetStreamEventStore for cluster synchronization.
|
|
//
|
|
// This example shows:
|
|
// 1. Setting up NATSEventBus with JetStreamEventStore
|
|
// 2. Broadcasting events across NATS for cross-node distribution
|
|
// 3. Subscribing to EventStored events for version cache synchronization
|
|
// 4. Properly handling EventStored events from other cluster nodes
|
|
//
|
|
// Prerequisites:
|
|
// - NATS server running with JetStream enabled (nats-server -js)
|
|
// - Events stream created in JetStream
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
"git.flowmade.one/flowmade-one/aether/store"
|
|
"github.com/google/uuid"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
func main() {
|
|
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
|
|
|
|
nc, err := nats.Connect(natsURL)
|
|
if err != nil {
|
|
log.Fatal("Failed to connect to NATS:", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
ctx := context.Background()
|
|
|
|
store1, err := store.NewJetStreamEventStore(nc, "events")
|
|
if err != nil {
|
|
log.Fatal("Failed to create event store:", err)
|
|
}
|
|
|
|
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
|
|
defer eventBus1.Stop()
|
|
|
|
store2, err := store.NewJetStreamEventStore(nc, "events")
|
|
if err != nil {
|
|
log.Fatal("Failed to create event store:", err)
|
|
}
|
|
|
|
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
|
|
defer eventBus2.Stop()
|
|
|
|
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
|
|
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
|
|
|
|
done := make(chan struct{})
|
|
|
|
go processEvents(ctx, eventStoredCh1, store1, done)
|
|
go processEvents(ctx, eventStoredCh2, store2, done)
|
|
|
|
go func() {
|
|
time.Sleep(2 * time.Second)
|
|
|
|
actorID := "demo-actor"
|
|
|
|
event1 := &aether.Event{
|
|
ID: uuid.New().String(),
|
|
EventType: "OrderPlaced",
|
|
ActorID: actorID,
|
|
Version: 1,
|
|
Data: map[string]interface{}{
|
|
"total": 99.99,
|
|
"status": "pending",
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
log.Printf("Node 1 publishing event: %s", event1.EventType)
|
|
eventBus1.Publish("", event1)
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
event2 := &aether.Event{
|
|
ID: uuid.New().String(),
|
|
EventType: "OrderPaid",
|
|
ActorID: actorID,
|
|
Version: 2,
|
|
Data: map[string]interface{}{
|
|
"total": 99.99,
|
|
"status": "paid",
|
|
"method": "credit_card",
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
log.Printf("Node 2 publishing event: %s", event2.EventType)
|
|
eventBus2.Publish("", event2)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
|
|
close(done)
|
|
|
|
log.Println("Cross-node broadcasting demo complete")
|
|
}()
|
|
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
select {
|
|
case <-sigCh:
|
|
log.Println("Shutting down...")
|
|
case <-done:
|
|
}
|
|
}
|
|
|
|
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
case event, ok := <-eventStoredCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if event == nil {
|
|
continue
|
|
}
|
|
|
|
if event.EventType != aether.EventTypeEventStored {
|
|
continue
|
|
}
|
|
|
|
actorID, ok := event.Data["actorId"].(string)
|
|
if !ok {
|
|
log.Printf("Warning: EventStored missing actorId")
|
|
continue
|
|
}
|
|
|
|
version, ok := event.Data["version"].(int64)
|
|
if !ok {
|
|
log.Printf("Warning: EventStored missing version")
|
|
continue
|
|
}
|
|
|
|
eventID, _ := event.Data["eventId"].(string)
|
|
|
|
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
|
|
|
|
eventStore.UpdateVersionCache(actorID, version)
|
|
|
|
currentVersion, _ := eventStore.GetLatestVersion(actorID)
|
|
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
|
|
}
|
|
}
|
|
}
|
|
|
|
// getEnv returns the value of the environment variable named key. When the
// variable is unset or set to the empty string, defaultValue is returned
// instead.
func getEnv(key, defaultValue string) string {
	resolved := os.Getenv(key)
	if resolved == "" {
		return defaultValue
	}
	return resolved
}