// Package main demonstrates cross-node event broadcasting using NATSEventBus // and JetStreamEventStore for cluster synchronization. // // This example shows: // 1. Setting up NATSEventBus with JetStreamEventStore // 2. Broadcasting events across NATS for cross-node distribution // 3. Subscribing to EventStored events for version cache synchronization // 4. Properly handling EventStored events from other cluster nodes // // Prerequisites: // - NATS server running with JetStream enabled (nats-server -js) // - Events stream created in JetStream package main import ( "context" "log" "os" "os/signal" "syscall" "time" "git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether/store" "github.com/google/uuid" "github.com/nats-io/nats.go" ) func main() { natsURL := getEnv("NATS_URL", "nats://localhost:4222") nc, err := nats.Connect(natsURL) if err != nil { log.Fatal("Failed to connect to NATS:", err) } defer nc.Close() ctx := context.Background() store1, err := store.NewJetStreamEventStore(nc, "events") if err != nil { log.Fatal("Failed to create event store:", err) } eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "") defer eventBus1.Stop() store2, err := store.NewJetStreamEventStore(nc, "events") if err != nil { log.Fatal("Failed to create event store:", err) } eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "") defer eventBus2.Stop() eventStoredCh1 := eventBus1.SubscribeToEventStored("*") eventStoredCh2 := eventBus2.SubscribeToEventStored("*") done := make(chan struct{}) go processEvents(ctx, eventStoredCh1, store1, done) go processEvents(ctx, eventStoredCh2, store2, done) go func() { time.Sleep(2 * time.Second) actorID := "demo-actor" event1 := &aether.Event{ ID: uuid.New().String(), EventType: "OrderPlaced", ActorID: actorID, Version: 1, Data: map[string]interface{}{ "total": 99.99, "status": "pending", }, Timestamp: time.Now(), } log.Printf("Node 1 publishing event: %s", event1.EventType) eventBus1.Publish("", event1) time.Sleep(500 * time.Millisecond) event2 := &aether.Event{ ID: uuid.New().String(), EventType: "OrderPaid", ActorID: actorID, Version: 2, Data: map[string]interface{}{ "total": 99.99, "status": "paid", "method": "credit_card", }, Timestamp: time.Now(), } log.Printf("Node 2 publishing event: %s", event2.EventType) eventBus2.Publish("", event2) time.Sleep(2 * time.Second) close(done) log.Println("Cross-node broadcasting demo complete") }() sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) select { case <-sigCh: log.Println("Shutting down...") case <-done: } } func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) { for { select { case <-done: return case <-ctx.Done(): return case event, ok := <-eventStoredCh: if !ok { return } if event == nil { continue } if event.EventType != aether.EventTypeEventStored { continue } actorID, ok := event.Data["actorId"].(string) if !ok { log.Printf("Warning: EventStored missing actorId") continue } version, ok := event.Data["version"].(int64) if !ok { log.Printf("Warning: EventStored missing version") continue } eventID, _ := event.Data["eventId"].(string) log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID) eventStore.UpdateVersionCache(actorID, version) currentVersion, _ := eventStore.GetLatestVersion(actorID) log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion) } } } func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue }