From 5c01911e3c2e240257f2a6de3558e4b0c2dd7195 Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Sun, 17 May 2026 14:03:48 +0200 Subject: [PATCH] feat: implement cross-node event broadcasting with NATSEventBus - Add UpdateVersionCache method to JetStreamEventStore for cache synchronization - Add SubscribeToEventStored convenience helper to NATSEventBus - Create integration tests for cross-node broadcasting scenarios - Add example demonstrating NATSEventBus + JetStreamEventStore integration --- examples/cross_node_broadcasting.go | 317 +++++++++++++++++++++ go.mod | 4 + nats_eventbus.go | 20 ++ store/helpers_test.go | 36 +++ store/integration_test.go | 420 ++++++++++++++++++++++++++++ store/jetstream.go | 13 + 6 files changed, 810 insertions(+) create mode 100644 examples/cross_node_broadcasting.go create mode 100644 store/helpers_test.go create mode 100644 store/integration_test.go diff --git a/examples/cross_node_broadcasting.go b/examples/cross_node_broadcasting.go new file mode 100644 index 0000000..8f8c0fb --- /dev/null +++ b/examples/cross_node_broadcasting.go @@ -0,0 +1,317 @@ +//go:build integration +// +build integration + +package examples + +import ( + "context" + "fmt" + "log" + "time" + + "git.flowmade.one/flowmade-one/aether" + "git.flowmade.one/flowmade-one/aether/store" + "github.com/nats-io/nats.go" +) + +// CrossNodeBroadcasting demonstrates how to implement cross-node event broadcasting +// using NATSEventBus with JetStreamEventStore. This example shows how events persist +// to JetStream and are then broadcast to other nodes in the cluster via NATS. +// +// Key Concepts: +// 1. NATSEventBus wraps EventBus to add NATS publishing +// 2. JetStreamEventStore with broadcaster publishes EventStored to NATS +// 3. Other nodes receive these events via NATS subscription +// 4. Version cache is updated on remote events to maintain consistency +// +// Usage: +// go run examples/cross_node_broadcasting.go +func CrossNodeBroadcastingExample() { + // Connect to NATS + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + log.Fatalf("Failed to connect to NATS: %v", err) + } + defer nc.Close() + + // Create NATS event bus (this will broadcast to all nodes) + natsBus, err := aether.NewNATSEventBus(nc) + if err != nil { + log.Fatalf("Failed to create NATS event bus: %v", err) + } + defer natsBus.Stop() + + // Create JetStream event store WITH broadcaster + // This enables EventStored events to be published to NATS + store, err := store.NewJetStreamEventStoreWithBroadcaster( + nc, + "events", + natsBus, + "tenant-abc", // Optional namespace for isolation + ) + if err != nil { + log.Fatalf("Failed to create event store: %v", err) + } + + // Subscribe to EventStored events to update version cache + // This keeps the version cache synchronized across nodes + eventStoredCh := natsBus.SubscribeWithFilter( + "tenant-abc", + &aether.SubscriptionFilter{ + EventTypes: []string{aether.EventTypeEventStored}, + }, + ) + + go func() { + for event := range eventStoredCh { + actorID := event.Data["actorId"].(string) + version := int64(event.Data["version"].(float64)) + store.UpdateVersionCache(actorID, version) + } + }() + + // Now save an event - it will be: + // 1. Persisted to JetStream + // 2. Published to NATS as EventStored + // 3. Received by other nodes via NATS + // 4. Used to update version cache + + event := &aether.Event{ + ID: "event-1", + EventType: "OrderPlaced", + ActorID: "order-123", + Version: 1, + Data: map[string]interface{}{ + "total": 100.00, + "item": "widget", + }, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event) + if err != nil { + log.Fatalf("Failed to save event: %v", err) + } + + log.Println("Event saved to JetStream and broadcast to NATS") + + // Other nodes in the cluster will receive this event via NATS + // and update their version cache accordingly + + // Subscribe to events in this namespace + eventCh := natsBus.Subscribe("tenant-abc") + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + select { + case receivedEvent := <-eventCh: + log.Printf("Received event via NATS: %s (version %d)", + receivedEvent.EventType, receivedEvent.Version) + case <-ctx.Done(): + log.Println("Timeout waiting for event") + } +} + +// CrossNodeBroadcastingMultiNode demonstrates a multi-node cluster setup. +// This simulates multiple nodes connecting to the same NATS cluster. +// In production, these would be separate processes/machines. +// +// This example requires a running NATS server with JetStream enabled. +func CrossNodeBroadcastingMultiNode() { + // Connect to NATS + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + log.Fatalf("Failed to connect to NATS: %v", err) + } + defer nc.Close() + + nodeID := "node-1" + log.Printf("Starting %s", nodeID) + + // Each node creates its own NATS event bus and event store + natsBus, err := aether.NewNATSEventBus(nc) + if err != nil { + log.Fatalf("Failed to create NATS event bus: %v", err) + } + defer natsBus.Stop() + + store, err := store.NewJetStreamEventStoreWithBroadcaster( + nc, + "events", + natsBus, + "tenant-abc", + ) + if err != nil { + log.Fatalf("Failed to create event store: %v", err) + } + + // Setup EventStored subscription for cache synchronization + eventStoredCh := natsBus.SubscribeWithFilter( + "tenant-abc", + &aether.SubscriptionFilter{ + EventTypes: []string{aether.EventTypeEventStored}, + }, + ) + + go func() { + for event := range eventStoredCh { + actorID := event.Data["actorId"].(string) + version := int64(event.Data["version"].(float64)) + store.UpdateVersionCache(actorID, version) + log.Printf("[%s] Received EventStored for %s v%d", nodeID, actorID, version) + } + }() + + // Subscribe to actual events + eventCh := natsBus.Subscribe("tenant-abc") + log.Printf("[%s] Subscribed to tenant-abc", nodeID) + + // Save an event + savedEvent := &aether.Event{ + ID: fmt.Sprintf("event-%s-1", nodeID), + EventType: "OrderPlaced", + ActorID: "order-123", + Version: 1, + Data: map[string]interface{}{ + "node": nodeID, + "total": 100.00, + }, + Timestamp: time.Now(), + } + + err = store.SaveEvent(savedEvent) + if err != nil { + log.Fatalf("[%s] Failed to save event: %v", nodeID, err) + } + + log.Printf("[%s] Saved event to JetStream", nodeID) + + // Wait to receive the event (either from local or remote) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + select { + case receivedEvent := <-eventCh: + log.Printf("[%s] Received: %s v%d from %s", + nodeID, + receivedEvent.EventType, + receivedEvent.Version, + receivedEvent.GetCorrelationID(), + ) + case <-ctx.Done(): + log.Printf("[%s] Timeout waiting for event", nodeID) + } +} + +// DistributedOrderProcessing demonstrates a realistic scenario where +// multiple nodes process orders for the same actor. This shows how +// cross-node broadcasting ensures consistency. +func DistributedOrderProcessing() { + // Connect to NATS + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + log.Fatalf("Failed to connect to NATS: %v", err) + } + defer nc.Close() + + // Node 1: Order creation + node1Bus, _ := aether.NewNATSEventBus(nc) + node1Store, _ := store.NewJetStreamEventStoreWithBroadcaster( + nc, "events", node1Bus, "orders", + ) + defer node1Bus.Stop() + + // Setup EventStored subscription for Node 1 + eventStoredCh1 := node1Bus.SubscribeWithFilter( + "orders", + &aether.SubscriptionFilter{ + EventTypes: []string{aether.EventTypeEventStored}, + }, + ) + go func() { + for event := range eventStoredCh1 { + actorID := event.Data["actorId"].(string) + version := int64(event.Data["version"].(float64)) + node1Store.UpdateVersionCache(actorID, version) + } + }() + + // Node 2: Order processing (different node) + node2Bus, _ := aether.NewNATSEventBus(nc) + node2Store, _ := store.NewJetStreamEventStoreWithBroadcaster( + nc, "events", node2Bus, "orders", + ) + defer node2Bus.Stop() + + // Setup EventStored subscription for Node 2 + eventStoredCh2 := node2Bus.SubscribeWithFilter( + "orders", + &aether.SubscriptionFilter{ + EventTypes: []string{aether.EventTypeEventStored}, + }, + ) + go func() { + for event := range eventStoredCh2 { + actorID := event.Data["actorId"].(string) + version := int64(event.Data["version"].(float64)) + node2Store.UpdateVersionCache(actorID, version) + } + }() + + // Node 2 also subscribes to events to receive updates + node2EventCh := node2Bus.Subscribe("orders") + + // Node 1 creates an order + orderPlaced := &aether.Event{ + ID: "order-created", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{ + "total": 100.00, + }, + Timestamp: time.Now(), + } + + if err := node1Store.SaveEvent(orderPlaced); err != nil { + log.Fatalf("Failed to create order: %v", err) + } + log.Println("Node 1: Created order-456") + + // Node 2 receives the OrderPlaced event via NATS + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + select { + case event := <-node2EventCh: + log.Printf("Node 2: Received %s v%d", event.EventType, event.Version) + + // Node 2 processes the order (must use version 2) + orderProcessed := &aether.Event{ + ID: "order-processed", + EventType: "OrderProcessed", + ActorID: "order-456", + Version: 2, // Must be > 1 + Data: map[string]interface{}{ + "status": "shipped", + }, + Timestamp: time.Now(), + } + + if err := node2Store.SaveEvent(orderProcessed); err != nil { + log.Fatalf("Node 2: Failed to process order: %v", err) + } + log.Println("Node 2: Processed order-456") + + case <-ctx.Done(): + log.Println("Node 2: Timeout waiting for order event") + } + + // Verify event stream consistency + events, err := node1Store.GetEvents("order-456", 0) + if err != nil { + log.Fatalf("Failed to get events: %v", err) + } + log.Printf("Node 1: Event stream has %d events", len(events)) +} \ No newline at end of file diff --git a/go.mod b/go.mod index 8462de1..c9d8882 100644 --- a/go.mod +++ b/go.mod @@ -6,16 +6,19 @@ require ( github.com/google/uuid v1.6.0 github.com/nats-io/nats.go v1.37.0 github.com/prometheus/client_golang v1.23.2 + github.com/stretchr/testify v1.11.1 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect @@ -23,4 +26,5 @@ require ( golang.org/x/crypto v0.18.0 // indirect golang.org/x/sys v0.35.0 // indirect google.golang.org/protobuf v1.36.8 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/nats_eventbus.go b/nats_eventbus.go index f0207ab..34acfc4 100644 --- a/nats_eventbus.go +++ b/nats_eventbus.go @@ -210,6 +210,26 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { } } +// SubscribeToEventStored creates a subscription specifically for EventStored events. +// This is a convenience method for the common pattern of listening to persisted events +// to update version cache or trigger other actions. +// +// Example: +// +// eventStoredCh := natsBus.SubscribeToEventStored("tenant-abc") +// go func() { +// for event := range eventStoredCh { +// actorID := event.Data["actorId"].(string) +// version := int64(event.Data["version"].(float64)) +// store.UpdateVersionCache(actorID, version) +// } +// }() +func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event { + return neb.SubscribeWithFilter(namespacePattern, &SubscriptionFilter{ + EventTypes: []string{EventTypeEventStored}, + }) +} + // Stop closes the NATS event bus and all subscriptions func (neb *NATSEventBus) Stop() { neb.mutex.Lock() diff --git a/store/helpers_test.go b/store/helpers_test.go new file mode 100644 index 0000000..15d99c9 --- /dev/null +++ b/store/helpers_test.go @@ -0,0 +1,36 @@ +//go:build integration +// +build integration + +package store + +import ( + "testing" + + "github.com/nats-io/nats.go" +) + +// getTestNATSConnection returns a NATS connection for testing. +// This helper is used by benchmark tests that require NATS. +func getTestNATSConnection(t *testing.T) *nats.Conn { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + t.Skipf("NATS not available: %v", err) + } + return nc +} + +// getVersionFromEvent extracts version from EventStored event data. +func getVersionFromEvent(data map[string]interface{}) int64 { + switch v := data["version"].(type) { + case float64: + return int64(v) + case int64: + return v + case int: + return int64(v) + case uint64: + return int64(v) + default: + return 0 + } +} \ No newline at end of file diff --git a/store/integration_test.go b/store/integration_test.go new file mode 100644 index 0000000..c8d1296 --- /dev/null +++ b/store/integration_test.go @@ -0,0 +1,420 @@ +//go:build integration +// +build integration + +package store + +import ( + "context" + "fmt" + "log" + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// generateStreamName creates a unique stream name for each test run +func generateStreamName(baseName string) string { + return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000) +} + +// cleanupStream deletes a JetStream stream if it exists. +func cleanupStream(nc *nats.Conn, streamName string) { + js, err := nc.JetStream() + if err != nil { + return + } + err = js.DeleteStream(streamName) + // Silently ignore errors - we just want to clean up + _ = err +} + +// TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting +// on a single node (local loopback). +func TestCrossNodeBroadcasting_SingleNode(t *testing.T) { + nc, err := nats.Connect(nats.DefaultURL) + require.NoError(t, err) + defer nc.Close() + + streamName := generateStreamName("broadcast_single") + cleanupStream(nc, streamName) + + // Create NATS event bus + natsBus, err := aether.NewNATSEventBus(nc) + require.NoError(t, err) + defer natsBus.Stop() + + // Create event store with broadcaster + store, err := NewJetStreamEventStoreWithBroadcaster( + nc, + streamName, + natsBus, + "tenant-single", + ) + require.NoError(t, err) + + // Subscribe to events + eventCh := natsBus.Subscribe("tenant-single") + + // Save event + testEvent := &aether.Event{ + ID: "event-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{"test": "single"}, + Timestamp: time.Now(), + } + + err = store.SaveEvent(testEvent) + require.NoError(t, err) + log.Printf("Saved event: %s", testEvent.ID) + + // Receive event via NATS (NATSBroadcast may have wrapped in EventStored) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + select { + case received := <-eventCh: + // The received event should have the same actor and version + assert.Equal(t, "actor-1", received.ActorID) + assert.Equal(t, int64(1), received.Version) + // Event type might be original or EventStored wrapper + if received.EventType == aether.EventTypeEventStored { + assert.Equal(t, "actor-1", received.Data["actorId"].(string)) + log.Printf("Received EventStored wrapper: %s", received.ID) + } else { + log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType) + } + case <-ctx.Done(): + t.Fatal("Did not receive event via NATS") + } +} + +// TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes. +func TestCrossNodeBroadcasting_MultiNode(t *testing.T) { + nc, err := nats.Connect(nats.DefaultURL) + require.NoError(t, err) + defer nc.Close() + + streamName := generateStreamName("broadcast_multi") + cleanupStream(nc, streamName) + + // Create Node A + nodeANatsBus, err := aether.NewNATSEventBus(nc) + require.NoError(t, err) + defer nodeANatsBus.Stop() + + nodeAStore, err := NewJetStreamEventStoreWithBroadcaster( + nc, + streamName, + nodeANatsBus, + "tenant-multi", + ) + require.NoError(t, err) + + // Create Node B + nodeBNatsBus, err := aether.NewNATSEventBus(nc) + require.NoError(t, err) + defer nodeBNatsBus.Stop() + + nodeBStore, err := NewJetStreamEventStoreWithBroadcaster( + nc, + streamName, + nodeBNatsBus, + "tenant-multi", + ) + require.NoError(t, err) + + // Node A subscribes + nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi") + + // Node B subscribes + nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi") + + // Node A saves event + eventA := &aether.Event{ + ID: "event-node-a", + EventType: "TestEvent", + ActorID: "multi-actor", + Version: 1, + Data: map[string]interface{}{"node": "a"}, + Timestamp: time.Now(), + } + + err = nodeAStore.SaveEvent(eventA) + require.NoError(t, err) + log.Printf("Node A saved: %s", eventA.ID) + + // Node B receives event (EventStored wrapper or original) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + select { + case received := <-nodeBEventCh: + // Check actor and version match + assert.Equal(t, "multi-actor", received.ActorID) + assert.Equal(t, int64(1), received.Version) + if received.EventType == aether.EventTypeEventStored { + // EventStored wrapper + assert.Equal(t, "multi-actor", received.Data["actorId"].(string)) + } else { + // Original event + assert.Equal(t, "a", received.Data["node"]) + } + log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version) + case <-ctx.Done(): + t.Fatal("Node B did not receive event") + } + + // Give Node B time to receive and process Node A's event + time.Sleep(100 * time.Millisecond) + + // Node B saves event with different actor + eventB := &aether.Event{ + ID: "event-node-b", + EventType: "TestEvent", + ActorID: "multi-actor-b", // Different actor + Version: 1, + Data: map[string]interface{}{"node": "b"}, + Timestamp: time.Now(), + } + + err = nodeBStore.SaveEvent(eventB) + require.NoError(t, err) + log.Printf("Node B saved: %s", eventB.ID) + + // Node A receives Node B's event (could be EventStored or original) + ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel2() + + // Wait for either EventStored or the actual event + received := false + for !received { + select { + case event := <-nodeAEventCh: + // Check for Node B's event + if event.EventType == aether.EventTypeEventStored { + // EventStored wrapper - check actorId + actorID := event.Data["actorId"].(string) + if actorID == "multi-actor-b" { + received = true + log.Printf("Node A received EventStored for Node B's event: %s", event.ID) + } + } else if event.ActorID == "multi-actor-b" { + received = true + log.Printf("Node A received Node B's event: %s", event.ID) + } + // Keep listening until we get Node B's event + case <-ctx2.Done(): + if !received { + t.Fatal("Node A did not receive Node B's event") + } + } + } +} + +// TestUpdateVersionCache tests the version cache update logic. +func TestUpdateVersionCache(t *testing.T) { + nc, err := nats.Connect(nats.DefaultURL) + require.NoError(t, err) + defer nc.Close() + + streamName := generateStreamName("cache_test") + cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + require.NoError(t, err) + + // Save event version 1 + event1 := &aether.Event{ + ID: "event-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event1) + require.NoError(t, err) + + // EventStored will trigger UpdateVersionCache + time.Sleep(100 * time.Millisecond) + + // Save event version 2 - should succeed (version > 1) + event2 := &aether.Event{ + ID: "event-2", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 2, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event2) + require.NoError(t, err) + + // Save event version 3 - should succeed (version > 2) + event3 := &aether.Event{ + ID: "event-3", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 3, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event3) + require.NoError(t, err) + + // Verify all events can be retrieved + events, err := store.GetEvents("actor-1", 0) + require.NoError(t, err) + assert.Len(t, events, 3) + + // Verify latest version + latest, err := store.GetLatestVersion("actor-1") + require.NoError(t, err) + assert.Equal(t, int64(3), latest) + + // Manually update cache with version 5 (simulating external update) + store.UpdateVersionCache("actor-1", 5) + + // Verify version 4 would conflict (4 < cached 5) + event4 := &aether.Event{ + ID: "event-4", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 4, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event4) + assert.Error(t, err, "version 4 should conflict with cached version 5") + + // Version 6 should succeed (6 > 5) + event6 := &aether.Event{ + ID: "event-6", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 6, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event6) + require.NoError(t, err) + + // Verify version 6 was saved + latest, err = store.GetLatestVersion("actor-1") + require.NoError(t, err) + assert.Equal(t, int64(6), latest) +} + +// TestSubscribeToEventStored tests the convenience helper. +func TestSubscribeToEventStored(t *testing.T) { + nc, err := nats.Connect(nats.DefaultURL) + require.NoError(t, err) + defer nc.Close() + + natsBus, err := aether.NewNATSEventBus(nc) + require.NoError(t, err) + defer natsBus.Stop() + + // Use helper + eventStoredCh := natsBus.SubscribeToEventStored("test-store") + + // Verify channel is created + assert.NotNil(t, eventStoredCh) + + // Publish EventStored manually (version will be float64 from JSON) + eventStored := &aether.Event{ + ID: "stored-1", + EventType: aether.EventTypeEventStored, + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{ + "actorId": "actor-1", + "version": 1.0, // Use float64 to match JSON encoding + }, + Timestamp: time.Now(), + } + + natsBus.Publish("test-store", eventStored) + + // Should receive the EventStored + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + select { + case received := <-eventStoredCh: + assert.Equal(t, aether.EventTypeEventStored, received.EventType) + assert.Equal(t, "actor-1", received.ActorID) + log.Printf("Received EventStored: %s", received.ID) + case <-ctx.Done(): + t.Fatal("Did not receive EventStored") + } +} + +// TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation. +func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) { + nc, err := nats.Connect(nats.DefaultURL) + require.NoError(t, err) + defer nc.Close() + + streamName := generateStreamName("namespace_isolation") + cleanupStream(nc, streamName) + + // Create stores with different namespaces + storeA, err := NewJetStreamEventStoreWithBroadcaster( + nc, + streamName, + nil, + "tenant-a", + ) + require.NoError(t, err) + + storeB, err := NewJetStreamEventStoreWithBroadcaster( + nc, + streamName, + nil, + "tenant-b", + ) + require.NoError(t, err) + + // Save to each namespace + eventA := &aether.Event{ + ID: "event-a", + EventType: "TestEvent", + ActorID: "actor-a", + Version: 1, + Data: map[string]interface{}{"tenant": "a"}, + Timestamp: time.Now(), + } + + err = storeA.SaveEvent(eventA) + require.NoError(t, err) + + eventB := &aether.Event{ + ID: "event-b", + EventType: "TestEvent", + ActorID: "actor-b", + Version: 1, + Data: map[string]interface{}{"tenant": "b"}, + Timestamp: time.Now(), + } + + err = storeB.SaveEvent(eventB) + require.NoError(t, err) + + // Verify each store can see its own events + eventsA, err := storeA.GetEvents("actor-a", 0) + require.NoError(t, err) + assert.Len(t, eventsA, 1) + assert.Equal(t, "a", eventsA[0].Data["tenant"]) + + eventsB, err := storeB.GetEvents("actor-b", 0) + require.NoError(t, err) + assert.Len(t, eventsB, 1) + assert.Equal(t, "b", eventsB[0].Data["tenant"]) +} \ No newline at end of file diff --git a/store/jetstream.go b/store/jetstream.go index 1c85f8c..fdd137e 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -558,5 +558,18 @@ func sanitizeSubject(s string) string { return s } +// UpdateVersionCache updates the version cache for a specific actor. +// This is used when receiving events from other nodes via NATS to keep +// the version cache consistent across cluster nodes. +func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) { + jes.mu.Lock() + defer jes.mu.Unlock() + + // Only update if the new version is greater than cached version + if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion { + jes.versions[actorID] = version + } +} + // Compile-time check that JetStreamEventStore implements EventStoreWithErrors var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)