1 Commits

Author SHA1 Message Date
Claude Code
8c5ac500b6 feat(event-sourcing): Publish EventStored after successful SaveEvent
Some checks failed
CI / build (pull_request) Successful in 22s
CI / integration (pull_request) Failing after 2m1s
Add EventStored internal event published to the EventBus when events are
successfully persisted. This allows observability components (metrics,
projections, audit systems) to react to persisted events without coupling
to application code.

Implementation:
- Add EventTypeEventStored constant to define the event type
- Update InMemoryEventStore with optional EventBroadcaster support
- Add NewInMemoryEventStoreWithBroadcaster constructor
- Update JetStreamEventStore with EventBroadcaster support
- Add NewJetStreamEventStoreWithBroadcaster constructor
- Implement publishEventStored() helper method
- Publish EventStored containing EventID, ActorID, Version, Timestamp
- Only publish on successful SaveEvent (not on version conflicts)
- Automatically recorded in metrics through normal Publish flow

Test coverage:
- EventStored published after successful SaveEvent
- No EventStored published on version conflict
- Multiple EventStored events published in order
- SaveEvent works correctly without broadcaster (nil-safe)
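
A minimal sketch of the behaviour described above, assuming a simplified store. The `Event` struct, the `Broadcaster` interface, `memStore` and `recorder` are hypothetical stand-ins, not the actual Aether types; only the ordering (publish after a successful save, never on conflict, skip when no broadcaster is set) mirrors the commit.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is a trimmed-down, hypothetical stand-in for aether.Event.
type Event struct {
	ID        string
	EventType string
	ActorID   string
	Version   int64
}

// Broadcaster is an assumed minimal interface; the real EventBroadcaster may differ.
type Broadcaster interface {
	Publish(namespace string, e *Event)
}

var errVersionConflict = errors.New("version conflict")

// memStore tracks only the latest version per actor.
type memStore struct {
	mu     sync.Mutex
	latest map[string]int64
	bus    Broadcaster // optional; nil disables EventStored publishing
}

func (s *memStore) SaveEvent(e *Event) error {
	s.mu.Lock()
	if e.Version <= s.latest[e.ActorID] {
		s.mu.Unlock()
		return errVersionConflict // nothing is published on a conflict
	}
	s.latest[e.ActorID] = e.Version
	s.mu.Unlock()

	if s.bus != nil { // nil-safe: a store without a broadcaster skips this step
		s.bus.Publish("", &Event{
			ID:        e.ID,
			EventType: "EventStored",
			ActorID:   e.ActorID,
			Version:   e.Version,
		})
	}
	return nil
}

// recorder collects published events so the behaviour can be inspected.
type recorder struct{ got []*Event }

func (r *recorder) Publish(_ string, e *Event) { r.got = append(r.got, e) }

func main() {
	rec := &recorder{}
	s := &memStore{latest: map[string]int64{}, bus: rec}
	fmt.Println(s.SaveEvent(&Event{ID: "e1", ActorID: "order-1", Version: 1}))
	fmt.Println(s.SaveEvent(&Event{ID: "e2", ActorID: "order-1", Version: 1})) // conflict
	fmt.Println("EventStored published:", len(rec.got))
}
```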

Closes #61

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 21:45:11 +01:00
16 changed files with 2844 additions and 1755 deletions


@@ -9,7 +9,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
-      - uses: actions/checkout@v7
+      - uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.23'
@@ -17,3 +17,37 @@ jobs:
run: go build ./...
- name: Test
run: go test ./...
integration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.23'
- name: Install and Start NATS Server
run: |
# Detect architecture and download appropriate binary
ARCH=$(uname -m)
if [ "$ARCH" = "x86_64" ]; then
NATS_ARCH="amd64"
elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
NATS_ARCH="arm64"
else
echo "Unsupported architecture: $ARCH"
exit 1
fi
echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH"
# Download and extract nats-server
curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz
tar -xzf nats-server.tar.gz
# Start NATS with JetStream
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 &
# Wait for NATS to be ready
sleep 3
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version
- name: Run Integration Tests
run: go test -tags=integration -v ./...


@@ -1,64 +0,0 @@
# Issue: Implement Actor Migration Between Cluster Nodes
## Problem
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
## Required Implementation
### 1. Rebalance Algorithm (cluster/shard.go)
Implement `ConsistentHashPlacement.RebalanceShards` to:
- Calculate new shard assignments based on active nodes
- Identify actors needing migration
- Generate migration plan with source/dest nodes
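
The rebalance steps above can be sketched as follows. This is an illustration under stated assumptions, not the project's `ConsistentHashPlacement`: it uses rendezvous (highest-random-weight) hashing as a simple stand-in for a consistent-hash ring, and the `Migration` type and the `assign` and `plan` helpers are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Migration is a hypothetical plan entry: move one shard between nodes.
type Migration struct {
	Shard    int
	From, To string
}

// score hashes a (node, shard) pair; the highest score wins the shard.
func score(node string, shard int) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s/%d", node, shard)
	return h.Sum64()
}

// owner returns the active node with the highest score for the shard.
func owner(nodes []string, shard int) string {
	best, bestScore := "", uint64(0)
	for _, n := range nodes {
		s := score(n, shard)
		if best == "" || s > bestScore || (s == bestScore && n < best) {
			best, bestScore = n, s
		}
	}
	return best
}

// assign computes shard -> node for the currently active nodes.
func assign(nodes []string, shards int) map[int]string {
	m := make(map[int]string, shards)
	for s := 0; s < shards; s++ {
		m[s] = owner(nodes, s)
	}
	return m
}

// plan lists the shards whose owner changed, with source and destination.
func plan(oldMap, newMap map[int]string) []Migration {
	var out []Migration
	for shard, to := range newMap {
		if from, ok := oldMap[shard]; ok && from != to {
			out = append(out, Migration{Shard: shard, From: from, To: to})
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Shard < out[j].Shard })
	return out
}

func main() {
	before := assign([]string{"node-a", "node-b"}, 16)
	after := assign([]string{"node-a", "node-b", "node-c"}, 16)
	for _, m := range plan(before, after) {
		fmt.Printf("shard %d: %s -> %s\n", m.Shard, m.From, m.To)
	}
}
```

With this scheme a shard only moves when the newly added node wins it, so every entry in the plan has the new node as its destination.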
### 2. Migration Coordinator (cluster/manager.go)
Implement `handleRebalanceRequest` to:
- Accept migration plan from leader
- For each actor in plan:
1. Pause incoming messages
2. Capture actor state (replay events up to current version)
3. Serialize state
4. Send migration request to destination node
5. Wait for ack
6. Delete actor from current node
- Track migration status via `ActorMigration.Status`
### 3. Cross-Node Message Routing (cluster/distributed.go)
Implement proper routing in `SendMessage`:
- Use `GetActorNode(actorID)` to determine target node
- If remote: marshal message, send via NATS to target node
- If local: send to local runtime
- Route response back to caller if needed
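
A rough sketch of the routing decision, assuming the lookup and transports are injected as functions. The `router` type and its fields are hypothetical; `locate` stands in for `GetActorNode`, and `sendRemote` stands in for marshalling and publishing over NATS.

```go
package main

import "fmt"

// router holds what a SendMessage implementation would need (all hypothetical).
type router struct {
	self       string
	locate     func(actorID string) string                      // stand-in for GetActorNode
	sendLocal  func(actorID string, payload []byte) error       // deliver to the local runtime
	sendRemote func(node, actorID string, payload []byte) error // e.g. publish via NATS
}

// send delivers locally when this node owns the actor, remotely otherwise.
func (r router) send(actorID string, payload []byte) error {
	node := r.locate(actorID)
	if node == "" {
		return fmt.Errorf("no node owns actor %q", actorID)
	}
	if node == r.self {
		return r.sendLocal(actorID, payload)
	}
	return r.sendRemote(node, actorID, payload)
}

func main() {
	r := router{
		self: "node-a",
		locate: func(id string) string {
			if id == "order-1" {
				return "node-a"
			}
			return "node-b"
		},
		sendLocal: func(id string, _ []byte) error {
			fmt.Println("local:", id)
			return nil
		},
		sendRemote: func(node, id string, _ []byte) error {
			fmt.Println("remote:", node, id)
			return nil
		},
	}
	_ = r.send("order-1", []byte("hello"))
	_ = r.send("order-2", []byte("hello"))
}
```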
## Suggested Approach
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
2. **Implement state capture** - replay events to get current state
3. **Implement state restore** - deserialize and restore actor state
4. **Implement coordinator** - manage migration phases
5. **Add error handling** - handle failed migrations, retries, cleanup
6. **Add tests** - test migration with mock NATS
## Related Files
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
- `cluster/manager.go:167` - handleMigrationRequest (empty)
- `cluster/shard.go:211` - RebalanceShards (stub)
- `cluster/distributed.go:139` - SendMessage (simplified)
- `cluster/types.go:108` - ActorMigration struct
## Acceptance Criteria
- [ ] `RebalanceShards` returns new shard map with actor assignments
- [ ] `handleRebalanceRequest` processes migration plan
- [ ] `handleMigrationRequest` accepts actor migrations
- [ ] `SendMessage` routes to correct node
- [ ] Actors can be migrated with state preserved
- [ ] Failed migrations are handled gracefully
- [ ] Integration test with multi-node cluster


@@ -1,117 +0,0 @@
# Issue: Add Snapshot Support to Event Sourcing Workflow
## Problem
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay their entire history
- There is no performance optimization for long-lived actors
- Snapshots exist as an API but are not used
## Current State
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
- `InMemoryEventStore` has snapshot methods but no lifecycle management
## Required Implementation
### 1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds
### 2. State Capture
Add method to capture actor state:
```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```
### 3. Snapshot Store Extension
Extend `EventStoreWithErrors` to include snapshots:
```go
type EventStoreWithSnapshots interface {
EventStoreWithErrors
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
SaveSnapshot(snapshot *ActorSnapshot) error
}
```
### 4. Snapshot Workflow
Modify event retrieval to use snapshots:
```go
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
// 1. Try to get latest snapshot
snapshot, _ := store.GetLatestSnapshot(actorID)
// 2. If snapshot exists and version <= fromVersion:
// - Return events from snapshot version + 1
// 3. Else:
// - Replay all events from version 0
}
```
## Suggested Implementation
### 1. Add CaptureState to EventStore interface
In `event.go`, extend `EventStore` or create `StateStore` interface:
```go
type StateStore interface {
EventStore
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```
### 2. Implement CaptureState
In `store/jetstream.go`:
```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
// Replay events and build state (application logic needed here)
events, err := jes.GetEvents(actorID, fromVersion)
if err != nil {
return nil, err
}
state := make(map[string]interface{})
for _, event := range events {
// Need application logic to convert events to state
_ = event
}
return state, nil
}
### 3. Add Snapshot Helper
Create snapshot utilities:
```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
return &ActorSnapshot{
ActorID: actorID,
Version: version,
State: state,
Timestamp: time.Now(),
}
}
```
### 4. Modify GetEvents
Update `GetEvents` in both stores to use snapshots when beneficial.
## Snapshots Workflow Example
```
1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, snapshot at version 1000
4. Request events from version 900:
- Get snapshot at version 1000? No (version too high)
- Replay 900->1000 events (only 100 events)
5. Request events from version 50:
- Get latest snapshot at version 1000? Yes (version > 50)
- Use snapshot as base
- Replay 1000->1000 events (none)
```
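
One way to read the replay saving is the sketch below. It assumes a toy state (a running total) and hypothetical `Event` and `Snapshot` types rather than the real `aether.Event` and `ActorSnapshot`; the point is only that events at or below the snapshot version are skipped.

```go
package main

import "fmt"

// Event and Snapshot are simplified, hypothetical stand-ins.
type Event struct {
	Version int64
	Amount  int
}

type Snapshot struct {
	Version int64
	Total   int
}

// rebuild starts from the snapshot (if any) and applies only newer events.
// It returns the rebuilt total and the number of events actually replayed.
func rebuild(snap *Snapshot, events []Event) (total int, replayed int) {
	var from int64
	if snap != nil {
		total, from = snap.Total, snap.Version
	}
	for _, e := range events {
		if e.Version <= from {
			continue // already folded into the snapshot
		}
		total += e.Amount
		replayed++
	}
	return total, replayed
}

func main() {
	events := make([]Event, 0, 1000)
	for v := int64(1); v <= 1000; v++ {
		events = append(events, Event{Version: v, Amount: 1})
	}
	fmt.Println(rebuild(nil, events))
	fmt.Println(rebuild(&Snapshot{Version: 900, Total: 900}, events))
}
```

Both calls arrive at the same total, but the second replays 100 events instead of 1000.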
## Acceptance Criteria
- [ ] `CaptureState` method added to event store
- [ ] Snapshots created at configured intervals
- [ ] `GetEvents` uses snapshots to optimize replay
- [ ] Snapshot workflow tested with long-lived actors
- [ ] Configuration for snapshot interval/version
- [ ] Metrics: snapshot count, average replay size


@@ -1,100 +0,0 @@
# Issue: Implement VM/Runtime for Actors
## Problem
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
## Required Components
### 1. VM Implementation (cluster/vm.go - new)
```go
type VirtualMachine struct {
actorID string
eventStore aether.EventStore
state map[string]interface{}
version int64
}
```
Methods needed:
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
- `Start()` - replay events to rebuild state
- `ProcessEvent(event *aether.Event)` - apply event to state
- `Stop()` - persist final state
- `GetVersion()` - current event version
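
A minimal sketch of how `Start()` and `ProcessEvent()` could cooperate, assuming events carry a flat `Data` map that is merged into the state. The `vm` and `Event` types here are simplified and hypothetical; the real event-to-state mapping is application logic.

```go
package main

import "fmt"

// Event is a simplified, hypothetical stand-in for aether.Event.
type Event struct {
	Version int64
	Data    map[string]interface{}
}

type vm struct {
	actorID string
	state   map[string]interface{}
	version int64
}

// ProcessEvent applies one event, rejecting versions that do not advance.
func (v *vm) ProcessEvent(e Event) error {
	if e.Version <= v.version {
		return fmt.Errorf("actor %s: version %d is not newer than %d", v.actorID, e.Version, v.version)
	}
	for k, val := range e.Data {
		v.state[k] = val // assumption: later events overwrite earlier keys
	}
	v.version = e.Version
	return nil
}

// Start rebuilds state from scratch by replaying the given history in order.
func (v *vm) Start(history []Event) error {
	v.state, v.version = map[string]interface{}{}, 0
	for _, e := range history {
		if err := v.ProcessEvent(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	machine := &vm{actorID: "order-123"}
	err := machine.Start([]Event{
		{Version: 1, Data: map[string]interface{}{"status": "pending"}},
		{Version: 2, Data: map[string]interface{}{"status": "paid"}},
	})
	fmt.Println(err, machine.version, machine.state)
}
```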
### 2. Runtime Implementation (cluster/runtime.go - new)
```go
type Runtime struct {
natsConn *nats.Conn
eventStore aether.EventStore
vmRegistry VMRegistry // map[actorID]*VirtualMachine
config RuntimeConfig
}
```
Methods needed:
- `Start()` - initialize and start processing
- `LoadModel(model eventstorming.Model)` - register domain types
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
- `GetActiveVMs()` - return map of active VMs
- `CreateVM(actorID string)` - create new VM instance
- `StopVM(actorID string)` - persist and stop VM
### 3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
## Suggested Design
### VM Lifecycle
```
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
- Replay events from event store
- Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
```
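
Steps 2 and 3 of the lifecycle amount to a get-or-create lookup, sketched below. The `actorRuntime` type and its `load` callback are hypothetical; `load` stands in for replaying events from the event store.

```go
package main

import (
	"fmt"
	"sync"
)

type vm struct{ actorID string }

// actorRuntime keeps one VM per actor (hypothetical registry shape).
type actorRuntime struct {
	mu   sync.Mutex
	vms  map[string]*vm
	load func(actorID string) (*vm, error) // stand-in for event replay
}

// vmFor returns the existing VM or creates it on first use.
// Holding the lock during load keeps the sketch simple but serializes creation.
func (r *actorRuntime) vmFor(actorID string) (*vm, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if v, ok := r.vms[actorID]; ok {
		return v, nil
	}
	v, err := r.load(actorID)
	if err != nil {
		return nil, err
	}
	r.vms[actorID] = v
	return v, nil
}

func main() {
	loads := 0
	rt := &actorRuntime{
		vms: map[string]*vm{},
		load: func(id string) (*vm, error) {
			loads++
			return &vm{actorID: id}, nil
		},
	}
	first, _ := rt.vmFor("actor-123")
	second, _ := rt.vmFor("actor-123")
	fmt.Println(first == second, loads)
}
```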
### State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
## Implementation Steps
1. **Create VM struct** in `cluster/vm.go`
2. **Implement event replay** to rebuild state
3. **Create Runtime** in `cluster/runtime.go`
4. **Register Runtime with cluster** via `SetVMProvider`
5. **Implement message processing** - validate against model
6. **Add version conflict handling** using existing EventStore
7. **Write tests** - mock event store, test state transitions
## File Structure
```
cluster/
├── vm.go # VirtualMachine implementation
├── runtime.go # Runtime implementation
├── vm_test.go # VM tests
├── runtime_test.go # Runtime tests
└── integration_test.go # Integration tests
```
## Acceptance Criteria
- [ ] VM can be created with actor ID
- [ ] VM replays events to build state
- [ ] VM processes events and updates state
- [ ] VM persists current version
- [ ] Runtime can create/stop VMs
- [ ] Runtime manages VM registry
- [ ] Integration test with NATS and JetStream


@@ -107,34 +107,7 @@ Order state after replaying 2 events:
### Events are immutable
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
- File-based storage (durable)
- Limits-based retention policy (events expire after configured duration, not before)
- No mechanism to modify or delete individual events during their lifetime
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
To correct a mistake, append a new event that expresses the correction rather than modifying history:
```go
// Wrong: Cannot update an event
// store.UpdateEvent(eventID, newData) // This method doesn't exist
// Right: Append a new event that corrects the record
correctionEvent := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderCorrected",
ActorID: orderID,
Version: currentVersion + 1,
Data: map[string]interface{}{"reason": "price adjustment"},
Timestamp: time.Now(),
}
err := store.SaveEvent(correctionEvent)
```
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
### State is derived


@@ -184,17 +184,6 @@ type ActorSnapshot struct {
// EventStore defines the interface for event persistence.
//
// # Immutability Guarantee
//
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
// modified or deleted. The interface intentionally provides no Update or Delete methods.
// This ensures:
// - Events serve as an immutable audit trail
// - State can be safely derived by replaying events
// - Concurrent reads are always safe (events never change)
//
// To correct a mistake, append a new event that expresses the correction.
//
// # Version Semantics
//
// Events for an actor must have monotonically increasing versions. When SaveEvent
@@ -215,13 +204,10 @@ type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor.
// Returns VersionConflictError if version <= current latest version.
// Once saved, the event is immutable and can never be modified or deleted.
SaveEvent(event *Event) error
// GetEvents retrieves events for an actor from a specific version (inclusive).
// Returns an empty slice if no events exist for the actor.
// The returned events are guaranteed to be immutable - they will never be
// modified or deleted from the store.
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor.


@@ -2,8 +2,6 @@ package aether
import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"
@@ -1337,190 +1335,3 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
// Error() should still work
_ = err.Error()
}
// Tests for VersionConflictError
func TestVersionConflictError_Error(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-123",
AttemptedVersion: 3,
CurrentVersion: 5,
}
errMsg := err.Error()
// Verify error message contains all context
if !strings.Contains(errMsg, "order-123") {
t.Errorf("error message should contain ActorID, got: %s", errMsg)
}
if !strings.Contains(errMsg, "3") {
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "5") {
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "version conflict") {
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
}
}
func TestVersionConflictError_Fields(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-456",
AttemptedVersion: 10,
CurrentVersion: 8,
}
if err.ActorID != "actor-456" {
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
}
if err.AttemptedVersion != 10 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
}
if err.CurrentVersion != 8 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
}
}
func TestVersionConflictError_Unwrap(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-789",
AttemptedVersion: 2,
CurrentVersion: 1,
}
unwrapped := err.Unwrap()
if unwrapped != ErrVersionConflict {
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
}
}
func TestVersionConflictError_ErrorsIs(t *testing.T) {
err := &VersionConflictError{
ActorID: "test-actor",
AttemptedVersion: 5,
CurrentVersion: 4,
}
// Test that errors.Is works with sentinel
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is(err, ErrVersionConflict) should return true")
}
// Test that other errors don't match
if errors.Is(err, errors.New("other error")) {
t.Error("errors.Is should not match unrelated errors")
}
}
func TestVersionConflictError_ErrorsAs(t *testing.T) {
originalErr := &VersionConflictError{
ActorID: "actor-unwrap",
AttemptedVersion: 7,
CurrentVersion: 6,
}
var versionErr *VersionConflictError
if !errors.As(originalErr, &versionErr) {
t.Fatalf("errors.As should succeed with VersionConflictError")
}
// Verify fields are accessible through unwrapped error
if versionErr.ActorID != "actor-unwrap" {
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
}
if versionErr.AttemptedVersion != 7 {
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
}
if versionErr.CurrentVersion != 6 {
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
}
}
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
// This test verifies that applications can read CurrentVersion for retry strategies
err := &VersionConflictError{
ActorID: "order-abc",
AttemptedVersion: 2,
CurrentVersion: 10,
}
var versionErr *VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatal("failed to unwrap VersionConflictError")
}
// Application can use CurrentVersion to decide retry strategy
nextVersion := versionErr.CurrentVersion + 1
if nextVersion != 11 {
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
}
// Application can log detailed context
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
if !strings.Contains(logMsg, "order-abc") {
t.Errorf("application context logging failed: %s", logMsg)
}
}
func TestVersionConflictError_EdgeCases(t *testing.T) {
testCases := []struct {
name string
actorID string
attempt int64
current int64
}{
{"zero current", "actor-1", 1, 0},
{"large numbers", "actor-2", 1000000, 999999},
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
{"negative attempt", "actor-4", -1, -2},
{"empty actor id", "", 1, 0},
{"special chars in actor id", "actor@#$%", 2, 1},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := &VersionConflictError{
ActorID: tc.actorID,
AttemptedVersion: tc.attempt,
CurrentVersion: tc.current,
}
// Should not panic
msg := err.Error()
if msg == "" {
t.Error("Error() should return non-empty string")
}
// Should be wrapped correctly
if err.Unwrap() != ErrVersionConflict {
t.Error("Unwrap should return ErrVersionConflict")
}
// errors.Is should work
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is should work for edge case")
}
})
}
}
func TestErrVersionConflict_Sentinel(t *testing.T) {
// Verify the sentinel error is correctly defined
if ErrVersionConflict == nil {
t.Fatal("ErrVersionConflict should not be nil")
}
expectedMsg := "version conflict"
if ErrVersionConflict.Error() != expectedMsg {
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
}
// Test that it's usable with errors.Is
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
t.Error("ErrVersionConflict should match itself with errors.Is")
}
}


@@ -1,189 +0,0 @@
# Aether Examples
This directory contains examples demonstrating common patterns for using Aether.
## Retry Patterns (`retry_patterns.go`)
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
### Pattern Overview
All retry patterns work with `VersionConflictError`, which provides three critical fields:
- **ActorID**: The actor that experienced the conflict
- **CurrentVersion**: The latest version in the store
- **AttemptedVersion**: The version you tried to save
Your application can read these fields to make intelligent retry decisions.
### Available Patterns
#### SimpleRetryPattern
The most basic pattern - just retry with exponential backoff:
```go
// Automatically retries up to 3 times with exponential backoff
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want a straightforward retry mechanism without complex logic.
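
For orientation, a retry loop with exponential backoff might look like the sketch below. It is not the code in `retry_patterns.go`: `retry`, `backoffMs` and the local `errConflict` sentinel (standing in for `aether.ErrVersionConflict`) are hypothetical, and the save operation is passed in as a function.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConflict stands in for aether.ErrVersionConflict in this sketch.
var errConflict = errors.New("version conflict")

// backoffMs doubles the base delay on every attempt: base, 2*base, 4*base, ...
func backoffMs(baseMs, attempt int) int { return baseMs << attempt }

// retry runs op until it succeeds, fails with a non-conflict error,
// or maxAttempts is exhausted.
func retry(maxAttempts, baseMs int, op func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = op()
		if err == nil || !errors.Is(err, errConflict) {
			return err // success, or an error that retrying will not fix
		}
		if attempt < maxAttempts-1 {
			time.Sleep(time.Duration(backoffMs(baseMs, attempt)) * time.Millisecond)
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retry(3, 10, func() error {
		calls++
		if calls < 3 {
			return errConflict // simulate two conflicts before success
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

In a real caller, `op` would re-read the latest version and build a fresh event on each attempt, as in the template under "Implementing Your Own Pattern".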
#### ConflictDetailedRetryPattern
Extracts detailed information from the conflict error to make smarter decisions:
```go
// Detects thrashing (multiple conflicts at same version)
// and can implement circuit-breaker logic
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
#### JitterRetryPattern
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
```go
// Exponential backoff with jitter prevents synchronized retries
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
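
The jitter calculation can be sketched as below, assuming the "full jitter" variant (a uniform delay between zero and the capped exponential ceiling). `ceilingMs` and `fullJitterMs` are hypothetical helpers, not functions from `retry_patterns.go`, and the random source is injected so the behaviour is easy to check.

```go
package main

import (
	"fmt"
	"math/rand"
)

// ceilingMs returns min(capMs, baseMs * 2^attempt) without overflowing.
// baseMs and capMs are assumed to be positive.
func ceilingMs(attempt, baseMs, capMs int) int {
	c := baseMs
	for i := 0; i < attempt && c < capMs; i++ {
		c *= 2
	}
	if c > capMs {
		c = capMs
	}
	return c
}

// fullJitterMs picks a delay uniformly from [0, ceiling].
// intn must behave like rand.Intn: return a value in [0, n).
func fullJitterMs(attempt, baseMs, capMs int, intn func(n int) int) int {
	return intn(ceilingMs(attempt, baseMs, capMs) + 1)
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, fullJitterMs(attempt, 10, 1000, rand.Intn))
	}
}
```

Because each writer draws its own delay, two writers that conflict at the same moment are unlikely to retry at the same moment.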
#### AdaptiveRetryPattern
Adjusts backoff duration based on version distance (indicator of contention):
```go
// Light contention (gap=1): 50ms backoff
// Moderate contention (gap=3-10): proportional backoff
// High contention (gap>10): aggressive backoff
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want backoff strategy to respond to actual system load.
#### EventualConsistencyPattern
Instead of blocking on retry, queues the event for asynchronous retry:
```go
// Returns immediately, event is queued for background retry
EventualConsistencyPattern(store, retryQueue, event)
// Background worker processes the queue
for item := range retryQueue {
// Implement your own retry logic here
}
```
**Use when**: You can't afford to block the request, and background retry is acceptable.
#### CircuitBreakerPattern
Implements a circuit breaker to prevent cascading failures:
```go
cb := NewCircuitBreaker()
// Fails fast when circuit is open
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
if err != nil && !cb.CanRetry() {
return ErrCircuitBreakerOpen
}
```
**Use when**: You have a distributed system and want to prevent retry storms during outages.
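
A minimal breaker could be structured as in the sketch below. Only the `CanRetry` name comes from the snippet above; the `breaker` type, `newBreaker`, `Record`, the consecutive-failure threshold and the cooldown are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errConflict = errors.New("version conflict")

// breaker opens after `threshold` consecutive failures and allows a
// trial attempt again once `cooldown` has elapsed.
type breaker struct {
	mu        sync.Mutex
	threshold int
	cooldown  time.Duration
	failures  int
	openedAt  time.Time
}

func newBreaker(threshold, cooldownMs int) *breaker {
	return &breaker{threshold: threshold, cooldown: time.Duration(cooldownMs) * time.Millisecond}
}

// CanRetry reports whether a new attempt is currently allowed.
func (b *breaker) CanRetry() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.failures < b.threshold {
		return true // circuit closed
	}
	return time.Since(b.openedAt) >= b.cooldown // open until the cooldown passes
}

// Record feeds the outcome of an attempt back into the breaker.
func (b *breaker) Record(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if err == nil {
		b.failures = 0 // success closes the circuit
		return
	}
	b.failures++
	if b.failures >= b.threshold {
		b.openedAt = time.Now() // (re)open and restart the cooldown
	}
}

func main() {
	b := newBreaker(2, 60000)
	b.Record(errConflict)
	b.Record(errConflict)
	fmt.Println("after two failures:", b.CanRetry())
	b.Record(nil)
	fmt.Println("after a success:", b.CanRetry())
}
```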
## Common Pattern: Extract and Log Context
All patterns can read context from `VersionConflictError`:
```go
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf(
"Conflict for actor %q: attempted %d, current %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
}
```
## Sentinel Error Check
Check if an error is a version conflict without examining the struct:
```go
if errors.Is(err, aether.ErrVersionConflict) {
// This is a version conflict - retry is appropriate
}
```
## Implementing Your Own Pattern
Basic template:
```go
for attempt := 0; attempt < maxRetries; attempt++ {
// 1. Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// 2. Create event with next version
event := &aether.Event{
ActorID: actorID,
Version: currentVersion + 1,
// ... other fields
}
// 3. Attempt save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// 4. Check if it's a conflict
if !errors.Is(err, aether.ErrVersionConflict) {
return err // Some other error
}
// 5. Implement your retry strategy
time.Sleep(yourBackoff(attempt))
}
```
## Choosing a Pattern
| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|-----------|-----------|----------|
| Simple | Low | Low | Very Low | Single writer, testing |
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
## Performance Considerations
1. **Backoff timing**: Shorter backoffs waste CPU on retries; longer backoffs increase latency
2. **Retry limits**: Too few retries give up too early; too many waste resources
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
## Testing
Use `store.NewInMemoryEventStore()` in tests:
```go
eventStore := store.NewInMemoryEventStore()
err := SimpleRetryPattern(eventStore, "test-actor", "TestEvent")
if err != nil {
t.Fatalf("retry pattern failed: %v", err)
}
```


@@ -1,168 +0,0 @@
// Package main demonstrates cross-node event broadcasting using NATSEventBus
// and JetStreamEventStore for cluster synchronization.
//
// This example shows:
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
func main() {
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
ctx := context.Background()
store1, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatalf("Failed to create event store: %v", err)
}
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
defer eventBus1.Stop()
store2, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatalf("Failed to create event store: %v", err)
}
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
defer eventBus2.Stop()
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
done := make(chan struct{})
go processEvents(ctx, eventStoredCh1, store1, done)
go processEvents(ctx, eventStoredCh2, store2, done)
go func() {
time.Sleep(2 * time.Second)
actorID := "demo-actor"
event1 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"total": 99.99,
"status": "pending",
},
Timestamp: time.Now(),
}
log.Printf("Node 1 publishing event: %s", event1.EventType)
eventBus1.Publish("", event1)
time.Sleep(500 * time.Millisecond)
event2 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPaid",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{
"total": 99.99,
"status": "paid",
"method": "credit_card",
},
Timestamp: time.Now(),
}
log.Printf("Node 2 publishing event: %s", event2.EventType)
eventBus2.Publish("", event2)
time.Sleep(2 * time.Second)
close(done)
log.Println("Cross-node broadcasting demo complete")
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("Shutting down...")
case <-done:
}
}
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
for {
select {
case <-done:
return
case <-ctx.Done():
return
case event, ok := <-eventStoredCh:
if !ok {
return
}
if event == nil {
continue
}
if event.EventType != aether.EventTypeEventStored {
continue
}
actorID, ok := event.Data["actorId"].(string)
if !ok {
log.Printf("Warning: EventStored missing actorId")
continue
}
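// encoding/json decodes numbers in map[string]interface{} as float64,
// so accept both representations.
var version int64
switch v := event.Data["version"].(type) {
case int64:
version = v
case float64:
version = int64(v)
default:
log.Printf("Warning: EventStored missing version")
continue
}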
eventID, _ := event.Data["eventId"].(string)
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
eventStore.UpdateVersionCache(actorID, version)
currentVersion, _ := eventStore.GetLatestVersion(actorID)
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
}
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}


@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"github.com/google/uuid"
@@ -25,8 +24,6 @@ type NATSEventBus struct {
subscriptions []*nats.Subscription
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node
streamPrefix string // NATS subject prefix for events
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
mutex sync.Mutex
ctx context.Context
cancel context.CancelFunc
@@ -49,7 +46,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: "aether",
ctx: ctx,
cancel: cancel,
}
@@ -57,43 +53,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil
}
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
// The event store is used to automatically update version cache when EventStored events are received
// from other cluster nodes via NATS. This ensures cross-node version consistency.
//
// Example:
//
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// The namespace parameter is used as a prefix for EventStored event filtering.
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
streamPrefix := "aether"
if namespace != "" {
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
}
// Use a cancellable context so that cancel actually signals shutdown,
// matching NewNATSEventBus.
ctx, cancel := context.WithCancel(context.Background())
neb := &NATSEventBus{
EventBus: NewEventBus(),
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: streamPrefix,
eventStore: store,
ctx: ctx,
cancel: cancel,
}
return neb
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns:
// - "*" matches a single token
@@ -269,103 +228,3 @@ func (neb *NATSEventBus) Stop() {
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
}
// sanitizeSubject sanitizes a string for use in NATS subjects
func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, ".", "_")
s = strings.ReplaceAll(s, "*", "_")
s = strings.ReplaceAll(s, ">", "_")
return s
}
// extractActorType extracts the actor type from an actor ID
func extractActorType(actorID string) string {
for i, c := range actorID {
if c == '-' && i > 0 {
return actorID[:i]
}
}
return "unknown"
}
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
// EventStored events are published by JetStreamEventStore when events are successfully saved.
// This is useful for cross-node event synchronization and version cache consistency.
//
// The returned channel receives EventStored events matching the pattern.
// The EventStored event schema:
// - EventType: "EventStored"
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
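// The namespacePattern supports NATS wildcards, each of which must occupy a whole token:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
// A pattern such as "tenant-*" is therefore matched literally, not as a prefix wildcard.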
//
// Example:
//
// ch := eventBus.SubscribeToEventStored("*")
// for event := range ch {
// if event.EventType != aether.EventTypeEventStored {
// continue
// }
// actorID, ok := event.Data["actorId"].(string)
// if !ok {
// continue
// }
// version, ok := event.Data["version"].(int64)
// if !ok {
// continue // do not update the cache with a zero version
// }
// store.UpdateVersionCache(actorID, version)
// }
//
// Security Warning: Subscribing with a wildcard pattern such as ">" receives EventStored events
// from all namespaces. Ensure the caller is authorized to see every namespace it subscribes to.
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
ch := make(chan *Event, 100)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
return
}
if eventMsg.NodeID == neb.nodeID {
return
}
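if eventMsg.Event.EventType == EventTypeEventStored {
if neb.eventStore != nil {
actorID, ok := eventMsg.Event.Data["actorId"].(string)
if !ok {
return
}
// encoding/json decodes numbers in interface{} values as float64, so accept both types.
var version int64
switch v := eventMsg.Event.Data["version"].(type) {
case int64:
version = v
case float64:
version = int64(v)
default:
return
}
// Use type assertion to call UpdateVersionCache
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
es.UpdateVersionCache(actorID, version)
}
}
// Forward to the returned channel without blocking; the event is dropped if the buffer is full.
select {
case ch <- eventMsg.Event:
default:
}
}
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)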
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
close(ch)
return ch
}
neb.subscriptions = append(neb.subscriptions, sub)
return ch
}

File diff suppressed because it is too large


@@ -1,6 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended"
]
}


@@ -1,215 +0,0 @@
package store
import (
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
func TestEventImmutability_MemoryStore(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "test-actor-123"
// Create and save an event
originalEvent := &aether.Event{
ID: "evt-immutable-1",
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"value": "original",
},
Timestamp: time.Now(),
}
err := store.SaveEvent(originalEvent)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve the event from the store
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
retrievedEvent := events[0]
// Verify the stored event has the correct values
if retrievedEvent.Data["value"] != "original" {
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
}
if retrievedEvent.EventType != "TestEvent" {
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
}
// Verify ID is correct
if retrievedEvent.ID != "evt-immutable-1" {
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
}
}
// TestEventImmutability_NoUpdateMethod documents that the EventStore interface
// exposes only append and read methods, with no Update or Delete methods.
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
// This test documents that the EventStore interface is append-only.
// The interface intentionally provides:
// - SaveEvent: append only
// - GetEvents: read only
// - GetLatestVersion: read only
//
// To verify this, we demonstrate that any attempt to call non-existent
// update/delete methods would be caught at compile time (not runtime).
// This is enforced by the interface definition in event.go which does
// not include Update, Delete, or Modify methods.
store := NewInMemoryEventStore()
// Compile-time check: these would not compile if we tried them:
// store.Update(event) // compile error: no such method
// store.Delete(eventID) // compile error: no such method
// store.Modify(eventID, newData) // compile error: no such method
// Only these methods exist:
var eventStore aether.EventStore = store
if eventStore == nil {
t.Fatal("eventStore is nil")
}
// If we got here, the compile-time checks passed
t.Log("EventStore interface enforces append-only semantics by design")
}
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
// increasing and attempting to save with a non-increasing version fails.
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-version-check"
// Save first event with version 1
event1 := &aether.Event{
ID: "evt-v1",
EventType: "Event1",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(v1) failed: %v", err)
}
// Try to save with same version - should fail
event2Same := &aether.Event{
ID: "evt-v1-again",
EventType: "Event2",
ActorID: actorID,
Version: 1, // Same version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2Same)
if err == nil {
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
}
// Try to save with lower version - should fail
event3Lower := &aether.Event{
ID: "evt-v0",
EventType: "Event3",
ActorID: actorID,
Version: 0, // Lower version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3Lower)
if err == nil {
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
}
// Save with next version - should succeed
event4Next := &aether.Event{
ID: "evt-v2",
EventType: "Event4",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4Next)
if err != nil {
t.Fatalf("SaveEvent(v2) failed: %v", err)
}
// Verify we have exactly 2 events
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 2 {
t.Errorf("expected 2 events, got %d", len(events))
}
}
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
// events from the store through the EventStore interface.
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-nodelete"
// Save an event
event := &aether.Event{
ID: "evt-nodelete",
EventType: "ImportantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"critical": true},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve it
events1, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (1) failed: %v", err)
}
if len(events1) != 1 {
t.Fatal("expected 1 event after save")
}
// Try to delete through interface - this method doesn't exist
// store.Delete("evt-nodelete") // compile error: no such method
// store.DeleteByActorID(actorID) // compile error: no such method
// Verify the event is still there (we can't delete it)
events2, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (2) failed: %v", err)
}
if len(events2) != 1 {
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
}
if events2[0].ID != "evt-nodelete" {
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
}
}
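The append-only guarantees exercised by these tests can be sketched in a self-contained form. Event, EventStore, and memStore below are trimmed-down stand-ins declared only so the example compiles on its own; they are assumptions for illustration and not the real aether types or the real InMemoryEventStore. The sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Event and EventStore are simplified stand-ins for the aether types.
type Event struct {
	ID      string
	ActorID string
	Version int64
}

type EventStore interface {
	SaveEvent(e *Event) error
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)
	GetLatestVersion(actorID string) (int64, error)
}

var errVersionConflict = errors.New("version conflict")

// memStore is a hypothetical append-only store.
type memStore struct {
	events map[string][]*Event
}

// Compile-time check: the build fails if memStore stops satisfying EventStore.
var _ EventStore = (*memStore)(nil)

func newMemStore() *memStore {
	return &memStore{events: make(map[string][]*Event)}
}

// SaveEvent appends the event only if its version is strictly greater than
// the latest stored version for the actor.
func (m *memStore) SaveEvent(e *Event) error {
	latest, _ := m.GetLatestVersion(e.ActorID)
	if e.Version <= latest {
		return errVersionConflict
	}
	stored := *e // keep a copy so later changes by the caller do not leak in
	m.events[e.ActorID] = append(m.events[e.ActorID], &stored)
	return nil
}

// GetEvents returns copies, so callers cannot modify stored events.
func (m *memStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	var out []*Event
	for _, e := range m.events[actorID] {
		if e.Version >= fromVersion {
			c := *e
			out = append(out, &c)
		}
	}
	return out, nil
}

func (m *memStore) GetLatestVersion(actorID string) (int64, error) {
	evs := m.events[actorID]
	if len(evs) == 0 {
		return 0, nil
	}
	return evs[len(evs)-1].Version, nil
}

func main() {
	store := newMemStore()
	original := &Event{ID: "evt-1", ActorID: "actor-1", Version: 1}
	fmt.Println(store.SaveEvent(original))

	original.ID = "tampered" // mutating the caller's value after saving
	events, _ := store.GetEvents("actor-1", 0)
	fmt.Println(events[0].ID)

	fmt.Println(store.SaveEvent(&Event{ID: "evt-dup", ActorID: "actor-1", Version: 1}))
}
```

Copying on both write and read is one way to make immutability observable in a test: mutating the caller's event after SaveEvent does not change what GetEvents returns.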


@@ -1,431 +0,0 @@
//go:build integration
package store
import (
"context"
"log"
"os"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/google/uuid" // assumed module path; the tests below call uuid.New()
"github.com/nats-io/nats.go"
"github.com/nats-io/nats-server/v2/server"
)
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
opts := &server.Options{
Port: -1,
JetStream: true,
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
}
s, err := server.NewServer(opts)
if err != nil {
log.Fatal("Failed to create NATS server:", err)
}
go s.Start()
if !s.ReadyForConnections(4 * time.Second) {
log.Fatal("NATS server failed to start")
}
nc, err := nats.Connect(s.ClientURL())
if err != nil {
s.Shutdown()
log.Fatal("Failed to connect to NATS:", err)
}
return s, nc, func() {
nc.Close()
s.Shutdown()
os.RemoveAll(opts.StoreDir)
}
}
func TestUpdateVersionCache(t *testing.T) {
_, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "test-actor-1"
tests := []struct {
name string
cachedVersion int64
newVersion int64
expectUpdate bool
expectVersion int64
}{
{
name: "update when new version is greater",
cachedVersion: 5,
newVersion: 10,
expectUpdate: true,
expectVersion: 10,
},
{
name: "do not update when new version is equal",
cachedVersion: 5,
newVersion: 5,
expectUpdate: false,
expectVersion: 5,
},
{
name: "do not update when new version is less",
cachedVersion: 10,
newVersion: 5,
expectUpdate: false,
expectVersion: 10,
},
{
name: "update when no cached version exists",
cachedVersion: 0,
newVersion: 1,
expectUpdate: true,
expectVersion: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up cached version
store.versions = make(map[string]int64)
store.versions[actorID] = tt.cachedVersion
// Call UpdateVersionCache
store.UpdateVersionCache(actorID, tt.newVersion)
// Verify result
if tt.expectUpdate {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to be updated but it wasn't cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
}
} else {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to remain cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
}
}
})
}
}
func TestUpdateVersionCache_Concurrent(t *testing.T) {
_, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "concurrent-actor"
store.UpdateVersionCache(actorID, 1)
const numGoroutines = 50
const maxVersion = 100
// Each goroutine signals completion on done, so the test waits for every update
// instead of sleeping for a fixed interval.
done := make(chan struct{}, numGoroutines)
for i := 0; i < numGoroutines; i++ {
version := int64(1 + (i % maxVersion))
go func(v int64) {
store.UpdateVersionCache(actorID, v)
done <- struct{}{}
}(version)
}
for i := 0; i < numGoroutines; i++ {
<-done
}
// Versions 1..numGoroutines were submitted and the cache only moves forward,
// so it must end at the highest submitted version.
finalVersion, ok := store.GetCachedVersion(actorID)
if !ok {
t.Fatal("Expected a cached version for the actor")
}
if finalVersion != int64(numGoroutines) {
t.Errorf("Expected final version %d, got %d", numGoroutines, finalVersion)
}
}
func TestSubscribeToEventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
_, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBusWithStore := aether.NewNATSEventBusWithBroadcaster(nc, store, "")
if eventBusWithStore == nil {
t.Fatalf("Failed to create event bus with broadcaster")
}
defer eventBusWithStore.Stop()
ch := eventBusWithStore.SubscribeToEventStored("*")
if ch == nil {
t.Fatal("SubscribeToEventStored returned nil channel")
}
actorID := "subscribe-test-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"key": "value"},
Timestamp: time.Now(),
}
eventBusWithStore.Publish("", event)
select {
case receivedEvent := <-ch:
if receivedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
data, ok := receivedEvent.Data["actorId"].(string)
if !ok || data != actorID {
t.Errorf("Expected actorId in data to be %s", actorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for EventStored event")
}
}
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
_, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBus := aether.NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "broadcast-test-actor-1"
localCh := eventBus.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"total": 99.99},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
select {
case receivedEvent := <-localCh:
if receivedEvent.EventType != "OrderPlaced" {
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for broadcast event")
}
}
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
// Both buses connect to the same NATS server; two independent, unclustered
// servers would never exchange messages.
s, nc1, cleanup := setupNatsServer()
defer cleanup()
nc2, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Failed to open second connection: %v", err)
}
defer nc2.Close()
ctx := context.Background()
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
if err != nil {
t.Fatalf("Failed to create store 1: %v", err)
}
defer store1.Close(ctx)
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
if err != nil {
t.Fatalf("Failed to create store 2: %v", err)
}
defer store2.Close(ctx)
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc1, store1, "")
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc2, store2, "")
defer eventBus1.Stop()
defer eventBus2.Stop()
actorID := "multi-node-actor"
receiverCh := eventBus2.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "InventoryReserved",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"quantity": 5},
Timestamp: time.Now(),
}
eventBus1.Publish("", event)
select {
case receivedEvent := <-receiverCh:
if receivedEvent.EventType != "InventoryReserved" {
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for cross-node event")
}
}
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
_, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
if err != nil {
t.Fatalf("Failed to create tenant A store: %v", err)
}
defer tenantAStore.Close(ctx)
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
if err != nil {
t.Fatalf("Failed to create tenant B store: %v", err)
}
defer tenantBStore.Close(ctx)
tenantAEventBus := aether.NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
tenantBEventBus := aether.NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
defer tenantAEventBus.Stop()
defer tenantBEventBus.Stop()
tenantACh := tenantAEventBus.Subscribe("tenant-a")
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
actorID := "tenant-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TenantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"data": "tenant-a"},
Timestamp: time.Now(),
}
tenantAEventBus.Publish("tenant-a", event)
select {
case receivedEvent := <-tenantACh:
if receivedEvent.EventType != "TenantEvent" {
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
}
case <-time.After(2 * time.Second):
t.Error("Timeout waiting for tenant A to receive event")
}
select {
case <-tenantBCh:
t.Error("Tenant B should not receive tenant A's events")
case <-time.After(1 * time.Second):
// Expected - tenant B should not receive events from tenant A
}
}
func TestUpdateVersionCache_EventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
_, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBus := aether.NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "version-cache-actor"
store.UpdateVersionCache(actorID, 5)
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 10,
Data: map[string]interface{}{"test": true},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
time.Sleep(100 * time.Millisecond)
storedVersion, err := store.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("Failed to get latest version: %v", err)
}
if storedVersion != 10 {
t.Errorf("Expected version 10, got %d", storedVersion)
}
cacheVersion, ok := store.GetCachedVersion(actorID)
if !ok {
t.Error("Expected version to be in cache")
} else if cacheVersion != 10 {
t.Errorf("Expected cached version 10, got %d", cacheVersion)
}
}


@@ -1,7 +1,6 @@
package store
import (
"context"
"encoding/json"
"fmt"
"strings"
@@ -21,14 +20,7 @@ const (
// JetStreamConfig holds configuration options for JetStreamEventStore
type JetStreamConfig struct {
// StreamRetention is how long to keep events (default: 1 year).
// JetStream enforces this retention policy at the storage level using a limits-based policy:
// - MaxAge: Events older than this duration are automatically deleted
// - Storage is file-based (nats.FileStorage) for durability
// - Once the retention period expires, events are permanently removed from the stream
// This ensures that old events do not consume storage indefinitely.
// To keep events indefinitely, set StreamRetention to a very large value or configure
// a custom retention policy in the JetStream stream configuration.
// StreamRetention is how long to keep events (default: 1 year)
StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int
@@ -50,21 +42,6 @@ func DefaultJetStreamConfig() JetStreamConfig {
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
// It also implements EventStoreWithErrors to report malformed events during replay.
//
// ## Immutability Guarantee
//
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
// is configured with file-based storage (nats.FileStorage) and a retention policy
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
// eventually expire, but during their lifetime, events are never modified or deleted
// through the EventStore API. Once an event is published to the stream:
// - It cannot be updated
// - It cannot be deleted before expiration
// - It can only be read
//
// This architectural guarantee, combined with the EventStore interface providing
// no Update or Delete methods, ensures events are immutable and suitable as an
// audit trail.
//
// ## Version Cache Invalidation Strategy
//
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
@@ -95,6 +72,12 @@ type JetStreamEventStore struct {
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
@@ -287,28 +270,6 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
//
// EventStored Event Schema:
// - EventType: "EventStored" (aether.EventTypeEventStored)
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// Example usage with NATSEventBus:
//
// eventBus := aether.NewNATSEventBus(natsConn)
// eventStore := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
// ch := eventBus.SubscribeToEventStored("*")
//
// for event := range ch {
// actorID, ok := event.Data["actorId"].(string)
// if !ok {
// continue
// }
// version, ok := event.Data["version"].(int64)
// if !ok {
// continue
// }
// eventStore.UpdateVersionCache(actorID, version)
// }
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
@@ -581,43 +542,5 @@ func sanitizeSubject(s string) string {
return s
}
// UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes.
//
// Only updates if the new version is greater than the cached version to prevent
// stale cache entries from causing version conflicts.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock()
defer jes.mu.Unlock()
// Only update if the new version is greater than cached version
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
jes.versions[actorID] = version
}
}
// GetCachedVersion returns the cached version for an actor, if available.
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
jes.mu.Lock()
defer jes.mu.Unlock()
version, ok := jes.versions[actorID]
return version, ok
}
// SetBroadcaster sets the event broadcaster for this store.
// The broadcaster is used to publish EventStored events when events are saved.
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
jes.mu.Lock()
defer jes.mu.Unlock()
jes.broadcaster = broadcaster
}
// Close closes the JetStream event store and cleans up resources.
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
return nil
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)

File diff suppressed because it is too large