[Issue #6] Add event versioning validation #34

Merged
HugoNijhuis merged 1 commits from issue-6-event-versioning-validation into main 2026-01-09 17:09:01 +00:00
5 changed files with 401 additions and 19 deletions

View File

@@ -79,6 +79,49 @@ store.SaveEvent(event)
events, _ := store.GetEvents("order-123", 0) events, _ := store.GetEvents("order-123", 0)
``` ```
### Event Versioning
Events for each actor must have **monotonically increasing versions**. This ensures event stream integrity and enables optimistic concurrency control.
#### Version Semantics
- Each actor has an independent version sequence
- Version must be strictly greater than the current latest version
- For new actors (no events), the first event must have version > 0
- Non-consecutive versions are allowed (gaps are permitted)
#### Optimistic Concurrency Pattern
```go
// 1. Get current version
currentVersion, _ := store.GetLatestVersion("order-123")
// 2. Create event with next version
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: "order-123",
Version: currentVersion + 1,
Data: map[string]interface{}{"status": "shipped"},
Timestamp: time.Now(),
}
// 3. Attempt to save
err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
// Another writer won - reload and retry if appropriate
var versionErr *aether.VersionConflictError
errors.As(err, &versionErr)
log.Printf("Conflict: actor %s has version %d, attempted %d",
versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
}
```
#### Error Types
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
### Namespace Isolation ### Namespace Isolation
Namespaces provide logical boundaries for events and subscriptions: Namespaces provide logical boundaries for events and subscriptions:
@@ -111,6 +154,7 @@ if manager.IsLeader() {
## Key Patterns ## Key Patterns
- **Events are immutable** - Never modify, only append - **Events are immutable** - Never modify, only append
- **Versions are monotonic** - Each event must have version > previous for same actor
- **Snapshots for performance** - Periodically snapshot state to avoid full replay - **Snapshots for performance** - Periodically snapshot state to avoid full replay
- **Namespaces for isolation** - Not multi-tenancy, just logical boundaries - **Namespaces for isolation** - Not multi-tenancy, just logical boundaries
- **NATS for everything** - Events, pub/sub, clustering all use NATS - **NATS for everything** - Events, pub/sub, clustering all use NATS

View File

@@ -1,9 +1,33 @@
package aether package aether
import ( import (
"errors"
"fmt"
"time" "time"
) )
// ErrVersionConflict is returned when attempting to save an event with a version
// that is not strictly greater than the current latest version for an actor.
// This ensures events have monotonically increasing versions per actor.
var ErrVersionConflict = errors.New("version conflict")
// VersionConflictError provides details about a version conflict.
// It is returned when SaveEvent is called with a version <= the current latest version.
type VersionConflictError struct {
ActorID string
AttemptedVersion int64
CurrentVersion int64
}
func (e *VersionConflictError) Error() string {
return fmt.Sprintf("%s: actor %q has version %d, cannot save version %d",
ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
}
func (e *VersionConflictError) Unwrap() error {
return ErrVersionConflict
}
// Event represents a domain event in the system // Event represents a domain event in the system
type Event struct { type Event struct {
ID string `json:"id"` ID string `json:"id"`
@@ -23,10 +47,36 @@ type ActorSnapshot struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
} }
// EventStore defines the interface for event persistence // EventStore defines the interface for event persistence.
//
// # Version Semantics
//
// Events for an actor must have monotonically increasing versions. When SaveEvent
// is called, the implementation must validate that the event's version is strictly
// greater than the current latest version for that actor. If the version is less
// than or equal to the current version, SaveEvent must return a VersionConflictError
// (which wraps ErrVersionConflict).
//
// This validation ensures event stream integrity and enables optimistic concurrency
// control. Clients should:
// 1. Call GetLatestVersion to get the current version for an actor
// 2. Set the new event's version to currentVersion + 1
// 3. Call SaveEvent - if it returns ErrVersionConflict, another writer won
// 4. On conflict, reload the latest version and retry if appropriate
//
// For new actors (no existing events), version 1 is expected for the first event.
type EventStore interface { type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor.
// Returns VersionConflictError if version <= current latest version.
SaveEvent(event *Event) error SaveEvent(event *Event) error
// GetEvents retrieves events for an actor from a specific version (inclusive).
// Returns an empty slice if no events exist for the actor.
GetEvents(actorID string, fromVersion int64) ([]*Event, error) GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor.
// Returns 0 if no events exist for the actor.
GetLatestVersion(actorID string) (int64, error) GetLatestVersion(actorID string) (int64, error)
} }

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
@@ -14,6 +15,8 @@ import (
type JetStreamEventStore struct { type JetStreamEventStore struct {
js nats.JetStreamContext js nats.JetStreamContext
streamName string streamName string
mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
} }
// NewJetStreamEventStore creates a new JetStream-based event store // NewJetStreamEventStore creates a new JetStream-based event store
@@ -41,11 +44,32 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
return &JetStreamEventStore{ return &JetStreamEventStore{
js: js, js: js,
streamName: streamName, streamName: streamName,
versions: make(map[string]int64),
}, nil }, nil
} }
// SaveEvent persists an event to JetStream // SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
jes.mu.Lock()
defer jes.mu.Unlock()
// Get current latest version for this actor
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Validate version is strictly greater than current
if event.Version <= currentVersion {
return &aether.VersionConflictError{
ActorID: event.ActorID,
AttemptedVersion: event.Version,
CurrentVersion: currentVersion,
}
}
// Serialize event to JSON // Serialize event to JSON
data, err := json.Marshal(event) data, err := json.Marshal(event)
if err != nil { if err != nil {
@@ -64,11 +88,50 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
return fmt.Errorf("failed to publish event to JetStream: %w", err) return fmt.Errorf("failed to publish event to JetStream: %w", err)
} }
// Update version cache
jes.versions[event.ActorID] = event.Version
return nil return nil
} }
// getLatestVersionLocked returns the latest version for an actor.
// Caller must hold jes.mu.
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
// Check cache first
if version, ok := jes.versions[actorID]; ok {
return version, nil
}
// Fetch from JetStream
events, err := jes.getEventsInternal(actorID, 0)
if err != nil {
return 0, err
}
if len(events) == 0 {
return 0, nil
}
latestVersion := int64(0)
for _, event := range events {
if event.Version > latestVersion {
latestVersion = event.Version
}
}
// Update cache
jes.versions[actorID] = latestVersion
return latestVersion, nil
}
// GetEvents retrieves all events for an actor since a version // GetEvents retrieves all events for an actor since a version
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
return jes.getEventsInternal(actorID, fromVersion)
}
// getEventsInternal is the internal implementation of GetEvents
func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) {
// Create subject filter for this actor // Create subject filter for this actor
subject := fmt.Sprintf("%s.events.%s.%s", subject := fmt.Sprintf("%s.events.%s.%s",
jes.streamName, jes.streamName,

View File

@@ -1,7 +1,6 @@
package store package store
import ( import (
"fmt"
"sync" "sync"
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
@@ -22,11 +21,32 @@ func NewInMemoryEventStore() *InMemoryEventStore {
} }
} }
// SaveEvent saves an event to the in-memory store // SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock() es.mu.Lock()
defer es.mu.Unlock() defer es.mu.Unlock()
// Get current latest version for this actor
currentVersion := int64(0)
if events, exists := es.events[event.ActorID]; exists {
for _, e := range events {
if e.Version > currentVersion {
currentVersion = e.Version
}
}
}
// Validate version is strictly greater than current
if event.Version <= currentVersion {
return &aether.VersionConflictError{
ActorID: event.ActorID,
AttemptedVersion: event.Version,
CurrentVersion: currentVersion,
}
}
if _, exists := es.events[event.ActorID]; !exists { if _, exists := es.events[event.ActorID]; !exists {
es.events[event.ActorID] = make([]*aether.Event, 0) es.events[event.ActorID] = make([]*aether.Event, 0)
} }
@@ -77,7 +97,7 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
// SaveSnapshot saves a snapshot to the in-memory store // SaveSnapshot saves a snapshot to the in-memory store
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
if snapshot == nil { if snapshot == nil {
return fmt.Errorf("snapshot cannot be nil") return &snapshotNilError{}
} }
es.mu.Lock() es.mu.Lock()
@@ -109,3 +129,10 @@ func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSn
return latest, nil return latest, nil
} }
// snapshotNilError is returned when attempting to save a nil snapshot
type snapshotNilError struct{}
func (e *snapshotNilError) Error() string {
return "snapshot cannot be nil"
}

View File

@@ -1,15 +1,17 @@
package store package store
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
) )
// === Event Store Tests (from main branch) === // === Event Store Tests ===
func TestNewInMemoryEventStore(t *testing.T) { func TestNewInMemoryEventStore(t *testing.T) {
store := NewInMemoryEventStore() store := NewInMemoryEventStore()
@@ -292,8 +294,8 @@ func TestGetEvents_FromVersionZero(t *testing.T) {
func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) { func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) {
store := NewInMemoryEventStore() store := NewInMemoryEventStore()
// Save events with various versions // Save events with strictly increasing versions
versions := []int64{1, 3, 2, 5, 4} // Out of order versions := []int64{1, 2, 3, 4, 5}
for i, version := range versions { for i, version := range versions {
event := &aether.Event{ event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i), ID: fmt.Sprintf("evt-%d", i),
@@ -553,17 +555,18 @@ func TestConcurrentSaveAndGet(t *testing.T) {
}() }()
} }
// Start writers // Start writers - each writer gets their own actor to avoid version conflicts
wg.Add(numWriters) wg.Add(numWriters)
for w := 0; w < numWriters; w++ { for w := 0; w < numWriters; w++ {
go func(writerID int) { go func(writerID int) {
defer wg.Done() defer wg.Done()
actorID := fmt.Sprintf("writer-actor-%d", writerID)
for i := 0; i < writesPerWriter; i++ { for i := 0; i < writesPerWriter; i++ {
event := &aether.Event{ event := &aether.Event{
ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i), ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i),
EventType: "TestEvent", EventType: "TestEvent",
ActorID: "actor-123", ActorID: actorID,
Version: int64(100 + writerID*writesPerWriter + i), Version: int64(i + 1),
Data: map[string]interface{}{}, Data: map[string]interface{}{},
Timestamp: time.Now(), Timestamp: time.Now(),
} }
@@ -576,15 +579,27 @@ func TestConcurrentSaveAndGet(t *testing.T) {
wg.Wait() wg.Wait()
// Verify final state // Verify actor-123 still has its original events
events, err := store.GetEvents("actor-123", 0) events, err := store.GetEvents("actor-123", 0)
if err != nil { if err != nil {
t.Fatalf("GetEvents failed: %v", err) t.Fatalf("GetEvents failed: %v", err)
} }
expectedTotal := 10 + numWriters*writesPerWriter if len(events) != 10 {
if len(events) != expectedTotal { t.Errorf("expected 10 events for actor-123, got %d", len(events))
t.Errorf("expected %d total events, got %d", expectedTotal, len(events)) }
// Verify each writer's actor has the correct events
for w := 0; w < numWriters; w++ {
actorID := fmt.Sprintf("writer-actor-%d", w)
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Errorf("GetEvents failed for %s: %v", actorID, err)
continue
}
if len(events) != writesPerWriter {
t.Errorf("expected %d events for %s, got %d", writesPerWriter, actorID, len(events))
}
} }
} }
@@ -686,8 +701,8 @@ func TestSaveEvent_EmptyData(t *testing.T) {
func TestGetEvents_VersionEdgeCases(t *testing.T) { func TestGetEvents_VersionEdgeCases(t *testing.T) {
store := NewInMemoryEventStore() store := NewInMemoryEventStore()
// Save events with edge case versions // Save events with edge case versions (strictly increasing)
versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64 versions := []int64{1, 100, 9223372036854775807} // one, 100, MaxInt64
for i, version := range versions { for i, version := range versions {
event := &aether.Event{ event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i), ID: fmt.Sprintf("evt-%d", i),
@@ -698,7 +713,7 @@ func TestGetEvents_VersionEdgeCases(t *testing.T) {
Timestamp: time.Now(), Timestamp: time.Now(),
} }
if err := store.SaveEvent(event); err != nil { if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err) t.Fatalf("SaveEvent failed for version %d: %v", version, err)
} }
} }
@@ -787,6 +802,189 @@ func TestSaveEvent_SpecialActorIDs(t *testing.T) {
} }
} }
// === Version Validation Tests ===
func TestSaveEvent_RejectsLowerVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Save first event with version 5
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed for first event: %v", err)
}
// Attempt to save event with lower version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event2)
if err == nil {
t.Fatal("expected error when saving event with lower version, got nil")
}
// Verify it's a VersionConflictError
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatalf("expected VersionConflictError, got %T", err)
}
if versionErr.ActorID != "actor-123" {
t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123")
}
if versionErr.CurrentVersion != 5 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5)
}
if versionErr.AttemptedVersion != 3 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3)
}
}
func TestSaveEvent_RejectsEqualVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Save first event with version 5
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed for first event: %v", err)
}
// Attempt to save event with equal version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event2)
if err == nil {
t.Fatal("expected error when saving event with equal version, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
func TestSaveEvent_RejectsZeroVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Version 0 should be rejected (not strictly greater than initial 0)
event := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-new",
Version: 0,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
t.Fatal("expected error when saving event with version 0, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
func TestSaveEvent_RejectsNegativeVersion(t *testing.T) {
store := NewInMemoryEventStore()
event := &aether.Event{
ID: "evt-neg",
EventType: "TestEvent",
ActorID: "actor-123",
Version: -1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
t.Fatal("expected error when saving event with negative version, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
func TestSaveEvent_ConcurrentWritesToSameActor(t *testing.T) {
store := NewInMemoryEventStore()
numGoroutines := 100
var successCount int64
var conflictCount int64
var wg sync.WaitGroup
// All goroutines try to save version 1
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", id),
EventType: "TestEvent",
ActorID: "actor-contested",
Version: 1,
Data: map[string]interface{}{"goroutine": id},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
atomic.AddInt64(&successCount, 1)
} else if errors.Is(err, aether.ErrVersionConflict) {
atomic.AddInt64(&conflictCount, 1)
} else {
t.Errorf("unexpected error: %v", err)
}
}(i)
}
wg.Wait()
// Exactly one should succeed, rest should conflict
if successCount != 1 {
t.Errorf("expected exactly 1 success, got %d", successCount)
}
if conflictCount != int64(numGoroutines-1) {
t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount)
}
// Verify only one event was stored
events, err := store.GetEvents("actor-contested", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Errorf("expected 1 event, got %d", len(events))
}
}
func BenchmarkSaveEvent(b *testing.B) { func BenchmarkSaveEvent(b *testing.B) {
store := NewInMemoryEventStore() store := NewInMemoryEventStore()
@@ -848,7 +1046,7 @@ func BenchmarkGetLatestVersion(b *testing.B) {
} }
} }
// === Snapshot Store Tests (from PR branch) === // === Snapshot Store Tests ===
func TestSaveSnapshot_PersistsCorrectly(t *testing.T) { func TestSaveSnapshot_PersistsCorrectly(t *testing.T) {
store := NewInMemoryEventStore() store := NewInMemoryEventStore()