From 7085c682c324f74627acbc0ecab2ad8874521e5a Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Sat, 10 Jan 2026 23:49:14 +0100 Subject: [PATCH] Add integration tests for JetStreamEventStore This commit adds comprehensive integration tests for JetStreamEventStore that validate production event store behavior against a real NATS server. Tests include: - Stream creation and configuration - SaveEvent persistence to JetStream - GetEvents retrieval in correct order - GetLatestVersion functionality - Snapshot save/load operations - Namespace isolation between stores - Concurrent writes and version conflict handling - Persistence across connection disconnects - Multiple store instance coordination Also updates CI workflow to run integration tests with a NATS server enabled with JetStream. Closes #10 Co-Authored-By: Claude Opus 4.5 --- .gitea/workflows/ci.yaml | 30 + store/jetstream_integration_test.go | 1425 +++++++++++++++++++++++++++ 2 files changed, 1455 insertions(+) create mode 100644 store/jetstream_integration_test.go diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml index 0b0e748..f31ca4d 100644 --- a/.gitea/workflows/ci.yaml +++ b/.gitea/workflows/ci.yaml @@ -17,3 +17,33 @@ jobs: run: go build ./... - name: Test run: go test ./... + + integration: + runs-on: ubuntu-latest + services: + nats: + image: nats:latest + options: --name nats -p 4222:4222 + # Enable JetStream via command line args + # Note: The 'args' field may not be supported in all CI runners + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: '1.23' + - name: Install NATS Server + run: | + # Download and install nats-server + curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-amd64.tar.gz -o nats-server.tar.gz + tar -xzf nats-server.tar.gz + sudo mv nats-server-v2.10.24-linux-amd64/nats-server /usr/local/bin/ + nats-server --version + - name: Start NATS with JetStream + run: | + nats-server -js -p 4222 & + # Wait for NATS to be ready + sleep 3 + # Verify NATS is running + curl -s http://localhost:8222/healthz || echo "Health check not available, but NATS may still be running" + - name: Run Integration Tests + run: go test -tags=integration -v ./... diff --git a/store/jetstream_integration_test.go b/store/jetstream_integration_test.go new file mode 100644 index 0000000..7d0c826 --- /dev/null +++ b/store/jetstream_integration_test.go @@ -0,0 +1,1425 @@ +// +build integration + +package store + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" + "github.com/nats-io/nats.go" +) + +// These integration tests require a running NATS server with JetStream enabled. +// Run with: go test -tags=integration -v ./store/... +// +// To start NATS with JetStream: nats-server -js + +// getTestNATSConnection creates a new NATS connection for testing. +// Returns nil if NATS is not available, allowing tests to skip gracefully. +func getTestNATSConnection(t *testing.T) *nats.Conn { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err) + return nil + } + + // Verify JetStream is available + js, err := nc.JetStream() + if err != nil { + nc.Close() + t.Skipf("JetStream not available: %v (run 'nats-server -js' to enable integration tests)", err) + return nil + } + + // Test JetStream connectivity + _, err = js.AccountInfo() + if err != nil { + nc.Close() + t.Skipf("JetStream not enabled: %v (run 'nats-server -js' to enable integration tests)", err) + return nil + } + + return nc +} + +// uniqueStreamName generates a unique stream name for test isolation +func uniqueStreamName(prefix string) string { + return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano()) +} + +// cleanupStream deletes a stream to clean up after tests +func cleanupStream(nc *nats.Conn, streamName string) { + js, err := nc.JetStream() + if err != nil { + return + } + _ = js.DeleteStream(streamName) +} + +// === Stream Creation and Configuration Tests === + +func TestJetStreamEventStore_StreamCreation(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-stream-creation") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create JetStreamEventStore: %v", err) + } + + if store == nil { + t.Fatal("expected non-nil store") + } + + // Verify stream was created + js, _ := nc.JetStream() + info, err := js.StreamInfo(streamName) + if err != nil { + t.Fatalf("stream was not created: %v", err) + } + + if info.Config.Name != streamName { + t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, streamName) + } + + // Verify stream has correct subjects + expectedSubjects := []string{ + fmt.Sprintf("%s.events.>", streamName), + fmt.Sprintf("%s.snapshots.>", streamName), + } + for _, expected := range expectedSubjects { + found := false + for _, subject := range info.Config.Subjects { + if subject == expected { + found = true + break + } + } + if !found { + t.Errorf("expected subject %q not found in stream config", expected) + } + } +} + +func TestJetStreamEventStore_StreamCreationWithConfig(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-stream-config") + defer cleanupStream(nc, streamName) + + config := JetStreamConfig{ + StreamRetention: 7 * 24 * time.Hour, // 7 days + ReplicaCount: 1, + } + + store, err := NewJetStreamEventStoreWithConfig(nc, streamName, config) + if err != nil { + t.Fatalf("failed to create JetStreamEventStore with config: %v", err) + } + + if store == nil { + t.Fatal("expected non-nil store") + } + + // Verify stream configuration + js, _ := nc.JetStream() + info, err := js.StreamInfo(streamName) + if err != nil { + t.Fatalf("stream was not created: %v", err) + } + + if info.Config.MaxAge != 7*24*time.Hour { + t.Errorf("MaxAge mismatch: got %v, want %v", info.Config.MaxAge, 7*24*time.Hour) + } +} + +func TestJetStreamEventStore_StreamCreationWithNamespace(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + baseName := uniqueStreamName("test-ns") + namespace := "tenant-abc" + expectedStreamName := fmt.Sprintf("%s_%s", namespace, baseName) + defer cleanupStream(nc, expectedStreamName) + + store, err := NewJetStreamEventStoreWithNamespace(nc, baseName, namespace) + if err != nil { + t.Fatalf("failed to create JetStreamEventStore with namespace: %v", err) + } + + if store.GetNamespace() != namespace { + t.Errorf("namespace mismatch: got %q, want %q", store.GetNamespace(), namespace) + } + + if store.GetStreamName() != expectedStreamName { + t.Errorf("stream name mismatch: got %q, want %q", store.GetStreamName(), expectedStreamName) + } + + // Verify namespaced stream was created + js, _ := nc.JetStream() + info, err := js.StreamInfo(expectedStreamName) + if err != nil { + t.Fatalf("namespaced stream was not created: %v", err) + } + + if info.Config.Name != expectedStreamName { + t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, expectedStreamName) + } +} + +func TestJetStreamEventStore_StreamAlreadyExists(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-stream-exists") + defer cleanupStream(nc, streamName) + + // Create first store (creates stream) + store1, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create first store: %v", err) + } + if store1 == nil { + t.Fatal("expected non-nil store") + } + + // Create second store with same stream name (should reuse existing stream) + store2, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create second store with existing stream: %v", err) + } + if store2 == nil { + t.Fatal("expected non-nil store") + } +} + +// === SaveEvent Tests === + +func TestJetStreamEventStore_SaveEvent_PersistsToJetStream(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-save-event") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{ + "total": 100.50, + "currency": "USD", + }, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify event was persisted by retrieving it + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + + retrieved := events[0] + if retrieved.ID != event.ID { + t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID) + } + if retrieved.EventType != event.EventType { + t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType) + } + if retrieved.ActorID != event.ActorID { + t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID) + } + if retrieved.Version != event.Version { + t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version) + } +} + +func TestJetStreamEventStore_SaveEvent_MultipleEvents(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-multi-events") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save multiple events + for i := 1; i <= 10; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "OrderUpdated", + ActorID: "order-456", + Version: int64(i), + Data: map[string]interface{}{"update": i}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed for event %d: %v", i, err) + } + } + + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 10 { + t.Errorf("expected 10 events, got %d", len(events)) + } +} + +func TestJetStreamEventStore_SaveEvent_WithMetadata(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-event-metadata") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + event := &aether.Event{ + ID: "evt-meta", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + event.SetCorrelationID("corr-123") + event.SetCausationID("cause-456") + event.SetUserID("user-789") + event.SetTraceID("trace-abc") + event.SetSpanID("span-def") + + err = store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + + retrieved := events[0] + if retrieved.GetCorrelationID() != "corr-123" { + t.Errorf("correlationId mismatch: got %q", retrieved.GetCorrelationID()) + } + if retrieved.GetCausationID() != "cause-456" { + t.Errorf("causationId mismatch: got %q", retrieved.GetCausationID()) + } + if retrieved.GetUserID() != "user-789" { + t.Errorf("userId mismatch: got %q", retrieved.GetUserID()) + } + if retrieved.GetTraceID() != "trace-abc" { + t.Errorf("traceId mismatch: got %q", retrieved.GetTraceID()) + } + if retrieved.GetSpanID() != "span-def" { + t.Errorf("spanId mismatch: got %q", retrieved.GetSpanID()) + } +} + +func TestJetStreamEventStore_SaveEvent_Deduplication(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-dedup") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + event := &aether.Event{ + ID: "evt-dedup-test", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + // Save the same event twice (same event ID) + err = store.SaveEvent(event) + if err != nil { + t.Fatalf("first SaveEvent failed: %v", err) + } + + // Second save with same event ID and same version should fail with version conflict + // because version 1 already exists + err = store.SaveEvent(event) + if err == nil { + // If no error, that's a problem - we expected version conflict + t.Error("expected error when saving duplicate event, got nil") + } else if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected version conflict error, got: %v", err) + } +} + +func TestJetStreamEventStore_SaveEvent_VersionConflict(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-version-conflict") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save first event with version 5 + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 5, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed for first event: %v", err) + } + + // Attempt to save event with lower version (should fail) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 3, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event2) + if err == nil { + t.Fatal("expected error when saving event with lower version, got nil") + } + + if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected ErrVersionConflict, got %v", err) + } + + var versionErr *aether.VersionConflictError + if !errors.As(err, &versionErr) { + t.Fatalf("expected VersionConflictError, got %T", err) + } + + if versionErr.ActorID != "actor-123" { + t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123") + } + if versionErr.CurrentVersion != 5 { + t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5) + } + if versionErr.AttemptedVersion != 3 { + t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3) + } +} + +func TestJetStreamEventStore_SaveEvent_VersionConflictEqual(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-version-equal") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save first event with version 5 + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 5, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Attempt to save event with equal version (should fail) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 5, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event2) + if err == nil { + t.Fatal("expected error when saving event with equal version, got nil") + } + + if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected ErrVersionConflict, got %v", err) + } +} + +// === GetEvents Tests === + +func TestJetStreamEventStore_GetEvents_RetrievesInOrder(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-get-order") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save events in order + for i := 1; i <= 10; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 10 { + t.Fatalf("expected 10 events, got %d", len(events)) + } + + // Verify order + for i, event := range events { + expectedID := fmt.Sprintf("evt-%d", i+1) + if event.ID != expectedID { + t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID) + } + expectedVersion := int64(i + 1) + if event.Version != expectedVersion { + t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion) + } + } +} + +func TestJetStreamEventStore_GetEvents_FromVersionFilters(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-from-version") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save events with versions 1-10 + for i := 1; i <= 10; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + testCases := []struct { + name string + fromVersion int64 + expectedLen int + minVersion int64 + }{ + {"from version 0", 0, 10, 1}, + {"from version 5", 5, 5, 6}, + {"from version 10", 10, 0, 0}, + {"from version 11", 11, 0, 0}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + events, err := store.GetEvents("actor-123", tc.fromVersion) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != tc.expectedLen { + t.Errorf("expected %d events, got %d", tc.expectedLen, len(events)) + } + + // Verify all returned events have version > fromVersion + for _, event := range events { + if event.Version <= tc.fromVersion { + t.Errorf("event version %d is not greater than fromVersion %d", event.Version, tc.fromVersion) + } + } + }) + } +} + +func TestJetStreamEventStore_GetEvents_NonExistentActor(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-nonexistent") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + events, err := store.GetEvents("non-existent-actor", 0) + if err != nil { + t.Fatalf("GetEvents should not error for non-existent actor: %v", err) + } + + if len(events) != 0 { + t.Errorf("expected 0 events for non-existent actor, got %d", len(events)) + } +} + +func TestJetStreamEventStore_GetEvents_MultipleActors(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-multi-actors") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save events for different actors + actors := []string{"actor-1", "actor-2", "actor-3"} + for _, actorID := range actors { + for i := 1; i <= 3; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s-%d", actorID, i), + EventType: "TestEvent", + ActorID: actorID, + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + } + + // Verify each actor has its own events + for _, actorID := range actors { + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Fatalf("GetEvents failed for %s: %v", actorID, err) + } + if len(events) != 3 { + t.Errorf("expected 3 events for %s, got %d", actorID, len(events)) + } + for _, event := range events { + if event.ActorID != actorID { + t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID) + } + } + } +} + +func TestJetStreamEventStore_GetEventsWithErrors(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-with-errors") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save valid events + for i := 1; i <= 5; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + result, err := store.GetEventsWithErrors("actor-123", 0) + if err != nil { + t.Fatalf("GetEventsWithErrors failed: %v", err) + } + + if len(result.Events) != 5 { + t.Errorf("expected 5 events, got %d", len(result.Events)) + } + + if result.HasErrors() { + t.Errorf("expected no errors, got %d", len(result.Errors)) + } +} + +// === GetLatestVersion Tests === + +func TestJetStreamEventStore_GetLatestVersion(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-latest-version") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save events with versions 1-5 + for i := 1; i <= 5; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + latestVersion, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + + if latestVersion != 5 { + t.Errorf("expected latest version 5, got %d", latestVersion) + } +} + +func TestJetStreamEventStore_GetLatestVersion_NonExistentActor(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-latest-nonexistent") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + latestVersion, err := store.GetLatestVersion("non-existent-actor") + if err != nil { + t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err) + } + + if latestVersion != 0 { + t.Errorf("expected version 0 for non-existent actor, got %d", latestVersion) + } +} + +func TestJetStreamEventStore_GetLatestVersion_UpdatesAfterNewEvent(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-latest-updates") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save first event + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + version1, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + if version1 != 1 { + t.Errorf("expected version 1, got %d", version1) + } + + // Save second event + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 10, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event2); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + version2, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + if version2 != 10 { + t.Errorf("expected version 10, got %d", version2) + } +} + +// === Snapshot Tests === + +func TestJetStreamEventStore_SaveAndGetSnapshot(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-snapshot") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: 10, + State: map[string]interface{}{ + "balance": 100.50, + "status": "active", + }, + Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), + } + + err = store.SaveSnapshot(snapshot) + if err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.ActorID != snapshot.ActorID { + t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID) + } + if retrieved.Version != snapshot.Version { + t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version) + } + + // Check state values (JSON unmarshaling may change types) + balance, ok := retrieved.State["balance"].(float64) + if !ok { + t.Errorf("balance is not float64: %T", retrieved.State["balance"]) + } else if balance != 100.50 { + t.Errorf("balance mismatch: got %v, want %v", balance, 100.50) + } +} + +func TestJetStreamEventStore_GetLatestSnapshot_MultipleSnapshots(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-multi-snapshot") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Save multiple snapshots + for i := 1; i <= 5; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: int64(i * 10), + State: map[string]interface{}{ + "iteration": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err) + } + } + + // Get latest should return the most recently saved + retrieved, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != 50 { + t.Errorf("expected version 50, got %d", retrieved.Version) + } +} + +func TestJetStreamEventStore_GetLatestSnapshot_NonExistent(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-snapshot-nonexistent") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + _, err = store.GetLatestSnapshot("non-existent-actor") + if err == nil { + t.Error("expected error when getting snapshot for non-existent actor") + } +} + +func TestJetStreamEventStore_SnapshotWithComplexState(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-snapshot-complex") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + complexState := map[string]interface{}{ + "string": "hello", + "integer": float64(42), // JSON numbers are float64 + "float": 3.14159, + "boolean": true, + "null": nil, + "array": []interface{}{"a", "b", "c"}, + "nested": map[string]interface{}{ + "level1": map[string]interface{}{ + "level2": "deep value", + }, + }, + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-complex", + Version: 1, + State: complexState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-complex") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + // Verify fields + if retrieved.State["string"] != "hello" { + t.Errorf("string mismatch: got %v", retrieved.State["string"]) + } + if retrieved.State["boolean"] != true { + t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"]) + } +} + +// === Namespace Isolation Tests === + +func TestJetStreamEventStore_NamespaceIsolation(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + baseName := uniqueStreamName("test-isolation") + ns1 := "tenant-a" + ns2 := "tenant-b" + expectedStream1 := fmt.Sprintf("%s_%s", ns1, baseName) + expectedStream2 := fmt.Sprintf("%s_%s", ns2, baseName) + defer cleanupStream(nc, expectedStream1) + defer cleanupStream(nc, expectedStream2) + + store1, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns1) + if err != nil { + t.Fatalf("failed to create store1: %v", err) + } + + store2, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns2) + if err != nil { + t.Fatalf("failed to create store2: %v", err) + } + + // Save events to namespace 1 + event1 := &aether.Event{ + ID: "evt-ns1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{"namespace": "tenant-a"}, + Timestamp: time.Now(), + } + if err := store1.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed for ns1: %v", err) + } + + // Save events to namespace 2 + event2 := &aether.Event{ + ID: "evt-ns2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{"namespace": "tenant-b"}, + Timestamp: time.Now(), + } + if err := store2.SaveEvent(event2); err != nil { + t.Fatalf("SaveEvent failed for ns2: %v", err) + } + + // Verify isolation: store1 only sees tenant-a events + events1, err := store1.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed for store1: %v", err) + } + if len(events1) != 1 { + t.Errorf("store1: expected 1 event, got %d", len(events1)) + } + if events1[0].Data["namespace"] != "tenant-a" { + t.Errorf("store1: got event from wrong namespace: %v", events1[0].Data["namespace"]) + } + + // Verify isolation: store2 only sees tenant-b events + events2, err := store2.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed for store2: %v", err) + } + if len(events2) != 1 { + t.Errorf("store2: expected 1 event, got %d", len(events2)) + } + if events2[0].Data["namespace"] != "tenant-b" { + t.Errorf("store2: got event from wrong namespace: %v", events2[0].Data["namespace"]) + } +} + +// === Concurrency Tests === + +func TestJetStreamEventStore_ConcurrentWrites(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-concurrent") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + numGoroutines := 10 + eventsPerGoroutine := 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + actorID := fmt.Sprintf("actor-%d", goroutineID) + for i := 1; i <= eventsPerGoroutine; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d-%d", goroutineID, i), + EventType: "TestEvent", + ActorID: actorID, + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Errorf("SaveEvent failed: %v", err) + } + } + }(g) + } + + wg.Wait() + + // Verify each actor has all events + for g := 0; g < numGoroutines; g++ { + actorID := fmt.Sprintf("actor-%d", g) + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Errorf("GetEvents failed for %s: %v", actorID, err) + continue + } + if len(events) != eventsPerGoroutine { + t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events)) + } + } +} + +func TestJetStreamEventStore_ConcurrentVersionConflict(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-concurrent-conflict") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + numGoroutines := 50 + var successCount int64 + var conflictCount int64 + var wg sync.WaitGroup + + // All goroutines try to save version 1 + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", id), + EventType: "TestEvent", + ActorID: "actor-contested", + Version: 1, + Data: map[string]interface{}{"goroutine": id}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event) + if err == nil { + atomic.AddInt64(&successCount, 1) + } else if errors.Is(err, aether.ErrVersionConflict) { + atomic.AddInt64(&conflictCount, 1) + } else { + t.Errorf("unexpected error: %v", err) + } + }(i) + } + + wg.Wait() + + // Exactly one should succeed + if successCount != 1 { + t.Errorf("expected exactly 1 success, got %d", successCount) + } + if conflictCount != int64(numGoroutines-1) { + t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount) + } + + // Verify only one event was stored + events, err := store.GetEvents("actor-contested", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 1 { + t.Errorf("expected 1 event, got %d", len(events)) + } +} + +// === Connection Loss/Recovery Tests === + +func TestJetStreamEventStore_PersistenceAcrossConnections(t *testing.T) { + nc1 := getTestNATSConnection(t) + + streamName := uniqueStreamName("test-persistence") + defer func() { + nc := getTestNATSConnection(t) + cleanupStream(nc, streamName) + nc.Close() + }() + + // Create store and save events with first connection + store1, err := NewJetStreamEventStore(nc1, streamName) + if err != nil { + t.Fatalf("failed to create store1: %v", err) + } + + for i := 1; i <= 5; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + if err := store1.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + // Close first connection + nc1.Close() + + // Create new connection and store + nc2 := getTestNATSConnection(t) + defer nc2.Close() + + store2, err := NewJetStreamEventStore(nc2, streamName) + if err != nil { + t.Fatalf("failed to create store2: %v", err) + } + + // Verify events are still there + events, err := store2.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 5 { + t.Errorf("expected 5 events after reconnection, got %d", len(events)) + } + + // Verify we can continue adding events + event6 := &aether.Event{ + ID: "evt-6", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 6, + Data: map[string]interface{}{"index": 6}, + Timestamp: time.Now(), + } + if err := store2.SaveEvent(event6); err != nil { + t.Fatalf("SaveEvent failed after reconnection: %v", err) + } + + events, err = store2.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 6 { + t.Errorf("expected 6 events after adding one, got %d", len(events)) + } +} + +func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-multi-instance") + defer cleanupStream(nc, streamName) + + // Create multiple store instances on the same stream + store1, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store1: %v", err) + } + + store2, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store2: %v", err) + } + + // Save events from store1 + event1 := &aether.Event{ + ID: "evt-from-store1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{"source": "store1"}, + Timestamp: time.Now(), + } + if err := store1.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent from store1 failed: %v", err) + } + + // Read from store2 + events, err := store2.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents from store2 failed: %v", err) + } + + if len(events) != 1 { + t.Errorf("store2 should see event from store1, got %d events", len(events)) + } + + // Save from store2 (continuing version sequence) + event2 := &aether.Event{ + ID: "evt-from-store2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 2, + Data: map[string]interface{}{"source": "store2"}, + Timestamp: time.Now(), + } + if err := store2.SaveEvent(event2); err != nil { + t.Fatalf("SaveEvent from store2 failed: %v", err) + } + + // Read from store1 + events, err = store1.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents from store1 failed: %v", err) + } + + if len(events) != 2 { + t.Errorf("store1 should see both events, got %d events", len(events)) + } +} + +// === Interface Compliance Tests === + +func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) { + var _ aether.EventStore = (*JetStreamEventStore)(nil) +} + +func TestJetStreamEventStore_ImplementsEventStoreWithErrors(t *testing.T) { + var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) +} + +func TestJetStreamEventStore_ImplementsSnapshotStore(t *testing.T) { + nc := getTestNATSConnection(t) + defer nc.Close() + + streamName := uniqueStreamName("test-interface") + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Verify it has all SnapshotStore methods + _ = store.SaveEvent + _ = store.GetEvents + _ = store.GetLatestVersion + _ = store.GetLatestSnapshot + _ = store.SaveSnapshot +} + +// === Benchmarks === + +func BenchmarkJetStreamEventStore_SaveEvent(b *testing.B) { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + b.Skipf("NATS not available: %v", err) + } + defer nc.Close() + + streamName := fmt.Sprintf("bench-save-%d", time.Now().UnixNano()) + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + b.Fatalf("failed to create store: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchmarkEvent", + ActorID: "actor-bench", + Version: int64(i + 1), + Data: map[string]interface{}{"value": i}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + b.Fatalf("SaveEvent failed: %v", err) + } + } +} + +func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + b.Skipf("NATS not available: %v", err) + } + defer nc.Close() + + streamName := fmt.Sprintf("bench-get-%d", time.Now().UnixNano()) + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + b.Fatalf("failed to create store: %v", err) + } + + // Pre-populate with events + for i := 0; i < 100; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchmarkEvent", + ActorID: "actor-bench", + Version: int64(i + 1), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + b.Fatalf("SaveEvent failed: %v", err) + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.GetEvents("actor-bench", 0) + if err != nil { + b.Fatalf("GetEvents failed: %v", err) + } + } +}