Compare commits
3 Commits
78aaea9330
...
6549125f3d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6549125f3d | ||
|
|
464fed67ec | ||
|
|
46e1c44017 |
8
event.go
8
event.go
@@ -73,6 +73,14 @@ type Event struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// Common event types for Aether infrastructure
|
||||
const (
|
||||
// EventTypeEventStored is an internal event published when an event is successfully persisted.
|
||||
// This event allows observability components (metrics, projections, audit systems) to react
|
||||
// to persisted events without coupling to application code.
|
||||
EventTypeEventStored = "EventStored"
|
||||
)
|
||||
|
||||
// Common metadata keys for distributed tracing and auditing
|
||||
const (
|
||||
// MetadataKeyCorrelationID identifies related events across services
|
||||
|
||||
189
event_test.go
189
event_test.go
@@ -2,6 +2,8 @@ package aether
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -1335,3 +1337,190 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
|
||||
// Error() should still work
|
||||
_ = err.Error()
|
||||
}
|
||||
|
||||
// Tests for VersionConflictError
|
||||
|
||||
func TestVersionConflictError_Error(t *testing.T) {
|
||||
err := &VersionConflictError{
|
||||
ActorID: "order-123",
|
||||
AttemptedVersion: 3,
|
||||
CurrentVersion: 5,
|
||||
}
|
||||
|
||||
errMsg := err.Error()
|
||||
|
||||
// Verify error message contains all context
|
||||
if !strings.Contains(errMsg, "order-123") {
|
||||
t.Errorf("error message should contain ActorID, got: %s", errMsg)
|
||||
}
|
||||
if !strings.Contains(errMsg, "3") {
|
||||
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
|
||||
}
|
||||
if !strings.Contains(errMsg, "5") {
|
||||
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
|
||||
}
|
||||
if !strings.Contains(errMsg, "version conflict") {
|
||||
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersionConflictError_Fields(t *testing.T) {
|
||||
err := &VersionConflictError{
|
||||
ActorID: "actor-456",
|
||||
AttemptedVersion: 10,
|
||||
CurrentVersion: 8,
|
||||
}
|
||||
|
||||
if err.ActorID != "actor-456" {
|
||||
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
|
||||
}
|
||||
if err.AttemptedVersion != 10 {
|
||||
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
|
||||
}
|
||||
if err.CurrentVersion != 8 {
|
||||
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersionConflictError_Unwrap(t *testing.T) {
|
||||
err := &VersionConflictError{
|
||||
ActorID: "actor-789",
|
||||
AttemptedVersion: 2,
|
||||
CurrentVersion: 1,
|
||||
}
|
||||
|
||||
unwrapped := err.Unwrap()
|
||||
if unwrapped != ErrVersionConflict {
|
||||
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersionConflictError_ErrorsIs(t *testing.T) {
|
||||
err := &VersionConflictError{
|
||||
ActorID: "test-actor",
|
||||
AttemptedVersion: 5,
|
||||
CurrentVersion: 4,
|
||||
}
|
||||
|
||||
// Test that errors.Is works with sentinel
|
||||
if !errors.Is(err, ErrVersionConflict) {
|
||||
t.Error("errors.Is(err, ErrVersionConflict) should return true")
|
||||
}
|
||||
|
||||
// Test that other errors don't match
|
||||
if errors.Is(err, errors.New("other error")) {
|
||||
t.Error("errors.Is should not match unrelated errors")
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersionConflictError_ErrorsAs(t *testing.T) {
|
||||
originalErr := &VersionConflictError{
|
||||
ActorID: "actor-unwrap",
|
||||
AttemptedVersion: 7,
|
||||
CurrentVersion: 6,
|
||||
}
|
||||
|
||||
var versionErr *VersionConflictError
|
||||
if !errors.As(originalErr, &versionErr) {
|
||||
t.Fatalf("errors.As should succeed with VersionConflictError")
|
||||
}
|
||||
|
||||
// Verify fields are accessible through unwrapped error
|
||||
if versionErr.ActorID != "actor-unwrap" {
|
||||
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
|
||||
}
|
||||
if versionErr.AttemptedVersion != 7 {
|
||||
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
|
||||
}
|
||||
if versionErr.CurrentVersion != 6 {
|
||||
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
|
||||
// This test verifies that applications can read CurrentVersion for retry strategies
|
||||
err := &VersionConflictError{
|
||||
ActorID: "order-abc",
|
||||
AttemptedVersion: 2,
|
||||
CurrentVersion: 10,
|
||||
}
|
||||
|
||||
var versionErr *VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
t.Fatal("failed to unwrap VersionConflictError")
|
||||
}
|
||||
|
||||
// Application can use CurrentVersion to decide retry strategy
|
||||
nextVersion := versionErr.CurrentVersion + 1
|
||||
|
||||
if nextVersion != 11 {
|
||||
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
|
||||
}
|
||||
|
||||
// Application can log detailed context
|
||||
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
|
||||
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
|
||||
|
||||
if !strings.Contains(logMsg, "order-abc") {
|
||||
t.Errorf("application context logging failed: %s", logMsg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVersionConflictError_EdgeCases(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
actorID string
|
||||
attemp int64
|
||||
current int64
|
||||
}{
|
||||
{"zero current", "actor-1", 1, 0},
|
||||
{"large numbers", "actor-2", 1000000, 999999},
|
||||
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
|
||||
{"negative attempt", "actor-4", -1, -2},
|
||||
{"empty actor id", "", 1, 0},
|
||||
{"special chars in actor id", "actor@#$%", 2, 1},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := &VersionConflictError{
|
||||
ActorID: tc.actorID,
|
||||
AttemptedVersion: tc.attemp,
|
||||
CurrentVersion: tc.current,
|
||||
}
|
||||
|
||||
// Should not panic
|
||||
msg := err.Error()
|
||||
if msg == "" {
|
||||
t.Error("Error() should return non-empty string")
|
||||
}
|
||||
|
||||
// Should be wrapped correctly
|
||||
if err.Unwrap() != ErrVersionConflict {
|
||||
t.Error("Unwrap should return ErrVersionConflict")
|
||||
}
|
||||
|
||||
// errors.Is should work
|
||||
if !errors.Is(err, ErrVersionConflict) {
|
||||
t.Error("errors.Is should work for edge case")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrVersionConflict_Sentinel(t *testing.T) {
|
||||
// Verify the sentinel error is correctly defined
|
||||
if ErrVersionConflict == nil {
|
||||
t.Fatal("ErrVersionConflict should not be nil")
|
||||
}
|
||||
|
||||
expectedMsg := "version conflict"
|
||||
if ErrVersionConflict.Error() != expectedMsg {
|
||||
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
|
||||
}
|
||||
|
||||
// Test that it's usable with errors.Is
|
||||
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
|
||||
t.Error("ErrVersionConflict should match itself with errors.Is")
|
||||
}
|
||||
}
|
||||
|
||||
189
examples/README.md
Normal file
189
examples/README.md
Normal file
@@ -0,0 +1,189 @@
|
||||
# Aether Examples
|
||||
|
||||
This directory contains examples demonstrating common patterns for using Aether.
|
||||
|
||||
## Retry Patterns (`retry_patterns.go`)
|
||||
|
||||
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
|
||||
|
||||
### Pattern Overview
|
||||
|
||||
All retry patterns work with `VersionConflictError` which provides three critical fields:
|
||||
|
||||
- **ActorID**: The actor that experienced the conflict
|
||||
- **CurrentVersion**: The latest version in the store
|
||||
- **AttemptedVersion**: The version you tried to save
|
||||
|
||||
Your application can read these fields to make intelligent retry decisions.
|
||||
|
||||
### Available Patterns
|
||||
|
||||
#### SimpleRetryPattern
|
||||
|
||||
The most basic pattern - just retry with exponential backoff:
|
||||
|
||||
```go
|
||||
// Automatically retries up to 3 times with exponential backoff
|
||||
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
|
||||
```
|
||||
|
||||
**Use when**: You want a straightforward retry mechanism without complex logic.
|
||||
|
||||
#### ConflictDetailedRetryPattern
|
||||
|
||||
Extracts detailed information from the conflict error to make smarter decisions:
|
||||
|
||||
```go
|
||||
// Detects thrashing (multiple conflicts at same version)
|
||||
// and can implement circuit-breaker logic
|
||||
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
|
||||
```
|
||||
|
||||
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
|
||||
|
||||
#### JitterRetryPattern
|
||||
|
||||
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
|
||||
|
||||
```go
|
||||
// Exponential backoff with jitter prevents synchronized retries
|
||||
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
|
||||
```
|
||||
|
||||
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
|
||||
|
||||
#### AdaptiveRetryPattern
|
||||
|
||||
Adjusts backoff duration based on version distance (indicator of contention):
|
||||
|
||||
```go
|
||||
// Light contention (gap=1): 50ms backoff
|
||||
// Moderate contention (gap=3-10): proportional backoff
|
||||
// High contention (gap>10): aggressive backoff
|
||||
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
|
||||
```
|
||||
|
||||
**Use when**: You want backoff strategy to respond to actual system load.
|
||||
|
||||
#### EventualConsistencyPattern
|
||||
|
||||
Instead of blocking on retry, queues the event for asynchronous retry:
|
||||
|
||||
```go
|
||||
// Returns immediately, event is queued for background retry
|
||||
EventualConsistencyPattern(store, retryQueue, event)
|
||||
|
||||
// Background worker processes the queue
|
||||
for item := range retryQueue {
|
||||
// Implement your own retry logic here
|
||||
}
|
||||
```
|
||||
|
||||
**Use when**: You can't afford to block the request, and background retry is acceptable.
|
||||
|
||||
#### CircuitBreakerPattern
|
||||
|
||||
Implements a circuit breaker to prevent cascading failures:
|
||||
|
||||
```go
|
||||
cb := NewCircuitBreaker()
|
||||
|
||||
// Fails fast when circuit is open
|
||||
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
|
||||
if err != nil && !cb.CanRetry() {
|
||||
return ErrCircuitBreakerOpen
|
||||
}
|
||||
```
|
||||
|
||||
**Use when**: You have a distributed system and want to prevent retry storms during outages.
|
||||
|
||||
## Common Pattern: Extract and Log Context
|
||||
|
||||
All patterns can read context from `VersionConflictError`:
|
||||
|
||||
```go
|
||||
var versionErr *aether.VersionConflictError
|
||||
if errors.As(err, &versionErr) {
|
||||
log.Printf(
|
||||
"Conflict for actor %q: attempted %d, current %d",
|
||||
versionErr.ActorID,
|
||||
versionErr.AttemptedVersion,
|
||||
versionErr.CurrentVersion,
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
## Sentinel Error Check
|
||||
|
||||
Check if an error is a version conflict without examining the struct:
|
||||
|
||||
```go
|
||||
if errors.Is(err, aether.ErrVersionConflict) {
|
||||
// This is a version conflict - retry is appropriate
|
||||
}
|
||||
```
|
||||
|
||||
## Implementing Your Own Pattern
|
||||
|
||||
Basic template:
|
||||
|
||||
```go
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
// 1. Get current version
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. Create event with next version
|
||||
event := &aether.Event{
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
// ... other fields
|
||||
}
|
||||
|
||||
// 3. Attempt save
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil // Success
|
||||
}
|
||||
|
||||
// 4. Check if it's a conflict
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
return err // Some other error
|
||||
}
|
||||
|
||||
// 5. Implement your retry strategy
|
||||
time.Sleep(yourBackoff(attempt))
|
||||
}
|
||||
```
|
||||
|
||||
## Choosing a Pattern
|
||||
|
||||
| Pattern | Latency | Throughput | Complexity | Use Case |
|
||||
|---------|---------|-----------|-----------|----------|
|
||||
| Simple | Low | Low | Very Low | Single writer, testing |
|
||||
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
|
||||
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
|
||||
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
|
||||
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
|
||||
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
1. **Backoff timing**: Shorter backoffs waste CPU on retries, longer backoffs increase latency
|
||||
2. **Retry limits**: Too few retries give up too early, too many waste resources
|
||||
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
|
||||
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
|
||||
|
||||
## Testing
|
||||
|
||||
Use `aether.NewInMemoryEventStore()` in tests:
|
||||
|
||||
```go
|
||||
store := store.NewInMemoryEventStore()
|
||||
err := SimpleRetryPattern(store, "test-actor", "TestEvent")
|
||||
if err != nil {
|
||||
t.Fatalf("retry pattern failed: %v", err)
|
||||
}
|
||||
```
|
||||
353
examples/retry_patterns.go
Normal file
353
examples/retry_patterns.go
Normal file
@@ -0,0 +1,353 @@
|
||||
package examples
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
|
||||
//
|
||||
// This pattern is suitable for scenarios where you want to automatically retry
|
||||
// with exponential backoff when version conflicts occur.
|
||||
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 3
|
||||
const initialBackoff = 100 * time.Millisecond
|
||||
|
||||
var event *aether.Event
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
|
||||
log.Printf("Retry attempt %d after %v", attempt, backoff)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
// Get the current version for the actor
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get latest version: %w", err)
|
||||
}
|
||||
|
||||
// Create event with next version
|
||||
event = &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{"attempt": attempt},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Attempt to save
|
||||
if err := store.SaveEvent(event); err == nil {
|
||||
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
|
||||
return nil
|
||||
} else if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
// Some other error occurred
|
||||
return fmt.Errorf("save event failed: %w", err)
|
||||
}
|
||||
// If it's a version conflict, loop will retry
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to save event after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
|
||||
// from VersionConflictError to make intelligent retry decisions.
|
||||
//
|
||||
// This pattern shows how to log detailed context and potentially implement
|
||||
// circuit-breaker logic based on the conflict information.
|
||||
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 5
|
||||
var lastConflictVersion int64
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
// Get current version
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create event
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{"timestamp": time.Now()},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Attempt to save
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil // Success
|
||||
}
|
||||
|
||||
// Check if it's a version conflict
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
// Not a version conflict, fail immediately
|
||||
return err
|
||||
}
|
||||
|
||||
// Extract detailed context from the conflict error
|
||||
log.Printf(
|
||||
"Version conflict for actor %q: attempted version %d, current version %d",
|
||||
versionErr.ActorID,
|
||||
versionErr.AttemptedVersion,
|
||||
versionErr.CurrentVersion,
|
||||
)
|
||||
|
||||
// Check for thrashing (multiple conflicts with same version)
|
||||
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
|
||||
log.Printf("Detected version thrashing - circuit breaker would trigger here")
|
||||
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
|
||||
}
|
||||
lastConflictVersion = versionErr.CurrentVersion
|
||||
|
||||
// Exponential backoff
|
||||
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// JitterRetryPattern implements exponential backoff with jitter to prevent
|
||||
// thundering herd when multiple writers retry simultaneously.
|
||||
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 3
|
||||
const baseBackoff = 100 * time.Millisecond
|
||||
const maxJitter = 0.1 // 10% jitter
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Exponential backoff with jitter
|
||||
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
|
||||
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
|
||||
totalBackoff := exponentialBackoff + jitter
|
||||
|
||||
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
|
||||
time.Sleep(totalBackoff)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
|
||||
//
|
||||
// This pattern demonstrates how application logic can use CurrentVersion to
|
||||
// decide whether to retry, give up, or escalate to a higher-level handler.
|
||||
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 3
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Adaptive backoff based on version distance
|
||||
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
|
||||
if versionDistance > 10 {
|
||||
// Many concurrent writers - back off more aggressively
|
||||
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
|
||||
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
|
||||
} else if versionDistance > 3 {
|
||||
// Moderate contention - normal backoff
|
||||
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
|
||||
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
|
||||
} else {
|
||||
// Light contention - minimal backoff
|
||||
log.Printf("Light contention detected")
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// EventualConsistencyPattern demonstrates how to handle version conflicts
|
||||
// in an eventually consistent manner by publishing to a retry queue.
|
||||
//
|
||||
// This is useful when immediate retry is not feasible, and you want to
|
||||
// defer the operation to a background worker.
|
||||
type RetryQueueItem struct {
|
||||
Event *aether.Event
|
||||
ConflictVersion int64
|
||||
ConflictAttempted int64
|
||||
NextRetryTime time.Time
|
||||
FailureCount int
|
||||
}
|
||||
|
||||
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
|
||||
err := store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
log.Printf("Non-retryable error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Queue for retry - background worker will process this
|
||||
retryItem := RetryQueueItem{
|
||||
Event: event,
|
||||
ConflictVersion: versionErr.CurrentVersion,
|
||||
ConflictAttempted: versionErr.AttemptedVersion,
|
||||
NextRetryTime: time.Now().Add(1 * time.Second),
|
||||
FailureCount: 0,
|
||||
}
|
||||
|
||||
select {
|
||||
case retryQueue <- retryItem:
|
||||
log.Printf("Queued event for retry: actor=%s", event.ActorID)
|
||||
case <-time.After(5 * time.Second):
|
||||
log.Printf("Failed to queue event for retry (queue full)")
|
||||
}
|
||||
}
|
||||
|
||||
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
|
||||
//
|
||||
// The circuit breaker tracks failure rates and temporarily stops retrying
|
||||
// when the failure rate gets too high, allowing the system to recover.
|
||||
type CircuitBreaker struct {
|
||||
failureCount int
|
||||
successCount int
|
||||
state string // "closed", "open", "half-open"
|
||||
lastFailureTime time.Time
|
||||
openDuration time.Duration
|
||||
failureThreshold int
|
||||
successThreshold int
|
||||
}
|
||||
|
||||
func NewCircuitBreaker() *CircuitBreaker {
|
||||
return &CircuitBreaker{
|
||||
state: "closed",
|
||||
openDuration: 30 * time.Second,
|
||||
failureThreshold: 5,
|
||||
successThreshold: 3,
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *CircuitBreaker) RecordSuccess() {
|
||||
if cb.state == "half-open" {
|
||||
cb.successCount++
|
||||
if cb.successCount >= cb.successThreshold {
|
||||
cb.state = "closed"
|
||||
cb.failureCount = 0
|
||||
cb.successCount = 0
|
||||
log.Printf("Circuit breaker closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *CircuitBreaker) RecordFailure() {
|
||||
cb.lastFailureTime = time.Now()
|
||||
cb.failureCount++
|
||||
if cb.failureCount >= cb.failureThreshold {
|
||||
cb.state = "open"
|
||||
log.Printf("Circuit breaker opened")
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *CircuitBreaker) CanRetry() bool {
|
||||
if cb.state == "closed" {
|
||||
return true
|
||||
}
|
||||
if cb.state == "open" {
|
||||
if time.Since(cb.lastFailureTime) > cb.openDuration {
|
||||
cb.state = "half-open"
|
||||
cb.failureCount = 0
|
||||
cb.successCount = 0
|
||||
log.Printf("Circuit breaker half-open")
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
// half-open state allows retries
|
||||
return true
|
||||
}
|
||||
|
||||
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
|
||||
if !cb.CanRetry() {
|
||||
return fmt.Errorf("circuit breaker open - not retrying")
|
||||
}
|
||||
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
cb.RecordSuccess()
|
||||
return nil
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
return err
|
||||
}
|
||||
|
||||
cb.RecordFailure()
|
||||
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Default configuration values for JetStream event store
|
||||
@@ -86,6 +87,8 @@ type JetStreamEventStore struct {
|
||||
config JetStreamConfig
|
||||
mu sync.Mutex // Protects version checks during SaveEvent
|
||||
versions map[string]int64 // actorID -> latest version cache
|
||||
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
|
||||
namespace string // Optional namespace for event publishing
|
||||
}
|
||||
|
||||
|
||||
@@ -146,6 +149,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
broadcaster: nil,
|
||||
namespace: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -159,6 +164,58 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
||||
return jes.streamName
|
||||
}
|
||||
|
||||
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
|
||||
// The broadcaster receives EventStored events when events are successfully saved.
|
||||
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
|
||||
config := DefaultJetStreamConfig()
|
||||
if namespace != "" {
|
||||
config.Namespace = namespace
|
||||
}
|
||||
|
||||
js, err := natsConn.JetStream()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
||||
}
|
||||
|
||||
// Apply defaults for zero values
|
||||
if config.StreamRetention == 0 {
|
||||
config.StreamRetention = DefaultStreamRetention
|
||||
}
|
||||
if config.ReplicaCount == 0 {
|
||||
config.ReplicaCount = DefaultReplicaCount
|
||||
}
|
||||
|
||||
// Apply namespace prefix to stream name if provided
|
||||
effectiveStreamName := streamName
|
||||
if config.Namespace != "" {
|
||||
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||
}
|
||||
|
||||
// Create or update the stream
|
||||
stream := &nats.StreamConfig{
|
||||
Name: effectiveStreamName,
|
||||
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||
Storage: nats.FileStorage,
|
||||
Retention: nats.LimitsPolicy,
|
||||
MaxAge: config.StreamRetention,
|
||||
Replicas: config.ReplicaCount,
|
||||
}
|
||||
|
||||
_, err = js.AddStream(stream)
|
||||
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
||||
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
|
||||
return &JetStreamEventStore{
|
||||
js: js,
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
broadcaster: broadcaster,
|
||||
namespace: namespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SaveEvent persists an event to JetStream.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
@@ -219,9 +276,34 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
// Update version cache after successful publish
|
||||
jes.versions[event.ActorID] = event.Version
|
||||
|
||||
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||
if jes.broadcaster != nil {
|
||||
jes.publishEventStored(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||
Data: map[string]interface{}{
|
||||
"eventId": originalEvent.ID,
|
||||
"actorId": originalEvent.ActorID,
|
||||
"version": originalEvent.Version,
|
||||
"timestamp": originalEvent.Timestamp.Unix(),
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
jes.broadcaster.Publish(jes.namespace, eventStored)
|
||||
}
|
||||
|
||||
// GetEvents retrieves all events for an actor since a version.
|
||||
// Note: This method silently skips malformed events for backward compatibility.
|
||||
// Use GetEventsWithErrors to receive information about malformed events.
|
||||
|
||||
@@ -2,8 +2,10 @@ package store
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||
@@ -11,6 +13,8 @@ type InMemoryEventStore struct {
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
|
||||
namespace string // optional namespace for event publishing
|
||||
}
|
||||
|
||||
// NewInMemoryEventStore creates a new in-memory event store
|
||||
@@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
}
|
||||
}
|
||||
|
||||
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
|
||||
// The broadcaster receives EventStored events when events are successfully saved.
|
||||
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
|
||||
return &InMemoryEventStore{
|
||||
events: make(map[string][]*aether.Event),
|
||||
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||
broadcaster: broadcaster,
|
||||
namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the in-memory store.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
// If a broadcaster is configured, publishes an EventStored event on success.
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
@@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||
|
||||
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||
if es.broadcaster != nil {
|
||||
es.publishEventStored(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||
Data: map[string]interface{}{
|
||||
"eventId": originalEvent.ID,
|
||||
"actorId": originalEvent.ActorID,
|
||||
"version": originalEvent.Version,
|
||||
"timestamp": originalEvent.Timestamp.Unix(),
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
es.broadcaster.Publish(es.namespace, eventStored)
|
||||
}
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version
|
||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
es.mu.RLock()
|
||||
|
||||
@@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === EventStored Publishing Tests ===
|
||||
|
||||
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
|
||||
// Create a mock broadcaster to capture published events
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Save event
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Check if EventStored was published
|
||||
select {
|
||||
case publishedEvent := <-ch:
|
||||
if publishedEvent == nil {
|
||||
t.Fatal("received nil event from broadcaster")
|
||||
}
|
||||
if publishedEvent.EventType != aether.EventTypeEventStored {
|
||||
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
|
||||
}
|
||||
if publishedEvent.ActorID != "order-456" {
|
||||
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
|
||||
}
|
||||
if publishedEvent.Version != 1 {
|
||||
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
|
||||
}
|
||||
// Check data contains original event info
|
||||
if publishedEvent.Data["eventId"] != "evt-123" {
|
||||
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting for EventStored event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
// Save first event
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event1)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||
}
|
||||
|
||||
// Drain the first EventStored event
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting for first EventStored event")
|
||||
}
|
||||
|
||||
// Try to save event with non-increasing version (should fail)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1, // Same version, should conflict
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event2)
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Fatalf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
|
||||
// Verify no EventStored event was published
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("expected no EventStored event, but received one")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected - no event published
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
// Save multiple events
|
||||
for i := int64(1); i <= 3; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: i,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify we received 3 EventStored events in order
|
||||
for i := int64(1); i <= 3; i++ {
|
||||
select {
|
||||
case publishedEvent := <-ch:
|
||||
if publishedEvent == nil {
|
||||
t.Fatal("received nil event from broadcaster")
|
||||
}
|
||||
if publishedEvent.Version != i {
|
||||
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("timeout waiting for EventStored event %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
|
||||
// Test that SaveEvent works without a broadcaster (nil broadcaster)
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// This should not panic even though broadcaster is nil
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify event was saved
|
||||
events, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user