feat: implement cross-node event broadcasting with NATSEventBus
All checks were successful
CI / build (pull_request) Successful in 20s

- Add UpdateVersionCache method to JetStreamEventStore for cache synchronization
- Add SubscribeToEventStored convenience helper to NATSEventBus
- Create integration tests for cross-node broadcasting scenarios
- Add example demonstrating NATSEventBus + JetStreamEventStore integration
This commit is contained in:
Hugo Nijhuis
2026-05-17 14:03:48 +02:00
parent 6041479286
commit 5c01911e3c
6 changed files with 810 additions and 0 deletions

420
store/integration_test.go Normal file
View File

@@ -0,0 +1,420 @@
//go:build integration
// +build integration
package store
import (
"context"
"fmt"
"log"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// generateStreamName creates a unique stream name for each test run
func generateStreamName(baseName string) string {
return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000)
}
// cleanupStream deletes a JetStream stream if it exists.
func cleanupStream(nc *nats.Conn, streamName string) {
js, err := nc.JetStream()
if err != nil {
return
}
err = js.DeleteStream(streamName)
// Silently ignore errors - we just want to clean up
_ = err
}
// TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting
// on a single node (local loopback).
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("broadcast_single")
cleanupStream(nc, streamName)
// Create NATS event bus
natsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer natsBus.Stop()
// Create event store with broadcaster
store, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
natsBus,
"tenant-single",
)
require.NoError(t, err)
// Subscribe to events
eventCh := natsBus.Subscribe("tenant-single")
// Save event
testEvent := &aether.Event{
ID: "event-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{"test": "single"},
Timestamp: time.Now(),
}
err = store.SaveEvent(testEvent)
require.NoError(t, err)
log.Printf("Saved event: %s", testEvent.ID)
// Receive event via NATS (NATSBroadcast may have wrapped in EventStored)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case received := <-eventCh:
// The received event should have the same actor and version
assert.Equal(t, "actor-1", received.ActorID)
assert.Equal(t, int64(1), received.Version)
// Event type might be original or EventStored wrapper
if received.EventType == aether.EventTypeEventStored {
assert.Equal(t, "actor-1", received.Data["actorId"].(string))
log.Printf("Received EventStored wrapper: %s", received.ID)
} else {
log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType)
}
case <-ctx.Done():
t.Fatal("Did not receive event via NATS")
}
}
// TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes.
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("broadcast_multi")
cleanupStream(nc, streamName)
// Create Node A
nodeANatsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer nodeANatsBus.Stop()
nodeAStore, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nodeANatsBus,
"tenant-multi",
)
require.NoError(t, err)
// Create Node B
nodeBNatsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer nodeBNatsBus.Stop()
nodeBStore, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nodeBNatsBus,
"tenant-multi",
)
require.NoError(t, err)
// Node A subscribes
nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi")
// Node B subscribes
nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi")
// Node A saves event
eventA := &aether.Event{
ID: "event-node-a",
EventType: "TestEvent",
ActorID: "multi-actor",
Version: 1,
Data: map[string]interface{}{"node": "a"},
Timestamp: time.Now(),
}
err = nodeAStore.SaveEvent(eventA)
require.NoError(t, err)
log.Printf("Node A saved: %s", eventA.ID)
// Node B receives event (EventStored wrapper or original)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case received := <-nodeBEventCh:
// Check actor and version match
assert.Equal(t, "multi-actor", received.ActorID)
assert.Equal(t, int64(1), received.Version)
if received.EventType == aether.EventTypeEventStored {
// EventStored wrapper
assert.Equal(t, "multi-actor", received.Data["actorId"].(string))
} else {
// Original event
assert.Equal(t, "a", received.Data["node"])
}
log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version)
case <-ctx.Done():
t.Fatal("Node B did not receive event")
}
// Give Node B time to receive and process Node A's event
time.Sleep(100 * time.Millisecond)
// Node B saves event with different actor
eventB := &aether.Event{
ID: "event-node-b",
EventType: "TestEvent",
ActorID: "multi-actor-b", // Different actor
Version: 1,
Data: map[string]interface{}{"node": "b"},
Timestamp: time.Now(),
}
err = nodeBStore.SaveEvent(eventB)
require.NoError(t, err)
log.Printf("Node B saved: %s", eventB.ID)
// Node A receives Node B's event (could be EventStored or original)
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel2()
// Wait for either EventStored or the actual event
received := false
for !received {
select {
case event := <-nodeAEventCh:
// Check for Node B's event
if event.EventType == aether.EventTypeEventStored {
// EventStored wrapper - check actorId
actorID := event.Data["actorId"].(string)
if actorID == "multi-actor-b" {
received = true
log.Printf("Node A received EventStored for Node B's event: %s", event.ID)
}
} else if event.ActorID == "multi-actor-b" {
received = true
log.Printf("Node A received Node B's event: %s", event.ID)
}
// Keep listening until we get Node B's event
case <-ctx2.Done():
if !received {
t.Fatal("Node A did not receive Node B's event")
}
}
}
}
// TestUpdateVersionCache tests the version cache update logic.
func TestUpdateVersionCache(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("cache_test")
cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
require.NoError(t, err)
// Save event version 1
event1 := &aether.Event{
ID: "event-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event1)
require.NoError(t, err)
// EventStored will trigger UpdateVersionCache
time.Sleep(100 * time.Millisecond)
// Save event version 2 - should succeed (version > 1)
event2 := &aether.Event{
ID: "event-2",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
require.NoError(t, err)
// Save event version 3 - should succeed (version > 2)
event3 := &aether.Event{
ID: "event-3",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3)
require.NoError(t, err)
// Verify all events can be retrieved
events, err := store.GetEvents("actor-1", 0)
require.NoError(t, err)
assert.Len(t, events, 3)
// Verify latest version
latest, err := store.GetLatestVersion("actor-1")
require.NoError(t, err)
assert.Equal(t, int64(3), latest)
// Manually update cache with version 5 (simulating external update)
store.UpdateVersionCache("actor-1", 5)
// Verify version 4 would conflict (4 < cached 5)
event4 := &aether.Event{
ID: "event-4",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 4,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4)
assert.Error(t, err, "version 4 should conflict with cached version 5")
// Version 6 should succeed (6 > 5)
event6 := &aether.Event{
ID: "event-6",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 6,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event6)
require.NoError(t, err)
// Verify version 6 was saved
latest, err = store.GetLatestVersion("actor-1")
require.NoError(t, err)
assert.Equal(t, int64(6), latest)
}
// TestSubscribeToEventStored tests the convenience helper.
func TestSubscribeToEventStored(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
natsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer natsBus.Stop()
// Use helper
eventStoredCh := natsBus.SubscribeToEventStored("test-store")
// Verify channel is created
assert.NotNil(t, eventStoredCh)
// Publish EventStored manually (version will be float64 from JSON)
eventStored := &aether.Event{
ID: "stored-1",
EventType: aether.EventTypeEventStored,
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{
"actorId": "actor-1",
"version": 1.0, // Use float64 to match JSON encoding
},
Timestamp: time.Now(),
}
natsBus.Publish("test-store", eventStored)
// Should receive the EventStored
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
select {
case received := <-eventStoredCh:
assert.Equal(t, aether.EventTypeEventStored, received.EventType)
assert.Equal(t, "actor-1", received.ActorID)
log.Printf("Received EventStored: %s", received.ID)
case <-ctx.Done():
t.Fatal("Did not receive EventStored")
}
}
// TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation.
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("namespace_isolation")
cleanupStream(nc, streamName)
// Create stores with different namespaces
storeA, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nil,
"tenant-a",
)
require.NoError(t, err)
storeB, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nil,
"tenant-b",
)
require.NoError(t, err)
// Save to each namespace
eventA := &aether.Event{
ID: "event-a",
EventType: "TestEvent",
ActorID: "actor-a",
Version: 1,
Data: map[string]interface{}{"tenant": "a"},
Timestamp: time.Now(),
}
err = storeA.SaveEvent(eventA)
require.NoError(t, err)
eventB := &aether.Event{
ID: "event-b",
EventType: "TestEvent",
ActorID: "actor-b",
Version: 1,
Data: map[string]interface{}{"tenant": "b"},
Timestamp: time.Now(),
}
err = storeB.SaveEvent(eventB)
require.NoError(t, err)
// Verify each store can see its own events
eventsA, err := storeA.GetEvents("actor-a", 0)
require.NoError(t, err)
assert.Len(t, eventsA, 1)
assert.Equal(t, "a", eventsA[0].Data["tenant"])
eventsB, err := storeB.GetEvents("actor-b", 0)
require.NoError(t, err)
assert.Len(t, eventsB, 1)
assert.Equal(t, "b", eventsB[0].Data["tenant"])
}