feat: implement cross-node event broadcasting with NATSEventBus #151

Merged
HugoNijhuis merged 2 commits from issue-149-cross-node-broadcasting into main 2026-05-17 15:29:53 +00:00
5 changed files with 796 additions and 361 deletions
Showing only changes of commit 5fb68fed4a - Show all commits

View File

@@ -0,0 +1,168 @@
// Package main demonstrates cross-node event broadcasting using NATSEventBus
// and JetStreamEventStore for cluster synchronization.
//
// This example shows:
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
func main() {
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatal("Failed to connect to NATS:", err)
}
defer nc.Close()
ctx := context.Background()
store1, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
defer eventBus1.Stop()
store2, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
defer eventBus2.Stop()
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
done := make(chan struct{})
go processEvents(ctx, eventStoredCh1, store1, done)
go processEvents(ctx, eventStoredCh2, store2, done)
go func() {
time.Sleep(2 * time.Second)
actorID := "demo-actor"
event1 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"total": 99.99,
"status": "pending",
},
Timestamp: time.Now(),
}
log.Printf("Node 1 publishing event: %s", event1.EventType)
eventBus1.Publish("", event1)
time.Sleep(500 * time.Millisecond)
event2 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPaid",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{
"total": 99.99,
"status": "paid",
"method": "credit_card",
},
Timestamp: time.Now(),
}
log.Printf("Node 2 publishing event: %s", event2.EventType)
eventBus2.Publish("", event2)
time.Sleep(2 * time.Second)
close(done)
log.Println("Cross-node broadcasting demo complete")
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("Shutting down...")
case <-done:
}
}
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
for {
select {
case <-done:
return
case <-ctx.Done():
return
case event, ok := <-eventStoredCh:
if !ok {
return
}
if event == nil {
continue
}
if event.EventType != aether.EventTypeEventStored {
continue
}
actorID, ok := event.Data["actorId"].(string)
if !ok {
log.Printf("Warning: EventStored missing actorId")
continue
}
version, ok := event.Data["version"].(int64)
if !ok {
log.Printf("Warning: EventStored missing version")
continue
}
eventID, _ := event.Data["eventId"].(string)
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
eventStore.UpdateVersionCache(actorID, version)
currentVersion, _ := eventStore.GetLatestVersion(actorID)
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
}
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}

View File

@@ -1,353 +0,0 @@
package examples
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
//
// This pattern is suitable for scenarios where you want to automatically retry
// with exponential backoff when version conflicts occur.
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const initialBackoff = 100 * time.Millisecond
var event *aether.Event
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
log.Printf("Retry attempt %d after %v", attempt, backoff)
time.Sleep(backoff)
}
// Get the current version for the actor
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event = &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"attempt": attempt},
Timestamp: time.Now(),
}
// Attempt to save
if err := store.SaveEvent(event); err == nil {
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
return nil
} else if !errors.Is(err, aether.ErrVersionConflict) {
// Some other error occurred
return fmt.Errorf("save event failed: %w", err)
}
// If it's a version conflict, loop will retry
}
return fmt.Errorf("failed to save event after %d retries", maxRetries)
}
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
// from VersionConflictError to make intelligent retry decisions.
//
// This pattern shows how to log detailed context and potentially implement
// circuit-breaker logic based on the conflict information.
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 5
var lastConflictVersion int64
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// Create event
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"timestamp": time.Now()},
Timestamp: time.Now(),
}
// Attempt to save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// Check if it's a version conflict
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
// Not a version conflict, fail immediately
return err
}
// Extract detailed context from the conflict error
log.Printf(
"Version conflict for actor %q: attempted version %d, current version %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
// Check for thrashing (multiple conflicts with same version)
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
log.Printf("Detected version thrashing - circuit breaker would trigger here")
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
}
lastConflictVersion = versionErr.CurrentVersion
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
time.Sleep(backoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// JitterRetryPattern implements exponential backoff with jitter to prevent
// thundering herd when multiple writers retry simultaneously.
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const baseBackoff = 100 * time.Millisecond
const maxJitter = 0.1 // 10% jitter
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
// Exponential backoff with jitter
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
totalBackoff := exponentialBackoff + jitter
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
time.Sleep(totalBackoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
//
// This pattern demonstrates how application logic can use CurrentVersion to
// decide whether to retry, give up, or escalate to a higher-level handler.
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
return err
}
// Adaptive backoff based on version distance
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
if versionDistance > 10 {
// Many concurrent writers - back off more aggressively
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
} else if versionDistance > 3 {
// Moderate contention - normal backoff
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
} else {
// Light contention - minimal backoff
log.Printf("Light contention detected")
time.Sleep(50 * time.Millisecond)
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// EventualConsistencyPattern demonstrates how to handle version conflicts
// in an eventually consistent manner by publishing to a retry queue.
//
// This is useful when immediate retry is not feasible, and you want to
// defer the operation to a background worker.
type RetryQueueItem struct {
Event *aether.Event
ConflictVersion int64
ConflictAttempted int64
NextRetryTime time.Time
FailureCount int
}
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
err := store.SaveEvent(event)
if err == nil {
return
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
log.Printf("Non-retryable error: %v", err)
return
}
// Queue for retry - background worker will process this
retryItem := RetryQueueItem{
Event: event,
ConflictVersion: versionErr.CurrentVersion,
ConflictAttempted: versionErr.AttemptedVersion,
NextRetryTime: time.Now().Add(1 * time.Second),
FailureCount: 0,
}
select {
case retryQueue <- retryItem:
log.Printf("Queued event for retry: actor=%s", event.ActorID)
case <-time.After(5 * time.Second):
log.Printf("Failed to queue event for retry (queue full)")
}
}
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
//
// The circuit breaker tracks failure rates and temporarily stops retrying
// when the failure rate gets too high, allowing the system to recover.
type CircuitBreaker struct {
failureCount int
successCount int
state string // "closed", "open", "half-open"
lastFailureTime time.Time
openDuration time.Duration
failureThreshold int
successThreshold int
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
state: "closed",
openDuration: 30 * time.Second,
failureThreshold: 5,
successThreshold: 3,
}
}
func (cb *CircuitBreaker) RecordSuccess() {
if cb.state == "half-open" {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = "closed"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker closed")
}
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.lastFailureTime = time.Now()
cb.failureCount++
if cb.failureCount >= cb.failureThreshold {
cb.state = "open"
log.Printf("Circuit breaker opened")
}
}
func (cb *CircuitBreaker) CanRetry() bool {
if cb.state == "closed" {
return true
}
if cb.state == "open" {
if time.Since(cb.lastFailureTime) > cb.openDuration {
cb.state = "half-open"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker half-open")
return true
}
return false
}
// half-open state allows retries
return true
}
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
if !cb.CanRetry() {
return fmt.Errorf("circuit breaker open - not retrying")
}
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
cb.RecordSuccess()
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
cb.RecordFailure()
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"strings"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@@ -24,6 +25,8 @@ type NATSEventBus struct {
subscriptions []*nats.Subscription subscriptions []*nats.Subscription
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node nodeID string // Unique ID for this node
streamPrefix string // NATS subject prefix for events
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
mutex sync.Mutex mutex sync.Mutex
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@@ -46,6 +49,7 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nodeID: uuid.New().String(), nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0), subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int), patternSubscribers: make(map[string]int),
streamPrefix: "aether",
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
@@ -53,6 +57,43 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil return neb, nil
} }
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
// The event store is used to automatically update version cache when EventStored events are received
// from other cluster nodes via NATS. This ensures cross-node version consistency.
//
// Example:
//
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// The namespace parameter is used as a prefix for EventStored event filtering.
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
streamPrefix := "aether"
if namespace != "" {
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
}
neb := &NATSEventBus{
EventBus: NewEventBus(),
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: streamPrefix,
eventStore: store,
ctx: context.Background(),
cancel: func() {},
}
return neb
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern. // Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns: // Supports NATS subject patterns:
// - "*" matches a single token // - "*" matches a single token
@@ -228,3 +269,103 @@ func (neb *NATSEventBus) Stop() {
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
} }
// sanitizeSubject sanitizes a string for use in NATS subjects
func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, ".", "_")
s = strings.ReplaceAll(s, "*", "_")
s = strings.ReplaceAll(s, ">", "_")
return s
}
// extractActorType extracts the actor type from an actor ID
func extractActorType(actorID string) string {
for i, c := range actorID {
if c == '-' && i > 0 {
return actorID[:i]
}
}
return "unknown"
}
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
// EventStored events are published by JetStreamEventStore when events are successfully saved.
// This is useful for cross-node event synchronization and version cache consistency.
//
// The returned channel receives EventStored events matching the pattern.
// The EventStored event schema:
// - EventType: "EventStored"
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// The namespacePattern supports NATS wildcards:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
//
// Example:
//
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// if event.EventType != aether.EventTypeEventStored {
// continue
// }
// actorID := event.Data["actorId"].(string)
// version, _ := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
// from all namespaces. Ensure your application handles this appropriately.
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
ch := make(chan *Event, 100)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
return
}
if eventMsg.NodeID == neb.nodeID {
return
}
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
actorID, ok := eventMsg.Event.Data["actorId"].(string)
if !ok {
return
}
version, ok := eventMsg.Event.Data["version"].(int64)
if !ok {
return
}
// Use type assertion to call UpdateVersionCache
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
es.UpdateVersionCache(actorID, version)
}
}
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
close(ch)
return ch
}
neb.subscriptions = append(neb.subscriptions, sub)
return ch
}

431
store/integration_test.go Normal file
View File

@@ -0,0 +1,431 @@
//go:build integration
package store
import (
"context"
"log"
"os"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats-server/v2/server"
)
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
opts := &server.Options{
Port: -1,
JetStream: true,
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
}
s, err := server.NewServer(opts)
if err != nil {
log.Fatal("Failed to create NATS server:", err)
}
go s.Start()
if !s.ReadyForConnections(4 * time.Second) {
log.Fatal("NATS server failed to start")
}
nc, err := nats.Connect(s.ClientURL())
if err != nil {
s.Shutdown()
log.Fatal("Failed to connect to NATS:", err)
}
return s, nc, func() {
nc.Close()
s.Shutdown()
os.RemoveAll(opts.StoreDir)
}
}
func TestUpdateVersionCache(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "test-actor-1"
tests := []struct {
name string
cachedVersion int64
newVersion int64
expectUpdate bool
expectVersion int64
}{
{
name: "update when new version is greater",
cachedVersion: 5,
newVersion: 10,
expectUpdate: true,
expectVersion: 10,
},
{
name: "do not update when new version is equal",
cachedVersion: 5,
newVersion: 5,
expectUpdate: false,
expectVersion: 5,
},
{
name: "do not update when new version is less",
cachedVersion: 10,
newVersion: 5,
expectUpdate: false,
expectVersion: 10,
},
{
name: "update when no cached version exists",
cachedVersion: 0,
newVersion: 1,
expectUpdate: true,
expectVersion: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up cached version
store.versions = make(map[string]int64)
store.versions[actorID] = tt.cachedVersion
// Call UpdateVersionCache
store.UpdateVersionCache(actorID, tt.newVersion)
// Verify result
if tt.expectUpdate {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to be updated but it wasn't cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
}
} else {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to remain cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
}
}
})
}
}
func TestUpdateVersionCache_Concurrent(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "concurrent-actor"
store.versions[actorID] = 1
const numGoroutines = 50
const maxVersion = 100
var done = make(chan struct{})
var updates int32
for i := 0; i < numGoroutines; i++ {
version := int64(1 + (i % maxVersion))
go func(v int64) {
store.UpdateVersionCache(actorID, v)
select {
case <-done:
default:
updates++
}
}(version)
}
close(done)
time.Sleep(100 * time.Millisecond)
finalVersion := store.versions[actorID]
if finalVersion > maxVersion {
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
}
}
func TestSubscribeToEventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
if eventBusWithStore == nil {
t.Fatalf("Failed to create event bus with broadcaster")
}
defer eventBusWithStore.Stop()
ch := eventBusWithStore.SubscribeToEventStored("*")
if ch == nil {
t.Fatal("SubscribeToEventStored returned nil channel")
}
actorID := "subscribe-test-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"key": "value"},
Timestamp: time.Now(),
}
eventBusWithStore.Publish("", event)
select {
case receivedEvent := <-ch:
if receivedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
data, ok := receivedEvent.Data["actorId"].(string)
if !ok || data != actorID {
t.Errorf("Expected actorId in data to be %s", actorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for EventStored event")
}
}
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "broadcast-test-actor-1"
localCh := eventBus.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"total": 99.99},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
select {
case receivedEvent := <-localCh:
if receivedEvent.EventType != "OrderPlaced" {
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for broadcast event")
}
}
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s1, nc1, cleanup1 := setupNatsServer()
defer cleanup1()
s2, nc2, cleanup2 := setupNatsServer()
defer cleanup2()
ctx := context.Background()
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
if err != nil {
t.Fatalf("Failed to create store 1: %v", err)
}
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
if err != nil {
t.Fatalf("Failed to create store 2: %v", err)
}
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
defer eventBus1.Stop()
defer eventBus2.Stop()
actorID := "multi-node-actor"
receiverCh := eventBus2.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "InventoryReserved",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"quantity": 5},
Timestamp: time.Now(),
}
eventBus1.Publish("", event)
select {
case receivedEvent := <-receiverCh:
if receivedEvent.EventType != "InventoryReserved" {
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for cross-node event")
}
}
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
if err != nil {
t.Fatalf("Failed to create tenant A store: %v", err)
}
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
if err != nil {
t.Fatalf("Failed to create tenant B store: %v", err)
}
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
defer tenantAEventBus.Stop()
defer tenantBEventBus.Stop()
tenantACh := tenantAEventBus.Subscribe("tenant-a")
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
actorID := "tenant-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TenantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"data": "tenant-a"},
Timestamp: time.Now(),
}
tenantAEventBus.Publish("tenant-a", event)
select {
case receivedEvent := <-tenantACh:
if receivedEvent.EventType != "TenantEvent" {
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
}
case <-time.After(2 * time.Second):
t.Error("Timeout waiting for tenant A to receive event")
}
select {
case <-tenantBCh:
t.Error("Tenant B should not receive tenant A's events")
case <-time.After(1 * time.Second):
// Expected - tenant B should not receive events from tenant A
}
}
func TestUpdateVersionCache_EventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "version-cache-actor"
store.UpdateVersionCache(actorID, 5)
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 10,
Data: map[string]interface{}{"test": true},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
time.Sleep(100 * time.Millisecond)
storedVersion, err := store.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("Failed to get latest version: %v", err)
}
if storedVersion != 10 {
t.Errorf("Expected version 10, got %d", storedVersion)
}
cacheVersion, ok := store.GetCachedVersion(actorID)
if !ok {
t.Error("Expected version to be in cache")
} else if cacheVersion != 10 {
t.Errorf("Expected cached version 10, got %d", cacheVersion)
}
}

View File

@@ -1,6 +1,7 @@
package store package store
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
@@ -286,6 +287,28 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// publishEventStored publishes an EventStored event to the broadcaster. // publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers. // This is called after a successful SaveEvent to notify subscribers.
//
// EventStored Event Schema:
// - EventType: "EventStored" (aether.EventTypeEventStored)
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// Example usage with NATSEventBus:
//
// eventBus := aether.NewNATSEventBus(natsConn)
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
// ch := eventBus.SubscribeToEventStored("*")
//
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) { func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{ eventStored := &aether.Event{
ID: uuid.New().String(), ID: uuid.New().String(),
@@ -561,6 +584,9 @@ func sanitizeSubject(s string) string {
// UpdateVersionCache updates the version cache for a specific actor. // UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep // This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes. // the version cache consistent across cluster nodes.
//
// Only updates if the new version is greater than the cached version to prevent
// stale cache entries from causing version conflicts.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) { func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock() jes.mu.Lock()
defer jes.mu.Unlock() defer jes.mu.Unlock()
@@ -571,5 +597,27 @@ func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64
} }
} }
// GetCachedVersion returns the cached version for an actor, if available.
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
jes.mu.Lock()
defer jes.mu.Unlock()
version, ok := jes.versions[actorID]
return version, ok
}
// SetBroadcaster sets the event broadcaster for this store.
// The broadcaster is used to publish EventStored events when events are saved.
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
jes.mu.Lock()
defer jes.mu.Unlock()
jes.broadcaster = broadcaster
}
// Close closes the JetStream event store and cleans up resources.
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
return nil
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors // Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)