- Add ErrVersionConflict error type and VersionConflictError for detailed conflict information - Implement version validation in InMemoryEventStore.SaveEvent that rejects events with version <= current latest version - Implement version validation in JetStreamEventStore.SaveEvent with version caching for performance - Add comprehensive tests for version conflict detection including concurrent writes to same actor - Document versioning semantics in EventStore interface and CLAUDE.md This ensures events have monotonically increasing versions per actor and provides clear error messages for version conflicts, enabling optimistic concurrency control patterns. Closes #6 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
139 lines
3.6 KiB
Go
139 lines
3.6 KiB
Go
package store
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
)
|
|
|
|
// InMemoryEventStore provides a simple in-memory event store for testing
|
|
type InMemoryEventStore struct {
|
|
mu sync.RWMutex
|
|
events map[string][]*aether.Event // actorID -> events
|
|
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
|
}
|
|
|
|
// NewInMemoryEventStore creates a new in-memory event store
|
|
func NewInMemoryEventStore() *InMemoryEventStore {
|
|
return &InMemoryEventStore{
|
|
events: make(map[string][]*aether.Event),
|
|
snapshots: make(map[string][]*aether.ActorSnapshot),
|
|
}
|
|
}
|
|
|
|
// SaveEvent saves an event to the in-memory store.
|
|
// Returns VersionConflictError if the event's version is not strictly greater
|
|
// than the current latest version for the actor.
|
|
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
|
es.mu.Lock()
|
|
defer es.mu.Unlock()
|
|
|
|
// Get current latest version for this actor
|
|
currentVersion := int64(0)
|
|
if events, exists := es.events[event.ActorID]; exists {
|
|
for _, e := range events {
|
|
if e.Version > currentVersion {
|
|
currentVersion = e.Version
|
|
}
|
|
}
|
|
}
|
|
|
|
// Validate version is strictly greater than current
|
|
if event.Version <= currentVersion {
|
|
return &aether.VersionConflictError{
|
|
ActorID: event.ActorID,
|
|
AttemptedVersion: event.Version,
|
|
CurrentVersion: currentVersion,
|
|
}
|
|
}
|
|
|
|
if _, exists := es.events[event.ActorID]; !exists {
|
|
es.events[event.ActorID] = make([]*aether.Event, 0)
|
|
}
|
|
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
|
return nil
|
|
}
|
|
|
|
// GetEvents retrieves events for an actor from a specific version
|
|
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
|
es.mu.RLock()
|
|
defer es.mu.RUnlock()
|
|
|
|
events, exists := es.events[actorID]
|
|
if !exists {
|
|
return []*aether.Event{}, nil
|
|
}
|
|
|
|
var filteredEvents []*aether.Event
|
|
for _, event := range events {
|
|
if event.Version >= fromVersion {
|
|
filteredEvents = append(filteredEvents, event)
|
|
}
|
|
}
|
|
|
|
return filteredEvents, nil
|
|
}
|
|
|
|
// GetLatestVersion returns the latest version for an actor
|
|
func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
|
es.mu.RLock()
|
|
defer es.mu.RUnlock()
|
|
|
|
events, exists := es.events[actorID]
|
|
if !exists || len(events) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
latestVersion := int64(0)
|
|
for _, event := range events {
|
|
if event.Version > latestVersion {
|
|
latestVersion = event.Version
|
|
}
|
|
}
|
|
|
|
return latestVersion, nil
|
|
}
|
|
|
|
// SaveSnapshot saves a snapshot to the in-memory store
|
|
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
|
|
if snapshot == nil {
|
|
return &snapshotNilError{}
|
|
}
|
|
|
|
es.mu.Lock()
|
|
defer es.mu.Unlock()
|
|
|
|
if _, exists := es.snapshots[snapshot.ActorID]; !exists {
|
|
es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0)
|
|
}
|
|
es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot)
|
|
return nil
|
|
}
|
|
|
|
// GetLatestSnapshot returns the most recent snapshot for an actor
|
|
func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
|
es.mu.RLock()
|
|
defer es.mu.RUnlock()
|
|
|
|
snapshots, exists := es.snapshots[actorID]
|
|
if !exists || len(snapshots) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var latest *aether.ActorSnapshot
|
|
for _, snapshot := range snapshots {
|
|
if latest == nil || snapshot.Version > latest.Version {
|
|
latest = snapshot
|
|
}
|
|
}
|
|
|
|
return latest, nil
|
|
}
|
|
|
|
// snapshotNilError is returned when attempting to save a nil snapshot
|
|
type snapshotNilError struct{}
|
|
|
|
func (e *snapshotNilError) Error() string {
|
|
return "snapshot cannot be nil"
|
|
}
|