Implement comprehensive tests for VersionConflictError in event_test.go covering: - Error message formatting with all context fields - Field accessibility (ActorID, AttemptedVersion, CurrentVersion) - Unwrap method for error wrapping - errors.Is sentinel checking - errors.As type assertion - Application's ability to read CurrentVersion for retry strategies - Edge cases including special characters and large version numbers Add examples/ directory with standard retry patterns: - SimpleRetryPattern: Basic retry with exponential backoff - ConflictDetailedRetryPattern: Intelligent retry with conflict analysis - JitterRetryPattern: Prevent thundering herd with randomized backoff - AdaptiveRetryPattern: Adjust backoff based on contention level - EventualConsistencyPattern: Asynchronous retry via queue - CircuitBreakerPattern: Prevent cascading failures Includes comprehensive documentation in examples/README.md explaining each pattern's use cases, performance characteristics, and implementation guidance. Closes #62 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
354 lines
10 KiB
Go
354 lines
10 KiB
Go
package examples
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"math"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
)
|
|
|
|
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
|
|
//
|
|
// This pattern is suitable for scenarios where you want to automatically retry
|
|
// with exponential backoff when version conflicts occur.
|
|
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
|
const maxRetries = 3
|
|
const initialBackoff = 100 * time.Millisecond
|
|
|
|
var event *aether.Event
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
if attempt > 0 {
|
|
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
|
|
log.Printf("Retry attempt %d after %v", attempt, backoff)
|
|
time.Sleep(backoff)
|
|
}
|
|
|
|
// Get the current version for the actor
|
|
currentVersion, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get latest version: %w", err)
|
|
}
|
|
|
|
// Create event with next version
|
|
event = &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
|
|
EventType: eventType,
|
|
ActorID: actorID,
|
|
Version: currentVersion + 1,
|
|
Data: map[string]interface{}{"attempt": attempt},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Attempt to save
|
|
if err := store.SaveEvent(event); err == nil {
|
|
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
|
|
return nil
|
|
} else if !errors.Is(err, aether.ErrVersionConflict) {
|
|
// Some other error occurred
|
|
return fmt.Errorf("save event failed: %w", err)
|
|
}
|
|
// If it's a version conflict, loop will retry
|
|
}
|
|
|
|
return fmt.Errorf("failed to save event after %d retries", maxRetries)
|
|
}
|
|
|
|
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
|
|
// from VersionConflictError to make intelligent retry decisions.
|
|
//
|
|
// This pattern shows how to log detailed context and potentially implement
|
|
// circuit-breaker logic based on the conflict information.
|
|
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
|
const maxRetries = 5
|
|
var lastConflictVersion int64
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
// Get current version
|
|
currentVersion, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create event
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
|
EventType: eventType,
|
|
ActorID: actorID,
|
|
Version: currentVersion + 1,
|
|
Data: map[string]interface{}{"timestamp": time.Now()},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Attempt to save
|
|
err = store.SaveEvent(event)
|
|
if err == nil {
|
|
return nil // Success
|
|
}
|
|
|
|
// Check if it's a version conflict
|
|
var versionErr *aether.VersionConflictError
|
|
if !errors.As(err, &versionErr) {
|
|
// Not a version conflict, fail immediately
|
|
return err
|
|
}
|
|
|
|
// Extract detailed context from the conflict error
|
|
log.Printf(
|
|
"Version conflict for actor %q: attempted version %d, current version %d",
|
|
versionErr.ActorID,
|
|
versionErr.AttemptedVersion,
|
|
versionErr.CurrentVersion,
|
|
)
|
|
|
|
// Check for thrashing (multiple conflicts with same version)
|
|
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
|
|
log.Printf("Detected version thrashing - circuit breaker would trigger here")
|
|
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
|
|
}
|
|
lastConflictVersion = versionErr.CurrentVersion
|
|
|
|
// Exponential backoff
|
|
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
|
|
time.Sleep(backoff)
|
|
}
|
|
|
|
return fmt.Errorf("failed after %d retries", maxRetries)
|
|
}
|
|
|
|
// JitterRetryPattern implements exponential backoff with jitter to prevent
|
|
// thundering herd when multiple writers retry simultaneously.
|
|
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
|
const maxRetries = 3
|
|
const baseBackoff = 100 * time.Millisecond
|
|
const maxJitter = 0.1 // 10% jitter
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
currentVersion, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
|
EventType: eventType,
|
|
ActorID: actorID,
|
|
Version: currentVersion + 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err = store.SaveEvent(event)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
return err
|
|
}
|
|
|
|
// Exponential backoff with jitter
|
|
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
|
|
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
|
|
totalBackoff := exponentialBackoff + jitter
|
|
|
|
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
|
|
time.Sleep(totalBackoff)
|
|
}
|
|
|
|
return fmt.Errorf("failed after %d retries", maxRetries)
|
|
}
|
|
|
|
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
|
|
//
|
|
// This pattern demonstrates how application logic can use CurrentVersion to
|
|
// decide whether to retry, give up, or escalate to a higher-level handler.
|
|
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
|
const maxRetries = 3
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
currentVersion, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
|
EventType: eventType,
|
|
ActorID: actorID,
|
|
Version: currentVersion + 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err = store.SaveEvent(event)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
var versionErr *aether.VersionConflictError
|
|
if !errors.As(err, &versionErr) {
|
|
return err
|
|
}
|
|
|
|
// Adaptive backoff based on version distance
|
|
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
|
|
if versionDistance > 10 {
|
|
// Many concurrent writers - back off more aggressively
|
|
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
|
|
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
|
|
} else if versionDistance > 3 {
|
|
// Moderate contention - normal backoff
|
|
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
|
|
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
|
|
} else {
|
|
// Light contention - minimal backoff
|
|
log.Printf("Light contention detected")
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("failed after %d retries", maxRetries)
|
|
}
|
|
|
|
// EventualConsistencyPattern demonstrates how to handle version conflicts
|
|
// in an eventually consistent manner by publishing to a retry queue.
|
|
//
|
|
// This is useful when immediate retry is not feasible, and you want to
|
|
// defer the operation to a background worker.
|
|
type RetryQueueItem struct {
|
|
Event *aether.Event
|
|
ConflictVersion int64
|
|
ConflictAttempted int64
|
|
NextRetryTime time.Time
|
|
FailureCount int
|
|
}
|
|
|
|
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
|
|
err := store.SaveEvent(event)
|
|
if err == nil {
|
|
return
|
|
}
|
|
|
|
var versionErr *aether.VersionConflictError
|
|
if !errors.As(err, &versionErr) {
|
|
log.Printf("Non-retryable error: %v", err)
|
|
return
|
|
}
|
|
|
|
// Queue for retry - background worker will process this
|
|
retryItem := RetryQueueItem{
|
|
Event: event,
|
|
ConflictVersion: versionErr.CurrentVersion,
|
|
ConflictAttempted: versionErr.AttemptedVersion,
|
|
NextRetryTime: time.Now().Add(1 * time.Second),
|
|
FailureCount: 0,
|
|
}
|
|
|
|
select {
|
|
case retryQueue <- retryItem:
|
|
log.Printf("Queued event for retry: actor=%s", event.ActorID)
|
|
case <-time.After(5 * time.Second):
|
|
log.Printf("Failed to queue event for retry (queue full)")
|
|
}
|
|
}
|
|
|
|
// Values stored in CircuitBreaker.state.
const (
	breakerClosed   = "closed"
	breakerOpen     = "open"
	breakerHalfOpen = "half-open"
)

// CircuitBreaker implements a simple circuit breaker for version conflicts.
//
// The breaker counts consecutive failures and temporarily stops retrying
// when too many occur in a row, allowing the system to recover:
//
//   - closed: retries are allowed. A success resets the failure streak;
//     failureThreshold consecutive failures open the breaker.
//   - open: retries are refused until openDuration has elapsed since the
//     last recorded failure, at which point the breaker becomes half-open.
//   - half-open: retries are allowed as probes. successThreshold successes
//     close the breaker; a single failure re-opens it.
//
// The type performs no locking; callers that share a breaker between
// goroutines must synchronize access themselves.
type CircuitBreaker struct {
	failureCount     int
	successCount     int
	state            string // "closed", "open", "half-open"
	lastFailureTime  time.Time
	openDuration     time.Duration
	failureThreshold int
	successThreshold int
}

// NewCircuitBreaker returns a closed breaker that opens after 5 consecutive
// failures, stays open for 30 seconds, and needs 3 successful probes in the
// half-open state to close again.
func NewCircuitBreaker() *CircuitBreaker {
	return &CircuitBreaker{
		state:            breakerClosed,
		openDuration:     30 * time.Second,
		failureThreshold: 5,
		successThreshold: 3,
	}
}

// RecordSuccess reports a successful operation to the breaker.
//
// In the half-open state it counts toward successThreshold and closes the
// breaker once the threshold is reached. In the open state it is ignored.
// In any other state it clears the failure streak, so that only consecutive
// failures can open the breaker.
func (cb *CircuitBreaker) RecordSuccess() {
	switch cb.state {
	case breakerHalfOpen:
		cb.successCount++
		if cb.successCount >= cb.successThreshold {
			cb.state = breakerClosed
			cb.failureCount = 0
			cb.successCount = 0
			log.Printf("Circuit breaker closed")
		}
	case breakerOpen:
		// Nothing to do: the breaker leaves the open state only via CanRetry.
	default:
		cb.failureCount = 0
	}
}

// RecordFailure reports a failed operation to the breaker.
//
// It always refreshes lastFailureTime and increments the failure count. The
// breaker opens when the count reaches failureThreshold, or immediately when
// the failure happens during a half-open probe. The transition is logged
// once, not on every failure recorded while already open.
func (cb *CircuitBreaker) RecordFailure() {
	cb.lastFailureTime = time.Now()
	cb.failureCount++

	if cb.state == breakerOpen {
		return
	}
	if cb.state == breakerHalfOpen || cb.failureCount >= cb.failureThreshold {
		cb.state = breakerOpen
		cb.successCount = 0
		log.Printf("Circuit breaker opened")
	}
}

// CanRetry reports whether an operation may be attempted now.
//
// It returns true unless the breaker is open and openDuration has not yet
// elapsed since the last failure. Note that it is not a pure query: when the
// open period has expired it moves the breaker to half-open and resets both
// counters before returning true.
func (cb *CircuitBreaker) CanRetry() bool {
	if cb.state != breakerOpen {
		// Closed and half-open both allow attempts.
		return true
	}
	if time.Since(cb.lastFailureTime) <= cb.openDuration {
		return false
	}

	cb.state = breakerHalfOpen
	cb.failureCount = 0
	cb.successCount = 0
	log.Printf("Circuit breaker half-open")
	return true
}
|
|
|
|
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
|
|
if !cb.CanRetry() {
|
|
return fmt.Errorf("circuit breaker open - not retrying")
|
|
}
|
|
|
|
currentVersion, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
|
EventType: eventType,
|
|
ActorID: actorID,
|
|
Version: currentVersion + 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err = store.SaveEvent(event)
|
|
if err == nil {
|
|
cb.RecordSuccess()
|
|
return nil
|
|
}
|
|
|
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
return err
|
|
}
|
|
|
|
cb.RecordFailure()
|
|
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
|
|
}
|