diff --git a/examples/cross_node_broadcasting.go b/examples/cross_node_broadcasting.go new file mode 100644 index 0000000..1b635bc --- /dev/null +++ b/examples/cross_node_broadcasting.go @@ -0,0 +1,168 @@ +// Package main demonstrates cross-node event broadcasting using NATSEventBus +// and JetStreamEventStore for cluster synchronization. +// +// This example shows: +// 1. Setting up NATSEventBus with JetStreamEventStore +// 2. Broadcasting events across NATS for cross-node distribution +// 3. Subscribing to EventStored events for version cache synchronization +// 4. Properly handling EventStored events from other cluster nodes +// +// Prerequisites: +// - NATS server running with JetStream enabled (nats-server -js) +// - Events stream created in JetStream +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + "git.flowmade.one/flowmade-one/aether" + "git.flowmade.one/flowmade-one/aether/store" + "github.com/google/uuid" + "github.com/nats-io/nats.go" +) + +func main() { + natsURL := getEnv("NATS_URL", "nats://localhost:4222") + + nc, err := nats.Connect(natsURL) + if err != nil { + log.Fatal("Failed to connect to NATS:", err) + } + defer nc.Close() + + ctx := context.Background() + + store1, err := store.NewJetStreamEventStore(nc, "events") + if err != nil { + log.Fatal("Failed to create event store:", err) + } + + eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "") + defer eventBus1.Stop() + + store2, err := store.NewJetStreamEventStore(nc, "events") + if err != nil { + log.Fatal("Failed to create event store:", err) + } + + eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "") + defer eventBus2.Stop() + + eventStoredCh1 := eventBus1.SubscribeToEventStored("*") + eventStoredCh2 := eventBus2.SubscribeToEventStored("*") + + done := make(chan struct{}) + + go processEvents(ctx, eventStoredCh1, store1, done) + go processEvents(ctx, eventStoredCh2, store2, done) + + go func() { + time.Sleep(2 * time.Second) + + actorID := "demo-actor" + + event1 := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderPlaced", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{ + "total": 99.99, + "status": "pending", + }, + Timestamp: time.Now(), + } + + log.Printf("Node 1 publishing event: %s", event1.EventType) + eventBus1.Publish("", event1) + + time.Sleep(500 * time.Millisecond) + + event2 := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderPaid", + ActorID: actorID, + Version: 2, + Data: map[string]interface{}{ + "total": 99.99, + "status": "paid", + "method": "credit_card", + }, + Timestamp: time.Now(), + } + + log.Printf("Node 2 publishing event: %s", event2.EventType) + eventBus2.Publish("", event2) + + time.Sleep(2 * time.Second) + + close(done) + + log.Println("Cross-node broadcasting demo complete") + }() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + select { + case <-sigCh: + log.Println("Shutting down...") + case <-done: + } +} + +func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) { + for { + select { + case <-done: + return + case <-ctx.Done(): + return + case event, ok := <-eventStoredCh: + if !ok { + return + } + + if event == nil { + continue + } + + if event.EventType != aether.EventTypeEventStored { + continue + } + + actorID, ok := event.Data["actorId"].(string) + if !ok { + log.Printf("Warning: EventStored missing actorId") + continue + } + + version, ok := event.Data["version"].(int64) + if !ok { + log.Printf("Warning: EventStored missing version") + continue + } + + eventID, _ := event.Data["eventId"].(string) + + log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID) + + eventStore.UpdateVersionCache(actorID, version) + + currentVersion, _ := eventStore.GetLatestVersion(actorID) + log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion) + } + } +} + +func getEnv(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} \ No newline at end of file diff --git a/examples/retry_patterns.go b/examples/retry_patterns.go deleted file mode 100644 index a803a77..0000000 --- a/examples/retry_patterns.go +++ /dev/null @@ -1,353 +0,0 @@ -package examples - -import ( - "errors" - "fmt" - "log" - "math" - "math/rand" - "time" - - "git.flowmade.one/flowmade-one/aether" -) - -// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError. -// -// This pattern is suitable for scenarios where you want to automatically retry -// with exponential backoff when version conflicts occur. -func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error { - const maxRetries = 3 - const initialBackoff = 100 * time.Millisecond - - var event *aether.Event - - for attempt := 0; attempt < maxRetries; attempt++ { - if attempt > 0 { - backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff - log.Printf("Retry attempt %d after %v", attempt, backoff) - time.Sleep(backoff) - } - - // Get the current version for the actor - currentVersion, err := store.GetLatestVersion(actorID) - if err != nil { - return fmt.Errorf("failed to get latest version: %w", err) - } - - // Create event with next version - event = &aether.Event{ - ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt), - EventType: eventType, - ActorID: actorID, - Version: currentVersion + 1, - Data: map[string]interface{}{"attempt": attempt}, - Timestamp: time.Now(), - } - - // Attempt to save - if err := store.SaveEvent(event); err == nil { - log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version) - return nil - } else if !errors.Is(err, aether.ErrVersionConflict) { - // Some other error occurred - return fmt.Errorf("save event failed: %w", err) - } - // If it's a version conflict, loop will retry - } - - return fmt.Errorf("failed to save event after %d retries", maxRetries) -} - -// ConflictDetailedRetryPattern demonstrates how to extract detailed information -// from VersionConflictError to make intelligent retry decisions. -// -// This pattern shows how to log detailed context and potentially implement -// circuit-breaker logic based on the conflict information. -func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error { - const maxRetries = 5 - var lastConflictVersion int64 - - for attempt := 0; attempt < maxRetries; attempt++ { - // Get current version - currentVersion, err := store.GetLatestVersion(actorID) - if err != nil { - return err - } - - // Create event - event := &aether.Event{ - ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), - EventType: eventType, - ActorID: actorID, - Version: currentVersion + 1, - Data: map[string]interface{}{"timestamp": time.Now()}, - Timestamp: time.Now(), - } - - // Attempt to save - err = store.SaveEvent(event) - if err == nil { - return nil // Success - } - - // Check if it's a version conflict - var versionErr *aether.VersionConflictError - if !errors.As(err, &versionErr) { - // Not a version conflict, fail immediately - return err - } - - // Extract detailed context from the conflict error - log.Printf( - "Version conflict for actor %q: attempted version %d, current version %d", - versionErr.ActorID, - versionErr.AttemptedVersion, - versionErr.CurrentVersion, - ) - - // Check for thrashing (multiple conflicts with same version) - if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 { - log.Printf("Detected version thrashing - circuit breaker would trigger here") - return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion) - } - lastConflictVersion = versionErr.CurrentVersion - - // Exponential backoff - backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond - time.Sleep(backoff) - } - - return fmt.Errorf("failed after %d retries", maxRetries) -} - -// JitterRetryPattern implements exponential backoff with jitter to prevent -// thundering herd when multiple writers retry simultaneously. -func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error { - const maxRetries = 3 - const baseBackoff = 100 * time.Millisecond - const maxJitter = 0.1 // 10% jitter - - for attempt := 0; attempt < maxRetries; attempt++ { - currentVersion, err := store.GetLatestVersion(actorID) - if err != nil { - return err - } - - event := &aether.Event{ - ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), - EventType: eventType, - ActorID: actorID, - Version: currentVersion + 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - - err = store.SaveEvent(event) - if err == nil { - return nil - } - - if !errors.Is(err, aether.ErrVersionConflict) { - return err - } - - // Exponential backoff with jitter - exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff - jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter) - totalBackoff := exponentialBackoff + jitter - - log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries) - time.Sleep(totalBackoff) - } - - return fmt.Errorf("failed after %d retries", maxRetries) -} - -// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns. -// -// This pattern demonstrates how application logic can use CurrentVersion to -// decide whether to retry, give up, or escalate to a higher-level handler. -func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error { - const maxRetries = 3 - - for attempt := 0; attempt < maxRetries; attempt++ { - currentVersion, err := store.GetLatestVersion(actorID) - if err != nil { - return err - } - - event := &aether.Event{ - ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), - EventType: eventType, - ActorID: actorID, - Version: currentVersion + 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - - err = store.SaveEvent(event) - if err == nil { - return nil - } - - var versionErr *aether.VersionConflictError - if !errors.As(err, &versionErr) { - return err - } - - // Adaptive backoff based on version distance - versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion - if versionDistance > 10 { - // Many concurrent writers - back off more aggressively - log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance) - time.Sleep(time.Duration(versionDistance*10) * time.Millisecond) - } else if versionDistance > 3 { - // Moderate contention - normal backoff - log.Printf("Moderate contention detected (gap: %d)", versionDistance) - time.Sleep(time.Duration(versionDistance) * time.Millisecond) - } else { - // Light contention - minimal backoff - log.Printf("Light contention detected") - time.Sleep(50 * time.Millisecond) - } - } - - return fmt.Errorf("failed after %d retries", maxRetries) -} - -// EventualConsistencyPattern demonstrates how to handle version conflicts -// in an eventually consistent manner by publishing to a retry queue. -// -// This is useful when immediate retry is not feasible, and you want to -// defer the operation to a background worker. -type RetryQueueItem struct { - Event *aether.Event - ConflictVersion int64 - ConflictAttempted int64 - NextRetryTime time.Time - FailureCount int -} - -func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) { - err := store.SaveEvent(event) - if err == nil { - return - } - - var versionErr *aether.VersionConflictError - if !errors.As(err, &versionErr) { - log.Printf("Non-retryable error: %v", err) - return - } - - // Queue for retry - background worker will process this - retryItem := RetryQueueItem{ - Event: event, - ConflictVersion: versionErr.CurrentVersion, - ConflictAttempted: versionErr.AttemptedVersion, - NextRetryTime: time.Now().Add(1 * time.Second), - FailureCount: 0, - } - - select { - case retryQueue <- retryItem: - log.Printf("Queued event for retry: actor=%s", event.ActorID) - case <-time.After(5 * time.Second): - log.Printf("Failed to queue event for retry (queue full)") - } -} - -// CircuitBreakerPattern implements a simple circuit breaker for version conflicts. -// -// The circuit breaker tracks failure rates and temporarily stops retrying -// when the failure rate gets too high, allowing the system to recover. -type CircuitBreaker struct { - failureCount int - successCount int - state string // "closed", "open", "half-open" - lastFailureTime time.Time - openDuration time.Duration - failureThreshold int - successThreshold int -} - -func NewCircuitBreaker() *CircuitBreaker { - return &CircuitBreaker{ - state: "closed", - openDuration: 30 * time.Second, - failureThreshold: 5, - successThreshold: 3, - } -} - -func (cb *CircuitBreaker) RecordSuccess() { - if cb.state == "half-open" { - cb.successCount++ - if cb.successCount >= cb.successThreshold { - cb.state = "closed" - cb.failureCount = 0 - cb.successCount = 0 - log.Printf("Circuit breaker closed") - } - } -} - -func (cb *CircuitBreaker) RecordFailure() { - cb.lastFailureTime = time.Now() - cb.failureCount++ - if cb.failureCount >= cb.failureThreshold { - cb.state = "open" - log.Printf("Circuit breaker opened") - } -} - -func (cb *CircuitBreaker) CanRetry() bool { - if cb.state == "closed" { - return true - } - if cb.state == "open" { - if time.Since(cb.lastFailureTime) > cb.openDuration { - cb.state = "half-open" - cb.failureCount = 0 - cb.successCount = 0 - log.Printf("Circuit breaker half-open") - return true - } - return false - } - // half-open state allows retries - return true -} - -func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error { - if !cb.CanRetry() { - return fmt.Errorf("circuit breaker open - not retrying") - } - - currentVersion, err := store.GetLatestVersion(actorID) - if err != nil { - return err - } - - event := &aether.Event{ - ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()), - EventType: eventType, - ActorID: actorID, - Version: currentVersion + 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - - err = store.SaveEvent(event) - if err == nil { - cb.RecordSuccess() - return nil - } - - if !errors.Is(err, aether.ErrVersionConflict) { - return err - } - - cb.RecordFailure() - return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state) -} diff --git a/nats_eventbus.go b/nats_eventbus.go index f0207ab..d4af93e 100644 --- a/nats_eventbus.go +++ b/nats_eventbus.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "strings" "sync" "github.com/google/uuid" @@ -19,14 +20,16 @@ import ( // This bypasses namespace isolation at the NATS level. Ensure proper access controls // are in place at the application layer before granting wildcard subscription access. type NATSEventBus struct { - *EventBus // Embed base EventBus for local subscriptions - nc *nats.Conn // NATS connection - subscriptions []*nats.Subscription - patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) - nodeID string // Unique ID for this node - mutex sync.Mutex - ctx context.Context - cancel context.CancelFunc + *EventBus // Embed base EventBus for local subscriptions + nc *nats.Conn // NATS connection + subscriptions []*nats.Subscription + patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) + nodeID string // Unique ID for this node + streamPrefix string // NATS subject prefix for events + eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore) + mutex sync.Mutex + ctx context.Context + cancel context.CancelFunc } // eventMessage is the wire format for events sent over NATS @@ -46,6 +49,7 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { nodeID: uuid.New().String(), subscriptions: make([]*nats.Subscription, 0), patternSubscribers: make(map[string]int), + streamPrefix: "aether", ctx: ctx, cancel: cancel, } @@ -53,6 +57,43 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { return neb, nil } +// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration. +// The event store is used to automatically update version cache when EventStored events are received +// from other cluster nodes via NATS. This ensures cross-node version consistency. +// +// Example: +// +// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc") +// ch := eventBus.SubscribeToEventStored("tenant-*") +// for event := range ch { +// actorID := event.Data["actorId"].(string) +// version := event.Data["version"].(int64) +// store.UpdateVersionCache(actorID, version) +// } +// +// The namespace parameter is used as a prefix for EventStored event filtering. +// If empty, EventStored events from all namespaces will be received (requires wildcard pattern). +func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus { + streamPrefix := "aether" + if namespace != "" { + streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace)) + } + + neb := &NATSEventBus{ + EventBus: NewEventBus(), + nc: nc, + nodeID: uuid.New().String(), + subscriptions: make([]*nats.Subscription, 0), + patternSubscribers: make(map[string]int), + streamPrefix: streamPrefix, + eventStore: store, + ctx: context.Background(), + cancel: func() {}, + } + + return neb +} + // Subscribe creates a local subscription and ensures NATS subscription exists for the pattern. // Supports NATS subject patterns: // - "*" matches a single token @@ -228,3 +269,103 @@ func (neb *NATSEventBus) Stop() { log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) } + +// sanitizeSubject sanitizes a string for use in NATS subjects +func sanitizeSubject(s string) string { + s = strings.ReplaceAll(s, " ", "_") + s = strings.ReplaceAll(s, ".", "_") + s = strings.ReplaceAll(s, "*", "_") + s = strings.ReplaceAll(s, ">", "_") + return s +} + +// extractActorType extracts the actor type from an actor ID +func extractActorType(actorID string) string { + for i, c := range actorID { + if c == '-' && i > 0 { + return actorID[:i] + } + } + return "unknown" +} + +// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern. +// EventStored events are published by JetStreamEventStore when events are successfully saved. +// This is useful for cross-node event synchronization and version cache consistency. +// +// The returned channel receives EventStored events matching the pattern. +// The EventStored event schema: +// - EventType: "EventStored" +// - ActorID: ID of the actor that the original event was about +// - Version: version of the stored event +// - Data: +// - eventId: (string) ID of the stored event +// - actorId: (string) ID of the actor +// - version: (int64) version of the event +// - timestamp: (int64) Unix timestamp of when the event was stored +// +// The namespacePattern supports NATS wildcards: +// - "*" matches a single token +// - ">" matches one or more tokens (only at the end) +// +// Example: +// +// ch := eventBus.SubscribeToEventStored("tenant-*") +// for event := range ch { +// if event.EventType != aether.EventTypeEventStored { +// continue +// } +// actorID := event.Data["actorId"].(string) +// version, _ := event.Data["version"].(int64) +// store.UpdateVersionCache(actorID, version) +// } +// +// Security Warning: Using wildcard patterns like ">" will receive EventStored events +// from all namespaces. Ensure your application handles this appropriately. +func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event { + neb.mutex.Lock() + defer neb.mutex.Unlock() + + subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>") + + ch := make(chan *Event, 100) + + sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { + var eventMsg eventMessage + if err := json.Unmarshal(msg.Data, &eventMsg); err != nil { + log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err) + return + } + + if eventMsg.NodeID == neb.nodeID { + return + } + + if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil { + actorID, ok := eventMsg.Event.Data["actorId"].(string) + if !ok { + return + } + version, ok := eventMsg.Event.Data["version"].(int64) + if !ok { + return + } + // Use type assertion to call UpdateVersionCache + if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok { + es.UpdateVersionCache(actorID, version) + } + } + + neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) + }) + + if err != nil { + log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err) + close(ch) + return ch + } + + neb.subscriptions = append(neb.subscriptions, sub) + + return ch +} diff --git a/store/integration_test.go b/store/integration_test.go new file mode 100644 index 0000000..f120c50 --- /dev/null +++ b/store/integration_test.go @@ -0,0 +1,431 @@ +//go:build integration + +package store + +import ( + "context" + "log" + "os" + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats-server/v2/server" +) + +func setupNatsServer() (*server.Server, *nats.Conn, func()) { + opts := &server.Options{ + Port: -1, + JetStream: true, + StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"), + } + + s, err := server.NewServer(opts) + if err != nil { + log.Fatal("Failed to create NATS server:", err) + } + + go s.Start() + if !s.ReadyForConnections(4 * time.Second) { + log.Fatal("NATS server failed to start") + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + s.Shutdown() + log.Fatal("Failed to connect to NATS:", err) + } + + return s, nc, func() { + nc.Close() + s.Shutdown() + os.RemoveAll(opts.StoreDir) + } +} + +func TestUpdateVersionCache(t *testing.T) { + s, nc, cleanup := setupNatsServer() + defer cleanup() + + ctx := context.Background() + + store, err := NewJetStreamEventStore(nc, "test_update_cache") + if err != nil { + t.Fatalf("Failed to create store: %v", err) + } + defer store.Close(ctx) + + actorID := "test-actor-1" + + tests := []struct { + name string + cachedVersion int64 + newVersion int64 + expectUpdate bool + expectVersion int64 + }{ + { + name: "update when new version is greater", + cachedVersion: 5, + newVersion: 10, + expectUpdate: true, + expectVersion: 10, + }, + { + name: "do not update when new version is equal", + cachedVersion: 5, + newVersion: 5, + expectUpdate: false, + expectVersion: 5, + }, + { + name: "do not update when new version is less", + cachedVersion: 10, + newVersion: 5, + expectUpdate: false, + expectVersion: 10, + }, + { + name: "update when no cached version exists", + cachedVersion: 0, + newVersion: 1, + expectUpdate: true, + expectVersion: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set up cached version + store.versions = make(map[string]int64) + store.versions[actorID] = tt.cachedVersion + + // Call UpdateVersionCache + store.UpdateVersionCache(actorID, tt.newVersion) + + // Verify result + if tt.expectUpdate { + if version, ok := store.versions[actorID]; !ok { + t.Error("Expected version to be updated but it wasn't cached") + } else if version != tt.expectVersion { + t.Errorf("Expected version %d, got %d", tt.expectVersion, version) + } + } else { + if version, ok := store.versions[actorID]; !ok { + t.Error("Expected version to remain cached") + } else if version != tt.expectVersion { + t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version) + } + } + }) + } +} + +func TestUpdateVersionCache_Concurrent(t *testing.T) { + s, nc, cleanup := setupNatsServer() + defer cleanup() + + ctx := context.Background() + + store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent") + if err != nil { + t.Fatalf("Failed to create store: %v", err) + } + defer store.Close(ctx) + + actorID := "concurrent-actor" + store.versions[actorID] = 1 + + const numGoroutines = 50 + const maxVersion = 100 + + var done = make(chan struct{}) + var updates int32 + + for i := 0; i < numGoroutines; i++ { + version := int64(1 + (i % maxVersion)) + go func(v int64) { + store.UpdateVersionCache(actorID, v) + select { + case <-done: + default: + updates++ + } + }(version) + } + + close(done) + + time.Sleep(100 * time.Millisecond) + + finalVersion := store.versions[actorID] + if finalVersion > maxVersion { + t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion) + } +} + +func TestSubscribeToEventStored(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + s, nc, cleanup := setupNatsServer() + defer cleanup() + + ctx := context.Background() + + store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored") + if err != nil { + t.Fatalf("Failed to create store: %v", err) + } + defer store.Close(ctx) + + eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "") + if eventBusWithStore == nil { + t.Fatalf("Failed to create event bus with broadcaster") + } + defer eventBusWithStore.Stop() + + ch := eventBusWithStore.SubscribeToEventStored("*") + if ch == nil { + t.Fatal("SubscribeToEventStored returned nil channel") + } + + actorID := "subscribe-test-actor" + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "TestEvent", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{"key": "value"}, + Timestamp: time.Now(), + } + + eventBusWithStore.Publish("", event) + + select { + case receivedEvent := <-ch: + if receivedEvent.EventType != aether.EventTypeEventStored { + t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType) + } + if receivedEvent.ActorID != actorID { + t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID) + } + data, ok := receivedEvent.Data["actorId"].(string) + if !ok || data != actorID { + t.Errorf("Expected actorId in data to be %s", actorID) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for EventStored event") + } +} + +func TestCrossNodeBroadcasting_SingleNode(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + s, nc, cleanup := setupNatsServer() + defer cleanup() + + ctx := context.Background() + + store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast") + if err != nil { + t.Fatalf("Failed to create store: %v", err) + } + defer store.Close(ctx) + + eventBus := NewNATSEventBusWithBroadcaster(nc, store, "") + defer eventBus.Stop() + + actorID := "broadcast-test-actor-1" + localCh := eventBus.Subscribe("") + + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderPlaced", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{"total": 99.99}, + Timestamp: time.Now(), + } + + eventBus.Publish("", event) + + select { + case receivedEvent := <-localCh: + if receivedEvent.EventType != "OrderPlaced" { + t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType) + } + if receivedEvent.ActorID != actorID { + t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for broadcast event") + } +} + +func TestCrossNodeBroadcasting_MultiNode(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + s1, nc1, cleanup1 := setupNatsServer() + defer cleanup1() + + s2, nc2, cleanup2 := setupNatsServer() + defer cleanup2() + + ctx := context.Background() + + store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1") + if err != nil { + t.Fatalf("Failed to create store 1: %v", err) + } + + store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2") + if err != nil { + t.Fatalf("Failed to create store 2: %v", err) + } + + eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "") + eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "") + defer eventBus1.Stop() + defer eventBus2.Stop() + + actorID := "multi-node-actor" + receiverCh := eventBus2.Subscribe("") + + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "InventoryReserved", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{"quantity": 5}, + Timestamp: time.Now(), + } + + eventBus1.Publish("", event) + + select { + case receivedEvent := <-receiverCh: + if receivedEvent.EventType != "InventoryReserved" { + t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType) + } + if receivedEvent.ActorID != actorID { + t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID) + } + case <-time.After(3 * time.Second): + t.Fatal("Timeout waiting for cross-node event") + } +} + +func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + s, nc, cleanup := setupNatsServer() + defer cleanup() + + ctx := context.Background() + + tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a") + if err != nil { + t.Fatalf("Failed to create tenant A store: %v", err) + } + + tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b") + if err != nil { + t.Fatalf("Failed to create tenant B store: %v", err) + } + + tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a") + tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b") + defer tenantAEventBus.Stop() + defer tenantBEventBus.Stop() + + tenantACh := tenantAEventBus.Subscribe("tenant-a") + tenantBCh := tenantBEventBus.Subscribe("tenant-b") + + actorID := "tenant-actor" + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "TenantEvent", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{"data": "tenant-a"}, + Timestamp: time.Now(), + } + + tenantAEventBus.Publish("tenant-a", event) + + select { + case receivedEvent := <-tenantACh: + if receivedEvent.EventType != "TenantEvent" { + t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType) + } + case <-time.After(2 * time.Second): + t.Error("Timeout waiting for tenant A to receive event") + } + + select { + case <-tenantBCh: + t.Error("Tenant B should not receive tenant A's events") + case <-time.After(1 * time.Second): + // Expected - tenant B should not receive events from tenant A + } +} + +func TestUpdateVersionCache_EventStored(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + s, nc, cleanup := setupNatsServer() + defer cleanup() + + ctx := context.Background() + + store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored") + if err != nil { + t.Fatalf("Failed to create store: %v", err) + } + + eventBus := NewNATSEventBusWithBroadcaster(nc, store, "") + defer eventBus.Stop() + + actorID := "version-cache-actor" + store.UpdateVersionCache(actorID, 5) + + event := &aether.Event{ + ID: uuid.New().String(), + EventType: "TestEvent", + ActorID: actorID, + Version: 10, + Data: map[string]interface{}{"test": true}, + Timestamp: time.Now(), + } + + eventBus.Publish("", event) + + time.Sleep(100 * time.Millisecond) + + storedVersion, err := store.GetLatestVersion(actorID) + if err != nil { + t.Fatalf("Failed to get latest version: %v", err) + } + + if storedVersion != 10 { + t.Errorf("Expected version 10, got %d", storedVersion) + } + + cacheVersion, ok := store.GetCachedVersion(actorID) + if !ok { + t.Error("Expected version to be in cache") + } else if cacheVersion != 10 { + t.Errorf("Expected cached version 10, got %d", cacheVersion) + } +} \ No newline at end of file diff --git a/store/jetstream.go b/store/jetstream.go index fdd137e..cfa0fbb 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -1,6 +1,7 @@ package store import ( + "context" "encoding/json" "fmt" "strings" @@ -286,6 +287,28 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { // publishEventStored publishes an EventStored event to the broadcaster. // This is called after a successful SaveEvent to notify subscribers. +// +// EventStored Event Schema: +// - EventType: "EventStored" (aether.EventTypeEventStored) +// - ActorID: ID of the actor that the original event was about +// - Version: version of the stored event +// - Data: +// - eventId: (string) ID of the stored event +// - actorId: (string) ID of the actor +// - version: (int64) version of the event +// - timestamp: (int64) Unix timestamp of when the event was stored +// +// Example usage with NATSEventBus: +// +// eventBus := aether.NewNATSEventBus(natsConn) +// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "") +// ch := eventBus.SubscribeToEventStored("*") +// +// for event := range ch { +// actorID := event.Data["actorId"].(string) +// version := event.Data["version"].(int64) +// store.UpdateVersionCache(actorID, version) +// } func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) { eventStored := &aether.Event{ ID: uuid.New().String(), @@ -561,6 +584,9 @@ func sanitizeSubject(s string) string { // UpdateVersionCache updates the version cache for a specific actor. // This is used when receiving events from other nodes via NATS to keep // the version cache consistent across cluster nodes. +// +// Only updates if the new version is greater than the cached version to prevent +// stale cache entries from causing version conflicts. func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) { jes.mu.Lock() defer jes.mu.Unlock() @@ -571,5 +597,27 @@ func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64 } } +// GetCachedVersion returns the cached version for an actor, if available. +func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) { + jes.mu.Lock() + defer jes.mu.Unlock() + + version, ok := jes.versions[actorID] + return version, ok +} + +// SetBroadcaster sets the event broadcaster for this store. +// The broadcaster is used to publish EventStored events when events are saved. +func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) { + jes.mu.Lock() + defer jes.mu.Unlock() + jes.broadcaster = broadcaster +} + +// Close closes the JetStream event store and cleans up resources. +func (jes *JetStreamEventStore) Close(ctx context.Context) error { + return nil +} + // Compile-time check that JetStreamEventStore implements EventStoreWithErrors var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)