All checks were successful
CI / build (pull_request) Successful in 15s
- Add Metadata field (map[string]string) to Event struct with omitempty - Add helper methods for common metadata: SetCorrelationID/GetCorrelationID, SetCausationID/GetCausationID, SetUserID/GetUserID, SetTraceID/GetTraceID, SetSpanID/GetSpanID - Add WithMetadataFrom helper for copying metadata between events - Add metadata key constants for standard fields - Add comprehensive unit tests for metadata serialization and helpers - Add store tests verifying metadata persistence Closes #7 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1710 lines
43 KiB
Go
1710 lines
43 KiB
Go
package store
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
)
|
|
|
|
// === Event Store Tests (from main branch) ===
|
|
|
|
func TestNewInMemoryEventStore(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
if store == nil {
|
|
t.Fatal("NewInMemoryEventStore returned nil")
|
|
}
|
|
if store.events == nil {
|
|
t.Error("events map is nil")
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_SingleEvent(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-123",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{
|
|
"total": 100.50,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// Verify event was persisted
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
if events[0].ID != "evt-123" {
|
|
t.Errorf("event ID mismatch: got %q, want %q", events[0].ID, "evt-123")
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_MultipleEvents(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
for i := 1; i <= 5; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "OrderUpdated",
|
|
ActorID: "order-456",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 5 {
|
|
t.Errorf("expected 5 events, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_MultipleActors(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save events for different actors
|
|
actors := []string{"actor-1", "actor-2", "actor-3"}
|
|
for _, actorID := range actors {
|
|
for i := 1; i <= 3; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s-%d", actorID, i),
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify each actor has its own events
|
|
for _, actorID := range actors {
|
|
events, err := store.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed for %s: %v", actorID, err)
|
|
}
|
|
if len(events) != 3 {
|
|
t.Errorf("expected 3 events for %s, got %d", actorID, len(events))
|
|
}
|
|
for _, event := range events {
|
|
if event.ActorID != actorID {
|
|
t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_PreservesAllFields(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
ts := time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC)
|
|
event := &aether.Event{
|
|
ID: "evt-123",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
CommandID: "cmd-789",
|
|
Version: 42,
|
|
Data: map[string]interface{}{
|
|
"total": 100.50,
|
|
"currency": "USD",
|
|
},
|
|
Timestamp: ts,
|
|
}
|
|
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
retrieved := events[0]
|
|
if retrieved.ID != event.ID {
|
|
t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID)
|
|
}
|
|
if retrieved.EventType != event.EventType {
|
|
t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType)
|
|
}
|
|
if retrieved.ActorID != event.ActorID {
|
|
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID)
|
|
}
|
|
if retrieved.CommandID != event.CommandID {
|
|
t.Errorf("CommandID mismatch: got %q, want %q", retrieved.CommandID, event.CommandID)
|
|
}
|
|
if retrieved.Version != event.Version {
|
|
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version)
|
|
}
|
|
if !retrieved.Timestamp.Equal(event.Timestamp) {
|
|
t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, event.Timestamp)
|
|
}
|
|
if retrieved.Data["total"] != event.Data["total"] {
|
|
t.Errorf("Data.total mismatch: got %v, want %v", retrieved.Data["total"], event.Data["total"])
|
|
}
|
|
if retrieved.Data["currency"] != event.Data["currency"] {
|
|
t.Errorf("Data.currency mismatch: got %v, want %v", retrieved.Data["currency"], event.Data["currency"])
|
|
}
|
|
}
|
|
|
|
func TestGetEvents_RetrievesInOrder(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save events in order
|
|
for i := 1; i <= 10; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
// Verify events are returned in insertion order
|
|
for i, event := range events {
|
|
expectedID := fmt.Sprintf("evt-%d", i+1)
|
|
if event.ID != expectedID {
|
|
t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID)
|
|
}
|
|
expectedVersion := int64(i + 1)
|
|
if event.Version != expectedVersion {
|
|
t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestGetEvents_FromVersionFilters(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save events with versions 1-10
|
|
for i := 1; i <= 10; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
testCases := []struct {
|
|
name string
|
|
fromVersion int64
|
|
expected int
|
|
minVersion int64
|
|
}{
|
|
{"from version 0", 0, 10, 1},
|
|
{"from version 1", 1, 10, 1},
|
|
{"from version 5", 5, 6, 5},
|
|
{"from version 10", 10, 1, 10},
|
|
{"from version 11", 11, 0, 0},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
events, err := store.GetEvents("actor-123", tc.fromVersion)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != tc.expected {
|
|
t.Errorf("expected %d events, got %d", tc.expected, len(events))
|
|
}
|
|
|
|
// Verify all returned events have version >= fromVersion
|
|
for _, event := range events {
|
|
if event.Version < tc.fromVersion {
|
|
t.Errorf("event version %d is less than fromVersion %d", event.Version, tc.fromVersion)
|
|
}
|
|
}
|
|
|
|
// Verify minimum version if events exist
|
|
if len(events) > 0 && events[0].Version != tc.minVersion {
|
|
t.Errorf("first event version: got %d, want %d", events[0].Version, tc.minVersion)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestGetEvents_FromVersionZero(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// fromVersion 0 should return all events
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Errorf("expected 1 event with fromVersion 0, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save events with various versions
|
|
versions := []int64{1, 3, 2, 5, 4} // Out of order
|
|
for i, version := range versions {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: version,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
latestVersion, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
|
|
// Should return the highest version (5)
|
|
if latestVersion != 5 {
|
|
t.Errorf("expected latest version 5, got %d", latestVersion)
|
|
}
|
|
}
|
|
|
|
func TestGetLatestVersion_SingleEvent(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 42,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
latestVersion, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
|
|
if latestVersion != 42 {
|
|
t.Errorf("expected latest version 42, got %d", latestVersion)
|
|
}
|
|
}
|
|
|
|
func TestGetLatestVersion_UpdatesAfterNewEvent(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save first event
|
|
event1 := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
version1, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
if version1 != 1 {
|
|
t.Errorf("expected version 1, got %d", version1)
|
|
}
|
|
|
|
// Save second event with higher version
|
|
event2 := &aether.Event{
|
|
ID: "evt-2",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 10,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event2); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
version2, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
if version2 != 10 {
|
|
t.Errorf("expected version 10, got %d", version2)
|
|
}
|
|
}
|
|
|
|
func TestGetEvents_NonExistentActor(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save event for one actor
|
|
event := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// Get events for non-existent actor
|
|
events, err := store.GetEvents("non-existent-actor", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents should not error for non-existent actor: %v", err)
|
|
}
|
|
|
|
if events == nil {
|
|
t.Error("GetEvents should return non-nil slice for non-existent actor")
|
|
}
|
|
if len(events) != 0 {
|
|
t.Errorf("expected 0 events for non-existent actor, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestGetLatestVersion_NonExistentActor(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Get latest version for non-existent actor
|
|
version, err := store.GetLatestVersion("non-existent-actor")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err)
|
|
}
|
|
|
|
if version != 0 {
|
|
t.Errorf("expected version 0 for non-existent actor, got %d", version)
|
|
}
|
|
}
|
|
|
|
func TestGetLatestVersion_EmptyStore(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
version, err := store.GetLatestVersion("any-actor")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion should not error for empty store: %v", err)
|
|
}
|
|
|
|
if version != 0 {
|
|
t.Errorf("expected version 0 for empty store, got %d", version)
|
|
}
|
|
}
|
|
|
|
func TestGetEvents_EmptyStore(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
events, err := store.GetEvents("any-actor", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents should not error for empty store: %v", err)
|
|
}
|
|
|
|
if events == nil {
|
|
t.Error("GetEvents should return non-nil slice for empty store")
|
|
}
|
|
if len(events) != 0 {
|
|
t.Errorf("expected 0 events for empty store, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestConcurrentSaveEvent(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
numGoroutines := 100
|
|
eventsPerGoroutine := 10
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(numGoroutines)
|
|
|
|
for g := 0; g < numGoroutines; g++ {
|
|
go func(goroutineID int) {
|
|
defer wg.Done()
|
|
for i := 0; i < eventsPerGoroutine; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d-%d", goroutineID, i),
|
|
EventType: "TestEvent",
|
|
ActorID: fmt.Sprintf("actor-%d", goroutineID),
|
|
Version: int64(i + 1),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Errorf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
}(g)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify each actor has the correct number of events
|
|
for g := 0; g < numGoroutines; g++ {
|
|
actorID := fmt.Sprintf("actor-%d", g)
|
|
events, err := store.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
t.Errorf("GetEvents failed for %s: %v", actorID, err)
|
|
continue
|
|
}
|
|
if len(events) != eventsPerGoroutine {
|
|
t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConcurrentSaveAndGet(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Pre-populate with some events
|
|
for i := 1; i <= 10; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
numReaders := 50
|
|
numWriters := 10
|
|
readsPerReader := 100
|
|
writesPerWriter := 10
|
|
|
|
// Start readers
|
|
wg.Add(numReaders)
|
|
for r := 0; r < numReaders; r++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for i := 0; i < readsPerReader; i++ {
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Errorf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) < 10 {
|
|
t.Errorf("expected at least 10 events, got %d", len(events))
|
|
}
|
|
|
|
_, err = store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Errorf("GetLatestVersion failed: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Start writers
|
|
wg.Add(numWriters)
|
|
for w := 0; w < numWriters; w++ {
|
|
go func(writerID int) {
|
|
defer wg.Done()
|
|
for i := 0; i < writesPerWriter; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(100 + writerID*writesPerWriter + i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Errorf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
}(w)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify final state
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
expectedTotal := 10 + numWriters*writesPerWriter
|
|
if len(events) != expectedTotal {
|
|
t.Errorf("expected %d total events, got %d", expectedTotal, len(events))
|
|
}
|
|
}
|
|
|
|
func TestConcurrentGetLatestVersion(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save initial event
|
|
event := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 100,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
numGoroutines := 100
|
|
wg.Add(numGoroutines)
|
|
|
|
for i := 0; i < numGoroutines; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := 0; j < 100; j++ {
|
|
version, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Errorf("GetLatestVersion failed: %v", err)
|
|
}
|
|
if version < 100 {
|
|
t.Errorf("expected version >= 100, got %d", version)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestEventStoreInterface(t *testing.T) {
|
|
// Verify InMemoryEventStore implements EventStore interface
|
|
var _ aether.EventStore = (*InMemoryEventStore)(nil)
|
|
}
|
|
|
|
func TestSaveEvent_NilData(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-nil",
|
|
EventType: "NilDataEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: nil,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed with nil data: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if events[0].Data != nil {
|
|
t.Errorf("expected nil Data, got %v", events[0].Data)
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_EmptyData(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-empty",
|
|
EventType: "EmptyDataEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed with empty data: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events[0].Data) != 0 {
|
|
t.Errorf("expected empty Data map, got %v", events[0].Data)
|
|
}
|
|
}
|
|
|
|
func TestGetEvents_VersionEdgeCases(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save events with edge case versions
|
|
versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64
|
|
for i, version := range versions {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: version,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// Test fromVersion 0 returns all
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 3 {
|
|
t.Errorf("expected 3 events, got %d", len(events))
|
|
}
|
|
|
|
// Test fromVersion MaxInt64 returns only that event
|
|
events, err = store.GetEvents("actor-123", 9223372036854775807)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Errorf("expected 1 event, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestGetLatestVersion_VersionEdgeCases(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save event with MaxInt64 version
|
|
event := &aether.Event{
|
|
ID: "evt-max",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 9223372036854775807,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
latestVersion, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
|
|
if latestVersion != 9223372036854775807 {
|
|
t.Errorf("expected MaxInt64 version, got %d", latestVersion)
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_SpecialActorIDs(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
specialIDs := []string{
|
|
"simple",
|
|
"with-dashes",
|
|
"with_underscores",
|
|
"with.dots",
|
|
"with:colons",
|
|
"with/slashes",
|
|
"user@example.com",
|
|
"unicode-世界",
|
|
"",
|
|
}
|
|
|
|
for _, actorID := range specialIDs {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s", actorID),
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Errorf("SaveEvent failed for actorID %q: %v", actorID, err)
|
|
continue
|
|
}
|
|
|
|
events, err := store.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
t.Errorf("GetEvents failed for actorID %q: %v", actorID, err)
|
|
continue
|
|
}
|
|
if len(events) != 1 {
|
|
t.Errorf("expected 1 event for actorID %q, got %d", actorID, len(events))
|
|
}
|
|
}
|
|
}
|
|
|
|
func BenchmarkSaveEvent(b *testing.B) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchmarkEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i + 1),
|
|
Data: map[string]interface{}{"value": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
store.SaveEvent(event)
|
|
}
|
|
}
|
|
|
|
func BenchmarkGetEvents(b *testing.B) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Pre-populate with events
|
|
for i := 0; i < 1000; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchmarkEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i + 1),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
store.SaveEvent(event)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
store.GetEvents("actor-123", 0)
|
|
}
|
|
}
|
|
|
|
func BenchmarkGetLatestVersion(b *testing.B) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Pre-populate with events
|
|
for i := 0; i < 1000; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchmarkEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i + 1),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
store.SaveEvent(event)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
store.GetLatestVersion("actor-123")
|
|
}
|
|
}
|
|
|
|
// === Snapshot Store Tests (from PR branch) ===
|
|
|
|
func TestSaveSnapshot_PersistsCorrectly(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-123",
|
|
Version: 10,
|
|
State: map[string]interface{}{
|
|
"balance": 100.50,
|
|
"status": "active",
|
|
},
|
|
Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC),
|
|
}
|
|
|
|
err := store.SaveSnapshot(snapshot)
|
|
if err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
// Verify snapshot was persisted by retrieving it
|
|
retrieved, err := store.GetLatestSnapshot("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved == nil {
|
|
t.Fatal("expected snapshot to be persisted, got nil")
|
|
}
|
|
|
|
if retrieved.ActorID != snapshot.ActorID {
|
|
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID)
|
|
}
|
|
if retrieved.Version != snapshot.Version {
|
|
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version)
|
|
}
|
|
if retrieved.State["balance"] != snapshot.State["balance"] {
|
|
t.Errorf("State.balance mismatch: got %v, want %v", retrieved.State["balance"], snapshot.State["balance"])
|
|
}
|
|
if retrieved.State["status"] != snapshot.State["status"] {
|
|
t.Errorf("State.status mismatch: got %v, want %v", retrieved.State["status"], snapshot.State["status"])
|
|
}
|
|
if !retrieved.Timestamp.Equal(snapshot.Timestamp) {
|
|
t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, snapshot.Timestamp)
|
|
}
|
|
}
|
|
|
|
func TestSaveSnapshot_NilSnapshot(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
err := store.SaveSnapshot(nil)
|
|
if err == nil {
|
|
t.Error("expected error when saving nil snapshot, got nil")
|
|
}
|
|
}
|
|
|
|
func TestSaveSnapshot_MultipleSnapshots(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save multiple snapshots for the same actor
|
|
for i := 1; i <= 5; i++ {
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-multi",
|
|
Version: int64(i * 10),
|
|
State: map[string]interface{}{
|
|
"iteration": i,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err)
|
|
}
|
|
}
|
|
|
|
// Verify all snapshots were saved by checking the latest
|
|
retrieved, err := store.GetLatestSnapshot("actor-multi")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved.Version != 50 {
|
|
t.Errorf("expected latest version 50, got %d", retrieved.Version)
|
|
}
|
|
}
|
|
|
|
func TestGetLatestSnapshot_ReturnsMostRecent(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save snapshots in non-sequential order to test version comparison
|
|
versions := []int64{5, 15, 10, 25, 20}
|
|
for _, v := range versions {
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-latest",
|
|
Version: v,
|
|
State: map[string]interface{}{
|
|
"version": v,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed for version %d: %v", v, err)
|
|
}
|
|
}
|
|
|
|
latest, err := store.GetLatestSnapshot("actor-latest")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if latest == nil {
|
|
t.Fatal("expected snapshot, got nil")
|
|
}
|
|
|
|
if latest.Version != 25 {
|
|
t.Errorf("expected latest version 25, got %d", latest.Version)
|
|
}
|
|
|
|
// Verify the state matches the snapshot with version 25
|
|
if latest.State["version"].(int64) != 25 {
|
|
t.Errorf("expected state.version to be 25, got %v", latest.State["version"])
|
|
}
|
|
}
|
|
|
|
func TestGetLatestSnapshot_NoSnapshotExists(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Query for a non-existent actor
|
|
snapshot, err := store.GetLatestSnapshot("non-existent-actor")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if snapshot != nil {
|
|
t.Errorf("expected nil for non-existent actor, got %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestGetLatestSnapshot_EmptyActorID(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save a snapshot with empty actor ID
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "",
|
|
Version: 1,
|
|
State: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
// Retrieve with empty actor ID
|
|
retrieved, err := store.GetLatestSnapshot("")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved == nil {
|
|
t.Fatal("expected snapshot with empty actorID, got nil")
|
|
}
|
|
}
|
|
|
|
func TestSnapshotVersioning_RespectedAcrossActors(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save snapshots for different actors
|
|
actors := []string{"actor-a", "actor-b", "actor-c"}
|
|
for i, actorID := range actors {
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: actorID,
|
|
Version: int64((i + 1) * 100), // Different versions per actor
|
|
State: map[string]interface{}{
|
|
"actor": actorID,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed for %s: %v", actorID, err)
|
|
}
|
|
}
|
|
|
|
// Verify each actor has their own snapshot with correct version
|
|
for i, actorID := range actors {
|
|
snapshot, err := store.GetLatestSnapshot(actorID)
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed for %s: %v", actorID, err)
|
|
}
|
|
|
|
expectedVersion := int64((i + 1) * 100)
|
|
if snapshot.Version != expectedVersion {
|
|
t.Errorf("actor %s: expected version %d, got %d", actorID, expectedVersion, snapshot.Version)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSnapshotVersioning_LowerVersionAfterHigher(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save a high version first
|
|
highSnapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-order",
|
|
Version: 100,
|
|
State: map[string]interface{}{
|
|
"marker": "high",
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(highSnapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
// Save a lower version after
|
|
lowSnapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-order",
|
|
Version: 50,
|
|
State: map[string]interface{}{
|
|
"marker": "low",
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(lowSnapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
// GetLatestSnapshot should return the higher version (100), not the most recently saved
|
|
latest, err := store.GetLatestSnapshot("actor-order")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if latest.Version != 100 {
|
|
t.Errorf("expected version 100, got %d", latest.Version)
|
|
}
|
|
if latest.State["marker"] != "high" {
|
|
t.Errorf("expected marker 'high', got %v", latest.State["marker"])
|
|
}
|
|
}
|
|
|
|
func TestSnapshotDataIntegrity_ComplexState(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
complexState := map[string]interface{}{
|
|
"string": "hello",
|
|
"integer": 42,
|
|
"float": 3.14159,
|
|
"boolean": true,
|
|
"null": nil,
|
|
"array": []interface{}{"a", "b", "c"},
|
|
"nested": map[string]interface{}{
|
|
"level1": map[string]interface{}{
|
|
"level2": "deep value",
|
|
},
|
|
},
|
|
}
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-complex",
|
|
Version: 1,
|
|
State: complexState,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-complex")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
// Verify all fields
|
|
if retrieved.State["string"] != "hello" {
|
|
t.Errorf("string mismatch: got %v", retrieved.State["string"])
|
|
}
|
|
if retrieved.State["integer"] != 42 {
|
|
t.Errorf("integer mismatch: got %v", retrieved.State["integer"])
|
|
}
|
|
if retrieved.State["float"] != 3.14159 {
|
|
t.Errorf("float mismatch: got %v", retrieved.State["float"])
|
|
}
|
|
if retrieved.State["boolean"] != true {
|
|
t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"])
|
|
}
|
|
if retrieved.State["null"] != nil {
|
|
t.Errorf("null mismatch: got %v", retrieved.State["null"])
|
|
}
|
|
|
|
// Verify array
|
|
arr, ok := retrieved.State["array"].([]interface{})
|
|
if !ok {
|
|
t.Fatal("array is not []interface{}")
|
|
}
|
|
if len(arr) != 3 || arr[0] != "a" || arr[1] != "b" || arr[2] != "c" {
|
|
t.Errorf("array mismatch: got %v", arr)
|
|
}
|
|
|
|
// Verify nested structure
|
|
nested, ok := retrieved.State["nested"].(map[string]interface{})
|
|
if !ok {
|
|
t.Fatal("nested is not map[string]interface{}")
|
|
}
|
|
level1, ok := nested["level1"].(map[string]interface{})
|
|
if !ok {
|
|
t.Fatal("level1 is not map[string]interface{}")
|
|
}
|
|
if level1["level2"] != "deep value" {
|
|
t.Errorf("nested value mismatch: got %v", level1["level2"])
|
|
}
|
|
}
|
|
|
|
func TestSnapshotDataIntegrity_SpecialCharacters(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
specialState := map[string]interface{}{
|
|
"unicode": "Hello, 世界!",
|
|
"emoji": "😀🚀",
|
|
"newlines": "line1\nline2\r\nline3",
|
|
"tabs": "col1\tcol2",
|
|
"quotes": `"double" and 'single'`,
|
|
"backslash": `path\to\file`,
|
|
}
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-special",
|
|
Version: 1,
|
|
State: specialState,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-special")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
for key, expected := range specialState {
|
|
if retrieved.State[key] != expected {
|
|
t.Errorf("State[%q] mismatch: got %q, want %q", key, retrieved.State[key], expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSnapshotDataIntegrity_EmptyState(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-empty",
|
|
Version: 1,
|
|
State: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-empty")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if len(retrieved.State) != 0 {
|
|
t.Errorf("expected empty state, got %v", retrieved.State)
|
|
}
|
|
}
|
|
|
|
func TestSnapshotDataIntegrity_NilState(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-nil",
|
|
Version: 1,
|
|
State: nil,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-nil")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved.State != nil {
|
|
t.Errorf("expected nil state, got %v", retrieved.State)
|
|
}
|
|
}
|
|
|
|
func TestSnapshotDataIntegrity_LargeState(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Create a large state with many entries using unique keys
|
|
largeState := make(map[string]interface{})
|
|
for i := 0; i < 1000; i++ {
|
|
largeState[fmt.Sprintf("key-%d", i)] = i
|
|
}
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-large",
|
|
Version: 1,
|
|
State: largeState,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-large")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if len(retrieved.State) != len(largeState) {
|
|
t.Errorf("state size mismatch: got %d, want %d", len(retrieved.State), len(largeState))
|
|
}
|
|
}
|
|
|
|
func TestSnapshotDataIntegrity_TimestampPreserved(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Test various timestamps
|
|
timestamps := []time.Time{
|
|
time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC),
|
|
time.Date(2020, 6, 15, 23, 59, 59, 999999999, time.UTC),
|
|
time.Time{}, // Zero time
|
|
}
|
|
|
|
for i, ts := range timestamps {
|
|
actorID := fmt.Sprintf("actor-ts-%d", i)
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: actorID,
|
|
Version: 1,
|
|
State: map[string]interface{}{},
|
|
Timestamp: ts,
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot(actorID)
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if !retrieved.Timestamp.Equal(ts) {
|
|
t.Errorf("timestamp %d mismatch: got %v, want %v", i, retrieved.Timestamp, ts)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSnapshotVersioning_ZeroVersion(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-zero-version",
|
|
Version: 0,
|
|
State: map[string]interface{}{"initial": true},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-zero-version")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved.Version != 0 {
|
|
t.Errorf("expected version 0, got %d", retrieved.Version)
|
|
}
|
|
}
|
|
|
|
func TestSnapshotVersioning_LargeVersion(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
largeVersion := int64(9223372036854775807) // MaxInt64
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-large-version",
|
|
Version: largeVersion,
|
|
State: map[string]interface{}{"maxed": true},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-large-version")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved.Version != largeVersion {
|
|
t.Errorf("expected version %d, got %d", largeVersion, retrieved.Version)
|
|
}
|
|
}
|
|
|
|
func TestSnapshotStore_ImplementsInterface(t *testing.T) {
|
|
// Verify InMemoryEventStore implements SnapshotStore interface
|
|
var _ aether.SnapshotStore = (*InMemoryEventStore)(nil)
|
|
}
|
|
|
|
func TestConcurrentSaveSnapshot(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
numGoroutines := 100
|
|
snapshotsPerGoroutine := 10
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(numGoroutines)
|
|
|
|
for g := 0; g < numGoroutines; g++ {
|
|
go func(goroutineID int) {
|
|
defer wg.Done()
|
|
for i := 0; i < snapshotsPerGoroutine; i++ {
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: fmt.Sprintf("actor-%d", goroutineID),
|
|
Version: int64(i + 1),
|
|
State: map[string]interface{}{
|
|
"goroutine": goroutineID,
|
|
"iteration": i,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Errorf("SaveSnapshot failed: %v", err)
|
|
}
|
|
}
|
|
}(g)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify each actor has snapshots
|
|
for g := 0; g < numGoroutines; g++ {
|
|
actorID := fmt.Sprintf("actor-%d", g)
|
|
snapshot, err := store.GetLatestSnapshot(actorID)
|
|
if err != nil {
|
|
t.Errorf("GetLatestSnapshot failed for %s: %v", actorID, err)
|
|
continue
|
|
}
|
|
if snapshot == nil {
|
|
t.Errorf("expected snapshot for %s, got nil", actorID)
|
|
continue
|
|
}
|
|
if snapshot.Version != int64(snapshotsPerGoroutine) {
|
|
t.Errorf("expected latest version %d for %s, got %d", snapshotsPerGoroutine, actorID, snapshot.Version)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConcurrentSaveAndGetSnapshot(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Pre-populate with initial snapshot
|
|
initialSnapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
State: map[string]interface{}{
|
|
"initial": true,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(initialSnapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
numReaders := 50
|
|
numWriters := 10
|
|
readsPerReader := 100
|
|
writesPerWriter := 10
|
|
|
|
// Start readers
|
|
wg.Add(numReaders)
|
|
for r := 0; r < numReaders; r++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for i := 0; i < readsPerReader; i++ {
|
|
snapshot, err := store.GetLatestSnapshot("actor-123")
|
|
if err != nil {
|
|
t.Errorf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
if snapshot == nil {
|
|
t.Error("expected snapshot, got nil")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Start writers
|
|
wg.Add(numWriters)
|
|
for w := 0; w < numWriters; w++ {
|
|
go func(writerID int) {
|
|
defer wg.Done()
|
|
for i := 0; i < writesPerWriter; i++ {
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-123",
|
|
Version: int64(100 + writerID*writesPerWriter + i),
|
|
State: map[string]interface{}{
|
|
"writer": writerID,
|
|
"index": i,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Errorf("SaveSnapshot failed: %v", err)
|
|
}
|
|
}
|
|
}(w)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify final state - should have the highest version
|
|
snapshot, err := store.GetLatestSnapshot("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
if snapshot == nil {
|
|
t.Fatal("expected snapshot, got nil")
|
|
}
|
|
// The highest version should be around 100 + (numWriters-1)*writesPerWriter + (writesPerWriter-1)
|
|
// which is 100 + 9*10 + 9 = 199
|
|
expectedMaxVersion := int64(100 + (numWriters-1)*writesPerWriter + (writesPerWriter - 1))
|
|
if snapshot.Version != expectedMaxVersion {
|
|
t.Errorf("expected latest version %d, got %d", expectedMaxVersion, snapshot.Version)
|
|
}
|
|
}
|
|
|
|
// === Event Metadata Persistence Tests ===
|
|
|
|
func TestSaveEvent_WithMetadata(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-meta",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{
|
|
"total": 100.50,
|
|
},
|
|
Metadata: map[string]string{
|
|
"correlationId": "corr-123",
|
|
"causationId": "cause-456",
|
|
"userId": "user-789",
|
|
"traceId": "trace-abc",
|
|
"spanId": "span-def",
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// Retrieve and verify metadata is persisted
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
|
|
retrieved := events[0]
|
|
if retrieved.Metadata == nil {
|
|
t.Fatal("expected Metadata to be persisted")
|
|
}
|
|
if retrieved.Metadata["correlationId"] != "corr-123" {
|
|
t.Errorf("correlationId mismatch: got %q, want %q", retrieved.Metadata["correlationId"], "corr-123")
|
|
}
|
|
if retrieved.Metadata["causationId"] != "cause-456" {
|
|
t.Errorf("causationId mismatch: got %q, want %q", retrieved.Metadata["causationId"], "cause-456")
|
|
}
|
|
if retrieved.Metadata["userId"] != "user-789" {
|
|
t.Errorf("userId mismatch: got %q, want %q", retrieved.Metadata["userId"], "user-789")
|
|
}
|
|
if retrieved.Metadata["traceId"] != "trace-abc" {
|
|
t.Errorf("traceId mismatch: got %q, want %q", retrieved.Metadata["traceId"], "trace-abc")
|
|
}
|
|
if retrieved.Metadata["spanId"] != "span-def" {
|
|
t.Errorf("spanId mismatch: got %q, want %q", retrieved.Metadata["spanId"], "span-def")
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_WithNilMetadata(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-nil-meta",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Metadata: nil,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
|
|
// Nil metadata should remain nil
|
|
if events[0].Metadata != nil {
|
|
t.Errorf("expected nil Metadata, got %v", events[0].Metadata)
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_WithEmptyMetadata(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-empty-meta",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Metadata: map[string]string{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
|
|
// Empty metadata should be preserved (as empty map)
|
|
if events[0].Metadata == nil {
|
|
t.Error("expected empty Metadata map to be preserved, got nil")
|
|
}
|
|
if len(events[0].Metadata) != 0 {
|
|
t.Errorf("expected empty Metadata, got %d entries", len(events[0].Metadata))
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_MetadataWithHelpers(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-helpers",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Use helper methods to set metadata
|
|
event.SetCorrelationID("corr-helper")
|
|
event.SetCausationID("cause-helper")
|
|
event.SetUserID("user-helper")
|
|
event.SetTraceID("trace-helper")
|
|
event.SetSpanID("span-helper")
|
|
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
|
|
retrieved := events[0]
|
|
if retrieved.GetCorrelationID() != "corr-helper" {
|
|
t.Errorf("GetCorrelationID mismatch: got %q", retrieved.GetCorrelationID())
|
|
}
|
|
if retrieved.GetCausationID() != "cause-helper" {
|
|
t.Errorf("GetCausationID mismatch: got %q", retrieved.GetCausationID())
|
|
}
|
|
if retrieved.GetUserID() != "user-helper" {
|
|
t.Errorf("GetUserID mismatch: got %q", retrieved.GetUserID())
|
|
}
|
|
if retrieved.GetTraceID() != "trace-helper" {
|
|
t.Errorf("GetTraceID mismatch: got %q", retrieved.GetTraceID())
|
|
}
|
|
if retrieved.GetSpanID() != "span-helper" {
|
|
t.Errorf("GetSpanID mismatch: got %q", retrieved.GetSpanID())
|
|
}
|
|
}
|
|
|
|
func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
|
store := NewInMemoryEventStore()
|
|
|
|
// Save multiple events with different metadata
|
|
for i := 1; i <= 3; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "OrderUpdated",
|
|
ActorID: "order-456",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Metadata: map[string]string{
|
|
"correlationId": fmt.Sprintf("corr-%d", i),
|
|
"eventIndex": fmt.Sprintf("%d", i),
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 3 {
|
|
t.Fatalf("expected 3 events, got %d", len(events))
|
|
}
|
|
|
|
// Verify each event has its own metadata
|
|
for i, event := range events {
|
|
expectedCorr := fmt.Sprintf("corr-%d", i+1)
|
|
expectedIdx := fmt.Sprintf("%d", i+1)
|
|
|
|
if event.Metadata["correlationId"] != expectedCorr {
|
|
t.Errorf("event %d correlationId mismatch: got %q, want %q", i+1, event.Metadata["correlationId"], expectedCorr)
|
|
}
|
|
if event.Metadata["eventIndex"] != expectedIdx {
|
|
t.Errorf("event %d eventIndex mismatch: got %q, want %q", i+1, event.Metadata["eventIndex"], expectedIdx)
|
|
}
|
|
}
|
|
}
|