[Issue #4] Add SnapshotStore unit tests (#31)
All checks were successful
CI / build (push) Successful in 15s
All checks were successful
CI / build (push) Successful in 15s
This commit was merged in pull request #31.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
@@ -8,14 +9,16 @@ import (
|
||||
|
||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||
type InMemoryEventStore struct {
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
}
|
||||
|
||||
// NewInMemoryEventStore creates a new in-memory event store
|
||||
func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
return &InMemoryEventStore{
|
||||
events: make(map[string][]*aether.Event),
|
||||
events: make(map[string][]*aether.Event),
|
||||
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,3 +73,39 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||
|
||||
return latestVersion, nil
|
||||
}
|
||||
|
||||
// SaveSnapshot saves a snapshot to the in-memory store
|
||||
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
|
||||
if snapshot == nil {
|
||||
return fmt.Errorf("snapshot cannot be nil")
|
||||
}
|
||||
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
|
||||
if _, exists := es.snapshots[snapshot.ActorID]; !exists {
|
||||
es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0)
|
||||
}
|
||||
es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLatestSnapshot returns the most recent snapshot for an actor
|
||||
func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
||||
es.mu.RLock()
|
||||
defer es.mu.RUnlock()
|
||||
|
||||
snapshots, exists := es.snapshots[actorID]
|
||||
if !exists || len(snapshots) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var latest *aether.ActorSnapshot
|
||||
for _, snapshot := range snapshots {
|
||||
if latest == nil || snapshot.Version > latest.Version {
|
||||
latest = snapshot
|
||||
}
|
||||
}
|
||||
|
||||
return latest, nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// === Event Store Tests (from main branch) ===
|
||||
|
||||
func TestNewInMemoryEventStore(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
@@ -756,7 +758,7 @@ func TestSaveEvent_SpecialActorIDs(t *testing.T) {
|
||||
"with:colons",
|
||||
"with/slashes",
|
||||
"user@example.com",
|
||||
"unicode-\u4e16\u754c",
|
||||
"unicode-世界",
|
||||
"",
|
||||
}
|
||||
|
||||
@@ -845,3 +847,644 @@ func BenchmarkGetLatestVersion(b *testing.B) {
|
||||
store.GetLatestVersion("actor-123")
|
||||
}
|
||||
}
|
||||
|
||||
// === Snapshot Store Tests (from PR branch) ===
|
||||
|
||||
func TestSaveSnapshot_PersistsCorrectly(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-123",
|
||||
Version: 10,
|
||||
State: map[string]interface{}{
|
||||
"balance": 100.50,
|
||||
"status": "active",
|
||||
},
|
||||
Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC),
|
||||
}
|
||||
|
||||
err := store.SaveSnapshot(snapshot)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify snapshot was persisted by retrieving it
|
||||
retrieved, err := store.GetLatestSnapshot("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if retrieved == nil {
|
||||
t.Fatal("expected snapshot to be persisted, got nil")
|
||||
}
|
||||
|
||||
if retrieved.ActorID != snapshot.ActorID {
|
||||
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID)
|
||||
}
|
||||
if retrieved.Version != snapshot.Version {
|
||||
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version)
|
||||
}
|
||||
if retrieved.State["balance"] != snapshot.State["balance"] {
|
||||
t.Errorf("State.balance mismatch: got %v, want %v", retrieved.State["balance"], snapshot.State["balance"])
|
||||
}
|
||||
if retrieved.State["status"] != snapshot.State["status"] {
|
||||
t.Errorf("State.status mismatch: got %v, want %v", retrieved.State["status"], snapshot.State["status"])
|
||||
}
|
||||
if !retrieved.Timestamp.Equal(snapshot.Timestamp) {
|
||||
t.Errorf("Timestamp mismatch: got %v, want %v", retrieved.Timestamp, snapshot.Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveSnapshot_NilSnapshot(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
err := store.SaveSnapshot(nil)
|
||||
if err == nil {
|
||||
t.Error("expected error when saving nil snapshot, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveSnapshot_MultipleSnapshots(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save multiple snapshots for the same actor
|
||||
for i := 1; i <= 5; i++ {
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-multi",
|
||||
Version: int64(i * 10),
|
||||
State: map[string]interface{}{
|
||||
"iteration": i,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all snapshots were saved by checking the latest
|
||||
retrieved, err := store.GetLatestSnapshot("actor-multi")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if retrieved.Version != 50 {
|
||||
t.Errorf("expected latest version 50, got %d", retrieved.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestSnapshot_ReturnsMostRecent(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save snapshots in non-sequential order to test version comparison
|
||||
versions := []int64{5, 15, 10, 25, 20}
|
||||
for _, v := range versions {
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-latest",
|
||||
Version: v,
|
||||
State: map[string]interface{}{
|
||||
"version": v,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed for version %d: %v", v, err)
|
||||
}
|
||||
}
|
||||
|
||||
latest, err := store.GetLatestSnapshot("actor-latest")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if latest == nil {
|
||||
t.Fatal("expected snapshot, got nil")
|
||||
}
|
||||
|
||||
if latest.Version != 25 {
|
||||
t.Errorf("expected latest version 25, got %d", latest.Version)
|
||||
}
|
||||
|
||||
// Verify the state matches the snapshot with version 25
|
||||
if latest.State["version"].(int64) != 25 {
|
||||
t.Errorf("expected state.version to be 25, got %v", latest.State["version"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestSnapshot_NoSnapshotExists(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Query for a non-existent actor
|
||||
snapshot, err := store.GetLatestSnapshot("non-existent-actor")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if snapshot != nil {
|
||||
t.Errorf("expected nil for non-existent actor, got %+v", snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLatestSnapshot_EmptyActorID(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save a snapshot with empty actor ID
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "",
|
||||
Version: 1,
|
||||
State: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
// Retrieve with empty actor ID
|
||||
retrieved, err := store.GetLatestSnapshot("")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if retrieved == nil {
|
||||
t.Fatal("expected snapshot with empty actorID, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotVersioning_RespectedAcrossActors(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save snapshots for different actors
|
||||
actors := []string{"actor-a", "actor-b", "actor-c"}
|
||||
for i, actorID := range actors {
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: actorID,
|
||||
Version: int64((i + 1) * 100), // Different versions per actor
|
||||
State: map[string]interface{}{
|
||||
"actor": actorID,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed for %s: %v", actorID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify each actor has their own snapshot with correct version
|
||||
for i, actorID := range actors {
|
||||
snapshot, err := store.GetLatestSnapshot(actorID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed for %s: %v", actorID, err)
|
||||
}
|
||||
|
||||
expectedVersion := int64((i + 1) * 100)
|
||||
if snapshot.Version != expectedVersion {
|
||||
t.Errorf("actor %s: expected version %d, got %d", actorID, expectedVersion, snapshot.Version)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotVersioning_LowerVersionAfterHigher(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save a high version first
|
||||
highSnapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-order",
|
||||
Version: 100,
|
||||
State: map[string]interface{}{
|
||||
"marker": "high",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(highSnapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
// Save a lower version after
|
||||
lowSnapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-order",
|
||||
Version: 50,
|
||||
State: map[string]interface{}{
|
||||
"marker": "low",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(lowSnapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
// GetLatestSnapshot should return the higher version (100), not the most recently saved
|
||||
latest, err := store.GetLatestSnapshot("actor-order")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if latest.Version != 100 {
|
||||
t.Errorf("expected version 100, got %d", latest.Version)
|
||||
}
|
||||
if latest.State["marker"] != "high" {
|
||||
t.Errorf("expected marker 'high', got %v", latest.State["marker"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotDataIntegrity_ComplexState(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
complexState := map[string]interface{}{
|
||||
"string": "hello",
|
||||
"integer": 42,
|
||||
"float": 3.14159,
|
||||
"boolean": true,
|
||||
"null": nil,
|
||||
"array": []interface{}{"a", "b", "c"},
|
||||
"nested": map[string]interface{}{
|
||||
"level1": map[string]interface{}{
|
||||
"level2": "deep value",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-complex",
|
||||
Version: 1,
|
||||
State: complexState,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-complex")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify all fields
|
||||
if retrieved.State["string"] != "hello" {
|
||||
t.Errorf("string mismatch: got %v", retrieved.State["string"])
|
||||
}
|
||||
if retrieved.State["integer"] != 42 {
|
||||
t.Errorf("integer mismatch: got %v", retrieved.State["integer"])
|
||||
}
|
||||
if retrieved.State["float"] != 3.14159 {
|
||||
t.Errorf("float mismatch: got %v", retrieved.State["float"])
|
||||
}
|
||||
if retrieved.State["boolean"] != true {
|
||||
t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"])
|
||||
}
|
||||
if retrieved.State["null"] != nil {
|
||||
t.Errorf("null mismatch: got %v", retrieved.State["null"])
|
||||
}
|
||||
|
||||
// Verify array
|
||||
arr, ok := retrieved.State["array"].([]interface{})
|
||||
if !ok {
|
||||
t.Fatal("array is not []interface{}")
|
||||
}
|
||||
if len(arr) != 3 || arr[0] != "a" || arr[1] != "b" || arr[2] != "c" {
|
||||
t.Errorf("array mismatch: got %v", arr)
|
||||
}
|
||||
|
||||
// Verify nested structure
|
||||
nested, ok := retrieved.State["nested"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("nested is not map[string]interface{}")
|
||||
}
|
||||
level1, ok := nested["level1"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("level1 is not map[string]interface{}")
|
||||
}
|
||||
if level1["level2"] != "deep value" {
|
||||
t.Errorf("nested value mismatch: got %v", level1["level2"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotDataIntegrity_SpecialCharacters(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
specialState := map[string]interface{}{
|
||||
"unicode": "Hello, 世界!",
|
||||
"emoji": "😀🚀",
|
||||
"newlines": "line1\nline2\r\nline3",
|
||||
"tabs": "col1\tcol2",
|
||||
"quotes": `"double" and 'single'`,
|
||||
"backslash": `path\to\file`,
|
||||
}
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-special",
|
||||
Version: 1,
|
||||
State: specialState,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-special")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
for key, expected := range specialState {
|
||||
if retrieved.State[key] != expected {
|
||||
t.Errorf("State[%q] mismatch: got %q, want %q", key, retrieved.State[key], expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotDataIntegrity_EmptyState(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-empty",
|
||||
Version: 1,
|
||||
State: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-empty")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if len(retrieved.State) != 0 {
|
||||
t.Errorf("expected empty state, got %v", retrieved.State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotDataIntegrity_NilState(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-nil",
|
||||
Version: 1,
|
||||
State: nil,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-nil")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if retrieved.State != nil {
|
||||
t.Errorf("expected nil state, got %v", retrieved.State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotDataIntegrity_LargeState(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Create a large state with many entries using unique keys
|
||||
largeState := make(map[string]interface{})
|
||||
for i := 0; i < 1000; i++ {
|
||||
largeState[fmt.Sprintf("key-%d", i)] = i
|
||||
}
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-large",
|
||||
Version: 1,
|
||||
State: largeState,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-large")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if len(retrieved.State) != len(largeState) {
|
||||
t.Errorf("state size mismatch: got %d, want %d", len(retrieved.State), len(largeState))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotDataIntegrity_TimestampPreserved(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Test various timestamps
|
||||
timestamps := []time.Time{
|
||||
time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC),
|
||||
time.Date(2020, 6, 15, 23, 59, 59, 999999999, time.UTC),
|
||||
time.Time{}, // Zero time
|
||||
}
|
||||
|
||||
for i, ts := range timestamps {
|
||||
actorID := fmt.Sprintf("actor-ts-%d", i)
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
State: map[string]interface{}{},
|
||||
Timestamp: ts,
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot(actorID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if !retrieved.Timestamp.Equal(ts) {
|
||||
t.Errorf("timestamp %d mismatch: got %v, want %v", i, retrieved.Timestamp, ts)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotVersioning_ZeroVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-zero-version",
|
||||
Version: 0,
|
||||
State: map[string]interface{}{"initial": true},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-zero-version")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if retrieved.Version != 0 {
|
||||
t.Errorf("expected version 0, got %d", retrieved.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotVersioning_LargeVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
largeVersion := int64(9223372036854775807) // MaxInt64
|
||||
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-large-version",
|
||||
Version: largeVersion,
|
||||
State: map[string]interface{}{"maxed": true},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
retrieved, err := store.GetLatestSnapshot("actor-large-version")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
if retrieved.Version != largeVersion {
|
||||
t.Errorf("expected version %d, got %d", largeVersion, retrieved.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotStore_ImplementsInterface(t *testing.T) {
|
||||
// Verify InMemoryEventStore implements SnapshotStore interface
|
||||
var _ aether.SnapshotStore = (*InMemoryEventStore)(nil)
|
||||
}
|
||||
|
||||
func TestConcurrentSaveSnapshot(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
numGoroutines := 100
|
||||
snapshotsPerGoroutine := 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numGoroutines)
|
||||
|
||||
for g := 0; g < numGoroutines; g++ {
|
||||
go func(goroutineID int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < snapshotsPerGoroutine; i++ {
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: fmt.Sprintf("actor-%d", goroutineID),
|
||||
Version: int64(i + 1),
|
||||
State: map[string]interface{}{
|
||||
"goroutine": goroutineID,
|
||||
"iteration": i,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Errorf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
}
|
||||
}(g)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify each actor has snapshots
|
||||
for g := 0; g < numGoroutines; g++ {
|
||||
actorID := fmt.Sprintf("actor-%d", g)
|
||||
snapshot, err := store.GetLatestSnapshot(actorID)
|
||||
if err != nil {
|
||||
t.Errorf("GetLatestSnapshot failed for %s: %v", actorID, err)
|
||||
continue
|
||||
}
|
||||
if snapshot == nil {
|
||||
t.Errorf("expected snapshot for %s, got nil", actorID)
|
||||
continue
|
||||
}
|
||||
if snapshot.Version != int64(snapshotsPerGoroutine) {
|
||||
t.Errorf("expected latest version %d for %s, got %d", snapshotsPerGoroutine, actorID, snapshot.Version)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentSaveAndGetSnapshot(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Pre-populate with initial snapshot
|
||||
initialSnapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-123",
|
||||
Version: 1,
|
||||
State: map[string]interface{}{
|
||||
"initial": true,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(initialSnapshot); err != nil {
|
||||
t.Fatalf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
numReaders := 50
|
||||
numWriters := 10
|
||||
readsPerReader := 100
|
||||
writesPerWriter := 10
|
||||
|
||||
// Start readers
|
||||
wg.Add(numReaders)
|
||||
for r := 0; r < numReaders; r++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < readsPerReader; i++ {
|
||||
snapshot, err := store.GetLatestSnapshot("actor-123")
|
||||
if err != nil {
|
||||
t.Errorf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
if snapshot == nil {
|
||||
t.Error("expected snapshot, got nil")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Start writers
|
||||
wg.Add(numWriters)
|
||||
for w := 0; w < numWriters; w++ {
|
||||
go func(writerID int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < writesPerWriter; i++ {
|
||||
snapshot := &aether.ActorSnapshot{
|
||||
ActorID: "actor-123",
|
||||
Version: int64(100 + writerID*writesPerWriter + i),
|
||||
State: map[string]interface{}{
|
||||
"writer": writerID,
|
||||
"index": i,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveSnapshot(snapshot); err != nil {
|
||||
t.Errorf("SaveSnapshot failed: %v", err)
|
||||
}
|
||||
}
|
||||
}(w)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify final state - should have the highest version
|
||||
snapshot, err := store.GetLatestSnapshot("actor-123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
||||
}
|
||||
if snapshot == nil {
|
||||
t.Fatal("expected snapshot, got nil")
|
||||
}
|
||||
// The highest version should be around 100 + (numWriters-1)*writesPerWriter + (writesPerWriter-1)
|
||||
// which is 100 + 9*10 + 9 = 199
|
||||
expectedMaxVersion := int64(100 + (numWriters-1)*writesPerWriter + (writesPerWriter - 1))
|
||||
if snapshot.Version != expectedMaxVersion {
|
||||
t.Errorf("expected latest version %d, got %d", expectedMaxVersion, snapshot.Version)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user