Files
aether/store/jetstream_integration_test.go
Claude Code fd1938672e
Some checks failed
CI / build (pull_request) Successful in 19s
CI / integration (pull_request) Failing after 2m0s
fix: address review feedback on cache invalidation
- Fix cache not repopulated after invalidation: Always update cache with fresh data instead of just deleting on mismatch
- Fix race condition: Hold mutex lock during entire fetch operation to prevent SaveEvent from running between fetch and cache update
- Improve test: Add second GetLatestVersion call to verify cache was properly repopulated after invalidation

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 01:31:03 +01:00

1539 lines
40 KiB
Go

//go:build integration
package store
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
)
// These integration tests require a running NATS server with JetStream enabled.
// Run with: go test -tags=integration -v ./store/...
//
// To start NATS with JetStream: nats-server -js
// getTestNATSConnection creates a new NATS connection for testing.
// Returns nil if NATS is not available, allowing tests to skip gracefully.
func getTestNATSConnection(t *testing.T) *nats.Conn {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err)
return nil
}
// Verify JetStream is available
js, err := nc.JetStream()
if err != nil {
nc.Close()
t.Skipf("JetStream not available: %v (run 'nats-server -js' to enable integration tests)", err)
return nil
}
// Test JetStream connectivity
_, err = js.AccountInfo()
if err != nil {
nc.Close()
t.Skipf("JetStream not enabled: %v (run 'nats-server -js' to enable integration tests)", err)
return nil
}
return nc
}
// uniqueStreamName generates a unique stream name for test isolation
func uniqueStreamName(prefix string) string {
return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
}
// cleanupStream deletes a stream to clean up after tests
func cleanupStream(nc *nats.Conn, streamName string) {
js, err := nc.JetStream()
if err != nil {
return
}
_ = js.DeleteStream(streamName)
}
// === Stream Creation and Configuration Tests ===
func TestJetStreamEventStore_StreamCreation(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-stream-creation")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create JetStreamEventStore: %v", err)
}
if store == nil {
t.Fatal("expected non-nil store")
}
// Verify stream was created
js, _ := nc.JetStream()
info, err := js.StreamInfo(streamName)
if err != nil {
t.Fatalf("stream was not created: %v", err)
}
if info.Config.Name != streamName {
t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, streamName)
}
// Verify stream has correct subjects
expectedSubjects := []string{
fmt.Sprintf("%s.events.>", streamName),
fmt.Sprintf("%s.snapshots.>", streamName),
}
for _, expected := range expectedSubjects {
found := false
for _, subject := range info.Config.Subjects {
if subject == expected {
found = true
break
}
}
if !found {
t.Errorf("expected subject %q not found in stream config", expected)
}
}
}
func TestJetStreamEventStore_StreamCreationWithConfig(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-stream-config")
defer cleanupStream(nc, streamName)
config := JetStreamConfig{
StreamRetention: 7 * 24 * time.Hour, // 7 days
ReplicaCount: 1,
}
store, err := NewJetStreamEventStoreWithConfig(nc, streamName, config)
if err != nil {
t.Fatalf("failed to create JetStreamEventStore with config: %v", err)
}
if store == nil {
t.Fatal("expected non-nil store")
}
// Verify stream configuration
js, _ := nc.JetStream()
info, err := js.StreamInfo(streamName)
if err != nil {
t.Fatalf("stream was not created: %v", err)
}
if info.Config.MaxAge != 7*24*time.Hour {
t.Errorf("MaxAge mismatch: got %v, want %v", info.Config.MaxAge, 7*24*time.Hour)
}
}
func TestJetStreamEventStore_StreamCreationWithNamespace(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
baseName := uniqueStreamName("test-ns")
namespace := "tenant-abc"
expectedStreamName := fmt.Sprintf("%s_%s", namespace, baseName)
defer cleanupStream(nc, expectedStreamName)
store, err := NewJetStreamEventStoreWithNamespace(nc, baseName, namespace)
if err != nil {
t.Fatalf("failed to create JetStreamEventStore with namespace: %v", err)
}
if store.GetNamespace() != namespace {
t.Errorf("namespace mismatch: got %q, want %q", store.GetNamespace(), namespace)
}
if store.GetStreamName() != expectedStreamName {
t.Errorf("stream name mismatch: got %q, want %q", store.GetStreamName(), expectedStreamName)
}
// Verify namespaced stream was created
js, _ := nc.JetStream()
info, err := js.StreamInfo(expectedStreamName)
if err != nil {
t.Fatalf("namespaced stream was not created: %v", err)
}
if info.Config.Name != expectedStreamName {
t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, expectedStreamName)
}
}
func TestJetStreamEventStore_StreamAlreadyExists(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-stream-exists")
defer cleanupStream(nc, streamName)
// Create first store (creates stream)
store1, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create first store: %v", err)
}
if store1 == nil {
t.Fatal("expected non-nil store")
}
// Create second store with same stream name (should reuse existing stream)
store2, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create second store with existing stream: %v", err)
}
if store2 == nil {
t.Fatal("expected non-nil store")
}
}
// === SaveEvent Tests ===
func TestJetStreamEventStore_SaveEvent_PersistsToJetStream(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-save-event")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
"currency": "USD",
},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify event was persisted by retrieving it
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
retrieved := events[0]
if retrieved.ID != event.ID {
t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID)
}
if retrieved.EventType != event.EventType {
t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType)
}
if retrieved.ActorID != event.ActorID {
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID)
}
if retrieved.Version != event.Version {
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version)
}
}
func TestJetStreamEventStore_SaveEvent_MultipleEvents(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-multi-events")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save multiple events
for i := 1; i <= 10; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "OrderUpdated",
ActorID: "order-456",
Version: int64(i),
Data: map[string]interface{}{"update": i},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed for event %d: %v", i, err)
}
}
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 10 {
t.Errorf("expected 10 events, got %d", len(events))
}
}
func TestJetStreamEventStore_SaveEvent_WithMetadata(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-event-metadata")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
event := &aether.Event{
ID: "evt-meta",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
event.SetCorrelationID("corr-123")
event.SetCausationID("cause-456")
event.SetUserID("user-789")
event.SetTraceID("trace-abc")
event.SetSpanID("span-def")
err = store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
retrieved := events[0]
if retrieved.GetCorrelationID() != "corr-123" {
t.Errorf("correlationId mismatch: got %q", retrieved.GetCorrelationID())
}
if retrieved.GetCausationID() != "cause-456" {
t.Errorf("causationId mismatch: got %q", retrieved.GetCausationID())
}
if retrieved.GetUserID() != "user-789" {
t.Errorf("userId mismatch: got %q", retrieved.GetUserID())
}
if retrieved.GetTraceID() != "trace-abc" {
t.Errorf("traceId mismatch: got %q", retrieved.GetTraceID())
}
if retrieved.GetSpanID() != "span-def" {
t.Errorf("spanId mismatch: got %q", retrieved.GetSpanID())
}
}
func TestJetStreamEventStore_SaveEvent_Deduplication(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-dedup")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
event := &aether.Event{
ID: "evt-dedup-test",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
// Save the same event twice (same event ID)
err = store.SaveEvent(event)
if err != nil {
t.Fatalf("first SaveEvent failed: %v", err)
}
// Second save with same event ID and same version should fail with version conflict
// because version 1 already exists
err = store.SaveEvent(event)
if err == nil {
// If no error, that's a problem - we expected version conflict
t.Error("expected error when saving duplicate event, got nil")
} else if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected version conflict error, got: %v", err)
}
}
func TestJetStreamEventStore_SaveEvent_VersionConflict(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-version-conflict")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save first event with version 5
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed for first event: %v", err)
}
// Attempt to save event with lower version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
if err == nil {
t.Fatal("expected error when saving event with lower version, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatalf("expected VersionConflictError, got %T", err)
}
if versionErr.ActorID != "actor-123" {
t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123")
}
if versionErr.CurrentVersion != 5 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5)
}
if versionErr.AttemptedVersion != 3 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3)
}
}
func TestJetStreamEventStore_SaveEvent_VersionConflictEqual(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-version-equal")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save first event with version 5
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Attempt to save event with equal version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
if err == nil {
t.Fatal("expected error when saving event with equal version, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
// === GetEvents Tests ===
func TestJetStreamEventStore_GetEvents_RetrievesInOrder(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-get-order")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save events in order
for i := 1; i <= 10; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-123",
Version: int64(i),
Data: map[string]interface{}{"index": i},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
events, err := store.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 10 {
t.Fatalf("expected 10 events, got %d", len(events))
}
// Verify order
for i, event := range events {
expectedID := fmt.Sprintf("evt-%d", i+1)
if event.ID != expectedID {
t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID)
}
expectedVersion := int64(i + 1)
if event.Version != expectedVersion {
t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion)
}
}
}
func TestJetStreamEventStore_GetEvents_FromVersionFilters(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-from-version")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save events with versions 1-10
for i := 1; i <= 10; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-123",
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
testCases := []struct {
name string
fromVersion int64
expectedLen int
minVersion int64
}{
{"from version 0", 0, 10, 1},
{"from version 5", 5, 5, 6},
{"from version 10", 10, 0, 0},
{"from version 11", 11, 0, 0},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
events, err := store.GetEvents("actor-123", tc.fromVersion)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != tc.expectedLen {
t.Errorf("expected %d events, got %d", tc.expectedLen, len(events))
}
// Verify all returned events have version > fromVersion
for _, event := range events {
if event.Version <= tc.fromVersion {
t.Errorf("event version %d is not greater than fromVersion %d", event.Version, tc.fromVersion)
}
}
})
}
}
func TestJetStreamEventStore_GetEvents_NonExistentActor(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-nonexistent")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
events, err := store.GetEvents("non-existent-actor", 0)
if err != nil {
t.Fatalf("GetEvents should not error for non-existent actor: %v", err)
}
if len(events) != 0 {
t.Errorf("expected 0 events for non-existent actor, got %d", len(events))
}
}
func TestJetStreamEventStore_GetEvents_MultipleActors(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-multi-actors")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save events for different actors
actors := []string{"actor-1", "actor-2", "actor-3"}
for _, actorID := range actors {
for i := 1; i <= 3; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, i),
EventType: "TestEvent",
ActorID: actorID,
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
}
// Verify each actor has its own events
for _, actorID := range actors {
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed for %s: %v", actorID, err)
}
if len(events) != 3 {
t.Errorf("expected 3 events for %s, got %d", actorID, len(events))
}
for _, event := range events {
if event.ActorID != actorID {
t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID)
}
}
}
}
func TestJetStreamEventStore_GetEventsWithErrors(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-with-errors")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save valid events
for i := 1; i <= 5; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-123",
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
result, err := store.GetEventsWithErrors("actor-123", 0)
if err != nil {
t.Fatalf("GetEventsWithErrors failed: %v", err)
}
if len(result.Events) != 5 {
t.Errorf("expected 5 events, got %d", len(result.Events))
}
if result.HasErrors() {
t.Errorf("expected no errors, got %d", len(result.Errors))
}
}
// === GetLatestVersion Tests ===
func TestJetStreamEventStore_GetLatestVersion(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-latest-version")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save events with versions 1-5
for i := 1; i <= 5; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-123",
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
latestVersion, err := store.GetLatestVersion("actor-123")
if err != nil {
t.Fatalf("GetLatestVersion failed: %v", err)
}
if latestVersion != 5 {
t.Errorf("expected latest version 5, got %d", latestVersion)
}
}
func TestJetStreamEventStore_GetLatestVersion_NonExistentActor(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-latest-nonexistent")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
latestVersion, err := store.GetLatestVersion("non-existent-actor")
if err != nil {
t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err)
}
if latestVersion != 0 {
t.Errorf("expected version 0 for non-existent actor, got %d", latestVersion)
}
}
func TestJetStreamEventStore_GetLatestVersion_UpdatesAfterNewEvent(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-latest-updates")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save first event
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
version1, err := store.GetLatestVersion("actor-123")
if err != nil {
t.Fatalf("GetLatestVersion failed: %v", err)
}
if version1 != 1 {
t.Errorf("expected version 1, got %d", version1)
}
// Save second event
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 10,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event2); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
version2, err := store.GetLatestVersion("actor-123")
if err != nil {
t.Fatalf("GetLatestVersion failed: %v", err)
}
if version2 != 10 {
t.Errorf("expected version 10, got %d", version2)
}
}
// === Snapshot Tests ===
func TestJetStreamEventStore_SaveAndGetSnapshot(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-snapshot")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
snapshot := &aether.ActorSnapshot{
ActorID: "actor-123",
Version: 10,
State: map[string]interface{}{
"balance": 100.50,
"status": "active",
},
Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC),
}
err = store.SaveSnapshot(snapshot)
if err != nil {
t.Fatalf("SaveSnapshot failed: %v", err)
}
retrieved, err := store.GetLatestSnapshot("actor-123")
if err != nil {
t.Fatalf("GetLatestSnapshot failed: %v", err)
}
if retrieved.ActorID != snapshot.ActorID {
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID)
}
if retrieved.Version != snapshot.Version {
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version)
}
// Check state values (JSON unmarshaling may change types)
balance, ok := retrieved.State["balance"].(float64)
if !ok {
t.Errorf("balance is not float64: %T", retrieved.State["balance"])
} else if balance != 100.50 {
t.Errorf("balance mismatch: got %v, want %v", balance, 100.50)
}
}
func TestJetStreamEventStore_GetLatestSnapshot_MultipleSnapshots(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-multi-snapshot")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Save multiple snapshots
for i := 1; i <= 5; i++ {
snapshot := &aether.ActorSnapshot{
ActorID: "actor-123",
Version: int64(i * 10),
State: map[string]interface{}{
"iteration": i,
},
Timestamp: time.Now(),
}
if err := store.SaveSnapshot(snapshot); err != nil {
t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err)
}
}
// Get latest should return the most recently saved
retrieved, err := store.GetLatestSnapshot("actor-123")
if err != nil {
t.Fatalf("GetLatestSnapshot failed: %v", err)
}
if retrieved.Version != 50 {
t.Errorf("expected version 50, got %d", retrieved.Version)
}
}
func TestJetStreamEventStore_GetLatestSnapshot_NonExistent(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-snapshot-nonexistent")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
_, err = store.GetLatestSnapshot("non-existent-actor")
if err == nil {
t.Error("expected error when getting snapshot for non-existent actor")
}
}
func TestJetStreamEventStore_SnapshotWithComplexState(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-snapshot-complex")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
complexState := map[string]interface{}{
"string": "hello",
"integer": float64(42), // JSON numbers are float64
"float": 3.14159,
"boolean": true,
"null": nil,
"array": []interface{}{"a", "b", "c"},
"nested": map[string]interface{}{
"level1": map[string]interface{}{
"level2": "deep value",
},
},
}
snapshot := &aether.ActorSnapshot{
ActorID: "actor-complex",
Version: 1,
State: complexState,
Timestamp: time.Now(),
}
if err := store.SaveSnapshot(snapshot); err != nil {
t.Fatalf("SaveSnapshot failed: %v", err)
}
retrieved, err := store.GetLatestSnapshot("actor-complex")
if err != nil {
t.Fatalf("GetLatestSnapshot failed: %v", err)
}
// Verify fields
if retrieved.State["string"] != "hello" {
t.Errorf("string mismatch: got %v", retrieved.State["string"])
}
if retrieved.State["boolean"] != true {
t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"])
}
}
// === Namespace Isolation Tests ===
func TestJetStreamEventStore_NamespaceIsolation(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
baseName := uniqueStreamName("test-isolation")
ns1 := "tenant-a"
ns2 := "tenant-b"
expectedStream1 := fmt.Sprintf("%s_%s", ns1, baseName)
expectedStream2 := fmt.Sprintf("%s_%s", ns2, baseName)
defer cleanupStream(nc, expectedStream1)
defer cleanupStream(nc, expectedStream2)
store1, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns1)
if err != nil {
t.Fatalf("failed to create store1: %v", err)
}
store2, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns2)
if err != nil {
t.Fatalf("failed to create store2: %v", err)
}
// Save events to namespace 1
event1 := &aether.Event{
ID: "evt-ns1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 1,
Data: map[string]interface{}{"namespace": "tenant-a"},
Timestamp: time.Now(),
}
if err := store1.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed for ns1: %v", err)
}
// Save events to namespace 2
event2 := &aether.Event{
ID: "evt-ns2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 1,
Data: map[string]interface{}{"namespace": "tenant-b"},
Timestamp: time.Now(),
}
if err := store2.SaveEvent(event2); err != nil {
t.Fatalf("SaveEvent failed for ns2: %v", err)
}
// Verify isolation: store1 only sees tenant-a events
events1, err := store1.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents failed for store1: %v", err)
}
if len(events1) != 1 {
t.Errorf("store1: expected 1 event, got %d", len(events1))
}
if events1[0].Data["namespace"] != "tenant-a" {
t.Errorf("store1: got event from wrong namespace: %v", events1[0].Data["namespace"])
}
// Verify isolation: store2 only sees tenant-b events
events2, err := store2.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents failed for store2: %v", err)
}
if len(events2) != 1 {
t.Errorf("store2: expected 1 event, got %d", len(events2))
}
if events2[0].Data["namespace"] != "tenant-b" {
t.Errorf("store2: got event from wrong namespace: %v", events2[0].Data["namespace"])
}
}
// === Concurrency Tests ===
func TestJetStreamEventStore_ConcurrentWrites(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-concurrent")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
numGoroutines := 10
eventsPerGoroutine := 10
var wg sync.WaitGroup
wg.Add(numGoroutines)
for g := 0; g < numGoroutines; g++ {
go func(goroutineID int) {
defer wg.Done()
actorID := fmt.Sprintf("actor-%d", goroutineID)
for i := 1; i <= eventsPerGoroutine; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", goroutineID, i),
EventType: "TestEvent",
ActorID: actorID,
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Errorf("SaveEvent failed: %v", err)
}
}
}(g)
}
wg.Wait()
// Verify each actor has all events
for g := 0; g < numGoroutines; g++ {
actorID := fmt.Sprintf("actor-%d", g)
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Errorf("GetEvents failed for %s: %v", actorID, err)
continue
}
if len(events) != eventsPerGoroutine {
t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events))
}
}
}
func TestJetStreamEventStore_ConcurrentVersionConflict(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-concurrent-conflict")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
numGoroutines := 50
var successCount int64
var conflictCount int64
var wg sync.WaitGroup
// All goroutines try to save version 1
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", id),
EventType: "TestEvent",
ActorID: "actor-contested",
Version: 1,
Data: map[string]interface{}{"goroutine": id},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
atomic.AddInt64(&successCount, 1)
} else if errors.Is(err, aether.ErrVersionConflict) {
atomic.AddInt64(&conflictCount, 1)
} else {
t.Errorf("unexpected error: %v", err)
}
}(i)
}
wg.Wait()
// Exactly one should succeed
if successCount != 1 {
t.Errorf("expected exactly 1 success, got %d", successCount)
}
if conflictCount != int64(numGoroutines-1) {
t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount)
}
// Verify only one event was stored
events, err := store.GetEvents("actor-contested", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Errorf("expected 1 event, got %d", len(events))
}
}
// === Connection Loss/Recovery Tests ===
func TestJetStreamEventStore_PersistenceAcrossConnections(t *testing.T) {
nc1 := getTestNATSConnection(t)
streamName := uniqueStreamName("test-persistence")
defer func() {
nc := getTestNATSConnection(t)
cleanupStream(nc, streamName)
nc.Close()
}()
// Create store and save events with first connection
store1, err := NewJetStreamEventStore(nc1, streamName)
if err != nil {
t.Fatalf("failed to create store1: %v", err)
}
for i := 1; i <= 5; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-123",
Version: int64(i),
Data: map[string]interface{}{"index": i},
Timestamp: time.Now(),
}
if err := store1.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
// Close first connection
nc1.Close()
// Create new connection and store
nc2 := getTestNATSConnection(t)
defer nc2.Close()
store2, err := NewJetStreamEventStore(nc2, streamName)
if err != nil {
t.Fatalf("failed to create store2: %v", err)
}
// Verify events are still there
events, err := store2.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 5 {
t.Errorf("expected 5 events after reconnection, got %d", len(events))
}
// Verify we can continue adding events
event6 := &aether.Event{
ID: "evt-6",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 6,
Data: map[string]interface{}{"index": 6},
Timestamp: time.Now(),
}
if err := store2.SaveEvent(event6); err != nil {
t.Fatalf("SaveEvent failed after reconnection: %v", err)
}
events, err = store2.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 6 {
t.Errorf("expected 6 events after adding one, got %d", len(events))
}
}
func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-multi-instance")
defer cleanupStream(nc, streamName)
// Create multiple store instances on the same stream
store1, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store1: %v", err)
}
store2, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store2: %v", err)
}
// Save events from store1
event1 := &aether.Event{
ID: "evt-from-store1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 1,
Data: map[string]interface{}{"source": "store1"},
Timestamp: time.Now(),
}
if err := store1.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent from store1 failed: %v", err)
}
// Read from store2
events, err := store2.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents from store2 failed: %v", err)
}
if len(events) != 1 {
t.Errorf("store2 should see event from store1, got %d events", len(events))
}
// Save from store2 (continuing version sequence)
event2 := &aether.Event{
ID: "evt-from-store2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 2,
Data: map[string]interface{}{"source": "store2"},
Timestamp: time.Now(),
}
if err := store2.SaveEvent(event2); err != nil {
t.Fatalf("SaveEvent from store2 failed: %v", err)
}
// Read from store1
events, err = store1.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents from store1 failed: %v", err)
}
if len(events) != 2 {
t.Errorf("store1 should see both events, got %d events", len(events))
}
}
// === Cache Invalidation Tests ===
func TestJetStreamEventStore_CacheInvalidationOnExternalWrite(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-cache-invalidation")
defer cleanupStream(nc, streamName)
// Create two stores for the same stream
store1, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store1: %v", err)
}
store2, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store2: %v", err)
}
actorID := "actor-cache-test"
// store1: Save event v1 (caches version 1)
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store1.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent from store1 failed: %v", err)
}
// Verify store1 sees version 1 (uses cache)
v1, err := store1.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("GetLatestVersion from store1 failed: %v", err)
}
if v1 != 1 {
t.Errorf("store1 should see version 1, got %d", v1)
}
// store2: Save event v2 (external write from store1's perspective)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store2.SaveEvent(event2); err != nil {
t.Fatalf("SaveEvent from store2 failed: %v", err)
}
// store1: GetLatestVersion should detect external write and return v2
// (This triggers cache invalidation because actual version > cached version)
v2, err := store1.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("GetLatestVersion from store1 (after external write) failed: %v", err)
}
if v2 != 2 {
t.Errorf("store1 should see version 2 after external write, got %d", v2)
}
// Verify cache was repopulated - second GetLatestVersion should use cache efficiently
v2Again, err := store1.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("Second GetLatestVersion from store1 failed: %v", err)
}
if v2Again != 2 {
t.Errorf("store1 cache should have version 2, got %d", v2Again)
}
// store2: Save event v3 (another external write)
event3 := &aether.Event{
ID: "evt-3",
EventType: "TestEvent",
ActorID: actorID,
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store2.SaveEvent(event3); err != nil {
t.Fatalf("SaveEvent from store2 (v3) failed: %v", err)
}
// store1: After cache invalidation, SaveEvent should use fresh data from JetStream
event4 := &aether.Event{
ID: "evt-4",
EventType: "TestEvent",
ActorID: actorID,
Version: 4,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store1.SaveEvent(event4); err != nil {
t.Fatalf("SaveEvent from store1 (after cache invalidation) failed: %v", err)
}
// Verify all 4 events are persisted
events, err := store1.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 4 {
t.Errorf("expected 4 events after cache invalidation, got %d", len(events))
}
}
// === Interface Compliance Tests ===
func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) {
var _ aether.EventStore = (*JetStreamEventStore)(nil)
}
func TestJetStreamEventStore_ImplementsEventStoreWithErrors(t *testing.T) {
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
}
func TestJetStreamEventStore_ImplementsSnapshotStore(t *testing.T) {
nc := getTestNATSConnection(t)
defer nc.Close()
streamName := uniqueStreamName("test-interface")
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
// Verify it has all SnapshotStore methods
_ = store.SaveEvent
_ = store.GetEvents
_ = store.GetLatestVersion
_ = store.GetLatestSnapshot
_ = store.SaveSnapshot
}
// === Benchmarks ===
func BenchmarkJetStreamEventStore_SaveEvent(b *testing.B) {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
b.Skipf("NATS not available: %v", err)
}
defer nc.Close()
streamName := fmt.Sprintf("bench-save-%d", time.Now().UnixNano())
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
b.Fatalf("failed to create store: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "BenchmarkEvent",
ActorID: "actor-bench",
Version: int64(i + 1),
Data: map[string]interface{}{"value": i},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
b.Fatalf("SaveEvent failed: %v", err)
}
}
}
func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
b.Skipf("NATS not available: %v", err)
}
defer nc.Close()
streamName := fmt.Sprintf("bench-get-%d", time.Now().UnixNano())
defer cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
if err != nil {
b.Fatalf("failed to create store: %v", err)
}
// Pre-populate with events
for i := 0; i < 100; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "BenchmarkEvent",
ActorID: "actor-bench",
Version: int64(i + 1),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
b.Fatalf("SaveEvent failed: %v", err)
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := store.GetEvents("actor-bench", 0)
if err != nil {
b.Fatalf("GetEvents failed: %v", err)
}
}
}