[Issue #6] Add event versioning validation #34
44
CLAUDE.md
44
CLAUDE.md
@@ -79,6 +79,49 @@ store.SaveEvent(event)
|
||||
events, _ := store.GetEvents("order-123", 0)
|
||||
```
|
||||
|
||||
### Event Versioning
|
||||
|
||||
Events for each actor must have **monotonically increasing versions**. This ensures event stream integrity and enables optimistic concurrency control.
|
||||
|
||||
#### Version Semantics
|
||||
|
||||
- Each actor has an independent version sequence
|
||||
- Version must be strictly greater than the current latest version
|
||||
- For new actors (no events), the first event must have version > 0
|
||||
- Non-consecutive versions are allowed (gaps are permitted)
|
||||
|
||||
#### Optimistic Concurrency Pattern
|
||||
|
||||
```go
|
||||
// 1. Get current version
|
||||
currentVersion, _ := store.GetLatestVersion("order-123")
|
||||
|
||||
// 2. Create event with next version
|
||||
event := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "OrderUpdated",
|
||||
ActorID: "order-123",
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{"status": "shipped"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// 3. Attempt to save
|
||||
err := store.SaveEvent(event)
|
||||
if errors.Is(err, aether.ErrVersionConflict) {
|
||||
// Another writer won - reload and retry if appropriate
|
||||
var versionErr *aether.VersionConflictError
|
||||
errors.As(err, &versionErr)
|
||||
log.Printf("Conflict: actor %s has version %d, attempted %d",
|
||||
versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
|
||||
}
|
||||
```
|
||||
|
||||
#### Error Types
|
||||
|
||||
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
|
||||
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
|
||||
|
||||
### Namespace Isolation
|
||||
|
||||
Namespaces provide logical boundaries for events and subscriptions:
|
||||
@@ -111,6 +154,7 @@ if manager.IsLeader() {
|
||||
## Key Patterns
|
||||
|
||||
- **Events are immutable** - Never modify, only append
|
||||
- **Versions are monotonic** - Each event must have version > previous for same actor
|
||||
- **Snapshots for performance** - Periodically snapshot state to avoid full replay
|
||||
- **Namespaces for isolation** - Not multi-tenancy, just logical boundaries
|
||||
- **NATS for everything** - Events, pub/sub, clustering all use NATS
|
||||
|
||||
52
event.go
52
event.go
@@ -1,9 +1,33 @@
|
||||
package aether
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrVersionConflict is returned when attempting to save an event with a version
|
||||
// that is not strictly greater than the current latest version for an actor.
|
||||
// This ensures events have monotonically increasing versions per actor.
|
||||
var ErrVersionConflict = errors.New("version conflict")
|
||||
|
||||
// VersionConflictError provides details about a version conflict.
|
||||
// It is returned when SaveEvent is called with a version <= the current latest version.
|
||||
type VersionConflictError struct {
|
||||
ActorID string
|
||||
AttemptedVersion int64
|
||||
CurrentVersion int64
|
||||
}
|
||||
|
||||
func (e *VersionConflictError) Error() string {
|
||||
return fmt.Sprintf("%s: actor %q has version %d, cannot save version %d",
|
||||
ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
|
||||
}
|
||||
|
||||
func (e *VersionConflictError) Unwrap() error {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
|
||||
// Event represents a domain event in the system
|
||||
type Event struct {
|
||||
ID string `json:"id"`
|
||||
@@ -23,10 +47,36 @@ type ActorSnapshot struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// EventStore defines the interface for event persistence
|
||||
// EventStore defines the interface for event persistence.
|
||||
//
|
||||
// # Version Semantics
|
||||
//
|
||||
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
||||
// is called, the implementation must validate that the event's version is strictly
|
||||
// greater than the current latest version for that actor. If the version is less
|
||||
// than or equal to the current version, SaveEvent must return a VersionConflictError
|
||||
// (which wraps ErrVersionConflict).
|
||||
//
|
||||
// This validation ensures event stream integrity and enables optimistic concurrency
|
||||
// control. Clients should:
|
||||
// 1. Call GetLatestVersion to get the current version for an actor
|
||||
// 2. Set the new event's version to currentVersion + 1
|
||||
// 3. Call SaveEvent - if it returns ErrVersionConflict, another writer won
|
||||
// 4. On conflict, reload the latest version and retry if appropriate
|
||||
//
|
||||
// For new actors (no existing events), version 1 is expected for the first event.
|
||||
type EventStore interface {
|
||||
// SaveEvent persists an event to the store. The event's Version must be
|
||||
// strictly greater than the current latest version for the actor.
|
||||
// Returns VersionConflictError if version <= current latest version.
|
||||
SaveEvent(event *Event) error
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
||||
// Returns an empty slice if no events exist for the actor.
|
||||
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
||||
|
||||
// GetLatestVersion returns the latest version for an actor.
|
||||
// Returns 0 if no events exist for the actor.
|
||||
GetLatestVersion(actorID string) (int64, error)
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
@@ -14,6 +15,8 @@ import (
|
||||
type JetStreamEventStore struct {
|
||||
js nats.JetStreamContext
|
||||
streamName string
|
||||
mu sync.Mutex // Protects version checks during SaveEvent
|
||||
versions map[string]int64 // actorID -> latest version cache
|
||||
}
|
||||
|
||||
// NewJetStreamEventStore creates a new JetStream-based event store
|
||||
@@ -41,11 +44,32 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
|
||||
return &JetStreamEventStore{
|
||||
js: js,
|
||||
streamName: streamName,
|
||||
versions: make(map[string]int64),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SaveEvent persists an event to JetStream
|
||||
// SaveEvent persists an event to JetStream.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
jes.mu.Lock()
|
||||
defer jes.mu.Unlock()
|
||||
|
||||
// Get current latest version for this actor
|
||||
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get latest version: %w", err)
|
||||
}
|
||||
|
||||
// Validate version is strictly greater than current
|
||||
if event.Version <= currentVersion {
|
||||
return &aether.VersionConflictError{
|
||||
ActorID: event.ActorID,
|
||||
AttemptedVersion: event.Version,
|
||||
CurrentVersion: currentVersion,
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize event to JSON
|
||||
data, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
@@ -64,11 +88,50 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
||||
}
|
||||
|
||||
// Update version cache
|
||||
jes.versions[event.ActorID] = event.Version
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getLatestVersionLocked returns the latest version for an actor.
|
||||
// Caller must hold jes.mu.
|
||||
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
||||
// Check cache first
|
||||
if version, ok := jes.versions[actorID]; ok {
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// Fetch from JetStream
|
||||
events, err := jes.getEventsInternal(actorID, 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(events) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
latestVersion := int64(0)
|
||||
for _, event := range events {
|
||||
if event.Version > latestVersion {
|
||||
latestVersion = event.Version
|
||||
}
|
||||
}
|
||||
|
||||
// Update cache
|
||||
jes.versions[actorID] = latestVersion
|
||||
|
||||
return latestVersion, nil
|
||||
}
|
||||
|
||||
// GetEvents retrieves all events for an actor since a version
|
||||
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
return jes.getEventsInternal(actorID, fromVersion)
|
||||
}
|
||||
|
||||
// getEventsInternal is the internal implementation of GetEvents
|
||||
func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
// Create subject filter for this actor
|
||||
subject := fmt.Sprintf("%s.events.%s.%s",
|
||||
jes.streamName,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
@@ -22,11 +21,32 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
}
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the in-memory store
|
||||
// SaveEvent saves an event to the in-memory store.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
|
||||
// Get current latest version for this actor
|
||||
currentVersion := int64(0)
|
||||
if events, exists := es.events[event.ActorID]; exists {
|
||||
for _, e := range events {
|
||||
if e.Version > currentVersion {
|
||||
currentVersion = e.Version
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate version is strictly greater than current
|
||||
if event.Version <= currentVersion {
|
||||
return &aether.VersionConflictError{
|
||||
ActorID: event.ActorID,
|
||||
AttemptedVersion: event.Version,
|
||||
CurrentVersion: currentVersion,
|
||||
}
|
||||
}
|
||||
|
||||
if _, exists := es.events[event.ActorID]; !exists {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
@@ -77,7 +97,7 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||
// SaveSnapshot saves a snapshot to the in-memory store
|
||||
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
|
||||
if snapshot == nil {
|
||||
return fmt.Errorf("snapshot cannot be nil")
|
||||
return &snapshotNilError{}
|
||||
}
|
||||
|
||||
es.mu.Lock()
|
||||
@@ -109,3 +129,10 @@ func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSn
|
||||
|
||||
return latest, nil
|
||||
}
|
||||
|
||||
// snapshotNilError is returned when attempting to save a nil snapshot
|
||||
type snapshotNilError struct{}
|
||||
|
||||
func (e *snapshotNilError) Error() string {
|
||||
return "snapshot cannot be nil"
|
||||
}
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// === Event Store Tests (from main branch) ===
|
||||
// === Event Store Tests ===
|
||||
|
||||
func TestNewInMemoryEventStore(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
@@ -292,8 +294,8 @@ func TestGetEvents_FromVersionZero(t *testing.T) {
|
||||
func TestGetLatestVersion_ReturnsCorrectVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events with various versions
|
||||
versions := []int64{1, 3, 2, 5, 4} // Out of order
|
||||
// Save events with strictly increasing versions
|
||||
versions := []int64{1, 2, 3, 4, 5}
|
||||
for i, version := range versions {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
@@ -553,17 +555,18 @@ func TestConcurrentSaveAndGet(t *testing.T) {
|
||||
}()
|
||||
}
|
||||
|
||||
// Start writers
|
||||
// Start writers - each writer gets their own actor to avoid version conflicts
|
||||
wg.Add(numWriters)
|
||||
for w := 0; w < numWriters; w++ {
|
||||
go func(writerID int) {
|
||||
defer wg.Done()
|
||||
actorID := fmt.Sprintf("writer-actor-%d", writerID)
|
||||
for i := 0; i < writesPerWriter; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-writer-%d-%d", writerID, i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: int64(100 + writerID*writesPerWriter + i),
|
||||
ActorID: actorID,
|
||||
Version: int64(i + 1),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
@@ -576,15 +579,27 @@ func TestConcurrentSaveAndGet(t *testing.T) {
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify final state
|
||||
// Verify actor-123 still has its original events
|
||||
events, err := store.GetEvents("actor-123", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
expectedTotal := 10 + numWriters*writesPerWriter
|
||||
if len(events) != expectedTotal {
|
||||
t.Errorf("expected %d total events, got %d", expectedTotal, len(events))
|
||||
if len(events) != 10 {
|
||||
t.Errorf("expected 10 events for actor-123, got %d", len(events))
|
||||
}
|
||||
|
||||
// Verify each writer's actor has the correct events
|
||||
for w := 0; w < numWriters; w++ {
|
||||
actorID := fmt.Sprintf("writer-actor-%d", w)
|
||||
events, err := store.GetEvents(actorID, 0)
|
||||
if err != nil {
|
||||
t.Errorf("GetEvents failed for %s: %v", actorID, err)
|
||||
continue
|
||||
}
|
||||
if len(events) != writesPerWriter {
|
||||
t.Errorf("expected %d events for %s, got %d", writesPerWriter, actorID, len(events))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -686,8 +701,8 @@ func TestSaveEvent_EmptyData(t *testing.T) {
|
||||
func TestGetEvents_VersionEdgeCases(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save events with edge case versions
|
||||
versions := []int64{0, 1, 9223372036854775807} // Zero, one, MaxInt64
|
||||
// Save events with edge case versions (strictly increasing)
|
||||
versions := []int64{1, 100, 9223372036854775807} // one, 100, MaxInt64
|
||||
for i, version := range versions {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
@@ -698,7 +713,7 @@ func TestGetEvents_VersionEdgeCases(t *testing.T) {
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
t.Fatalf("SaveEvent failed for version %d: %v", version, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -787,6 +802,189 @@ func TestSaveEvent_SpecialActorIDs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// === Version Validation Tests ===
|
||||
|
||||
func TestSaveEvent_RejectsLowerVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save first event with version 5
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 5,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event1); err != nil {
|
||||
t.Fatalf("SaveEvent failed for first event: %v", err)
|
||||
}
|
||||
|
||||
// Attempt to save event with lower version (should fail)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 3,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := store.SaveEvent(event2)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when saving event with lower version, got nil")
|
||||
}
|
||||
|
||||
// Verify it's a VersionConflictError
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Errorf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
t.Fatalf("expected VersionConflictError, got %T", err)
|
||||
}
|
||||
|
||||
if versionErr.ActorID != "actor-123" {
|
||||
t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123")
|
||||
}
|
||||
if versionErr.CurrentVersion != 5 {
|
||||
t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5)
|
||||
}
|
||||
if versionErr.AttemptedVersion != 3 {
|
||||
t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_RejectsEqualVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Save first event with version 5
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 5,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event1); err != nil {
|
||||
t.Fatalf("SaveEvent failed for first event: %v", err)
|
||||
}
|
||||
|
||||
// Attempt to save event with equal version (should fail)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: 5,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := store.SaveEvent(event2)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when saving event with equal version, got nil")
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Errorf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_RejectsZeroVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Version 0 should be rejected (not strictly greater than initial 0)
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-new",
|
||||
Version: 0,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := store.SaveEvent(event)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when saving event with version 0, got nil")
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Errorf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_RejectsNegativeVersion(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-neg",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-123",
|
||||
Version: -1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := store.SaveEvent(event)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when saving event with negative version, got nil")
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Errorf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_ConcurrentWritesToSameActor(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
numGoroutines := 100
|
||||
var successCount int64
|
||||
var conflictCount int64
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// All goroutines try to save version 1
|
||||
wg.Add(numGoroutines)
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", id),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-contested",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"goroutine": id},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := store.SaveEvent(event)
|
||||
if err == nil {
|
||||
atomic.AddInt64(&successCount, 1)
|
||||
} else if errors.Is(err, aether.ErrVersionConflict) {
|
||||
atomic.AddInt64(&conflictCount, 1)
|
||||
} else {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Exactly one should succeed, rest should conflict
|
||||
if successCount != 1 {
|
||||
t.Errorf("expected exactly 1 success, got %d", successCount)
|
||||
}
|
||||
if conflictCount != int64(numGoroutines-1) {
|
||||
t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount)
|
||||
}
|
||||
|
||||
// Verify only one event was stored
|
||||
events, err := store.GetEvents("actor-contested", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSaveEvent(b *testing.B) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
@@ -848,7 +1046,7 @@ func BenchmarkGetLatestVersion(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// === Snapshot Store Tests (from PR branch) ===
|
||||
// === Snapshot Store Tests ===
|
||||
|
||||
func TestSaveSnapshot_PersistsCorrectly(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
Reference in New Issue
Block a user