diff --git a/store/memory.go b/store/memory.go index 895a7fe..21c5b14 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,11 +1,15 @@ package store import ( + "fmt" + "sync" + "git.flowmade.one/flowmade-one/aether" ) // InMemoryEventStore provides a simple in-memory event store for testing type InMemoryEventStore struct { + mu sync.RWMutex events map[string][]*aether.Event // actorID -> events snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) } @@ -20,6 +24,9 @@ func NewInMemoryEventStore() *InMemoryEventStore { // SaveEvent saves an event to the in-memory store func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { + es.mu.Lock() + defer es.mu.Unlock() + if _, exists := es.events[event.ActorID]; !exists { es.events[event.ActorID] = make([]*aether.Event, 0) } @@ -29,6 +36,9 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { // GetEvents retrieves events for an actor from a specific version func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { + es.mu.RLock() + defer es.mu.RUnlock() + events, exists := es.events[actorID] if !exists { return []*aether.Event{}, nil @@ -46,6 +56,9 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a // GetLatestVersion returns the latest version for an actor func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { + es.mu.RLock() + defer es.mu.RUnlock() + events, exists := es.events[actorID] if !exists || len(events) == 0 { return 0, nil @@ -63,6 +76,13 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { // SaveSnapshot saves a snapshot to the in-memory store func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { + if snapshot == nil { + return fmt.Errorf("snapshot cannot be nil") + } + + es.mu.Lock() + defer es.mu.Unlock() + if _, exists := es.snapshots[snapshot.ActorID]; !exists { es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0) } @@ -72,6 +92,9 @@ func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error // GetLatestSnapshot returns the most recent snapshot for an actor func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { + es.mu.RLock() + defer es.mu.RUnlock() + snapshots, exists := es.snapshots[actorID] if !exists || len(snapshots) == 0 { return nil, nil diff --git a/store/memory_test.go b/store/memory_test.go index b098245..20b7639 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -1,12 +1,855 @@ package store import ( + "fmt" + "sync" "testing" "time" "git.flowmade.one/flowmade-one/aether" ) +// === Event Store Tests (from main branch) === + +func TestNewInMemoryEventStore(t *testing.T) { + store := NewInMemoryEventStore() + + if store == nil { + t.Fatal("NewInMemoryEventStore returned nil") + } + if store.events == nil { + t.Error("events map is nil") + } +} + +func TestSaveEvent_SingleEvent(t *testing.T) { + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{ + "total": 100.50, + }, + Timestamp: time.Now(), + } + + err := store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify event was persisted + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + if events[0].ID != "evt-123" { + t.Errorf("event ID mismatch: got %q, want %q", events[0].ID, "evt-123") + } +} + +func TestSaveEvent_MultipleEvents(t *testing.T) { + store := NewInMemoryEventStore() + + for i := 1; i <= 5; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "OrderUpdated", + ActorID: "order-456", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed for event %d: %v", i, err) + } + } + + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 5 { + t.Errorf("expected 5 events, got %d", len(events)) + } +} + +func TestSaveEvent_MultipleActors(t *testing.T) { + store := NewInMemoryEventStore() + + // Save events for different actors + actors := []string{"actor-1", "actor-2", "actor-3"} + for _, actorID := range actors { + for i := 1; i <= 3; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s-%d", actorID, i), + EventType: "TestEvent", + ActorID: actorID, + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + } + + // Verify each actor has its own events + for _, actorID := range actors { + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Fatalf("GetEvents failed for %s: %v", actorID, err) + } + if len(events) != 3 { + t.Errorf("expected 3 events for %s, got %d", actorID, len(events)) + } + for _, event := range events { + if event.ActorID != actorID { + t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID) + } + } + } +} + +func TestSaveEvent_PreservesAllFields(t *testing.T) { + store := NewInMemoryEventStore() + + ts := time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC) + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + CommandID: "cmd-789", + Version: 42, + Data: map[string]interface{}{ + "total": 100.50, + "currency": "USD", + }, + Timestamp: ts, + } + + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + retrieved := events[0] + if retrieved.ID != event.ID { + t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID) + } + if retrieved.EventType != event.EventType { + t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType) + } + if retrieved.ActorID != event.ActorID { + t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID) + } + if retrieved.CommandID != event.CommandID { + t.Errorf("CommandID mismatch: got %q, want %q", retrieved.CommandID, event.CommandID) + } + if retrieved.Version != event.Version { + t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version) + } + if !retrieved.Timestamp.Equal(event.Timestamp) { + t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, event.Timestamp) + } + if retrieved.Data["total"] != event.Data["total"] { + t.Errorf("Data.total mismatch: got %v, want %v", retrieved.Data["total"], event.Data["total"]) + } + if retrieved.Data["currency"] != event.Data["currency"] { + t.Errorf("Data.currency mismatch: got %v, want %v", retrieved.Data["currency"], event.Data["currency"]) + } +} + +func TestGetEvents_RetrievesInOrder(t *testing.T) { + store := NewInMemoryEventStore() + + // Save events in order + for i := 1; i <= 10; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + // Verify events are returned in insertion order + for i, event := range events { + expectedID := fmt.Sprintf("evt-%d", i+1) + if event.ID != expectedID { + t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID) + } + expectedVersion := int64(i + 1) + if event.Version != expectedVersion { + t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion) + } + } +} + +func TestGetEvents_FromVersionFilters(t *testing.T) { + store := NewInMemoryEventStore() + + // Save events with versions 1-10 + for i := 1; i <= 10; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + testCases := []struct { + name string + fromVersion int64 + expected int + minVersion int64 + }{ + {"from version 0", 0, 10, 1}, + {"from version 1", 1, 10, 1}, + {"from version 5", 5, 6, 5}, + {"from version 10", 10, 1, 10}, + {"from version 11", 11, 0, 0}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + events, err := store.GetEvents("actor-123", tc.fromVersion) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != tc.expected { + t.Errorf("expected %d events, got %d", tc.expected, len(events)) + } + + // Verify all returned events have version >= fromVersion + for _, event := range events { + if event.Version < tc.fromVersion { + t.Errorf("event version %d is less than fromVersion %d", event.Version, tc.fromVersion) + } + } + + // Verify minimum version if events exist + if len(events) > 0 && events[0].Version != tc.minVersion { + t.Errorf("first event version: got %d, want %d", events[0].Version, tc.minVersion) + } + }) + } +} + +func TestGetEvents_FromVersionZero(t *testing.T) { + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // fromVersion 0 should return all events + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 1 { + t.Errorf("expected 1 event with fromVersion 0, got %d", len(events)) + } +} + +func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) { + store := NewInMemoryEventStore() + + // Save events with various versions + versions := []int64{1, 3, 2, 5, 4} // Out of order + for i, version := range versions { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: version, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + latestVersion, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + + // Should return the highest version (5) + if latestVersion != 5 { + t.Errorf("expected latest version 5, got %d", latestVersion) + } +} + +func TestGetLatestVersion_SingleEvent(t *testing.T) { + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 42, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + latestVersion, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + + if latestVersion != 42 { + t.Errorf("expected latest version 42, got %d", latestVersion) + } +} + +func TestGetLatestVersion_UpdatesAfterNewEvent(t *testing.T) { + store := NewInMemoryEventStore() + + // Save first event + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + version1, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + if version1 != 1 { + t.Errorf("expected version 1, got %d", version1) + } + + // Save second event with higher version + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 10, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event2); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + version2, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + if version2 != 10 { + t.Errorf("expected version 10, got %d", version2) + } +} + +func TestGetEvents_NonExistentActor(t *testing.T) { + store := NewInMemoryEventStore() + + // Save event for one actor + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Get events for non-existent actor + events, err := store.GetEvents("non-existent-actor", 0) + if err != nil { + t.Fatalf("GetEvents should not error for non-existent actor: %v", err) + } + + if events == nil { + t.Error("GetEvents should return non-nil slice for non-existent actor") + } + if len(events) != 0 { + t.Errorf("expected 0 events for non-existent actor, got %d", len(events)) + } +} + +func TestGetLatestVersion_NonExistentActor(t *testing.T) { + store := NewInMemoryEventStore() + + // Get latest version for non-existent actor + version, err := store.GetLatestVersion("non-existent-actor") + if err != nil { + t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err) + } + + if version != 0 { + t.Errorf("expected version 0 for non-existent actor, got %d", version) + } +} + +func TestGetLatestVersion_EmptyStore(t *testing.T) { + store := NewInMemoryEventStore() + + version, err := store.GetLatestVersion("any-actor") + if err != nil { + t.Fatalf("GetLatestVersion should not error for empty store: %v", err) + } + + if version != 0 { + t.Errorf("expected version 0 for empty store, got %d", version) + } +} + +func TestGetEvents_EmptyStore(t *testing.T) { + store := NewInMemoryEventStore() + + events, err := store.GetEvents("any-actor", 0) + if err != nil { + t.Fatalf("GetEvents should not error for empty store: %v", err) + } + + if events == nil { + t.Error("GetEvents should return non-nil slice for empty store") + } + if len(events) != 0 { + t.Errorf("expected 0 events for empty store, got %d", len(events)) + } +} + +func TestConcurrentSaveEvent(t *testing.T) { + store := NewInMemoryEventStore() + + numGoroutines := 100 + eventsPerGoroutine := 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < eventsPerGoroutine; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d-%d", goroutineID, i), + EventType: "TestEvent", + ActorID: fmt.Sprintf("actor-%d", goroutineID), + Version: int64(i + 1), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Errorf("SaveEvent failed: %v", err) + } + } + }(g) + } + + wg.Wait() + + // Verify each actor has the correct number of events + for g := 0; g < numGoroutines; g++ { + actorID := fmt.Sprintf("actor-%d", g) + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Errorf("GetEvents failed for %s: %v", actorID, err) + continue + } + if len(events) != eventsPerGoroutine { + t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events)) + } + } +} + +func TestConcurrentSaveAndGet(t *testing.T) { + store := NewInMemoryEventStore() + + // Pre-populate with some events + for i := 1; i <= 10; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + var wg sync.WaitGroup + numReaders := 50 + numWriters := 10 + readsPerReader := 100 + writesPerWriter := 10 + + // Start readers + wg.Add(numReaders) + for r := 0; r < numReaders; r++ { + go func() { + defer wg.Done() + for i := 0; i < readsPerReader; i++ { + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Errorf("GetEvents failed: %v", err) + } + if len(events) < 10 { + t.Errorf("expected at least 10 events, got %d", len(events)) + } + + _, err = store.GetLatestVersion("actor-123") + if err != nil { + t.Errorf("GetLatestVersion failed: %v", err) + } + } + }() + } + + // Start writers + wg.Add(numWriters) + for w := 0; w < numWriters; w++ { + go func(writerID int) { + defer wg.Done() + for i := 0; i < writesPerWriter; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: int64(100 + writerID*writesPerWriter + i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Errorf("SaveEvent failed: %v", err) + } + } + }(w) + } + + wg.Wait() + + // Verify final state + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + expectedTotal := 10 + numWriters*writesPerWriter + if len(events) != expectedTotal { + t.Errorf("expected %d total events, got %d", expectedTotal, len(events)) + } +} + +func TestConcurrentGetLatestVersion(t *testing.T) { + store := NewInMemoryEventStore() + + // Save initial event + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 100, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + var wg sync.WaitGroup + numGoroutines := 100 + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + version, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Errorf("GetLatestVersion failed: %v", err) + } + if version < 100 { + t.Errorf("expected version >= 100, got %d", version) + } + } + }() + } + + wg.Wait() +} + +func TestEventStoreInterface(t *testing.T) { + // Verify InMemoryEventStore implements EventStore interface + var _ aether.EventStore = (*InMemoryEventStore)(nil) +} + +func TestSaveEvent_NilData(t *testing.T) { + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-nil", + EventType: "NilDataEvent", + ActorID: "actor-123", + Version: 1, + Data: nil, + Timestamp: time.Now(), + } + + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed with nil data: %v", err) + } + + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if events[0].Data != nil { + t.Errorf("expected nil Data, got %v", events[0].Data) + } +} + +func TestSaveEvent_EmptyData(t *testing.T) { + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-empty", + EventType: "EmptyDataEvent", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed with empty data: %v", err) + } + + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events[0].Data) != 0 { + t.Errorf("expected empty Data map, got %v", events[0].Data) + } +} + +func TestGetEvents_VersionEdgeCases(t *testing.T) { + store := NewInMemoryEventStore() + + // Save events with edge case versions + versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64 + for i, version := range versions { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-123", + Version: version, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + // Test fromVersion 0 returns all + events, err := store.GetEvents("actor-123", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 3 { + t.Errorf("expected 3 events, got %d", len(events)) + } + + // Test fromVersion MaxInt64 returns only that event + events, err = store.GetEvents("actor-123", 9223372036854775807) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 1 { + t.Errorf("expected 1 event, got %d", len(events)) + } +} + +func TestGetLatestVersion_VersionEdgeCases(t *testing.T) { + store := NewInMemoryEventStore() + + // Save event with MaxInt64 version + event := &aether.Event{ + ID: "evt-max", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 9223372036854775807, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + latestVersion, err := store.GetLatestVersion("actor-123") + if err != nil { + t.Fatalf("GetLatestVersion failed: %v", err) + } + + if latestVersion != 9223372036854775807 { + t.Errorf("expected MaxInt64 version, got %d", latestVersion) + } +} + +func TestSaveEvent_SpecialActorIDs(t *testing.T) { + store := NewInMemoryEventStore() + + specialIDs := []string{ + "simple", + "with-dashes", + "with_underscores", + "with.dots", + "with:colons", + "with/slashes", + "user@example.com", + "unicode-δΈ–η•Œ", + "", + } + + for _, actorID := range specialIDs { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%s", actorID), + EventType: "TestEvent", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Errorf("SaveEvent failed for actorID %q: %v", actorID, err) + continue + } + + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Errorf("GetEvents failed for actorID %q: %v", actorID, err) + continue + } + if len(events) != 1 { + t.Errorf("expected 1 event for actorID %q, got %d", actorID, len(events)) + } + } +} + +func BenchmarkSaveEvent(b *testing.B) { + store := NewInMemoryEventStore() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchmarkEvent", + ActorID: "actor-123", + Version: int64(i + 1), + Data: map[string]interface{}{"value": i}, + Timestamp: time.Now(), + } + store.SaveEvent(event) + } +} + +func BenchmarkGetEvents(b *testing.B) { + store := NewInMemoryEventStore() + + // Pre-populate with events + for i := 0; i < 1000; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchmarkEvent", + ActorID: "actor-123", + Version: int64(i + 1), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + store.SaveEvent(event) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + store.GetEvents("actor-123", 0) + } +} + +func BenchmarkGetLatestVersion(b *testing.B) { + store := NewInMemoryEventStore() + + // Pre-populate with events + for i := 0; i < 1000; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchmarkEvent", + ActorID: "actor-123", + Version: int64(i + 1), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + store.SaveEvent(event) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + store.GetLatestVersion("actor-123") + } +} + +// === Snapshot Store Tests (from PR branch) === + func TestSaveSnapshot_PersistsCorrectly(t *testing.T) { store := NewInMemoryEventStore() @@ -52,6 +895,15 @@ func TestSaveSnapshot_PersistsCorrectly(t *testing.T) { } } +func TestSaveSnapshot_NilSnapshot(t *testing.T) { + store := NewInMemoryEventStore() + + err := store.SaveSnapshot(nil) + if err == nil { + t.Error("expected error when saving nil snapshot, got nil") + } +} + func TestSaveSnapshot_MultipleSnapshots(t *testing.T) { store := NewInMemoryEventStore() @@ -138,9 +990,9 @@ func TestGetLatestSnapshot_EmptyActorID(t *testing.T) { // Save a snapshot with empty actor ID snapshot := &aether.ActorSnapshot{ - ActorID: "", - Version: 1, - State: map[string]interface{}{}, + ActorID: "", + Version: 1, + State: map[string]interface{}{}, Timestamp: time.Now(), } if err := store.SaveSnapshot(snapshot); err != nil { @@ -311,12 +1163,12 @@ func TestSnapshotDataIntegrity_SpecialCharacters(t *testing.T) { store := NewInMemoryEventStore() specialState := map[string]interface{}{ - "unicode": "Hello, \u4e16\u754c!", - "emoji": "\U0001F600\U0001F680", - "newlines": "line1\nline2\r\nline3", - "tabs": "col1\tcol2", - "quotes": `"double" and 'single'`, - "backslash": `path\to\file`, + "unicode": "Hello, δΈ–η•Œ!", + "emoji": "πŸ˜€πŸš€", + "newlines": "line1\nline2\r\nline3", + "tabs": "col1\tcol2", + "quotes": `"double" and 'single'`, + "backslash": `path\to\file`, } snapshot := &aether.ActorSnapshot{ @@ -393,10 +1245,10 @@ func TestSnapshotDataIntegrity_NilState(t *testing.T) { func TestSnapshotDataIntegrity_LargeState(t *testing.T) { store := NewInMemoryEventStore() - // Create a large state with many entries + // Create a large state with many entries using unique keys largeState := make(map[string]interface{}) for i := 0; i < 1000; i++ { - largeState[string(rune('a'+i%26))+string(rune('0'+i%10))] = i + largeState[fmt.Sprintf("key-%d", i)] = i } snapshot := &aether.ActorSnapshot{ @@ -431,7 +1283,7 @@ func TestSnapshotDataIntegrity_TimestampPreserved(t *testing.T) { } for i, ts := range timestamps { - actorID := "actor-ts-" + string(rune('a'+i)) + actorID := fmt.Sprintf("actor-ts-%d", i) snapshot := &aether.ActorSnapshot{ ActorID: actorID, Version: 1, @@ -508,3 +1360,131 @@ func TestSnapshotStore_ImplementsInterface(t *testing.T) { // Verify InMemoryEventStore implements SnapshotStore interface var _ aether.SnapshotStore = (*InMemoryEventStore)(nil) } + +func TestConcurrentSaveSnapshot(t *testing.T) { + store := NewInMemoryEventStore() + + numGoroutines := 100 + snapshotsPerGoroutine := 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < snapshotsPerGoroutine; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: fmt.Sprintf("actor-%d", goroutineID), + Version: int64(i + 1), + State: map[string]interface{}{ + "goroutine": goroutineID, + "iteration": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Errorf("SaveSnapshot failed: %v", err) + } + } + }(g) + } + + wg.Wait() + + // Verify each actor has snapshots + for g := 0; g < numGoroutines; g++ { + actorID := fmt.Sprintf("actor-%d", g) + snapshot, err := store.GetLatestSnapshot(actorID) + if err != nil { + t.Errorf("GetLatestSnapshot failed for %s: %v", actorID, err) + continue + } + if snapshot == nil { + t.Errorf("expected snapshot for %s, got nil", actorID) + continue + } + if snapshot.Version != int64(snapshotsPerGoroutine) { + t.Errorf("expected latest version %d for %s, got %d", snapshotsPerGoroutine, actorID, snapshot.Version) + } + } +} + +func TestConcurrentSaveAndGetSnapshot(t *testing.T) { + store := NewInMemoryEventStore() + + // Pre-populate with initial snapshot + initialSnapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: 1, + State: map[string]interface{}{ + "initial": true, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(initialSnapshot); err != nil { + t.Fatalf("SaveSnapshot failed: %v", err) + } + + var wg sync.WaitGroup + numReaders := 50 + numWriters := 10 + readsPerReader := 100 + writesPerWriter := 10 + + // Start readers + wg.Add(numReaders) + for r := 0; r < numReaders; r++ { + go func() { + defer wg.Done() + for i := 0; i < readsPerReader; i++ { + snapshot, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Errorf("GetLatestSnapshot failed: %v", err) + } + if snapshot == nil { + t.Error("expected snapshot, got nil") + } + } + }() + } + + // Start writers + wg.Add(numWriters) + for w := 0; w < numWriters; w++ { + go func(writerID int) { + defer wg.Done() + for i := 0; i < writesPerWriter; i++ { + snapshot := &aether.ActorSnapshot{ + ActorID: "actor-123", + Version: int64(100 + writerID*writesPerWriter + i), + State: map[string]interface{}{ + "writer": writerID, + "index": i, + }, + Timestamp: time.Now(), + } + if err := store.SaveSnapshot(snapshot); err != nil { + t.Errorf("SaveSnapshot failed: %v", err) + } + } + }(w) + } + + wg.Wait() + + // Verify final state - should have the highest version + snapshot, err := store.GetLatestSnapshot("actor-123") + if err != nil { + t.Fatalf("GetLatestSnapshot failed: %v", err) + } + if snapshot == nil { + t.Fatal("expected snapshot, got nil") + } + // The highest version should be around 100 + (numWriters-1)*writesPerWriter + (writesPerWriter-1) + // which is 100 + 9*10 + 9 = 199 + expectedMaxVersion := int64(100 + (numWriters-1)*writesPerWriter + (writesPerWriter - 1)) + if snapshot.Version != expectedMaxVersion { + t.Errorf("expected latest version %d, got %d", expectedMaxVersion, snapshot.Version) + } +}