Compare commits
6 Commits
d929729d79
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7487a5f3af | ||
|
|
b67417ac68 | ||
|
|
5b5083dcf8 | ||
|
|
6549125f3d | ||
|
|
464fed67ec | ||
|
|
46e1c44017 |
@@ -17,37 +17,3 @@ jobs:
|
|||||||
run: go build ./...
|
run: go build ./...
|
||||||
- name: Test
|
- name: Test
|
||||||
run: go test ./...
|
run: go test ./...
|
||||||
|
|
||||||
integration:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: actions/setup-go@v5
|
|
||||||
with:
|
|
||||||
go-version: '1.23'
|
|
||||||
- name: Install and Start NATS Server
|
|
||||||
run: |
|
|
||||||
# Detect architecture and download appropriate binary
|
|
||||||
ARCH=$(uname -m)
|
|
||||||
if [ "$ARCH" = "x86_64" ]; then
|
|
||||||
NATS_ARCH="amd64"
|
|
||||||
elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
|
|
||||||
NATS_ARCH="arm64"
|
|
||||||
else
|
|
||||||
echo "Unsupported architecture: $ARCH"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH"
|
|
||||||
|
|
||||||
# Download and extract nats-server
|
|
||||||
curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz
|
|
||||||
tar -xzf nats-server.tar.gz
|
|
||||||
|
|
||||||
# Start NATS with JetStream
|
|
||||||
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 &
|
|
||||||
|
|
||||||
# Wait for NATS to be ready
|
|
||||||
sleep 3
|
|
||||||
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version
|
|
||||||
- name: Run Integration Tests
|
|
||||||
run: go test -tags=integration -v ./...
|
|
||||||
|
|||||||
29
README.md
29
README.md
@@ -107,7 +107,34 @@ Order state after replaying 2 events:
|
|||||||
|
|
||||||
### Events are immutable
|
### Events are immutable
|
||||||
|
|
||||||
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
|
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:
|
||||||
|
|
||||||
|
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
|
||||||
|
|
||||||
|
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
|
||||||
|
- File-based storage (durable)
|
||||||
|
- Limits-based retention policy (events expire after configured duration, not before)
|
||||||
|
- No mechanism to modify or delete individual events during their lifetime
|
||||||
|
|
||||||
|
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
|
||||||
|
|
||||||
|
To correct a mistake, append a new event that expresses the correction rather than modifying history:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Wrong: Cannot update an event
|
||||||
|
// store.UpdateEvent(eventID, newData) // This method doesn't exist
|
||||||
|
|
||||||
|
// Right: Append a new event that corrects the record
|
||||||
|
correctionEvent := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "OrderCorrected",
|
||||||
|
ActorID: orderID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{"reason": "price adjustment"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err := store.SaveEvent(correctionEvent)
|
||||||
|
```
|
||||||
|
|
||||||
### State is derived
|
### State is derived
|
||||||
|
|
||||||
|
|||||||
22
event.go
22
event.go
@@ -73,6 +73,14 @@ type Event struct {
|
|||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Common event types for Aether infrastructure
|
||||||
|
const (
|
||||||
|
// EventTypeEventStored is an internal event published when an event is successfully persisted.
|
||||||
|
// This event allows observability components (metrics, projections, audit systems) to react
|
||||||
|
// to persisted events without coupling to application code.
|
||||||
|
EventTypeEventStored = "EventStored"
|
||||||
|
)
|
||||||
|
|
||||||
// Common metadata keys for distributed tracing and auditing
|
// Common metadata keys for distributed tracing and auditing
|
||||||
const (
|
const (
|
||||||
// MetadataKeyCorrelationID identifies related events across services
|
// MetadataKeyCorrelationID identifies related events across services
|
||||||
@@ -176,6 +184,17 @@ type ActorSnapshot struct {
|
|||||||
|
|
||||||
// EventStore defines the interface for event persistence.
|
// EventStore defines the interface for event persistence.
|
||||||
//
|
//
|
||||||
|
// # Immutability Guarantee
|
||||||
|
//
|
||||||
|
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
|
||||||
|
// modified or deleted. The interface intentionally provides no Update or Delete methods.
|
||||||
|
// This ensures:
|
||||||
|
// - Events serve as an immutable audit trail
|
||||||
|
// - State can be safely derived by replaying events
|
||||||
|
// - Concurrent reads are always safe (events never change)
|
||||||
|
//
|
||||||
|
// To correct a mistake, append a new event that expresses the correction.
|
||||||
|
//
|
||||||
// # Version Semantics
|
// # Version Semantics
|
||||||
//
|
//
|
||||||
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
||||||
@@ -196,10 +215,13 @@ type EventStore interface {
|
|||||||
// SaveEvent persists an event to the store. The event's Version must be
|
// SaveEvent persists an event to the store. The event's Version must be
|
||||||
// strictly greater than the current latest version for the actor.
|
// strictly greater than the current latest version for the actor.
|
||||||
// Returns VersionConflictError if version <= current latest version.
|
// Returns VersionConflictError if version <= current latest version.
|
||||||
|
// Once saved, the event is immutable and can never be modified or deleted.
|
||||||
SaveEvent(event *Event) error
|
SaveEvent(event *Event) error
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
||||||
// Returns an empty slice if no events exist for the actor.
|
// Returns an empty slice if no events exist for the actor.
|
||||||
|
// The returned events are guaranteed to be immutable - they will never be
|
||||||
|
// modified or deleted from the store.
|
||||||
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor.
|
// GetLatestVersion returns the latest version for an actor.
|
||||||
|
|||||||
225
event_test.go
225
event_test.go
@@ -3,6 +3,7 @@ package aether
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -1339,159 +1340,187 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
|
|||||||
|
|
||||||
// Tests for VersionConflictError
|
// Tests for VersionConflictError
|
||||||
|
|
||||||
func TestVersionConflictError_ErrorMessage(t *testing.T) {
|
func TestVersionConflictError_Error(t *testing.T) {
|
||||||
err := &VersionConflictError{
|
err := &VersionConflictError{
|
||||||
ActorID: "order-123",
|
ActorID: "order-123",
|
||||||
|
AttemptedVersion: 3,
|
||||||
CurrentVersion: 5,
|
CurrentVersion: 5,
|
||||||
AttemptedVersion: 5,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := err.Error()
|
errMsg := err.Error()
|
||||||
if !strings.Contains(msg, "order-123") {
|
|
||||||
t.Errorf("expected ActorID in message, got: %s", msg)
|
// Verify error message contains all context
|
||||||
|
if !strings.Contains(errMsg, "order-123") {
|
||||||
|
t.Errorf("error message should contain ActorID, got: %s", errMsg)
|
||||||
}
|
}
|
||||||
if !strings.Contains(msg, "5") {
|
if !strings.Contains(errMsg, "3") {
|
||||||
t.Errorf("expected versions in message, got: %s", msg)
|
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
|
||||||
}
|
}
|
||||||
if !strings.Contains(msg, "version conflict") {
|
if !strings.Contains(errMsg, "5") {
|
||||||
t.Errorf("expected 'version conflict' in message, got: %s", msg)
|
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
|
||||||
|
}
|
||||||
|
if !strings.Contains(errMsg, "version conflict") {
|
||||||
|
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVersionConflictError_ActorIDField(t *testing.T) {
|
func TestVersionConflictError_Fields(t *testing.T) {
|
||||||
err := &VersionConflictError{
|
err := &VersionConflictError{
|
||||||
ActorID: "test-actor-456",
|
ActorID: "actor-456",
|
||||||
CurrentVersion: 10,
|
AttemptedVersion: 10,
|
||||||
AttemptedVersion: 11,
|
CurrentVersion: 8,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err.ActorID != "test-actor-456" {
|
if err.ActorID != "actor-456" {
|
||||||
t.Errorf("ActorID field access failed: got %q, want %q", err.ActorID, "test-actor-456")
|
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
|
||||||
}
|
}
|
||||||
|
if err.AttemptedVersion != 10 {
|
||||||
|
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
|
||||||
}
|
}
|
||||||
|
if err.CurrentVersion != 8 {
|
||||||
func TestVersionConflictError_AttemptedVersionField(t *testing.T) {
|
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
|
||||||
err := &VersionConflictError{
|
|
||||||
ActorID: "actor-123",
|
|
||||||
CurrentVersion: 5,
|
|
||||||
AttemptedVersion: 99,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err.AttemptedVersion != 99 {
|
|
||||||
t.Errorf("AttemptedVersion field access failed: got %d, want %d", err.AttemptedVersion, 99)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestVersionConflictError_CurrentVersionField(t *testing.T) {
|
|
||||||
err := &VersionConflictError{
|
|
||||||
ActorID: "actor-123",
|
|
||||||
CurrentVersion: 42,
|
|
||||||
AttemptedVersion: 43,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err.CurrentVersion != 42 {
|
|
||||||
t.Errorf("CurrentVersion field access failed: got %d, want %d", err.CurrentVersion, 42)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVersionConflictError_Unwrap(t *testing.T) {
|
func TestVersionConflictError_Unwrap(t *testing.T) {
|
||||||
err := &VersionConflictError{
|
err := &VersionConflictError{
|
||||||
ActorID: "order-456",
|
ActorID: "actor-789",
|
||||||
CurrentVersion: 3,
|
AttemptedVersion: 2,
|
||||||
AttemptedVersion: 3,
|
CurrentVersion: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
unwrapped := err.Unwrap()
|
unwrapped := err.Unwrap()
|
||||||
if !errors.Is(unwrapped, ErrVersionConflict) {
|
if unwrapped != ErrVersionConflict {
|
||||||
t.Errorf("expected Unwrap to return ErrVersionConflict sentinel")
|
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVersionConflictError_ErrorsIs(t *testing.T) {
|
func TestVersionConflictError_ErrorsIs(t *testing.T) {
|
||||||
err := &VersionConflictError{
|
err := &VersionConflictError{
|
||||||
ActorID: "actor-789",
|
ActorID: "test-actor",
|
||||||
CurrentVersion: 7,
|
AttemptedVersion: 5,
|
||||||
AttemptedVersion: 8,
|
CurrentVersion: 4,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that errors.Is works with sentinel
|
||||||
if !errors.Is(err, ErrVersionConflict) {
|
if !errors.Is(err, ErrVersionConflict) {
|
||||||
t.Error("expected errors.Is to recognize VersionConflictError as ErrVersionConflict")
|
t.Error("errors.Is(err, ErrVersionConflict) should return true")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that other errors don't match
|
||||||
|
if errors.Is(err, errors.New("other error")) {
|
||||||
|
t.Error("errors.Is should not match unrelated errors")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVersionConflictError_ErrorsAs(t *testing.T) {
|
func TestVersionConflictError_ErrorsAs(t *testing.T) {
|
||||||
originalErr := &VersionConflictError{
|
originalErr := &VersionConflictError{
|
||||||
ActorID: "user-123",
|
ActorID: "actor-unwrap",
|
||||||
CurrentVersion: 20,
|
AttemptedVersion: 7,
|
||||||
AttemptedVersion: 21,
|
CurrentVersion: 6,
|
||||||
}
|
}
|
||||||
|
|
||||||
var versionErr *VersionConflictError
|
var versionErr *VersionConflictError
|
||||||
if !errors.As(originalErr, &versionErr) {
|
if !errors.As(originalErr, &versionErr) {
|
||||||
t.Fatal("expected errors.As to extract VersionConflictError")
|
t.Fatalf("errors.As should succeed with VersionConflictError")
|
||||||
}
|
}
|
||||||
|
|
||||||
if versionErr.ActorID != "user-123" {
|
// Verify fields are accessible through unwrapped error
|
||||||
t.Errorf("extracted ActorID mismatch: got %q, want %q", versionErr.ActorID, "user-123")
|
if versionErr.ActorID != "actor-unwrap" {
|
||||||
|
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
|
||||||
}
|
}
|
||||||
if versionErr.CurrentVersion != 20 {
|
if versionErr.AttemptedVersion != 7 {
|
||||||
t.Errorf("extracted CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 20)
|
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
|
||||||
}
|
}
|
||||||
if versionErr.AttemptedVersion != 21 {
|
if versionErr.CurrentVersion != 6 {
|
||||||
t.Errorf("extracted AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 21)
|
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVersionConflictError_RetryLogicWithCurrentVersion(t *testing.T) {
|
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
|
||||||
// Demonstrates how applications extract CurrentVersion for retry logic
|
// This test verifies that applications can read CurrentVersion for retry strategies
|
||||||
err := &VersionConflictError{
|
err := &VersionConflictError{
|
||||||
ActorID: "order-555",
|
ActorID: "order-abc",
|
||||||
CurrentVersion: 15,
|
|
||||||
AttemptedVersion: 16,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Application can check for conflict and use CurrentVersion
|
|
||||||
if errors.Is(err, ErrVersionConflict) {
|
|
||||||
var versionErr *VersionConflictError
|
|
||||||
if errors.As(err, &versionErr) {
|
|
||||||
// Application uses CurrentVersion to determine next attempt
|
|
||||||
nextVersion := versionErr.CurrentVersion + 1
|
|
||||||
if nextVersion != 16 {
|
|
||||||
t.Errorf("retry version calculation failed: got %d, want %d", nextVersion, 16)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestVersionConflictError_SpecialCharactersInActorID(t *testing.T) {
|
|
||||||
specialChars := []string{
|
|
||||||
"actor-with-dashes",
|
|
||||||
"actor_with_underscores",
|
|
||||||
"actor.with.dots",
|
|
||||||
"actor@special",
|
|
||||||
"actor with spaces",
|
|
||||||
"actor:with:colons",
|
|
||||||
"actor/with/slashes",
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, actorID := range specialChars {
|
|
||||||
t.Run(actorID, func(t *testing.T) {
|
|
||||||
err := &VersionConflictError{
|
|
||||||
ActorID: actorID,
|
|
||||||
CurrentVersion: 1,
|
|
||||||
AttemptedVersion: 2,
|
AttemptedVersion: 2,
|
||||||
|
CurrentVersion: 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify fields are preserved
|
var versionErr *VersionConflictError
|
||||||
if err.ActorID != actorID {
|
if !errors.As(err, &versionErr) {
|
||||||
t.Errorf("ActorID not preserved: got %q, want %q", err.ActorID, actorID)
|
t.Fatal("failed to unwrap VersionConflictError")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify message includes the ActorID
|
// Application can use CurrentVersion to decide retry strategy
|
||||||
|
nextVersion := versionErr.CurrentVersion + 1
|
||||||
|
|
||||||
|
if nextVersion != 11 {
|
||||||
|
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Application can log detailed context
|
||||||
|
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
|
||||||
|
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
|
||||||
|
|
||||||
|
if !strings.Contains(logMsg, "order-abc") {
|
||||||
|
t.Errorf("application context logging failed: %s", logMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_EdgeCases(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
actorID string
|
||||||
|
attemp int64
|
||||||
|
current int64
|
||||||
|
}{
|
||||||
|
{"zero current", "actor-1", 1, 0},
|
||||||
|
{"large numbers", "actor-2", 1000000, 999999},
|
||||||
|
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
|
||||||
|
{"negative attempt", "actor-4", -1, -2},
|
||||||
|
{"empty actor id", "", 1, 0},
|
||||||
|
{"special chars in actor id", "actor@#$%", 2, 1},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: tc.actorID,
|
||||||
|
AttemptedVersion: tc.attemp,
|
||||||
|
CurrentVersion: tc.current,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should not panic
|
||||||
msg := err.Error()
|
msg := err.Error()
|
||||||
if !strings.Contains(msg, actorID) {
|
if msg == "" {
|
||||||
t.Errorf("ActorID not in error message for %q: %s", actorID, msg)
|
t.Error("Error() should return non-empty string")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be wrapped correctly
|
||||||
|
if err.Unwrap() != ErrVersionConflict {
|
||||||
|
t.Error("Unwrap should return ErrVersionConflict")
|
||||||
|
}
|
||||||
|
|
||||||
|
// errors.Is should work
|
||||||
|
if !errors.Is(err, ErrVersionConflict) {
|
||||||
|
t.Error("errors.Is should work for edge case")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestErrVersionConflict_Sentinel(t *testing.T) {
|
||||||
|
// Verify the sentinel error is correctly defined
|
||||||
|
if ErrVersionConflict == nil {
|
||||||
|
t.Fatal("ErrVersionConflict should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedMsg := "version conflict"
|
||||||
|
if ErrVersionConflict.Error() != expectedMsg {
|
||||||
|
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that it's usable with errors.Is
|
||||||
|
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
|
||||||
|
t.Error("ErrVersionConflict should match itself with errors.Is")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,235 +1,189 @@
|
|||||||
# Aether Examples
|
# Aether Examples
|
||||||
|
|
||||||
Standard patterns and best practices for building with Aether.
|
This directory contains examples demonstrating common patterns for using Aether.
|
||||||
|
|
||||||
## Version Conflict Retry Patterns
|
## Retry Patterns (`retry_patterns.go`)
|
||||||
|
|
||||||
When using optimistic concurrency control with Aether's event store, version conflicts can occur when multiple writers attempt to save events for the same actor. The `VersionConflictError` provides full context about the conflict, enabling intelligent retry strategies.
|
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
|
||||||
|
|
||||||
### Understanding Version Conflicts
|
### Pattern Overview
|
||||||
|
|
||||||
A version conflict occurs when:
|
All retry patterns work with `VersionConflictError` which provides three critical fields:
|
||||||
- You attempt to save an event with version `N`
|
|
||||||
- But the actor already has a version >= `N`
|
- **ActorID**: The actor that experienced the conflict
|
||||||
|
- **CurrentVersion**: The latest version in the store
|
||||||
|
- **AttemptedVersion**: The version you tried to save
|
||||||
|
|
||||||
|
Your application can read these fields to make intelligent retry decisions.
|
||||||
|
|
||||||
|
### Available Patterns
|
||||||
|
|
||||||
|
#### SimpleRetryPattern
|
||||||
|
|
||||||
|
The most basic pattern - just retry with exponential backoff:
|
||||||
|
|
||||||
Example:
|
|
||||||
```go
|
```go
|
||||||
// Actor "order-123" currently has version 5
|
// Automatically retries up to 3 times with exponential backoff
|
||||||
// Writer A reads version 5, creates version 6, saves successfully
|
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
// Writer B also read version 5, creates version 6, attempts save
|
|
||||||
// -> VersionConflictError: current=6, attempted=6
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Working with VersionConflictError
|
**Use when**: You want a straightforward retry mechanism without complex logic.
|
||||||
|
|
||||||
The `VersionConflictError` provides:
|
#### ConflictDetailedRetryPattern
|
||||||
- `ActorID` - The actor that had the conflict
|
|
||||||
- `CurrentVersion` - The actual current version in the store
|
Extracts detailed information from the conflict error to make smarter decisions:
|
||||||
- `AttemptedVersion` - The version you tried to save
|
|
||||||
|
```go
|
||||||
|
// Detects thrashing (multiple conflicts at same version)
|
||||||
|
// and can implement circuit-breaker logic
|
||||||
|
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
|
||||||
|
|
||||||
|
#### JitterRetryPattern
|
||||||
|
|
||||||
|
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Exponential backoff with jitter prevents synchronized retries
|
||||||
|
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
|
||||||
|
|
||||||
|
#### AdaptiveRetryPattern
|
||||||
|
|
||||||
|
Adjusts backoff duration based on version distance (indicator of contention):
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Light contention (gap=1): 50ms backoff
|
||||||
|
// Moderate contention (gap=3-10): proportional backoff
|
||||||
|
// High contention (gap>10): aggressive backoff
|
||||||
|
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You want backoff strategy to respond to actual system load.
|
||||||
|
|
||||||
|
#### EventualConsistencyPattern
|
||||||
|
|
||||||
|
Instead of blocking on retry, queues the event for asynchronous retry:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Returns immediately, event is queued for background retry
|
||||||
|
EventualConsistencyPattern(store, retryQueue, event)
|
||||||
|
|
||||||
|
// Background worker processes the queue
|
||||||
|
for item := range retryQueue {
|
||||||
|
// Implement your own retry logic here
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You can't afford to block the request, and background retry is acceptable.
|
||||||
|
|
||||||
|
#### CircuitBreakerPattern
|
||||||
|
|
||||||
|
Implements a circuit breaker to prevent cascading failures:
|
||||||
|
|
||||||
|
```go
|
||||||
|
cb := NewCircuitBreaker()
|
||||||
|
|
||||||
|
// Fails fast when circuit is open
|
||||||
|
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
|
||||||
|
if err != nil && !cb.CanRetry() {
|
||||||
|
return ErrCircuitBreakerOpen
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You have a distributed system and want to prevent retry storms during outages.
|
||||||
|
|
||||||
|
## Common Pattern: Extract and Log Context
|
||||||
|
|
||||||
|
All patterns can read context from `VersionConflictError`:
|
||||||
|
|
||||||
Example usage:
|
|
||||||
```go
|
```go
|
||||||
err := eventStore.SaveEvent(event)
|
|
||||||
if errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
var versionErr *aether.VersionConflictError
|
var versionErr *aether.VersionConflictError
|
||||||
if errors.As(err, &versionErr) {
|
if errors.As(err, &versionErr) {
|
||||||
fmt.Printf("Conflict for actor %q: current=%d, attempted=%d",
|
log.Printf(
|
||||||
versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
|
"Conflict for actor %q: attempted %d, current %d",
|
||||||
// Implement retry logic using CurrentVersion
|
versionErr.ActorID,
|
||||||
nextVersion := versionErr.CurrentVersion + 1
|
versionErr.AttemptedVersion,
|
||||||
}
|
versionErr.CurrentVersion,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Recommended Patterns
|
## Sentinel Error Check
|
||||||
|
|
||||||
#### Pattern 1: Simple Exponential Backoff (Recommended for Most Cases)
|
Check if an error is a version conflict without examining the struct:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
const maxRetries = 5
|
if errors.Is(err, aether.ErrVersionConflict) {
|
||||||
const baseDelay = 10 * time.Millisecond
|
// This is a version conflict - retry is appropriate
|
||||||
|
|
||||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
||||||
currentVersion, _ := eventStore.GetLatestVersion(actorID)
|
|
||||||
|
|
||||||
event := &aether.Event{
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: currentVersion + 1,
|
|
||||||
// ...
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
|
||||||
return nil // Success!
|
|
||||||
}
|
|
||||||
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
return err // Different error, don't retry
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms
|
|
||||||
delay := time.Duration(baseDelay.Milliseconds() * int64(math.Pow(2, float64(attempt)))) * time.Millisecond
|
|
||||||
time.Sleep(delay)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("max retries exceeded")
|
|
||||||
```
|
```
|
||||||
|
|
||||||
**Pros:**
|
## Implementing Your Own Pattern
|
||||||
- Simple to understand and implement
|
|
||||||
- Respects store capacity
|
|
||||||
- Good for most scenarios
|
|
||||||
|
|
||||||
**Cons:**
|
Basic template:
|
||||||
- Can cause thundering herd in high-concurrency scenarios
|
|
||||||
- May not work well if conflicts are due to logical issues
|
|
||||||
|
|
||||||
#### Pattern 2: State Reload and Merge
|
|
||||||
|
|
||||||
Use this pattern when you can merge concurrent changes:
|
|
||||||
|
|
||||||
```go
|
```go
|
||||||
const maxRetries = 3
|
|
||||||
|
|
||||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
// Reload current state
|
// 1. Get current version
|
||||||
events, _ := eventStore.GetEvents(actorID, 0)
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
aggregate := rebuildFromEvents(events)
|
if err != nil {
|
||||||
|
|
||||||
// Apply your update
|
|
||||||
aggregate.Status = "shipped"
|
|
||||||
|
|
||||||
// Attempt save with new version
|
|
||||||
event := &aether.Event{
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: aggregate.Version + 1,
|
|
||||||
Data: map[string]interface{}{"status": aggregate.Status},
|
|
||||||
}
|
|
||||||
|
|
||||||
err := eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
|
||||||
return nil // Success!
|
|
||||||
}
|
|
||||||
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reload and retry (loop continues)
|
// 2. Create event with next version
|
||||||
}
|
event := &aether.Event{
|
||||||
```
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
**Pros:**
|
// ... other fields
|
||||||
- Deterministic - will eventually succeed
|
|
||||||
- Can merge concurrent updates
|
|
||||||
- Good for business logic that's idempotent
|
|
||||||
|
|
||||||
**Cons:**
|
|
||||||
- More expensive (replaying events each attempt)
|
|
||||||
- Only works if updates can be safely retried
|
|
||||||
|
|
||||||
#### Pattern 3: Circuit Breaker for Cascading Failures
|
|
||||||
|
|
||||||
Use when you want to avoid hammering a saturated store:
|
|
||||||
|
|
||||||
```go
|
|
||||||
type CircuitBreaker struct {
|
|
||||||
state string // "closed", "open", "half-open"
|
|
||||||
failures int
|
|
||||||
failureThreshold int
|
|
||||||
lastFailureTime time.Time
|
|
||||||
cooldownTime time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ... implement circuit breaker logic ...
|
// 3. Attempt save
|
||||||
|
err = store.SaveEvent(event)
|
||||||
// Usage:
|
|
||||||
if !cb.canAttempt() {
|
|
||||||
return fmt.Errorf("circuit breaker open")
|
|
||||||
}
|
|
||||||
|
|
||||||
err := eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cb.recordSuccess()
|
return nil // Success
|
||||||
} else if errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
cb.recordFailure()
|
|
||||||
if cb.failureCount >= cb.failureThreshold {
|
|
||||||
cb.open()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 4. Check if it's a conflict
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
return err // Some other error
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Implement your retry strategy
|
||||||
|
time.Sleep(yourBackoff(attempt))
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
**Pros:**
|
## Choosing a Pattern
|
||||||
- Prevents cascading failures
|
|
||||||
- Allows store recovery time
|
|
||||||
- Good for distributed systems
|
|
||||||
|
|
||||||
**Cons:**
|
| Pattern | Latency | Throughput | Complexity | Use Case |
|
||||||
- More complex implementation
|
|---------|---------|-----------|-----------|----------|
|
||||||
- May reject valid requests temporarily
|
| Simple | Low | Low | Very Low | Single writer, testing |
|
||||||
|
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
|
||||||
|
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
|
||||||
|
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
|
||||||
|
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
|
||||||
|
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
|
||||||
|
|
||||||
#### Pattern 4: Jittered Backoff for High Concurrency
|
## Performance Considerations
|
||||||
|
|
||||||
Add randomness to prevent thundering herd:
|
1. **Backoff timing**: Shorter backoffs waste CPU on retries, longer backoffs increase latency
|
||||||
|
2. **Retry limits**: Too few retries give up too early, too many waste resources
|
||||||
|
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
|
||||||
|
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
Use `aether.NewInMemoryEventStore()` in tests:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
exponentialDelay := time.Duration(baseDelay.Milliseconds() * int64(math.Pow(2, float64(attempt)))) * time.Millisecond
|
store := store.NewInMemoryEventStore()
|
||||||
jitter := time.Duration(rand.Int63n(int64(exponentialDelay)))
|
err := SimpleRetryPattern(store, "test-actor", "TestEvent")
|
||||||
delay := exponentialDelay + jitter
|
if err != nil {
|
||||||
time.Sleep(delay)
|
t.Fatalf("retry pattern failed: %v", err)
|
||||||
```
|
|
||||||
|
|
||||||
**Pros:**
|
|
||||||
- Prevents synchronized retries
|
|
||||||
- Good for high-concurrency scenarios
|
|
||||||
|
|
||||||
**Cons:**
|
|
||||||
- Slightly more complex
|
|
||||||
- May increase total retry time
|
|
||||||
|
|
||||||
### Complete Example
|
|
||||||
|
|
||||||
See `version_conflict_retry.go` for complete, runnable examples of all patterns.
|
|
||||||
|
|
||||||
### When to Use Each Pattern
|
|
||||||
|
|
||||||
| Pattern | Use When | Avoid When |
|
|
||||||
|---------|----------|-----------|
|
|
||||||
| Exponential Backoff | Default choice for most apps | Store is consistently overloaded |
|
|
||||||
| State Reload | Updates can be safely replayed | Event replay is expensive |
|
|
||||||
| Circuit Breaker | Store is frequently saturated | You need immediate feedback |
|
|
||||||
| Jittered Backoff | Many concurrent writers | Single-threaded app |
|
|
||||||
|
|
||||||
### Monitoring Version Conflicts
|
|
||||||
|
|
||||||
Log and monitor version conflicts to understand contention patterns:
|
|
||||||
|
|
||||||
```go
|
|
||||||
var versionErr *aether.VersionConflictError
|
|
||||||
if errors.As(err, &versionErr) {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"actor_id": versionErr.ActorID,
|
|
||||||
"current_version": versionErr.CurrentVersion,
|
|
||||||
"attempted_version": versionErr.AttemptedVersion,
|
|
||||||
"version_gap": versionErr.AttemptedVersion - versionErr.CurrentVersion,
|
|
||||||
}).Warn("Version conflict")
|
|
||||||
|
|
||||||
// Alert if gap is too large (indicates stale read)
|
|
||||||
if versionErr.AttemptedVersion - versionErr.CurrentVersion > 5 {
|
|
||||||
metrics.versionConflictLargeGap.Inc()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Best Practices
|
|
||||||
|
|
||||||
1. **Always check the error type** - Not all errors are version conflicts
|
|
||||||
2. **Use CurrentVersion for retries** - Don't hardcode retry logic
|
|
||||||
3. **Set reasonable retry limits** - Prevent infinite loops
|
|
||||||
4. **Monitor contention** - Track version conflicts to identify hotspots
|
|
||||||
5. **Consider your domain** - Some updates can be safely retried, others cannot
|
|
||||||
6. **Test concurrent scenarios** - Version conflicts are rare in single-threaded apps
|
|
||||||
|
|
||||||
### References
|
|
||||||
|
|
||||||
- [CLAUDE.md](../CLAUDE.md) - Architecture and event versioning semantics
|
|
||||||
- [Event Sourcing Patterns](../vision.md) - Domain-driven design approach
|
|
||||||
|
|||||||
353
examples/retry_patterns.go
Normal file
353
examples/retry_patterns.go
Normal file
@@ -0,0 +1,353 @@
|
|||||||
|
package examples
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
|
||||||
|
//
|
||||||
|
// This pattern is suitable for scenarios where you want to automatically retry
|
||||||
|
// with exponential backoff when version conflicts occur.
|
||||||
|
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||||
|
const maxRetries = 3
|
||||||
|
const initialBackoff = 100 * time.Millisecond
|
||||||
|
|
||||||
|
var event *aether.Event
|
||||||
|
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
if attempt > 0 {
|
||||||
|
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
|
||||||
|
log.Printf("Retry attempt %d after %v", attempt, backoff)
|
||||||
|
time.Sleep(backoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the current version for the actor
|
||||||
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get latest version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create event with next version
|
||||||
|
event = &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
|
||||||
|
EventType: eventType,
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{"attempt": attempt},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to save
|
||||||
|
if err := store.SaveEvent(event); err == nil {
|
||||||
|
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
|
||||||
|
return nil
|
||||||
|
} else if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
// Some other error occurred
|
||||||
|
return fmt.Errorf("save event failed: %w", err)
|
||||||
|
}
|
||||||
|
// If it's a version conflict, loop will retry
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("failed to save event after %d retries", maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
|
||||||
|
// from VersionConflictError to make intelligent retry decisions.
|
||||||
|
//
|
||||||
|
// This pattern shows how to log detailed context and potentially implement
|
||||||
|
// circuit-breaker logic based on the conflict information.
|
||||||
|
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||||
|
const maxRetries = 5
|
||||||
|
var lastConflictVersion int64
|
||||||
|
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
// Get current version
|
||||||
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create event
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||||
|
EventType: eventType,
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{"timestamp": time.Now()},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to save
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err == nil {
|
||||||
|
return nil // Success
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if it's a version conflict
|
||||||
|
var versionErr *aether.VersionConflictError
|
||||||
|
if !errors.As(err, &versionErr) {
|
||||||
|
// Not a version conflict, fail immediately
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract detailed context from the conflict error
|
||||||
|
log.Printf(
|
||||||
|
"Version conflict for actor %q: attempted version %d, current version %d",
|
||||||
|
versionErr.ActorID,
|
||||||
|
versionErr.AttemptedVersion,
|
||||||
|
versionErr.CurrentVersion,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Check for thrashing (multiple conflicts with same version)
|
||||||
|
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
|
||||||
|
log.Printf("Detected version thrashing - circuit breaker would trigger here")
|
||||||
|
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
|
||||||
|
}
|
||||||
|
lastConflictVersion = versionErr.CurrentVersion
|
||||||
|
|
||||||
|
// Exponential backoff
|
||||||
|
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
|
||||||
|
time.Sleep(backoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// JitterRetryPattern implements exponential backoff with jitter to prevent
|
||||||
|
// thundering herd when multiple writers retry simultaneously.
|
||||||
|
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||||
|
const maxRetries = 3
|
||||||
|
const baseBackoff = 100 * time.Millisecond
|
||||||
|
const maxJitter = 0.1 // 10% jitter
|
||||||
|
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||||
|
EventType: eventType,
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exponential backoff with jitter
|
||||||
|
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
|
||||||
|
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
|
||||||
|
totalBackoff := exponentialBackoff + jitter
|
||||||
|
|
||||||
|
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
|
||||||
|
time.Sleep(totalBackoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
|
||||||
|
//
|
||||||
|
// This pattern demonstrates how application logic can use CurrentVersion to
|
||||||
|
// decide whether to retry, give up, or escalate to a higher-level handler.
|
||||||
|
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||||
|
const maxRetries = 3
|
||||||
|
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||||
|
EventType: eventType,
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var versionErr *aether.VersionConflictError
|
||||||
|
if !errors.As(err, &versionErr) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adaptive backoff based on version distance
|
||||||
|
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
|
||||||
|
if versionDistance > 10 {
|
||||||
|
// Many concurrent writers - back off more aggressively
|
||||||
|
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
|
||||||
|
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
|
||||||
|
} else if versionDistance > 3 {
|
||||||
|
// Moderate contention - normal backoff
|
||||||
|
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
|
||||||
|
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
|
||||||
|
} else {
|
||||||
|
// Light contention - minimal backoff
|
||||||
|
log.Printf("Light contention detected")
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventualConsistencyPattern demonstrates how to handle version conflicts
|
||||||
|
// in an eventually consistent manner by publishing to a retry queue.
|
||||||
|
//
|
||||||
|
// This is useful when immediate retry is not feasible, and you want to
|
||||||
|
// defer the operation to a background worker.
|
||||||
|
type RetryQueueItem struct {
|
||||||
|
Event *aether.Event
|
||||||
|
ConflictVersion int64
|
||||||
|
ConflictAttempted int64
|
||||||
|
NextRetryTime time.Time
|
||||||
|
FailureCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var versionErr *aether.VersionConflictError
|
||||||
|
if !errors.As(err, &versionErr) {
|
||||||
|
log.Printf("Non-retryable error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue for retry - background worker will process this
|
||||||
|
retryItem := RetryQueueItem{
|
||||||
|
Event: event,
|
||||||
|
ConflictVersion: versionErr.CurrentVersion,
|
||||||
|
ConflictAttempted: versionErr.AttemptedVersion,
|
||||||
|
NextRetryTime: time.Now().Add(1 * time.Second),
|
||||||
|
FailureCount: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case retryQueue <- retryItem:
|
||||||
|
log.Printf("Queued event for retry: actor=%s", event.ActorID)
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
log.Printf("Failed to queue event for retry (queue full)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
|
||||||
|
//
|
||||||
|
// The circuit breaker tracks failure rates and temporarily stops retrying
|
||||||
|
// when the failure rate gets too high, allowing the system to recover.
|
||||||
|
type CircuitBreaker struct {
|
||||||
|
failureCount int
|
||||||
|
successCount int
|
||||||
|
state string // "closed", "open", "half-open"
|
||||||
|
lastFailureTime time.Time
|
||||||
|
openDuration time.Duration
|
||||||
|
failureThreshold int
|
||||||
|
successThreshold int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCircuitBreaker() *CircuitBreaker {
|
||||||
|
return &CircuitBreaker{
|
||||||
|
state: "closed",
|
||||||
|
openDuration: 30 * time.Second,
|
||||||
|
failureThreshold: 5,
|
||||||
|
successThreshold: 3,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cb *CircuitBreaker) RecordSuccess() {
|
||||||
|
if cb.state == "half-open" {
|
||||||
|
cb.successCount++
|
||||||
|
if cb.successCount >= cb.successThreshold {
|
||||||
|
cb.state = "closed"
|
||||||
|
cb.failureCount = 0
|
||||||
|
cb.successCount = 0
|
||||||
|
log.Printf("Circuit breaker closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cb *CircuitBreaker) RecordFailure() {
|
||||||
|
cb.lastFailureTime = time.Now()
|
||||||
|
cb.failureCount++
|
||||||
|
if cb.failureCount >= cb.failureThreshold {
|
||||||
|
cb.state = "open"
|
||||||
|
log.Printf("Circuit breaker opened")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cb *CircuitBreaker) CanRetry() bool {
|
||||||
|
if cb.state == "closed" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if cb.state == "open" {
|
||||||
|
if time.Since(cb.lastFailureTime) > cb.openDuration {
|
||||||
|
cb.state = "half-open"
|
||||||
|
cb.failureCount = 0
|
||||||
|
cb.successCount = 0
|
||||||
|
log.Printf("Circuit breaker half-open")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// half-open state allows retries
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
|
||||||
|
if !cb.CanRetry() {
|
||||||
|
return fmt.Errorf("circuit breaker open - not retrying")
|
||||||
|
}
|
||||||
|
|
||||||
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||||
|
EventType: eventType,
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err == nil {
|
||||||
|
cb.RecordSuccess()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cb.RecordFailure()
|
||||||
|
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
|
||||||
|
}
|
||||||
@@ -1,356 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"math/rand"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
|
||||||
"git.flowmade.one/flowmade-one/aether/store"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Example 1: Simple Retry with Exponential Backoff
|
|
||||||
// This is the most common pattern for handling version conflicts.
|
|
||||||
func simpleRetryWithExponentialBackoff(eventStore aether.EventStore, actorID string) error {
|
|
||||||
const maxRetries = 5
|
|
||||||
const baseDelay = 10 * time.Millisecond
|
|
||||||
|
|
||||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
||||||
// Get current version
|
|
||||||
currentVersion, err := eventStore.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get latest version: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create event with next version
|
|
||||||
event := &aether.Event{
|
|
||||||
ID: uuid.New().String(),
|
|
||||||
EventType: "OrderUpdated",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: currentVersion + 1,
|
|
||||||
Data: map[string]interface{}{"status": "processing"},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to save
|
|
||||||
err = eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
|
||||||
// Success!
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if it's a version conflict
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
// Different error - don't retry
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Version conflict - extract details for logging
|
|
||||||
var versionErr *aether.VersionConflictError
|
|
||||||
if errors.As(err, &versionErr) {
|
|
||||||
log.Printf("Attempt %d: Version conflict for actor %q: current=%d, attempted=%d",
|
|
||||||
attempt+1, versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Last attempt - return error
|
|
||||||
if attempt == maxRetries-1 {
|
|
||||||
return fmt.Errorf("failed to save event after %d attempts: %w", maxRetries, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate delay with exponential backoff: baseDelay * 2^attempt
|
|
||||||
delay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond
|
|
||||||
log.Printf("Retrying in %v...", delay)
|
|
||||||
time.Sleep(delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Example 2: Retry with Circuit Breaker Pattern
|
|
||||||
// Use a circuit breaker to avoid hammering the store during cascading failures.
|
|
||||||
type CircuitBreaker struct {
|
|
||||||
failureThreshold int
|
|
||||||
cooldownTime time.Duration
|
|
||||||
failures int
|
|
||||||
lastFailureTime time.Time
|
|
||||||
state string // "closed", "open", "half-open"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cb *CircuitBreaker) canAttempt() bool {
|
|
||||||
switch cb.state {
|
|
||||||
case "closed":
|
|
||||||
return true
|
|
||||||
case "open":
|
|
||||||
// Check if we should transition to half-open
|
|
||||||
if time.Since(cb.lastFailureTime) > cb.cooldownTime {
|
|
||||||
cb.state = "half-open"
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
case "half-open":
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cb *CircuitBreaker) recordSuccess() {
|
|
||||||
cb.failures = 0
|
|
||||||
cb.state = "closed"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cb *CircuitBreaker) recordFailure() {
|
|
||||||
cb.failures++
|
|
||||||
cb.lastFailureTime = time.Now()
|
|
||||||
if cb.failures >= cb.failureThreshold {
|
|
||||||
cb.state = "open"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func retryWithCircuitBreaker(eventStore aether.EventStore, actorID string) error {
|
|
||||||
cb := &CircuitBreaker{
|
|
||||||
failureThreshold: 5,
|
|
||||||
cooldownTime: 1 * time.Second,
|
|
||||||
state: "closed",
|
|
||||||
}
|
|
||||||
|
|
||||||
const maxAttempts = 10
|
|
||||||
|
|
||||||
for attempt := 0; attempt < maxAttempts; attempt++ {
|
|
||||||
if !cb.canAttempt() {
|
|
||||||
return fmt.Errorf("circuit breaker is open for actor %q", actorID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get current version
|
|
||||||
currentVersion, err := eventStore.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get latest version: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create event with next version
|
|
||||||
event := &aether.Event{
|
|
||||||
ID: uuid.New().String(),
|
|
||||||
EventType: "OrderUpdated",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: currentVersion + 1,
|
|
||||||
Data: map[string]interface{}{"status": "processing"},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to save
|
|
||||||
err = eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
|
||||||
cb.recordSuccess()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if it's a version conflict
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
// Different error - don't retry
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cb.recordFailure()
|
|
||||||
|
|
||||||
// Version conflict - extract details
|
|
||||||
var versionErr *aether.VersionConflictError
|
|
||||||
if errors.As(err, &versionErr) {
|
|
||||||
log.Printf("Circuit breaker (state=%s): Version conflict for actor %q",
|
|
||||||
cb.state, versionErr.ActorID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if attempt < maxAttempts-1 {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("max retry attempts exceeded for actor %q", actorID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Example 3: Deterministic Retry - Always Succeed by Reloading Latest State
|
|
||||||
// Some applications can afford to reload state and merge changes.
|
|
||||||
type OrderAggregate struct {
|
|
||||||
ID string
|
|
||||||
Version int64
|
|
||||||
Status string
|
|
||||||
Amount float64
|
|
||||||
Comments []string
|
|
||||||
}
|
|
||||||
|
|
||||||
func retryWithStateReload(eventStore aether.EventStore, orderID string, updateFn func(*OrderAggregate) error) error {
|
|
||||||
const maxRetries = 3
|
|
||||||
|
|
||||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
||||||
// Load current state by replaying events
|
|
||||||
events, err := eventStore.GetEvents(orderID, 0)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to load events: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rebuild aggregate from events
|
|
||||||
aggregate := &OrderAggregate{ID: orderID}
|
|
||||||
for _, e := range events {
|
|
||||||
aggregate.Version = e.Version
|
|
||||||
// Apply event to aggregate (simplified)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply update
|
|
||||||
if err := updateFn(aggregate); err != nil {
|
|
||||||
return fmt.Errorf("update function failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create event for the update
|
|
||||||
event := &aether.Event{
|
|
||||||
ID: uuid.New().String(),
|
|
||||||
EventType: "OrderUpdated",
|
|
||||||
ActorID: orderID,
|
|
||||||
Version: aggregate.Version + 1,
|
|
||||||
Data: map[string]interface{}{"status": aggregate.Status},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to save
|
|
||||||
err = eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
|
||||||
// Success!
|
|
||||||
log.Printf("Order %q updated with version %d", orderID, event.Version)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if it's a version conflict
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
// Different error - don't retry
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var versionErr *aether.VersionConflictError
|
|
||||||
if errors.As(err, &versionErr) {
|
|
||||||
log.Printf("Version conflict on attempt %d: current=%d, attempted=%d. Retrying...",
|
|
||||||
attempt+1, versionErr.CurrentVersion, versionErr.AttemptedVersion)
|
|
||||||
}
|
|
||||||
|
|
||||||
if attempt < maxRetries-1 {
|
|
||||||
// Brief delay before retry
|
|
||||||
time.Sleep(time.Duration(50*(attempt+1)) * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("failed to update order %q after %d retries", orderID, maxRetries)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Example 4: Jittered Retry
|
|
||||||
// Add randomness to prevent thundering herd problem.
|
|
||||||
func retryWithJitteredBackoff(eventStore aether.EventStore, actorID string) error {
|
|
||||||
const maxRetries = 5
|
|
||||||
const baseDelay = 10 * time.Millisecond
|
|
||||||
|
|
||||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
||||||
// Get current version
|
|
||||||
currentVersion, err := eventStore.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get latest version: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create event with next version
|
|
||||||
event := &aether.Event{
|
|
||||||
ID: uuid.New().String(),
|
|
||||||
EventType: "OrderUpdated",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: currentVersion + 1,
|
|
||||||
Data: map[string]interface{}{"status": "processing"},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt to save
|
|
||||||
err = eventStore.SaveEvent(event)
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if it's a version conflict
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if attempt == maxRetries-1 {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate delay with exponential backoff + jitter
|
|
||||||
exponentialDelay := time.Duration(baseDelay.Milliseconds()*int64(math.Pow(2, float64(attempt)))) * time.Millisecond
|
|
||||||
jitter := time.Duration(rand.Int63n(int64(exponentialDelay))) // Random jitter up to exponential delay
|
|
||||||
delay := exponentialDelay + jitter
|
|
||||||
|
|
||||||
log.Printf("Retrying in %v with jitter...", delay)
|
|
||||||
time.Sleep(delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Example 5: Analyzing Version Conflict Context
|
|
||||||
// Extract all relevant information from a version conflict for debugging/monitoring.
|
|
||||||
func analyzeVersionConflict(err error) {
|
|
||||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
||||||
log.Println("Not a version conflict error")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var versionErr *aether.VersionConflictError
|
|
||||||
if errors.As(err, &versionErr) {
|
|
||||||
log.Printf("Version Conflict Details:")
|
|
||||||
log.Printf(" Actor ID: %s", versionErr.ActorID)
|
|
||||||
log.Printf(" Current Version: %d", versionErr.CurrentVersion)
|
|
||||||
log.Printf(" Attempted Version: %d", versionErr.AttemptedVersion)
|
|
||||||
log.Printf(" Version Gap: %d", versionErr.AttemptedVersion-versionErr.CurrentVersion)
|
|
||||||
log.Printf(" Error Message: %s", versionErr.Error())
|
|
||||||
|
|
||||||
// Application-specific logic
|
|
||||||
if versionErr.CurrentVersion > versionErr.AttemptedVersion {
|
|
||||||
log.Printf("WARNING: Attempted version %d is behind current version %d",
|
|
||||||
versionErr.AttemptedVersion, versionErr.CurrentVersion)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Example usage showing all patterns
|
|
||||||
func main() {
|
|
||||||
// Create in-memory event store for demonstration
|
|
||||||
eventStore := store.NewInMemoryEventStore()
|
|
||||||
actorID := "order-123"
|
|
||||||
|
|
||||||
fmt.Println("=== Version Conflict Retry Patterns ===")
|
|
||||||
|
|
||||||
// Demonstrate patterns (using simplified versions)
|
|
||||||
log.Println("Pattern 1: Simple Exponential Backoff")
|
|
||||||
if err := simpleRetryWithExponentialBackoff(eventStore, actorID); err != nil {
|
|
||||||
log.Printf("Error: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("\nPattern 3: State Reload")
|
|
||||||
if err := retryWithStateReload(eventStore, actorID, func(agg *OrderAggregate) error {
|
|
||||||
agg.Status = "shipped"
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
log.Printf("Error: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("\nPattern 4: Jittered Backoff")
|
|
||||||
if err := retryWithJitteredBackoff(eventStore, actorID); err != nil {
|
|
||||||
log.Printf("Error: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("\n=== Recommended Pattern ===")
|
|
||||||
fmt.Println("Use exponential backoff (Pattern 1) for most cases:")
|
|
||||||
fmt.Println("- Simple to understand and implement")
|
|
||||||
fmt.Println("- Respects the store's capacity")
|
|
||||||
fmt.Println("- Avoids thundering herd (add jitter for high concurrency)")
|
|
||||||
fmt.Println("")
|
|
||||||
fmt.Println("Use state reload (Pattern 3) when:")
|
|
||||||
fmt.Println("- You can merge concurrent changes")
|
|
||||||
fmt.Println("- Deterministic success is required")
|
|
||||||
fmt.Println("- Replaying events is not expensive")
|
|
||||||
}
|
|
||||||
File diff suppressed because it is too large
Load Diff
215
store/immutability_test.go
Normal file
215
store/immutability_test.go
Normal file
@@ -0,0 +1,215 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
|
||||||
|
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
|
||||||
|
func TestEventImmutability_MemoryStore(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "test-actor-123"
|
||||||
|
|
||||||
|
// Create and save an event
|
||||||
|
originalEvent := &aether.Event{
|
||||||
|
ID: "evt-immutable-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"value": "original",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(originalEvent)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the event from the store
|
||||||
|
events, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) == 0 {
|
||||||
|
t.Fatal("expected 1 event, got 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
retrievedEvent := events[0]
|
||||||
|
|
||||||
|
// Verify the stored event has the correct values
|
||||||
|
if retrievedEvent.Data["value"] != "original" {
|
||||||
|
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
|
||||||
|
}
|
||||||
|
|
||||||
|
if retrievedEvent.EventType != "TestEvent" {
|
||||||
|
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify ID is correct
|
||||||
|
if retrievedEvent.ID != "evt-immutable-1" {
|
||||||
|
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_NoUpdateMethod verifies that the EventStore interface
|
||||||
|
// has only append, read methods - no Update or Delete methods.
|
||||||
|
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
|
||||||
|
// This test documents that the EventStore interface is append-only.
|
||||||
|
// The interface intentionally provides:
|
||||||
|
// - SaveEvent: append only
|
||||||
|
// - GetEvents: read only
|
||||||
|
// - GetLatestVersion: read only
|
||||||
|
//
|
||||||
|
// To verify this, we demonstrate that any attempt to call non-existent
|
||||||
|
// update/delete methods would be caught at compile time (not runtime).
|
||||||
|
// This is enforced by the interface definition in event.go which does
|
||||||
|
// not include Update, Delete, or Modify methods.
|
||||||
|
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
// Compile-time check: these would not compile if we tried them:
|
||||||
|
// store.Update(event) // compile error: no such method
|
||||||
|
// store.Delete(eventID) // compile error: no such method
|
||||||
|
// store.Modify(eventID, newData) // compile error: no such method
|
||||||
|
|
||||||
|
// Only these methods exist:
|
||||||
|
var eventStore aether.EventStore = store
|
||||||
|
if eventStore == nil {
|
||||||
|
t.Fatal("eventStore is nil")
|
||||||
|
}
|
||||||
|
// If we got here, the compile-time checks passed
|
||||||
|
t.Log("EventStore interface enforces append-only semantics by design")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
|
||||||
|
// increasing and attempting to save with a non-increasing version fails.
|
||||||
|
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "actor-version-check"
|
||||||
|
|
||||||
|
// Save first event with version 1
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-v1",
|
||||||
|
EventType: "Event1",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(v1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save with same version - should fail
|
||||||
|
event2Same := &aether.Event{
|
||||||
|
ID: "evt-v1-again",
|
||||||
|
EventType: "Event2",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1, // Same version
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event2Same)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save with lower version - should fail
|
||||||
|
event3Lower := &aether.Event{
|
||||||
|
ID: "evt-v0",
|
||||||
|
EventType: "Event3",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 0, // Lower version
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event3Lower)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save with next version - should succeed
|
||||||
|
event4Next := &aether.Event{
|
||||||
|
ID: "evt-v2",
|
||||||
|
EventType: "Event4",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 2,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event4Next)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(v2) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we have exactly 2 events
|
||||||
|
events, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) != 2 {
|
||||||
|
t.Errorf("expected 2 events, got %d", len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
|
||||||
|
// events from the store through the EventStore interface.
|
||||||
|
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "actor-nodelete"
|
||||||
|
|
||||||
|
// Save an event
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-nodelete",
|
||||||
|
EventType: "ImportantEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"critical": true},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve it
|
||||||
|
events1, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents (1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events1) != 1 {
|
||||||
|
t.Fatal("expected 1 event after save")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to delete through interface - this method doesn't exist
|
||||||
|
// store.Delete("evt-nodelete") // compile error: no such method
|
||||||
|
// store.DeleteByActorID(actorID) // compile error: no such method
|
||||||
|
|
||||||
|
// Verify the event is still there (we can't delete it)
|
||||||
|
events2, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents (2) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events2) != 1 {
|
||||||
|
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if events2[0].ID != "evt-nodelete" {
|
||||||
|
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default configuration values for JetStream event store
|
// Default configuration values for JetStream event store
|
||||||
@@ -19,7 +20,14 @@ const (
|
|||||||
|
|
||||||
// JetStreamConfig holds configuration options for JetStreamEventStore
|
// JetStreamConfig holds configuration options for JetStreamEventStore
|
||||||
type JetStreamConfig struct {
|
type JetStreamConfig struct {
|
||||||
// StreamRetention is how long to keep events (default: 1 year)
|
// StreamRetention is how long to keep events (default: 1 year).
|
||||||
|
// JetStream enforces this retention policy at the storage level using a limits-based policy:
|
||||||
|
// - MaxAge: Events older than this duration are automatically deleted
|
||||||
|
// - Storage is file-based (nats.FileStorage) for durability
|
||||||
|
// - Once the retention period expires, events are permanently removed from the stream
|
||||||
|
// This ensures that old events do not consume storage indefinitely.
|
||||||
|
// To keep events indefinitely, set StreamRetention to a very large value or configure
|
||||||
|
// a custom retention policy in the JetStream stream configuration.
|
||||||
StreamRetention time.Duration
|
StreamRetention time.Duration
|
||||||
// ReplicaCount is the number of replicas for high availability (default: 1)
|
// ReplicaCount is the number of replicas for high availability (default: 1)
|
||||||
ReplicaCount int
|
ReplicaCount int
|
||||||
@@ -41,6 +49,21 @@ func DefaultJetStreamConfig() JetStreamConfig {
|
|||||||
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
||||||
// It also implements EventStoreWithErrors to report malformed events during replay.
|
// It also implements EventStoreWithErrors to report malformed events during replay.
|
||||||
//
|
//
|
||||||
|
// ## Immutability Guarantee
|
||||||
|
//
|
||||||
|
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
|
||||||
|
// is configured with file-based storage (nats.FileStorage) and a retention policy
|
||||||
|
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
|
||||||
|
// eventually expire, but during their lifetime, events are never modified or deleted
|
||||||
|
// through the EventStore API. Once an event is published to the stream:
|
||||||
|
// - It cannot be updated
|
||||||
|
// - It cannot be deleted before expiration
|
||||||
|
// - It can only be read
|
||||||
|
//
|
||||||
|
// This architectural guarantee, combined with the EventStore interface providing
|
||||||
|
// no Update or Delete methods, ensures events are immutable and suitable as an
|
||||||
|
// audit trail.
|
||||||
|
//
|
||||||
// ## Version Cache Invalidation Strategy
|
// ## Version Cache Invalidation Strategy
|
||||||
//
|
//
|
||||||
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
|
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
|
||||||
@@ -64,17 +87,13 @@ type JetStreamEventStore struct {
|
|||||||
config JetStreamConfig
|
config JetStreamConfig
|
||||||
mu sync.Mutex // Protects version checks during SaveEvent
|
mu sync.Mutex // Protects version checks during SaveEvent
|
||||||
versions map[string]int64 // actorID -> latest version cache
|
versions map[string]int64 // actorID -> latest version cache
|
||||||
|
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
|
||||||
|
namespace string // Optional namespace for event publishing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
||||||
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
||||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
||||||
@@ -130,6 +149,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
|||||||
streamName: effectiveStreamName,
|
streamName: effectiveStreamName,
|
||||||
config: config,
|
config: config,
|
||||||
versions: make(map[string]int64),
|
versions: make(map[string]int64),
|
||||||
|
broadcaster: nil,
|
||||||
|
namespace: "",
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,6 +164,58 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
|||||||
return jes.streamName
|
return jes.streamName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
|
||||||
|
// The broadcaster receives EventStored events when events are successfully saved.
|
||||||
|
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
|
||||||
|
config := DefaultJetStreamConfig()
|
||||||
|
if namespace != "" {
|
||||||
|
config.Namespace = namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := natsConn.JetStream()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply defaults for zero values
|
||||||
|
if config.StreamRetention == 0 {
|
||||||
|
config.StreamRetention = DefaultStreamRetention
|
||||||
|
}
|
||||||
|
if config.ReplicaCount == 0 {
|
||||||
|
config.ReplicaCount = DefaultReplicaCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply namespace prefix to stream name if provided
|
||||||
|
effectiveStreamName := streamName
|
||||||
|
if config.Namespace != "" {
|
||||||
|
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create or update the stream
|
||||||
|
stream := &nats.StreamConfig{
|
||||||
|
Name: effectiveStreamName,
|
||||||
|
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||||
|
Storage: nats.FileStorage,
|
||||||
|
Retention: nats.LimitsPolicy,
|
||||||
|
MaxAge: config.StreamRetention,
|
||||||
|
Replicas: config.ReplicaCount,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = js.AddStream(stream)
|
||||||
|
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
||||||
|
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &JetStreamEventStore{
|
||||||
|
js: js,
|
||||||
|
streamName: effectiveStreamName,
|
||||||
|
config: config,
|
||||||
|
versions: make(map[string]int64),
|
||||||
|
broadcaster: broadcaster,
|
||||||
|
namespace: namespace,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent persists an event to JetStream.
|
// SaveEvent persists an event to JetStream.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
@@ -203,9 +276,34 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
// Update version cache after successful publish
|
// Update version cache after successful publish
|
||||||
jes.versions[event.ActorID] = event.Version
|
jes.versions[event.ActorID] = event.Version
|
||||||
|
|
||||||
|
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||||
|
if jes.broadcaster != nil {
|
||||||
|
jes.publishEventStored(event)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||||
|
// This is called after a successful SaveEvent to notify subscribers.
|
||||||
|
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||||
|
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"eventId": originalEvent.ID,
|
||||||
|
"actorId": originalEvent.ActorID,
|
||||||
|
"version": originalEvent.Version,
|
||||||
|
"timestamp": originalEvent.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
jes.broadcaster.Publish(jes.namespace, eventStored)
|
||||||
|
}
|
||||||
|
|
||||||
// GetEvents retrieves all events for an actor since a version.
|
// GetEvents retrieves all events for an actor since a version.
|
||||||
// Note: This method silently skips malformed events for backward compatibility.
|
// Note: This method silently skips malformed events for backward compatibility.
|
||||||
// Use GetEventsWithErrors to receive information about malformed events.
|
// Use GetEventsWithErrors to receive information about malformed events.
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -2,8 +2,10 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||||
@@ -11,6 +13,8 @@ type InMemoryEventStore struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
events map[string][]*aether.Event // actorID -> events
|
events map[string][]*aether.Event // actorID -> events
|
||||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||||
|
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
|
||||||
|
namespace string // optional namespace for event publishing
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInMemoryEventStore creates a new in-memory event store
|
// NewInMemoryEventStore creates a new in-memory event store
|
||||||
@@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
|
||||||
|
// The broadcaster receives EventStored events when events are successfully saved.
|
||||||
|
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
|
||||||
|
return &InMemoryEventStore{
|
||||||
|
events: make(map[string][]*aether.Event),
|
||||||
|
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||||
|
broadcaster: broadcaster,
|
||||||
|
namespace: namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent saves an event to the in-memory store.
|
// SaveEvent saves an event to the in-memory store.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
|
// If a broadcaster is configured, publishes an EventStored event on success.
|
||||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||||
es.mu.Lock()
|
es.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer es.mu.Unlock()
|
||||||
@@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||||
}
|
}
|
||||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||||
|
|
||||||
|
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||||
|
if es.broadcaster != nil {
|
||||||
|
es.publishEventStored(event)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||||
|
// This is called after a successful SaveEvent to notify subscribers.
|
||||||
|
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||||
|
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"eventId": originalEvent.ID,
|
||||||
|
"actorId": originalEvent.ActorID,
|
||||||
|
"version": originalEvent.Version,
|
||||||
|
"timestamp": originalEvent.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
es.broadcaster.Publish(es.namespace, eventStored)
|
||||||
|
}
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version
|
// GetEvents retrieves events for an actor from a specific version
|
||||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||||
es.mu.RLock()
|
es.mu.RLock()
|
||||||
|
|||||||
@@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// === EventStored Publishing Tests ===
|
||||||
|
|
||||||
|
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
|
||||||
|
// Create a mock broadcaster to capture published events
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.50,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save event
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if EventStored was published
|
||||||
|
select {
|
||||||
|
case publishedEvent := <-ch:
|
||||||
|
if publishedEvent == nil {
|
||||||
|
t.Fatal("received nil event from broadcaster")
|
||||||
|
}
|
||||||
|
if publishedEvent.EventType != aether.EventTypeEventStored {
|
||||||
|
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
|
||||||
|
}
|
||||||
|
if publishedEvent.ActorID != "order-456" {
|
||||||
|
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
|
||||||
|
}
|
||||||
|
if publishedEvent.Version != 1 {
|
||||||
|
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
|
||||||
|
}
|
||||||
|
// Check data contains original event info
|
||||||
|
if publishedEvent.Data["eventId"] != "evt-123" {
|
||||||
|
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for EventStored event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
// Save first event
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain the first EventStored event
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for first EventStored event")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save event with non-increasing version (should fail)
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: "evt-2",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1, // Same version, should conflict
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event2)
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
t.Fatalf("expected ErrVersionConflict, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify no EventStored event was published
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatal("expected no EventStored event, but received one")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected - no event published
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
// Save multiple events
|
||||||
|
for i := int64(1); i <= 3; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: i,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we received 3 EventStored events in order
|
||||||
|
for i := int64(1); i <= 3; i++ {
|
||||||
|
select {
|
||||||
|
case publishedEvent := <-ch:
|
||||||
|
if publishedEvent == nil {
|
||||||
|
t.Fatal("received nil event from broadcaster")
|
||||||
|
}
|
||||||
|
if publishedEvent.Version != i {
|
||||||
|
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("timeout waiting for EventStored event %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
|
||||||
|
// Test that SaveEvent works without a broadcaster (nil broadcaster)
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.50,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should not panic even though broadcaster is nil
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify event was saved
|
||||||
|
events, err := store.GetEvents("order-456", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(events) != 1 {
|
||||||
|
t.Fatalf("expected 1 event, got %d", len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user