//go:build integration package store import ( "errors" "fmt" "sync" "sync/atomic" "testing" "time" "git.flowmade.one/flowmade-one/aether" "github.com/nats-io/nats.go" ) // These integration tests require a running NATS server with JetStream enabled. // Run with: go test -tags=integration -v ./store/... // // To start NATS with JetStream: nats-server -js // getTestNATSConnection creates a new NATS connection for testing. // Returns nil if NATS is not available, allowing tests to skip gracefully. func getTestNATSConnection(t *testing.T) *nats.Conn { nc, err := nats.Connect(nats.DefaultURL) if err != nil { t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err) return nil } // Verify JetStream is available js, err := nc.JetStream() if err != nil { nc.Close() t.Skipf("JetStream not available: %v (run 'nats-server -js' to enable integration tests)", err) return nil } // Test JetStream connectivity _, err = js.AccountInfo() if err != nil { nc.Close() t.Skipf("JetStream not enabled: %v (run 'nats-server -js' to enable integration tests)", err) return nil } return nc } // uniqueStreamName generates a unique stream name for test isolation func uniqueStreamName(prefix string) string { return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano()) } // cleanupStream deletes a stream to clean up after tests func cleanupStream(nc *nats.Conn, streamName string) { js, err := nc.JetStream() if err != nil { return } _ = js.DeleteStream(streamName) } // === Stream Creation and Configuration Tests === func TestJetStreamEventStore_StreamCreation(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-stream-creation") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create JetStreamEventStore: %v", err) } if store == nil { t.Fatal("expected non-nil store") } // Verify stream was created js, _ := nc.JetStream() info, err := js.StreamInfo(streamName) if err != nil { t.Fatalf("stream was not created: %v", err) } if info.Config.Name != streamName { t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, streamName) } // Verify stream has correct subjects expectedSubjects := []string{ fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName), } for _, expected := range expectedSubjects { found := false for _, subject := range info.Config.Subjects { if subject == expected { found = true break } } if !found { t.Errorf("expected subject %q not found in stream config", expected) } } } func TestJetStreamEventStore_StreamCreationWithConfig(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-stream-config") defer cleanupStream(nc, streamName) config := JetStreamConfig{ StreamRetention: 7 * 24 * time.Hour, // 7 days ReplicaCount: 1, } store, err := NewJetStreamEventStoreWithConfig(nc, streamName, config) if err != nil { t.Fatalf("failed to create JetStreamEventStore with config: %v", err) } if store == nil { t.Fatal("expected non-nil store") } // Verify stream configuration js, _ := nc.JetStream() info, err := js.StreamInfo(streamName) if err != nil { t.Fatalf("stream was not created: %v", err) } if info.Config.MaxAge != 7*24*time.Hour { t.Errorf("MaxAge mismatch: got %v, want %v", info.Config.MaxAge, 7*24*time.Hour) } } func TestJetStreamEventStore_StreamCreationWithNamespace(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() baseName := uniqueStreamName("test-ns") namespace := "tenant-abc" expectedStreamName := fmt.Sprintf("%s_%s", namespace, baseName) defer cleanupStream(nc, expectedStreamName) store, err := NewJetStreamEventStoreWithNamespace(nc, baseName, namespace) if err != nil { t.Fatalf("failed to create JetStreamEventStore with namespace: %v", err) } if store.GetNamespace() != namespace { t.Errorf("namespace mismatch: got %q, want %q", store.GetNamespace(), namespace) } if store.GetStreamName() != expectedStreamName { t.Errorf("stream name mismatch: got %q, want %q", store.GetStreamName(), expectedStreamName) } // Verify namespaced stream was created js, _ := nc.JetStream() info, err := js.StreamInfo(expectedStreamName) if err != nil { t.Fatalf("namespaced stream was not created: %v", err) } if info.Config.Name != expectedStreamName { t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, expectedStreamName) } } func TestJetStreamEventStore_StreamAlreadyExists(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-stream-exists") defer cleanupStream(nc, streamName) // Create first store (creates stream) store1, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create first store: %v", err) } if store1 == nil { t.Fatal("expected non-nil store") } // Create second store with same stream name (should reuse existing stream) store2, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create second store with existing stream: %v", err) } if store2 == nil { t.Fatal("expected non-nil store") } } // === SaveEvent Tests === func TestJetStreamEventStore_SaveEvent_PersistsToJetStream(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-save-event") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } event := &aether.Event{ ID: "evt-123", EventType: "OrderPlaced", ActorID: "order-456", Version: 1, Data: map[string]interface{}{ "total": 100.50, "currency": "USD", }, Timestamp: time.Now(), } err = store.SaveEvent(event) if err != nil { t.Fatalf("SaveEvent failed: %v", err) } // Verify event was persisted by retrieving it events, err := store.GetEvents("order-456", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 1 { t.Fatalf("expected 1 event, got %d", len(events)) } retrieved := events[0] if retrieved.ID != event.ID { t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID) } if retrieved.EventType != event.EventType { t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType) } if retrieved.ActorID != event.ActorID { t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID) } if retrieved.Version != event.Version { t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version) } } func TestJetStreamEventStore_SaveEvent_MultipleEvents(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-multi-events") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save multiple events for i := 1; i <= 10; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "OrderUpdated", ActorID: "order-456", Version: int64(i), Data: map[string]interface{}{"update": i}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed for event %d: %v", i, err) } } events, err := store.GetEvents("order-456", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 10 { t.Errorf("expected 10 events, got %d", len(events)) } } func TestJetStreamEventStore_SaveEvent_WithMetadata(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-event-metadata") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } event := &aether.Event{ ID: "evt-meta", EventType: "OrderPlaced", ActorID: "order-456", Version: 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } event.SetCorrelationID("corr-123") event.SetCausationID("cause-456") event.SetUserID("user-789") event.SetTraceID("trace-abc") event.SetSpanID("span-def") err = store.SaveEvent(event) if err != nil { t.Fatalf("SaveEvent failed: %v", err) } events, err := store.GetEvents("order-456", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 1 { t.Fatalf("expected 1 event, got %d", len(events)) } retrieved := events[0] if retrieved.GetCorrelationID() != "corr-123" { t.Errorf("correlationId mismatch: got %q", retrieved.GetCorrelationID()) } if retrieved.GetCausationID() != "cause-456" { t.Errorf("causationId mismatch: got %q", retrieved.GetCausationID()) } if retrieved.GetUserID() != "user-789" { t.Errorf("userId mismatch: got %q", retrieved.GetUserID()) } if retrieved.GetTraceID() != "trace-abc" { t.Errorf("traceId mismatch: got %q", retrieved.GetTraceID()) } if retrieved.GetSpanID() != "span-def" { t.Errorf("spanId mismatch: got %q", retrieved.GetSpanID()) } } func TestJetStreamEventStore_SaveEvent_Deduplication(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-dedup") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } event := &aether.Event{ ID: "evt-dedup-test", EventType: "OrderPlaced", ActorID: "order-456", Version: 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } // Save the same event twice (same event ID) err = store.SaveEvent(event) if err != nil { t.Fatalf("first SaveEvent failed: %v", err) } // Second save with same event ID and same version should fail with version conflict // because version 1 already exists err = store.SaveEvent(event) if err == nil { // If no error, that's a problem - we expected version conflict t.Error("expected error when saving duplicate event, got nil") } else if !errors.Is(err, aether.ErrVersionConflict) { t.Errorf("expected version conflict error, got: %v", err) } } func TestJetStreamEventStore_SaveEvent_VersionConflict(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-version-conflict") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save first event with version 5 event1 := &aether.Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-123", Version: 5, Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event1); err != nil { t.Fatalf("SaveEvent failed for first event: %v", err) } // Attempt to save event with lower version (should fail) event2 := &aether.Event{ ID: "evt-2", EventType: "TestEvent", ActorID: "actor-123", Version: 3, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event2) if err == nil { t.Fatal("expected error when saving event with lower version, got nil") } if !errors.Is(err, aether.ErrVersionConflict) { t.Errorf("expected ErrVersionConflict, got %v", err) } var versionErr *aether.VersionConflictError if !errors.As(err, &versionErr) { t.Fatalf("expected VersionConflictError, got %T", err) } if versionErr.ActorID != "actor-123" { t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123") } if versionErr.CurrentVersion != 5 { t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5) } if versionErr.AttemptedVersion != 3 { t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3) } } func TestJetStreamEventStore_SaveEvent_VersionConflictEqual(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-version-equal") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save first event with version 5 event1 := &aether.Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-123", Version: 5, Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event1); err != nil { t.Fatalf("SaveEvent failed: %v", err) } // Attempt to save event with equal version (should fail) event2 := &aether.Event{ ID: "evt-2", EventType: "TestEvent", ActorID: "actor-123", Version: 5, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event2) if err == nil { t.Fatal("expected error when saving event with equal version, got nil") } if !errors.Is(err, aether.ErrVersionConflict) { t.Errorf("expected ErrVersionConflict, got %v", err) } } // === GetEvents Tests === func TestJetStreamEventStore_GetEvents_RetrievesInOrder(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-get-order") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save events in order for i := 1; i <= 10; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "TestEvent", ActorID: "actor-123", Version: int64(i), Data: map[string]interface{}{"index": i}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed: %v", err) } } events, err := store.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 10 { t.Fatalf("expected 10 events, got %d", len(events)) } // Verify order for i, event := range events { expectedID := fmt.Sprintf("evt-%d", i+1) if event.ID != expectedID { t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID) } expectedVersion := int64(i + 1) if event.Version != expectedVersion { t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion) } } } func TestJetStreamEventStore_GetEvents_FromVersionFilters(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-from-version") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save events with versions 1-10 for i := 1; i <= 10; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "TestEvent", ActorID: "actor-123", Version: int64(i), Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed: %v", err) } } testCases := []struct { name string fromVersion int64 expectedLen int minVersion int64 }{ {"from version 0", 0, 10, 1}, {"from version 5", 5, 5, 6}, {"from version 10", 10, 0, 0}, {"from version 11", 11, 0, 0}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { events, err := store.GetEvents("actor-123", tc.fromVersion) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != tc.expectedLen { t.Errorf("expected %d events, got %d", tc.expectedLen, len(events)) } // Verify all returned events have version > fromVersion for _, event := range events { if event.Version <= tc.fromVersion { t.Errorf("event version %d is not greater than fromVersion %d", event.Version, tc.fromVersion) } } }) } } func TestJetStreamEventStore_GetEvents_NonExistentActor(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-nonexistent") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } events, err := store.GetEvents("non-existent-actor", 0) if err != nil { t.Fatalf("GetEvents should not error for non-existent actor: %v", err) } if len(events) != 0 { t.Errorf("expected 0 events for non-existent actor, got %d", len(events)) } } func TestJetStreamEventStore_GetEvents_MultipleActors(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-multi-actors") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save events for different actors actors := []string{"actor-1", "actor-2", "actor-3"} for _, actorID := range actors { for i := 1; i <= 3; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%s-%d", actorID, i), EventType: "TestEvent", ActorID: actorID, Version: int64(i), Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed: %v", err) } } } // Verify each actor has its own events for _, actorID := range actors { events, err := store.GetEvents(actorID, 0) if err != nil { t.Fatalf("GetEvents failed for %s: %v", actorID, err) } if len(events) != 3 { t.Errorf("expected 3 events for %s, got %d", actorID, len(events)) } for _, event := range events { if event.ActorID != actorID { t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID) } } } } func TestJetStreamEventStore_GetEventsWithErrors(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-with-errors") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save valid events for i := 1; i <= 5; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "TestEvent", ActorID: "actor-123", Version: int64(i), Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed: %v", err) } } result, err := store.GetEventsWithErrors("actor-123", 0) if err != nil { t.Fatalf("GetEventsWithErrors failed: %v", err) } if len(result.Events) != 5 { t.Errorf("expected 5 events, got %d", len(result.Events)) } if result.HasErrors() { t.Errorf("expected no errors, got %d", len(result.Errors)) } } // === GetLatestVersion Tests === func TestJetStreamEventStore_GetLatestVersion(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-latest-version") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save events with versions 1-5 for i := 1; i <= 5; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "TestEvent", ActorID: "actor-123", Version: int64(i), Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed: %v", err) } } latestVersion, err := store.GetLatestVersion("actor-123") if err != nil { t.Fatalf("GetLatestVersion failed: %v", err) } if latestVersion != 5 { t.Errorf("expected latest version 5, got %d", latestVersion) } } func TestJetStreamEventStore_GetLatestVersion_NonExistentActor(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-latest-nonexistent") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } latestVersion, err := store.GetLatestVersion("non-existent-actor") if err != nil { t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err) } if latestVersion != 0 { t.Errorf("expected version 0 for non-existent actor, got %d", latestVersion) } } func TestJetStreamEventStore_GetLatestVersion_UpdatesAfterNewEvent(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-latest-updates") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save first event event1 := &aether.Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-123", Version: 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event1); err != nil { t.Fatalf("SaveEvent failed: %v", err) } version1, err := store.GetLatestVersion("actor-123") if err != nil { t.Fatalf("GetLatestVersion failed: %v", err) } if version1 != 1 { t.Errorf("expected version 1, got %d", version1) } // Save second event event2 := &aether.Event{ ID: "evt-2", EventType: "TestEvent", ActorID: "actor-123", Version: 10, Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event2); err != nil { t.Fatalf("SaveEvent failed: %v", err) } version2, err := store.GetLatestVersion("actor-123") if err != nil { t.Fatalf("GetLatestVersion failed: %v", err) } if version2 != 10 { t.Errorf("expected version 10, got %d", version2) } } // === Snapshot Tests === func TestJetStreamEventStore_SaveAndGetSnapshot(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-snapshot") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } snapshot := &aether.ActorSnapshot{ ActorID: "actor-123", Version: 10, State: map[string]interface{}{ "balance": 100.50, "status": "active", }, Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), } err = store.SaveSnapshot(snapshot) if err != nil { t.Fatalf("SaveSnapshot failed: %v", err) } retrieved, err := store.GetLatestSnapshot("actor-123") if err != nil { t.Fatalf("GetLatestSnapshot failed: %v", err) } if retrieved.ActorID != snapshot.ActorID { t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID) } if retrieved.Version != snapshot.Version { t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version) } // Check state values (JSON unmarshaling may change types) balance, ok := retrieved.State["balance"].(float64) if !ok { t.Errorf("balance is not float64: %T", retrieved.State["balance"]) } else if balance != 100.50 { t.Errorf("balance mismatch: got %v, want %v", balance, 100.50) } } func TestJetStreamEventStore_GetLatestSnapshot_MultipleSnapshots(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-multi-snapshot") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Save multiple snapshots for i := 1; i <= 5; i++ { snapshot := &aether.ActorSnapshot{ ActorID: "actor-123", Version: int64(i * 10), State: map[string]interface{}{ "iteration": i, }, Timestamp: time.Now(), } if err := store.SaveSnapshot(snapshot); err != nil { t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err) } } // Get latest should return the most recently saved retrieved, err := store.GetLatestSnapshot("actor-123") if err != nil { t.Fatalf("GetLatestSnapshot failed: %v", err) } if retrieved.Version != 50 { t.Errorf("expected version 50, got %d", retrieved.Version) } } func TestJetStreamEventStore_GetLatestSnapshot_NonExistent(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-snapshot-nonexistent") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } _, err = store.GetLatestSnapshot("non-existent-actor") if err == nil { t.Error("expected error when getting snapshot for non-existent actor") } } func TestJetStreamEventStore_SnapshotWithComplexState(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-snapshot-complex") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } complexState := map[string]interface{}{ "string": "hello", "integer": float64(42), // JSON numbers are float64 "float": 3.14159, "boolean": true, "null": nil, "array": []interface{}{"a", "b", "c"}, "nested": map[string]interface{}{ "level1": map[string]interface{}{ "level2": "deep value", }, }, } snapshot := &aether.ActorSnapshot{ ActorID: "actor-complex", Version: 1, State: complexState, Timestamp: time.Now(), } if err := store.SaveSnapshot(snapshot); err != nil { t.Fatalf("SaveSnapshot failed: %v", err) } retrieved, err := store.GetLatestSnapshot("actor-complex") if err != nil { t.Fatalf("GetLatestSnapshot failed: %v", err) } // Verify fields if retrieved.State["string"] != "hello" { t.Errorf("string mismatch: got %v", retrieved.State["string"]) } if retrieved.State["boolean"] != true { t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"]) } } // === Namespace Isolation Tests === func TestJetStreamEventStore_NamespaceIsolation(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() baseName := uniqueStreamName("test-isolation") ns1 := "tenant-a" ns2 := "tenant-b" expectedStream1 := fmt.Sprintf("%s_%s", ns1, baseName) expectedStream2 := fmt.Sprintf("%s_%s", ns2, baseName) defer cleanupStream(nc, expectedStream1) defer cleanupStream(nc, expectedStream2) store1, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns1) if err != nil { t.Fatalf("failed to create store1: %v", err) } store2, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns2) if err != nil { t.Fatalf("failed to create store2: %v", err) } // Save events to namespace 1 event1 := &aether.Event{ ID: "evt-ns1", EventType: "TestEvent", ActorID: "actor-123", Version: 1, Data: map[string]interface{}{"namespace": "tenant-a"}, Timestamp: time.Now(), } if err := store1.SaveEvent(event1); err != nil { t.Fatalf("SaveEvent failed for ns1: %v", err) } // Save events to namespace 2 event2 := &aether.Event{ ID: "evt-ns2", EventType: "TestEvent", ActorID: "actor-123", Version: 1, Data: map[string]interface{}{"namespace": "tenant-b"}, Timestamp: time.Now(), } if err := store2.SaveEvent(event2); err != nil { t.Fatalf("SaveEvent failed for ns2: %v", err) } // Verify isolation: store1 only sees tenant-a events events1, err := store1.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents failed for store1: %v", err) } if len(events1) != 1 { t.Errorf("store1: expected 1 event, got %d", len(events1)) } if events1[0].Data["namespace"] != "tenant-a" { t.Errorf("store1: got event from wrong namespace: %v", events1[0].Data["namespace"]) } // Verify isolation: store2 only sees tenant-b events events2, err := store2.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents failed for store2: %v", err) } if len(events2) != 1 { t.Errorf("store2: expected 1 event, got %d", len(events2)) } if events2[0].Data["namespace"] != "tenant-b" { t.Errorf("store2: got event from wrong namespace: %v", events2[0].Data["namespace"]) } } // === Concurrency Tests === func TestJetStreamEventStore_ConcurrentWrites(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-concurrent") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } numGoroutines := 10 eventsPerGoroutine := 10 var wg sync.WaitGroup wg.Add(numGoroutines) for g := 0; g < numGoroutines; g++ { go func(goroutineID int) { defer wg.Done() actorID := fmt.Sprintf("actor-%d", goroutineID) for i := 1; i <= eventsPerGoroutine; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d-%d", goroutineID, i), EventType: "TestEvent", ActorID: actorID, Version: int64(i), Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { t.Errorf("SaveEvent failed: %v", err) } } }(g) } wg.Wait() // Verify each actor has all events for g := 0; g < numGoroutines; g++ { actorID := fmt.Sprintf("actor-%d", g) events, err := store.GetEvents(actorID, 0) if err != nil { t.Errorf("GetEvents failed for %s: %v", actorID, err) continue } if len(events) != eventsPerGoroutine { t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events)) } } } func TestJetStreamEventStore_ConcurrentVersionConflict(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-concurrent-conflict") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } numGoroutines := 50 var successCount int64 var conflictCount int64 var wg sync.WaitGroup // All goroutines try to save version 1 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func(id int) { defer wg.Done() event := &aether.Event{ ID: fmt.Sprintf("evt-%d", id), EventType: "TestEvent", ActorID: "actor-contested", Version: 1, Data: map[string]interface{}{"goroutine": id}, Timestamp: time.Now(), } err := store.SaveEvent(event) if err == nil { atomic.AddInt64(&successCount, 1) } else if errors.Is(err, aether.ErrVersionConflict) { atomic.AddInt64(&conflictCount, 1) } else { t.Errorf("unexpected error: %v", err) } }(i) } wg.Wait() // Exactly one should succeed if successCount != 1 { t.Errorf("expected exactly 1 success, got %d", successCount) } if conflictCount != int64(numGoroutines-1) { t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount) } // Verify only one event was stored events, err := store.GetEvents("actor-contested", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 1 { t.Errorf("expected 1 event, got %d", len(events)) } } // === Connection Loss/Recovery Tests === func TestJetStreamEventStore_PersistenceAcrossConnections(t *testing.T) { nc1 := getTestNATSConnection(t) streamName := uniqueStreamName("test-persistence") defer func() { nc := getTestNATSConnection(t) cleanupStream(nc, streamName) nc.Close() }() // Create store and save events with first connection store1, err := NewJetStreamEventStore(nc1, streamName) if err != nil { t.Fatalf("failed to create store1: %v", err) } for i := 1; i <= 5; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "TestEvent", ActorID: "actor-123", Version: int64(i), Data: map[string]interface{}{"index": i}, Timestamp: time.Now(), } if err := store1.SaveEvent(event); err != nil { t.Fatalf("SaveEvent failed: %v", err) } } // Close first connection nc1.Close() // Create new connection and store nc2 := getTestNATSConnection(t) defer nc2.Close() store2, err := NewJetStreamEventStore(nc2, streamName) if err != nil { t.Fatalf("failed to create store2: %v", err) } // Verify events are still there events, err := store2.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 5 { t.Errorf("expected 5 events after reconnection, got %d", len(events)) } // Verify we can continue adding events event6 := &aether.Event{ ID: "evt-6", EventType: "TestEvent", ActorID: "actor-123", Version: 6, Data: map[string]interface{}{"index": 6}, Timestamp: time.Now(), } if err := store2.SaveEvent(event6); err != nil { t.Fatalf("SaveEvent failed after reconnection: %v", err) } events, err = store2.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } if len(events) != 6 { t.Errorf("expected 6 events after adding one, got %d", len(events)) } } func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-multi-instance") defer cleanupStream(nc, streamName) // Create multiple store instances on the same stream store1, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store1: %v", err) } store2, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store2: %v", err) } // Save events from store1 event1 := &aether.Event{ ID: "evt-from-store1", EventType: "TestEvent", ActorID: "actor-123", Version: 1, Data: map[string]interface{}{"source": "store1"}, Timestamp: time.Now(), } if err := store1.SaveEvent(event1); err != nil { t.Fatalf("SaveEvent from store1 failed: %v", err) } // Read from store2 events, err := store2.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents from store2 failed: %v", err) } if len(events) != 1 { t.Errorf("store2 should see event from store1, got %d events", len(events)) } // Save from store2 (continuing version sequence) event2 := &aether.Event{ ID: "evt-from-store2", EventType: "TestEvent", ActorID: "actor-123", Version: 2, Data: map[string]interface{}{"source": "store2"}, Timestamp: time.Now(), } if err := store2.SaveEvent(event2); err != nil { t.Fatalf("SaveEvent from store2 failed: %v", err) } // Read from store1 events, err = store1.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents from store1 failed: %v", err) } if len(events) != 2 { t.Errorf("store1 should see both events, got %d events", len(events)) } } // === Interface Compliance Tests === func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) { var _ aether.EventStore = (*JetStreamEventStore)(nil) } func TestJetStreamEventStore_ImplementsEventStoreWithErrors(t *testing.T) { var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) } func TestJetStreamEventStore_ImplementsSnapshotStore(t *testing.T) { nc := getTestNATSConnection(t) defer nc.Close() streamName := uniqueStreamName("test-interface") defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { t.Fatalf("failed to create store: %v", err) } // Verify it has all SnapshotStore methods _ = store.SaveEvent _ = store.GetEvents _ = store.GetLatestVersion _ = store.GetLatestSnapshot _ = store.SaveSnapshot } // === Benchmarks === func BenchmarkJetStreamEventStore_SaveEvent(b *testing.B) { nc, err := nats.Connect(nats.DefaultURL) if err != nil { b.Skipf("NATS not available: %v", err) } defer nc.Close() streamName := fmt.Sprintf("bench-save-%d", time.Now().UnixNano()) defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { b.Fatalf("failed to create store: %v", err) } b.ResetTimer() for i := 0; i < b.N; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "BenchmarkEvent", ActorID: "actor-bench", Version: int64(i + 1), Data: map[string]interface{}{"value": i}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { b.Fatalf("SaveEvent failed: %v", err) } } } func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) { nc, err := nats.Connect(nats.DefaultURL) if err != nil { b.Skipf("NATS not available: %v", err) } defer nc.Close() streamName := fmt.Sprintf("bench-get-%d", time.Now().UnixNano()) defer cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) if err != nil { b.Fatalf("failed to create store: %v", err) } // Pre-populate with events for i := 0; i < 100; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), EventType: "BenchmarkEvent", ActorID: "actor-bench", Version: int64(i + 1), Data: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { b.Fatalf("SaveEvent failed: %v", err) } } b.ResetTimer() for i := 0; i < b.N; i++ { _, err := store.GetEvents("actor-bench", 0) if err != nil { b.Fatalf("GetEvents failed: %v", err) } } }