Add event versioning validation
All checks were successful
CI / build (pull_request) Successful in 16s
CI / build (push) Successful in 15s

- Add ErrVersionConflict error type and VersionConflictError for detailed
  conflict information
- Implement version validation in InMemoryEventStore.SaveEvent that rejects
  events with version <= current latest version
- Implement version validation in JetStreamEventStore.SaveEvent with version
  caching for performance
- Add comprehensive tests for version conflict detection including concurrent
  writes to same actor
- Document versioning semantics in EventStore interface and CLAUDE.md

This ensures events have monotonically increasing versions per actor and
provides clear error messages for version conflicts, enabling optimistic
concurrency control patterns.

Closes #6

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit was merged in pull request #34.
This commit is contained in:
2026-01-09 17:56:50 +01:00
parent a269da4520
commit 02847bdaf5
5 changed files with 401 additions and 19 deletions

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"git.flowmade.one/flowmade-one/aether"
@@ -14,6 +15,8 @@ import (
type JetStreamEventStore struct {
js nats.JetStreamContext
streamName string
mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
}
// NewJetStreamEventStore creates a new JetStream-based event store
@@ -41,11 +44,32 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
return &JetStreamEventStore{
js: js,
streamName: streamName,
versions: make(map[string]int64),
}, nil
}
// SaveEvent persists an event to JetStream
// SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
jes.mu.Lock()
defer jes.mu.Unlock()
// Get current latest version for this actor
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Validate version is strictly greater than current
if event.Version <= currentVersion {
return &aether.VersionConflictError{
ActorID: event.ActorID,
AttemptedVersion: event.Version,
CurrentVersion: currentVersion,
}
}
// Serialize event to JSON
data, err := json.Marshal(event)
if err != nil {
@@ -64,11 +88,50 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
return fmt.Errorf("failed to publish event to JetStream: %w", err)
}
// Update version cache
jes.versions[event.ActorID] = event.Version
return nil
}
// getLatestVersionLocked returns the latest version for an actor.
// Caller must hold jes.mu.
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
// Check cache first
if version, ok := jes.versions[actorID]; ok {
return version, nil
}
// Fetch from JetStream
events, err := jes.getEventsInternal(actorID, 0)
if err != nil {
return 0, err
}
if len(events) == 0 {
return 0, nil
}
latestVersion := int64(0)
for _, event := range events {
if event.Version > latestVersion {
latestVersion = event.Version
}
}
// Update cache
jes.versions[actorID] = latestVersion
return latestVersion, nil
}
// GetEvents retrieves all events for an actor since a version
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
return jes.getEventsInternal(actorID, fromVersion)
}
// getEventsInternal is the internal implementation of GetEvents
func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) {
// Create subject filter for this actor
subject := fmt.Sprintf("%s.events.%s.%s",
jes.streamName,

View File

@@ -1,7 +1,6 @@
package store
import (
"fmt"
"sync"
"git.flowmade.one/flowmade-one/aether"
@@ -22,11 +21,32 @@ func NewInMemoryEventStore() *InMemoryEventStore {
}
}
// SaveEvent saves an event to the in-memory store
// SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock()
defer es.mu.Unlock()
// Get current latest version for this actor
currentVersion := int64(0)
if events, exists := es.events[event.ActorID]; exists {
for _, e := range events {
if e.Version > currentVersion {
currentVersion = e.Version
}
}
}
// Validate version is strictly greater than current
if event.Version <= currentVersion {
return &aether.VersionConflictError{
ActorID: event.ActorID,
AttemptedVersion: event.Version,
CurrentVersion: currentVersion,
}
}
if _, exists := es.events[event.ActorID]; !exists {
es.events[event.ActorID] = make([]*aether.Event, 0)
}
@@ -77,7 +97,7 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
// SaveSnapshot saves a snapshot to the in-memory store
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
if snapshot == nil {
return fmt.Errorf("snapshot cannot be nil")
return &snapshotNilError{}
}
es.mu.Lock()
@@ -109,3 +129,10 @@ func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSn
return latest, nil
}
// snapshotNilError is returned when attempting to save a nil snapshot
type snapshotNilError struct{}
func (e *snapshotNilError) Error() string {
return "snapshot cannot be nil"
}

View File

@@ -1,15 +1,17 @@
package store
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// === Event Store Tests (from main branch) ===
// === Event Store Tests ===
func TestNewInMemoryEventStore(t *testing.T) {
store := NewInMemoryEventStore()
@@ -292,8 +294,8 @@ func TestGetEvents_FromVersionZero(t *testing.T) {
func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Save events with various versions
versions := []int64{1, 3, 2, 5, 4} // Out of order
// Save events with strictly increasing versions
versions := []int64{1, 2, 3, 4, 5}
for i, version := range versions {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
@@ -553,17 +555,18 @@ func TestConcurrentSaveAndGet(t *testing.T) {
}()
}
// Start writers
// Start writers - each writer gets their own actor to avoid version conflicts
wg.Add(numWriters)
for w := 0; w < numWriters; w++ {
go func(writerID int) {
defer wg.Done()
actorID := fmt.Sprintf("writer-actor-%d", writerID)
for i := 0; i < writesPerWriter; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i),
EventType: "TestEvent",
ActorID: "actor-123",
Version: int64(100 + writerID*writesPerWriter + i),
ActorID: actorID,
Version: int64(i + 1),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
@@ -576,15 +579,27 @@ func TestConcurrentSaveAndGet(t *testing.T) {
wg.Wait()
// Verify final state
// Verify actor-123 still has its original events
events, err := store.GetEvents("actor-123", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
expectedTotal := 10 + numWriters*writesPerWriter
if len(events) != expectedTotal {
t.Errorf("expected %d total events, got %d", expectedTotal, len(events))
if len(events) != 10 {
t.Errorf("expected 10 events for actor-123, got %d", len(events))
}
// Verify each writer's actor has the correct events
for w := 0; w < numWriters; w++ {
actorID := fmt.Sprintf("writer-actor-%d", w)
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Errorf("GetEvents failed for %s: %v", actorID, err)
continue
}
if len(events) != writesPerWriter {
t.Errorf("expected %d events for %s, got %d", writesPerWriter, actorID, len(events))
}
}
}
@@ -686,8 +701,8 @@ func TestSaveEvent_EmptyData(t *testing.T) {
func TestGetEvents_VersionEdgeCases(t *testing.T) {
store := NewInMemoryEventStore()
// Save events with edge case versions
versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64
// Save events with edge case versions (strictly increasing)
versions := []int64{1, 100, 9223372036854775807} // one, 100, MaxInt64
for i, version := range versions {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
@@ -698,7 +713,7 @@ func TestGetEvents_VersionEdgeCases(t *testing.T) {
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
t.Fatalf("SaveEvent failed for version %d: %v", version, err)
}
}
@@ -787,6 +802,189 @@ func TestSaveEvent_SpecialActorIDs(t *testing.T) {
}
}
// === Version Validation Tests ===
func TestSaveEvent_RejectsLowerVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Save first event with version 5
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed for first event: %v", err)
}
// Attempt to save event with lower version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event2)
if err == nil {
t.Fatal("expected error when saving event with lower version, got nil")
}
// Verify it's a VersionConflictError
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatalf("expected VersionConflictError, got %T", err)
}
if versionErr.ActorID != "actor-123" {
t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123")
}
if versionErr.CurrentVersion != 5 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5)
}
if versionErr.AttemptedVersion != 3 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3)
}
}
func TestSaveEvent_RejectsEqualVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Save first event with version 5
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("SaveEvent failed for first event: %v", err)
}
// Attempt to save event with equal version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-123",
Version: 5,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event2)
if err == nil {
t.Fatal("expected error when saving event with equal version, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
func TestSaveEvent_RejectsZeroVersion(t *testing.T) {
store := NewInMemoryEventStore()
// Version 0 should be rejected (not strictly greater than initial 0)
event := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-new",
Version: 0,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
t.Fatal("expected error when saving event with version 0, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
func TestSaveEvent_RejectsNegativeVersion(t *testing.T) {
store := NewInMemoryEventStore()
event := &aether.Event{
ID: "evt-neg",
EventType: "TestEvent",
ActorID: "actor-123",
Version: -1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
t.Fatal("expected error when saving event with negative version, got nil")
}
if !errors.Is(err, aether.ErrVersionConflict) {
t.Errorf("expected ErrVersionConflict, got %v", err)
}
}
func TestSaveEvent_ConcurrentWritesToSameActor(t *testing.T) {
store := NewInMemoryEventStore()
numGoroutines := 100
var successCount int64
var conflictCount int64
var wg sync.WaitGroup
// All goroutines try to save version 1
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", id),
EventType: "TestEvent",
ActorID: "actor-contested",
Version: 1,
Data: map[string]interface{}{"goroutine": id},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err == nil {
atomic.AddInt64(&successCount, 1)
} else if errors.Is(err, aether.ErrVersionConflict) {
atomic.AddInt64(&conflictCount, 1)
} else {
t.Errorf("unexpected error: %v", err)
}
}(i)
}
wg.Wait()
// Exactly one should succeed, rest should conflict
if successCount != 1 {
t.Errorf("expected exactly 1 success, got %d", successCount)
}
if conflictCount != int64(numGoroutines-1) {
t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount)
}
// Verify only one event was stored
events, err := store.GetEvents("actor-contested", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Errorf("expected 1 event, got %d", len(events))
}
}
func BenchmarkSaveEvent(b *testing.B) {
store := NewInMemoryEventStore()
@@ -848,7 +1046,7 @@ func BenchmarkGetLatestVersion(b *testing.B) {
}
}
// === Snapshot Store Tests (from PR branch) ===
// === Snapshot Store Tests ===
func TestSaveSnapshot_PersistsCorrectly(t *testing.T) {
store := NewInMemoryEventStore()