diff --git a/store/memory.go b/store/memory.go index f0f2eb2..895a7fe 100644 --- a/store/memory.go +++ b/store/memory.go @@ -6,13 +6,15 @@ import ( // InMemoryEventStore provides a simple in-memory event store for testing type InMemoryEventStore struct { - events map[string][]*aether.Event // actorID -> events + events map[string][]*aether.Event // actorID -> events + snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) } // NewInMemoryEventStore creates a new in-memory event store func NewInMemoryEventStore() *InMemoryEventStore { return &InMemoryEventStore{ - events: make(map[string][]*aether.Event), + events: make(map[string][]*aether.Event), + snapshots: make(map[string][]*aether.ActorSnapshot), } } @@ -58,3 +60,29 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { return latestVersion, nil } + +// SaveSnapshot saves a snapshot to the in-memory store +func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { + if _, exists := es.snapshots[snapshot.ActorID]; !exists { + es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0) + } + es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot) + return nil +} + +// GetLatestSnapshot returns the most recent snapshot for an actor +func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { + snapshots, exists := es.snapshots[actorID] + if !exists || len(snapshots) == 0 { + return nil, nil + } + + var latest *aether.ActorSnapshot + for _, snapshot := range snapshots { + if latest == nil || snapshot.Version > latest.Version { + latest = snapshot + } + } + + return latest, nil +} diff --git a/store/memory_test.go b/store/memory_test.go new file mode 100644 index 0000000..b098245 --- /dev/null +++ b/store/memory_test.go @@ -0,0 +1,510 @@ +package store + +import ( + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" +) + +func TestSaveSnapshot_PersistsCorrectly(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: 10, + State: map[string]interface{}{ + "balance": 100.50, + "status": "active", + }, + Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), + } + + err := store.SaveSnapshot(snapshot) + if err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // Verify snapshot was persisted by retrieving it + retrieved, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved == nil { + t.Fatal("expected snapshot to be persisted, got nil") + } + + if retrieved.ActorID != snapshot.ActorID { + t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID) + } + if retrieved.Version != snapshot.Version { + t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version) + } + if retrieved.State["balance"] != snapshot.State["balance"] { + t.Errorf("State.balance mismatch: got %v, want %v", retrieved.State["balance"], snapshot.State["balance"]) + } + if retrieved.State["status"] != snapshot.State["status"] { + t.Errorf("State.status mismatch: got %v, want %v", retrieved.State["status"], snapshot.State["status"]) + } + if !retrieved.Timestamp.Equal(snapshot.Timestamp) { + t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, snapshot.Timestamp) + } +} + +func TestSaveSnapshot_MultipleSnapshots(t *testing.T) { + store := NewInMemoryEventStore() + + // Save multiple snapshots for the same actor + for i := 1; i <= 5; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-multi", + Version: int64(i * 10), + State: map[string]interface{}{ + "iteration": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err) + } + } + + // Verify all snapshots were saved by checking the latest + retrieved, err := store.GetLatestSnapshot("actor-multi") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != 50 { + t.Errorf("expected latest version 50, got %d", retrieved.Version) + } +} + +func TestGetLatestSnapshot_ReturnsMostRecent(t *testing.T) { + store := NewInMemoryEventStore() + + // Save snapshots in non-sequential order to test version comparison + versions := []int64{5, 15, 10, 25, 20} + for _, v := range versions { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-latest", + Version: v, + State: map[string]interface{}{ + "version": v, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for version %d: %v", v, err) + } + } + + latest, err := store.GetLatestSnapshot("actor-latest") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if latest == nil { + t.Fatal("expected snapshot, got nil") + } + + if latest.Version != 25 { + t.Errorf("expected latest version 25, got %d", latest.Version) + } + + // Verify the state matches the snapshot with version 25 + if latest.State["version"].(int64) != 25 { + t.Errorf("expected state.version to be 25, got %v", latest.State["version"]) + } +} + +func TestGetLatestSnapshot_NoSnapshotExists(t *testing.T) { + store := NewInMemoryEventStore() + + // Query for a non-existent actor + snapshot, err := store.GetLatestSnapshot("non-existent-actor") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if snapshot != nil { + t.Errorf("expected nil for non-existent actor, got %+v", snapshot) + } +} + +func TestGetLatestSnapshot_EmptyActorID(t *testing.T) { + store := NewInMemoryEventStore() + + // Save a snapshot with empty actor ID + snapshot := &aether.ActorSnapshot{ + ActorID: "", + Version: 1, + State: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // Retrieve with empty actor ID + retrieved, err := store.GetLatestSnapshot("") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved == nil { + t.Fatal("expected snapshot with empty actorID, got nil") + } +} + +func TestSnapshotVersioning_RespectedAcrossActors(t *testing.T) { + store := NewInMemoryEventStore() + + // Save snapshots for different actors + actors := []string{"actor-a", "actor-b", "actor-c"} + for i, actorID := range actors { + snapshot := &aether.ActorSnapshot{ + ActorID: actorID, + Version: int64((i + 1) * 100), // Different versions per actor + State: map[string]interface{}{ + "actor": actorID, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed for %s: %v", actorID, err) + } + } + + // Verify each actor has their own snapshot with correct version + for i, actorID := range actors { + snapshot, err := store.GetLatestSnapshot(actorID) + if err != nil { + t.Fatalf("GetLatestSnapshot failed for %s: %v", actorID, err) + } + + expectedVersion := int64((i + 1) * 100) + if snapshot.Version != expectedVersion { + t.Errorf("actor %s: expected version %d, got %d", actorID, expectedVersion, snapshot.Version) + } + } +} + +func TestSnapshotVersioning_LowerVersionAfterHigher(t *testing.T) { + store := NewInMemoryEventStore() + + // Save a high version first + highSnapshot := &aether.ActorSnapshot{ + ActorID: "actor-order", + Version: 100, + State: map[string]interface{}{ + "marker": "high", + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(highSnapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // Save a lower version after + lowSnapshot := &aether.ActorSnapshot{ + ActorID: "actor-order", + Version: 50, + State: map[string]interface{}{ + "marker": "low", + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(lowSnapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + // GetLatestSnapshot should return the higher version (100), not the most recently saved + latest, err := store.GetLatestSnapshot("actor-order") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if latest.Version != 100 { + t.Errorf("expected version 100, got %d", latest.Version) + } + if latest.State["marker"] != "high" { + t.Errorf("expected marker 'high', got %v", latest.State["marker"]) + } +} + +func TestSnapshotDataIntegrity_ComplexState(t *testing.T) { + store := NewInMemoryEventStore() + + complexState := map[string]interface{}{ + "string": "hello", + "integer": 42, + "float": 3.14159, + "boolean": true, + "null": nil, + "array": []interface{}{"a", "b", "c"}, + "nested": map[string]interface{}{ + "level1": map[string]interface{}{ + "level2": "deep value", + }, + }, + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-complex", + Version: 1, + State: complexState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-complex") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + // Verify all fields + if retrieved.State["string"] != "hello" { + t.Errorf("string mismatch: got %v", retrieved.State["string"]) + } + if retrieved.State["integer"] != 42 { + t.Errorf("integer mismatch: got %v", retrieved.State["integer"]) + } + if retrieved.State["float"] != 3.14159 { + t.Errorf("float mismatch: got %v", retrieved.State["float"]) + } + if retrieved.State["boolean"] != true { + t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"]) + } + if retrieved.State["null"] != nil { + t.Errorf("null mismatch: got %v", retrieved.State["null"]) + } + + // Verify array + arr, ok := retrieved.State["array"].([]interface{}) + if !ok { + t.Fatal("array is not []interface{}") + } + if len(arr) != 3 || arr[0] != "a" || arr[1] != "b" || arr[2] != "c" { + t.Errorf("array mismatch: got %v", arr) + } + + // Verify nested structure + nested, ok := retrieved.State["nested"].(map[string]interface{}) + if !ok { + t.Fatal("nested is not map[string]interface{}") + } + level1, ok := nested["level1"].(map[string]interface{}) + if !ok { + t.Fatal("level1 is not map[string]interface{}") + } + if level1["level2"] != "deep value" { + t.Errorf("nested value mismatch: got %v", level1["level2"]) + } +} + +func TestSnapshotDataIntegrity_SpecialCharacters(t *testing.T) { + store := NewInMemoryEventStore() + + specialState := map[string]interface{}{ + "unicode": "Hello, \u4e16\u754c!", + "emoji": "\U0001F600\U0001F680", + "newlines": "line1\nline2\r\nline3", + "tabs": "col1\tcol2", + "quotes": `"double" and 'single'`, + "backslash": `path\to\file`, + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-special", + Version: 1, + State: specialState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-special") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + for key, expected := range specialState { + if retrieved.State[key] != expected { + t.Errorf("State[%q] mismatch: got %q, want %q", key, retrieved.State[key], expected) + } + } +} + +func TestSnapshotDataIntegrity_EmptyState(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-empty", + Version: 1, + State: map[string]interface{}{}, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-empty") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if len(retrieved.State) != 0 { + t.Errorf("expected empty state, got %v", retrieved.State) + } +} + +func TestSnapshotDataIntegrity_NilState(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-nil", + Version: 1, + State: nil, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-nil") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.State != nil { + t.Errorf("expected nil state, got %v", retrieved.State) + } +} + +func TestSnapshotDataIntegrity_LargeState(t *testing.T) { + store := NewInMemoryEventStore() + + // Create a large state with many entries + largeState := make(map[string]interface{}) + for i := 0; i < 1000; i++ { + largeState[string(rune('a'+i%26))+string(rune('0'+i%10))] = i + } + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-large", + Version: 1, + State: largeState, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-large") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if len(retrieved.State) != len(largeState) { + t.Errorf("state size mismatch: got %d, want %d", len(retrieved.State), len(largeState)) + } +} + +func TestSnapshotDataIntegrity_TimestampPreserved(t *testing.T) { + store := NewInMemoryEventStore() + + // Test various timestamps + timestamps := []time.Time{ + time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC), + time.Date(2020, 6, 15, 23, 59, 59, 999999999, time.UTC), + time.Time{}, // Zero time + } + + for i, ts := range timestamps { + actorID := "actor-ts-" + string(rune('a'+i)) + snapshot := &aether.ActorSnapshot{ + ActorID: actorID, + Version: 1, + State: map[string]interface{}{}, + Timestamp: ts, + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot(actorID) + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if !retrieved.Timestamp.Equal(ts) { + t.Errorf("timestamp %d mismatch: got %v, want %v", i, retrieved.Timestamp, ts) + } + } +} + +func TestSnapshotVersioning_ZeroVersion(t *testing.T) { + store := NewInMemoryEventStore() + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-zero-version", + Version: 0, + State: map[string]interface{}{"initial": true}, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-zero-version") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != 0 { + t.Errorf("expected version 0, got %d", retrieved.Version) + } +} + +func TestSnapshotVersioning_LargeVersion(t *testing.T) { + store := NewInMemoryEventStore() + + largeVersion := int64(9223372036854775807) // MaxInt64 + + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-large-version", + Version: largeVersion, + State: map[string]interface{}{"maxed": true}, + Timestamp: time.Now(), + } + + if err := store.SaveSnapshot(snapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + retrieved, err := store.GetLatestSnapshot("actor-large-version") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + + if retrieved.Version != largeVersion { + t.Errorf("expected version %d, got %d", largeVersion, retrieved.Version) + } +} + +func TestSnapshotStore_ImplementsInterface(t *testing.T) { + // Verify InMemoryEventStore implements SnapshotStore interface + var _ aether.SnapshotStore = (*InMemoryEventStore)(nil) +}