Compare commits
13 Commits
20d688f2a2
...
issue-149-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5fb68fed4a | ||
|
|
f46348ab07 | ||
| 6041479286 | |||
|
|
7487a5f3af | ||
|
|
b67417ac68 | ||
|
|
5b5083dcf8 | ||
|
|
6549125f3d | ||
|
|
464fed67ec | ||
|
|
46e1c44017 | ||
| bcbec9ab94 | |||
|
|
de30e1ef1b | ||
|
|
b9e641c2aa | ||
|
|
ec3db5668f |
@@ -17,37 +17,3 @@ jobs:
|
|||||||
run: go build ./...
|
run: go build ./...
|
||||||
- name: Test
|
- name: Test
|
||||||
run: go test ./...
|
run: go test ./...
|
||||||
|
|
||||||
integration:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: actions/setup-go@v5
|
|
||||||
with:
|
|
||||||
go-version: '1.23'
|
|
||||||
- name: Install and Start NATS Server
|
|
||||||
run: |
|
|
||||||
# Detect architecture and download appropriate binary
|
|
||||||
ARCH=$(uname -m)
|
|
||||||
if [ "$ARCH" = "x86_64" ]; then
|
|
||||||
NATS_ARCH="amd64"
|
|
||||||
elif [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
|
|
||||||
NATS_ARCH="arm64"
|
|
||||||
else
|
|
||||||
echo "Unsupported architecture: $ARCH"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo "Detected architecture: $ARCH, using NATS binary: $NATS_ARCH"
|
|
||||||
|
|
||||||
# Download and extract nats-server
|
|
||||||
curl -L "https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-linux-${NATS_ARCH}.tar.gz" -o nats-server.tar.gz
|
|
||||||
tar -xzf nats-server.tar.gz
|
|
||||||
|
|
||||||
# Start NATS with JetStream
|
|
||||||
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server -js -p 4222 &
|
|
||||||
|
|
||||||
# Wait for NATS to be ready
|
|
||||||
sleep 3
|
|
||||||
./nats-server-v2.10.24-linux-${NATS_ARCH}/nats-server --version
|
|
||||||
- name: Run Integration Tests
|
|
||||||
run: go test -tags=integration -v ./...
|
|
||||||
|
|||||||
64
.product-strategy/ISSUE_MIGRATION.md
Normal file
64
.product-strategy/ISSUE_MIGRATION.md
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
# Issue: Implement Actor Migration Between Cluster Nodes
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
|
||||||
|
|
||||||
|
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
|
||||||
|
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
|
||||||
|
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
|
||||||
|
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
|
||||||
|
|
||||||
|
## Required Implementation
|
||||||
|
|
||||||
|
### 1. Rebalance Algorithm (cluster/shard.go)
|
||||||
|
Implement `ConsistentHashPlacement.RebalanceShards` to:
|
||||||
|
- Calculate new shard assignments based on active nodes
|
||||||
|
- Identify actors needing migration
|
||||||
|
- Generate migration plan with source/dest nodes
|
||||||
|
|
||||||
|
### 2. Migration Coordinator (cluster/manager.go)
|
||||||
|
Implement `handleRebalanceRequest` to:
|
||||||
|
- Accept migration plan from leader
|
||||||
|
- For each actor in plan:
|
||||||
|
1. Pause incoming messages
|
||||||
|
2. Capture actor state (replay events up to current version)
|
||||||
|
3. Serialize state
|
||||||
|
4. Send migration request to destination node
|
||||||
|
5. Wait for ack
|
||||||
|
6. Delete actor from current node
|
||||||
|
- Track migration status via `ActorMigration.Status`
|
||||||
|
|
||||||
|
### 3. Cross-Node Message Routing (cluster/distributed.go)
|
||||||
|
Implement proper routing in `SendMessage`:
|
||||||
|
- Use `GetActorNode(actorID)` to determine target node
|
||||||
|
- If remote: marshal message, send via NATS to target node
|
||||||
|
- If local: send to local runtime
|
||||||
|
- Route response back to caller if needed
|
||||||
|
|
||||||
|
## Suggested Approach
|
||||||
|
|
||||||
|
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
|
||||||
|
2. **Implement state capture** - replay events to get current state
|
||||||
|
3. **Implement state restore** - deserialize and restore actor state
|
||||||
|
4. **Implement coordinator** - manage migration phases
|
||||||
|
5. **Add error handling** - handle failed migrations, retries, cleanup
|
||||||
|
6. **Add tests** - test migration with mock NATS
|
||||||
|
|
||||||
|
## Related Files
|
||||||
|
|
||||||
|
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
|
||||||
|
- `cluster/manager.go:167` - handleMigrationRequest (empty)
|
||||||
|
- `cluster/shard.go:211` - RebalanceShards (stub)
|
||||||
|
- `cluster/distributed.go:139` - SendMessage (simplified)
|
||||||
|
- `cluster/types.go:108` - ActorMigration struct
|
||||||
|
|
||||||
|
## Acceptance Criteria
|
||||||
|
|
||||||
|
- [ ] `RebalanceShards` returns new shard map with actor assignments
|
||||||
|
- [ ] `handleRebalanceRequest` processes migration plan
|
||||||
|
- [ ] `handleMigrationRequest` accepts actor migrations
|
||||||
|
- [ ] `SendMessage` routes to correct node
|
||||||
|
- [ ] Actors can be migrated with state preserved
|
||||||
|
- [ ] Failed migrations are handled gracefully
|
||||||
|
- [ ] Integration test with multi-node cluster
|
||||||
117
.product-strategy/ISSUE_SNAPSHOTS.md
Normal file
117
.product-strategy/ISSUE_SNAPSHOTS.md
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
# Issue: Add Snapshot Support to Event Sourcing Workflow
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
|
||||||
|
- Actors with many events must replay entire history
|
||||||
|
- No performance optimization for long-lived actors
|
||||||
|
- Snapshots exist as API but are not used
|
||||||
|
|
||||||
|
## Current State
|
||||||
|
|
||||||
|
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
|
||||||
|
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
|
||||||
|
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
|
||||||
|
- `InMemoryEventStore` has snapshot methods but no lifecycle management
|
||||||
|
|
||||||
|
## Required Implementation
|
||||||
|
|
||||||
|
### 1. Snapshot Strategy
|
||||||
|
Define when to create snapshots:
|
||||||
|
- Fixed interval (e.g., every 100 events)
|
||||||
|
- Version-based (e.g., every 50 versions)
|
||||||
|
- Hybrid: version-based with min/max bounds
|
||||||
|
|
||||||
|
### 2. State Capture
|
||||||
|
Add method to capture actor state:
|
||||||
|
```go
|
||||||
|
// CaptureState rebuilds actor state by replaying events and returns it
|
||||||
|
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Snapshot Store Extension
|
||||||
|
Extend `EventStoreWithErrors` to include snapshots:
|
||||||
|
```go
|
||||||
|
type EventStoreWithSnapshots interface {
|
||||||
|
EventStoreWithErrors
|
||||||
|
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
|
||||||
|
SaveSnapshot(snapshot *ActorSnapshot) error
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. Snapshot Workflow
|
||||||
|
Modify event retrieval to use snapshots:
|
||||||
|
```go
|
||||||
|
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
|
||||||
|
// 1. Try to get latest snapshot
|
||||||
|
snapshot, _ := store.GetLatestSnapshot(actorID)
|
||||||
|
|
||||||
|
// 2. If snapshot exists and version <= fromVersion:
|
||||||
|
// - Return events from snapshot version + 1
|
||||||
|
// 3. Else:
|
||||||
|
// - Replay all events from version 0
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Suggested Implementation
|
||||||
|
|
||||||
|
### 1. Add CaptureState to EventStore interface
|
||||||
|
In `event.go`, extend `EventStore` or create `StateStore` interface:
|
||||||
|
```go
|
||||||
|
type StateStore interface {
|
||||||
|
EventStore
|
||||||
|
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Implement CaptureState
|
||||||
|
In `store/jetstream.go`:
|
||||||
|
```go
|
||||||
|
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
|
||||||
|
// Replay events and build state (application logic needed here)
|
||||||
|
events, _ := jes.GetEvents(actorID, fromVersion)
|
||||||
|
// Need application logic to convert events to state
|
||||||
|
return state, nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Add Snapshot Helper
|
||||||
|
Create snapshot utilities:
|
||||||
|
```go
|
||||||
|
// CreateSnapshot creates snapshot from state
|
||||||
|
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
|
||||||
|
return &ActorSnapshot{
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: version,
|
||||||
|
State: state,
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. Modify GetEvents
|
||||||
|
Update `GetEvents` in both stores to use snapshots when beneficial.
|
||||||
|
|
||||||
|
## Snapshots Workflow Example
|
||||||
|
|
||||||
|
```
|
||||||
|
1. Actor has 1000 events
|
||||||
|
2. Every 100 events, create snapshot
|
||||||
|
3. Actor reaches version 1000, snapshot at version 1000
|
||||||
|
4. Request events from version 900:
|
||||||
|
- Get snapshot at version 1000? No (version too high)
|
||||||
|
- Replay 900->1000 events (only 100 events)
|
||||||
|
5. Request events from version 50:
|
||||||
|
- Get latest snapshot at version 1000? Yes (version > 50)
|
||||||
|
- Use snapshot as base
|
||||||
|
- Replay 1000->1000 events (none)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Acceptance Criteria
|
||||||
|
|
||||||
|
- [ ] `CaptureState` method added to event store
|
||||||
|
- [ ] Snapshots created at configured intervals
|
||||||
|
- [ ] `GetEvents` uses snapshots to optimize replay
|
||||||
|
- [ ] Snapshot workflow tested with long-lived actors
|
||||||
|
- [ ] Configuration for snapshot interval/version
|
||||||
|
- [ ] Metrics: snapshot count, average replay size
|
||||||
100
.product-strategy/ISSUE_VM_RUNTIME.md
Normal file
100
.product-strategy/ISSUE_VM_RUNTIME.md
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
# Issue: Implement VM/Runtime for Actors
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
|
||||||
|
|
||||||
|
## Required Components
|
||||||
|
|
||||||
|
### 1. VM Implementation (cluster/vm.go - new)
|
||||||
|
```go
|
||||||
|
type VirtualMachine struct {
|
||||||
|
actorID string
|
||||||
|
eventStore aether.EventStore
|
||||||
|
state map[string]interface{}
|
||||||
|
version int64
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Methods needed:
|
||||||
|
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
|
||||||
|
- `Start()` - replay events to rebuild state
|
||||||
|
- `ProcessEvent(event *aether.Event)` - apply event to state
|
||||||
|
- `Stop()` - persist final state
|
||||||
|
- `GetVersion()` - current event version
|
||||||
|
|
||||||
|
### 2. Runtime Implementation (cluster/runtime.go - new)
|
||||||
|
```go
|
||||||
|
type Runtime struct {
|
||||||
|
natsConn *nats.Conn
|
||||||
|
eventStore aether.EventStore
|
||||||
|
vmRegistry VMRegistry // map[actorID]*VirtualMachine
|
||||||
|
config RuntimeConfig
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Methods needed:
|
||||||
|
- `Start()` - initialize and start processing
|
||||||
|
- `LoadModel(model eventstorming.Model)` - register domain types
|
||||||
|
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
|
||||||
|
- `GetActiveVMs()` - return map of active VMs
|
||||||
|
- `CreateVM(actorID string)` - create new VM instance
|
||||||
|
- `StopVM(actorID string)` - persist and stop VM
|
||||||
|
|
||||||
|
### 3. Event Processing
|
||||||
|
- Subscribe to actor's event stream
|
||||||
|
- Replay events to build initial state
|
||||||
|
- Apply new events as they arrive
|
||||||
|
- Handle event versions and conflicts
|
||||||
|
|
||||||
|
## Suggested Design
|
||||||
|
|
||||||
|
### VM Lifecycle
|
||||||
|
```
|
||||||
|
1. Actor message arrives for actor-123
|
||||||
|
2. Runtime checks if VM exists for actor-123
|
||||||
|
3. If not, create VM:
|
||||||
|
- Replay events from event store
|
||||||
|
- Rebuild state
|
||||||
|
4. Route message to VM
|
||||||
|
5. VM processes message -> creates new events
|
||||||
|
6. Events persisted to event store
|
||||||
|
7. VM state updated
|
||||||
|
```
|
||||||
|
|
||||||
|
### State Management
|
||||||
|
- State derived from event replay
|
||||||
|
- No separate state store needed
|
||||||
|
- Can snapshot periodically for performance
|
||||||
|
- Version conflict handling using existing EventStore
|
||||||
|
|
||||||
|
## Implementation Steps
|
||||||
|
|
||||||
|
1. **Create VM struct** in `cluster/vm.go`
|
||||||
|
2. **Implement event replay** to rebuild state
|
||||||
|
3. **Create Runtime** in `cluster/runtime.go`
|
||||||
|
4. **Register Runtime with cluster** via `SetVMProvider`
|
||||||
|
5. **Implement message processing** - validate against model
|
||||||
|
6. **Add version conflict handling** using existing EventStore
|
||||||
|
7. **Write tests** - mock event store, test state transitions
|
||||||
|
|
||||||
|
## File Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
cluster/
|
||||||
|
├── vm.go # VirtualMachine implementation
|
||||||
|
├── runtime.go # Runtime implementation
|
||||||
|
├── vm_test.go # VM tests
|
||||||
|
├── runtime_test.go # Runtime tests
|
||||||
|
└── integration_test.go # Integration tests
|
||||||
|
```
|
||||||
|
|
||||||
|
## Acceptance Criteria
|
||||||
|
|
||||||
|
- [ ] VM can be created with actor ID
|
||||||
|
- [ ] VM replays events to build state
|
||||||
|
- [ ] VM processes events and updates state
|
||||||
|
- [ ] VM persists current version
|
||||||
|
- [ ] Runtime can create/stop VMs
|
||||||
|
- [ ] Runtime manages VM registry
|
||||||
|
- [ ] Integration test with NATS and JetStream
|
||||||
29
README.md
29
README.md
@@ -107,7 +107,34 @@ Order state after replaying 2 events:
|
|||||||
|
|
||||||
### Events are immutable
|
### Events are immutable
|
||||||
|
|
||||||
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
|
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:
|
||||||
|
|
||||||
|
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
|
||||||
|
|
||||||
|
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
|
||||||
|
- File-based storage (durable)
|
||||||
|
- Limits-based retention policy (events expire after configured duration, not before)
|
||||||
|
- No mechanism to modify or delete individual events during their lifetime
|
||||||
|
|
||||||
|
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
|
||||||
|
|
||||||
|
To correct a mistake, append a new event that expresses the correction rather than modifying history:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Wrong: Cannot update an event
|
||||||
|
// store.UpdateEvent(eventID, newData) // This method doesn't exist
|
||||||
|
|
||||||
|
// Right: Append a new event that corrects the record
|
||||||
|
correctionEvent := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "OrderCorrected",
|
||||||
|
ActorID: orderID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
Data: map[string]interface{}{"reason": "price adjustment"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err := store.SaveEvent(correctionEvent)
|
||||||
|
```
|
||||||
|
|
||||||
### State is derived
|
### State is derived
|
||||||
|
|
||||||
|
|||||||
22
event.go
22
event.go
@@ -73,6 +73,14 @@ type Event struct {
|
|||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Common event types for Aether infrastructure
|
||||||
|
const (
|
||||||
|
// EventTypeEventStored is an internal event published when an event is successfully persisted.
|
||||||
|
// This event allows observability components (metrics, projections, audit systems) to react
|
||||||
|
// to persisted events without coupling to application code.
|
||||||
|
EventTypeEventStored = "EventStored"
|
||||||
|
)
|
||||||
|
|
||||||
// Common metadata keys for distributed tracing and auditing
|
// Common metadata keys for distributed tracing and auditing
|
||||||
const (
|
const (
|
||||||
// MetadataKeyCorrelationID identifies related events across services
|
// MetadataKeyCorrelationID identifies related events across services
|
||||||
@@ -176,6 +184,17 @@ type ActorSnapshot struct {
|
|||||||
|
|
||||||
// EventStore defines the interface for event persistence.
|
// EventStore defines the interface for event persistence.
|
||||||
//
|
//
|
||||||
|
// # Immutability Guarantee
|
||||||
|
//
|
||||||
|
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
|
||||||
|
// modified or deleted. The interface intentionally provides no Update or Delete methods.
|
||||||
|
// This ensures:
|
||||||
|
// - Events serve as an immutable audit trail
|
||||||
|
// - State can be safely derived by replaying events
|
||||||
|
// - Concurrent reads are always safe (events never change)
|
||||||
|
//
|
||||||
|
// To correct a mistake, append a new event that expresses the correction.
|
||||||
|
//
|
||||||
// # Version Semantics
|
// # Version Semantics
|
||||||
//
|
//
|
||||||
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
||||||
@@ -196,10 +215,13 @@ type EventStore interface {
|
|||||||
// SaveEvent persists an event to the store. The event's Version must be
|
// SaveEvent persists an event to the store. The event's Version must be
|
||||||
// strictly greater than the current latest version for the actor.
|
// strictly greater than the current latest version for the actor.
|
||||||
// Returns VersionConflictError if version <= current latest version.
|
// Returns VersionConflictError if version <= current latest version.
|
||||||
|
// Once saved, the event is immutable and can never be modified or deleted.
|
||||||
SaveEvent(event *Event) error
|
SaveEvent(event *Event) error
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
||||||
// Returns an empty slice if no events exist for the actor.
|
// Returns an empty slice if no events exist for the actor.
|
||||||
|
// The returned events are guaranteed to be immutable - they will never be
|
||||||
|
// modified or deleted from the store.
|
||||||
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor.
|
// GetLatestVersion returns the latest version for an actor.
|
||||||
|
|||||||
189
event_test.go
189
event_test.go
@@ -2,6 +2,8 @@ package aether
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -1335,3 +1337,190 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
|
|||||||
// Error() should still work
|
// Error() should still work
|
||||||
_ = err.Error()
|
_ = err.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests for VersionConflictError
|
||||||
|
|
||||||
|
func TestVersionConflictError_Error(t *testing.T) {
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: "order-123",
|
||||||
|
AttemptedVersion: 3,
|
||||||
|
CurrentVersion: 5,
|
||||||
|
}
|
||||||
|
|
||||||
|
errMsg := err.Error()
|
||||||
|
|
||||||
|
// Verify error message contains all context
|
||||||
|
if !strings.Contains(errMsg, "order-123") {
|
||||||
|
t.Errorf("error message should contain ActorID, got: %s", errMsg)
|
||||||
|
}
|
||||||
|
if !strings.Contains(errMsg, "3") {
|
||||||
|
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
|
||||||
|
}
|
||||||
|
if !strings.Contains(errMsg, "5") {
|
||||||
|
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
|
||||||
|
}
|
||||||
|
if !strings.Contains(errMsg, "version conflict") {
|
||||||
|
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_Fields(t *testing.T) {
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: "actor-456",
|
||||||
|
AttemptedVersion: 10,
|
||||||
|
CurrentVersion: 8,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err.ActorID != "actor-456" {
|
||||||
|
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
|
||||||
|
}
|
||||||
|
if err.AttemptedVersion != 10 {
|
||||||
|
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
|
||||||
|
}
|
||||||
|
if err.CurrentVersion != 8 {
|
||||||
|
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_Unwrap(t *testing.T) {
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: "actor-789",
|
||||||
|
AttemptedVersion: 2,
|
||||||
|
CurrentVersion: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
unwrapped := err.Unwrap()
|
||||||
|
if unwrapped != ErrVersionConflict {
|
||||||
|
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_ErrorsIs(t *testing.T) {
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: "test-actor",
|
||||||
|
AttemptedVersion: 5,
|
||||||
|
CurrentVersion: 4,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that errors.Is works with sentinel
|
||||||
|
if !errors.Is(err, ErrVersionConflict) {
|
||||||
|
t.Error("errors.Is(err, ErrVersionConflict) should return true")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that other errors don't match
|
||||||
|
if errors.Is(err, errors.New("other error")) {
|
||||||
|
t.Error("errors.Is should not match unrelated errors")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_ErrorsAs(t *testing.T) {
|
||||||
|
originalErr := &VersionConflictError{
|
||||||
|
ActorID: "actor-unwrap",
|
||||||
|
AttemptedVersion: 7,
|
||||||
|
CurrentVersion: 6,
|
||||||
|
}
|
||||||
|
|
||||||
|
var versionErr *VersionConflictError
|
||||||
|
if !errors.As(originalErr, &versionErr) {
|
||||||
|
t.Fatalf("errors.As should succeed with VersionConflictError")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify fields are accessible through unwrapped error
|
||||||
|
if versionErr.ActorID != "actor-unwrap" {
|
||||||
|
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
|
||||||
|
}
|
||||||
|
if versionErr.AttemptedVersion != 7 {
|
||||||
|
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
|
||||||
|
}
|
||||||
|
if versionErr.CurrentVersion != 6 {
|
||||||
|
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
|
||||||
|
// This test verifies that applications can read CurrentVersion for retry strategies
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: "order-abc",
|
||||||
|
AttemptedVersion: 2,
|
||||||
|
CurrentVersion: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
var versionErr *VersionConflictError
|
||||||
|
if !errors.As(err, &versionErr) {
|
||||||
|
t.Fatal("failed to unwrap VersionConflictError")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Application can use CurrentVersion to decide retry strategy
|
||||||
|
nextVersion := versionErr.CurrentVersion + 1
|
||||||
|
|
||||||
|
if nextVersion != 11 {
|
||||||
|
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Application can log detailed context
|
||||||
|
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
|
||||||
|
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
|
||||||
|
|
||||||
|
if !strings.Contains(logMsg, "order-abc") {
|
||||||
|
t.Errorf("application context logging failed: %s", logMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVersionConflictError_EdgeCases(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
actorID string
|
||||||
|
attemp int64
|
||||||
|
current int64
|
||||||
|
}{
|
||||||
|
{"zero current", "actor-1", 1, 0},
|
||||||
|
{"large numbers", "actor-2", 1000000, 999999},
|
||||||
|
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
|
||||||
|
{"negative attempt", "actor-4", -1, -2},
|
||||||
|
{"empty actor id", "", 1, 0},
|
||||||
|
{"special chars in actor id", "actor@#$%", 2, 1},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
err := &VersionConflictError{
|
||||||
|
ActorID: tc.actorID,
|
||||||
|
AttemptedVersion: tc.attemp,
|
||||||
|
CurrentVersion: tc.current,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should not panic
|
||||||
|
msg := err.Error()
|
||||||
|
if msg == "" {
|
||||||
|
t.Error("Error() should return non-empty string")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be wrapped correctly
|
||||||
|
if err.Unwrap() != ErrVersionConflict {
|
||||||
|
t.Error("Unwrap should return ErrVersionConflict")
|
||||||
|
}
|
||||||
|
|
||||||
|
// errors.Is should work
|
||||||
|
if !errors.Is(err, ErrVersionConflict) {
|
||||||
|
t.Error("errors.Is should work for edge case")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestErrVersionConflict_Sentinel(t *testing.T) {
|
||||||
|
// Verify the sentinel error is correctly defined
|
||||||
|
if ErrVersionConflict == nil {
|
||||||
|
t.Fatal("ErrVersionConflict should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedMsg := "version conflict"
|
||||||
|
if ErrVersionConflict.Error() != expectedMsg {
|
||||||
|
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that it's usable with errors.Is
|
||||||
|
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
|
||||||
|
t.Error("ErrVersionConflict should match itself with errors.Is")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
189
examples/README.md
Normal file
189
examples/README.md
Normal file
@@ -0,0 +1,189 @@
|
|||||||
|
# Aether Examples
|
||||||
|
|
||||||
|
This directory contains examples demonstrating common patterns for using Aether.
|
||||||
|
|
||||||
|
## Retry Patterns (`retry_patterns.go`)
|
||||||
|
|
||||||
|
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
|
||||||
|
|
||||||
|
### Pattern Overview
|
||||||
|
|
||||||
|
All retry patterns work with `VersionConflictError` which provides three critical fields:
|
||||||
|
|
||||||
|
- **ActorID**: The actor that experienced the conflict
|
||||||
|
- **CurrentVersion**: The latest version in the store
|
||||||
|
- **AttemptedVersion**: The version you tried to save
|
||||||
|
|
||||||
|
Your application can read these fields to make intelligent retry decisions.
|
||||||
|
|
||||||
|
### Available Patterns
|
||||||
|
|
||||||
|
#### SimpleRetryPattern
|
||||||
|
|
||||||
|
The most basic pattern - just retry with exponential backoff:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Automatically retries up to 3 times with exponential backoff
|
||||||
|
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You want a straightforward retry mechanism without complex logic.
|
||||||
|
|
||||||
|
#### ConflictDetailedRetryPattern
|
||||||
|
|
||||||
|
Extracts detailed information from the conflict error to make smarter decisions:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Detects thrashing (multiple conflicts at same version)
|
||||||
|
// and can implement circuit-breaker logic
|
||||||
|
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
|
||||||
|
|
||||||
|
#### JitterRetryPattern
|
||||||
|
|
||||||
|
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Exponential backoff with jitter prevents synchronized retries
|
||||||
|
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
|
||||||
|
|
||||||
|
#### AdaptiveRetryPattern
|
||||||
|
|
||||||
|
Adjusts backoff duration based on version distance (indicator of contention):
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Light contention (gap=1): 50ms backoff
|
||||||
|
// Moderate contention (gap=3-10): proportional backoff
|
||||||
|
// High contention (gap>10): aggressive backoff
|
||||||
|
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You want backoff strategy to respond to actual system load.
|
||||||
|
|
||||||
|
#### EventualConsistencyPattern
|
||||||
|
|
||||||
|
Instead of blocking on retry, queues the event for asynchronous retry:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Returns immediately, event is queued for background retry
|
||||||
|
EventualConsistencyPattern(store, retryQueue, event)
|
||||||
|
|
||||||
|
// Background worker processes the queue
|
||||||
|
for item := range retryQueue {
|
||||||
|
// Implement your own retry logic here
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You can't afford to block the request, and background retry is acceptable.
|
||||||
|
|
||||||
|
#### CircuitBreakerPattern
|
||||||
|
|
||||||
|
Implements a circuit breaker to prevent cascading failures:
|
||||||
|
|
||||||
|
```go
|
||||||
|
cb := NewCircuitBreaker()
|
||||||
|
|
||||||
|
// Fails fast when circuit is open
|
||||||
|
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
|
||||||
|
if err != nil && !cb.CanRetry() {
|
||||||
|
return ErrCircuitBreakerOpen
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use when**: You have a distributed system and want to prevent retry storms during outages.
|
||||||
|
|
||||||
|
## Common Pattern: Extract and Log Context
|
||||||
|
|
||||||
|
All patterns can read context from `VersionConflictError`:
|
||||||
|
|
||||||
|
```go
|
||||||
|
var versionErr *aether.VersionConflictError
|
||||||
|
if errors.As(err, &versionErr) {
|
||||||
|
log.Printf(
|
||||||
|
"Conflict for actor %q: attempted %d, current %d",
|
||||||
|
versionErr.ActorID,
|
||||||
|
versionErr.AttemptedVersion,
|
||||||
|
versionErr.CurrentVersion,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Sentinel Error Check
|
||||||
|
|
||||||
|
Check if an error is a version conflict without examining the struct:
|
||||||
|
|
||||||
|
```go
|
||||||
|
if errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
// This is a version conflict - retry is appropriate
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Implementing Your Own Pattern
|
||||||
|
|
||||||
|
Basic template:
|
||||||
|
|
||||||
|
```go
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
// 1. Get current version
|
||||||
|
currentVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Create event with next version
|
||||||
|
event := &aether.Event{
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: currentVersion + 1,
|
||||||
|
// ... other fields
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Attempt save
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err == nil {
|
||||||
|
return nil // Success
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Check if it's a conflict
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
return err // Some other error
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Implement your retry strategy
|
||||||
|
time.Sleep(yourBackoff(attempt))
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Choosing a Pattern
|
||||||
|
|
||||||
|
| Pattern | Latency | Throughput | Complexity | Use Case |
|
||||||
|
|---------|---------|-----------|-----------|----------|
|
||||||
|
| Simple | Low | Low | Very Low | Single writer, testing |
|
||||||
|
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
|
||||||
|
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
|
||||||
|
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
|
||||||
|
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
|
||||||
|
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
|
||||||
|
|
||||||
|
## Performance Considerations
|
||||||
|
|
||||||
|
1. **Backoff timing**: Shorter backoffs waste CPU on retries, longer backoffs increase latency
|
||||||
|
2. **Retry limits**: Too few retries give up too early, too many waste resources
|
||||||
|
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
|
||||||
|
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
Use `aether.NewInMemoryEventStore()` in tests:
|
||||||
|
|
||||||
|
```go
|
||||||
|
store := store.NewInMemoryEventStore()
|
||||||
|
err := SimpleRetryPattern(store, "test-actor", "TestEvent")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("retry pattern failed: %v", err)
|
||||||
|
}
|
||||||
|
```
|
||||||
168
examples/cross_node_broadcasting.go
Normal file
168
examples/cross_node_broadcasting.go
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
// Package main demonstrates cross-node event broadcasting using NATSEventBus
|
||||||
|
// and JetStreamEventStore for cluster synchronization.
|
||||||
|
//
|
||||||
|
// This example shows:
|
||||||
|
// 1. Setting up NATSEventBus with JetStreamEventStore
|
||||||
|
// 2. Broadcasting events across NATS for cross-node distribution
|
||||||
|
// 3. Subscribing to EventStored events for version cache synchronization
|
||||||
|
// 4. Properly handling EventStored events from other cluster nodes
|
||||||
|
//
|
||||||
|
// Prerequisites:
|
||||||
|
// - NATS server running with JetStream enabled (nats-server -js)
|
||||||
|
// - Events stream created in JetStream
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"git.flowmade.one/flowmade-one/aether/store"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
|
||||||
|
|
||||||
|
nc, err := nats.Connect(natsURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to connect to NATS:", err)
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store1, err := store.NewJetStreamEventStore(nc, "events")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create event store:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
|
||||||
|
defer eventBus1.Stop()
|
||||||
|
|
||||||
|
store2, err := store.NewJetStreamEventStore(nc, "events")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create event store:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
|
||||||
|
defer eventBus2.Stop()
|
||||||
|
|
||||||
|
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
|
||||||
|
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
go processEvents(ctx, eventStoredCh1, store1, done)
|
||||||
|
go processEvents(ctx, eventStoredCh2, store2, done)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
actorID := "demo-actor"
|
||||||
|
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 99.99,
|
||||||
|
"status": "pending",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Node 1 publishing event: %s", event1.EventType)
|
||||||
|
eventBus1.Publish("", event1)
|
||||||
|
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "OrderPaid",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 2,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 99.99,
|
||||||
|
"status": "paid",
|
||||||
|
"method": "credit_card",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Node 2 publishing event: %s", event2.EventType)
|
||||||
|
eventBus2.Publish("", event2)
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
close(done)
|
||||||
|
|
||||||
|
log.Println("Cross-node broadcasting demo complete")
|
||||||
|
}()
|
||||||
|
|
||||||
|
sigCh := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sigCh:
|
||||||
|
log.Println("Shutting down...")
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case event, ok := <-eventStoredCh:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if event == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.EventType != aether.EventTypeEventStored {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID, ok := event.Data["actorId"].(string)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("Warning: EventStored missing actorId")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
version, ok := event.Data["version"].(int64)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("Warning: EventStored missing version")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
eventID, _ := event.Data["eventId"].(string)
|
||||||
|
|
||||||
|
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
|
||||||
|
|
||||||
|
eventStore.UpdateVersionCache(actorID, version)
|
||||||
|
|
||||||
|
currentVersion, _ := eventStore.GetLatestVersion(actorID)
|
||||||
|
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEnv(key, defaultValue string) string {
|
||||||
|
if value := os.Getenv(key); value != "" {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
157
nats_eventbus.go
157
nats_eventbus.go
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -19,14 +20,16 @@ import (
|
|||||||
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
|
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
|
||||||
// are in place at the application layer before granting wildcard subscription access.
|
// are in place at the application layer before granting wildcard subscription access.
|
||||||
type NATSEventBus struct {
|
type NATSEventBus struct {
|
||||||
*EventBus // Embed base EventBus for local subscriptions
|
*EventBus // Embed base EventBus for local subscriptions
|
||||||
nc *nats.Conn // NATS connection
|
nc *nats.Conn // NATS connection
|
||||||
subscriptions []*nats.Subscription
|
subscriptions []*nats.Subscription
|
||||||
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
|
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
|
||||||
nodeID string // Unique ID for this node
|
nodeID string // Unique ID for this node
|
||||||
mutex sync.Mutex
|
streamPrefix string // NATS subject prefix for events
|
||||||
ctx context.Context
|
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
|
||||||
cancel context.CancelFunc
|
mutex sync.Mutex
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// eventMessage is the wire format for events sent over NATS
|
// eventMessage is the wire format for events sent over NATS
|
||||||
@@ -46,6 +49,7 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
|||||||
nodeID: uuid.New().String(),
|
nodeID: uuid.New().String(),
|
||||||
subscriptions: make([]*nats.Subscription, 0),
|
subscriptions: make([]*nats.Subscription, 0),
|
||||||
patternSubscribers: make(map[string]int),
|
patternSubscribers: make(map[string]int),
|
||||||
|
streamPrefix: "aether",
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
@@ -53,6 +57,43 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
|||||||
return neb, nil
|
return neb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
|
||||||
|
// The event store is used to automatically update version cache when EventStored events are received
|
||||||
|
// from other cluster nodes via NATS. This ensures cross-node version consistency.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
|
||||||
|
// ch := eventBus.SubscribeToEventStored("tenant-*")
|
||||||
|
// for event := range ch {
|
||||||
|
// actorID := event.Data["actorId"].(string)
|
||||||
|
// version := event.Data["version"].(int64)
|
||||||
|
// store.UpdateVersionCache(actorID, version)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// The namespace parameter is used as a prefix for EventStored event filtering.
|
||||||
|
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
|
||||||
|
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
|
||||||
|
streamPrefix := "aether"
|
||||||
|
if namespace != "" {
|
||||||
|
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
|
||||||
|
}
|
||||||
|
|
||||||
|
neb := &NATSEventBus{
|
||||||
|
EventBus: NewEventBus(),
|
||||||
|
nc: nc,
|
||||||
|
nodeID: uuid.New().String(),
|
||||||
|
subscriptions: make([]*nats.Subscription, 0),
|
||||||
|
patternSubscribers: make(map[string]int),
|
||||||
|
streamPrefix: streamPrefix,
|
||||||
|
eventStore: store,
|
||||||
|
ctx: context.Background(),
|
||||||
|
cancel: func() {},
|
||||||
|
}
|
||||||
|
|
||||||
|
return neb
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
|
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
|
||||||
// Supports NATS subject patterns:
|
// Supports NATS subject patterns:
|
||||||
// - "*" matches a single token
|
// - "*" matches a single token
|
||||||
@@ -228,3 +269,103 @@ func (neb *NATSEventBus) Stop() {
|
|||||||
|
|
||||||
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
|
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sanitizeSubject sanitizes a string for use in NATS subjects
|
||||||
|
func sanitizeSubject(s string) string {
|
||||||
|
s = strings.ReplaceAll(s, " ", "_")
|
||||||
|
s = strings.ReplaceAll(s, ".", "_")
|
||||||
|
s = strings.ReplaceAll(s, "*", "_")
|
||||||
|
s = strings.ReplaceAll(s, ">", "_")
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractActorType extracts the actor type from an actor ID
|
||||||
|
func extractActorType(actorID string) string {
|
||||||
|
for i, c := range actorID {
|
||||||
|
if c == '-' && i > 0 {
|
||||||
|
return actorID[:i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
|
||||||
|
// EventStored events are published by JetStreamEventStore when events are successfully saved.
|
||||||
|
// This is useful for cross-node event synchronization and version cache consistency.
|
||||||
|
//
|
||||||
|
// The returned channel receives EventStored events matching the pattern.
|
||||||
|
// The EventStored event schema:
|
||||||
|
// - EventType: "EventStored"
|
||||||
|
// - ActorID: ID of the actor that the original event was about
|
||||||
|
// - Version: version of the stored event
|
||||||
|
// - Data:
|
||||||
|
// - eventId: (string) ID of the stored event
|
||||||
|
// - actorId: (string) ID of the actor
|
||||||
|
// - version: (int64) version of the event
|
||||||
|
// - timestamp: (int64) Unix timestamp of when the event was stored
|
||||||
|
//
|
||||||
|
// The namespacePattern supports NATS wildcards:
|
||||||
|
// - "*" matches a single token
|
||||||
|
// - ">" matches one or more tokens (only at the end)
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// ch := eventBus.SubscribeToEventStored("tenant-*")
|
||||||
|
// for event := range ch {
|
||||||
|
// if event.EventType != aether.EventTypeEventStored {
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// actorID := event.Data["actorId"].(string)
|
||||||
|
// version, _ := event.Data["version"].(int64)
|
||||||
|
// store.UpdateVersionCache(actorID, version)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
|
||||||
|
// from all namespaces. Ensure your application handles this appropriately.
|
||||||
|
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
|
||||||
|
neb.mutex.Lock()
|
||||||
|
defer neb.mutex.Unlock()
|
||||||
|
|
||||||
|
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
|
||||||
|
|
||||||
|
ch := make(chan *Event, 100)
|
||||||
|
|
||||||
|
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||||
|
var eventMsg eventMessage
|
||||||
|
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
||||||
|
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventMsg.NodeID == neb.nodeID {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
|
||||||
|
actorID, ok := eventMsg.Event.Data["actorId"].(string)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
version, ok := eventMsg.Event.Data["version"].(int64)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Use type assertion to call UpdateVersionCache
|
||||||
|
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
|
||||||
|
es.UpdateVersionCache(actorID, version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
neb.subscriptions = append(neb.subscriptions, sub)
|
||||||
|
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
6
renovate.json
Normal file
6
renovate.json
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
|
||||||
|
"extends": [
|
||||||
|
"config:recommended"
|
||||||
|
]
|
||||||
|
}
|
||||||
215
store/immutability_test.go
Normal file
215
store/immutability_test.go
Normal file
@@ -0,0 +1,215 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
|
||||||
|
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
|
||||||
|
func TestEventImmutability_MemoryStore(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "test-actor-123"
|
||||||
|
|
||||||
|
// Create and save an event
|
||||||
|
originalEvent := &aether.Event{
|
||||||
|
ID: "evt-immutable-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"value": "original",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(originalEvent)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the event from the store
|
||||||
|
events, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) == 0 {
|
||||||
|
t.Fatal("expected 1 event, got 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
retrievedEvent := events[0]
|
||||||
|
|
||||||
|
// Verify the stored event has the correct values
|
||||||
|
if retrievedEvent.Data["value"] != "original" {
|
||||||
|
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
|
||||||
|
}
|
||||||
|
|
||||||
|
if retrievedEvent.EventType != "TestEvent" {
|
||||||
|
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify ID is correct
|
||||||
|
if retrievedEvent.ID != "evt-immutable-1" {
|
||||||
|
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_NoUpdateMethod verifies that the EventStore interface
|
||||||
|
// has only append, read methods - no Update or Delete methods.
|
||||||
|
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
|
||||||
|
// This test documents that the EventStore interface is append-only.
|
||||||
|
// The interface intentionally provides:
|
||||||
|
// - SaveEvent: append only
|
||||||
|
// - GetEvents: read only
|
||||||
|
// - GetLatestVersion: read only
|
||||||
|
//
|
||||||
|
// To verify this, we demonstrate that any attempt to call non-existent
|
||||||
|
// update/delete methods would be caught at compile time (not runtime).
|
||||||
|
// This is enforced by the interface definition in event.go which does
|
||||||
|
// not include Update, Delete, or Modify methods.
|
||||||
|
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
// Compile-time check: these would not compile if we tried them:
|
||||||
|
// store.Update(event) // compile error: no such method
|
||||||
|
// store.Delete(eventID) // compile error: no such method
|
||||||
|
// store.Modify(eventID, newData) // compile error: no such method
|
||||||
|
|
||||||
|
// Only these methods exist:
|
||||||
|
var eventStore aether.EventStore = store
|
||||||
|
if eventStore == nil {
|
||||||
|
t.Fatal("eventStore is nil")
|
||||||
|
}
|
||||||
|
// If we got here, the compile-time checks passed
|
||||||
|
t.Log("EventStore interface enforces append-only semantics by design")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
|
||||||
|
// increasing and attempting to save with a non-increasing version fails.
|
||||||
|
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "actor-version-check"
|
||||||
|
|
||||||
|
// Save first event with version 1
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-v1",
|
||||||
|
EventType: "Event1",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(v1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save with same version - should fail
|
||||||
|
event2Same := &aether.Event{
|
||||||
|
ID: "evt-v1-again",
|
||||||
|
EventType: "Event2",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1, // Same version
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event2Same)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save with lower version - should fail
|
||||||
|
event3Lower := &aether.Event{
|
||||||
|
ID: "evt-v0",
|
||||||
|
EventType: "Event3",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 0, // Lower version
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event3Lower)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save with next version - should succeed
|
||||||
|
event4Next := &aether.Event{
|
||||||
|
ID: "evt-v2",
|
||||||
|
EventType: "Event4",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 2,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event4Next)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(v2) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we have exactly 2 events
|
||||||
|
events, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) != 2 {
|
||||||
|
t.Errorf("expected 2 events, got %d", len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
|
||||||
|
// events from the store through the EventStore interface.
|
||||||
|
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "actor-nodelete"
|
||||||
|
|
||||||
|
// Save an event
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-nodelete",
|
||||||
|
EventType: "ImportantEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"critical": true},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve it
|
||||||
|
events1, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents (1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events1) != 1 {
|
||||||
|
t.Fatal("expected 1 event after save")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to delete through interface - this method doesn't exist
|
||||||
|
// store.Delete("evt-nodelete") // compile error: no such method
|
||||||
|
// store.DeleteByActorID(actorID) // compile error: no such method
|
||||||
|
|
||||||
|
// Verify the event is still there (we can't delete it)
|
||||||
|
events2, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents (2) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events2) != 1 {
|
||||||
|
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if events2[0].ID != "evt-nodelete" {
|
||||||
|
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
|
||||||
|
}
|
||||||
|
}
|
||||||
431
store/integration_test.go
Normal file
431
store/integration_test.go
Normal file
@@ -0,0 +1,431 @@
|
|||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats-server/v2/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
|
||||||
|
opts := &server.Options{
|
||||||
|
Port: -1,
|
||||||
|
JetStream: true,
|
||||||
|
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := server.NewServer(opts)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create NATS server:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go s.Start()
|
||||||
|
if !s.ReadyForConnections(4 * time.Second) {
|
||||||
|
log.Fatal("NATS server failed to start")
|
||||||
|
}
|
||||||
|
|
||||||
|
nc, err := nats.Connect(s.ClientURL())
|
||||||
|
if err != nil {
|
||||||
|
s.Shutdown()
|
||||||
|
log.Fatal("Failed to connect to NATS:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s, nc, func() {
|
||||||
|
nc.Close()
|
||||||
|
s.Shutdown()
|
||||||
|
os.RemoveAll(opts.StoreDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateVersionCache(t *testing.T) {
|
||||||
|
s, nc, cleanup := setupNatsServer()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, "test_update_cache")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close(ctx)
|
||||||
|
|
||||||
|
actorID := "test-actor-1"
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
cachedVersion int64
|
||||||
|
newVersion int64
|
||||||
|
expectUpdate bool
|
||||||
|
expectVersion int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "update when new version is greater",
|
||||||
|
cachedVersion: 5,
|
||||||
|
newVersion: 10,
|
||||||
|
expectUpdate: true,
|
||||||
|
expectVersion: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "do not update when new version is equal",
|
||||||
|
cachedVersion: 5,
|
||||||
|
newVersion: 5,
|
||||||
|
expectUpdate: false,
|
||||||
|
expectVersion: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "do not update when new version is less",
|
||||||
|
cachedVersion: 10,
|
||||||
|
newVersion: 5,
|
||||||
|
expectUpdate: false,
|
||||||
|
expectVersion: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "update when no cached version exists",
|
||||||
|
cachedVersion: 0,
|
||||||
|
newVersion: 1,
|
||||||
|
expectUpdate: true,
|
||||||
|
expectVersion: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
// Set up cached version
|
||||||
|
store.versions = make(map[string]int64)
|
||||||
|
store.versions[actorID] = tt.cachedVersion
|
||||||
|
|
||||||
|
// Call UpdateVersionCache
|
||||||
|
store.UpdateVersionCache(actorID, tt.newVersion)
|
||||||
|
|
||||||
|
// Verify result
|
||||||
|
if tt.expectUpdate {
|
||||||
|
if version, ok := store.versions[actorID]; !ok {
|
||||||
|
t.Error("Expected version to be updated but it wasn't cached")
|
||||||
|
} else if version != tt.expectVersion {
|
||||||
|
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if version, ok := store.versions[actorID]; !ok {
|
||||||
|
t.Error("Expected version to remain cached")
|
||||||
|
} else if version != tt.expectVersion {
|
||||||
|
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateVersionCache_Concurrent(t *testing.T) {
|
||||||
|
s, nc, cleanup := setupNatsServer()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close(ctx)
|
||||||
|
|
||||||
|
actorID := "concurrent-actor"
|
||||||
|
store.versions[actorID] = 1
|
||||||
|
|
||||||
|
const numGoroutines = 50
|
||||||
|
const maxVersion = 100
|
||||||
|
|
||||||
|
var done = make(chan struct{})
|
||||||
|
var updates int32
|
||||||
|
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
version := int64(1 + (i % maxVersion))
|
||||||
|
go func(v int64) {
|
||||||
|
store.UpdateVersionCache(actorID, v)
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
default:
|
||||||
|
updates++
|
||||||
|
}
|
||||||
|
}(version)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(done)
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
finalVersion := store.versions[actorID]
|
||||||
|
if finalVersion > maxVersion {
|
||||||
|
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribeToEventStored(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
s, nc, cleanup := setupNatsServer()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close(ctx)
|
||||||
|
|
||||||
|
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
|
||||||
|
if eventBusWithStore == nil {
|
||||||
|
t.Fatalf("Failed to create event bus with broadcaster")
|
||||||
|
}
|
||||||
|
defer eventBusWithStore.Stop()
|
||||||
|
|
||||||
|
ch := eventBusWithStore.SubscribeToEventStored("*")
|
||||||
|
if ch == nil {
|
||||||
|
t.Fatal("SubscribeToEventStored returned nil channel")
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "subscribe-test-actor"
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"key": "value"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBusWithStore.Publish("", event)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case receivedEvent := <-ch:
|
||||||
|
if receivedEvent.EventType != aether.EventTypeEventStored {
|
||||||
|
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
|
||||||
|
}
|
||||||
|
if receivedEvent.ActorID != actorID {
|
||||||
|
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
|
||||||
|
}
|
||||||
|
data, ok := receivedEvent.Data["actorId"].(string)
|
||||||
|
if !ok || data != actorID {
|
||||||
|
t.Errorf("Expected actorId in data to be %s", actorID)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("Timeout waiting for EventStored event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
s, nc, cleanup := setupNatsServer()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close(ctx)
|
||||||
|
|
||||||
|
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
|
||||||
|
defer eventBus.Stop()
|
||||||
|
|
||||||
|
actorID := "broadcast-test-actor-1"
|
||||||
|
localCh := eventBus.Subscribe("")
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"total": 99.99},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus.Publish("", event)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case receivedEvent := <-localCh:
|
||||||
|
if receivedEvent.EventType != "OrderPlaced" {
|
||||||
|
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
|
||||||
|
}
|
||||||
|
if receivedEvent.ActorID != actorID {
|
||||||
|
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("Timeout waiting for broadcast event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
s1, nc1, cleanup1 := setupNatsServer()
|
||||||
|
defer cleanup1()
|
||||||
|
|
||||||
|
s2, nc2, cleanup2 := setupNatsServer()
|
||||||
|
defer cleanup2()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store 1: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store 2: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
|
||||||
|
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
|
||||||
|
defer eventBus1.Stop()
|
||||||
|
defer eventBus2.Stop()
|
||||||
|
|
||||||
|
actorID := "multi-node-actor"
|
||||||
|
receiverCh := eventBus2.Subscribe("")
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "InventoryReserved",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"quantity": 5},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus1.Publish("", event)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case receivedEvent := <-receiverCh:
|
||||||
|
if receivedEvent.EventType != "InventoryReserved" {
|
||||||
|
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
|
||||||
|
}
|
||||||
|
if receivedEvent.ActorID != actorID {
|
||||||
|
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
|
||||||
|
}
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("Timeout waiting for cross-node event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
s, nc, cleanup := setupNatsServer()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create tenant A store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create tenant B store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
|
||||||
|
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
|
||||||
|
defer tenantAEventBus.Stop()
|
||||||
|
defer tenantBEventBus.Stop()
|
||||||
|
|
||||||
|
tenantACh := tenantAEventBus.Subscribe("tenant-a")
|
||||||
|
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
|
||||||
|
|
||||||
|
actorID := "tenant-actor"
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "TenantEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"data": "tenant-a"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
tenantAEventBus.Publish("tenant-a", event)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case receivedEvent := <-tenantACh:
|
||||||
|
if receivedEvent.EventType != "TenantEvent" {
|
||||||
|
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Error("Timeout waiting for tenant A to receive event")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-tenantBCh:
|
||||||
|
t.Error("Tenant B should not receive tenant A's events")
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
// Expected - tenant B should not receive events from tenant A
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateVersionCache_EventStored(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
s, nc, cleanup := setupNatsServer()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
|
||||||
|
defer eventBus.Stop()
|
||||||
|
|
||||||
|
actorID := "version-cache-actor"
|
||||||
|
store.UpdateVersionCache(actorID, 5)
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 10,
|
||||||
|
Data: map[string]interface{}{"test": true},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBus.Publish("", event)
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
storedVersion, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get latest version: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if storedVersion != 10 {
|
||||||
|
t.Errorf("Expected version 10, got %d", storedVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheVersion, ok := store.GetCachedVersion(actorID)
|
||||||
|
if !ok {
|
||||||
|
t.Error("Expected version to be in cache")
|
||||||
|
} else if cacheVersion != 10 {
|
||||||
|
t.Errorf("Expected cached version 10, got %d", cacheVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -9,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default configuration values for JetStream event store
|
// Default configuration values for JetStream event store
|
||||||
@@ -19,7 +21,14 @@ const (
|
|||||||
|
|
||||||
// JetStreamConfig holds configuration options for JetStreamEventStore
|
// JetStreamConfig holds configuration options for JetStreamEventStore
|
||||||
type JetStreamConfig struct {
|
type JetStreamConfig struct {
|
||||||
// StreamRetention is how long to keep events (default: 1 year)
|
// StreamRetention is how long to keep events (default: 1 year).
|
||||||
|
// JetStream enforces this retention policy at the storage level using a limits-based policy:
|
||||||
|
// - MaxAge: Events older than this duration are automatically deleted
|
||||||
|
// - Storage is file-based (nats.FileStorage) for durability
|
||||||
|
// - Once the retention period expires, events are permanently removed from the stream
|
||||||
|
// This ensures that old events do not consume storage indefinitely.
|
||||||
|
// To keep events indefinitely, set StreamRetention to a very large value or configure
|
||||||
|
// a custom retention policy in the JetStream stream configuration.
|
||||||
StreamRetention time.Duration
|
StreamRetention time.Duration
|
||||||
// ReplicaCount is the number of replicas for high availability (default: 1)
|
// ReplicaCount is the number of replicas for high availability (default: 1)
|
||||||
ReplicaCount int
|
ReplicaCount int
|
||||||
@@ -41,6 +50,21 @@ func DefaultJetStreamConfig() JetStreamConfig {
|
|||||||
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
||||||
// It also implements EventStoreWithErrors to report malformed events during replay.
|
// It also implements EventStoreWithErrors to report malformed events during replay.
|
||||||
//
|
//
|
||||||
|
// ## Immutability Guarantee
|
||||||
|
//
|
||||||
|
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
|
||||||
|
// is configured with file-based storage (nats.FileStorage) and a retention policy
|
||||||
|
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
|
||||||
|
// eventually expire, but during their lifetime, events are never modified or deleted
|
||||||
|
// through the EventStore API. Once an event is published to the stream:
|
||||||
|
// - It cannot be updated
|
||||||
|
// - It cannot be deleted before expiration
|
||||||
|
// - It can only be read
|
||||||
|
//
|
||||||
|
// This architectural guarantee, combined with the EventStore interface providing
|
||||||
|
// no Update or Delete methods, ensures events are immutable and suitable as an
|
||||||
|
// audit trail.
|
||||||
|
//
|
||||||
// ## Version Cache Invalidation Strategy
|
// ## Version Cache Invalidation Strategy
|
||||||
//
|
//
|
||||||
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
|
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
|
||||||
@@ -64,17 +88,13 @@ type JetStreamEventStore struct {
|
|||||||
config JetStreamConfig
|
config JetStreamConfig
|
||||||
mu sync.Mutex // Protects version checks during SaveEvent
|
mu sync.Mutex // Protects version checks during SaveEvent
|
||||||
versions map[string]int64 // actorID -> latest version cache
|
versions map[string]int64 // actorID -> latest version cache
|
||||||
|
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
|
||||||
|
namespace string // Optional namespace for event publishing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
||||||
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
||||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
||||||
@@ -130,6 +150,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
|||||||
streamName: effectiveStreamName,
|
streamName: effectiveStreamName,
|
||||||
config: config,
|
config: config,
|
||||||
versions: make(map[string]int64),
|
versions: make(map[string]int64),
|
||||||
|
broadcaster: nil,
|
||||||
|
namespace: "",
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,6 +165,58 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
|||||||
return jes.streamName
|
return jes.streamName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
|
||||||
|
// The broadcaster receives EventStored events when events are successfully saved.
|
||||||
|
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
|
||||||
|
config := DefaultJetStreamConfig()
|
||||||
|
if namespace != "" {
|
||||||
|
config.Namespace = namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := natsConn.JetStream()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply defaults for zero values
|
||||||
|
if config.StreamRetention == 0 {
|
||||||
|
config.StreamRetention = DefaultStreamRetention
|
||||||
|
}
|
||||||
|
if config.ReplicaCount == 0 {
|
||||||
|
config.ReplicaCount = DefaultReplicaCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply namespace prefix to stream name if provided
|
||||||
|
effectiveStreamName := streamName
|
||||||
|
if config.Namespace != "" {
|
||||||
|
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create or update the stream
|
||||||
|
stream := &nats.StreamConfig{
|
||||||
|
Name: effectiveStreamName,
|
||||||
|
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||||
|
Storage: nats.FileStorage,
|
||||||
|
Retention: nats.LimitsPolicy,
|
||||||
|
MaxAge: config.StreamRetention,
|
||||||
|
Replicas: config.ReplicaCount,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = js.AddStream(stream)
|
||||||
|
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
||||||
|
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &JetStreamEventStore{
|
||||||
|
js: js,
|
||||||
|
streamName: effectiveStreamName,
|
||||||
|
config: config,
|
||||||
|
versions: make(map[string]int64),
|
||||||
|
broadcaster: broadcaster,
|
||||||
|
namespace: namespace,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent persists an event to JetStream.
|
// SaveEvent persists an event to JetStream.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
@@ -150,19 +224,36 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
jes.mu.Lock()
|
jes.mu.Lock()
|
||||||
defer jes.mu.Unlock()
|
defer jes.mu.Unlock()
|
||||||
|
|
||||||
// Get current latest version for this actor
|
// Check cache first
|
||||||
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
if version, ok := jes.versions[event.ActorID]; ok {
|
||||||
if err != nil {
|
// Validate version against cached version
|
||||||
return fmt.Errorf("failed to get latest version: %w", err)
|
if event.Version <= version {
|
||||||
}
|
return &aether.VersionConflictError{
|
||||||
|
ActorID: event.ActorID,
|
||||||
// Validate version is strictly greater than current
|
AttemptedVersion: event.Version,
|
||||||
if event.Version <= currentVersion {
|
CurrentVersion: version,
|
||||||
return &aether.VersionConflictError{
|
}
|
||||||
ActorID: event.ActorID,
|
|
||||||
AttemptedVersion: event.Version,
|
|
||||||
CurrentVersion: currentVersion,
|
|
||||||
}
|
}
|
||||||
|
// Version check passed, proceed with publish while holding lock
|
||||||
|
} else {
|
||||||
|
// Cache miss - need to check actual stream
|
||||||
|
// Get current latest version while holding lock to prevent TOCTOU race
|
||||||
|
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get latest version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate version is strictly greater than current
|
||||||
|
if event.Version <= currentVersion {
|
||||||
|
return &aether.VersionConflictError{
|
||||||
|
ActorID: event.ActorID,
|
||||||
|
AttemptedVersion: event.Version,
|
||||||
|
CurrentVersion: currentVersion,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update cache with current version
|
||||||
|
jes.versions[event.ActorID] = currentVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serialize event to JSON
|
// Serialize event to JSON
|
||||||
@@ -183,41 +274,57 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update version cache
|
// Update version cache after successful publish
|
||||||
jes.versions[event.ActorID] = event.Version
|
jes.versions[event.ActorID] = event.Version
|
||||||
|
|
||||||
|
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||||
|
if jes.broadcaster != nil {
|
||||||
|
jes.publishEventStored(event)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLatestVersionLocked returns the latest version for an actor.
|
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||||
// Caller must hold jes.mu.
|
// This is called after a successful SaveEvent to notify subscribers.
|
||||||
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
//
|
||||||
// Check cache first
|
// EventStored Event Schema:
|
||||||
if version, ok := jes.versions[actorID]; ok {
|
// - EventType: "EventStored" (aether.EventTypeEventStored)
|
||||||
return version, nil
|
// - ActorID: ID of the actor that the original event was about
|
||||||
|
// - Version: version of the stored event
|
||||||
|
// - Data:
|
||||||
|
// - eventId: (string) ID of the stored event
|
||||||
|
// - actorId: (string) ID of the actor
|
||||||
|
// - version: (int64) version of the event
|
||||||
|
// - timestamp: (int64) Unix timestamp of when the event was stored
|
||||||
|
//
|
||||||
|
// Example usage with NATSEventBus:
|
||||||
|
//
|
||||||
|
// eventBus := aether.NewNATSEventBus(natsConn)
|
||||||
|
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
|
||||||
|
// ch := eventBus.SubscribeToEventStored("*")
|
||||||
|
//
|
||||||
|
// for event := range ch {
|
||||||
|
// actorID := event.Data["actorId"].(string)
|
||||||
|
// version := event.Data["version"].(int64)
|
||||||
|
// store.UpdateVersionCache(actorID, version)
|
||||||
|
// }
|
||||||
|
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||||
|
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"eventId": originalEvent.ID,
|
||||||
|
"actorId": originalEvent.ActorID,
|
||||||
|
"version": originalEvent.Version,
|
||||||
|
"timestamp": originalEvent.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch from JetStream - use internal method that returns result
|
jes.broadcaster.Publish(jes.namespace, eventStored)
|
||||||
result, err := jes.getEventsWithErrorsInternal(actorID, 0)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(result.Events) == 0 {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
latestVersion := int64(0)
|
|
||||||
for _, event := range result.Events {
|
|
||||||
if event.Version > latestVersion {
|
|
||||||
latestVersion = event.Version
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update cache
|
|
||||||
jes.versions[actorID] = latestVersion
|
|
||||||
|
|
||||||
return latestVersion, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEvents retrieves all events for an actor since a version.
|
// GetEvents retrieves all events for an actor since a version.
|
||||||
@@ -303,41 +410,96 @@ func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, from
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor, repopulating cache
|
// GetLatestVersion returns the latest version for an actor in O(1) time.
|
||||||
// with fresh data to ensure consistency even if external processes write to
|
// It uses JetStream's DeliverLast() option to fetch only the last message
|
||||||
// the same JetStream stream.
|
// instead of scanning all events, making this O(1) instead of O(n).
|
||||||
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||||
// Hold lock during fetch to prevent race condition with SaveEvent
|
// Create subject filter for this actor
|
||||||
jes.mu.Lock()
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
||||||
defer jes.mu.Unlock()
|
jes.streamName,
|
||||||
|
sanitizeSubject(extractActorType(actorID)),
|
||||||
|
sanitizeSubject(actorID))
|
||||||
|
|
||||||
events, err := jes.GetEvents(actorID, 0)
|
// Create consumer to read only the last message
|
||||||
|
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, fmt.Errorf("failed to create consumer: %w", err)
|
||||||
|
}
|
||||||
|
defer consumer.Unsubscribe()
|
||||||
|
|
||||||
|
// Fetch only the last message
|
||||||
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
|
if err != nil {
|
||||||
|
if err == nats.ErrTimeout {
|
||||||
|
// No messages for this actor, return 0
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("failed to fetch last message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(events) == 0 {
|
if len(msgs) == 0 {
|
||||||
// No events for this actor - ensure cache is cleared
|
// No events for this actor
|
||||||
delete(jes.versions, actorID)
|
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
latestVersion := int64(0)
|
// Parse the last message to get the version
|
||||||
for _, event := range events {
|
var event aether.Event
|
||||||
if event.Version > latestVersion {
|
if err := json.Unmarshal(msgs[0].Data, &event); err != nil {
|
||||||
latestVersion = event.Version
|
return 0, fmt.Errorf("failed to unmarshal last event: %w", err)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Always repopulate cache with the fresh data just fetched
|
msgs[0].Ack()
|
||||||
// This ensures cache is in sync with actual state, whether from local writes
|
return event.Version, nil
|
||||||
// or external writes detected by version comparison
|
|
||||||
jes.versions[actorID] = latestVersion
|
|
||||||
|
|
||||||
return latestVersion, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestSnapshot gets the most recent snapshot for an actor
|
// getLatestVersionLocked is like GetLatestVersion but assumes the caller already holds jes.mu.
|
||||||
|
// This is used internally to avoid releasing and reacquiring the lock during SaveEvent,
|
||||||
|
// which would create a TOCTOU race condition.
|
||||||
|
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
||||||
|
// Create subject filter for this actor
|
||||||
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
||||||
|
jes.streamName,
|
||||||
|
sanitizeSubject(extractActorType(actorID)),
|
||||||
|
sanitizeSubject(actorID))
|
||||||
|
|
||||||
|
// Create consumer to read only the last message
|
||||||
|
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to create consumer: %w", err)
|
||||||
|
}
|
||||||
|
defer consumer.Unsubscribe()
|
||||||
|
|
||||||
|
// Fetch only the last message
|
||||||
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
|
if err != nil {
|
||||||
|
if err == nats.ErrTimeout {
|
||||||
|
// No messages for this actor, return 0
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("failed to fetch last message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(msgs) == 0 {
|
||||||
|
// No events for this actor
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the last message to get the version
|
||||||
|
var event aether.Event
|
||||||
|
if err := json.Unmarshal(msgs[0].Data, &event); err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to unmarshal last event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs[0].Ack()
|
||||||
|
return event.Version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLatestSnapshot gets the most recent snapshot for an actor.
|
||||||
|
// Returns an error if no snapshot exists for the actor (unlike GetLatestVersion which returns 0).
|
||||||
|
// This is intentional: a missing snapshot is different from a missing event stream.
|
||||||
|
// If an actor has no events, that's a normal state (use version 0).
|
||||||
|
// If an actor has no snapshot, that could indicate an error or it could be normal
|
||||||
|
// depending on the use case, so we let the caller decide how to handle it.
|
||||||
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
||||||
// Create subject for snapshots
|
// Create subject for snapshots
|
||||||
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
||||||
@@ -355,12 +517,14 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor
|
|||||||
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == nats.ErrTimeout {
|
if err == nats.ErrTimeout {
|
||||||
|
// No snapshot found - return error to distinguish from successful nil result
|
||||||
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(msgs) == 0 {
|
if len(msgs) == 0 {
|
||||||
|
// No snapshot exists for this actor
|
||||||
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -417,5 +581,43 @@ func sanitizeSubject(s string) string {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateVersionCache updates the version cache for a specific actor.
|
||||||
|
// This is used when receiving events from other nodes via NATS to keep
|
||||||
|
// the version cache consistent across cluster nodes.
|
||||||
|
//
|
||||||
|
// Only updates if the new version is greater than the cached version to prevent
|
||||||
|
// stale cache entries from causing version conflicts.
|
||||||
|
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
|
||||||
|
jes.mu.Lock()
|
||||||
|
defer jes.mu.Unlock()
|
||||||
|
|
||||||
|
// Only update if the new version is greater than cached version
|
||||||
|
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
|
||||||
|
jes.versions[actorID] = version
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCachedVersion returns the cached version for an actor, if available.
|
||||||
|
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
|
||||||
|
jes.mu.Lock()
|
||||||
|
defer jes.mu.Unlock()
|
||||||
|
|
||||||
|
version, ok := jes.versions[actorID]
|
||||||
|
return version, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetBroadcaster sets the event broadcaster for this store.
|
||||||
|
// The broadcaster is used to publish EventStored events when events are saved.
|
||||||
|
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
|
||||||
|
jes.mu.Lock()
|
||||||
|
defer jes.mu.Unlock()
|
||||||
|
jes.broadcaster = broadcaster
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the JetStream event store and cleans up resources.
|
||||||
|
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
|
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
|
||||||
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
||||||
|
|||||||
147
store/jetstream_benchmark_test.go
Normal file
147
store/jetstream_benchmark_test.go
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BenchmarkGetLatestVersion_WithManyEvents benchmarks GetLatestVersion performance
|
||||||
|
// with a large number of events per actor.
|
||||||
|
// This demonstrates the O(1) performance by showing that time doesn't increase
|
||||||
|
// significantly with more events.
|
||||||
|
func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) {
|
||||||
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
|
if nc == nil {
|
||||||
|
b.Skip("NATS not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-getversion-%d", time.Now().UnixNano()))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-bench-test"
|
||||||
|
|
||||||
|
// Populate with 1000 events
|
||||||
|
for i := 1; i <= 1000; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "BenchEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: int64(i),
|
||||||
|
Data: map[string]interface{}{"index": i},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Benchmark GetLatestVersion
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
|
||||||
|
// to show that even uncached lookups are very fast due to DeliverLast optimization.
|
||||||
|
// A new store instance is created before timing to bypass the version cache.
|
||||||
|
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
||||||
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
|
if nc == nil {
|
||||||
|
b.Skip("NATS not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-nocache-%d", time.Now().UnixNano()))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-bench-nocache"
|
||||||
|
|
||||||
|
// Populate with 1000 events
|
||||||
|
for i := 1; i <= 1000; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "BenchEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: int64(i),
|
||||||
|
Data: map[string]interface{}{"index": i},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new store instance to bypass version cache
|
||||||
|
uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create uncached store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Benchmark GetLatestVersion without using cache
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := uncachedStore.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkGetLatestVersion_SingleEvent benchmarks with minimal data
|
||||||
|
func BenchmarkGetLatestVersion_SingleEvent(b *testing.B) {
|
||||||
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
|
if nc == nil {
|
||||||
|
b.Skip("NATS not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-single-%d", time.Now().UnixNano()))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-single"
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -2,15 +2,19 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||||
type InMemoryEventStore struct {
|
type InMemoryEventStore struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
events map[string][]*aether.Event // actorID -> events
|
events map[string][]*aether.Event // actorID -> events
|
||||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||||
|
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
|
||||||
|
namespace string // optional namespace for event publishing
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInMemoryEventStore creates a new in-memory event store
|
// NewInMemoryEventStore creates a new in-memory event store
|
||||||
@@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
|
||||||
|
// The broadcaster receives EventStored events when events are successfully saved.
|
||||||
|
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
|
||||||
|
return &InMemoryEventStore{
|
||||||
|
events: make(map[string][]*aether.Event),
|
||||||
|
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||||
|
broadcaster: broadcaster,
|
||||||
|
namespace: namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent saves an event to the in-memory store.
|
// SaveEvent saves an event to the in-memory store.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
|
// If a broadcaster is configured, publishes an EventStored event on success.
|
||||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||||
es.mu.Lock()
|
es.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer es.mu.Unlock()
|
||||||
@@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||||
}
|
}
|
||||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||||
|
|
||||||
|
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||||
|
if es.broadcaster != nil {
|
||||||
|
es.publishEventStored(event)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||||
|
// This is called after a successful SaveEvent to notify subscribers.
|
||||||
|
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||||
|
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"eventId": originalEvent.ID,
|
||||||
|
"actorId": originalEvent.ActorID,
|
||||||
|
"version": originalEvent.Version,
|
||||||
|
"timestamp": originalEvent.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
es.broadcaster.Publish(es.namespace, eventStored)
|
||||||
|
}
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version
|
// GetEvents retrieves events for an actor from a specific version
|
||||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||||
es.mu.RLock()
|
es.mu.RLock()
|
||||||
|
|||||||
@@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// === EventStored Publishing Tests ===
|
||||||
|
|
||||||
|
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
|
||||||
|
// Create a mock broadcaster to capture published events
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.50,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save event
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if EventStored was published
|
||||||
|
select {
|
||||||
|
case publishedEvent := <-ch:
|
||||||
|
if publishedEvent == nil {
|
||||||
|
t.Fatal("received nil event from broadcaster")
|
||||||
|
}
|
||||||
|
if publishedEvent.EventType != aether.EventTypeEventStored {
|
||||||
|
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
|
||||||
|
}
|
||||||
|
if publishedEvent.ActorID != "order-456" {
|
||||||
|
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
|
||||||
|
}
|
||||||
|
if publishedEvent.Version != 1 {
|
||||||
|
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
|
||||||
|
}
|
||||||
|
// Check data contains original event info
|
||||||
|
if publishedEvent.Data["eventId"] != "evt-123" {
|
||||||
|
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for EventStored event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
// Save first event
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain the first EventStored event
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for first EventStored event")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save event with non-increasing version (should fail)
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: "evt-2",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1, // Same version, should conflict
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event2)
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
t.Fatalf("expected ErrVersionConflict, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify no EventStored event was published
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatal("expected no EventStored event, but received one")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected - no event published
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
// Save multiple events
|
||||||
|
for i := int64(1); i <= 3; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: i,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we received 3 EventStored events in order
|
||||||
|
for i := int64(1); i <= 3; i++ {
|
||||||
|
select {
|
||||||
|
case publishedEvent := <-ch:
|
||||||
|
if publishedEvent == nil {
|
||||||
|
t.Fatal("received nil event from broadcaster")
|
||||||
|
}
|
||||||
|
if publishedEvent.Version != i {
|
||||||
|
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("timeout waiting for EventStored event %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
|
||||||
|
// Test that SaveEvent works without a broadcaster (nil broadcaster)
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.50,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should not panic even though broadcaster is nil
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify event was saved
|
||||||
|
events, err := store.GetEvents("order-456", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(events) != 1 {
|
||||||
|
t.Fatalf("expected 1 event, got %d", len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user