1 Commits

Author SHA1 Message Date
Hugo Nijhuis
5c01911e3c feat: implement cross-node event broadcasting with NATSEventBus
All checks were successful
CI / build (pull_request) Successful in 20s
- Add UpdateVersionCache method to JetStreamEventStore for cache synchronization
- Add SubscribeToEventStored convenience helper to NATSEventBus
- Create integration tests for cross-node broadcasting scenarios
- Add example demonstrating NATSEventBus + JetStreamEventStore integration
2026-05-17 14:07:43 +02:00
7 changed files with 811 additions and 1 deletions

View File

@@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: actions/setup-go@v5 - uses: actions/setup-go@v5
with: with:
go-version: '1.26' go-version: '1.23'
- name: Build - name: Build
run: go build ./... run: go build ./...
- name: Test - name: Test

View File

@@ -0,0 +1,317 @@
//go:build integration
// +build integration
package examples
import (
"context"
"fmt"
"log"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/nats-io/nats.go"
)
// CrossNodeBroadcasting demonstrates how to implement cross-node event broadcasting
// using NATSEventBus with JetStreamEventStore. This example shows how events persist
// to JetStream and are then broadcast to other nodes in the cluster via NATS.
//
// Key Concepts:
// 1. NATSEventBus wraps EventBus to add NATS publishing
// 2. JetStreamEventStore with broadcaster publishes EventStored to NATS
// 3. Other nodes receive these events via NATS subscription
// 4. Version cache is updated on remote events to maintain consistency
//
// Usage:
// go run examples/cross_node_broadcasting.go
func CrossNodeBroadcastingExample() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
// Create NATS event bus (this will broadcast to all nodes)
natsBus, err := aether.NewNATSEventBus(nc)
if err != nil {
log.Fatalf("Failed to create NATS event bus: %v", err)
}
defer natsBus.Stop()
// Create JetStream event store WITH broadcaster
// This enables EventStored events to be published to NATS
store, err := store.NewJetStreamEventStoreWithBroadcaster(
nc,
"events",
natsBus,
"tenant-abc", // Optional namespace for isolation
)
if err != nil {
log.Fatalf("Failed to create event store: %v", err)
}
// Subscribe to EventStored events to update version cache
// This keeps the version cache synchronized across nodes
eventStoredCh := natsBus.SubscribeWithFilter(
"tenant-abc",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
store.UpdateVersionCache(actorID, version)
}
}()
// Now save an event - it will be:
// 1. Persisted to JetStream
// 2. Published to NATS as EventStored
// 3. Received by other nodes via NATS
// 4. Used to update version cache
event := &aether.Event{
ID: "event-1",
EventType: "OrderPlaced",
ActorID: "order-123",
Version: 1,
Data: map[string]interface{}{
"total": 100.00,
"item": "widget",
},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err != nil {
log.Fatalf("Failed to save event: %v", err)
}
log.Println("Event saved to JetStream and broadcast to NATS")
// Other nodes in the cluster will receive this event via NATS
// and update their version cache accordingly
// Subscribe to events in this namespace
eventCh := natsBus.Subscribe("tenant-abc")
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case receivedEvent := <-eventCh:
log.Printf("Received event via NATS: %s (version %d)",
receivedEvent.EventType, receivedEvent.Version)
case <-ctx.Done():
log.Println("Timeout waiting for event")
}
}
// CrossNodeBroadcastingMultiNode demonstrates a multi-node cluster setup.
// This simulates multiple nodes connecting to the same NATS cluster.
// In production, these would be separate processes/machines.
//
// This example requires a running NATS server with JetStream enabled.
func CrossNodeBroadcastingMultiNode() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
nodeID := "node-1"
log.Printf("Starting %s", nodeID)
// Each node creates its own NATS event bus and event store
natsBus, err := aether.NewNATSEventBus(nc)
if err != nil {
log.Fatalf("Failed to create NATS event bus: %v", err)
}
defer natsBus.Stop()
store, err := store.NewJetStreamEventStoreWithBroadcaster(
nc,
"events",
natsBus,
"tenant-abc",
)
if err != nil {
log.Fatalf("Failed to create event store: %v", err)
}
// Setup EventStored subscription for cache synchronization
eventStoredCh := natsBus.SubscribeWithFilter(
"tenant-abc",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
store.UpdateVersionCache(actorID, version)
log.Printf("[%s] Received EventStored for %s v%d", nodeID, actorID, version)
}
}()
// Subscribe to actual events
eventCh := natsBus.Subscribe("tenant-abc")
log.Printf("[%s] Subscribed to tenant-abc", nodeID)
// Save an event
savedEvent := &aether.Event{
ID: fmt.Sprintf("event-%s-1", nodeID),
EventType: "OrderPlaced",
ActorID: "order-123",
Version: 1,
Data: map[string]interface{}{
"node": nodeID,
"total": 100.00,
},
Timestamp: time.Now(),
}
err = store.SaveEvent(savedEvent)
if err != nil {
log.Fatalf("[%s] Failed to save event: %v", nodeID, err)
}
log.Printf("[%s] Saved event to JetStream", nodeID)
// Wait to receive the event (either from local or remote)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case receivedEvent := <-eventCh:
log.Printf("[%s] Received: %s v%d from %s",
nodeID,
receivedEvent.EventType,
receivedEvent.Version,
receivedEvent.GetCorrelationID(),
)
case <-ctx.Done():
log.Printf("[%s] Timeout waiting for event", nodeID)
}
}
// DistributedOrderProcessing demonstrates a realistic scenario where
// multiple nodes process orders for the same actor. This shows how
// cross-node broadcasting ensures consistency.
func DistributedOrderProcessing() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
// Node 1: Order creation
node1Bus, _ := aether.NewNATSEventBus(nc)
node1Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
nc, "events", node1Bus, "orders",
)
defer node1Bus.Stop()
// Setup EventStored subscription for Node 1
eventStoredCh1 := node1Bus.SubscribeWithFilter(
"orders",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh1 {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
node1Store.UpdateVersionCache(actorID, version)
}
}()
// Node 2: Order processing (different node)
node2Bus, _ := aether.NewNATSEventBus(nc)
node2Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
nc, "events", node2Bus, "orders",
)
defer node2Bus.Stop()
// Setup EventStored subscription for Node 2
eventStoredCh2 := node2Bus.SubscribeWithFilter(
"orders",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh2 {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
node2Store.UpdateVersionCache(actorID, version)
}
}()
// Node 2 also subscribes to events to receive updates
node2EventCh := node2Bus.Subscribe("orders")
// Node 1 creates an order
orderPlaced := &aether.Event{
ID: "order-created",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.00,
},
Timestamp: time.Now(),
}
if err := node1Store.SaveEvent(orderPlaced); err != nil {
log.Fatalf("Failed to create order: %v", err)
}
log.Println("Node 1: Created order-456")
// Node 2 receives the OrderPlaced event via NATS
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case event := <-node2EventCh:
log.Printf("Node 2: Received %s v%d", event.EventType, event.Version)
// Node 2 processes the order (must use version 2)
orderProcessed := &aether.Event{
ID: "order-processed",
EventType: "OrderProcessed",
ActorID: "order-456",
Version: 2, // Must be > 1
Data: map[string]interface{}{
"status": "shipped",
},
Timestamp: time.Now(),
}
if err := node2Store.SaveEvent(orderProcessed); err != nil {
log.Fatalf("Node 2: Failed to process order: %v", err)
}
log.Println("Node 2: Processed order-456")
case <-ctx.Done():
log.Println("Node 2: Timeout waiting for order event")
}
// Verify event stream consistency
events, err := node1Store.GetEvents("order-456", 0)
if err != nil {
log.Fatalf("Failed to get events: %v", err)
}
log.Printf("Node 1: Event stream has %d events", len(events))
}

4
go.mod
View File

@@ -6,16 +6,19 @@ require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nats.go v1.37.0
github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
) )
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect
@@ -23,4 +26,5 @@ require (
golang.org/x/crypto v0.18.0 // indirect golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.35.0 // indirect golang.org/x/sys v0.35.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
) )

View File

@@ -210,6 +210,26 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
} }
} }
// SubscribeToEventStored creates a subscription specifically for EventStored events.
// This is a convenience method for the common pattern of listening to persisted events
// to update version cache or trigger other actions.
//
// Example:
//
// eventStoredCh := natsBus.SubscribeToEventStored("tenant-abc")
// go func() {
// for event := range eventStoredCh {
// actorID := event.Data["actorId"].(string)
// version := int64(event.Data["version"].(float64))
// store.UpdateVersionCache(actorID, version)
// }
// }()
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
return neb.SubscribeWithFilter(namespacePattern, &SubscriptionFilter{
EventTypes: []string{EventTypeEventStored},
})
}
// Stop closes the NATS event bus and all subscriptions // Stop closes the NATS event bus and all subscriptions
func (neb *NATSEventBus) Stop() { func (neb *NATSEventBus) Stop() {
neb.mutex.Lock() neb.mutex.Lock()

36
store/helpers_test.go Normal file
View File

@@ -0,0 +1,36 @@
//go:build integration
// +build integration
package store
import (
"testing"
"github.com/nats-io/nats.go"
)
// getTestNATSConnection returns a NATS connection for testing.
// This helper is used by benchmark tests that require NATS.
func getTestNATSConnection(t *testing.T) *nats.Conn {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Skipf("NATS not available: %v", err)
}
return nc
}
// getVersionFromEvent extracts version from EventStored event data.
func getVersionFromEvent(data map[string]interface{}) int64 {
switch v := data["version"].(type) {
case float64:
return int64(v)
case int64:
return v
case int:
return int64(v)
case uint64:
return int64(v)
default:
return 0
}
}

420
store/integration_test.go Normal file
View File

@@ -0,0 +1,420 @@
//go:build integration
// +build integration
package store
import (
"context"
"fmt"
"log"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// generateStreamName creates a unique stream name for each test run
func generateStreamName(baseName string) string {
return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000)
}
// cleanupStream deletes a JetStream stream if it exists.
func cleanupStream(nc *nats.Conn, streamName string) {
js, err := nc.JetStream()
if err != nil {
return
}
err = js.DeleteStream(streamName)
// Silently ignore errors - we just want to clean up
_ = err
}
// TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting
// on a single node (local loopback).
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("broadcast_single")
cleanupStream(nc, streamName)
// Create NATS event bus
natsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer natsBus.Stop()
// Create event store with broadcaster
store, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
natsBus,
"tenant-single",
)
require.NoError(t, err)
// Subscribe to events
eventCh := natsBus.Subscribe("tenant-single")
// Save event
testEvent := &aether.Event{
ID: "event-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{"test": "single"},
Timestamp: time.Now(),
}
err = store.SaveEvent(testEvent)
require.NoError(t, err)
log.Printf("Saved event: %s", testEvent.ID)
// Receive event via NATS (NATSBroadcast may have wrapped in EventStored)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case received := <-eventCh:
// The received event should have the same actor and version
assert.Equal(t, "actor-1", received.ActorID)
assert.Equal(t, int64(1), received.Version)
// Event type might be original or EventStored wrapper
if received.EventType == aether.EventTypeEventStored {
assert.Equal(t, "actor-1", received.Data["actorId"].(string))
log.Printf("Received EventStored wrapper: %s", received.ID)
} else {
log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType)
}
case <-ctx.Done():
t.Fatal("Did not receive event via NATS")
}
}
// TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes.
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("broadcast_multi")
cleanupStream(nc, streamName)
// Create Node A
nodeANatsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer nodeANatsBus.Stop()
nodeAStore, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nodeANatsBus,
"tenant-multi",
)
require.NoError(t, err)
// Create Node B
nodeBNatsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer nodeBNatsBus.Stop()
nodeBStore, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nodeBNatsBus,
"tenant-multi",
)
require.NoError(t, err)
// Node A subscribes
nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi")
// Node B subscribes
nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi")
// Node A saves event
eventA := &aether.Event{
ID: "event-node-a",
EventType: "TestEvent",
ActorID: "multi-actor",
Version: 1,
Data: map[string]interface{}{"node": "a"},
Timestamp: time.Now(),
}
err = nodeAStore.SaveEvent(eventA)
require.NoError(t, err)
log.Printf("Node A saved: %s", eventA.ID)
// Node B receives event (EventStored wrapper or original)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case received := <-nodeBEventCh:
// Check actor and version match
assert.Equal(t, "multi-actor", received.ActorID)
assert.Equal(t, int64(1), received.Version)
if received.EventType == aether.EventTypeEventStored {
// EventStored wrapper
assert.Equal(t, "multi-actor", received.Data["actorId"].(string))
} else {
// Original event
assert.Equal(t, "a", received.Data["node"])
}
log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version)
case <-ctx.Done():
t.Fatal("Node B did not receive event")
}
// Give Node B time to receive and process Node A's event
time.Sleep(100 * time.Millisecond)
// Node B saves event with different actor
eventB := &aether.Event{
ID: "event-node-b",
EventType: "TestEvent",
ActorID: "multi-actor-b", // Different actor
Version: 1,
Data: map[string]interface{}{"node": "b"},
Timestamp: time.Now(),
}
err = nodeBStore.SaveEvent(eventB)
require.NoError(t, err)
log.Printf("Node B saved: %s", eventB.ID)
// Node A receives Node B's event (could be EventStored or original)
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel2()
// Wait for either EventStored or the actual event
received := false
for !received {
select {
case event := <-nodeAEventCh:
// Check for Node B's event
if event.EventType == aether.EventTypeEventStored {
// EventStored wrapper - check actorId
actorID := event.Data["actorId"].(string)
if actorID == "multi-actor-b" {
received = true
log.Printf("Node A received EventStored for Node B's event: %s", event.ID)
}
} else if event.ActorID == "multi-actor-b" {
received = true
log.Printf("Node A received Node B's event: %s", event.ID)
}
// Keep listening until we get Node B's event
case <-ctx2.Done():
if !received {
t.Fatal("Node A did not receive Node B's event")
}
}
}
}
// TestUpdateVersionCache tests the version cache update logic.
func TestUpdateVersionCache(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("cache_test")
cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
require.NoError(t, err)
// Save event version 1
event1 := &aether.Event{
ID: "event-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event1)
require.NoError(t, err)
// EventStored will trigger UpdateVersionCache
time.Sleep(100 * time.Millisecond)
// Save event version 2 - should succeed (version > 1)
event2 := &aether.Event{
ID: "event-2",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
require.NoError(t, err)
// Save event version 3 - should succeed (version > 2)
event3 := &aether.Event{
ID: "event-3",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3)
require.NoError(t, err)
// Verify all events can be retrieved
events, err := store.GetEvents("actor-1", 0)
require.NoError(t, err)
assert.Len(t, events, 3)
// Verify latest version
latest, err := store.GetLatestVersion("actor-1")
require.NoError(t, err)
assert.Equal(t, int64(3), latest)
// Manually update cache with version 5 (simulating external update)
store.UpdateVersionCache("actor-1", 5)
// Verify version 4 would conflict (4 < cached 5)
event4 := &aether.Event{
ID: "event-4",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 4,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4)
assert.Error(t, err, "version 4 should conflict with cached version 5")
// Version 6 should succeed (6 > 5)
event6 := &aether.Event{
ID: "event-6",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 6,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event6)
require.NoError(t, err)
// Verify version 6 was saved
latest, err = store.GetLatestVersion("actor-1")
require.NoError(t, err)
assert.Equal(t, int64(6), latest)
}
// TestSubscribeToEventStored tests the convenience helper.
func TestSubscribeToEventStored(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
natsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer natsBus.Stop()
// Use helper
eventStoredCh := natsBus.SubscribeToEventStored("test-store")
// Verify channel is created
assert.NotNil(t, eventStoredCh)
// Publish EventStored manually (version will be float64 from JSON)
eventStored := &aether.Event{
ID: "stored-1",
EventType: aether.EventTypeEventStored,
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{
"actorId": "actor-1",
"version": 1.0, // Use float64 to match JSON encoding
},
Timestamp: time.Now(),
}
natsBus.Publish("test-store", eventStored)
// Should receive the EventStored
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
select {
case received := <-eventStoredCh:
assert.Equal(t, aether.EventTypeEventStored, received.EventType)
assert.Equal(t, "actor-1", received.ActorID)
log.Printf("Received EventStored: %s", received.ID)
case <-ctx.Done():
t.Fatal("Did not receive EventStored")
}
}
// TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation.
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("namespace_isolation")
cleanupStream(nc, streamName)
// Create stores with different namespaces
storeA, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nil,
"tenant-a",
)
require.NoError(t, err)
storeB, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nil,
"tenant-b",
)
require.NoError(t, err)
// Save to each namespace
eventA := &aether.Event{
ID: "event-a",
EventType: "TestEvent",
ActorID: "actor-a",
Version: 1,
Data: map[string]interface{}{"tenant": "a"},
Timestamp: time.Now(),
}
err = storeA.SaveEvent(eventA)
require.NoError(t, err)
eventB := &aether.Event{
ID: "event-b",
EventType: "TestEvent",
ActorID: "actor-b",
Version: 1,
Data: map[string]interface{}{"tenant": "b"},
Timestamp: time.Now(),
}
err = storeB.SaveEvent(eventB)
require.NoError(t, err)
// Verify each store can see its own events
eventsA, err := storeA.GetEvents("actor-a", 0)
require.NoError(t, err)
assert.Len(t, eventsA, 1)
assert.Equal(t, "a", eventsA[0].Data["tenant"])
eventsB, err := storeB.GetEvents("actor-b", 0)
require.NoError(t, err)
assert.Len(t, eventsB, 1)
assert.Equal(t, "b", eventsB[0].Data["tenant"])
}

View File

@@ -558,5 +558,18 @@ func sanitizeSubject(s string) string {
return s return s
} }
// UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock()
defer jes.mu.Unlock()
// Only update if the new version is greater than cached version
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
jes.versions[actorID] = version
}
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors // Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)