From a269da4520f853cf1bdc152d99240c3756a48cd5 Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Fri, 9 Jan 2026 16:37:23 +0000 Subject: [PATCH] [Issue #4] Add SnapshotStore unit tests (#31) --- store/memory.go | 45 ++- store/memory_test.go | 645 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 686 insertions(+), 4 deletions(-) diff --git a/store/memory.go b/store/memory.go index 4108b7b..21c5b14 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,6 +1,7 @@ package store import ( + "fmt" "sync" "git.flowmade.one/flowmade-one/aether" @@ -8,14 +9,16 @@ import ( // InMemoryEventStore provides a simple in-memory event store for testing type InMemoryEventStore struct { - mu sync.RWMutex - events map[string][]*aether.Event // actorID -> events + mu sync.RWMutex + events map[string][]*aether.Event // actorID -> events + snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) } // NewInMemoryEventStore creates a new in-memory event store func NewInMemoryEventStore() *InMemoryEventStore { return &InMemoryEventStore{ - events: make(map[string][]*aether.Event), + events: make(map[string][]*aether.Event), + snapshots: make(map[string][]*aether.ActorSnapshot), } } @@ -70,3 +73,39 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { return latestVersion, nil } + +// SaveSnapshot saves a snapshot to the in-memory store +func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { + if snapshot == nil { + return fmt.Errorf("snapshot cannot be nil") + } + + es.mu.Lock() + defer es.mu.Unlock() + + if _, exists := es.snapshots[snapshot.ActorID]; !exists { + es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0) + } + es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot) + return nil +} + +// GetLatestSnapshot returns the most recent snapshot for an actor +func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { + es.mu.RLock() + defer es.mu.RUnlock() + + snapshots, exists := es.snapshots[actorID] + if !exists || len(snapshots) == 0 { + return nil, nil + } + + var latest *aether.ActorSnapshot + for _, snapshot := range snapshots { + if latest == nil || snapshot.Version > latest.Version { + latest = snapshot + } + } + + return latest, nil +} diff --git a/store/memory_test.go b/store/memory_test.go index 6c75051..20b7639 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -9,6 +9,8 @@ import ( "git.flowmade.one/flowmade-one/aether" ) +// === Event Store Tests (from main branch) === + func TestNewInMemoryEventStore(t *testing.T) { store := NewInMemoryEventStore() @@ -756,7 +758,7 @@ func TestSaveEvent_SpecialActorIDs(t *testing.T) { "with:colons", "with/slashes", "user@example.com", - "unicode-\u4e16\u754c", + "unicode-δΈ–η•Œ", "", } @@ -845,3 +847,644 @@ func BenchmarkGetLatestVersion(b *testing.B) { store.GetLatestVersion("actor-123") } } + +// === Snapshot Store Tests (from PR branch) === + +func TestSaveSnapshot_PersistsCorrectly(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: 10, + State: map[string]interface{}{ + "balance": 100.50, + "status": "active", + }, + Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), + } + + err := store.SaveSnapshot(snapshot) + if err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // Verify snapshot was persisted by retrieving it + retrieved, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved == nil { + t.Fatal("expected snapshot to be persisted, got nil") + } + + if retrieved.ActorID != snapshot.ActorID { + t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID) + } + if retrieved.Version != snapshot.Version { + t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version) + } + if retrieved.State["balance"] != snapshot.State["balance"] { + t.Errorf("State.balance mismatch: got %v, want %v", retrieved.State["balance"], snapshot.State["balance"]) + } + if retrieved.State["status"] != snapshot.State["status"] { + t.Errorf("State.status mismatch: got %v, want %v", retrieved.State["status"], snapshot.State["status"]) + } + if !retrieved.Timestamp.Equal(snapshot.Timestamp) { + t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, snapshot.Timestamp) + } +} + +func TestSaveSnapshot_NilSnapshot(t *testing.T) { + store := NewInMemoryEventStore() + + err := store.SaveSnapshot(nil) + if err == nil { + t.Error("expected error when saving nil snapshot, got nil") + } +} + +func TestSaveSnapshot_MultipleSnapshots(t *testing.T) { + store := NewInMemoryEventStore() + + // Save multiple snapshots for the same actor + for i := 1; i <= 5; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-multi", + Version: int64(i * 10), + State: map[string]interface{}{ + "iteration": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err) + } + } + + // Verify all snapshots were saved by checking the latest + retrieved, err := store.GetLatestSnapshot("actor-multi") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != 50 { + t.Errorf("expected latest version 50, got %d", retrieved.Version) + } +} + +func TestGetLatestSnapshot_ReturnsMostRecent(t *testing.T) { + store := NewInMemoryEventStore() + + // Save snapshots in non-sequential order to test version comparison + versions := []int64{5, 15, 10, 25, 20} + for _, v := range versions { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-latest", + Version: v, + State: map[string]interface{}{ + "version": v, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for version %d: %v", v, err) + } + } + + latest, err := store.GetLatestSnapshot("actor-latest") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if latest == nil { + t.Fatal("expected snapshot, got nil") + } + + if latest.Version != 25 { + t.Errorf("expected latest version 25, got %d", latest.Version) + } + + // Verify the state matches the snapshot with version 25 + if latest.State["version"].(int64) != 25 { + t.Errorf("expected state.version to be 25, got %v", latest.State["version"]) + } +} + +func TestGetLatestSnapshot_NoSnapshotExists(t *testing.T) { + store := NewInMemoryEventStore() + + // Query for a non-existent actor + snapshot, err := store.GetLatestSnapshot("non-existent-actor") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if snapshot != nil { + t.Errorf("expected nil for non-existent actor, got %+v", snapshot) + } +} + +func TestGetLatestSnapshot_EmptyActorID(t *testing.T) { + store := NewInMemoryEventStore() + + // Save a snapshot with empty actor ID + snapshot := &aether.ActorSnapshot{ + ActorID: "", + Version: 1, + State: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // Retrieve with empty actor ID + retrieved, err := store.GetLatestSnapshot("") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved == nil { + t.Fatal("expected snapshot with empty actorID, got nil") + } +} + +func TestSnapshotVersioning_RespectedAcrossActors(t *testing.T) { + store := NewInMemoryEventStore() + + // Save snapshots for different actors + actors := []string{"actor-a", "actor-b", "actor-c"} + for i, actorID := range actors { + snapshot := &aether.ActorSnapshot{ + ActorID: actorID, + Version: int64((i + 1) * 100), // Different versions per actor + State: map[string]interface{}{ + "actor": actorID, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for %s: %v", actorID, err) + } + } + + // Verify each actor has their own snapshot with correct version + for i, actorID := range actors { + snapshot, err := store.GetLatestSnapshot(actorID) + if err != nil { + t.Fatalf("GetLatestSnapshot failed for %s: %v", actorID, err) + } + + expectedVersion := int64((i + 1) * 100) + if snapshot.Version != expectedVersion { + t.Errorf("actor %s: expected version %d, got %d", actorID, expectedVersion, snapshot.Version) + } + } +} + +func TestSnapshotVersioning_LowerVersionAfterHigher(t *testing.T) { + store := NewInMemoryEventStore() + + // Save a high version first + highSnapshot := &aether.ActorSnapshot{ + ActorID: "actor-order", + Version: 100, + State: map[string]interface{}{ + "marker": "high", + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(highSnapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // Save a lower version after + lowSnapshot := &aether.ActorSnapshot{ + ActorID: "actor-order", + Version: 50, + State: map[string]interface{}{ + "marker": "low", + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(lowSnapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // GetLatestSnapshot should return the higher version (100), not the most recently saved + latest, err := store.GetLatestSnapshot("actor-order") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if latest.Version != 100 { + t.Errorf("expected version 100, got %d", latest.Version) + } + if latest.State["marker"] != "high" { + t.Errorf("expected marker 'high', got %v", latest.State["marker"]) + } +} + +func TestSnapshotDataIntegrity_ComplexState(t *testing.T) { + store := NewInMemoryEventStore() + + complexState := map[string]interface{}{ + "string": "hello", + "integer": 42, + "float": 3.14159, + "boolean": true, + "null": nil, + "array": []interface{}{"a", "b", "c"}, + "nested": map[string]interface{}{ + "level1": map[string]interface{}{ + "level2": "deep value", + }, + }, + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-complex", + Version: 1, + State: complexState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-complex") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + // Verify all fields + if retrieved.State["string"] != "hello" { + t.Errorf("string mismatch: got %v", retrieved.State["string"]) + } + if retrieved.State["integer"] != 42 { + t.Errorf("integer mismatch: got %v", retrieved.State["integer"]) + } + if retrieved.State["float"] != 3.14159 { + t.Errorf("float mismatch: got %v", retrieved.State["float"]) + } + if retrieved.State["boolean"] != true { + t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"]) + } + if retrieved.State["null"] != nil { + t.Errorf("null mismatch: got %v", retrieved.State["null"]) + } + + // Verify array + arr, ok := retrieved.State["array"].([]interface{}) + if !ok { + t.Fatal("array is not []interface{}") + } + if len(arr) != 3 || arr[0] != "a" || arr[1] != "b" || arr[2] != "c" { + t.Errorf("array mismatch: got %v", arr) + } + + // Verify nested structure + nested, ok := retrieved.State["nested"].(map[string]interface{}) + if !ok { + t.Fatal("nested is not map[string]interface{}") + } + level1, ok := nested["level1"].(map[string]interface{}) + if !ok { + t.Fatal("level1 is not map[string]interface{}") + } + if level1["level2"] != "deep value" { + t.Errorf("nested value mismatch: got %v", level1["level2"]) + } +} + +func TestSnapshotDataIntegrity_SpecialCharacters(t *testing.T) { + store := NewInMemoryEventStore() + + specialState := map[string]interface{}{ + "unicode": "Hello, δΈ–η•Œ!", + "emoji": "πŸ˜€πŸš€", + "newlines": "line1\nline2\r\nline3", + "tabs": "col1\tcol2", + "quotes": `"double" and 'single'`, + "backslash": `path\to\file`, + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-special", + Version: 1, + State: specialState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-special") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + for key, expected := range specialState { + if retrieved.State[key] != expected { + t.Errorf("State[%q] mismatch: got %q, want %q", key, retrieved.State[key], expected) + } + } +} + +func TestSnapshotDataIntegrity_EmptyState(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-empty", + Version: 1, + State: map[string]interface{}{}, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-empty") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if len(retrieved.State) != 0 { + t.Errorf("expected empty state, got %v", retrieved.State) + } +} + +func TestSnapshotDataIntegrity_NilState(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-nil", + Version: 1, + State: nil, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-nil") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.State != nil { + t.Errorf("expected nil state, got %v", retrieved.State) + } +} + +func TestSnapshotDataIntegrity_LargeState(t *testing.T) { + store := NewInMemoryEventStore() + + // Create a large state with many entries using unique keys + largeState := make(map[string]interface{}) + for i := 0; i < 1000; i++ { + largeState[fmt.Sprintf("key-%d", i)] = i + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-large", + Version: 1, + State: largeState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-large") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if len(retrieved.State) != len(largeState) { + t.Errorf("state size mismatch: got %d, want %d", len(retrieved.State), len(largeState)) + } +} + +func TestSnapshotDataIntegrity_TimestampPreserved(t *testing.T) { + store := NewInMemoryEventStore() + + // Test various timestamps + timestamps := []time.Time{ + time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), + time.Date(2020, 6, 15, 23, 59, 59, 999999999, time.UTC), + time.Time{}, // Zero time + } + + for i, ts := range timestamps { + actorID := fmt.Sprintf("actor-ts-%d", i) + snapshot := &aether.ActorSnapshot{ + ActorID: actorID, + Version: 1, + State: map[string]interface{}{}, + Timestamp: ts, + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot(actorID) + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if !retrieved.Timestamp.Equal(ts) { + t.Errorf("timestamp %d mismatch: got %v, want %v", i, retrieved.Timestamp, ts) + } + } +} + +func TestSnapshotVersioning_ZeroVersion(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-zero-version", + Version: 0, + State: map[string]interface{}{"initial": true}, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-zero-version") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != 0 { + t.Errorf("expected version 0, got %d", retrieved.Version) + } +} + +func TestSnapshotVersioning_LargeVersion(t *testing.T) { + store := NewInMemoryEventStore() + + largeVersion := int64(9223372036854775807) // MaxInt64 + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-large-version", + Version: largeVersion, + State: map[string]interface{}{"maxed": true}, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-large-version") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != largeVersion { + t.Errorf("expected version %d, got %d", largeVersion, retrieved.Version) + } +} + +func TestSnapshotStore_ImplementsInterface(t *testing.T) { + // Verify InMemoryEventStore implements SnapshotStore interface + var _ aether.SnapshotStore = (*InMemoryEventStore)(nil) +} + +func TestConcurrentSaveSnapshot(t *testing.T) { + store := NewInMemoryEventStore() + + numGoroutines := 100 + snapshotsPerGoroutine := 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < snapshotsPerGoroutine; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: fmt.Sprintf("actor-%d", goroutineID), + Version: int64(i + 1), + State: map[string]interface{}{ + "goroutine": goroutineID, + "iteration": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Errorf("SaveSnapshot failed: %v", err) + } + } + }(g) + } + + wg.Wait() + + // Verify each actor has snapshots + for g := 0; g < numGoroutines; g++ { + actorID := fmt.Sprintf("actor-%d", g) + snapshot, err := store.GetLatestSnapshot(actorID) + if err != nil { + t.Errorf("GetLatestSnapshot failed for %s: %v", actorID, err) + continue + } + if snapshot == nil { + t.Errorf("expected snapshot for %s, got nil", actorID) + continue + } + if snapshot.Version != int64(snapshotsPerGoroutine) { + t.Errorf("expected latest version %d for %s, got %d", snapshotsPerGoroutine, actorID, snapshot.Version) + } + } +} + +func TestConcurrentSaveAndGetSnapshot(t *testing.T) { + store := NewInMemoryEventStore() + + // Pre-populate with initial snapshot + initialSnapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: 1, + State: map[string]interface{}{ + "initial": true, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(initialSnapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + var wg sync.WaitGroup + numReaders := 50 + numWriters := 10 + readsPerReader := 100 + writesPerWriter := 10 + + // Start readers + wg.Add(numReaders) + for r := 0; r < numReaders; r++ { + go func() { + defer wg.Done() + for i := 0; i < readsPerReader; i++ { + snapshot, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Errorf("GetLatestSnapshot failed: %v", err) + } + if snapshot == nil { + t.Error("expected snapshot, got nil") + } + } + }() + } + + // Start writers + wg.Add(numWriters) + for w := 0; w < numWriters; w++ { + go func(writerID int) { + defer wg.Done() + for i := 0; i < writesPerWriter; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: int64(100 + writerID*writesPerWriter + i), + State: map[string]interface{}{ + "writer": writerID, + "index": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Errorf("SaveSnapshot failed: %v", err) + } + } + }(w) + } + + wg.Wait() + + // Verify final state - should have the highest version + snapshot, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + if snapshot == nil { + t.Fatal("expected snapshot, got nil") + } + // The highest version should be around 100 + (numWriters-1)*writesPerWriter + (writesPerWriter-1) + // which is 100 + 9*10 + 9 = 199 + expectedMaxVersion := int64(100 + (numWriters-1)*writesPerWriter + (writesPerWriter - 1)) + if snapshot.Version != expectedMaxVersion { + t.Errorf("expected latest version %d, got %d", expectedMaxVersion, snapshot.Version) + } +}