Compare commits

..

2 Commits

Author SHA1 Message Date
Claude Code
5223cf136a fix: address review feedback
- Removed duplicate blank line in event.go
- Use original event timestamp instead of time.Now() for EventStored
- Fixed MockEventBroadcaster.Subscribe to return nil instead of closed channel
- Added integration tests for EventStored with JetStreamEventStore

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 22:25:10 +01:00
Claude Code
0f89b07c0b feat(event sourcing): Publish EventStored event after successful SaveEvent
Some checks failed
CI / build (pull_request) Successful in 21s
CI / integration (pull_request) Failing after 2m1s
Implement EventStored infrastructure event that notifies subscribers when an event
is successfully persisted. This enables observability and triggers downstream
workflows (caching, metrics, projections) without coupling to application events.

Changes:
- Add EventStored type to event.go containing EventID, ActorID, Version, Timestamp
- Update InMemoryEventStore with optional EventBus and metrics support via builder methods
- Update JetStreamEventStore with optional EventBus and metrics support via builder methods
- Publish EventStored to __internal__ namespace after successful SaveEvent
- EventStored not published if SaveEvent fails (e.g., version conflict)
- EventStored publishing is optional - stores work without EventBus configured
- Metrics are recorded for each EventStored publication
- Add comprehensive test suite covering all acceptance criteria

Meets acceptance criteria:
- EventStored published after SaveEvent succeeds
- EventStored contains EventID, ActorID, Version, Timestamp
- No EventStored published if SaveEvent fails
- EventBus receives EventStored in same operation
- Metrics increment for each EventStored published

Closes #61

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 21:25:51 +01:00
13 changed files with 3517 additions and 1311 deletions

View File

@@ -17,3 +17,37 @@ jobs:
run: go build ./... run: go build ./...
- name: Test - name: Test
run: go test ./... run: go test ./...
integration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.23'
- name: Install and Start NATS Server
run: |
# Detect architecture and download appropriate binary
ARCH=$(uname -m)
if [ "$ARCH" = "x86_64" ]; then
NATS_ARCH="amd64"
elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
NATS_ARCH="arm64"
else
echo "Unsupported architecture: $ARCH"
exit 1
fi
echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH"
# Download and extract nats-server
curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz
tar -xzf nats-server.tar.gz
# Start NATS with JetStream
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 &
# Wait for NATS to be ready
sleep 3
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version
- name: Run Integration Tests
run: go test -tags=integration -v ./...

View File

@@ -107,34 +107,7 @@ Order state after replaying 2 events:
### Events are immutable ### Events are immutable
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels: Events represent facts about what happened. Once saved, they are never modified - you only append new events.
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
- File-based storage (durable)
- Limits-based retention policy (events expire after configured duration, not before)
- No mechanism to modify or delete individual events during their lifetime
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
To correct a mistake, append a new event that expresses the correction rather than modifying history:
```go
// Wrong: Cannot update an event
// store.UpdateEvent(eventID, newData) // This method doesn't exist
// Right: Append a new event that corrects the record
correctionEvent := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderCorrected",
ActorID: orderID,
Version: currentVersion + 1,
Data: map[string]interface{}{"reason": "price adjustment"},
Timestamp: time.Now(),
}
err := store.SaveEvent(correctionEvent)
```
### State is derived ### State is derived

View File

@@ -73,14 +73,6 @@ type Event struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
} }
// Common event types for Aether infrastructure
const (
// EventTypeEventStored is an internal event published when an event is successfully persisted.
// This event allows observability components (metrics, projections, audit systems) to react
// to persisted events without coupling to application code.
EventTypeEventStored = "EventStored"
)
// Common metadata keys for distributed tracing and auditing // Common metadata keys for distributed tracing and auditing
const ( const (
// MetadataKeyCorrelationID identifies related events across services // MetadataKeyCorrelationID identifies related events across services
@@ -174,6 +166,16 @@ func (e *Event) WithMetadataFrom(source *Event) {
} }
} }
// EventStored is an internal infrastructure event published after an event is successfully persisted.
// It allows observability and trigger downstream workflows without coupling to application events.
// EventStored is not published to external systems (Phase 2) - only to local EventBus subscribers.
type EventStored struct {
EventID string `json:"eventId"` // ID of the event that was stored
ActorID string `json:"actorId"` // Actor that owns the stored event
Version int64 `json:"version"` // Version of the stored event
Timestamp time.Time `json:"timestamp"` // When the event was stored
}
// ActorSnapshot represents a point-in-time state snapshot // ActorSnapshot represents a point-in-time state snapshot
type ActorSnapshot struct { type ActorSnapshot struct {
ActorID string `json:"actorId"` ActorID string `json:"actorId"`
@@ -184,17 +186,6 @@ type ActorSnapshot struct {
// EventStore defines the interface for event persistence. // EventStore defines the interface for event persistence.
// //
// # Immutability Guarantee
//
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
// modified or deleted. The interface intentionally provides no Update or Delete methods.
// This ensures:
// - Events serve as an immutable audit trail
// - State can be safely derived by replaying events
// - Concurrent reads are always safe (events never change)
//
// To correct a mistake, append a new event that expresses the correction.
//
// # Version Semantics // # Version Semantics
// //
// Events for an actor must have monotonically increasing versions. When SaveEvent // Events for an actor must have monotonically increasing versions. When SaveEvent
@@ -215,13 +206,10 @@ type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be // SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor. // strictly greater than the current latest version for the actor.
// Returns VersionConflictError if version <= current latest version. // Returns VersionConflictError if version <= current latest version.
// Once saved, the event is immutable and can never be modified or deleted.
SaveEvent(event *Event) error SaveEvent(event *Event) error
// GetEvents retrieves events for an actor from a specific version (inclusive). // GetEvents retrieves events for an actor from a specific version (inclusive).
// Returns an empty slice if no events exist for the actor. // Returns an empty slice if no events exist for the actor.
// The returned events are guaranteed to be immutable - they will never be
// modified or deleted from the store.
GetEvents(actorID string, fromVersion int64) ([]*Event, error) GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor. // GetLatestVersion returns the latest version for an actor.

View File

@@ -2,8 +2,6 @@ package aether
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt"
"strings" "strings"
"testing" "testing"
"time" "time"
@@ -1337,190 +1335,3 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
// Error() should still work // Error() should still work
_ = err.Error() _ = err.Error()
} }
// Tests for VersionConflictError
func TestVersionConflictError_Error(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-123",
AttemptedVersion: 3,
CurrentVersion: 5,
}
errMsg := err.Error()
// Verify error message contains all context
if !strings.Contains(errMsg, "order-123") {
t.Errorf("error message should contain ActorID, got: %s", errMsg)
}
if !strings.Contains(errMsg, "3") {
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "5") {
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "version conflict") {
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
}
}
func TestVersionConflictError_Fields(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-456",
AttemptedVersion: 10,
CurrentVersion: 8,
}
if err.ActorID != "actor-456" {
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
}
if err.AttemptedVersion != 10 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
}
if err.CurrentVersion != 8 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
}
}
func TestVersionConflictError_Unwrap(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-789",
AttemptedVersion: 2,
CurrentVersion: 1,
}
unwrapped := err.Unwrap()
if unwrapped != ErrVersionConflict {
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
}
}
func TestVersionConflictError_ErrorsIs(t *testing.T) {
err := &VersionConflictError{
ActorID: "test-actor",
AttemptedVersion: 5,
CurrentVersion: 4,
}
// Test that errors.Is works with sentinel
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is(err, ErrVersionConflict) should return true")
}
// Test that other errors don't match
if errors.Is(err, errors.New("other error")) {
t.Error("errors.Is should not match unrelated errors")
}
}
func TestVersionConflictError_ErrorsAs(t *testing.T) {
originalErr := &VersionConflictError{
ActorID: "actor-unwrap",
AttemptedVersion: 7,
CurrentVersion: 6,
}
var versionErr *VersionConflictError
if !errors.As(originalErr, &versionErr) {
t.Fatalf("errors.As should succeed with VersionConflictError")
}
// Verify fields are accessible through unwrapped error
if versionErr.ActorID != "actor-unwrap" {
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
}
if versionErr.AttemptedVersion != 7 {
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
}
if versionErr.CurrentVersion != 6 {
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
}
}
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
// This test verifies that applications can read CurrentVersion for retry strategies
err := &VersionConflictError{
ActorID: "order-abc",
AttemptedVersion: 2,
CurrentVersion: 10,
}
var versionErr *VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatal("failed to unwrap VersionConflictError")
}
// Application can use CurrentVersion to decide retry strategy
nextVersion := versionErr.CurrentVersion + 1
if nextVersion != 11 {
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
}
// Application can log detailed context
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
if !strings.Contains(logMsg, "order-abc") {
t.Errorf("application context logging failed: %s", logMsg)
}
}
func TestVersionConflictError_EdgeCases(t *testing.T) {
testCases := []struct {
name string
actorID string
attemp int64
current int64
}{
{"zero current", "actor-1", 1, 0},
{"large numbers", "actor-2", 1000000, 999999},
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
{"negative attempt", "actor-4", -1, -2},
{"empty actor id", "", 1, 0},
{"special chars in actor id", "actor@#$%", 2, 1},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := &VersionConflictError{
ActorID: tc.actorID,
AttemptedVersion: tc.attemp,
CurrentVersion: tc.current,
}
// Should not panic
msg := err.Error()
if msg == "" {
t.Error("Error() should return non-empty string")
}
// Should be wrapped correctly
if err.Unwrap() != ErrVersionConflict {
t.Error("Unwrap should return ErrVersionConflict")
}
// errors.Is should work
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is should work for edge case")
}
})
}
}
func TestErrVersionConflict_Sentinel(t *testing.T) {
// Verify the sentinel error is correctly defined
if ErrVersionConflict == nil {
t.Fatal("ErrVersionConflict should not be nil")
}
expectedMsg := "version conflict"
if ErrVersionConflict.Error() != expectedMsg {
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
}
// Test that it's usable with errors.Is
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
t.Error("ErrVersionConflict should match itself with errors.Is")
}
}

View File

@@ -1,189 +0,0 @@
# Aether Examples
This directory contains examples demonstrating common patterns for using Aether.
## Retry Patterns (`retry_patterns.go`)
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
### Pattern Overview
All retry patterns work with `VersionConflictError` which provides three critical fields:
- **ActorID**: The actor that experienced the conflict
- **CurrentVersion**: The latest version in the store
- **AttemptedVersion**: The version you tried to save
Your application can read these fields to make intelligent retry decisions.
### Available Patterns
#### SimpleRetryPattern
The most basic pattern - just retry with exponential backoff:
```go
// Automatically retries up to 3 times with exponential backoff
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want a straightforward retry mechanism without complex logic.
#### ConflictDetailedRetryPattern
Extracts detailed information from the conflict error to make smarter decisions:
```go
// Detects thrashing (multiple conflicts at same version)
// and can implement circuit-breaker logic
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
#### JitterRetryPattern
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
```go
// Exponential backoff with jitter prevents synchronized retries
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
#### AdaptiveRetryPattern
Adjusts backoff duration based on version distance (indicator of contention):
```go
// Light contention (gap=1): 50ms backoff
// Moderate contention (gap=3-10): proportional backoff
// High contention (gap>10): aggressive backoff
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want backoff strategy to respond to actual system load.
#### EventualConsistencyPattern
Instead of blocking on retry, queues the event for asynchronous retry:
```go
// Returns immediately, event is queued for background retry
EventualConsistencyPattern(store, retryQueue, event)
// Background worker processes the queue
for item := range retryQueue {
// Implement your own retry logic here
}
```
**Use when**: You can't afford to block the request, and background retry is acceptable.
#### CircuitBreakerPattern
Implements a circuit breaker to prevent cascading failures:
```go
cb := NewCircuitBreaker()
// Fails fast when circuit is open
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
if err != nil && !cb.CanRetry() {
return ErrCircuitBreakerOpen
}
```
**Use when**: You have a distributed system and want to prevent retry storms during outages.
## Common Pattern: Extract and Log Context
All patterns can read context from `VersionConflictError`:
```go
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf(
"Conflict for actor %q: attempted %d, current %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
}
```
## Sentinel Error Check
Check if an error is a version conflict without examining the struct:
```go
if errors.Is(err, aether.ErrVersionConflict) {
// This is a version conflict - retry is appropriate
}
```
## Implementing Your Own Pattern
Basic template:
```go
for attempt := 0; attempt < maxRetries; attempt++ {
// 1. Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// 2. Create event with next version
event := &aether.Event{
ActorID: actorID,
Version: currentVersion + 1,
// ... other fields
}
// 3. Attempt save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// 4. Check if it's a conflict
if !errors.Is(err, aether.ErrVersionConflict) {
return err // Some other error
}
// 5. Implement your retry strategy
time.Sleep(yourBackoff(attempt))
}
```
## Choosing a Pattern
| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|-----------|-----------|----------|
| Simple | Low | Low | Very Low | Single writer, testing |
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
## Performance Considerations
1. **Backoff timing**: Shorter backoffs waste CPU on retries, longer backoffs increase latency
2. **Retry limits**: Too few retries give up too early, too many waste resources
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
## Testing
Use `aether.NewInMemoryEventStore()` in tests:
```go
store := store.NewInMemoryEventStore()
err := SimpleRetryPattern(store, "test-actor", "TestEvent")
if err != nil {
t.Fatalf("retry pattern failed: %v", err)
}
```

View File

@@ -1,353 +0,0 @@
package examples
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
//
// This pattern is suitable for scenarios where you want to automatically retry
// with exponential backoff when version conflicts occur.
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const initialBackoff = 100 * time.Millisecond
var event *aether.Event
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
log.Printf("Retry attempt %d after %v", attempt, backoff)
time.Sleep(backoff)
}
// Get the current version for the actor
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event = &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"attempt": attempt},
Timestamp: time.Now(),
}
// Attempt to save
if err := store.SaveEvent(event); err == nil {
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
return nil
} else if !errors.Is(err, aether.ErrVersionConflict) {
// Some other error occurred
return fmt.Errorf("save event failed: %w", err)
}
// If it's a version conflict, loop will retry
}
return fmt.Errorf("failed to save event after %d retries", maxRetries)
}
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
// from VersionConflictError to make intelligent retry decisions.
//
// This pattern shows how to log detailed context and potentially implement
// circuit-breaker logic based on the conflict information.
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 5
var lastConflictVersion int64
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// Create event
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"timestamp": time.Now()},
Timestamp: time.Now(),
}
// Attempt to save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// Check if it's a version conflict
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
// Not a version conflict, fail immediately
return err
}
// Extract detailed context from the conflict error
log.Printf(
"Version conflict for actor %q: attempted version %d, current version %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
// Check for thrashing (multiple conflicts with same version)
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
log.Printf("Detected version thrashing - circuit breaker would trigger here")
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
}
lastConflictVersion = versionErr.CurrentVersion
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
time.Sleep(backoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// JitterRetryPattern implements exponential backoff with jitter to prevent
// thundering herd when multiple writers retry simultaneously.
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const baseBackoff = 100 * time.Millisecond
const maxJitter = 0.1 // 10% jitter
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
// Exponential backoff with jitter
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
totalBackoff := exponentialBackoff + jitter
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
time.Sleep(totalBackoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
//
// This pattern demonstrates how application logic can use CurrentVersion to
// decide whether to retry, give up, or escalate to a higher-level handler.
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
return err
}
// Adaptive backoff based on version distance
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
if versionDistance > 10 {
// Many concurrent writers - back off more aggressively
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
} else if versionDistance > 3 {
// Moderate contention - normal backoff
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
} else {
// Light contention - minimal backoff
log.Printf("Light contention detected")
time.Sleep(50 * time.Millisecond)
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// EventualConsistencyPattern demonstrates how to handle version conflicts
// in an eventually consistent manner by publishing to a retry queue.
//
// This is useful when immediate retry is not feasible, and you want to
// defer the operation to a background worker.
type RetryQueueItem struct {
Event *aether.Event
ConflictVersion int64
ConflictAttempted int64
NextRetryTime time.Time
FailureCount int
}
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
err := store.SaveEvent(event)
if err == nil {
return
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
log.Printf("Non-retryable error: %v", err)
return
}
// Queue for retry - background worker will process this
retryItem := RetryQueueItem{
Event: event,
ConflictVersion: versionErr.CurrentVersion,
ConflictAttempted: versionErr.AttemptedVersion,
NextRetryTime: time.Now().Add(1 * time.Second),
FailureCount: 0,
}
select {
case retryQueue <- retryItem:
log.Printf("Queued event for retry: actor=%s", event.ActorID)
case <-time.After(5 * time.Second):
log.Printf("Failed to queue event for retry (queue full)")
}
}
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
//
// The circuit breaker tracks failure rates and temporarily stops retrying
// when the failure rate gets too high, allowing the system to recover.
type CircuitBreaker struct {
failureCount int
successCount int
state string // "closed", "open", "half-open"
lastFailureTime time.Time
openDuration time.Duration
failureThreshold int
successThreshold int
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
state: "closed",
openDuration: 30 * time.Second,
failureThreshold: 5,
successThreshold: 3,
}
}
func (cb *CircuitBreaker) RecordSuccess() {
if cb.state == "half-open" {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = "closed"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker closed")
}
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.lastFailureTime = time.Now()
cb.failureCount++
if cb.failureCount >= cb.failureThreshold {
cb.state = "open"
log.Printf("Circuit breaker opened")
}
}
func (cb *CircuitBreaker) CanRetry() bool {
if cb.state == "closed" {
return true
}
if cb.state == "open" {
if time.Since(cb.lastFailureTime) > cb.openDuration {
cb.state = "half-open"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker half-open")
return true
}
return false
}
// half-open state allows retries
return true
}
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
if !cb.CanRetry() {
return fmt.Errorf("circuit breaker open - not retrying")
}
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
cb.RecordSuccess()
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
cb.RecordFailure()
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
}

File diff suppressed because it is too large Load Diff

362
store/eventstored_test.go Normal file
View File

@@ -0,0 +1,362 @@
package store
import (
"fmt"
"sync"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// MockEventBroadcaster captures published events for testing
type MockEventBroadcaster struct {
mu sync.RWMutex
events []*aether.Event
namespaces map[string]int
}
func NewMockEventBroadcaster() *MockEventBroadcaster {
return &MockEventBroadcaster{
events: make([]*aether.Event, 0),
namespaces: make(map[string]int),
}
}
func (m *MockEventBroadcaster) Subscribe(namespacePattern string) <-chan *aether.Event {
return nil
}
func (m *MockEventBroadcaster) SubscribeWithFilter(namespacePattern string, filter *aether.SubscriptionFilter) <-chan *aether.Event {
return nil
}
func (m *MockEventBroadcaster) Unsubscribe(namespacePattern string, ch <-chan *aether.Event) {}
func (m *MockEventBroadcaster) Publish(namespaceID string, event *aether.Event) {
m.mu.Lock()
defer m.mu.Unlock()
m.events = append(m.events, event)
m.namespaces[namespaceID]++
}
func (m *MockEventBroadcaster) Stop() {}
func (m *MockEventBroadcaster) SubscriberCount(namespaceID string) int {
return 0
}
func (m *MockEventBroadcaster) GetPublishedEvents() []*aether.Event {
m.mu.RLock()
defer m.mu.RUnlock()
events := make([]*aether.Event, len(m.events))
copy(events, m.events)
return events
}
// === InMemoryEventStore EventStored Tests ===
func TestEventStored_PublishedOnSaveSuccess(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{"total": 100.50},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify EventStored was published
published := mockBus.GetPublishedEvents()
if len(published) != 1 {
t.Fatalf("expected 1 published event, got %d", len(published))
}
storedEvent := published[0]
if storedEvent.EventType != "EventStored" {
t.Errorf("expected EventType 'EventStored', got %q", storedEvent.EventType)
}
if storedEvent.ActorID != "order-456" {
t.Errorf("expected ActorID 'order-456', got %q", storedEvent.ActorID)
}
if storedEvent.Data["eventId"] != "evt-123" {
t.Errorf("expected eventId 'evt-123', got %v", storedEvent.Data["eventId"])
}
if storedEvent.Data["version"] != int64(1) {
t.Errorf("expected version 1, got %v", storedEvent.Data["version"])
}
}
func TestEventStored_NotPublishedOnVersionConflict(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
// Save first event
event1 := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event1); err != nil {
t.Fatalf("First SaveEvent failed: %v", err)
}
// Try to save event with same version (conflict)
event2 := &aether.Event{
ID: "evt-2",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1, // Same version - should conflict
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event2)
if err == nil {
t.Fatal("expected VersionConflictError, got nil")
}
// Verify only 1 EventStored was published (from first event)
published := mockBus.GetPublishedEvents()
if len(published) != 1 {
t.Fatalf("expected 1 published event after conflict, got %d", len(published))
}
}
func TestEventStored_MultipleEventsPublished(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
// Save 5 events
for i := 1; i <= 5; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-1",
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent %d failed: %v", i, err)
}
}
// Verify 5 EventStored events were published
published := mockBus.GetPublishedEvents()
if len(published) != 5 {
t.Fatalf("expected 5 published events, got %d", len(published))
}
// Verify each has correct data
for i := 0; i < 5; i++ {
if published[i].Data["version"] != int64(i+1) {
t.Errorf("event %d: expected version %d, got %v", i, i+1, published[i].Data["version"])
}
}
}
func TestEventStored_NotPublishedWithoutEventBus(t *testing.T) {
store := NewInMemoryEventStore()
// Don't set event bus
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
// Should succeed without publishing (no-op)
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Event should be persisted normally
retrieved, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(retrieved) != 1 {
t.Errorf("expected 1 event, got %d", len(retrieved))
}
}
func TestEventStored_ContainsRequiredFields(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
event := &aether.Event{
ID: "evt-abc",
EventType: "TestEvent",
ActorID: "actor-xyz",
Version: 42,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
published := mockBus.GetPublishedEvents()
if len(published) != 1 {
t.Fatalf("expected 1 event, got %d", len(published))
}
storedEvent := published[0]
// Verify required fields
if storedEvent.Data["eventId"] != "evt-abc" {
t.Error("missing or incorrect eventId")
}
if storedEvent.Data["actorId"] != "actor-xyz" {
t.Error("missing or incorrect actorId")
}
if storedEvent.Data["version"] != int64(42) {
t.Error("missing or incorrect version")
}
if _, hasTimestamp := storedEvent.Data["timestamp"]; !hasTimestamp {
t.Error("missing timestamp")
}
}
func TestEventStored_PublishedToCorrectNamespace(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
event := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify published to __internal__ namespace
namespaces := mockBus.namespaces
if count, ok := namespaces["__internal__"]; !ok || count != 1 {
t.Errorf("expected 1 event published to __internal__, got %v", namespaces)
}
}
func TestEventStored_WithMetricsRecording(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
mockMetrics := aether.NewMetricsCollector()
store.WithEventBus(mockBus)
store.WithMetrics(mockMetrics)
event := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify metrics were recorded
published := mockMetrics.EventsPublished("__internal__")
if published != 1 {
t.Errorf("expected 1 published metric, got %d", published)
}
}
func TestEventStored_ConcurrentPublishing(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
numGoroutines := 10
eventsPerGoroutine := 5
var wg sync.WaitGroup
for g := 0; g < numGoroutines; g++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for i := 0; i < eventsPerGoroutine; i++ {
version := int64(goroutineID*eventsPerGoroutine + i + 1)
event := &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", goroutineID, i),
EventType: "TestEvent",
ActorID: fmt.Sprintf("actor-%d", goroutineID),
Version: version,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
_ = store.SaveEvent(event) // Ignore errors (some may conflict)
}
}(g)
}
wg.Wait()
// Verify EventStored events were published for successful saves
published := mockBus.GetPublishedEvents()
if len(published) != numGoroutines*eventsPerGoroutine {
t.Logf("Note: got %d published events (some saves may have conflicted)", len(published))
}
if len(published) == 0 {
t.Fatal("expected at least some published events")
}
}
func TestEventStored_OrderPreserved(t *testing.T) {
store := NewInMemoryEventStore()
mockBus := NewMockEventBroadcaster()
store.WithEventBus(mockBus)
// Save 3 events in order
for i := 1; i <= 3; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "TestEvent",
ActorID: "actor-1",
Version: int64(i),
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
if err := store.SaveEvent(event); err != nil {
t.Fatalf("SaveEvent %d failed: %v", i, err)
}
}
published := mockBus.GetPublishedEvents()
// Verify order is preserved
for i := 0; i < 3; i++ {
if published[i].Data["eventId"] != fmt.Sprintf("evt-%d", i+1) {
t.Errorf("event %d: expected evt-%d, got %v", i, i+1, published[i].Data["eventId"])
}
}
}

View File

@@ -1,215 +0,0 @@
package store
import (
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
func TestEventImmutability_MemoryStore(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "test-actor-123"
// Create and save an event
originalEvent := &aether.Event{
ID: "evt-immutable-1",
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"value": "original",
},
Timestamp: time.Now(),
}
err := store.SaveEvent(originalEvent)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve the event from the store
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) == 0 {
t.Fatal("expected 1 event, got 0")
}
retrievedEvent := events[0]
// Verify the stored event has the correct values
if retrievedEvent.Data["value"] != "original" {
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
}
if retrievedEvent.EventType != "TestEvent" {
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
}
// Verify ID is correct
if retrievedEvent.ID != "evt-immutable-1" {
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
}
}
// TestEventImmutability_NoUpdateMethod verifies that the EventStore interface
// has only append, read methods - no Update or Delete methods.
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
// This test documents that the EventStore interface is append-only.
// The interface intentionally provides:
// - SaveEvent: append only
// - GetEvents: read only
// - GetLatestVersion: read only
//
// To verify this, we demonstrate that any attempt to call non-existent
// update/delete methods would be caught at compile time (not runtime).
// This is enforced by the interface definition in event.go which does
// not include Update, Delete, or Modify methods.
store := NewInMemoryEventStore()
// Compile-time check: these would not compile if we tried them:
// store.Update(event) // compile error: no such method
// store.Delete(eventID) // compile error: no such method
// store.Modify(eventID, newData) // compile error: no such method
// Only these methods exist:
var eventStore aether.EventStore = store
if eventStore == nil {
t.Fatal("eventStore is nil")
}
// If we got here, the compile-time checks passed
t.Log("EventStore interface enforces append-only semantics by design")
}
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
// increasing and attempting to save with a non-increasing version fails.
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-version-check"
// Save first event with version 1
event1 := &aether.Event{
ID: "evt-v1",
EventType: "Event1",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(v1) failed: %v", err)
}
// Try to save with same version - should fail
event2Same := &aether.Event{
ID: "evt-v1-again",
EventType: "Event2",
ActorID: actorID,
Version: 1, // Same version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2Same)
if err == nil {
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
}
// Try to save with lower version - should fail
event3Lower := &aether.Event{
ID: "evt-v0",
EventType: "Event3",
ActorID: actorID,
Version: 0, // Lower version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3Lower)
if err == nil {
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
}
// Save with next version - should succeed
event4Next := &aether.Event{
ID: "evt-v2",
EventType: "Event4",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4Next)
if err != nil {
t.Fatalf("SaveEvent(v2) failed: %v", err)
}
// Verify we have exactly 2 events
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 2 {
t.Errorf("expected 2 events, got %d", len(events))
}
}
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
// events from the store through the EventStore interface.
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-nodelete"
// Save an event
event := &aether.Event{
ID: "evt-nodelete",
EventType: "ImportantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"critical": true},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve it
events1, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (1) failed: %v", err)
}
if len(events1) != 1 {
t.Fatal("expected 1 event after save")
}
// Try to delete through interface - this method doesn't exist
// store.Delete("evt-nodelete") // compile error: no such method
// store.DeleteByActorID(actorID) // compile error: no such method
// Verify the event is still there (we can't delete it)
events2, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (2) failed: %v", err)
}
if len(events2) != 1 {
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
}
if events2[0].ID != "evt-nodelete" {
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
}
}

View File

@@ -9,7 +9,6 @@ import (
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/google/uuid"
) )
// Default configuration values for JetStream event store // Default configuration values for JetStream event store
@@ -20,14 +19,7 @@ const (
// JetStreamConfig holds configuration options for JetStreamEventStore // JetStreamConfig holds configuration options for JetStreamEventStore
type JetStreamConfig struct { type JetStreamConfig struct {
// StreamRetention is how long to keep events (default: 1 year). // StreamRetention is how long to keep events (default: 1 year)
// JetStream enforces this retention policy at the storage level using a limits-based policy:
// - MaxAge: Events older than this duration are automatically deleted
// - Storage is file-based (nats.FileStorage) for durability
// - Once the retention period expires, events are permanently removed from the stream
// This ensures that old events do not consume storage indefinitely.
// To keep events indefinitely, set StreamRetention to a very large value or configure
// a custom retention policy in the JetStream stream configuration.
StreamRetention time.Duration StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1) // ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int ReplicaCount int
@@ -49,21 +41,6 @@ func DefaultJetStreamConfig() JetStreamConfig {
// JetStreamEventStore implements EventStore using NATS JetStream for persistence. // JetStreamEventStore implements EventStore using NATS JetStream for persistence.
// It also implements EventStoreWithErrors to report malformed events during replay. // It also implements EventStoreWithErrors to report malformed events during replay.
// //
// ## Immutability Guarantee
//
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
// is configured with file-based storage (nats.FileStorage) and a retention policy
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
// eventually expire, but during their lifetime, events are never modified or deleted
// through the EventStore API. Once an event is published to the stream:
// - It cannot be updated
// - It cannot be deleted before expiration
// - It can only be read
//
// This architectural guarantee, combined with the EventStore interface providing
// no Update or Delete methods, ensures events are immutable and suitable as an
// audit trail.
//
// ## Version Cache Invalidation Strategy // ## Version Cache Invalidation Strategy
// //
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic // JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
@@ -87,13 +64,19 @@ type JetStreamEventStore struct {
config JetStreamConfig config JetStreamConfig
mu sync.Mutex // Protects version checks during SaveEvent mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache versions map[string]int64 // actorID -> latest version cache
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored
namespace string // Optional namespace for event publishing metrics aether.MetricsCollector // Optional metrics collector
} }
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration // NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) { func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig()) return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
@@ -108,6 +91,20 @@ func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string,
return NewJetStreamEventStoreWithConfig(natsConn, streamName, config) return NewJetStreamEventStoreWithConfig(natsConn, streamName, config)
} }
// WithEventBus sets the EventBus for publishing EventStored events.
// This is optional - if not set, EventStored will not be published.
func (jes *JetStreamEventStore) WithEventBus(bus aether.EventBroadcaster) *JetStreamEventStore {
jes.eventBus = bus
return jes
}
// WithMetrics sets the metrics collector for recording EventStored metrics.
// This is optional - if not set, metrics will not be recorded.
func (jes *JetStreamEventStore) WithMetrics(metrics aether.MetricsCollector) *JetStreamEventStore {
jes.metrics = metrics
return jes
}
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration // NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) { func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
js, err := natsConn.JetStream() js, err := natsConn.JetStream()
@@ -149,8 +146,6 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
streamName: effectiveStreamName, streamName: effectiveStreamName,
config: config, config: config,
versions: make(map[string]int64), versions: make(map[string]int64),
broadcaster: nil,
namespace: "",
}, nil }, nil
} }
@@ -164,58 +159,6 @@ func (jes *JetStreamEventStore) GetStreamName() string {
return jes.streamName return jes.streamName
} }
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
// The broadcaster receives EventStored events when events are successfully saved.
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
config := DefaultJetStreamConfig()
if namespace != "" {
config.Namespace = namespace
}
js, err := natsConn.JetStream()
if err != nil {
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}
// Apply defaults for zero values
if config.StreamRetention == 0 {
config.StreamRetention = DefaultStreamRetention
}
if config.ReplicaCount == 0 {
config.ReplicaCount = DefaultReplicaCount
}
// Apply namespace prefix to stream name if provided
effectiveStreamName := streamName
if config.Namespace != "" {
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
}
// Create or update the stream
stream := &nats.StreamConfig{
Name: effectiveStreamName,
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
MaxAge: config.StreamRetention,
Replicas: config.ReplicaCount,
}
_, err = js.AddStream(stream)
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
return &JetStreamEventStore{
js: js,
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: broadcaster,
namespace: namespace,
}, nil
}
// SaveEvent persists an event to JetStream. // SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater // Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor. // than the current latest version for the actor.
@@ -276,32 +219,47 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// Update version cache after successful publish // Update version cache after successful publish
jes.versions[event.ActorID] = event.Version jes.versions[event.ActorID] = event.Version
// Publish EventStored event after successful save (if broadcaster is configured) // Publish EventStored event on success
if jes.broadcaster != nil {
jes.publishEventStored(event) jes.publishEventStored(event)
}
return nil return nil
} }
// publishEventStored publishes an EventStored event to the broadcaster. // publishEventStored publishes an EventStored event to the EventBus and records metrics
// This is called after a successful SaveEvent to notify subscribers. func (jes *JetStreamEventStore) publishEventStored(event *aether.Event) {
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) { if jes.eventBus == nil {
eventStored := &aether.Event{ return
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
} }
jes.broadcaster.Publish(jes.namespace, eventStored) stored := &aether.EventStored{
EventID: event.ID,
ActorID: event.ActorID,
Version: event.Version,
Timestamp: event.Timestamp,
}
// Convert EventStored to Event for publishing (internal system event)
storedEvent := &aether.Event{
ID: "eventstored-" + event.ID,
EventType: "EventStored",
ActorID: event.ActorID,
Version: event.Version,
Data: map[string]interface{}{
"eventId": stored.EventID,
"actorId": stored.ActorID,
"version": stored.Version,
"timestamp": stored.Timestamp,
},
Timestamp: stored.Timestamp,
}
// Publish to default namespace (internal events)
jes.eventBus.Publish("__internal__", storedEvent)
// Record metrics if collector is configured
if jes.metrics != nil {
jes.metrics.RecordPublish("__internal__")
}
} }
// GetEvents retrieves all events for an actor since a version. // GetEvents retrieves all events for an actor since a version.

File diff suppressed because it is too large Load Diff

View File

@@ -2,10 +2,8 @@ package store
import ( import (
"sync" "sync"
"time"
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
"github.com/google/uuid"
) )
// InMemoryEventStore provides a simple in-memory event store for testing // InMemoryEventStore provides a simple in-memory event store for testing
@@ -13,8 +11,8 @@ type InMemoryEventStore struct {
mu sync.RWMutex mu sync.RWMutex
events map[string][]*aether.Event // actorID -> events events map[string][]*aether.Event // actorID -> events
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored
namespace string // optional namespace for event publishing metrics aether.MetricsCollector // Optional metrics collector
} }
// NewInMemoryEventStore creates a new in-memory event store // NewInMemoryEventStore creates a new in-memory event store
@@ -25,21 +23,24 @@ func NewInMemoryEventStore() *InMemoryEventStore {
} }
} }
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster // WithEventBus sets the EventBus for publishing EventStored events.
// The broadcaster receives EventStored events when events are successfully saved. // This is optional - if not set, EventStored will not be published.
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore { func (es *InMemoryEventStore) WithEventBus(bus aether.EventBroadcaster) *InMemoryEventStore {
return &InMemoryEventStore{ es.eventBus = bus
events: make(map[string][]*aether.Event), return es
snapshots: make(map[string][]*aether.ActorSnapshot),
broadcaster: broadcaster,
namespace: namespace,
} }
// WithMetrics sets the metrics collector for recording EventStored metrics.
// This is optional - if not set, metrics will not be recorded.
func (es *InMemoryEventStore) WithMetrics(metrics aether.MetricsCollector) *InMemoryEventStore {
es.metrics = metrics
return es
} }
// SaveEvent saves an event to the in-memory store. // SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater // Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor. // than the current latest version for the actor.
// If a broadcaster is configured, publishes an EventStored event on success. // On success, publishes an EventStored event to the EventBus (if configured).
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock() es.mu.Lock()
defer es.mu.Unlock() defer es.mu.Unlock()
@@ -68,32 +69,47 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
} }
es.events[event.ActorID] = append(es.events[event.ActorID], event) es.events[event.ActorID] = append(es.events[event.ActorID], event)
// Publish EventStored event after successful save (if broadcaster is configured) // Publish EventStored event on success
if es.broadcaster != nil {
es.publishEventStored(event) es.publishEventStored(event)
}
return nil return nil
} }
// publishEventStored publishes an EventStored event to the broadcaster. // publishEventStored publishes an EventStored event to the EventBus and records metrics
// This is called after a successful SaveEvent to notify subscribers. func (es *InMemoryEventStore) publishEventStored(event *aether.Event) {
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) { if es.eventBus == nil {
eventStored := &aether.Event{ return
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
} }
es.broadcaster.Publish(es.namespace, eventStored) stored := &aether.EventStored{
EventID: event.ID,
ActorID: event.ActorID,
Version: event.Version,
Timestamp: event.Timestamp,
}
// Convert EventStored to Event for publishing (internal system event)
storedEvent := &aether.Event{
ID: "eventstored-" + event.ID,
EventType: "EventStored",
ActorID: event.ActorID,
Version: event.Version,
Data: map[string]interface{}{
"eventId": stored.EventID,
"actorId": stored.ActorID,
"version": stored.Version,
"timestamp": stored.Timestamp,
},
Timestamp: stored.Timestamp,
}
// Publish to default namespace (internal events)
es.eventBus.Publish("__internal__", storedEvent)
// Record metrics if collector is configured
if es.metrics != nil {
es.metrics.RecordPublish("__internal__")
}
} }
// GetEvents retrieves events for an actor from a specific version // GetEvents retrieves events for an actor from a specific version

View File

@@ -1905,181 +1905,3 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
} }
} }
} }
// === EventStored Publishing Tests ===
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
// Create a mock broadcaster to capture published events
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// Save event
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Check if EventStored was published
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
}
if publishedEvent.ActorID != "order-456" {
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
}
if publishedEvent.Version != 1 {
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
}
// Check data contains original event info
if publishedEvent.Data["eventId"] != "evt-123" {
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for EventStored event")
}
}
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save first event
event1 := &aether.Event{
ID: "evt-1",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(event1) failed: %v", err)
}
// Drain the first EventStored event
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for first EventStored event")
}
// Try to save event with non-increasing version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1, // Same version, should conflict
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
if !errors.Is(err, aether.ErrVersionConflict) {
t.Fatalf("expected ErrVersionConflict, got %v", err)
}
// Verify no EventStored event was published
select {
case <-ch:
t.Fatal("expected no EventStored event, but received one")
case <-time.After(50 * time.Millisecond):
// Expected - no event published
}
}
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save multiple events
for i := int64(1); i <= 3; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "OrderPlaced",
ActorID: "order-456",
Version: i,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
// Verify we received 3 EventStored events in order
for i := int64(1); i <= 3; i++ {
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.Version != i {
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout waiting for EventStored event %d", i)
}
}
}
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
// Test that SaveEvent works without a broadcaster (nil broadcaster)
store := NewInMemoryEventStore()
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// This should not panic even though broadcaster is nil
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify event was saved
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
}