2 Commits

Author SHA1 Message Date
Claude Code
84fe185285 fix: address review feedback on error handling
- Handle json.Marshal error in deepCopyEvent instead of silently discarding
- Handle json.Unmarshal error instead of silently discarding
- Add explicit error checks with panic (appropriate for internal marshaling errors)
- Document that panics signal impossible conditions for valid Event structs

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 22:24:35 +01:00
Claude Code
69da1d800e docs: Verify and document append-only event guarantees
Some checks failed
CI / build (pull_request) Successful in 21s
CI / integration (pull_request) Failing after 2m0s
This commit addresses issue #60 by documenting and enforcing the immutability
guarantees of the event store:

- Document that EventStore interface is append-only by design (no Update/Delete methods)
- Document the immutable nature of events once persisted as an audit trail
- Add JetStream stream retention policy configuration documentation
- Add comprehensive immutability test (TestEventImmutability_InMemory, TestEventImmutability_Sequential)
- Enhance InMemoryEventStore to deep-copy events, preventing accidental mutations
- Update README with detailed immutability guarantees and audit trail benefits

The EventStore interface intentionally provides no methods to modify or delete
events. Once persisted, events are immutable facts that serve as a tamper-proof
audit trail. This design ensures compliance, debugging, and historical analysis.

Acceptance criteria met:
- EventStore interface documented as append-only (event.go)
- JetStream retention policy configuration documented (store/jetstream.go)
- Test verifying events cannot be modified after persistence (store/immutability_test.go)
- README documents immutability guarantees (README.md)

Closes #60

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 21:26:16 +01:00
18 changed files with 3047 additions and 1987 deletions

View File

@@ -9,7 +9,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v7
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.23'
@@ -17,3 +17,37 @@ jobs:
run: go build ./...
- name: Test
run: go test ./...
integration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.23'
- name: Install and Start NATS Server
run: |
# Detect architecture and download appropriate binary
ARCH=$(uname -m)
if [ "$ARCH" = "x86_64" ]; then
NATS_ARCH="amd64"
elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
NATS_ARCH="arm64"
else
echo "Unsupported architecture: $ARCH"
exit 1
fi
echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH"
# Download and extract nats-server
curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz
tar -xzf nats-server.tar.gz
# Start NATS with JetStream
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 &
# Wait for NATS to be ready
sleep 3
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version
- name: Run Integration Tests
run: go test -tags=integration -v ./...

View File

@@ -1,64 +0,0 @@
# Issue: Implement Actor Migration Between Cluster Nodes
## Problem
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
## Required Implementation
### 1. Rebalance Algorithm (cluster/shard.go)
Implement `ConsistentHashPlacement.RebalanceShards` to:
- Calculate new shard assignments based on active nodes
- Identify actors needing migration
- Generate migration plan with source/dest nodes
### 2. Migration Coordinator (cluster/manager.go)
Implement `handleRebalanceRequest` to:
- Accept migration plan from leader
- For each actor in plan:
1. Pause incoming messages
2. Capture actor state (replay events up to current version)
3. Serialize state
4. Send migration request to destination node
5. Wait for ack
6. Delete actor from current node
- Track migration status via `ActorMigration.Status`
### 3. Cross-Node Message Routing (cluster/distributed.go)
Implement proper routing in `SendMessage`:
- Use `GetActorNode(actorID)` to determine target node
- If remote: marshal message, send via NATS to target node
- If local: send to local runtime
- Route response back to caller if needed
## Suggested Approach
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
2. **Implement state capture** - replay events to get current state
3. **Implement state restore** - deserialize and restore actor state
4. **Implement coordinator** - manage migration phases
5. **Add error handling** - handle failed migrations, retries, cleanup
6. **Add tests** - test migration with mock NATS
## Related Files
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
- `cluster/manager.go:167` - handleMigrationRequest (empty)
- `cluster/shard.go:211` - RebalanceShards (stub)
- `cluster/distributed.go:139` - SendMessage (simplified)
- `cluster/types.go:108` - ActorMigration struct
## Acceptance Criteria
- [ ] `RebalanceShards` returns new shard map with actor assignments
- [ ] `handleRebalanceRequest` processes migration plan
- [ ] `handleMigrationRequest` accepts actor migrations
- [ ] `SendMessage` routes to correct node
- [ ] Actors can be migrated with state preserved
- [ ] Failed migrations are handled gracefully
- [ ] Integration test with multi-node cluster

View File

@@ -1,117 +0,0 @@
# Issue: Add Snapshot Support to Event Sourcing Workflow
## Problem
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay entire history
- No performance optimization for long-lived actors
- Snapshots exist as API but are not used
## Current State
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
- `InMemoryEventStore` has snapshot methods but no lifecycle management
## Required Implementation
### 1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds
### 2. State Capture
Add method to capture actor state:
```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```
### 3. Snapshot Store Extension
Extend `EventStoreWithErrors` to include snapshots:
```go
type EventStoreWithSnapshots interface {
EventStoreWithErrors
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
SaveSnapshot(snapshot *ActorSnapshot) error
}
```
### 4. Snapshot Workflow
Modify event retrieval to use snapshots:
```go
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
// 1. Try to get latest snapshot
snapshot, _ := store.GetLatestSnapshot(actorID)
// 2. If snapshot exists and version <= fromVersion:
// - Return events from snapshot version + 1
// 3. Else:
// - Replay all events from version 0
}
```
## Suggested Implementation
### 1. Add CaptureState to EventStore interface
In `event.go`, extend `EventStore` or create `StateStore` interface:
```go
type StateStore interface {
EventStore
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```
### 2. Implement CaptureState
In `store/jetstream.go`:
```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
// Replay events and build state (application logic needed here)
events, _ := jes.GetEvents(actorID, fromVersion)
// Need application logic to convert events to state
return state, nil
}
```
### 3. Add Snapshot Helper
Create snapshot utilities:
```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
return &ActorSnapshot{
ActorID: actorID,
Version: version,
State: state,
Timestamp: time.Now(),
}
}
```
### 4. Modify GetEvents
Update `GetEvents` in both stores to use snapshots when beneficial.
## Snapshots Workflow Example
```
1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, snapshot at version 1000
4. Request events from version 900:
- Get snapshot at version 1000? No (version too high)
- Replay 900->1000 events (only 100 events)
5. Request events from version 50:
- Get latest snapshot at version 1000? Yes (version > 50)
- Use snapshot as base
- Replay 1000->1000 events (none)
```
## Acceptance Criteria
- [ ] `CaptureState` method added to event store
- [ ] Snapshots created at configured intervals
- [ ] `GetEvents` uses snapshots to optimize replay
- [ ] Snapshot workflow tested with long-lived actors
- [ ] Configuration for snapshot interval/version
- [ ] Metrics: snapshot count, average replay size

View File

@@ -1,100 +0,0 @@
# Issue: Implement VM/Runtime for Actors
## Problem
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
## Required Components
### 1. VM Implementation (cluster/vm.go - new)
```go
type VirtualMachine struct {
actorID string
eventStore aether.EventStore
state map[string]interface{}
version int64
}
```
Methods needed:
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
- `Start()` - replay events to rebuild state
- `ProcessEvent(event *aether.Event)` - apply event to state
- `Stop()` - persist final state
- `GetVersion()` - current event version
### 2. Runtime Implementation (cluster/runtime.go - new)
```go
type Runtime struct {
natsConn *nats.Conn
eventStore aether.EventStore
vmRegistry VMRegistry // map[actorID]*VirtualMachine
config RuntimeConfig
}
```
Methods needed:
- `Start()` - initialize and start processing
- `LoadModel(model eventstorming.Model)` - register domain types
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
- `GetActiveVMs()` - return map of active VMs
- `CreateVM(actorID string)` - create new VM instance
- `StopVM(actorID string)` - persist and stop VM
### 3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
## Suggested Design
### VM Lifecycle
```
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
- Replay events from event store
- Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
```
### State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
## Implementation Steps
1. **Create VM struct** in `cluster/vm.go`
2. **Implement event replay** to rebuild state
3. **Create Runtime** in `cluster/runtime.go`
4. **Register Runtime with cluster** via `SetVMProvider`
5. **Implement message processing** - validate against model
6. **Add version conflict handling** using existing EventStore
7. **Write tests** - mock event store, test state transitions
## File Structure
```
cluster/
├── vm.go # VirtualMachine implementation
├── runtime.go # Runtime implementation
├── vm_test.go # VM tests
├── runtime_test.go # Runtime tests
└── integration_test.go # Integration tests
```
## Acceptance Criteria
- [ ] VM can be created with actor ID
- [ ] VM replays events to build state
- [ ] VM processes events and updates state
- [ ] VM persists current version
- [ ] Runtime can create/stop VMs
- [ ] Runtime manages VM registry
- [ ] Integration test with NATS and JetStream

View File

@@ -107,34 +107,17 @@ Order state after replaying 2 events:
### Events are immutable
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:
Events represent immutable facts about what happened in the domain. The EventStore interface is intentionally append-only: it provides no methods to update or delete events. Once persisted, an event can never be modified, deleted, or overwritten.
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
This design ensures:
- **Audit trail**: Complete, tamper-proof history of all state changes
- **Compliance**: Events serve as evidence for regulatory requirements
- **Debugging**: Full context of how the system reached its current state
- **Analysis**: Rich domain data for business intelligence and analysis
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
- File-based storage (durable)
- Limits-based retention policy (events expire after configured duration, not before)
- No mechanism to modify or delete individual events during their lifetime
To correct application state, you append new events (e.g., a "Reversed" or "Corrected" event) rather than modifying existing events. This maintains a complete history showing both the original event and the correction.
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
To correct a mistake, append a new event that expresses the correction rather than modifying history:
```go
// Wrong: Cannot update an event
// store.UpdateEvent(eventID, newData) // This method doesn't exist
// Right: Append a new event that corrects the record
correctionEvent := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderCorrected",
ActorID: orderID,
Version: currentVersion + 1,
Data: map[string]interface{}{"reason": "price adjustment"},
Timestamp: time.Now(),
}
err := store.SaveEvent(correctionEvent)
```
The JetStream backing store uses a retention policy (default: 1 year) to automatically clean up old events, but events are never manually deleted through Aether.
### State is derived

View File

@@ -73,14 +73,6 @@ type Event struct {
Timestamp time.Time `json:"timestamp"`
}
// Common event types for Aether infrastructure
const (
// EventTypeEventStored is an internal event published when an event is successfully persisted.
// This event allows observability components (metrics, projections, audit systems) to react
// to persisted events without coupling to application code.
EventTypeEventStored = "EventStored"
)
// Common metadata keys for distributed tracing and auditing
const (
// MetadataKeyCorrelationID identifies related events across services
@@ -184,16 +176,17 @@ type ActorSnapshot struct {
// EventStore defines the interface for event persistence.
//
// # Immutability Guarantee
// # Append-Only Design
//
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
// modified or deleted. The interface intentionally provides no Update or Delete methods.
// This ensures:
// - Events serve as an immutable audit trail
// - State can be safely derived by replaying events
// - Concurrent reads are always safe (events never change)
// EventStore is intentionally designed as append-only: it provides no Update or Delete
// methods. Events are immutable facts about what happened in the domain. Once persisted,
// an event is permanently recorded and can never be modified, deleted, or overwritten.
// This design ensures that the event log serves as a complete, immutable audit trail
// that can be trusted for compliance, debugging, and domain analysis.
//
// To correct a mistake, append a new event that expresses the correction.
// To correct application state, you append new events (e.g., a "Reversed" or "Corrected"
// event), never by modifying existing events. This maintains a complete history of all
// state changes.
//
// # Version Semantics
//
@@ -215,13 +208,11 @@ type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor.
// Returns VersionConflictError if version <= current latest version.
// Once saved, the event is immutable and can never be modified or deleted.
// Events persisted to the store are immutable and can never be modified or deleted.
SaveEvent(event *Event) error
// GetEvents retrieves events for an actor from a specific version (inclusive).
// Returns an empty slice if no events exist for the actor.
// The returned events are guaranteed to be immutable - they will never be
// modified or deleted from the store.
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor.

View File

@@ -2,8 +2,6 @@ package aether
import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"
@@ -1337,190 +1335,3 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
// Error() should still work
_ = err.Error()
}
// Tests for VersionConflictError
func TestVersionConflictError_Error(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-123",
AttemptedVersion: 3,
CurrentVersion: 5,
}
errMsg := err.Error()
// Verify error message contains all context
if !strings.Contains(errMsg, "order-123") {
t.Errorf("error message should contain ActorID, got: %s", errMsg)
}
if !strings.Contains(errMsg, "3") {
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "5") {
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "version conflict") {
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
}
}
func TestVersionConflictError_Fields(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-456",
AttemptedVersion: 10,
CurrentVersion: 8,
}
if err.ActorID != "actor-456" {
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
}
if err.AttemptedVersion != 10 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
}
if err.CurrentVersion != 8 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
}
}
func TestVersionConflictError_Unwrap(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-789",
AttemptedVersion: 2,
CurrentVersion: 1,
}
unwrapped := err.Unwrap()
if unwrapped != ErrVersionConflict {
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
}
}
func TestVersionConflictError_ErrorsIs(t *testing.T) {
err := &VersionConflictError{
ActorID: "test-actor",
AttemptedVersion: 5,
CurrentVersion: 4,
}
// Test that errors.Is works with sentinel
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is(err, ErrVersionConflict) should return true")
}
// Test that other errors don't match
if errors.Is(err, errors.New("other error")) {
t.Error("errors.Is should not match unrelated errors")
}
}
func TestVersionConflictError_ErrorsAs(t *testing.T) {
originalErr := &VersionConflictError{
ActorID: "actor-unwrap",
AttemptedVersion: 7,
CurrentVersion: 6,
}
var versionErr *VersionConflictError
if !errors.As(originalErr, &versionErr) {
t.Fatalf("errors.As should succeed with VersionConflictError")
}
// Verify fields are accessible through unwrapped error
if versionErr.ActorID != "actor-unwrap" {
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
}
if versionErr.AttemptedVersion != 7 {
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
}
if versionErr.CurrentVersion != 6 {
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
}
}
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
// This test verifies that applications can read CurrentVersion for retry strategies
err := &VersionConflictError{
ActorID: "order-abc",
AttemptedVersion: 2,
CurrentVersion: 10,
}
var versionErr *VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatal("failed to unwrap VersionConflictError")
}
// Application can use CurrentVersion to decide retry strategy
nextVersion := versionErr.CurrentVersion + 1
if nextVersion != 11 {
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
}
// Application can log detailed context
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
if !strings.Contains(logMsg, "order-abc") {
t.Errorf("application context logging failed: %s", logMsg)
}
}
func TestVersionConflictError_EdgeCases(t *testing.T) {
testCases := []struct {
name string
actorID string
attemp int64
current int64
}{
{"zero current", "actor-1", 1, 0},
{"large numbers", "actor-2", 1000000, 999999},
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
{"negative attempt", "actor-4", -1, -2},
{"empty actor id", "", 1, 0},
{"special chars in actor id", "actor@#$%", 2, 1},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := &VersionConflictError{
ActorID: tc.actorID,
AttemptedVersion: tc.attemp,
CurrentVersion: tc.current,
}
// Should not panic
msg := err.Error()
if msg == "" {
t.Error("Error() should return non-empty string")
}
// Should be wrapped correctly
if err.Unwrap() != ErrVersionConflict {
t.Error("Unwrap should return ErrVersionConflict")
}
// errors.Is should work
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is should work for edge case")
}
})
}
}
func TestErrVersionConflict_Sentinel(t *testing.T) {
// Verify the sentinel error is correctly defined
if ErrVersionConflict == nil {
t.Fatal("ErrVersionConflict should not be nil")
}
expectedMsg := "version conflict"
if ErrVersionConflict.Error() != expectedMsg {
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
}
// Test that it's usable with errors.Is
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
t.Error("ErrVersionConflict should match itself with errors.Is")
}
}

View File

@@ -1,189 +0,0 @@
# Aether Examples
This directory contains examples demonstrating common patterns for using Aether.
## Retry Patterns (`retry_patterns.go`)
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
### Pattern Overview
All retry patterns work with `VersionConflictError` which provides three critical fields:
- **ActorID**: The actor that experienced the conflict
- **CurrentVersion**: The latest version in the store
- **AttemptedVersion**: The version you tried to save
Your application can read these fields to make intelligent retry decisions.
### Available Patterns
#### SimpleRetryPattern
The most basic pattern - just retry with exponential backoff:
```go
// Automatically retries up to 3 times with exponential backoff
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want a straightforward retry mechanism without complex logic.
#### ConflictDetailedRetryPattern
Extracts detailed information from the conflict error to make smarter decisions:
```go
// Detects thrashing (multiple conflicts at same version)
// and can implement circuit-breaker logic
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
#### JitterRetryPattern
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
```go
// Exponential backoff with jitter prevents synchronized retries
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
#### AdaptiveRetryPattern
Adjusts backoff duration based on version distance (indicator of contention):
```go
// Light contention (gap=1): 50ms backoff
// Moderate contention (gap=3-10): proportional backoff
// High contention (gap>10): aggressive backoff
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want backoff strategy to respond to actual system load.
#### EventualConsistencyPattern
Instead of blocking on retry, queues the event for asynchronous retry:
```go
// Returns immediately, event is queued for background retry
EventualConsistencyPattern(store, retryQueue, event)
// Background worker processes the queue
for item := range retryQueue {
// Implement your own retry logic here
}
```
**Use when**: You can't afford to block the request, and background retry is acceptable.
#### CircuitBreakerPattern
Implements a circuit breaker to prevent cascading failures:
```go
cb := NewCircuitBreaker()
// Fails fast when circuit is open
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
if err != nil && !cb.CanRetry() {
return ErrCircuitBreakerOpen
}
```
**Use when**: You have a distributed system and want to prevent retry storms during outages.
## Common Pattern: Extract and Log Context
All patterns can read context from `VersionConflictError`:
```go
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf(
"Conflict for actor %q: attempted %d, current %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
}
```
## Sentinel Error Check
Check if an error is a version conflict without examining the struct:
```go
if errors.Is(err, aether.ErrVersionConflict) {
// This is a version conflict - retry is appropriate
}
```
## Implementing Your Own Pattern
Basic template:
```go
for attempt := 0; attempt < maxRetries; attempt++ {
// 1. Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// 2. Create event with next version
event := &aether.Event{
ActorID: actorID,
Version: currentVersion + 1,
// ... other fields
}
// 3. Attempt save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// 4. Check if it's a conflict
if !errors.Is(err, aether.ErrVersionConflict) {
return err // Some other error
}
// 5. Implement your retry strategy
time.Sleep(yourBackoff(attempt))
}
```
## Choosing a Pattern
| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|-----------|-----------|----------|
| Simple | Low | Low | Very Low | Single writer, testing |
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
## Performance Considerations
1. **Backoff timing**: Shorter backoffs waste CPU on retries, longer backoffs increase latency
2. **Retry limits**: Too few retries give up too early, too many waste resources
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
## Testing
Use `aether.NewInMemoryEventStore()` in tests:
```go
store := store.NewInMemoryEventStore()
err := SimpleRetryPattern(store, "test-actor", "TestEvent")
if err != nil {
t.Fatalf("retry pattern failed: %v", err)
}
```

View File

@@ -1,168 +0,0 @@
// Package main demonstrates cross-node event broadcasting using NATSEventBus
// and JetStreamEventStore for cluster synchronization.
//
// This example shows:
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
func main() {
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatal("Failed to connect to NATS:", err)
}
defer nc.Close()
ctx := context.Background()
store1, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
defer eventBus1.Stop()
store2, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
defer eventBus2.Stop()
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
done := make(chan struct{})
go processEvents(ctx, eventStoredCh1, store1, done)
go processEvents(ctx, eventStoredCh2, store2, done)
go func() {
time.Sleep(2 * time.Second)
actorID := "demo-actor"
event1 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"total": 99.99,
"status": "pending",
},
Timestamp: time.Now(),
}
log.Printf("Node 1 publishing event: %s", event1.EventType)
eventBus1.Publish("", event1)
time.Sleep(500 * time.Millisecond)
event2 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPaid",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{
"total": 99.99,
"status": "paid",
"method": "credit_card",
},
Timestamp: time.Now(),
}
log.Printf("Node 2 publishing event: %s", event2.EventType)
eventBus2.Publish("", event2)
time.Sleep(2 * time.Second)
close(done)
log.Println("Cross-node broadcasting demo complete")
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("Shutting down...")
case <-done:
}
}
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
for {
select {
case <-done:
return
case <-ctx.Done():
return
case event, ok := <-eventStoredCh:
if !ok {
return
}
if event == nil {
continue
}
if event.EventType != aether.EventTypeEventStored {
continue
}
actorID, ok := event.Data["actorId"].(string)
if !ok {
log.Printf("Warning: EventStored missing actorId")
continue
}
version, ok := event.Data["version"].(int64)
if !ok {
log.Printf("Warning: EventStored missing version")
continue
}
eventID, _ := event.Data["eventId"].(string)
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
eventStore.UpdateVersionCache(actorID, version)
currentVersion, _ := eventStore.GetLatestVersion(actorID)
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
}
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"github.com/google/uuid"
@@ -25,8 +24,6 @@ type NATSEventBus struct {
subscriptions []*nats.Subscription
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node
streamPrefix string // NATS subject prefix for events
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
mutex sync.Mutex
ctx context.Context
cancel context.CancelFunc
@@ -49,7 +46,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: "aether",
ctx: ctx,
cancel: cancel,
}
@@ -57,43 +53,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil
}
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
// The event store is used to automatically update version cache when EventStored events are received
// from other cluster nodes via NATS. This ensures cross-node version consistency.
//
// Example:
//
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// The namespace parameter is used as a prefix for EventStored event filtering.
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
streamPrefix := "aether"
if namespace != "" {
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
}
neb := &NATSEventBus{
EventBus: NewEventBus(),
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: streamPrefix,
eventStore: store,
ctx: context.Background(),
cancel: func() {},
}
return neb
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns:
// - "*" matches a single token
@@ -269,103 +228,3 @@ func (neb *NATSEventBus) Stop() {
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
}
// sanitizeSubject sanitizes a string for use in NATS subjects
func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, ".", "_")
s = strings.ReplaceAll(s, "*", "_")
s = strings.ReplaceAll(s, ">", "_")
return s
}
// extractActorType extracts the actor type from an actor ID
func extractActorType(actorID string) string {
for i, c := range actorID {
if c == '-' && i > 0 {
return actorID[:i]
}
}
return "unknown"
}
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
// EventStored events are published by JetStreamEventStore when events are successfully saved.
// This is useful for cross-node event synchronization and version cache consistency.
//
// The returned channel receives EventStored events matching the pattern.
// The EventStored event schema:
// - EventType: "EventStored"
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// The namespacePattern supports NATS wildcards:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
//
// Example:
//
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// if event.EventType != aether.EventTypeEventStored {
// continue
// }
// actorID := event.Data["actorId"].(string)
// version, _ := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
// from all namespaces. Ensure your application handles this appropriately.
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
ch := make(chan *Event, 100)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
return
}
if eventMsg.NodeID == neb.nodeID {
return
}
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
actorID, ok := eventMsg.Event.Data["actorId"].(string)
if !ok {
return
}
version, ok := eventMsg.Event.Data["version"].(int64)
if !ok {
return
}
// Use type assertion to call UpdateVersionCache
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
es.UpdateVersionCache(actorID, version)
}
}
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
close(ch)
return ch
}
neb.subscriptions = append(neb.subscriptions, sub)
return ch
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended"
]
}

View File

@@ -7,20 +7,21 @@ import (
"git.flowmade.one/flowmade-one/aether"
)
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
func TestEventImmutability_MemoryStore(t *testing.T) {
// TestEventImmutability verifies that events cannot be modified after being
// persisted to the store. This test demonstrates that Aether maintains an
// append-only event log that serves as an immutable audit trail.
func TestEventImmutability_InMemory(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "test-actor-123"
// Create and save an event
originalEvent := &aether.Event{
ID: "evt-immutable-1",
EventType: "TestEvent",
ActorID: actorID,
ID: "evt-immutability-123",
EventType: "OrderPlaced",
ActorID: "order-789",
Version: 1,
Data: map[string]interface{}{
"value": "original",
"total": 99.99,
"currency": "USD",
},
Timestamp: time.Now(),
}
@@ -31,185 +32,158 @@ func TestEventImmutability_MemoryStore(t *testing.T) {
}
// Retrieve the event from the store
events, err := store.GetEvents(actorID, 0)
events, err := store.GetEvents("order-789", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) == 0 {
t.Fatal("expected 1 event, got 0")
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
retrievedEvent := events[0]
// Verify the stored event has the correct values
if retrievedEvent.Data["value"] != "original" {
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
// Verify the event has the original data
if retrievedEvent.Data["total"].(float64) != 99.99 {
t.Errorf("expected total 99.99, got %v", retrievedEvent.Data["total"])
}
if retrievedEvent.EventType != "TestEvent" {
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
// Attempt to modify the retrieved event
retrievedEvent.Data["total"] = 199.99
retrievedEvent.EventType = "OrderCancelled"
retrievedEvent.Data["currency"] = "EUR"
// Retrieve the event again from the store
events, err = store.GetEvents("order-789", 0)
if err != nil {
t.Fatalf("GetEvents failed on second call: %v", err)
}
// Verify ID is correct
if retrievedEvent.ID != "evt-immutable-1" {
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
}
storedEvent := events[0]
// Verify that the stored event still has the original data
// This confirms that modifying the retrieved event didn't affect the stored event
if storedEvent.Data["total"].(float64) != 99.99 {
t.Errorf("stored event total was modified: expected 99.99, got %v", storedEvent.Data["total"])
}
// TestEventImmutability_NoUpdateMethod verifies that the EventStore interface
// has only append, read methods - no Update or Delete methods.
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
// This test documents that the EventStore interface is append-only.
// The interface intentionally provides:
// - SaveEvent: append only
// - GetEvents: read only
// - GetLatestVersion: read only
//
// To verify this, we demonstrate that any attempt to call non-existent
// update/delete methods would be caught at compile time (not runtime).
// This is enforced by the interface definition in event.go which does
// not include Update, Delete, or Modify methods.
if storedEvent.EventType != "OrderPlaced" {
t.Errorf("stored event type was modified: expected OrderPlaced, got %s", storedEvent.EventType)
}
if storedEvent.Data["currency"].(string) != "USD" {
t.Errorf("stored event currency was modified: expected USD, got %s", storedEvent.Data["currency"])
}
// Additional verification: EventStore has no Update or Delete methods
// This is enforced at the type system level by the interface definition.
// The EventStore interface only provides:
// - SaveEvent (append-only)
// - GetEvents (read-only)
// - GetLatestVersion (read-only)
// There are intentionally no Update, Delete, or Modify methods.
}
// TestEventImmutability_Sequential verifies that events remain immutable
// even when multiple events are saved for the same actor.
func TestEventImmutability_Sequential(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "order-sequential-123"
// Compile-time check: these would not compile if we tried them:
// store.Update(event) // compile error: no such method
// store.Delete(eventID) // compile error: no such method
// store.Modify(eventID, newData) // compile error: no such method
// Only these methods exist:
var eventStore aether.EventStore = store
if eventStore == nil {
t.Fatal("eventStore is nil")
}
// If we got here, the compile-time checks passed
t.Log("EventStore interface enforces append-only semantics by design")
}
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
// increasing and attempting to save with a non-increasing version fails.
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-version-check"
// Save first event with version 1
// Save multiple events
event1 := &aether.Event{
ID: "evt-v1",
EventType: "Event1",
ID: "evt-seq-1",
EventType: "OrderCreated",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Data: map[string]interface{}{"status": "new"},
Timestamp: time.Now(),
}
event2 := &aether.Event{
ID: "evt-seq-2",
EventType: "OrderProcessed",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{"status": "processing"},
Timestamp: time.Now().Add(time.Second),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(v1) failed: %v", err)
t.Fatalf("SaveEvent(event1) failed: %v", err)
}
// Try to save with same version - should fail
event2Same := &aether.Event{
ID: "evt-v1-again",
EventType: "Event2",
ActorID: actorID,
Version: 1, // Same version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2Same)
if err == nil {
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
}
// Try to save with lower version - should fail
event3Lower := &aether.Event{
ID: "evt-v0",
EventType: "Event3",
ActorID: actorID,
Version: 0, // Lower version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3Lower)
if err == nil {
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
}
// Save with next version - should succeed
event4Next := &aether.Event{
ID: "evt-v2",
EventType: "Event4",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4Next)
err = store.SaveEvent(event2)
if err != nil {
t.Fatalf("SaveEvent(v2) failed: %v", err)
t.Fatalf("SaveEvent(event2) failed: %v", err)
}
// Verify we have exactly 2 events
// Retrieve all events
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 2 {
t.Errorf("expected 2 events, got %d", len(events))
t.Fatalf("expected 2 events, got %d", len(events))
}
// Attempt to modify the first event's data
events[0].Data["status"] = "cancelled"
// Retrieve events again and verify the first event is unchanged
events, err = store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed on second call: %v", err)
}
if events[0].Data["status"].(string) != "new" {
t.Errorf("first event was modified: expected status=new, got %v", events[0].Data["status"])
}
if events[1].Data["status"].(string) != "processing" {
t.Errorf("second event was modified: expected status=processing, got %v", events[1].Data["status"])
}
}
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
// events from the store through the EventStore interface.
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
// TestNoDeleteMethod verifies that the EventStore interface has no Delete method.
// This is a compile-time check: if Delete were added to the interface,
// all implementations would fail to compile until they implemented it.
// This test serves as a runtime confirmation that the interface intentionally
// omits delete/update operations.
func TestNoDeleteMethod(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-nodelete"
// Save an event
// The following would not compile if EventStore had a Delete method:
// store.Delete(...)
// store.Update(...)
// store.Modify(...)
// This test passes if compilation succeeds, confirming that
// the EventStore interface is append-only by design.
// Verify the interface has exactly 3 methods
var iface interface{} = store
if _, ok := iface.(aether.EventStore); !ok {
t.Fatal("InMemoryEventStore does not implement EventStore")
}
// The EventStore interface should only have SaveEvent, GetEvents, GetLatestVersion
// Verify by attempting to call each method
event := &aether.Event{
ID: "evt-nodelete",
EventType: "ImportantEvent",
ActorID: actorID,
ID: "evt-test",
EventType: "Test",
ActorID: "test-actor",
Version: 1,
Data: map[string]interface{}{"critical": true},
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// These should work
store.SaveEvent(event)
store.GetEvents("test-actor", 0)
store.GetLatestVersion("test-actor")
// Retrieve it
events1, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (1) failed: %v", err)
}
if len(events1) != 1 {
t.Fatal("expected 1 event after save")
}
// Try to delete through interface - this method doesn't exist
// store.Delete("evt-nodelete") // compile error: no such method
// store.DeleteByActorID(actorID) // compile error: no such method
// Verify the event is still there (we can't delete it)
events2, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (2) failed: %v", err)
}
if len(events2) != 1 {
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
}
if events2[0].ID != "evt-nodelete" {
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
}
// No other methods should exist (compile-time check)
}

View File

@@ -1,431 +0,0 @@
//go:build integration
package store
import (
"context"
"log"
"os"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats-server/v2/server"
)
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
opts := &server.Options{
Port: -1,
JetStream: true,
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
}
s, err := server.NewServer(opts)
if err != nil {
log.Fatal("Failed to create NATS server:", err)
}
go s.Start()
if !s.ReadyForConnections(4 * time.Second) {
log.Fatal("NATS server failed to start")
}
nc, err := nats.Connect(s.ClientURL())
if err != nil {
s.Shutdown()
log.Fatal("Failed to connect to NATS:", err)
}
return s, nc, func() {
nc.Close()
s.Shutdown()
os.RemoveAll(opts.StoreDir)
}
}
func TestUpdateVersionCache(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "test-actor-1"
tests := []struct {
name string
cachedVersion int64
newVersion int64
expectUpdate bool
expectVersion int64
}{
{
name: "update when new version is greater",
cachedVersion: 5,
newVersion: 10,
expectUpdate: true,
expectVersion: 10,
},
{
name: "do not update when new version is equal",
cachedVersion: 5,
newVersion: 5,
expectUpdate: false,
expectVersion: 5,
},
{
name: "do not update when new version is less",
cachedVersion: 10,
newVersion: 5,
expectUpdate: false,
expectVersion: 10,
},
{
name: "update when no cached version exists",
cachedVersion: 0,
newVersion: 1,
expectUpdate: true,
expectVersion: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up cached version
store.versions = make(map[string]int64)
store.versions[actorID] = tt.cachedVersion
// Call UpdateVersionCache
store.UpdateVersionCache(actorID, tt.newVersion)
// Verify result
if tt.expectUpdate {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to be updated but it wasn't cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
}
} else {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to remain cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
}
}
})
}
}
func TestUpdateVersionCache_Concurrent(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "concurrent-actor"
store.versions[actorID] = 1
const numGoroutines = 50
const maxVersion = 100
var done = make(chan struct{})
var updates int32
for i := 0; i < numGoroutines; i++ {
version := int64(1 + (i % maxVersion))
go func(v int64) {
store.UpdateVersionCache(actorID, v)
select {
case <-done:
default:
updates++
}
}(version)
}
close(done)
time.Sleep(100 * time.Millisecond)
finalVersion := store.versions[actorID]
if finalVersion > maxVersion {
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
}
}
func TestSubscribeToEventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
if eventBusWithStore == nil {
t.Fatalf("Failed to create event bus with broadcaster")
}
defer eventBusWithStore.Stop()
ch := eventBusWithStore.SubscribeToEventStored("*")
if ch == nil {
t.Fatal("SubscribeToEventStored returned nil channel")
}
actorID := "subscribe-test-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"key": "value"},
Timestamp: time.Now(),
}
eventBusWithStore.Publish("", event)
select {
case receivedEvent := <-ch:
if receivedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
data, ok := receivedEvent.Data["actorId"].(string)
if !ok || data != actorID {
t.Errorf("Expected actorId in data to be %s", actorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for EventStored event")
}
}
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "broadcast-test-actor-1"
localCh := eventBus.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"total": 99.99},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
select {
case receivedEvent := <-localCh:
if receivedEvent.EventType != "OrderPlaced" {
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for broadcast event")
}
}
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s1, nc1, cleanup1 := setupNatsServer()
defer cleanup1()
s2, nc2, cleanup2 := setupNatsServer()
defer cleanup2()
ctx := context.Background()
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
if err != nil {
t.Fatalf("Failed to create store 1: %v", err)
}
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
if err != nil {
t.Fatalf("Failed to create store 2: %v", err)
}
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
defer eventBus1.Stop()
defer eventBus2.Stop()
actorID := "multi-node-actor"
receiverCh := eventBus2.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "InventoryReserved",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"quantity": 5},
Timestamp: time.Now(),
}
eventBus1.Publish("", event)
select {
case receivedEvent := <-receiverCh:
if receivedEvent.EventType != "InventoryReserved" {
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for cross-node event")
}
}
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
if err != nil {
t.Fatalf("Failed to create tenant A store: %v", err)
}
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
if err != nil {
t.Fatalf("Failed to create tenant B store: %v", err)
}
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
defer tenantAEventBus.Stop()
defer tenantBEventBus.Stop()
tenantACh := tenantAEventBus.Subscribe("tenant-a")
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
actorID := "tenant-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TenantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"data": "tenant-a"},
Timestamp: time.Now(),
}
tenantAEventBus.Publish("tenant-a", event)
select {
case receivedEvent := <-tenantACh:
if receivedEvent.EventType != "TenantEvent" {
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
}
case <-time.After(2 * time.Second):
t.Error("Timeout waiting for tenant A to receive event")
}
select {
case <-tenantBCh:
t.Error("Tenant B should not receive tenant A's events")
case <-time.After(1 * time.Second):
// Expected - tenant B should not receive events from tenant A
}
}
func TestUpdateVersionCache_EventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "version-cache-actor"
store.UpdateVersionCache(actorID, 5)
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 10,
Data: map[string]interface{}{"test": true},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
time.Sleep(100 * time.Millisecond)
storedVersion, err := store.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("Failed to get latest version: %v", err)
}
if storedVersion != 10 {
t.Errorf("Expected version 10, got %d", storedVersion)
}
cacheVersion, ok := store.GetCachedVersion(actorID)
if !ok {
t.Error("Expected version to be in cache")
} else if cacheVersion != 10 {
t.Errorf("Expected cached version 10, got %d", cacheVersion)
}
}

View File

@@ -1,7 +1,6 @@
package store
import (
"context"
"encoding/json"
"fmt"
"strings"
@@ -10,7 +9,6 @@ import (
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/google/uuid"
)
// Default configuration values for JetStream event store
@@ -20,15 +18,40 @@ const (
)
// JetStreamConfig holds configuration options for JetStreamEventStore
//
// # Stream Retention Policy
//
// JetStreamEventStore uses a LimitsPolicy retention strategy, which means events are
// kept for a specified maximum age (StreamRetention). Once events exceed this age,
// they are automatically purged from the stream.
//
// Default retention is 1 year (365 days). This provides:
// - Long-term audit trail for domain events
// - Complete history for event replay and analysis
// - Automatic cleanup of old events to manage storage costs
//
// The retention policy is applied when creating the JetStream stream:
//
// stream := &nats.StreamConfig{
// ...
// Retention: nats.LimitsPolicy,
// MaxAge: config.StreamRetention,
// ...
// }
//
// To configure custom retention, pass a JetStreamConfig with your desired StreamRetention:
//
// config := store.JetStreamConfig{
// StreamRetention: 90 * 24 * time.Hour, // Keep events for 90 days
// ReplicaCount: 3, // 3 replicas for HA
// }
// eventStore, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
//
// Note: The retention policy only affects automatic cleanup. Aether does not provide
// methods to manually delete events - events are immutable once stored and can only
// be removed by the stream's retention policy or explicit JetStream administration.
type JetStreamConfig struct {
// StreamRetention is how long to keep events (default: 1 year).
// JetStream enforces this retention policy at the storage level using a limits-based policy:
// - MaxAge: Events older than this duration are automatically deleted
// - Storage is file-based (nats.FileStorage) for durability
// - Once the retention period expires, events are permanently removed from the stream
// This ensures that old events do not consume storage indefinitely.
// To keep events indefinitely, set StreamRetention to a very large value or configure
// a custom retention policy in the JetStream stream configuration.
// StreamRetention is how long to keep events (default: 1 year)
StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int
@@ -50,21 +73,6 @@ func DefaultJetStreamConfig() JetStreamConfig {
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
// It also implements EventStoreWithErrors to report malformed events during replay.
//
// ## Immutability Guarantee
//
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
// is configured with file-based storage (nats.FileStorage) and a retention policy
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
// eventually expire, but during their lifetime, events are never modified or deleted
// through the EventStore API. Once an event is published to the stream:
// - It cannot be updated
// - It cannot be deleted before expiration
// - It can only be read
//
// This architectural guarantee, combined with the EventStore interface providing
// no Update or Delete methods, ensures events are immutable and suitable as an
// audit trail.
//
// ## Version Cache Invalidation Strategy
//
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
@@ -88,13 +96,17 @@ type JetStreamEventStore struct {
config JetStreamConfig
mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
namespace string // Optional namespace for event publishing
}
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
@@ -150,8 +162,6 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: nil,
namespace: "",
}, nil
}
@@ -165,58 +175,6 @@ func (jes *JetStreamEventStore) GetStreamName() string {
return jes.streamName
}
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
// The broadcaster receives EventStored events when events are successfully saved.
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
config := DefaultJetStreamConfig()
if namespace != "" {
config.Namespace = namespace
}
js, err := natsConn.JetStream()
if err != nil {
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}
// Apply defaults for zero values
if config.StreamRetention == 0 {
config.StreamRetention = DefaultStreamRetention
}
if config.ReplicaCount == 0 {
config.ReplicaCount = DefaultReplicaCount
}
// Apply namespace prefix to stream name if provided
effectiveStreamName := streamName
if config.Namespace != "" {
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
}
// Create or update the stream
stream := &nats.StreamConfig{
Name: effectiveStreamName,
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
MaxAge: config.StreamRetention,
Replicas: config.ReplicaCount,
}
_, err = js.AddStream(stream)
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
return &JetStreamEventStore{
js: js,
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: broadcaster,
namespace: namespace,
}, nil
}
// SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
@@ -277,56 +235,9 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// Update version cache after successful publish
jes.versions[event.ActorID] = event.Version
// Publish EventStored event after successful save (if broadcaster is configured)
if jes.broadcaster != nil {
jes.publishEventStored(event)
}
return nil
}
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
//
// EventStored Event Schema:
// - EventType: "EventStored" (aether.EventTypeEventStored)
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// Example usage with NATSEventBus:
//
// eventBus := aether.NewNATSEventBus(natsConn)
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
// ch := eventBus.SubscribeToEventStored("*")
//
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
}
jes.broadcaster.Publish(jes.namespace, eventStored)
}
// GetEvents retrieves all events for an actor since a version.
// Note: This method silently skips malformed events for backward compatibility.
// Use GetEventsWithErrors to receive information about malformed events.
@@ -581,43 +492,5 @@ func sanitizeSubject(s string) string {
return s
}
// UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes.
//
// Only updates if the new version is greater than the cached version to prevent
// stale cache entries from causing version conflicts.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock()
defer jes.mu.Unlock()
// Only update if the new version is greater than cached version
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
jes.versions[actorID] = version
}
}
// GetCachedVersion returns the cached version for an actor, if available.
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
jes.mu.Lock()
defer jes.mu.Unlock()
version, ok := jes.versions[actorID]
return version, ok
}
// SetBroadcaster sets the event broadcaster for this store.
// The broadcaster is used to publish EventStored events when events are saved.
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
jes.mu.Lock()
defer jes.mu.Unlock()
jes.broadcaster = broadcaster
}
// Close closes the JetStream event store and cleans up resources.
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
return nil
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,10 @@
package store
import (
"encoding/json"
"sync"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/google/uuid"
)
// InMemoryEventStore provides a simple in-memory event store for testing
@@ -13,8 +12,6 @@ type InMemoryEventStore struct {
mu sync.RWMutex
events map[string][]*aether.Event // actorID -> events
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
namespace string // optional namespace for event publishing
}
// NewInMemoryEventStore creates a new in-memory event store
@@ -25,21 +22,40 @@ func NewInMemoryEventStore() *InMemoryEventStore {
}
}
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
// The broadcaster receives EventStored events when events are successfully saved.
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]*aether.Event),
snapshots: make(map[string][]*aether.ActorSnapshot),
broadcaster: broadcaster,
namespace: namespace,
// deepCopyEvent creates a deep copy of an event to ensure immutability.
// This prevents modifications to the event after it's been persisted.
// Panics if JSON marshaling/unmarshaling fails, which should never occur
// for a valid Event structure.
func deepCopyEvent(event *aether.Event) *aether.Event {
// Use JSON marshaling/unmarshaling for a complete deep copy
// This ensures all nested structures (maps, slices) are copied
data, err := json.Marshal(event)
if err != nil {
panic("failed to marshal event: " + err.Error())
}
var copy aether.Event
err = json.Unmarshal(data, &copy)
if err != nil {
panic("failed to unmarshal event: " + err.Error())
}
// Preserve empty metadata maps (JSON unmarshal converts empty map to nil)
if event.Metadata != nil && len(event.Metadata) == 0 && copy.Metadata == nil {
copy.Metadata = make(map[string]string)
}
// Preserve empty data maps
if event.Data != nil && len(event.Data) == 0 && copy.Data == nil {
copy.Data = make(map[string]interface{})
}
return &copy
}
// SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
// If a broadcaster is configured, publishes an EventStored event on success.
// The event is deep-copied before storage to ensure immutability.
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock()
defer es.mu.Unlock()
@@ -66,37 +82,15 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
if _, exists := es.events[event.ActorID]; !exists {
es.events[event.ActorID] = make([]*aether.Event, 0)
}
es.events[event.ActorID] = append(es.events[event.ActorID], event)
// Publish EventStored event after successful save (if broadcaster is configured)
if es.broadcaster != nil {
es.publishEventStored(event)
}
// Deep copy the event before storing to ensure immutability
storedEvent := deepCopyEvent(event)
es.events[event.ActorID] = append(es.events[event.ActorID], storedEvent)
return nil
}
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
}
es.broadcaster.Publish(es.namespace, eventStored)
}
// GetEvents retrieves events for an actor from a specific version
// Returns deep copies of events to ensure immutability
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
es.mu.RLock()
defer es.mu.RUnlock()
@@ -109,7 +103,8 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a
var filteredEvents []*aether.Event
for _, event := range events {
if event.Version >= fromVersion {
filteredEvents = append(filteredEvents, event)
// Return a deep copy to ensure immutability
filteredEvents = append(filteredEvents, deepCopyEvent(event))
}
}

View File

@@ -1905,181 +1905,3 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
}
}
}
// === EventStored Publishing Tests ===
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
// Create a mock broadcaster to capture published events
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// Save event
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Check if EventStored was published
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
}
if publishedEvent.ActorID != "order-456" {
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
}
if publishedEvent.Version != 1 {
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
}
// Check data contains original event info
if publishedEvent.Data["eventId"] != "evt-123" {
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for EventStored event")
}
}
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save first event
event1 := &aether.Event{
ID: "evt-1",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(event1) failed: %v", err)
}
// Drain the first EventStored event
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for first EventStored event")
}
// Try to save event with non-increasing version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1, // Same version, should conflict
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
if !errors.Is(err, aether.ErrVersionConflict) {
t.Fatalf("expected ErrVersionConflict, got %v", err)
}
// Verify no EventStored event was published
select {
case <-ch:
t.Fatal("expected no EventStored event, but received one")
case <-time.After(50 * time.Millisecond):
// Expected - no event published
}
}
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save multiple events
for i := int64(1); i <= 3; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "OrderPlaced",
ActorID: "order-456",
Version: i,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
// Verify we received 3 EventStored events in order
for i := int64(1); i <= 3; i++ {
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.Version != i {
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout waiting for EventStored event %d", i)
}
}
}
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
// Test that SaveEvent works without a broadcaster (nil broadcaster)
store := NewInMemoryEventStore()
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// This should not panic even though broadcaster is nil
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify event was saved
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
}