diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml index e943bae..0b0e748 100644 --- a/.gitea/workflows/ci.yaml +++ b/.gitea/workflows/ci.yaml @@ -17,37 +17,3 @@ jobs: run: go build ./... - name: Test run: go test ./... - - integration: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - go-version: '1.23' - - name: Install and Start NATS Server - run: | - # Detect architecture and download appropriate binary - ARCH=$(uname -m) - if [ "$ARCH" = "x86_64" ]; then - NATS_ARCH="amd64" - elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then - NATS_ARCH="arm64" - else - echo "Unsupported architecture: $ARCH" - exit 1 - fi - echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH" - - # Download and extract nats-server - curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz - tar -xzf nats-server.tar.gz - - # Start NATS with JetStream - ./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 & - - # Wait for NATS to be ready - sleep 3 - ./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version - - name: Run Integration Tests - run: go test -tags=integration -v ./... diff --git a/nats_eventbus_integration_test.go b/nats_eventbus_integration_test.go deleted file mode 100644 index 911f257..0000000 --- a/nats_eventbus_integration_test.go +++ /dev/null @@ -1,1279 +0,0 @@ -//go:build integration -// +build integration - -package aether - -import ( - "fmt" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/nats-io/nats.go" -) - -// These integration tests require a running NATS server with JetStream enabled. -// Run with: go test -tags=integration -v ./... -// -// To start NATS with JetStream: nats-server -js - -// getNATSConnection creates a new NATS connection for testing. -// Returns nil if NATS is not available, allowing tests to skip gracefully. -func getNATSConnection(t *testing.T) *nats.Conn { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err) - return nil - } - return nc -} - -// TestNATSEventBus_CrossNodeEventDelivery tests that events are delivered across -// simulated nodes (multiple NATSEventBus instances sharing the same NATS connection). -func TestNATSEventBus_CrossNodeEventDelivery(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - // Create two "nodes" (separate NATSEventBus instances) - node1, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create node1: %v", err) - } - defer node1.Stop() - - node2, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create node2: %v", err) - } - defer node2.Stop() - - namespace := fmt.Sprintf("cross-node-test-%d", time.Now().UnixNano()) - - // Subscribe on node2 before publishing from node1 - ch2 := node2.Subscribe(namespace) - defer node2.Unsubscribe(namespace, ch2) - - // Give NATS time to set up subscription - time.Sleep(100 * time.Millisecond) - - // Publish event from node1 - event := &Event{ - ID: "evt-cross-node", - EventType: "CrossNodeTest", - ActorID: "actor-123", - Version: 1, - Data: map[string]interface{}{"source": "node1"}, - Timestamp: time.Now(), - } - node1.Publish(namespace, event) - - // Wait for event on node2 - select { - case received := <-ch2: - if received.ID != event.ID { - t.Errorf("event ID mismatch: got %q, want %q", received.ID, event.ID) - } - if received.EventType != event.EventType { - t.Errorf("event type mismatch: got %q, want %q", received.EventType, event.EventType) - } - if received.ActorID != event.ActorID { - t.Errorf("actor ID mismatch: got %q, want %q", received.ActorID, event.ActorID) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for cross-node event delivery") - } -} - -// TestNATSEventBus_NamespaceIsolation tests that events in one namespace -// are not received by subscribers in other namespaces. -func TestNATSEventBus_NamespaceIsolation(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - uniqueID := time.Now().UnixNano() - namespace1 := fmt.Sprintf("namespace-a-%d", uniqueID) - namespace2 := fmt.Sprintf("namespace-b-%d", uniqueID) - - // Subscribe to both namespaces - ch1 := bus.Subscribe(namespace1) - ch2 := bus.Subscribe(namespace2) - defer bus.Unsubscribe(namespace1, ch1) - defer bus.Unsubscribe(namespace2, ch2) - - // Give NATS time to set up subscriptions - time.Sleep(100 * time.Millisecond) - - // Publish event to namespace1 - event1 := &Event{ - ID: "evt-ns1", - EventType: "Namespace1Event", - ActorID: "actor-ns1", - Version: 1, - Data: map[string]interface{}{"namespace": "1"}, - Timestamp: time.Now(), - } - bus.Publish(namespace1, event1) - - // Publish event to namespace2 - event2 := &Event{ - ID: "evt-ns2", - EventType: "Namespace2Event", - ActorID: "actor-ns2", - Version: 1, - Data: map[string]interface{}{"namespace": "2"}, - Timestamp: time.Now(), - } - bus.Publish(namespace2, event2) - - // Collect events with timeout - received1 := make([]*Event, 0) - received2 := make([]*Event, 0) - - timeout := time.After(2 * time.Second) - for { - select { - case e := <-ch1: - received1 = append(received1, e) - case e := <-ch2: - received2 = append(received2, e) - case <-timeout: - goto done - } - } -done: - - // Verify namespace1 only received namespace1 event - if len(received1) != 1 { - t.Errorf("namespace1 expected 1 event, got %d", len(received1)) - } else if received1[0].ID != "evt-ns1" { - t.Errorf("namespace1 received wrong event: %s", received1[0].ID) - } - - // Verify namespace2 only received namespace2 event - if len(received2) != 1 { - t.Errorf("namespace2 expected 1 event, got %d", len(received2)) - } else if received2[0].ID != "evt-ns2" { - t.Errorf("namespace2 received wrong event: %s", received2[0].ID) - } -} - -// TestNATSEventBus_MultipleConnectionsNamespaceIsolation tests namespace isolation -// when using separate NATS connections (more realistic distributed scenario). -func TestNATSEventBus_MultipleConnectionsNamespaceIsolation(t *testing.T) { - // Create separate NATS connections - nc1, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Skipf("NATS not available: %v", err) - } - defer nc1.Close() - - nc2, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatalf("failed to create second connection: %v", err) - } - defer nc2.Close() - - // Create buses on different connections - bus1, err := NewNATSEventBus(nc1) - if err != nil { - t.Fatalf("failed to create bus1: %v", err) - } - defer bus1.Stop() - - bus2, err := NewNATSEventBus(nc2) - if err != nil { - t.Fatalf("failed to create bus2: %v", err) - } - defer bus2.Stop() - - uniqueID := time.Now().UnixNano() - namespaceA := fmt.Sprintf("tenant-a-%d", uniqueID) - namespaceB := fmt.Sprintf("tenant-b-%d", uniqueID) - - // bus1 subscribes to namespaceA - chA := bus1.Subscribe(namespaceA) - defer bus1.Unsubscribe(namespaceA, chA) - - // bus2 subscribes to namespaceB - chB := bus2.Subscribe(namespaceB) - defer bus2.Unsubscribe(namespaceB, chB) - - time.Sleep(100 * time.Millisecond) - - // Publish events - eventA := &Event{ - ID: "evt-tenant-a", - EventType: "TenantAEvent", - ActorID: "actor-a", - Version: 1, - Data: map[string]interface{}{"tenant": "a"}, - Timestamp: time.Now(), - } - bus1.Publish(namespaceA, eventA) - - eventB := &Event{ - ID: "evt-tenant-b", - EventType: "TenantBEvent", - ActorID: "actor-b", - Version: 1, - Data: map[string]interface{}{"tenant": "b"}, - Timestamp: time.Now(), - } - bus2.Publish(namespaceB, eventB) - - // Verify isolation - receivedA := false - receivedB := false - crossTalk := false - - timeout := time.After(2 * time.Second) -loop: - for { - select { - case e := <-chA: - if e.ID == "evt-tenant-a" { - receivedA = true - } else if e.ID == "evt-tenant-b" { - crossTalk = true - } - case e := <-chB: - if e.ID == "evt-tenant-b" { - receivedB = true - } else if e.ID == "evt-tenant-a" { - crossTalk = true - } - case <-timeout: - break loop - } - } - - if !receivedA { - t.Error("namespaceA did not receive its event") - } - if !receivedB { - t.Error("namespaceB did not receive its event") - } - if crossTalk { - t.Error("cross-namespace leakage detected!") - } -} - -// TestNATSEventBus_HighThroughput tests handling of many events in rapid succession. -func TestNATSEventBus_HighThroughput(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("high-throughput-%d", time.Now().UnixNano()) - numEvents := 1000 - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - // Synchronize subscriber readiness using a barrier event. - // This ensures the NATS subscription is fully established before bulk publishing. - readyEvent := &Event{ - ID: "ready-signal", - EventType: "ReadySignal", - ActorID: "actor-throughput", - Version: 0, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace, readyEvent) - - // Wait for the ready signal to be received - select { - case <-ch: - // Subscriber is ready, proceed with bulk publishing - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for subscriber readiness signal") - } - - // Start receiving events BEFORE publishing to avoid buffer overflow. - // The EventBus has a 100-event buffer and uses non-blocking sends, - // so we must consume events concurrently to avoid drops. - var receivedCount int32 - receiveStart := time.Now() - done := make(chan struct{}) - - go func() { - defer close(done) - timeout := time.After(60 * time.Second) - for atomic.LoadInt32(&receivedCount) < int32(numEvents) { - select { - case <-ch: - atomic.AddInt32(&receivedCount, 1) - case <-timeout: - return - } - } - }() - - // Publish many events rapidly - start := time.Now() - for i := 0; i < numEvents; i++ { - event := &Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "HighThroughputEvent", - ActorID: "actor-throughput", - Version: int64(i + 1), - Data: map[string]interface{}{"index": i}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event) - } - publishDuration := time.Since(start) - - // Wait for receiver to finish - <-done - receiveDuration := time.Since(receiveStart) - finalCount := atomic.LoadInt32(&receivedCount) - - t.Logf("Published %d events in %v (%.0f events/sec)", numEvents, publishDuration, float64(numEvents)/publishDuration.Seconds()) - t.Logf("Received %d events in %v (%.0f events/sec)", finalCount, receiveDuration, float64(finalCount)/receiveDuration.Seconds()) - - if finalCount != int32(numEvents) { - t.Errorf("expected %d events, received %d", numEvents, finalCount) - } -} - -// TestNATSEventBus_EventOrdering tests that events are received in the order they were published. -func TestNATSEventBus_EventOrdering(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("ordering-%d", time.Now().UnixNano()) - numEvents := 100 - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - // Synchronize subscriber readiness with barrier event - readyEvent := &Event{ - ID: "ordering-ready", - EventType: "ReadySignal", - ActorID: "actor-ordering", - Version: 0, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace, readyEvent) - - select { - case <-ch: - // Subscriber is ready - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for ordering readiness signal") - } - - // Publish events with sequence numbers - for i := 0; i < numEvents; i++ { - event := &Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "OrderingTest", - ActorID: "actor-ordering", - Version: int64(i + 1), - Data: map[string]interface{}{"sequence": i}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event) - } - - // Receive and verify ordering - received := make([]*Event, 0, numEvents) - timeout := time.After(15 * time.Second) - -loop: - for len(received) < numEvents { - select { - case e := <-ch: - received = append(received, e) - case <-timeout: - break loop - } - } - - if len(received) != numEvents { - t.Fatalf("expected %d events, got %d", numEvents, len(received)) - } - - // Verify ordering - for i, e := range received { - expectedSeq := i - // Handle both int (local delivery) and float64 (JSON/NATS delivery) types - var actualSeq int - switch v := e.Data["sequence"].(type) { - case int: - actualSeq = v - case float64: - actualSeq = int(v) - default: - t.Errorf("event %d: sequence not found or wrong type: %T", i, e.Data["sequence"]) - continue - } - if actualSeq != expectedSeq { - t.Errorf("event %d: sequence mismatch, got %d, want %d", i, actualSeq, expectedSeq) - } - } -} - -// TestNATSEventBus_NoCrossNamespaceLeakage performs explicit cross-namespace leakage testing. -func TestNATSEventBus_NoCrossNamespaceLeakage(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - uniqueID := time.Now().UnixNano() - namespaces := []string{ - fmt.Sprintf("ns-alpha-%d", uniqueID), - fmt.Sprintf("ns-beta-%d", uniqueID), - fmt.Sprintf("ns-gamma-%d", uniqueID), - } - - // Subscribe to all namespaces - channels := make(map[string]<-chan *Event) - for _, ns := range namespaces { - ch := bus.Subscribe(ns) - channels[ns] = ch - defer bus.Unsubscribe(ns, ch) - } - - time.Sleep(150 * time.Millisecond) - - // Publish unique events to each namespace - for i, ns := range namespaces { - event := &Event{ - ID: fmt.Sprintf("evt-%s-%d", ns, i), - EventType: "LeakageTest", - ActorID: fmt.Sprintf("actor-%s", ns), - Version: 1, - Data: map[string]interface{}{"namespace": ns}, - Timestamp: time.Now(), - } - bus.Publish(ns, event) - } - - // Track received events per namespace - receivedEvents := make(map[string][]*Event) - for _, ns := range namespaces { - receivedEvents[ns] = make([]*Event, 0) - } - - // Collect events with timeout - timeout := time.After(3 * time.Second) -collecting: - for { - select { - case e := <-channels[namespaces[0]]: - receivedEvents[namespaces[0]] = append(receivedEvents[namespaces[0]], e) - case e := <-channels[namespaces[1]]: - receivedEvents[namespaces[1]] = append(receivedEvents[namespaces[1]], e) - case e := <-channels[namespaces[2]]: - receivedEvents[namespaces[2]] = append(receivedEvents[namespaces[2]], e) - case <-timeout: - break collecting - } - } - - // Verify each namespace only received its own events - for _, ns := range namespaces { - events := receivedEvents[ns] - if len(events) != 1 { - t.Errorf("namespace %s: expected 1 event, got %d", ns, len(events)) - continue - } - - expectedNS, ok := events[0].Data["namespace"].(string) - if !ok { - t.Errorf("namespace %s: could not read event namespace", ns) - continue - } - if expectedNS != ns { - t.Errorf("namespace %s: received event from namespace %s (LEAKAGE!)", ns, expectedNS) - } - } -} - -// TestNATSEventBus_ConcurrentPublishSubscribe tests concurrent publish and subscribe operations. -func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("concurrent-%d", time.Now().UnixNano()) - numPublishers := 10 - numEventsPerPublisher := 50 - totalExpected := numPublishers * numEventsPerPublisher - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - // Synchronize publisher readiness - readyEvent := &Event{ - ID: "concurrent-ready", - EventType: "ReadySignal", - ActorID: "coordinator", - Version: 0, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace, readyEvent) - - select { - case <-ch: - // Subscriber is ready - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for concurrent readiness signal") - } - - // Start receiving events BEFORE publishing to avoid buffer overflow. - // The EventBus has a 100-event buffer and uses non-blocking sends. - var receivedCount int32 - done := make(chan struct{}) - - go func() { - defer close(done) - timeout := time.After(30 * time.Second) - for atomic.LoadInt32(&receivedCount) < int32(totalExpected) { - select { - case <-ch: - atomic.AddInt32(&receivedCount, 1) - case <-timeout: - return - } - } - }() - - var wg sync.WaitGroup - wg.Add(numPublishers) - - // Start concurrent publishers - for p := 0; p < numPublishers; p++ { - go func(publisherID int) { - defer wg.Done() - for i := 0; i < numEventsPerPublisher; i++ { - event := &Event{ - ID: fmt.Sprintf("evt-%d-%d", publisherID, i), - EventType: "ConcurrentEvent", - ActorID: fmt.Sprintf("actor-%d", publisherID), - Version: int64(i + 1), - Data: map[string]interface{}{"publisher": publisherID, "index": i}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event) - } - }(p) - } - - // Wait for all publishers to finish - wg.Wait() - - // Wait for receiver to finish - <-done - finalCount := atomic.LoadInt32(&receivedCount) - - if finalCount != int32(totalExpected) { - t.Errorf("expected %d events, received %d", totalExpected, finalCount) - } -} - -// TestNATSEventBus_MultipleSubscribersSameNamespace tests that multiple subscribers -// to the same namespace all receive the same events. -func TestNATSEventBus_MultipleSubscribersSameNamespace(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("multi-sub-%d", time.Now().UnixNano()) - numSubscribers := 5 - numEvents := 20 - - // Create multiple subscribers - channels := make([]<-chan *Event, numSubscribers) - for i := 0; i < numSubscribers; i++ { - ch := bus.Subscribe(namespace) - channels[i] = ch - defer bus.Unsubscribe(namespace, ch) - } - - time.Sleep(100 * time.Millisecond) - - // Publish events - for i := 0; i < numEvents; i++ { - event := &Event{ - ID: fmt.Sprintf("evt-multi-%d", i), - EventType: "MultiSubscriberEvent", - ActorID: "actor-multi", - Version: int64(i + 1), - Data: map[string]interface{}{"index": i}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event) - } - - // Count events received by each subscriber - counts := make([]int32, numSubscribers) - var wg sync.WaitGroup - wg.Add(numSubscribers) - - for i := 0; i < numSubscribers; i++ { - go func(idx int) { - defer wg.Done() - timeout := time.After(5 * time.Second) - for { - select { - case <-channels[idx]: - atomic.AddInt32(&counts[idx], 1) - if atomic.LoadInt32(&counts[idx]) >= int32(numEvents) { - return - } - case <-timeout: - return - } - } - }(i) - } - - wg.Wait() - - // Verify all subscribers received all events - for i, count := range counts { - if count != int32(numEvents) { - t.Errorf("subscriber %d: expected %d events, got %d", i, numEvents, count) - } - } -} - -// TestNATSEventBus_EventMetadataPreserved tests that event metadata is preserved -// across NATS serialization/deserialization. -func TestNATSEventBus_EventMetadataPreserved(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("metadata-%d", time.Now().UnixNano()) - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - time.Sleep(100 * time.Millisecond) - - // Create event with metadata - event := &Event{ - ID: "evt-with-metadata", - EventType: "MetadataTest", - ActorID: "actor-metadata", - Version: 1, - Data: map[string]interface{}{"test": "data"}, - Timestamp: time.Now(), - } - event.SetCorrelationID("corr-123") - event.SetCausationID("cause-456") - event.SetUserID("user-789") - event.SetTraceID("trace-abc") - event.SetSpanID("span-def") - event.SetMetadata("customKey", "customValue") - - bus.Publish(namespace, event) - - // Receive and verify metadata - select { - case received := <-ch: - if received.GetCorrelationID() != "corr-123" { - t.Errorf("correlationId mismatch: got %q", received.GetCorrelationID()) - } - if received.GetCausationID() != "cause-456" { - t.Errorf("causationId mismatch: got %q", received.GetCausationID()) - } - if received.GetUserID() != "user-789" { - t.Errorf("userId mismatch: got %q", received.GetUserID()) - } - if received.GetTraceID() != "trace-abc" { - t.Errorf("traceId mismatch: got %q", received.GetTraceID()) - } - if received.GetSpanID() != "span-def" { - t.Errorf("spanId mismatch: got %q", received.GetSpanID()) - } - if received.GetMetadata("customKey") != "customValue" { - t.Errorf("customKey mismatch: got %q", received.GetMetadata("customKey")) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for event") - } -} - -// TestNATSEventBus_LargeEventPayload tests handling of events with large data payloads. -func TestNATSEventBus_LargeEventPayload(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("large-payload-%d", time.Now().UnixNano()) - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - time.Sleep(100 * time.Millisecond) - - // Create a large payload (100KB) - largeString := make([]byte, 100*1024) - for i := range largeString { - largeString[i] = byte('a' + (i % 26)) - } - - event := &Event{ - ID: "evt-large", - EventType: "LargePayloadTest", - ActorID: "actor-large", - Version: 1, - Data: map[string]interface{}{"largeField": string(largeString)}, - Timestamp: time.Now(), - } - - bus.Publish(namespace, event) - - select { - case received := <-ch: - receivedPayload, ok := received.Data["largeField"].(string) - if !ok { - t.Fatal("largeField not found or wrong type") - } - if len(receivedPayload) != len(largeString) { - t.Errorf("payload size mismatch: got %d, want %d", len(receivedPayload), len(largeString)) - } - if receivedPayload != string(largeString) { - t.Error("payload content mismatch") - } - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for large event") - } -} - -// TestNATSEventBus_SubscribeUnsubscribe tests subscribe/unsubscribe lifecycle. -func TestNATSEventBus_SubscribeUnsubscribe(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("sub-unsub-%d", time.Now().UnixNano()) - - // Subscribe - ch := bus.Subscribe(namespace) - time.Sleep(100 * time.Millisecond) - - // Publish event - should be received - event1 := &Event{ - ID: "evt-before-unsub", - EventType: "SubUnsubTest", - ActorID: "actor-test", - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event1) - - select { - case <-ch: - // Good, received event - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for event before unsubscribe") - } - - // Unsubscribe - bus.Unsubscribe(namespace, ch) - - // Re-subscribe with new channel - ch2 := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch2) - time.Sleep(100 * time.Millisecond) - - // Publish another event - should be received on new channel - event2 := &Event{ - ID: "evt-after-resub", - EventType: "SubUnsubTest", - ActorID: "actor-test", - Version: 2, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event2) - - select { - case e := <-ch2: - if e.ID != "evt-after-resub" { - t.Errorf("wrong event received: %s", e.ID) - } - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for event after resubscribe") - } -} - -// TestNATSEventBus_MultipleNodesMultipleNamespaces tests complex scenario with -// multiple nodes and multiple namespaces. -func TestNATSEventBus_MultipleNodesMultipleNamespaces(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - // Create multiple nodes - numNodes := 3 - nodes := make([]*NATSEventBus, numNodes) - for i := 0; i < numNodes; i++ { - node, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create node %d: %v", i, err) - } - nodes[i] = node - defer node.Stop() - } - - uniqueID := time.Now().UnixNano() - namespaces := []string{ - fmt.Sprintf("ns-multi-1-%d", uniqueID), - fmt.Sprintf("ns-multi-2-%d", uniqueID), - } - - // Each node subscribes to both namespaces - type subscription struct { - nodeIdx int - ns string - ch <-chan *Event - } - subs := make([]subscription, 0) - for i, node := range nodes { - for _, ns := range namespaces { - ch := node.Subscribe(ns) - subs = append(subs, subscription{nodeIdx: i, ns: ns, ch: ch}) - defer node.Unsubscribe(ns, ch) - } - } - - time.Sleep(150 * time.Millisecond) - - // Node 0 publishes to namespace 1 - event1 := &Event{ - ID: "evt-n0-ns1", - EventType: "MultiNodeMultiNS", - ActorID: "actor-test", - Version: 1, - Data: map[string]interface{}{"source": "node0", "namespace": namespaces[0]}, - Timestamp: time.Now(), - } - nodes[0].Publish(namespaces[0], event1) - - // Node 1 publishes to namespace 2 - event2 := &Event{ - ID: "evt-n1-ns2", - EventType: "MultiNodeMultiNS", - ActorID: "actor-test", - Version: 1, - Data: map[string]interface{}{"source": "node1", "namespace": namespaces[1]}, - Timestamp: time.Now(), - } - nodes[1].Publish(namespaces[1], event2) - - // Collect events - receivedEvents := make(map[string]map[int][]*Event) // namespace -> nodeIdx -> events - for _, ns := range namespaces { - receivedEvents[ns] = make(map[int][]*Event) - for i := 0; i < numNodes; i++ { - receivedEvents[ns][i] = make([]*Event, 0) - } - } - - timeout := time.After(5 * time.Second) -collecting: - for { - select { - case e := <-subs[0].ch: // node 0, ns 0 - receivedEvents[namespaces[0]][0] = append(receivedEvents[namespaces[0]][0], e) - case e := <-subs[1].ch: // node 0, ns 1 - receivedEvents[namespaces[1]][0] = append(receivedEvents[namespaces[1]][0], e) - case e := <-subs[2].ch: // node 1, ns 0 - receivedEvents[namespaces[0]][1] = append(receivedEvents[namespaces[0]][1], e) - case e := <-subs[3].ch: // node 1, ns 1 - receivedEvents[namespaces[1]][1] = append(receivedEvents[namespaces[1]][1], e) - case e := <-subs[4].ch: // node 2, ns 0 - receivedEvents[namespaces[0]][2] = append(receivedEvents[namespaces[0]][2], e) - case e := <-subs[5].ch: // node 2, ns 1 - receivedEvents[namespaces[1]][2] = append(receivedEvents[namespaces[1]][2], e) - case <-timeout: - break collecting - } - } - - // Verify: namespace 0 should have event from node 0 on all nodes (except node 0 which receives locally) - // Node 0 receives locally, nodes 1 and 2 receive via NATS - for nodeIdx := 0; nodeIdx < numNodes; nodeIdx++ { - events := receivedEvents[namespaces[0]][nodeIdx] - if len(events) != 1 { - t.Errorf("node %d, namespace %s: expected 1 event, got %d", nodeIdx, namespaces[0], len(events)) - } - } - - // Verify: namespace 1 should have event from node 1 on all nodes - for nodeIdx := 0; nodeIdx < numNodes; nodeIdx++ { - events := receivedEvents[namespaces[1]][nodeIdx] - if len(events) != 1 { - t.Errorf("node %d, namespace %s: expected 1 event, got %d", nodeIdx, namespaces[1], len(events)) - } - } -} - -// TestNATSEventBus_StopCleansUp tests that Stop properly cleans up resources. -func TestNATSEventBus_StopCleansUp(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - - namespace := fmt.Sprintf("stop-cleanup-%d", time.Now().UnixNano()) - - // Subscribe - ch := bus.Subscribe(namespace) - time.Sleep(100 * time.Millisecond) - - // Stop the bus - bus.Stop() - - // Verify channel is closed - select { - case _, ok := <-ch: - if ok { - t.Error("expected channel to be closed after Stop") - } - case <-time.After(1 * time.Second): - t.Error("channel was not closed after Stop") - } -} - -// BenchmarkNATSEventBus_Publish benchmarks event publishing throughput. -func BenchmarkNATSEventBus_Publish(b *testing.B) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - b.Skipf("NATS not available: %v", err) - } - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - b.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("bench-%d", time.Now().UnixNano()) - event := &Event{ - ID: "evt-bench", - EventType: "BenchmarkEvent", - ActorID: "actor-bench", - Version: 1, - Data: map[string]interface{}{"key": "value"}, - Timestamp: time.Now(), - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - bus.Publish(namespace, event) - } -} - -// BenchmarkNATSEventBus_PublishReceive benchmarks end-to-end event delivery. -func BenchmarkNATSEventBus_PublishReceive(b *testing.B) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - b.Skipf("NATS not available: %v", err) - } - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - b.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("bench-e2e-%d", time.Now().UnixNano()) - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - time.Sleep(100 * time.Millisecond) - - event := &Event{ - ID: "evt-bench", - EventType: "BenchmarkEvent", - ActorID: "actor-bench", - Version: 1, - Data: map[string]interface{}{"key": "value"}, - Timestamp: time.Now(), - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - bus.Publish(namespace, event) - <-ch - } -} - -// TestNATSEventBus_ReconnectionBehavior tests that subscriptions continue to work -// after a NATS reconnection scenario. This test simulates the reconnection by -// creating a new connection and verifying the bus handles it gracefully. -// Note: True reconnection testing requires a NATS server restart which is complex -// in automated tests. This test verifies the basic resilience patterns. -func TestNATSEventBus_ReconnectionBehavior(t *testing.T) { - // Create initial connection - nc1, err := nats.Connect(nats.DefaultURL, - nats.ReconnectWait(100*time.Millisecond), - nats.MaxReconnects(5), - ) - if err != nil { - t.Skipf("NATS not available: %v", err) - } - defer nc1.Close() - - bus, err := NewNATSEventBus(nc1) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("reconnect-test-%d", time.Now().UnixNano()) - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - time.Sleep(100 * time.Millisecond) - - // Publish event before any issues - event1 := &Event{ - ID: "evt-before", - EventType: "ReconnectTest", - ActorID: "actor-test", - Version: 1, - Data: map[string]interface{}{"phase": "before"}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event1) - - // Verify event received - select { - case e := <-ch: - if e.ID != "evt-before" { - t.Errorf("unexpected event ID: %s", e.ID) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for event before reconnect test") - } - - // Simulate some delay (in real reconnect scenario, there would be a disconnect/reconnect) - time.Sleep(200 * time.Millisecond) - - // Publish event after delay - should still work - event2 := &Event{ - ID: "evt-after", - EventType: "ReconnectTest", - ActorID: "actor-test", - Version: 2, - Data: map[string]interface{}{"phase": "after"}, - Timestamp: time.Now(), - } - bus.Publish(namespace, event2) - - // Verify event received - select { - case e := <-ch: - if e.ID != "evt-after" { - t.Errorf("unexpected event ID: %s", e.ID) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for event after reconnect test") - } -} - -// TestNATSEventBus_ConnectionRecoveryWithNewSubscription tests that new subscriptions -// can be created and used after the bus has been running for a while. -func TestNATSEventBus_ConnectionRecoveryWithNewSubscription(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - uniqueID := time.Now().UnixNano() - namespace1 := fmt.Sprintf("recovery-ns1-%d", uniqueID) - namespace2 := fmt.Sprintf("recovery-ns2-%d", uniqueID) - - // First subscription - ch1 := bus.Subscribe(namespace1) - defer bus.Unsubscribe(namespace1, ch1) - - time.Sleep(100 * time.Millisecond) - - // Publish and receive on first namespace - event1 := &Event{ - ID: "evt-ns1", - EventType: "RecoveryTest", - ActorID: "actor-test", - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace1, event1) - - select { - case <-ch1: - // Good - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for first event") - } - - // Wait a bit, then create new subscription (simulating late join) - time.Sleep(500 * time.Millisecond) - - ch2 := bus.Subscribe(namespace2) - defer bus.Unsubscribe(namespace2, ch2) - - time.Sleep(100 * time.Millisecond) - - // Publish and receive on second namespace - event2 := &Event{ - ID: "evt-ns2", - EventType: "RecoveryTest", - ActorID: "actor-test", - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace2, event2) - - select { - case e := <-ch2: - if e.ID != "evt-ns2" { - t.Errorf("wrong event: %s", e.ID) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for second namespace event") - } - - // Verify first namespace still works - event3 := &Event{ - ID: "evt-ns1-again", - EventType: "RecoveryTest", - ActorID: "actor-test", - Version: 2, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - bus.Publish(namespace1, event3) - - select { - case e := <-ch1: - if e.ID != "evt-ns1-again" { - t.Errorf("wrong event: %s", e.ID) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for third event") - } -} - -// TestNATSEventBus_GracefulDegradation tests that the bus handles publish errors gracefully. -func TestNATSEventBus_GracefulDegradation(t *testing.T) { - nc := getNATSConnection(t) - defer nc.Close() - - bus, err := NewNATSEventBus(nc) - if err != nil { - t.Fatalf("failed to create bus: %v", err) - } - defer bus.Stop() - - namespace := fmt.Sprintf("graceful-%d", time.Now().UnixNano()) - - ch := bus.Subscribe(namespace) - defer bus.Unsubscribe(namespace, ch) - - time.Sleep(100 * time.Millisecond) - - // Publish many events rapidly - should handle without panic - for i := 0; i < 100; i++ { - event := &Event{ - ID: fmt.Sprintf("evt-graceful-%d", i), - EventType: "GracefulTest", - ActorID: "actor-test", - Version: int64(i + 1), - Data: map[string]interface{}{"index": i}, - Timestamp: time.Now(), - } - // Should not panic even under load - bus.Publish(namespace, event) - } - - // Drain events - received := 0 - timeout := time.After(10 * time.Second) -draining: - for received < 100 { - select { - case <-ch: - received++ - case <-timeout: - break draining - } - } - - if received < 90 { // Allow some tolerance for timing - t.Errorf("expected at least 90 events, got %d", received) - } -} diff --git a/store/jetstream_integration_test.go b/store/jetstream_integration_test.go deleted file mode 100644 index cd47769..0000000 --- a/store/jetstream_integration_test.go +++ /dev/null @@ -1,1538 +0,0 @@ -//go:build integration - -package store - -import ( - "errors" - "fmt" - "sync" - "sync/atomic" - "testing" - "time" - - "git.flowmade.one/flowmade-one/aether" - "github.com/nats-io/nats.go" -) - -// These integration tests require a running NATS server with JetStream enabled. -// Run with: go test -tags=integration -v ./store/... -// -// To start NATS with JetStream: nats-server -js - -// getTestNATSConnection creates a new NATS connection for testing. -// Returns nil if NATS is not available, allowing tests to skip gracefully. -func getTestNATSConnection(t *testing.T) *nats.Conn { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err) - return nil - } - - // Verify JetStream is available - js, err := nc.JetStream() - if err != nil { - nc.Close() - t.Skipf("JetStream not available: %v (run 'nats-server -js' to enable integration tests)", err) - return nil - } - - // Test JetStream connectivity - _, err = js.AccountInfo() - if err != nil { - nc.Close() - t.Skipf("JetStream not enabled: %v (run 'nats-server -js' to enable integration tests)", err) - return nil - } - - return nc -} - -// uniqueStreamName generates a unique stream name for test isolation -func uniqueStreamName(prefix string) string { - return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano()) -} - -// cleanupStream deletes a stream to clean up after tests -func cleanupStream(nc *nats.Conn, streamName string) { - js, err := nc.JetStream() - if err != nil { - return - } - _ = js.DeleteStream(streamName) -} - -// === Stream Creation and Configuration Tests === - -func TestJetStreamEventStore_StreamCreation(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-stream-creation") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create JetStreamEventStore: %v", err) - } - - if store == nil { - t.Fatal("expected non-nil store") - } - - // Verify stream was created - js, _ := nc.JetStream() - info, err := js.StreamInfo(streamName) - if err != nil { - t.Fatalf("stream was not created: %v", err) - } - - if info.Config.Name != streamName { - t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, streamName) - } - - // Verify stream has correct subjects - expectedSubjects := []string{ - fmt.Sprintf("%s.events.>", streamName), - fmt.Sprintf("%s.snapshots.>", streamName), - } - for _, expected := range expectedSubjects { - found := false - for _, subject := range info.Config.Subjects { - if subject == expected { - found = true - break - } - } - if !found { - t.Errorf("expected subject %q not found in stream config", expected) - } - } -} - -func TestJetStreamEventStore_StreamCreationWithConfig(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-stream-config") - defer cleanupStream(nc, streamName) - - config := JetStreamConfig{ - StreamRetention: 7 * 24 * time.Hour, // 7 days - ReplicaCount: 1, - } - - store, err := NewJetStreamEventStoreWithConfig(nc, streamName, config) - if err != nil { - t.Fatalf("failed to create JetStreamEventStore with config: %v", err) - } - - if store == nil { - t.Fatal("expected non-nil store") - } - - // Verify stream configuration - js, _ := nc.JetStream() - info, err := js.StreamInfo(streamName) - if err != nil { - t.Fatalf("stream was not created: %v", err) - } - - if info.Config.MaxAge != 7*24*time.Hour { - t.Errorf("MaxAge mismatch: got %v, want %v", info.Config.MaxAge, 7*24*time.Hour) - } -} - -func TestJetStreamEventStore_StreamCreationWithNamespace(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - baseName := uniqueStreamName("test-ns") - namespace := "tenant-abc" - expectedStreamName := fmt.Sprintf("%s_%s", namespace, baseName) - defer cleanupStream(nc, expectedStreamName) - - store, err := NewJetStreamEventStoreWithNamespace(nc, baseName, namespace) - if err != nil { - t.Fatalf("failed to create JetStreamEventStore with namespace: %v", err) - } - - if store.GetNamespace() != namespace { - t.Errorf("namespace mismatch: got %q, want %q", store.GetNamespace(), namespace) - } - - if store.GetStreamName() != expectedStreamName { - t.Errorf("stream name mismatch: got %q, want %q", store.GetStreamName(), expectedStreamName) - } - - // Verify namespaced stream was created - js, _ := nc.JetStream() - info, err := js.StreamInfo(expectedStreamName) - if err != nil { - t.Fatalf("namespaced stream was not created: %v", err) - } - - if info.Config.Name != expectedStreamName { - t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, expectedStreamName) - } -} - -func TestJetStreamEventStore_StreamAlreadyExists(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-stream-exists") - defer cleanupStream(nc, streamName) - - // Create first store (creates stream) - store1, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create first store: %v", err) - } - if store1 == nil { - t.Fatal("expected non-nil store") - } - - // Create second store with same stream name (should reuse existing stream) - store2, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create second store with existing stream: %v", err) - } - if store2 == nil { - t.Fatal("expected non-nil store") - } -} - -// === SaveEvent Tests === - -func TestJetStreamEventStore_SaveEvent_PersistsToJetStream(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-save-event") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - event := &aether.Event{ - ID: "evt-123", - EventType: "OrderPlaced", - ActorID: "order-456", - Version: 1, - Data: map[string]interface{}{ - "total": 100.50, - "currency": "USD", - }, - Timestamp: time.Now(), - } - - err = store.SaveEvent(event) - if err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - - // Verify event was persisted by retrieving it - events, err := store.GetEvents("order-456", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != 1 { - t.Fatalf("expected 1 event, got %d", len(events)) - } - - retrieved := events[0] - if retrieved.ID != event.ID { - t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID) - } - if retrieved.EventType != event.EventType { - t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType) - } - if retrieved.ActorID != event.ActorID { - t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID) - } - if retrieved.Version != event.Version { - t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version) - } -} - -func TestJetStreamEventStore_SaveEvent_MultipleEvents(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-multi-events") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save multiple events - for i := 1; i <= 10; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "OrderUpdated", - ActorID: "order-456", - Version: int64(i), - Data: map[string]interface{}{"update": i}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed for event %d: %v", i, err) - } - } - - events, err := store.GetEvents("order-456", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != 10 { - t.Errorf("expected 10 events, got %d", len(events)) - } -} - -func TestJetStreamEventStore_SaveEvent_WithMetadata(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-event-metadata") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - event := &aether.Event{ - ID: "evt-meta", - EventType: "OrderPlaced", - ActorID: "order-456", - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - event.SetCorrelationID("corr-123") - event.SetCausationID("cause-456") - event.SetUserID("user-789") - event.SetTraceID("trace-abc") - event.SetSpanID("span-def") - - err = store.SaveEvent(event) - if err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - - events, err := store.GetEvents("order-456", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != 1 { - t.Fatalf("expected 1 event, got %d", len(events)) - } - - retrieved := events[0] - if retrieved.GetCorrelationID() != "corr-123" { - t.Errorf("correlationId mismatch: got %q", retrieved.GetCorrelationID()) - } - if retrieved.GetCausationID() != "cause-456" { - t.Errorf("causationId mismatch: got %q", retrieved.GetCausationID()) - } - if retrieved.GetUserID() != "user-789" { - t.Errorf("userId mismatch: got %q", retrieved.GetUserID()) - } - if retrieved.GetTraceID() != "trace-abc" { - t.Errorf("traceId mismatch: got %q", retrieved.GetTraceID()) - } - if retrieved.GetSpanID() != "span-def" { - t.Errorf("spanId mismatch: got %q", retrieved.GetSpanID()) - } -} - -func TestJetStreamEventStore_SaveEvent_Deduplication(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-dedup") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - event := &aether.Event{ - ID: "evt-dedup-test", - EventType: "OrderPlaced", - ActorID: "order-456", - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - - // Save the same event twice (same event ID) - err = store.SaveEvent(event) - if err != nil { - t.Fatalf("first SaveEvent failed: %v", err) - } - - // Second save with same event ID and same version should fail with version conflict - // because version 1 already exists - err = store.SaveEvent(event) - if err == nil { - // If no error, that's a problem - we expected version conflict - t.Error("expected error when saving duplicate event, got nil") - } else if !errors.Is(err, aether.ErrVersionConflict) { - t.Errorf("expected version conflict error, got: %v", err) - } -} - -func TestJetStreamEventStore_SaveEvent_VersionConflict(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-version-conflict") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save first event with version 5 - event1 := &aether.Event{ - ID: "evt-1", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 5, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event1); err != nil { - t.Fatalf("SaveEvent failed for first event: %v", err) - } - - // Attempt to save event with lower version (should fail) - event2 := &aether.Event{ - ID: "evt-2", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 3, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - err = store.SaveEvent(event2) - if err == nil { - t.Fatal("expected error when saving event with lower version, got nil") - } - - if !errors.Is(err, aether.ErrVersionConflict) { - t.Errorf("expected ErrVersionConflict, got %v", err) - } - - var versionErr *aether.VersionConflictError - if !errors.As(err, &versionErr) { - t.Fatalf("expected VersionConflictError, got %T", err) - } - - if versionErr.ActorID != "actor-123" { - t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123") - } - if versionErr.CurrentVersion != 5 { - t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5) - } - if versionErr.AttemptedVersion != 3 { - t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3) - } -} - -func TestJetStreamEventStore_SaveEvent_VersionConflictEqual(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-version-equal") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save first event with version 5 - event1 := &aether.Event{ - ID: "evt-1", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 5, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event1); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - - // Attempt to save event with equal version (should fail) - event2 := &aether.Event{ - ID: "evt-2", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 5, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - err = store.SaveEvent(event2) - if err == nil { - t.Fatal("expected error when saving event with equal version, got nil") - } - - if !errors.Is(err, aether.ErrVersionConflict) { - t.Errorf("expected ErrVersionConflict, got %v", err) - } -} - -// === GetEvents Tests === - -func TestJetStreamEventStore_GetEvents_RetrievesInOrder(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-get-order") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save events in order - for i := 1; i <= 10; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "TestEvent", - ActorID: "actor-123", - Version: int64(i), - Data: map[string]interface{}{"index": i}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - } - - events, err := store.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != 10 { - t.Fatalf("expected 10 events, got %d", len(events)) - } - - // Verify order - for i, event := range events { - expectedID := fmt.Sprintf("evt-%d", i+1) - if event.ID != expectedID { - t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID) - } - expectedVersion := int64(i + 1) - if event.Version != expectedVersion { - t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion) - } - } -} - -func TestJetStreamEventStore_GetEvents_FromVersionFilters(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-from-version") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save events with versions 1-10 - for i := 1; i <= 10; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "TestEvent", - ActorID: "actor-123", - Version: int64(i), - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - } - - testCases := []struct { - name string - fromVersion int64 - expectedLen int - minVersion int64 - }{ - {"from version 0", 0, 10, 1}, - {"from version 5", 5, 5, 6}, - {"from version 10", 10, 0, 0}, - {"from version 11", 11, 0, 0}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - events, err := store.GetEvents("actor-123", tc.fromVersion) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != tc.expectedLen { - t.Errorf("expected %d events, got %d", tc.expectedLen, len(events)) - } - - // Verify all returned events have version > fromVersion - for _, event := range events { - if event.Version <= tc.fromVersion { - t.Errorf("event version %d is not greater than fromVersion %d", event.Version, tc.fromVersion) - } - } - }) - } -} - -func TestJetStreamEventStore_GetEvents_NonExistentActor(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-nonexistent") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - events, err := store.GetEvents("non-existent-actor", 0) - if err != nil { - t.Fatalf("GetEvents should not error for non-existent actor: %v", err) - } - - if len(events) != 0 { - t.Errorf("expected 0 events for non-existent actor, got %d", len(events)) - } -} - -func TestJetStreamEventStore_GetEvents_MultipleActors(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-multi-actors") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save events for different actors - actors := []string{"actor-1", "actor-2", "actor-3"} - for _, actorID := range actors { - for i := 1; i <= 3; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%s-%d", actorID, i), - EventType: "TestEvent", - ActorID: actorID, - Version: int64(i), - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - } - } - - // Verify each actor has its own events - for _, actorID := range actors { - events, err := store.GetEvents(actorID, 0) - if err != nil { - t.Fatalf("GetEvents failed for %s: %v", actorID, err) - } - if len(events) != 3 { - t.Errorf("expected 3 events for %s, got %d", actorID, len(events)) - } - for _, event := range events { - if event.ActorID != actorID { - t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID) - } - } - } -} - -func TestJetStreamEventStore_GetEventsWithErrors(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-with-errors") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save valid events - for i := 1; i <= 5; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "TestEvent", - ActorID: "actor-123", - Version: int64(i), - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - } - - result, err := store.GetEventsWithErrors("actor-123", 0) - if err != nil { - t.Fatalf("GetEventsWithErrors failed: %v", err) - } - - if len(result.Events) != 5 { - t.Errorf("expected 5 events, got %d", len(result.Events)) - } - - if result.HasErrors() { - t.Errorf("expected no errors, got %d", len(result.Errors)) - } -} - -// === GetLatestVersion Tests === - -func TestJetStreamEventStore_GetLatestVersion(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-latest-version") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save events with versions 1-5 - for i := 1; i <= 5; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "TestEvent", - ActorID: "actor-123", - Version: int64(i), - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - } - - latestVersion, err := store.GetLatestVersion("actor-123") - if err != nil { - t.Fatalf("GetLatestVersion failed: %v", err) - } - - if latestVersion != 5 { - t.Errorf("expected latest version 5, got %d", latestVersion) - } -} - -func TestJetStreamEventStore_GetLatestVersion_NonExistentActor(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-latest-nonexistent") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - latestVersion, err := store.GetLatestVersion("non-existent-actor") - if err != nil { - t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err) - } - - if latestVersion != 0 { - t.Errorf("expected version 0 for non-existent actor, got %d", latestVersion) - } -} - -func TestJetStreamEventStore_GetLatestVersion_UpdatesAfterNewEvent(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-latest-updates") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save first event - event1 := &aether.Event{ - ID: "evt-1", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event1); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - - version1, err := store.GetLatestVersion("actor-123") - if err != nil { - t.Fatalf("GetLatestVersion failed: %v", err) - } - if version1 != 1 { - t.Errorf("expected version 1, got %d", version1) - } - - // Save second event - event2 := &aether.Event{ - ID: "evt-2", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 10, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event2); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - - version2, err := store.GetLatestVersion("actor-123") - if err != nil { - t.Fatalf("GetLatestVersion failed: %v", err) - } - if version2 != 10 { - t.Errorf("expected version 10, got %d", version2) - } -} - -// === Snapshot Tests === - -func TestJetStreamEventStore_SaveAndGetSnapshot(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-snapshot") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - snapshot := &aether.ActorSnapshot{ - ActorID: "actor-123", - Version: 10, - State: map[string]interface{}{ - "balance": 100.50, - "status": "active", - }, - Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), - } - - err = store.SaveSnapshot(snapshot) - if err != nil { - t.Fatalf("SaveSnapshot failed: %v", err) - } - - retrieved, err := store.GetLatestSnapshot("actor-123") - if err != nil { - t.Fatalf("GetLatestSnapshot failed: %v", err) - } - - if retrieved.ActorID != snapshot.ActorID { - t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID) - } - if retrieved.Version != snapshot.Version { - t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version) - } - - // Check state values (JSON unmarshaling may change types) - balance, ok := retrieved.State["balance"].(float64) - if !ok { - t.Errorf("balance is not float64: %T", retrieved.State["balance"]) - } else if balance != 100.50 { - t.Errorf("balance mismatch: got %v, want %v", balance, 100.50) - } -} - -func TestJetStreamEventStore_GetLatestSnapshot_MultipleSnapshots(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-multi-snapshot") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Save multiple snapshots - for i := 1; i <= 5; i++ { - snapshot := &aether.ActorSnapshot{ - ActorID: "actor-123", - Version: int64(i * 10), - State: map[string]interface{}{ - "iteration": i, - }, - Timestamp: time.Now(), - } - if err := store.SaveSnapshot(snapshot); err != nil { - t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err) - } - } - - // Get latest should return the most recently saved - retrieved, err := store.GetLatestSnapshot("actor-123") - if err != nil { - t.Fatalf("GetLatestSnapshot failed: %v", err) - } - - if retrieved.Version != 50 { - t.Errorf("expected version 50, got %d", retrieved.Version) - } -} - -func TestJetStreamEventStore_GetLatestSnapshot_NonExistent(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-snapshot-nonexistent") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - _, err = store.GetLatestSnapshot("non-existent-actor") - if err == nil { - t.Error("expected error when getting snapshot for non-existent actor") - } -} - -func TestJetStreamEventStore_SnapshotWithComplexState(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-snapshot-complex") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - complexState := map[string]interface{}{ - "string": "hello", - "integer": float64(42), // JSON numbers are float64 - "float": 3.14159, - "boolean": true, - "null": nil, - "array": []interface{}{"a", "b", "c"}, - "nested": map[string]interface{}{ - "level1": map[string]interface{}{ - "level2": "deep value", - }, - }, - } - - snapshot := &aether.ActorSnapshot{ - ActorID: "actor-complex", - Version: 1, - State: complexState, - Timestamp: time.Now(), - } - - if err := store.SaveSnapshot(snapshot); err != nil { - t.Fatalf("SaveSnapshot failed: %v", err) - } - - retrieved, err := store.GetLatestSnapshot("actor-complex") - if err != nil { - t.Fatalf("GetLatestSnapshot failed: %v", err) - } - - // Verify fields - if retrieved.State["string"] != "hello" { - t.Errorf("string mismatch: got %v", retrieved.State["string"]) - } - if retrieved.State["boolean"] != true { - t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"]) - } -} - -// === Namespace Isolation Tests === - -func TestJetStreamEventStore_NamespaceIsolation(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - baseName := uniqueStreamName("test-isolation") - ns1 := "tenant-a" - ns2 := "tenant-b" - expectedStream1 := fmt.Sprintf("%s_%s", ns1, baseName) - expectedStream2 := fmt.Sprintf("%s_%s", ns2, baseName) - defer cleanupStream(nc, expectedStream1) - defer cleanupStream(nc, expectedStream2) - - store1, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns1) - if err != nil { - t.Fatalf("failed to create store1: %v", err) - } - - store2, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns2) - if err != nil { - t.Fatalf("failed to create store2: %v", err) - } - - // Save events to namespace 1 - event1 := &aether.Event{ - ID: "evt-ns1", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 1, - Data: map[string]interface{}{"namespace": "tenant-a"}, - Timestamp: time.Now(), - } - if err := store1.SaveEvent(event1); err != nil { - t.Fatalf("SaveEvent failed for ns1: %v", err) - } - - // Save events to namespace 2 - event2 := &aether.Event{ - ID: "evt-ns2", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 1, - Data: map[string]interface{}{"namespace": "tenant-b"}, - Timestamp: time.Now(), - } - if err := store2.SaveEvent(event2); err != nil { - t.Fatalf("SaveEvent failed for ns2: %v", err) - } - - // Verify isolation: store1 only sees tenant-a events - events1, err := store1.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents failed for store1: %v", err) - } - if len(events1) != 1 { - t.Errorf("store1: expected 1 event, got %d", len(events1)) - } - if events1[0].Data["namespace"] != "tenant-a" { - t.Errorf("store1: got event from wrong namespace: %v", events1[0].Data["namespace"]) - } - - // Verify isolation: store2 only sees tenant-b events - events2, err := store2.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents failed for store2: %v", err) - } - if len(events2) != 1 { - t.Errorf("store2: expected 1 event, got %d", len(events2)) - } - if events2[0].Data["namespace"] != "tenant-b" { - t.Errorf("store2: got event from wrong namespace: %v", events2[0].Data["namespace"]) - } -} - -// === Concurrency Tests === - -func TestJetStreamEventStore_ConcurrentWrites(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-concurrent") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - numGoroutines := 10 - eventsPerGoroutine := 10 - - var wg sync.WaitGroup - wg.Add(numGoroutines) - - for g := 0; g < numGoroutines; g++ { - go func(goroutineID int) { - defer wg.Done() - actorID := fmt.Sprintf("actor-%d", goroutineID) - for i := 1; i <= eventsPerGoroutine; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d-%d", goroutineID, i), - EventType: "TestEvent", - ActorID: actorID, - Version: int64(i), - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - t.Errorf("SaveEvent failed: %v", err) - } - } - }(g) - } - - wg.Wait() - - // Verify each actor has all events - for g := 0; g < numGoroutines; g++ { - actorID := fmt.Sprintf("actor-%d", g) - events, err := store.GetEvents(actorID, 0) - if err != nil { - t.Errorf("GetEvents failed for %s: %v", actorID, err) - continue - } - if len(events) != eventsPerGoroutine { - t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events)) - } - } -} - -func TestJetStreamEventStore_ConcurrentVersionConflict(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-concurrent-conflict") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - numGoroutines := 50 - var successCount int64 - var conflictCount int64 - var wg sync.WaitGroup - - // All goroutines try to save version 1 - wg.Add(numGoroutines) - for i := 0; i < numGoroutines; i++ { - go func(id int) { - defer wg.Done() - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", id), - EventType: "TestEvent", - ActorID: "actor-contested", - Version: 1, - Data: map[string]interface{}{"goroutine": id}, - Timestamp: time.Now(), - } - err := store.SaveEvent(event) - if err == nil { - atomic.AddInt64(&successCount, 1) - } else if errors.Is(err, aether.ErrVersionConflict) { - atomic.AddInt64(&conflictCount, 1) - } else { - t.Errorf("unexpected error: %v", err) - } - }(i) - } - - wg.Wait() - - // Exactly one should succeed - if successCount != 1 { - t.Errorf("expected exactly 1 success, got %d", successCount) - } - if conflictCount != int64(numGoroutines-1) { - t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount) - } - - // Verify only one event was stored - events, err := store.GetEvents("actor-contested", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - if len(events) != 1 { - t.Errorf("expected 1 event, got %d", len(events)) - } -} - -// === Connection Loss/Recovery Tests === - -func TestJetStreamEventStore_PersistenceAcrossConnections(t *testing.T) { - nc1 := getTestNATSConnection(t) - - streamName := uniqueStreamName("test-persistence") - defer func() { - nc := getTestNATSConnection(t) - cleanupStream(nc, streamName) - nc.Close() - }() - - // Create store and save events with first connection - store1, err := NewJetStreamEventStore(nc1, streamName) - if err != nil { - t.Fatalf("failed to create store1: %v", err) - } - - for i := 1; i <= 5; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "TestEvent", - ActorID: "actor-123", - Version: int64(i), - Data: map[string]interface{}{"index": i}, - Timestamp: time.Now(), - } - if err := store1.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) - } - } - - // Close first connection - nc1.Close() - - // Create new connection and store - nc2 := getTestNATSConnection(t) - defer nc2.Close() - - store2, err := NewJetStreamEventStore(nc2, streamName) - if err != nil { - t.Fatalf("failed to create store2: %v", err) - } - - // Verify events are still there - events, err := store2.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != 5 { - t.Errorf("expected 5 events after reconnection, got %d", len(events)) - } - - // Verify we can continue adding events - event6 := &aether.Event{ - ID: "evt-6", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 6, - Data: map[string]interface{}{"index": 6}, - Timestamp: time.Now(), - } - if err := store2.SaveEvent(event6); err != nil { - t.Fatalf("SaveEvent failed after reconnection: %v", err) - } - - events, err = store2.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - - if len(events) != 6 { - t.Errorf("expected 6 events after adding one, got %d", len(events)) - } -} - -func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-multi-instance") - defer cleanupStream(nc, streamName) - - // Create multiple store instances on the same stream - store1, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store1: %v", err) - } - - store2, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store2: %v", err) - } - - // Save events from store1 - event1 := &aether.Event{ - ID: "evt-from-store1", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 1, - Data: map[string]interface{}{"source": "store1"}, - Timestamp: time.Now(), - } - if err := store1.SaveEvent(event1); err != nil { - t.Fatalf("SaveEvent from store1 failed: %v", err) - } - - // Read from store2 - events, err := store2.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents from store2 failed: %v", err) - } - - if len(events) != 1 { - t.Errorf("store2 should see event from store1, got %d events", len(events)) - } - - // Save from store2 (continuing version sequence) - event2 := &aether.Event{ - ID: "evt-from-store2", - EventType: "TestEvent", - ActorID: "actor-123", - Version: 2, - Data: map[string]interface{}{"source": "store2"}, - Timestamp: time.Now(), - } - if err := store2.SaveEvent(event2); err != nil { - t.Fatalf("SaveEvent from store2 failed: %v", err) - } - - // Read from store1 - events, err = store1.GetEvents("actor-123", 0) - if err != nil { - t.Fatalf("GetEvents from store1 failed: %v", err) - } - - if len(events) != 2 { - t.Errorf("store1 should see both events, got %d events", len(events)) - } -} - - -// === Cache Invalidation Tests === - -func TestJetStreamEventStore_CacheInvalidationOnExternalWrite(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-cache-invalidation") - defer cleanupStream(nc, streamName) - - // Create two stores for the same stream - store1, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store1: %v", err) - } - - store2, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store2: %v", err) - } - - actorID := "actor-cache-test" - - // store1: Save event v1 (caches version 1) - event1 := &aether.Event{ - ID: "evt-1", - EventType: "TestEvent", - ActorID: actorID, - Version: 1, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store1.SaveEvent(event1); err != nil { - t.Fatalf("SaveEvent from store1 failed: %v", err) - } - - // Verify store1 sees version 1 (uses cache) - v1, err := store1.GetLatestVersion(actorID) - if err != nil { - t.Fatalf("GetLatestVersion from store1 failed: %v", err) - } - if v1 != 1 { - t.Errorf("store1 should see version 1, got %d", v1) - } - - // store2: Save event v2 (external write from store1's perspective) - event2 := &aether.Event{ - ID: "evt-2", - EventType: "TestEvent", - ActorID: actorID, - Version: 2, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store2.SaveEvent(event2); err != nil { - t.Fatalf("SaveEvent from store2 failed: %v", err) - } - - // store1: GetLatestVersion should detect external write and return v2 - // (This triggers cache invalidation because actual version > cached version) - v2, err := store1.GetLatestVersion(actorID) - if err != nil { - t.Fatalf("GetLatestVersion from store1 (after external write) failed: %v", err) - } - if v2 != 2 { - t.Errorf("store1 should see version 2 after external write, got %d", v2) - } - - // Verify cache was repopulated - second GetLatestVersion should use cache efficiently - v2Again, err := store1.GetLatestVersion(actorID) - if err != nil { - t.Fatalf("Second GetLatestVersion from store1 failed: %v", err) - } - if v2Again != 2 { - t.Errorf("store1 cache should have version 2, got %d", v2Again) - } - - // store2: Save event v3 (another external write) - event3 := &aether.Event{ - ID: "evt-3", - EventType: "TestEvent", - ActorID: actorID, - Version: 3, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store2.SaveEvent(event3); err != nil { - t.Fatalf("SaveEvent from store2 (v3) failed: %v", err) - } - - // store1: After cache invalidation, SaveEvent should use fresh data from JetStream - event4 := &aether.Event{ - ID: "evt-4", - EventType: "TestEvent", - ActorID: actorID, - Version: 4, - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store1.SaveEvent(event4); err != nil { - t.Fatalf("SaveEvent from store1 (after cache invalidation) failed: %v", err) - } - - // Verify all 4 events are persisted - events, err := store1.GetEvents(actorID, 0) - if err != nil { - t.Fatalf("GetEvents failed: %v", err) - } - if len(events) != 4 { - t.Errorf("expected 4 events after cache invalidation, got %d", len(events)) - } -} - -// === Interface Compliance Tests === - -func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) { - var _ aether.EventStore = (*JetStreamEventStore)(nil) -} - -func TestJetStreamEventStore_ImplementsEventStoreWithErrors(t *testing.T) { - var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) -} - -func TestJetStreamEventStore_ImplementsSnapshotStore(t *testing.T) { - nc := getTestNATSConnection(t) - defer nc.Close() - - streamName := uniqueStreamName("test-interface") - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - t.Fatalf("failed to create store: %v", err) - } - - // Verify it has all SnapshotStore methods - _ = store.SaveEvent - _ = store.GetEvents - _ = store.GetLatestVersion - _ = store.GetLatestSnapshot - _ = store.SaveSnapshot -} - -// === Benchmarks === - -func BenchmarkJetStreamEventStore_SaveEvent(b *testing.B) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - b.Skipf("NATS not available: %v", err) - } - defer nc.Close() - - streamName := fmt.Sprintf("bench-save-%d", time.Now().UnixNano()) - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - b.Fatalf("failed to create store: %v", err) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "BenchmarkEvent", - ActorID: "actor-bench", - Version: int64(i + 1), - Data: map[string]interface{}{"value": i}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - b.Fatalf("SaveEvent failed: %v", err) - } - } -} - -func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - b.Skipf("NATS not available: %v", err) - } - defer nc.Close() - - streamName := fmt.Sprintf("bench-get-%d", time.Now().UnixNano()) - defer cleanupStream(nc, streamName) - - store, err := NewJetStreamEventStore(nc, streamName) - if err != nil { - b.Fatalf("failed to create store: %v", err) - } - - // Pre-populate with events - for i := 0; i < 100; i++ { - event := &aether.Event{ - ID: fmt.Sprintf("evt-%d", i), - EventType: "BenchmarkEvent", - ActorID: "actor-bench", - Version: int64(i + 1), - Data: map[string]interface{}{}, - Timestamp: time.Now(), - } - if err := store.SaveEvent(event); err != nil { - b.Fatalf("SaveEvent failed: %v", err) - } - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := store.GetEvents("actor-bench", 0) - if err != nil { - b.Fatalf("GetEvents failed: %v", err) - } - } -}