Add comprehensive unit tests for InMemoryEventStore
- Test SaveEvent persists events correctly (single, multiple, multi-actor) - Test GetEvents retrieves events in insertion order - Test GetEvents with fromVersion filtering - Test GetLatestVersion returns correct version - Test behavior with non-existent actor IDs (returns empty/zero) - Test concurrent access safety with race detector - Add mutex protection to InMemoryEventStore for thread safety Closes #3 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit was merged in pull request #32.
This commit is contained in:
@@ -1,11 +1,14 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||
type InMemoryEventStore struct {
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
}
|
||||
|
||||
@@ -18,6 +21,9 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
|
||||
// SaveEvent saves an event to the in-memory store
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
|
||||
if _, exists := es.events[event.ActorID]; !exists {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
@@ -27,6 +33,9 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version
|
||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
es.mu.RLock()
|
||||
defer es.mu.RUnlock()
|
||||
|
||||
events, exists := es.events[actorID]
|
||||
if !exists {
|
||||
return []*aether.Event{}, nil
|
||||
@@ -44,6 +53,9 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a
|
||||
|
||||
// GetLatestVersion returns the latest version for an actor
|
||||
func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||
es.mu.RLock()
|
||||
defer es.mu.RUnlock()
|
||||
|
||||
events, exists := es.events[actorID]
|
||||
if !exists || len(events) == 0 {
|
||||
return 0, nil
|
||||
|
||||
847
store/memory_test.go
Normal file
847
store/memory_test.go
Normal file
@@ -0,0 +1,847 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
func TestNewInMemoryEventStore(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
if store == nil {
|
||||
t.Fatal("NewInMemoryEventStore returned nil")
|
||||
}
|
||||
if store.events == nil {
|
||||
t.Error("events map is nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_SingleEvent(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify event was persisted
|
||||
events, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
if events[0].ID != "evt-123" {
|
||||
t.Errorf("event ID mismatch: got %q, want %q", events[0].ID, "evt-123")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_MultipleEvents(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
for i := 1; i <= 5; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "OrderUpdated",
|
||||
ActorID: "order-456",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
events, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 5 {
|
||||
t.Errorf("expected 5 events, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_MultipleActors(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events for different actors
|
||||
actors := []string{"actor-1", "actor-2", "actor-3"}
|
||||
for _, actorID := range actors {
|
||||
for i := 1; i <= 3; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: actorID,
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify each actor has its own events
|
||||
for _, actorID := range actors {
|
||||
events, err := store.GetEvents(actorID, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed for %s: %v", actorID, err)
|
||||
}
|
||||
if len(events) != 3 {
|
||||
t.Errorf("expected 3 events for %s, got %d", actorID, len(events))
|
||||
}
|
||||
for _, event := range events {
|
||||
if event.ActorID != actorID {
|
||||
t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_PreservesAllFields(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
ts := time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC)
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
CommandID: "cmd-789",
|
||||
Version: 42,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
"currency": "USD",
|
||||
},
|
||||
Timestamp: ts,
|
||||
}
|
||||
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
events, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved := events[0]
|
||||
if retrieved.ID != event.ID {
|
||||
t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID)
|
||||
}
|
||||
if retrieved.EventType != event.EventType {
|
||||
t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType)
|
||||
}
|
||||
if retrieved.ActorID != event.ActorID {
|
||||
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID)
|
||||
}
|
||||
if retrieved.CommandID != event.CommandID {
|
||||
t.Errorf("CommandID mismatch: got %q, want %q", retrieved.CommandID, event.CommandID)
|
||||
}
|
||||
if retrieved.Version != event.Version {
|
||||
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version)
|
||||
}
|
||||
if !retrieved.Timestamp.Equal(event.Timestamp) {
|
||||
t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, event.Timestamp)
|
||||
}
|
||||
if retrieved.Data["total"] != event.Data["total"] {
|
||||
t.Errorf("Data.total mismatch: got %v, want %v", retrieved.Data["total"], event.Data["total"])
|
||||
}
|
||||
if retrieved.Data["currency"] != event.Data["currency"] {
|
||||
t.Errorf("Data.currency mismatch: got %v, want %v", retrieved.Data["currency"], event.Data["currency"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEvents_RetrievesInOrder(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events in order
|
||||
for i := 1; i <= 10; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify events are returned in insertion order
|
||||
for i, event := range events {
|
||||
expectedID := fmt.Sprintf("evt-%d", i+1)
|
||||
if event.ID != expectedID {
|
||||
t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID)
|
||||
}
|
||||
expectedVersion := int64(i + 1)
|
||||
if event.Version != expectedVersion {
|
||||
t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEvents_FromVersionFilters(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events with versions 1-10
|
||||
for i := 1; i <= 10; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
fromVersion int64
|
||||
expected int
|
||||
minVersion int64
|
||||
}{
|
||||
{"from version 0", 0, 10, 1},
|
||||
{"from version 1", 1, 10, 1},
|
||||
{"from version 5", 5, 6, 5},
|
||||
{"from version 10", 10, 1, 10},
|
||||
{"from version 11", 11, 0, 0},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
events, err := store.GetEvents("actor-123", tc.fromVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
if len(events) != tc.expected {
|
||||
t.Errorf("expected %d events, got %d", tc.expected, len(events))
|
||||
}
|
||||
|
||||
// Verify all returned events have version >= fromVersion
|
||||
for _, event := range events {
|
||||
if event.Version < tc.fromVersion {
|
||||
t.Errorf("event version %d is less than fromVersion %d", event.Version, tc.fromVersion)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify minimum version if events exist
|
||||
if len(events) > 0 && events[0].Version != tc.minVersion {
|
||||
t.Errorf("first event version: got %d, want %d", events[0].Version, tc.minVersion)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEvents_FromVersionZero(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// fromVersion 0 should return all events
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected 1 event with fromVersion 0, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events with various versions
|
||||
versions := []int64{1, 3, 2, 5, 4} // Out of order
|
||||
for i, version := range versions {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: version,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
latestVersion, err := store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
|
||||
// Should return the highest version (5)
|
||||
if latestVersion != 5 {
|
||||
t.Errorf("expected latest version 5, got %d", latestVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestVersion_SingleEvent(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 42,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
latestVersion, err := store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
|
||||
if latestVersion != 42 {
|
||||
t.Errorf("expected latest version 42, got %d", latestVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestVersion_UpdatesAfterNewEvent(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save first event
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event1); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
version1, err := store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
if version1 != 1 {
|
||||
t.Errorf("expected version 1, got %d", version1)
|
||||
}
|
||||
|
||||
// Save second event with higher version
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 10,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event2); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
version2, err := store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
if version2 != 10 {
|
||||
t.Errorf("expected version 10, got %d", version2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEvents_NonExistentActor(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save event for one actor
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Get events for non-existent actor
|
||||
events, err := store.GetEvents("non-existent-actor", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents should not error for non-existent actor: %v", err)
|
||||
}
|
||||
|
||||
if events == nil {
|
||||
t.Error("GetEvents should return non-nil slice for non-existent actor")
|
||||
}
|
||||
if len(events) != 0 {
|
||||
t.Errorf("expected 0 events for non-existent actor, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestVersion_NonExistentActor(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Get latest version for non-existent actor
|
||||
version, err := store.GetLatestVersion("non-existent-actor")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err)
|
||||
}
|
||||
|
||||
if version != 0 {
|
||||
t.Errorf("expected version 0 for non-existent actor, got %d", version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestVersion_EmptyStore(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
version, err := store.GetLatestVersion("any-actor")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion should not error for empty store: %v", err)
|
||||
}
|
||||
|
||||
if version != 0 {
|
||||
t.Errorf("expected version 0 for empty store, got %d", version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEvents_EmptyStore(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
events, err := store.GetEvents("any-actor", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents should not error for empty store: %v", err)
|
||||
}
|
||||
|
||||
if events == nil {
|
||||
t.Error("GetEvents should return non-nil slice for empty store")
|
||||
}
|
||||
if len(events) != 0 {
|
||||
t.Errorf("expected 0 events for empty store, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentSaveEvent(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
numGoroutines := 100
|
||||
eventsPerGoroutine := 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numGoroutines)
|
||||
|
||||
for g := 0; g < numGoroutines; g++ {
|
||||
go func(goroutineID int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < eventsPerGoroutine; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d-%d", goroutineID, i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: fmt.Sprintf("actor-%d", goroutineID),
|
||||
Version: int64(i + 1),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Errorf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
}(g)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify each actor has the correct number of events
|
||||
for g := 0; g < numGoroutines; g++ {
|
||||
actorID := fmt.Sprintf("actor-%d", g)
|
||||
events, err := store.GetEvents(actorID, 0)
|
||||
if err != nil {
|
||||
t.Errorf("GetEvents failed for %s: %v", actorID, err)
|
||||
continue
|
||||
}
|
||||
if len(events) != eventsPerGoroutine {
|
||||
t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentSaveAndGet(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Pre-populate with some events
|
||||
for i := 1; i <= 10; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
numReaders := 50
|
||||
numWriters := 10
|
||||
readsPerReader := 100
|
||||
writesPerWriter := 10
|
||||
|
||||
// Start readers
|
||||
wg.Add(numReaders)
|
||||
for r := 0; r < numReaders; r++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < readsPerReader; i++ {
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Errorf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) < 10 {
|
||||
t.Errorf("expected at least 10 events, got %d", len(events))
|
||||
}
|
||||
|
||||
_, err = store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Errorf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Start writers
|
||||
wg.Add(numWriters)
|
||||
for w := 0; w < numWriters; w++ {
|
||||
go func(writerID int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < writesPerWriter; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(100 + writerID*writesPerWriter + i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Errorf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
}(w)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify final state
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
expectedTotal := 10 + numWriters*writesPerWriter
|
||||
if len(events) != expectedTotal {
|
||||
t.Errorf("expected %d total events, got %d", expectedTotal, len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentGetLatestVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save initial event
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 100,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
numGoroutines := 100
|
||||
wg.Add(numGoroutines)
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 100; j++ {
|
||||
version, err := store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Errorf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
if version < 100 {
|
||||
t.Errorf("expected version >= 100, got %d", version)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestEventStoreInterface(t *testing.T) {
|
||||
// Verify InMemoryEventStore implements EventStore interface
|
||||
var _ aether.EventStore = (*InMemoryEventStore)(nil)
|
||||
}
|
||||
|
||||
func TestSaveEvent_NilData(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-nil",
|
||||
EventType: "NilDataEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 1,
|
||||
Data: nil,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed with nil data: %v", err)
|
||||
}
|
||||
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
if events[0].Data != nil {
|
||||
t.Errorf("expected nil Data, got %v", events[0].Data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_EmptyData(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-empty",
|
||||
EventType: "EmptyDataEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed with empty data: %v", err)
|
||||
}
|
||||
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
if len(events[0].Data) != 0 {
|
||||
t.Errorf("expected empty Data map, got %v", events[0].Data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEvents_VersionEdgeCases(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events with edge case versions
|
||||
versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64
|
||||
for i, version := range versions {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: version,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test fromVersion 0 returns all
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 3 {
|
||||
t.Errorf("expected 3 events, got %d", len(events))
|
||||
}
|
||||
|
||||
// Test fromVersion MaxInt64 returns only that event
|
||||
events, err = store.GetEvents("actor-123", 9223372036854775807)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestVersion_VersionEdgeCases(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save event with MaxInt64 version
|
||||
event := &aether.Event{
|
||||
ID: "evt-max",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 9223372036854775807,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
latestVersion, err := store.GetLatestVersion("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestVersion failed: %v", err)
|
||||
}
|
||||
|
||||
if latestVersion != 9223372036854775807 {
|
||||
t.Errorf("expected MaxInt64 version, got %d", latestVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_SpecialActorIDs(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
specialIDs := []string{
|
||||
"simple",
|
||||
"with-dashes",
|
||||
"with_underscores",
|
||||
"with.dots",
|
||||
"with:colons",
|
||||
"with/slashes",
|
||||
"user@example.com",
|
||||
"unicode-\u4e16\u754c",
|
||||
"",
|
||||
}
|
||||
|
||||
for _, actorID := range specialIDs {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s", actorID),
|
||||
EventType: "TestEvent",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Errorf("SaveEvent failed for actorID %q: %v", actorID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
events, err := store.GetEvents(actorID, 0)
|
||||
if err != nil {
|
||||
t.Errorf("GetEvents failed for actorID %q: %v", actorID, err)
|
||||
continue
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected 1 event for actorID %q, got %d", actorID, len(events))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSaveEvent(b *testing.B) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "BenchmarkEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(i + 1),
|
||||
Data: map[string]interface{}{"value": i},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
store.SaveEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetEvents(b *testing.B) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Pre-populate with events
|
||||
for i := 0; i < 1000; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "BenchmarkEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(i + 1),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
store.SaveEvent(event)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
store.GetEvents("actor-123", 0)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetLatestVersion(b *testing.B) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Pre-populate with events
|
||||
for i := 0; i < 1000; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "BenchmarkEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(i + 1),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
store.SaveEvent(event)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
store.GetLatestVersion("actor-123")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user