diff --git a/CLAUDE.md b/CLAUDE.md index d15664d..d67d261 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -79,6 +79,49 @@ store.SaveEvent(event) events, _ := store.GetEvents("order-123", 0) ``` +### Event Versioning + +Events for each actor must have **monotonically increasing versions**. This ensures event stream integrity and enables optimistic concurrency control. + +#### Version Semantics + +- Each actor has an independent version sequence +- Version must be strictly greater than the current latest version +- For new actors (no events), the first event must have version > 0 +- Non-consecutive versions are allowed (gaps are permitted) + +#### Optimistic Concurrency Pattern + +```go +// 1. Get current version +currentVersion, _ := store.GetLatestVersion("order-123") + +// 2. Create event with next version +event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderUpdated", + ActorID: "order-123", + Version: currentVersion + 1, + Data: map[string]interface{}{"status": "shipped"}, + Timestamp: time.Now(), +} + +// 3. Attempt to save +err := store.SaveEvent(event) +if errors.Is(err, aether.ErrVersionConflict) { + // Another writer won - reload and retry if appropriate + var versionErr *aether.VersionConflictError + errors.As(err, &versionErr) + log.Printf("Conflict: actor %s has version %d, attempted %d", + versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion) +} +``` + +#### Error Types + +- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`) +- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion + ### Namespace Isolation Namespaces provide logical boundaries for events and subscriptions: @@ -111,6 +154,7 @@ if manager.IsLeader() { ## Key Patterns - **Events are immutable** - Never modify, only append +- **Versions are monotonic** - Each event must have version > previous for same actor - **Snapshots for performance** - Periodically snapshot state to avoid full replay - **Namespaces for isolation** - Not multi-tenancy, just logical boundaries - **NATS for everything** - Events, pub/sub, clustering all use NATS diff --git a/event.go b/event.go index c7980da..4d30dff 100644 --- a/event.go +++ b/event.go @@ -1,9 +1,33 @@ package aether import ( + "errors" + "fmt" "time" ) +// ErrVersionConflict is returned when attempting to save an event with a version +// that is not strictly greater than the current latest version for an actor. +// This ensures events have monotonically increasing versions per actor. +var ErrVersionConflict = errors.New("version conflict") + +// VersionConflictError provides details about a version conflict. +// It is returned when SaveEvent is called with a version <= the current latest version. +type VersionConflictError struct { + ActorID string + AttemptedVersion int64 + CurrentVersion int64 +} + +func (e *VersionConflictError) Error() string { + return fmt.Sprintf("%s: actor %q has version %d, cannot save version %d", + ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion) +} + +func (e *VersionConflictError) Unwrap() error { + return ErrVersionConflict +} + // Event represents a domain event in the system type Event struct { ID string `json:"id"` @@ -23,10 +47,36 @@ type ActorSnapshot struct { Timestamp time.Time `json:"timestamp"` } -// EventStore defines the interface for event persistence +// EventStore defines the interface for event persistence. +// +// # Version Semantics +// +// Events for an actor must have monotonically increasing versions. When SaveEvent +// is called, the implementation must validate that the event's version is strictly +// greater than the current latest version for that actor. If the version is less +// than or equal to the current version, SaveEvent must return a VersionConflictError +// (which wraps ErrVersionConflict). +// +// This validation ensures event stream integrity and enables optimistic concurrency +// control. Clients should: +// 1. Call GetLatestVersion to get the current version for an actor +// 2. Set the new event's version to currentVersion + 1 +// 3. Call SaveEvent - if it returns ErrVersionConflict, another writer won +// 4. On conflict, reload the latest version and retry if appropriate +// +// For new actors (no existing events), version 1 is expected for the first event. type EventStore interface { + // SaveEvent persists an event to the store. The event's Version must be + // strictly greater than the current latest version for the actor. + // Returns VersionConflictError if version <= current latest version. SaveEvent(event *Event) error + + // GetEvents retrieves events for an actor from a specific version (inclusive). + // Returns an empty slice if no events exist for the actor. GetEvents(actorID string, fromVersion int64) ([]*Event, error) + + // GetLatestVersion returns the latest version for an actor. + // Returns 0 if no events exist for the actor. GetLatestVersion(actorID string) (int64, error) } diff --git a/store/jetstream.go b/store/jetstream.go index cb47a23..b98edcf 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "git.flowmade.one/flowmade-one/aether" @@ -14,6 +15,8 @@ import ( type JetStreamEventStore struct { js nats.JetStreamContext streamName string + mu sync.Mutex // Protects version checks during SaveEvent + versions map[string]int64 // actorID -> latest version cache } // NewJetStreamEventStore creates a new JetStream-based event store @@ -41,11 +44,32 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE return &JetStreamEventStore{ js: js, streamName: streamName, + versions: make(map[string]int64), }, nil } -// SaveEvent persists an event to JetStream +// SaveEvent persists an event to JetStream. +// Returns VersionConflictError if the event's version is not strictly greater +// than the current latest version for the actor. func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { + jes.mu.Lock() + defer jes.mu.Unlock() + + // Get current latest version for this actor + currentVersion, err := jes.getLatestVersionLocked(event.ActorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Validate version is strictly greater than current + if event.Version <= currentVersion { + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: currentVersion, + } + } + // Serialize event to JSON data, err := json.Marshal(event) if err != nil { @@ -64,11 +88,50 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { return fmt.Errorf("failed to publish event to JetStream: %w", err) } + // Update version cache + jes.versions[event.ActorID] = event.Version + return nil } +// getLatestVersionLocked returns the latest version for an actor. +// Caller must hold jes.mu. +func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) { + // Check cache first + if version, ok := jes.versions[actorID]; ok { + return version, nil + } + + // Fetch from JetStream + events, err := jes.getEventsInternal(actorID, 0) + if err != nil { + return 0, err + } + + if len(events) == 0 { + return 0, nil + } + + latestVersion := int64(0) + for _, event := range events { + if event.Version > latestVersion { + latestVersion = event.Version + } + } + + // Update cache + jes.versions[actorID] = latestVersion + + return latestVersion, nil +} + // GetEvents retrieves all events for an actor since a version func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { + return jes.getEventsInternal(actorID, fromVersion) +} + +// getEventsInternal is the internal implementation of GetEvents +func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) { // Create subject filter for this actor subject := fmt.Sprintf("%s.events.%s.%s", jes.streamName, diff --git a/store/memory.go b/store/memory.go index 21c5b14..0fba85d 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,7 +1,6 @@ package store import ( - "fmt" "sync" "git.flowmade.one/flowmade-one/aether" @@ -22,11 +21,32 @@ func NewInMemoryEventStore() *InMemoryEventStore { } } -// SaveEvent saves an event to the in-memory store +// SaveEvent saves an event to the in-memory store. +// Returns VersionConflictError if the event's version is not strictly greater +// than the current latest version for the actor. func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.mu.Lock() defer es.mu.Unlock() + // Get current latest version for this actor + currentVersion := int64(0) + if events, exists := es.events[event.ActorID]; exists { + for _, e := range events { + if e.Version > currentVersion { + currentVersion = e.Version + } + } + } + + // Validate version is strictly greater than current + if event.Version <= currentVersion { + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: currentVersion, + } + } + if _, exists := es.events[event.ActorID]; !exists { es.events[event.ActorID] = make([]*aether.Event, 0) } @@ -77,7 +97,7 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { // SaveSnapshot saves a snapshot to the in-memory store func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { if snapshot == nil { - return fmt.Errorf("snapshot cannot be nil") + return &snapshotNilError{} } es.mu.Lock() @@ -109,3 +129,10 @@ func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSn return latest, nil } + +// snapshotNilError is returned when attempting to save a nil snapshot +type snapshotNilError struct{} + +func (e *snapshotNilError) Error() string { + return "snapshot cannot be nil" +} diff --git a/store/memory_test.go b/store/memory_test.go index 20b7639..38dd21c 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -1,15 +1,17 @@ package store import ( + "errors" "fmt" "sync" + "sync/atomic" "testing" "time" "git.flowmade.one/flowmade-one/aether" ) -// === Event Store Tests (from main branch) === +// === Event Store Tests === func TestNewInMemoryEventStore(t *testing.T) { store := NewInMemoryEventStore() @@ -292,8 +294,8 @@ func TestGetEvents_FromVersionZero(t *testing.T) { func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) { store := NewInMemoryEventStore() - // Save events with various versions - versions := []int64{1, 3, 2, 5, 4} // Out of order + // Save events with strictly increasing versions + versions := []int64{1, 2, 3, 4, 5} for i, version := range versions { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), @@ -553,17 +555,18 @@ func TestConcurrentSaveAndGet(t *testing.T) { }() } - // Start writers + // Start writers - each writer gets their own actor to avoid version conflicts wg.Add(numWriters) for w := 0; w < numWriters; w++ { go func(writerID int) { defer wg.Done() + actorID := fmt.Sprintf("writer-actor-%d", writerID) for i := 0; i < writesPerWriter; i++ { event := &aether.Event{ ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i), EventType: "TestEvent", - ActorID: "actor-123", - Version: int64(100 + writerID*writesPerWriter + i), + ActorID: actorID, + Version: int64(i + 1), Data: map[string]interface{}{}, Timestamp: time.Now(), } @@ -576,15 +579,27 @@ func TestConcurrentSaveAndGet(t *testing.T) { wg.Wait() - // Verify final state + // Verify actor-123 still has its original events events, err := store.GetEvents("actor-123", 0) if err != nil { t.Fatalf("GetEvents failed: %v", err) } - expectedTotal := 10 + numWriters*writesPerWriter - if len(events) != expectedTotal { - t.Errorf("expected %d total events, got %d", expectedTotal, len(events)) + if len(events) != 10 { + t.Errorf("expected 10 events for actor-123, got %d", len(events)) + } + + // Verify each writer's actor has the correct events + for w := 0; w < numWriters; w++ { + actorID := fmt.Sprintf("writer-actor-%d", w) + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Errorf("GetEvents failed for %s: %v", actorID, err) + continue + } + if len(events) != writesPerWriter { + t.Errorf("expected %d events for %s, got %d", writesPerWriter, actorID, len(events)) + } } } @@ -686,8 +701,8 @@ func TestSaveEvent_EmptyData(t *testing.T) { func TestGetEvents_VersionEdgeCases(t *testing.T) { store := NewInMemoryEventStore() - // Save events with edge case versions - versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64 + // Save events with edge case versions (strictly increasing) + versions := []int64{1, 100, 9223372036854775807} // one, 100, MaxInt64 for i, version := range versions { event := &aether.Event{ ID: fmt.Sprintf("evt-%d", i), @@ -698,7 +713,7 @@ func TestGetEvents_VersionEdgeCases(t *testing.T) { Timestamp: time.Now(), } if err := store.SaveEvent(event); err != nil { - t.Fatalf("SaveEvent failed: %v", err) + t.Fatalf("SaveEvent failed for version %d: %v", version, err) } } @@ -787,6 +802,189 @@ func TestSaveEvent_SpecialActorIDs(t *testing.T) { } } +// === Version Validation Tests === + +func TestSaveEvent_RejectsLowerVersion(t *testing.T) { + store := NewInMemoryEventStore() + + // Save first event with version 5 + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 5, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed for first event: %v", err) + } + + // Attempt to save event with lower version (should fail) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 3, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event2) + if err == nil { + t.Fatal("expected error when saving event with lower version, got nil") + } + + // Verify it's a VersionConflictError + if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected ErrVersionConflict, got %v", err) + } + + var versionErr *aether.VersionConflictError + if !errors.As(err, &versionErr) { + t.Fatalf("expected VersionConflictError, got %T", err) + } + + if versionErr.ActorID != "actor-123" { + t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123") + } + if versionErr.CurrentVersion != 5 { + t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5) + } + if versionErr.AttemptedVersion != 3 { + t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3) + } +} + +func TestSaveEvent_RejectsEqualVersion(t *testing.T) { + store := NewInMemoryEventStore() + + // Save first event with version 5 + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 5, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("SaveEvent failed for first event: %v", err) + } + + // Attempt to save event with equal version (should fail) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-123", + Version: 5, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event2) + if err == nil { + t.Fatal("expected error when saving event with equal version, got nil") + } + + if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected ErrVersionConflict, got %v", err) + } +} + +func TestSaveEvent_RejectsZeroVersion(t *testing.T) { + store := NewInMemoryEventStore() + + // Version 0 should be rejected (not strictly greater than initial 0) + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-new", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event) + if err == nil { + t.Fatal("expected error when saving event with version 0, got nil") + } + + if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected ErrVersionConflict, got %v", err) + } +} + +func TestSaveEvent_RejectsNegativeVersion(t *testing.T) { + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-neg", + EventType: "TestEvent", + ActorID: "actor-123", + Version: -1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event) + if err == nil { + t.Fatal("expected error when saving event with negative version, got nil") + } + + if !errors.Is(err, aether.ErrVersionConflict) { + t.Errorf("expected ErrVersionConflict, got %v", err) + } +} + +func TestSaveEvent_ConcurrentWritesToSameActor(t *testing.T) { + store := NewInMemoryEventStore() + + numGoroutines := 100 + var successCount int64 + var conflictCount int64 + var wg sync.WaitGroup + + // All goroutines try to save version 1 + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", id), + EventType: "TestEvent", + ActorID: "actor-contested", + Version: 1, + Data: map[string]interface{}{"goroutine": id}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event) + if err == nil { + atomic.AddInt64(&successCount, 1) + } else if errors.Is(err, aether.ErrVersionConflict) { + atomic.AddInt64(&conflictCount, 1) + } else { + t.Errorf("unexpected error: %v", err) + } + }(i) + } + + wg.Wait() + + // Exactly one should succeed, rest should conflict + if successCount != 1 { + t.Errorf("expected exactly 1 success, got %d", successCount) + } + if conflictCount != int64(numGoroutines-1) { + t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount) + } + + // Verify only one event was stored + events, err := store.GetEvents("actor-contested", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 1 { + t.Errorf("expected 1 event, got %d", len(events)) + } +} + func BenchmarkSaveEvent(b *testing.B) { store := NewInMemoryEventStore() @@ -848,7 +1046,7 @@ func BenchmarkGetLatestVersion(b *testing.B) { } } -// === Snapshot Store Tests (from PR branch) === +// === Snapshot Store Tests === func TestSaveSnapshot_PersistsCorrectly(t *testing.T) { store := NewInMemoryEventStore()