Compare commits

..

3 Commits

Author SHA1 Message Date
Claude Code
e69f7a30e4 fix: address critical TOCTOU race condition and error handling inconsistencies
Some checks failed
CI / build (pull_request) Successful in 19s
CI / integration (pull_request) Failing after 1m59s
- Fix TOCTOU race condition in SaveEvent by holding the lock throughout the entire version validation and publish operation
- Add getLatestVersionLocked helper method to close the race window in which multiple concurrent threads read the same currentVersion
- Fix GetLatestSnapshot to return an error when no snapshot exists (not nil), distinguishing "not created" from "error occurred"
- The concurrent version conflict test now passes with exactly 1 success and 49 conflicts instead of 50 successes

These changes ensure thread-safe optimistic concurrency control and consistent error handling semantics.
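For illustration, a minimal sketch of the locking shape this message describes: the mutex spans both the version check and the publish, so no other goroutine can interleave between them. Field and helper names (`mu`, `versions`, `getLatestVersionLocked`, `publish`) are assumptions drawn from the surrounding diffs, not the exact implementation.

```go
// Sketch only: the lock is held from validation through publish.
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
    jes.mu.Lock()
    defer jes.mu.Unlock() // released only after publish and cache update

    current, err := jes.getLatestVersionLocked(event.ActorID) // assumed helper
    if err != nil {
        return err
    }
    if event.Version <= current {
        return &aether.VersionConflictError{
            ActorID:          event.ActorID,
            AttemptedVersion: event.Version,
            CurrentVersion:   current,
        }
    }
    if err := jes.publish(event); err != nil { // assumed publish step
        return err
    }
    jes.versions[event.ActorID] = event.Version // updated under the same lock
    return nil
}
```

Judging by the timestamps, this re-introduces the helper that a258ec9754 (below) had removed and widens the critical section again; serializing writers is what makes the conflict test finish with exactly one success.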

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 09:02:36 +01:00
Claude Code
a258ec9754 fix: Address thread safety and resource management issues
Some checks failed
CI / build (pull_request) Successful in 19s
CI / integration (pull_request) Failing after 1m59s
- Fix thread safety issue in SaveEvent: the lock now protects only cache access. NATS I/O operations (GetLatestVersion calls) happen without holding the mutex, preventing lock contention when multiple SaveEvent calls occur concurrently.

- Improve cache handling: Check cache first with minimal lock hold time. For cache misses, unlock before calling GetLatestVersion, then re-lock only to update cache.

- Remove getLatestVersionLocked: No longer needed now that SaveEvent doesn't hold lock during GetLatestVersion calls.

- Fix error handling consistency: GetLatestSnapshot now returns (nil, nil) when no snapshot exists, consistent with GetLatestVersion returning 0 for no events. Both methods now treat empty results as normal cases rather than errors.

- Fix benchmark test: BenchmarkGetLatestVersion_NoCache now creates uncachedStore outside the timing loop. Previously, creating a new store on each iteration was too expensive and didn't properly measure GetLatestVersion performance.
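A sketch of the flow these bullets describe, under the same struct-field assumptions (`mu`, `versions`) used elsewhere on this page; the key property is that the mutex bounds only map access, never NATS I/O.

```go
// Sketch: check the cache under a short critical section; on a miss,
// query JetStream unlocked, then re-lock briefly to update the cache.
func (jes *JetStreamEventStore) currentVersion(actorID string) (int64, error) {
    jes.mu.Lock()
    if v, ok := jes.versions[actorID]; ok {
        jes.mu.Unlock()
        return v, nil // cache hit: minimal lock hold time
    }
    jes.mu.Unlock()

    // Cache miss: NATS I/O happens without holding the mutex.
    v, err := jes.GetLatestVersion(actorID)
    if err != nil {
        return 0, err
    }

    jes.mu.Lock()
    jes.versions[actorID] = v // re-lock only to update the cache
    jes.mu.Unlock()
    return v, nil
}
```

The trade-off: with the lock released during the fetch, two writers can interleave between the version check and the publish; this is the TOCTOU race that the later commit e69f7a30e4 (above) fixes by widening the critical section again.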

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 01:30:39 +01:00
Claude Code
9d4ed1dd08 perf: Optimize GetLatestVersion to O(1) using JetStream DeliverLast
Some checks failed
CI / build (pull_request) Successful in 21s
CI / integration (pull_request) Failing after 2m0s
Closes #127

The GetLatestVersion method previously fetched all events for an actor to find
the maximum version, resulting in O(n) lookups. This change replaces the full
scan with JetStream's DeliverLast() consumer option, which retrieves only the
last message for a subject without replaying the event history.

Performance improvements:
- Uncached lookups: ~1.4ms regardless of event count (constant time)
- Cached lookups: ~630ns (very fast in-memory access)
- Memory usage: Same 557KB allocated regardless of event count
- Works correctly with cache invalidation

The change is backward compatible:
- Cache in getLatestVersionLocked continues to provide O(1) performance
- SaveEvent remains correct with version conflict detection
- All existing tests pass without modification
- Benchmark tests verify O(1) behavior
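A self-contained sketch of what the O(1) lookup can look like with the nats.go client. The subject layout `<stream>.events.<actorID>` is inferred from the stream configuration shown in the diffs below; the function is illustrative, not the actual method.

```go
package store

import (
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "git.flowmade.one/flowmade-one/aether"
    "github.com/nats-io/nats.go"
)

// latestVersion fetches only the last message on the actor's subject
// via DeliverLast, instead of replaying every event.
func latestVersion(js nats.JetStreamContext, streamName, actorID string) (int64, error) {
    subject := fmt.Sprintf("%s.events.%s", streamName, actorID) // inferred layout

    sub, err := js.SubscribeSync(subject, nats.DeliverLast(), nats.AckNone())
    if err != nil {
        return 0, err
    }
    defer sub.Unsubscribe()

    msg, err := sub.NextMsg(2 * time.Second)
    if errors.Is(err, nats.ErrTimeout) {
        // Sketch shortcut: a timeout here conflates "no events yet" with
        // "server slow"; the real code may distinguish the two.
        return 0, nil
    }
    if err != nil {
        return 0, err
    }

    var evt aether.Event
    if err := json.Unmarshal(msg.Data, &evt); err != nil {
        return 0, err
    }
    return evt.Version, nil
}
```

DeliverLast positions an ephemeral consumer at the newest message for the subject, so the lookup cost no longer grows with the number of stored events.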

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 00:26:36 +01:00
13 changed files with 2719 additions and 1369 deletions

View File

@@ -17,3 +17,37 @@ jobs:
        run: go build ./...
      - name: Test
        run: go test ./...
  integration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.23'
      - name: Install and Start NATS Server
        run: |
          # Detect architecture and download appropriate binary
          ARCH=$(uname -m)
          if [ "$ARCH" = "x86_64" ]; then
            NATS_ARCH="amd64"
          elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
            NATS_ARCH="arm64"
          else
            echo "Unsupported architecture: $ARCH"
            exit 1
          fi
          echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH"
          # Download and extract nats-server
          curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz
          tar -xzf nats-server.tar.gz
          # Start NATS with JetStream
          ./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 &
          # Wait for NATS to be ready
          sleep 3
          ./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version
      - name: Run Integration Tests
        run: go test -tags=integration -v ./...

View File

@@ -122,30 +122,6 @@ if errors.Is(err, aether.ErrVersionConflict) {
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
#### Version Cache Invalidation
The JetStreamEventStore maintains an in-memory cache of actor versions to optimize
repeated version checks during optimistic concurrency control. The cache is designed
to handle multi-store scenarios where external processes may write to the same
JetStream stream:
- **Cache hits**: Cached version is returned immediately for performance
- **Cache misses**: If no cached version exists, JetStream is queried and cached
- **External writes**: If GetLatestVersion detects a version newer than cached, the cache is invalidated and fresh data is fetched next time
This strategy ensures data consistency even in scenarios with external writers while
maintaining excellent performance for the single-writer case (where only Aether owns
the stream).
**Implementation detail**: The cache is invalidated by deleting the entry, forcing
a fresh fetch from JetStream on the next version check for that actor (see the
sketch after this list). This is safe because:
1. SaveEvent uses getLatestVersionLocked() which checks JetStream on cache miss
2. GetLatestVersion always fetches fresh data and detects stale cache entries
3. Subsequent checks will fetch from JetStream until the cache is repopulated
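For illustration, a sketch of that invalidation step; the method and helper names are illustrative, and the real code paths are the ones listed above.

```go
// Sketch: detect an external write and invalidate the cached version.
func (jes *JetStreamEventStore) refreshVersion(actorID string) (int64, error) {
    actual, err := jes.fetchVersionFromStream(actorID) // assumed fresh fetch
    if err != nil {
        return 0, err
    }
    jes.mu.Lock()
    if cached, ok := jes.versions[actorID]; ok && actual > cached {
        // An external writer advanced the stream: drop the stale entry so
        // the next version check re-fetches and repopulates the cache.
        delete(jes.versions, actorID)
    }
    jes.mu.Unlock()
    return actual, nil
}
```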
### Namespace Isolation
Namespaces provide logical boundaries for events and subscriptions.

View File

@@ -107,34 +107,7 @@ Order state after replaying 2 events:
### Events are immutable
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
- File-based storage (durable)
- Limits-based retention policy (events expire after configured duration, not before)
- No mechanism to modify or delete individual events during their lifetime
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
To correct a mistake, append a new event that expresses the correction rather than modifying history:
```go
// Wrong: Cannot update an event
// store.UpdateEvent(eventID, newData) // This method doesn't exist
// Right: Append a new event that corrects the record
correctionEvent := &aether.Event{
    ID:        uuid.New().String(),
    EventType: "OrderCorrected",
    ActorID:   orderID,
    Version:   currentVersion + 1,
    Data:      map[string]interface{}{"reason": "price adjustment"},
    Timestamp: time.Now(),
}
err := store.SaveEvent(correctionEvent)
```
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
### State is derived

View File

@@ -73,14 +73,6 @@ type Event struct {
Timestamp time.Time `json:"timestamp"`
}
// Common event types for Aether infrastructure
const (
// EventTypeEventStored is an internal event published when an event is successfully persisted.
// This event allows observability components (metrics, projections, audit systems) to react
// to persisted events without coupling to application code.
EventTypeEventStored = "EventStored"
)
// Common metadata keys for distributed tracing and auditing
const (
// MetadataKeyCorrelationID identifies related events across services
@@ -184,17 +176,6 @@ type ActorSnapshot struct {
// EventStore defines the interface for event persistence.
//
// # Immutability Guarantee
//
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
// modified or deleted. The interface intentionally provides no Update or Delete methods.
// This ensures:
// - Events serve as an immutable audit trail
// - State can be safely derived by replaying events
// - Concurrent reads are always safe (events never change)
//
// To correct a mistake, append a new event that expresses the correction.
//
// # Version Semantics
//
// Events for an actor must have monotonically increasing versions. When SaveEvent
@@ -215,13 +196,10 @@ type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor.
// Returns VersionConflictError if version <= current latest version.
// Once saved, the event is immutable and can never be modified or deleted.
SaveEvent(event *Event) error
// GetEvents retrieves events for an actor from a specific version (inclusive).
// Returns an empty slice if no events exist for the actor.
// The returned events are guaranteed to be immutable - they will never be
// modified or deleted from the store.
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor.
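To make the version semantics concrete, a short sketch of appending the next event through this interface; the package placement (`aether_test`) and function name are illustrative.

```go
package aether_test

import (
    "errors"
    "time"

    "git.flowmade.one/flowmade-one/aether"
    "github.com/google/uuid"
)

// appendNext appends one event for an actor at the next version. A wrapped
// ErrVersionConflict signals that another writer got there first.
func appendNext(store aether.EventStore, actorID, eventType string) error {
    current, err := store.GetLatestVersion(actorID)
    if err != nil {
        return err
    }
    event := &aether.Event{
        ID:        uuid.New().String(),
        EventType: eventType,
        ActorID:   actorID,
        Version:   current + 1, // must be strictly greater than current
        Data:      map[string]interface{}{},
        Timestamp: time.Now(),
    }
    if err := store.SaveEvent(event); errors.Is(err, aether.ErrVersionConflict) {
        // Lost the race: callers typically re-read the version and retry.
        return err
    } else if err != nil {
        return err
    }
    return nil
}
```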

View File

@@ -2,8 +2,6 @@ package aether
import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"
@@ -1337,190 +1335,3 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
// Error() should still work
_ = err.Error()
}
// Tests for VersionConflictError
func TestVersionConflictError_Error(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-123",
AttemptedVersion: 3,
CurrentVersion: 5,
}
errMsg := err.Error()
// Verify error message contains all context
if !strings.Contains(errMsg, "order-123") {
t.Errorf("error message should contain ActorID, got: %s", errMsg)
}
if !strings.Contains(errMsg, "3") {
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "5") {
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "version conflict") {
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
}
}
func TestVersionConflictError_Fields(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-456",
AttemptedVersion: 10,
CurrentVersion: 8,
}
if err.ActorID != "actor-456" {
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
}
if err.AttemptedVersion != 10 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
}
if err.CurrentVersion != 8 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
}
}
func TestVersionConflictError_Unwrap(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-789",
AttemptedVersion: 2,
CurrentVersion: 1,
}
unwrapped := err.Unwrap()
if unwrapped != ErrVersionConflict {
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
}
}
func TestVersionConflictError_ErrorsIs(t *testing.T) {
err := &VersionConflictError{
ActorID: "test-actor",
AttemptedVersion: 5,
CurrentVersion: 4,
}
// Test that errors.Is works with sentinel
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is(err, ErrVersionConflict) should return true")
}
// Test that other errors don't match
if errors.Is(err, errors.New("other error")) {
t.Error("errors.Is should not match unrelated errors")
}
}
func TestVersionConflictError_ErrorsAs(t *testing.T) {
originalErr := &VersionConflictError{
ActorID: "actor-unwrap",
AttemptedVersion: 7,
CurrentVersion: 6,
}
var versionErr *VersionConflictError
if !errors.As(originalErr, &versionErr) {
t.Fatalf("errors.As should succeed with VersionConflictError")
}
// Verify fields are accessible through unwrapped error
if versionErr.ActorID != "actor-unwrap" {
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
}
if versionErr.AttemptedVersion != 7 {
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
}
if versionErr.CurrentVersion != 6 {
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
}
}
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
// This test verifies that applications can read CurrentVersion for retry strategies
err := &VersionConflictError{
ActorID: "order-abc",
AttemptedVersion: 2,
CurrentVersion: 10,
}
var versionErr *VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatal("failed to unwrap VersionConflictError")
}
// Application can use CurrentVersion to decide retry strategy
nextVersion := versionErr.CurrentVersion + 1
if nextVersion != 11 {
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
}
// Application can log detailed context
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
if !strings.Contains(logMsg, "order-abc") {
t.Errorf("application context logging failed: %s", logMsg)
}
}
func TestVersionConflictError_EdgeCases(t *testing.T) {
testCases := []struct {
name string
actorID string
attemp int64
current int64
}{
{"zero current", "actor-1", 1, 0},
{"large numbers", "actor-2", 1000000, 999999},
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
{"negative attempt", "actor-4", -1, -2},
{"empty actor id", "", 1, 0},
{"special chars in actor id", "actor@#$%", 2, 1},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := &VersionConflictError{
ActorID: tc.actorID,
AttemptedVersion: tc.attemp,
CurrentVersion: tc.current,
}
// Should not panic
msg := err.Error()
if msg == "" {
t.Error("Error() should return non-empty string")
}
// Should be wrapped correctly
if err.Unwrap() != ErrVersionConflict {
t.Error("Unwrap should return ErrVersionConflict")
}
// errors.Is should work
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is should work for edge case")
}
})
}
}
func TestErrVersionConflict_Sentinel(t *testing.T) {
// Verify the sentinel error is correctly defined
if ErrVersionConflict == nil {
t.Fatal("ErrVersionConflict should not be nil")
}
expectedMsg := "version conflict"
if ErrVersionConflict.Error() != expectedMsg {
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
}
// Test that it's usable with errors.Is
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
t.Error("ErrVersionConflict should match itself with errors.Is")
}
}

View File

@@ -1,189 +0,0 @@
# Aether Examples
This directory contains examples demonstrating common patterns for using Aether.
## Retry Patterns (`retry_patterns.go`)
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
### Pattern Overview
All retry patterns work with `VersionConflictError` which provides three critical fields:
- **ActorID**: The actor that experienced the conflict
- **CurrentVersion**: The latest version in the store
- **AttemptedVersion**: The version you tried to save
Your application can read these fields to make intelligent retry decisions.
### Available Patterns
#### SimpleRetryPattern
The most basic pattern - just retry with exponential backoff:
```go
// Automatically retries up to 3 times with exponential backoff
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want a straightforward retry mechanism without complex logic.
#### ConflictDetailedRetryPattern
Extracts detailed information from the conflict error to make smarter decisions:
```go
// Detects thrashing (multiple conflicts at same version)
// and can implement circuit-breaker logic
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
#### JitterRetryPattern
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
```go
// Exponential backoff with jitter prevents synchronized retries
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
#### AdaptiveRetryPattern
Adjusts backoff duration based on version distance (indicator of contention):
```go
// Light contention (gap <= 3): fixed 50ms backoff
// Moderate contention (gap 4-10): backoff proportional to the gap
// High contention (gap > 10): aggressive backoff (10ms per version of gap)
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want backoff strategy to respond to actual system load.
#### EventualConsistencyPattern
Instead of blocking on retry, queues the event for asynchronous retry:
```go
// Returns immediately, event is queued for background retry
EventualConsistencyPattern(store, retryQueue, event)
// Background worker processes the queue
for item := range retryQueue {
    // Implement your own retry logic here
}
```
**Use when**: You can't afford to block the request, and background retry is acceptable.
#### CircuitBreakerPattern
Implements a circuit breaker to prevent cascading failures:
```go
cb := NewCircuitBreaker()
// Fails fast when circuit is open
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
if err != nil && !cb.CanRetry() {
    return ErrCircuitBreakerOpen
}
```
**Use when**: You have a distributed system and want to prevent retry storms during outages.
## Common Pattern: Extract and Log Context
All patterns can read context from `VersionConflictError`:
```go
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
    log.Printf(
        "Conflict for actor %q: attempted %d, current %d",
        versionErr.ActorID,
        versionErr.AttemptedVersion,
        versionErr.CurrentVersion,
    )
}
```
## Sentinel Error Check
Check if an error is a version conflict without examining the struct:
```go
if errors.Is(err, aether.ErrVersionConflict) {
    // This is a version conflict - retry is appropriate
}
```
## Implementing Your Own Pattern
Basic template:
```go
for attempt := 0; attempt < maxRetries; attempt++ {
    // 1. Get current version
    currentVersion, err := store.GetLatestVersion(actorID)
    if err != nil {
        return err
    }

    // 2. Create event with next version
    event := &aether.Event{
        ActorID: actorID,
        Version: currentVersion + 1,
        // ... other fields
    }

    // 3. Attempt save
    err = store.SaveEvent(event)
    if err == nil {
        return nil // Success
    }

    // 4. Check if it's a conflict
    if !errors.Is(err, aether.ErrVersionConflict) {
        return err // Some other error
    }

    // 5. Implement your retry strategy
    time.Sleep(yourBackoff(attempt))
}
```
## Choosing a Pattern
| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|-----------|-----------|----------|
| Simple | Low | Low | Very Low | Single writer, testing |
| ConflictDetailed | Low | Medium | Medium | Debugging, monitoring |
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
## Performance Considerations
1. **Backoff timing**: Shorter backoffs waste CPU on retries; longer backoffs increase latency
2. **Retry limits**: Too few retries give up too early; too many waste resources
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios (see the sketch below)
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
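As a concrete illustration of points 1 and 3, a minimal self-contained helper for capped exponential backoff with full jitter; the function name and constants are hypothetical additions, not code that exists in the repository.

```go
package examples

import (
    "math/rand"
    "time"
)

// backoffWithJitter returns the delay before retry number attempt (0-based):
// exponential growth from base, capped at max, with full jitter so that
// concurrent writers do not retry in lockstep.
func backoffWithJitter(attempt int, base, max time.Duration) time.Duration {
    d := base << uint(attempt) // base * 2^attempt
    if d <= 0 || d > max {     // d <= 0 guards against shift overflow
        d = max
    }
    return time.Duration(rand.Int63n(int64(d) + 1))
}
```

For example, `time.Sleep(backoffWithJitter(attempt, 100*time.Millisecond, 2*time.Second))` drops into any of the retry loops above.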
## Testing
Use the in-memory event store (`store.NewInMemoryEventStore()`) in tests:
```go
st := store.NewInMemoryEventStore()
err := SimpleRetryPattern(st, "test-actor", "TestEvent")
if err != nil {
    t.Fatalf("retry pattern failed: %v", err)
}
```

View File

@@ -1,353 +0,0 @@
package examples
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
//
// This pattern is suitable for scenarios where you want to automatically retry
// with exponential backoff when version conflicts occur.
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const initialBackoff = 100 * time.Millisecond
var event *aether.Event
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
log.Printf("Retry attempt %d after %v", attempt, backoff)
time.Sleep(backoff)
}
// Get the current version for the actor
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event = &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"attempt": attempt},
Timestamp: time.Now(),
}
// Attempt to save
if err := store.SaveEvent(event); err == nil {
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
return nil
} else if !errors.Is(err, aether.ErrVersionConflict) {
// Some other error occurred
return fmt.Errorf("save event failed: %w", err)
}
// If it's a version conflict, loop will retry
}
return fmt.Errorf("failed to save event after %d retries", maxRetries)
}
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
// from VersionConflictError to make intelligent retry decisions.
//
// This pattern shows how to log detailed context and potentially implement
// circuit-breaker logic based on the conflict information.
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 5
var lastConflictVersion int64
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// Create event
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"timestamp": time.Now()},
Timestamp: time.Now(),
}
// Attempt to save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// Check if it's a version conflict
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
// Not a version conflict, fail immediately
return err
}
// Extract detailed context from the conflict error
log.Printf(
"Version conflict for actor %q: attempted version %d, current version %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
// Check for thrashing (multiple conflicts with same version)
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
log.Printf("Detected version thrashing - circuit breaker would trigger here")
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
}
lastConflictVersion = versionErr.CurrentVersion
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
time.Sleep(backoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// JitterRetryPattern implements exponential backoff with jitter to prevent
// thundering herd when multiple writers retry simultaneously.
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const baseBackoff = 100 * time.Millisecond
const maxJitter = 0.1 // 10% jitter
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
// Exponential backoff with jitter
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
totalBackoff := exponentialBackoff + jitter
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
time.Sleep(totalBackoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
//
// This pattern demonstrates how application logic can use CurrentVersion to
// decide whether to retry, give up, or escalate to a higher-level handler.
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
return err
}
// Adaptive backoff based on version distance
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
if versionDistance > 10 {
// Many concurrent writers - back off more aggressively
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
} else if versionDistance > 3 {
// Moderate contention - normal backoff
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
} else {
// Light contention - minimal backoff
log.Printf("Light contention detected")
time.Sleep(50 * time.Millisecond)
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// EventualConsistencyPattern demonstrates how to handle version conflicts
// in an eventually consistent manner by publishing to a retry queue.
//
// This is useful when immediate retry is not feasible, and you want to
// defer the operation to a background worker.
type RetryQueueItem struct {
Event *aether.Event
ConflictVersion int64
ConflictAttempted int64
NextRetryTime time.Time
FailureCount int
}
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
err := store.SaveEvent(event)
if err == nil {
return
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
log.Printf("Non-retryable error: %v", err)
return
}
// Queue for retry - background worker will process this
retryItem := RetryQueueItem{
Event: event,
ConflictVersion: versionErr.CurrentVersion,
ConflictAttempted: versionErr.AttemptedVersion,
NextRetryTime: time.Now().Add(1 * time.Second),
FailureCount: 0,
}
select {
case retryQueue <- retryItem:
log.Printf("Queued event for retry: actor=%s", event.ActorID)
case <-time.After(5 * time.Second):
log.Printf("Failed to queue event for retry (queue full)")
}
}
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
//
// The circuit breaker tracks failure rates and temporarily stops retrying
// when the failure rate gets too high, allowing the system to recover.
type CircuitBreaker struct {
failureCount int
successCount int
state string // "closed", "open", "half-open"
lastFailureTime time.Time
openDuration time.Duration
failureThreshold int
successThreshold int
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
state: "closed",
openDuration: 30 * time.Second,
failureThreshold: 5,
successThreshold: 3,
}
}
func (cb *CircuitBreaker) RecordSuccess() {
if cb.state == "half-open" {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = "closed"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker closed")
}
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.lastFailureTime = time.Now()
cb.failureCount++
if cb.failureCount >= cb.failureThreshold {
cb.state = "open"
log.Printf("Circuit breaker opened")
}
}
func (cb *CircuitBreaker) CanRetry() bool {
if cb.state == "closed" {
return true
}
if cb.state == "open" {
if time.Since(cb.lastFailureTime) > cb.openDuration {
cb.state = "half-open"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker half-open")
return true
}
return false
}
// half-open state allows retries
return true
}
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
if !cb.CanRetry() {
return fmt.Errorf("circuit breaker open - not retrying")
}
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
cb.RecordSuccess()
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
cb.RecordFailure()
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
}

File diff suppressed because it is too large

View File

@@ -1,215 +0,0 @@
package store
import (
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
func TestEventImmutability_MemoryStore(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "test-actor-123"
// Create and save an event
originalEvent := &aether.Event{
ID: "evt-immutable-1",
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"value": "original",
},
Timestamp: time.Now(),
}
err := store.SaveEvent(originalEvent)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve the event from the store
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) == 0 {
t.Fatal("expected 1 event, got 0")
}
retrievedEvent := events[0]
// Verify the stored event has the correct values
if retrievedEvent.Data["value"] != "original" {
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
}
if retrievedEvent.EventType != "TestEvent" {
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
}
// Verify ID is correct
if retrievedEvent.ID != "evt-immutable-1" {
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
}
}
// TestEventImmutability_NoUpdateMethod verifies that the EventStore interface
// has only append, read methods - no Update or Delete methods.
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
// This test documents that the EventStore interface is append-only.
// The interface intentionally provides:
// - SaveEvent: append only
// - GetEvents: read only
// - GetLatestVersion: read only
//
// To verify this, we demonstrate that any attempt to call non-existent
// update/delete methods would be caught at compile time (not runtime).
// This is enforced by the interface definition in event.go which does
// not include Update, Delete, or Modify methods.
store := NewInMemoryEventStore()
// Compile-time check: these would not compile if we tried them:
// store.Update(event) // compile error: no such method
// store.Delete(eventID) // compile error: no such method
// store.Modify(eventID, newData) // compile error: no such method
// Only these methods exist:
var eventStore aether.EventStore = store
if eventStore == nil {
t.Fatal("eventStore is nil")
}
// If we got here, the compile-time checks passed
t.Log("EventStore interface enforces append-only semantics by design")
}
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
// increasing and attempting to save with a non-increasing version fails.
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-version-check"
// Save first event with version 1
event1 := &aether.Event{
ID: "evt-v1",
EventType: "Event1",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(v1) failed: %v", err)
}
// Try to save with same version - should fail
event2Same := &aether.Event{
ID: "evt-v1-again",
EventType: "Event2",
ActorID: actorID,
Version: 1, // Same version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2Same)
if err == nil {
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
}
// Try to save with lower version - should fail
event3Lower := &aether.Event{
ID: "evt-v0",
EventType: "Event3",
ActorID: actorID,
Version: 0, // Lower version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3Lower)
if err == nil {
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
}
// Save with next version - should succeed
event4Next := &aether.Event{
ID: "evt-v2",
EventType: "Event4",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4Next)
if err != nil {
t.Fatalf("SaveEvent(v2) failed: %v", err)
}
// Verify we have exactly 2 events
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 2 {
t.Errorf("expected 2 events, got %d", len(events))
}
}
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
// events from the store through the EventStore interface.
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-nodelete"
// Save an event
event := &aether.Event{
ID: "evt-nodelete",
EventType: "ImportantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"critical": true},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve it
events1, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (1) failed: %v", err)
}
if len(events1) != 1 {
t.Fatal("expected 1 event after save")
}
// Try to delete through interface - this method doesn't exist
// store.Delete("evt-nodelete") // compile error: no such method
// store.DeleteByActorID(actorID) // compile error: no such method
// Verify the event is still there (we can't delete it)
events2, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (2) failed: %v", err)
}
if len(events2) != 1 {
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
}
if events2[0].ID != "evt-nodelete" {
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
}
}

View File

@@ -9,7 +9,6 @@ import (
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/google/uuid"
)
// Default configuration values for JetStream event store
@@ -20,14 +19,7 @@ const (
// JetStreamConfig holds configuration options for JetStreamEventStore
type JetStreamConfig struct {
// StreamRetention is how long to keep events (default: 1 year).
// JetStream enforces this retention policy at the storage level using a limits-based policy:
// - MaxAge: Events older than this duration are automatically deleted
// - Storage is file-based (nats.FileStorage) for durability
// - Once the retention period expires, events are permanently removed from the stream
// This ensures that old events do not consume storage indefinitely.
// To keep events indefinitely, set StreamRetention to a very large value or configure
// a custom retention policy in the JetStream stream configuration.
// StreamRetention is how long to keep events (default: 1 year)
StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int
@@ -48,52 +40,14 @@ func DefaultJetStreamConfig() JetStreamConfig {
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
// It also implements EventStoreWithErrors to report malformed events during replay.
//
// ## Immutability Guarantee
//
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
// is configured with file-based storage (nats.FileStorage) and a retention policy
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
// eventually expire, but during their lifetime, events are never modified or deleted
// through the EventStore API. Once an event is published to the stream:
// - It cannot be updated
// - It cannot be deleted before expiration
// - It can only be read
//
// This architectural guarantee, combined with the EventStore interface providing
// no Update or Delete methods, ensures events are immutable and suitable as an
// audit trail.
//
// ## Version Cache Invalidation Strategy
//
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
// concurrency control. The cache is invalidated on any miss (GetLatestVersion call
// that finds a newer version in JetStream) to ensure consistency even when external
// processes write to the same JetStream stream.
//
// If only Aether owns the stream (single-writer assumption), the cache provides
// excellent performance for repeated version checks. If external writers modify
// the stream, the cache will remain consistent because:
//
// 1. On SaveEvent: getLatestVersionLocked() checks JetStream on cache miss
// 2. On GetLatestVersion: If actual version > cached version, cache is invalidated
// 3. Subsequent checks for that actor will fetch fresh data from JetStream
//
// This strategy prevents data corruption from stale cache while maintaining
// performance for the single-writer case.
type JetStreamEventStore struct {
js nats.JetStreamContext
streamName string
config JetStreamConfig
mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
namespace string // Optional namespace for event publishing
}
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
@@ -149,8 +103,6 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: nil,
namespace: "",
}, nil
}
@@ -164,58 +116,6 @@ func (jes *JetStreamEventStore) GetStreamName() string {
return jes.streamName
}
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
// The broadcaster receives EventStored events when events are successfully saved.
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
config := DefaultJetStreamConfig()
if namespace != "" {
config.Namespace = namespace
}
js, err := natsConn.JetStream()
if err != nil {
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}
// Apply defaults for zero values
if config.StreamRetention == 0 {
config.StreamRetention = DefaultStreamRetention
}
if config.ReplicaCount == 0 {
config.ReplicaCount = DefaultReplicaCount
}
// Apply namespace prefix to stream name if provided
effectiveStreamName := streamName
if config.Namespace != "" {
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
}
// Create or update the stream
stream := &nats.StreamConfig{
Name: effectiveStreamName,
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
MaxAge: config.StreamRetention,
Replicas: config.ReplicaCount,
}
_, err = js.AddStream(stream)
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
return &JetStreamEventStore{
js: js,
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: broadcaster,
namespace: namespace,
}, nil
}
// SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
@@ -276,34 +176,9 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// Update version cache after successful publish
jes.versions[event.ActorID] = event.Version
// Publish EventStored event after successful save (if broadcaster is configured)
if jes.broadcaster != nil {
jes.publishEventStored(event)
}
return nil
}
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
}
jes.broadcaster.Publish(jes.namespace, eventStored)
}
// GetEvents retrieves all events for an actor since a version.
// Note: This method silently skips malformed events for backward compatibility.
// Use GetEventsWithErrors to receive information about malformed events.

File diff suppressed because it is too large

View File

@@ -2,19 +2,15 @@ package store
import (
"sync"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/google/uuid"
)
// InMemoryEventStore provides a simple in-memory event store for testing
type InMemoryEventStore struct {
mu sync.RWMutex
events map[string][]*aether.Event // actorID -> events
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
namespace string // optional namespace for event publishing
mu sync.RWMutex
events map[string][]*aether.Event // actorID -> events
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
}
// NewInMemoryEventStore creates a new in-memory event store
@@ -25,21 +21,9 @@ func NewInMemoryEventStore() *InMemoryEventStore {
}
}
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
// The broadcaster receives EventStored events when events are successfully saved.
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]*aether.Event),
snapshots: make(map[string][]*aether.ActorSnapshot),
broadcaster: broadcaster,
namespace: namespace,
}
}
// SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
// If a broadcaster is configured, publishes an EventStored event on success.
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock()
defer es.mu.Unlock()
@@ -67,35 +51,9 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.events[event.ActorID] = make([]*aether.Event, 0)
}
es.events[event.ActorID] = append(es.events[event.ActorID], event)
// Publish EventStored event after successful save (if broadcaster is configured)
if es.broadcaster != nil {
es.publishEventStored(event)
}
return nil
}
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
}
es.broadcaster.Publish(es.namespace, eventStored)
}
// GetEvents retrieves events for an actor from a specific version
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
es.mu.RLock()

View File

@@ -1905,181 +1905,3 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
}
}
}
// === EventStored Publishing Tests ===
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
// Create a mock broadcaster to capture published events
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// Save event
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Check if EventStored was published
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
}
if publishedEvent.ActorID != "order-456" {
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
}
if publishedEvent.Version != 1 {
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
}
// Check data contains original event info
if publishedEvent.Data["eventId"] != "evt-123" {
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for EventStored event")
}
}
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save first event
event1 := &aether.Event{
ID: "evt-1",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(event1) failed: %v", err)
}
// Drain the first EventStored event
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for first EventStored event")
}
// Try to save event with non-increasing version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1, // Same version, should conflict
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
if !errors.Is(err, aether.ErrVersionConflict) {
t.Fatalf("expected ErrVersionConflict, got %v", err)
}
// Verify no EventStored event was published
select {
case <-ch:
t.Fatal("expected no EventStored event, but received one")
case <-time.After(50 * time.Millisecond):
// Expected - no event published
}
}
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save multiple events
for i := int64(1); i <= 3; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "OrderPlaced",
ActorID: "order-456",
Version: i,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
// Verify we received 3 EventStored events in order
for i := int64(1); i <= 3; i++ {
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.Version != i {
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout waiting for EventStored event %d", i)
}
}
}
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
// Test that SaveEvent works without a broadcaster (nil broadcaster)
store := NewInMemoryEventStore()
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// This should not panic even though broadcaster is nil
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify event was saved
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
}