1 Commits

Author SHA1 Message Date
dae751a6ef Add EventBroadcaster metrics for observability and debugging
All checks were successful
CI / build (pull_request) Successful in 38s
- Add BroadcasterMetrics interface for reading metrics per namespace
- Add MetricsCollector interface and DefaultMetricsCollector implementation
- Track events_published and events_received counters per namespace
- Track active_subscriptions gauge per namespace
- Track publish_errors, subscribe_errors, and dropped_events counters
- Add MetricsProvider interface for EventBroadcaster implementations
- Integrate metrics tracking into EventBus and NATSEventBus
- Add optional Prometheus integration via PrometheusMetricsAdapter
- Add comprehensive unit tests for metrics functionality

Closes #22

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 19:51:23 +01:00
24 changed files with 1499 additions and 3909 deletions

View File

@@ -1,64 +0,0 @@
# Issue: Implement Actor Migration Between Cluster Nodes
## Problem
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
## Required Implementation
### 1. Rebalance Algorithm (cluster/shard.go)
Implement `ConsistentHashPlacement.RebalanceShards` to:
- Calculate new shard assignments based on active nodes
- Identify actors needing migration
- Generate migration plan with source/dest nodes
### 2. Migration Coordinator (cluster/manager.go)
Implement `handleRebalanceRequest` to:
- Accept migration plan from leader
- For each actor in plan:
1. Pause incoming messages
2. Capture actor state (replay events up to current version)
3. Serialize state
4. Send migration request to destination node
5. Wait for ack
6. Delete actor from current node
- Track migration status via `ActorMigration.Status`
### 3. Cross-Node Message Routing (cluster/distributed.go)
Implement proper routing in `SendMessage`:
- Use `GetActorNode(actorID)` to determine target node
- If remote: marshal message, send via NATS to target node
- If local: send to local runtime
- Route response back to caller if needed
## Suggested Approach
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
2. **Implement state capture** - replay events to get current state
3. **Implement state restore** - deserialize and restore actor state
4. **Implement coordinator** - manage migration phases
5. **Add error handling** - handle failed migrations, retries, cleanup
6. **Add tests** - test migration with mock NATS
## Related Files
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
- `cluster/manager.go:167` - handleMigrationRequest (empty)
- `cluster/shard.go:211` - RebalanceShards (stub)
- `cluster/distributed.go:139` - SendMessage (simplified)
- `cluster/types.go:108` - ActorMigration struct
## Acceptance Criteria
- [ ] `RebalanceShards` returns new shard map with actor assignments
- [ ] `handleRebalanceRequest` processes migration plan
- [ ] `handleMigrationRequest` accepts actor migrations
- [ ] `SendMessage` routes to correct node
- [ ] Actors can be migrated with state preserved
- [ ] Failed migrations are handled gracefully
- [ ] Integration test with multi-node cluster

View File

@@ -1,117 +0,0 @@
# Issue: Add Snapshot Support to Event Sourcing Workflow
## Problem
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay entire history
- No performance optimization for long-lived actors
- Snapshots exist as API but are not used
## Current State
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
- `InMemoryEventStore` has snapshot methods but no lifecycle management
## Required Implementation
### 1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds
### 2. State Capture
Add method to capture actor state:
```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```
### 3. Snapshot Store Extension
Extend `EventStoreWithErrors` to include snapshots:
```go
type EventStoreWithSnapshots interface {
EventStoreWithErrors
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
SaveSnapshot(snapshot *ActorSnapshot) error
}
```
### 4. Snapshot Workflow
Modify event retrieval to use snapshots:
```go
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
// 1. Try to get latest snapshot
snapshot, _ := store.GetLatestSnapshot(actorID)
// 2. If snapshot exists and version <= fromVersion:
// - Return events from snapshot version + 1
// 3. Else:
// - Replay all events from version 0
}
```
## Suggested Implementation
### 1. Add CaptureState to EventStore interface
In `event.go`, extend `EventStore` or create `StateStore` interface:
```go
type StateStore interface {
EventStore
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```
### 2. Implement CaptureState
In `store/jetstream.go`:
```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
// Replay events and build state (application logic needed here)
events, _ := jes.GetEvents(actorID, fromVersion)
// Need application logic to convert events to state
return state, nil
}
```
### 3. Add Snapshot Helper
Create snapshot utilities:
```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
return &ActorSnapshot{
ActorID: actorID,
Version: version,
State: state,
Timestamp: time.Now(),
}
}
```
### 4. Modify GetEvents
Update `GetEvents` in both stores to use snapshots when beneficial.
## Snapshots Workflow Example
```
1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, snapshot at version 1000
4. Request events from version 900:
- Get snapshot at version 1000? No (version too high)
- Replay 900->1000 events (only 100 events)
5. Request events from version 50:
- Get latest snapshot at version 1000? Yes (version > 50)
- Use snapshot as base
- Replay 1000->1000 events (none)
```
## Acceptance Criteria
- [ ] `CaptureState` method added to event store
- [ ] Snapshots created at configured intervals
- [ ] `GetEvents` uses snapshots to optimize replay
- [ ] Snapshot workflow tested with long-lived actors
- [ ] Configuration for snapshot interval/version
- [ ] Metrics: snapshot count, average replay size

View File

@@ -1,100 +0,0 @@
# Issue: Implement VM/Runtime for Actors
## Problem
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
## Required Components
### 1. VM Implementation (cluster/vm.go - new)
```go
type VirtualMachine struct {
actorID string
eventStore aether.EventStore
state map[string]interface{}
version int64
}
```
Methods needed:
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
- `Start()` - replay events to rebuild state
- `ProcessEvent(event *aether.Event)` - apply event to state
- `Stop()` - persist final state
- `GetVersion()` - current event version
### 2. Runtime Implementation (cluster/runtime.go - new)
```go
type Runtime struct {
natsConn *nats.Conn
eventStore aether.EventStore
vmRegistry VMRegistry // map[actorID]*VirtualMachine
config RuntimeConfig
}
```
Methods needed:
- `Start()` - initialize and start processing
- `LoadModel(model eventstorming.Model)` - register domain types
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
- `GetActiveVMs()` - return map of active VMs
- `CreateVM(actorID string)` - create new VM instance
- `StopVM(actorID string)` - persist and stop VM
### 3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
## Suggested Design
### VM Lifecycle
```
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
- Replay events from event store
- Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
```
### State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
## Implementation Steps
1. **Create VM struct** in `cluster/vm.go`
2. **Implement event replay** to rebuild state
3. **Create Runtime** in `cluster/runtime.go`
4. **Register Runtime with cluster** via `SetVMProvider`
5. **Implement message processing** - validate against model
6. **Add version conflict handling** using existing EventStore
7. **Write tests** - mock event store, test state transitions
## File Structure
```
cluster/
├── vm.go # VirtualMachine implementation
├── runtime.go # Runtime implementation
├── vm_test.go # VM tests
├── runtime_test.go # Runtime tests
└── integration_test.go # Integration tests
```
## Acceptance Criteria
- [ ] VM can be created with actor ID
- [ ] VM replays events to build state
- [ ] VM processes events and updates state
- [ ] VM persists current version
- [ ] Runtime can create/stop VMs
- [ ] Runtime manages VM registry
- [ ] Integration test with NATS and JetStream

106
AGENTS.md
View File

@@ -1,106 +0,0 @@
# Aether
**Distributed event sourcing primitives for Go, powered by NATS.**
---
## Development Commands
```bash
make build # go build ./...
make test # go test ./...
make lint # golangci-lint run
make clean # go clean
```
## NATS Server Requirement
Integration tests require NATS with JetStream enabled:
```bash
brew install nats-server
nats-server -js
```
Run tests in a separate terminal after starting NATS.
## Project Structure
```
aether/
├── event.go # Event, ActorSnapshot, EventStore interface
├── eventbus.go # EventBus, EventBroadcaster interface
├── nats_eventbus.go # NATSEventBus implementation
├── metrics*.go # Prometheus metrics
├── store/ # EventStore implementations
│ ├── memory.go # InMemoryEventStore (testing)
│ └── jetstream.go # JetStreamEventStore (production)
├── cluster/ # Cluster management
│ ├── manager.go # ClusterManager
│ ├── discovery.go # NodeDiscovery
│ ├── hashring.go # ConsistentHashRing
│ ├── shard.go # ShardManager
│ ├── leader.go # LeaderElection
│ └── types.go # Cluster types
├── examples/ # Usage examples
└── eventstorming/ # Domain modeling reference
```
## Core Patterns
### Event Versioning
Events for each actor must have monotonically increasing versions:
```go
currentVersion, _ := store.GetLatestVersion(actorID)
event := &aether.Event{
ActorID: actorID,
Version: currentVersion + 1,
// ...
}
err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
// Reload and retry
}
```
### Namespace Isolation
Namespaces provide logical boundaries for events:
```go
// Event bus namespace
ch := eventBus.Subscribe("tenant-abc")
eventBus.Publish("tenant-abc", event)
// Store namespace
store, _ := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
```
Namespaces sanitize special characters and prefix stream names for complete data isolation.
### JetStream Cache Behavior
`JetStreamEventStore` caches actor versions for performance. Cache is invalidated when `GetLatestVersion` detects a newer version from external writes.
## Testing
- Unit tests: `go test -v ./...`
- Single test: `go test -v -run TestName`
- Single file: `go test -v ./store/...`
- Benchmarks: `go test -bench=. -benchmem`
Integration tests require running NATS server first.
## Linting
```bash
golangci-lint run
golangci-lint run --fix
```
## References
- [vision.md](./vision.md) - Product vision and principles
- [examples/README.md](./examples/README.md) - Example patterns

194
CLAUDE.md Normal file
View File

@@ -0,0 +1,194 @@
# Aether
Distributed actor system with event sourcing for Go, powered by NATS.
## Organization Context
This repo is part of Flowmade. See:
- [Organization manifesto](https://git.flowmade.one/flowmade-one/architecture/src/branch/main/manifesto.md) - who we are, what we believe
- [Repository map](https://git.flowmade.one/flowmade-one/architecture/src/branch/main/repos.md) - how this fits in the bigger picture
- [Vision](./vision.md) - what this specific product does
## Setup
```bash
git clone git@git.flowmade.one:flowmade-one/aether.git
cd aether
go mod download
```
Requires NATS server for integration tests:
```bash
# Install NATS
brew install nats-server
# Run with JetStream enabled
nats-server -js
```
## Project Structure
```
aether/
├── event.go # Event, ActorSnapshot, EventStore interface
├── eventbus.go # EventBus, EventBroadcaster interface
├── nats_eventbus.go # NATSEventBus - cross-node event broadcasting
├── store/
│ ├── memory.go # InMemoryEventStore (testing)
│ └── jetstream.go # JetStreamEventStore (production)
├── cluster/
│ ├── manager.go # ClusterManager
│ ├── discovery.go # NodeDiscovery
│ ├── hashring.go # ConsistentHashRing
│ ├── shard.go # ShardManager
│ ├── leader.go # LeaderElection
│ └── types.go # Cluster types
└── model/
└── model.go # EventStorming model types
```
## Development
```bash
make build # Build the library
make test # Run tests
make lint # Run linters
```
## Architecture
### Event Sourcing
Events are the source of truth. State is derived by replaying events.
```go
// Create an event
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: "order-123",
Version: 1,
Data: map[string]interface{}{"total": 100.00},
Timestamp: time.Now(),
}
// Persist to event store
store.SaveEvent(event)
// Replay events to rebuild state
events, _ := store.GetEvents("order-123", 0)
```
### Event Versioning
Events for each actor must have **monotonically increasing versions**. This ensures event stream integrity and enables optimistic concurrency control.
#### Version Semantics
- Each actor has an independent version sequence
- Version must be strictly greater than the current latest version
- For new actors (no events), the first event must have version > 0
- Non-consecutive versions are allowed (gaps are permitted)
#### Optimistic Concurrency Pattern
```go
// 1. Get current version
currentVersion, _ := store.GetLatestVersion("order-123")
// 2. Create event with next version
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: "order-123",
Version: currentVersion + 1,
Data: map[string]interface{}{"status": "shipped"},
Timestamp: time.Now(),
}
// 3. Attempt to save
err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
// Another writer won - reload and retry if appropriate
var versionErr *aether.VersionConflictError
errors.As(err, &versionErr)
log.Printf("Conflict: actor %s has version %d, attempted %d",
versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
}
```
#### Error Types
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
### Namespace Isolation
Namespaces provide logical boundaries for events and subscriptions.
#### Event Bus Namespaces
The event bus supports namespace-scoped pub/sub:
```go
// Subscribe to events in a namespace
ch := eventBus.Subscribe("tenant-abc")
// Events are isolated per namespace
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
```
#### Namespace-Scoped Event Stores
JetStreamEventStore supports optional namespace prefixes for complete storage isolation:
```go
// Create a namespaced event store (convenience function)
store, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
// Or configure via JetStreamConfig
config := store.JetStreamConfig{
Namespace: "tenant-abc",
StreamRetention: 30 * 24 * time.Hour,
ReplicaCount: 3,
}
store, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
// The actual stream name becomes "tenant-abc_events"
// Events from one namespace cannot be read from another namespace's store
```
Namespace isolation at the storage level ensures:
- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces
- **Backward compatibility**: Empty namespace (default) works exactly as before
- **Safe characters**: Namespace names are sanitized (spaces, dots, wildcards become underscores)
Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as:
- Multi-tenant deployments where tenant data must be completely separated
- Logical boundaries between different domains or bounded contexts
- Test isolation in integration tests
### Clustering
Aether handles node discovery, leader election, and shard distribution:
```go
// Create cluster manager
manager := cluster.NewClusterManager(natsConn, nodeID)
// Join cluster
manager.Start()
// Leader election happens automatically
if manager.IsLeader() {
// Coordinate shard assignments
}
```
## Key Patterns
- **Events are immutable** - Never modify, only append
- **Versions are monotonic** - Each event must have version > previous for same actor
- **Snapshots for performance** - Periodically snapshot state to avoid full replay
- **Namespaces for isolation** - Not multi-tenancy, just logical boundaries
- **NATS for everything** - Events, pub/sub, clustering all use NATS

View File

@@ -107,34 +107,7 @@ Order state after replaying 2 events:
### Events are immutable
Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:
**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:
- File-based storage (durable)
- Limits-based retention policy (events expire after configured duration, not before)
- No mechanism to modify or delete individual events during their lifetime
**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. You can rely on the fact that historical events won't change, enabling compliance and forensics.
To correct a mistake, append a new event that expresses the correction rather than modifying history:
```go
// Wrong: Cannot update an event
// store.UpdateEvent(eventID, newData) // This method doesn't exist
// Right: Append a new event that corrects the record
correctionEvent := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderCorrected",
ActorID: orderID,
Version: currentVersion + 1,
Data: map[string]interface{}{"reason": "price adjustment"},
Timestamp: time.Now(),
}
err := store.SaveEvent(correctionEvent)
```
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
### State is derived

View File

@@ -1,713 +0,0 @@
package cluster
import (
"fmt"
"testing"
)
func TestNewShardManager(t *testing.T) {
sm := NewShardManager(16, 3)
if sm == nil {
t.Fatal("NewShardManager returned nil")
}
if sm.shardCount != 16 {
t.Errorf("expected shardCount 16, got %d", sm.shardCount)
}
if sm.replication != 3 {
t.Errorf("expected replication 3, got %d", sm.replication)
}
if sm.shardMap == nil {
t.Error("shardMap is nil")
}
if sm.placement == nil {
t.Error("placement strategy is nil")
}
}
func TestNewShardManager_DefaultsForZeroValues(t *testing.T) {
sm := NewShardManagerWithConfig(ShardConfig{})
if sm.shardCount != DefaultNumShards {
t.Errorf("expected default shardCount %d, got %d", DefaultNumShards, sm.shardCount)
}
if sm.replication != 1 {
t.Errorf("expected default replication 1, got %d", sm.replication)
}
}
func TestNewShardManagerWithConfig_CustomValues(t *testing.T) {
config := ShardConfig{
ShardCount: 256,
ReplicationFactor: 2,
}
sm := NewShardManagerWithConfig(config)
if sm.shardCount != 256 {
t.Errorf("expected shardCount 256, got %d", sm.shardCount)
}
if sm.replication != 2 {
t.Errorf("expected replication 2, got %d", sm.replication)
}
}
func TestGetShard_ReturnsCorrectShardForActor(t *testing.T) {
sm := NewShardManager(16, 1)
// Test that GetShard returns consistent results
actorID := "actor-123"
shard1 := sm.GetShard(actorID)
shard2 := sm.GetShard(actorID)
if shard1 != shard2 {
t.Errorf("GetShard not consistent: got %d and %d for same actor", shard1, shard2)
}
// Verify shard is within valid range
if shard1 < 0 || shard1 >= 16 {
t.Errorf("shard %d is out of range [0, 16)", shard1)
}
}
func TestGetShard_DifferentActorsCanMapToDifferentShards(t *testing.T) {
sm := NewShardManager(16, 1)
// With enough actors, we should see different shards
shardsSeen := make(map[int]bool)
for i := 0; i < 100; i++ {
actorID := fmt.Sprintf("actor-%d", i)
shard := sm.GetShard(actorID)
shardsSeen[shard] = true
}
// We should see multiple different shards
if len(shardsSeen) < 2 {
t.Errorf("expected multiple different shards, got %d unique shards", len(shardsSeen))
}
}
func TestGetShard_DistributesActorsAcrossShards(t *testing.T) {
sm := NewShardManager(16, 1)
distribution := make(map[int]int)
numActors := 1000
for i := 0; i < numActors; i++ {
actorID := fmt.Sprintf("actor-%d", i)
shard := sm.GetShard(actorID)
distribution[shard]++
}
// Verify all shards are within valid range
for shard := range distribution {
if shard < 0 || shard >= 16 {
t.Errorf("shard %d is out of range [0, 16)", shard)
}
}
// With good hashing, we should see fairly even distribution
expectedPerShard := numActors / 16
for shard, count := range distribution {
deviation := float64(count-expectedPerShard) / float64(expectedPerShard)
if deviation > 0.5 || deviation < -0.5 {
t.Logf("shard %d has %d actors (%.1f%% deviation)", shard, count, deviation*100)
}
}
}
func TestGetShardNodes_EmptyShard(t *testing.T) {
sm := NewShardManager(16, 1)
nodes := sm.GetShardNodes(0)
if nodes == nil {
t.Error("GetShardNodes returned nil, expected empty slice")
}
if len(nodes) != 0 {
t.Errorf("expected empty slice for unassigned shard, got %v", nodes)
}
}
func TestGetShardNodes_ReturnsAssignedNodes(t *testing.T) {
sm := NewShardManager(16, 3)
// Assign nodes to shard
sm.AssignShard(0, []string{"node-1", "node-2", "node-3"})
nodes := sm.GetShardNodes(0)
if len(nodes) != 3 {
t.Errorf("expected 3 nodes, got %d", len(nodes))
}
if nodes[0] != "node-1" || nodes[1] != "node-2" || nodes[2] != "node-3" {
t.Errorf("unexpected nodes: %v", nodes)
}
}
func TestGetShardNodes_NonExistentShard(t *testing.T) {
sm := NewShardManager(16, 1)
// Query a shard that has no assignments
nodes := sm.GetShardNodes(999)
if len(nodes) != 0 {
t.Errorf("expected empty slice for non-existent shard, got %v", nodes)
}
}
func TestAssignShard_CreatesNewAssignment(t *testing.T) {
sm := NewShardManager(16, 1)
sm.AssignShard(5, []string{"node-a"})
nodes := sm.GetShardNodes(5)
if len(nodes) != 1 || nodes[0] != "node-a" {
t.Errorf("expected [node-a], got %v", nodes)
}
}
func TestAssignShard_UpdatesExistingAssignment(t *testing.T) {
sm := NewShardManager(16, 1)
sm.AssignShard(5, []string{"node-a"})
sm.AssignShard(5, []string{"node-b", "node-c"})
nodes := sm.GetShardNodes(5)
if len(nodes) != 2 {
t.Errorf("expected 2 nodes, got %d", len(nodes))
}
if nodes[0] != "node-b" || nodes[1] != "node-c" {
t.Errorf("expected [node-b, node-c], got %v", nodes)
}
}
func TestAssignShard_MultipleShards(t *testing.T) {
sm := NewShardManager(16, 1)
sm.AssignShard(0, []string{"node-1"})
sm.AssignShard(1, []string{"node-2"})
sm.AssignShard(2, []string{"node-3"})
if nodes := sm.GetShardNodes(0); len(nodes) != 1 || nodes[0] != "node-1" {
t.Errorf("shard 0: expected [node-1], got %v", nodes)
}
if nodes := sm.GetShardNodes(1); len(nodes) != 1 || nodes[0] != "node-2" {
t.Errorf("shard 1: expected [node-2], got %v", nodes)
}
if nodes := sm.GetShardNodes(2); len(nodes) != 1 || nodes[0] != "node-3" {
t.Errorf("shard 2: expected [node-3], got %v", nodes)
}
}
func TestGetPrimaryNode(t *testing.T) {
sm := NewShardManager(16, 3)
sm.AssignShard(0, []string{"primary", "replica1", "replica2"})
primary := sm.GetPrimaryNode(0)
if primary != "primary" {
t.Errorf("expected 'primary', got %q", primary)
}
}
func TestGetPrimaryNode_EmptyShard(t *testing.T) {
sm := NewShardManager(16, 1)
primary := sm.GetPrimaryNode(0)
if primary != "" {
t.Errorf("expected empty string for unassigned shard, got %q", primary)
}
}
func TestGetReplicaNodes(t *testing.T) {
sm := NewShardManager(16, 3)
sm.AssignShard(0, []string{"primary", "replica1", "replica2"})
replicas := sm.GetReplicaNodes(0)
if len(replicas) != 2 {
t.Errorf("expected 2 replicas, got %d", len(replicas))
}
if replicas[0] != "replica1" || replicas[1] != "replica2" {
t.Errorf("expected [replica1, replica2], got %v", replicas)
}
}
func TestGetReplicaNodes_SingleNode(t *testing.T) {
sm := NewShardManager(16, 1)
sm.AssignShard(0, []string{"only-node"})
replicas := sm.GetReplicaNodes(0)
if len(replicas) != 0 {
t.Errorf("expected no replicas for single-node shard, got %v", replicas)
}
}
func TestGetReplicaNodes_EmptyShard(t *testing.T) {
sm := NewShardManager(16, 1)
replicas := sm.GetReplicaNodes(0)
if len(replicas) != 0 {
t.Errorf("expected empty slice for unassigned shard, got %v", replicas)
}
}
func TestPlaceActor_NoNodes(t *testing.T) {
sm := NewShardManager(16, 1)
_, err := sm.PlaceActor("actor-1", map[string]*NodeInfo{})
if err == nil {
t.Error("expected error when no nodes available")
}
}
func TestPlaceActor_SingleNode(t *testing.T) {
sm := NewShardManager(16, 1)
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1", Status: NodeStatusActive},
}
nodeID, err := sm.PlaceActor("actor-1", nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if nodeID != "node-1" {
t.Errorf("expected node-1, got %q", nodeID)
}
}
func TestPlaceActor_ReturnsValidNode(t *testing.T) {
sm := NewShardManager(16, 1)
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1", Status: NodeStatusActive},
"node-2": {ID: "node-2", Status: NodeStatusActive},
"node-3": {ID: "node-3", Status: NodeStatusActive},
}
// PlaceActor should always return one of the available nodes
for i := 0; i < 100; i++ {
nodeID, err := sm.PlaceActor(fmt.Sprintf("actor-%d", i), nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if _, exists := nodes[nodeID]; !exists {
t.Errorf("PlaceActor returned invalid node: %q", nodeID)
}
}
}
func TestPlaceActor_DistributesAcrossNodes(t *testing.T) {
sm := NewShardManager(16, 1)
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1", Status: NodeStatusActive},
"node-2": {ID: "node-2", Status: NodeStatusActive},
"node-3": {ID: "node-3", Status: NodeStatusActive},
}
distribution := make(map[string]int)
for i := 0; i < 100; i++ {
nodeID, _ := sm.PlaceActor(fmt.Sprintf("actor-%d", i), nodes)
distribution[nodeID]++
}
// Should use multiple nodes
if len(distribution) < 2 {
t.Errorf("expected distribution across multiple nodes, got %v", distribution)
}
}
func TestUpdateShardMap(t *testing.T) {
sm := NewShardManager(16, 1)
newMap := &ShardMap{
Version: 5,
Shards: map[int][]string{
0: {"node-a", "node-b"},
1: {"node-c"},
},
Nodes: map[string]NodeInfo{
"node-a": {ID: "node-a"},
"node-b": {ID: "node-b"},
"node-c": {ID: "node-c"},
},
}
sm.UpdateShardMap(newMap)
result := sm.GetShardMap()
if result.Version != 5 {
t.Errorf("expected version 5, got %d", result.Version)
}
if len(result.Shards[0]) != 2 {
t.Errorf("expected 2 nodes for shard 0, got %d", len(result.Shards[0]))
}
}
func TestGetShardMap_ReturnsDeepCopy(t *testing.T) {
sm := NewShardManager(16, 1)
sm.AssignShard(0, []string{"node-1", "node-2"})
copy1 := sm.GetShardMap()
copy2 := sm.GetShardMap()
// Modify copy1
copy1.Shards[0][0] = "modified"
copy1.Version = 999
// copy2 should be unaffected
if copy2.Shards[0][0] == "modified" {
t.Error("GetShardMap did not return a deep copy (shard nodes modified)")
}
if copy2.Version == 999 {
t.Error("GetShardMap did not return a deep copy (version modified)")
}
// Original should be unaffected
nodes := sm.GetShardNodes(0)
if nodes[0] == "modified" {
t.Error("original shard map was modified through copy")
}
}
func TestGetShardCount(t *testing.T) {
sm := NewShardManager(64, 1)
if sm.GetShardCount() != 64 {
t.Errorf("expected 64, got %d", sm.GetShardCount())
}
}
func TestGetReplicationFactor(t *testing.T) {
sm := NewShardManager(16, 3)
if sm.GetReplicationFactor() != 3 {
t.Errorf("expected 3, got %d", sm.GetReplicationFactor())
}
}
func TestRebalanceShards_NoPlacementStrategy(t *testing.T) {
sm := NewShardManager(16, 1)
sm.placement = nil // Remove placement strategy
_, err := sm.RebalanceShards(map[string]*NodeInfo{})
if err == nil {
t.Error("expected error when no placement strategy configured")
}
}
func TestRebalanceShards_WithNodes(t *testing.T) {
sm := NewShardManager(16, 1)
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1", Status: NodeStatusActive},
"node-2": {ID: "node-2", Status: NodeStatusActive},
}
result, err := sm.RebalanceShards(nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if result == nil {
t.Error("expected non-nil result")
}
}
// Test shard assignment with node failures
func TestShardAssignment_NodeFailure(t *testing.T) {
sm := NewShardManager(16, 3)
// Initial assignment with 3 replicas
sm.AssignShard(0, []string{"node-1", "node-2", "node-3"})
// Simulate node failure by reassigning without the failed node
sm.AssignShard(0, []string{"node-1", "node-3"})
nodes := sm.GetShardNodes(0)
if len(nodes) != 2 {
t.Errorf("expected 2 nodes after failure, got %d", len(nodes))
}
// Verify primary is still correct
primary := sm.GetPrimaryNode(0)
if primary != "node-1" {
t.Errorf("expected node-1 as primary, got %q", primary)
}
// Verify replica count
replicas := sm.GetReplicaNodes(0)
if len(replicas) != 1 || replicas[0] != "node-3" {
t.Errorf("expected [node-3] as replicas, got %v", replicas)
}
}
func TestShardAssignment_AllNodesFailExceptOne(t *testing.T) {
sm := NewShardManager(16, 3)
sm.AssignShard(0, []string{"node-1", "node-2", "node-3"})
// Simulate all but one node failing
sm.AssignShard(0, []string{"node-3"})
nodes := sm.GetShardNodes(0)
if len(nodes) != 1 || nodes[0] != "node-3" {
t.Errorf("expected [node-3], got %v", nodes)
}
primary := sm.GetPrimaryNode(0)
if primary != "node-3" {
t.Errorf("expected node-3 as primary, got %q", primary)
}
replicas := sm.GetReplicaNodes(0)
if len(replicas) != 0 {
t.Errorf("expected no replicas, got %v", replicas)
}
}
// Test replication factor is respected
func TestReplicationFactor_Respected(t *testing.T) {
sm := NewShardManager(16, 3)
if sm.GetReplicationFactor() != 3 {
t.Errorf("expected replication factor 3, got %d", sm.GetReplicationFactor())
}
// Assign with exactly the replication factor
sm.AssignShard(0, []string{"node-1", "node-2", "node-3"})
nodes := sm.GetShardNodes(0)
if len(nodes) != 3 {
t.Errorf("expected 3 nodes matching replication factor, got %d", len(nodes))
}
}
func TestReplicationFactor_CanExceed(t *testing.T) {
// Note: ShardManager doesn't enforce max replication, it just tracks what's assigned
sm := NewShardManager(16, 2)
// Assign more nodes than replication factor
sm.AssignShard(0, []string{"node-1", "node-2", "node-3", "node-4"})
nodes := sm.GetShardNodes(0)
if len(nodes) != 4 {
t.Errorf("expected 4 nodes, got %d", len(nodes))
}
}
func TestReplicationFactor_LessThanFactor(t *testing.T) {
sm := NewShardManager(16, 3)
// Assign fewer nodes than replication factor (possible during degraded state)
sm.AssignShard(0, []string{"node-1"})
nodes := sm.GetShardNodes(0)
if len(nodes) != 1 {
t.Errorf("expected 1 node, got %d", len(nodes))
}
// System should track that we're under-replicated
// (in practice, cluster manager would handle this)
}
// Mock VM registry for testing GetActorsInShard
type mockVMRegistry struct {
activeVMs map[string]VirtualMachine
}
func (m *mockVMRegistry) GetActiveVMs() map[string]VirtualMachine {
return m.activeVMs
}
func (m *mockVMRegistry) GetShard(actorID string) int {
// This would use the same logic as ShardManager
return 0
}
type mockVM struct {
id string
actorID string
state VMState
}
func (m *mockVM) GetID() string { return m.id }
func (m *mockVM) GetActorID() string { return m.actorID }
func (m *mockVM) GetState() VMState { return m.state }
func TestGetActorsInShard_NilRegistry(t *testing.T) {
sm := NewShardManager(16, 1)
actors := sm.GetActorsInShard(0, "node-1", nil)
if len(actors) != 0 {
t.Errorf("expected empty slice for nil registry, got %v", actors)
}
}
func TestGetActorsInShard_WithActors(t *testing.T) {
sm := NewShardManager(16, 1)
// Create mock VMs - need to find actors that map to the same shard
// First, find some actor IDs that map to shard 0
var actorsInShard0 []string
for i := 0; i < 100; i++ {
actorID := fmt.Sprintf("actor-%d", i)
if sm.GetShard(actorID) == 0 {
actorsInShard0 = append(actorsInShard0, actorID)
if len(actorsInShard0) >= 3 {
break
}
}
}
activeVMs := make(map[string]VirtualMachine)
for _, actorID := range actorsInShard0 {
activeVMs[actorID] = &mockVM{
id: "vm-" + actorID,
actorID: actorID,
state: VMStateRunning,
}
}
registry := &mockVMRegistry{activeVMs: activeVMs}
actors := sm.GetActorsInShard(0, "node-1", registry)
if len(actors) != len(actorsInShard0) {
t.Errorf("expected %d actors in shard 0, got %d", len(actorsInShard0), len(actors))
}
}
func TestGetActorsInShard_EmptyRegistry(t *testing.T) {
sm := NewShardManager(16, 1)
registry := &mockVMRegistry{activeVMs: make(map[string]VirtualMachine)}
actors := sm.GetActorsInShard(0, "node-1", registry)
if len(actors) != 0 {
t.Errorf("expected empty slice for empty registry, got %v", actors)
}
}
// Tests for ConsistentHashPlacement
func TestConsistentHashPlacement_PlaceActor_NoNodes(t *testing.T) {
placement := &ConsistentHashPlacement{}
shardMap := &ShardMap{}
_, err := placement.PlaceActor("actor-1", shardMap, map[string]*NodeInfo{})
if err == nil {
t.Error("expected error when no nodes available")
}
}
func TestConsistentHashPlacement_PlaceActor_SingleNode(t *testing.T) {
placement := &ConsistentHashPlacement{}
shardMap := &ShardMap{}
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1"},
}
nodeID, err := placement.PlaceActor("actor-1", shardMap, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if nodeID != "node-1" {
t.Errorf("expected node-1, got %q", nodeID)
}
}
func TestConsistentHashPlacement_PlaceActor_ReturnsValidNode(t *testing.T) {
placement := &ConsistentHashPlacement{}
shardMap := &ShardMap{}
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1"},
"node-2": {ID: "node-2"},
"node-3": {ID: "node-3"},
}
// PlaceActor should always return one of the available nodes
for i := 0; i < 100; i++ {
nodeID, err := placement.PlaceActor(fmt.Sprintf("actor-%d", i), shardMap, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if _, exists := nodes[nodeID]; !exists {
t.Errorf("PlaceActor returned invalid node: %q", nodeID)
}
}
}
func TestConsistentHashPlacement_RebalanceShards(t *testing.T) {
placement := &ConsistentHashPlacement{}
currentMap := &ShardMap{
Version: 1,
Shards: map[int][]string{0: {"node-1"}},
}
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1"},
"node-2": {ID: "node-2"},
}
result, err := placement.RebalanceShards(currentMap, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// Current implementation returns unchanged map
if result != currentMap {
t.Error("expected same map returned (simplified implementation)")
}
}
// Benchmark tests
func BenchmarkGetShard(b *testing.B) {
sm := NewShardManager(1024, 1)
actorIDs := make([]string, 1000)
for i := range actorIDs {
actorIDs[i] = fmt.Sprintf("actor-%d", i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.GetShard(actorIDs[i%len(actorIDs)])
}
}
func BenchmarkAssignShard(b *testing.B) {
sm := NewShardManager(1024, 1)
nodes := []string{"node-1", "node-2", "node-3"}
b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.AssignShard(i%1024, nodes)
}
}
func BenchmarkPlaceActor(b *testing.B) {
sm := NewShardManager(1024, 1)
nodes := map[string]*NodeInfo{
"node-1": {ID: "node-1"},
"node-2": {ID: "node-2"},
"node-3": {ID: "node-3"},
}
actorIDs := make([]string, 1000)
for i := range actorIDs {
actorIDs[i] = fmt.Sprintf("actor-%d", i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.PlaceActor(actorIDs[i%len(actorIDs)], nodes)
}
}

View File

@@ -73,14 +73,6 @@ type Event struct {
Timestamp time.Time `json:"timestamp"`
}
// Common event types for Aether infrastructure
const (
// EventTypeEventStored is an internal event published when an event is successfully persisted.
// This event allows observability components (metrics, projections, audit systems) to react
// to persisted events without coupling to application code.
EventTypeEventStored = "EventStored"
)
// Common metadata keys for distributed tracing and auditing
const (
// MetadataKeyCorrelationID identifies related events across services
@@ -184,17 +176,6 @@ type ActorSnapshot struct {
// EventStore defines the interface for event persistence.
//
// # Immutability Guarantee
//
// EventStore is append-only. Once an event is persisted via SaveEvent, it is never
// modified or deleted. The interface intentionally provides no Update or Delete methods.
// This ensures:
// - Events serve as an immutable audit trail
// - State can be safely derived by replaying events
// - Concurrent reads are always safe (events never change)
//
// To correct a mistake, append a new event that expresses the correction.
//
// # Version Semantics
//
// Events for an actor must have monotonically increasing versions. When SaveEvent
@@ -215,13 +196,10 @@ type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor.
// Returns VersionConflictError if version <= current latest version.
// Once saved, the event is immutable and can never be modified or deleted.
SaveEvent(event *Event) error
// GetEvents retrieves events for an actor from a specific version (inclusive).
// Returns an empty slice if no events exist for the actor.
// The returned events are guaranteed to be immutable - they will never be
// modified or deleted from the store.
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor.

View File

@@ -2,8 +2,6 @@ package aether
import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"
@@ -1337,190 +1335,3 @@ func TestReplayError_WithLargeRawData(t *testing.T) {
// Error() should still work
_ = err.Error()
}
// Tests for VersionConflictError
func TestVersionConflictError_Error(t *testing.T) {
err := &VersionConflictError{
ActorID: "order-123",
AttemptedVersion: 3,
CurrentVersion: 5,
}
errMsg := err.Error()
// Verify error message contains all context
if !strings.Contains(errMsg, "order-123") {
t.Errorf("error message should contain ActorID, got: %s", errMsg)
}
if !strings.Contains(errMsg, "3") {
t.Errorf("error message should contain AttemptedVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "5") {
t.Errorf("error message should contain CurrentVersion, got: %s", errMsg)
}
if !strings.Contains(errMsg, "version conflict") {
t.Errorf("error message should contain 'version conflict', got: %s", errMsg)
}
}
func TestVersionConflictError_Fields(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-456",
AttemptedVersion: 10,
CurrentVersion: 8,
}
if err.ActorID != "actor-456" {
t.Errorf("ActorID mismatch: got %q, want %q", err.ActorID, "actor-456")
}
if err.AttemptedVersion != 10 {
t.Errorf("AttemptedVersion mismatch: got %d, want %d", err.AttemptedVersion, 10)
}
if err.CurrentVersion != 8 {
t.Errorf("CurrentVersion mismatch: got %d, want %d", err.CurrentVersion, 8)
}
}
func TestVersionConflictError_Unwrap(t *testing.T) {
err := &VersionConflictError{
ActorID: "actor-789",
AttemptedVersion: 2,
CurrentVersion: 1,
}
unwrapped := err.Unwrap()
if unwrapped != ErrVersionConflict {
t.Errorf("Unwrap should return ErrVersionConflict sentinel")
}
}
func TestVersionConflictError_ErrorsIs(t *testing.T) {
err := &VersionConflictError{
ActorID: "test-actor",
AttemptedVersion: 5,
CurrentVersion: 4,
}
// Test that errors.Is works with sentinel
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is(err, ErrVersionConflict) should return true")
}
// Test that other errors don't match
if errors.Is(err, errors.New("other error")) {
t.Error("errors.Is should not match unrelated errors")
}
}
func TestVersionConflictError_ErrorsAs(t *testing.T) {
originalErr := &VersionConflictError{
ActorID: "actor-unwrap",
AttemptedVersion: 7,
CurrentVersion: 6,
}
var versionErr *VersionConflictError
if !errors.As(originalErr, &versionErr) {
t.Fatalf("errors.As should succeed with VersionConflictError")
}
// Verify fields are accessible through unwrapped error
if versionErr.ActorID != "actor-unwrap" {
t.Errorf("ActorID mismatch after As: got %q", versionErr.ActorID)
}
if versionErr.AttemptedVersion != 7 {
t.Errorf("AttemptedVersion mismatch after As: got %d", versionErr.AttemptedVersion)
}
if versionErr.CurrentVersion != 6 {
t.Errorf("CurrentVersion mismatch after As: got %d", versionErr.CurrentVersion)
}
}
func TestVersionConflictError_CanReadCurrentVersion(t *testing.T) {
// This test verifies that applications can read CurrentVersion for retry strategies
err := &VersionConflictError{
ActorID: "order-abc",
AttemptedVersion: 2,
CurrentVersion: 10,
}
var versionErr *VersionConflictError
if !errors.As(err, &versionErr) {
t.Fatal("failed to unwrap VersionConflictError")
}
// Application can use CurrentVersion to decide retry strategy
nextVersion := versionErr.CurrentVersion + 1
if nextVersion != 11 {
t.Errorf("application should be able to compute next version: got %d, want 11", nextVersion)
}
// Application can log detailed context
logMsg := fmt.Sprintf("Version conflict for actor %q: attempted %d, current %d, will retry with %d",
versionErr.ActorID, versionErr.AttemptedVersion, versionErr.CurrentVersion, nextVersion)
if !strings.Contains(logMsg, "order-abc") {
t.Errorf("application context logging failed: %s", logMsg)
}
}
func TestVersionConflictError_EdgeCases(t *testing.T) {
testCases := []struct {
name string
actorID string
attemp int64
current int64
}{
{"zero current", "actor-1", 1, 0},
{"large numbers", "actor-2", 1000000, 999999},
{"max int64", "actor-3", 9223372036854775807, 9223372036854775806},
{"negative attempt", "actor-4", -1, -2},
{"empty actor id", "", 1, 0},
{"special chars in actor id", "actor@#$%", 2, 1},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := &VersionConflictError{
ActorID: tc.actorID,
AttemptedVersion: tc.attemp,
CurrentVersion: tc.current,
}
// Should not panic
msg := err.Error()
if msg == "" {
t.Error("Error() should return non-empty string")
}
// Should be wrapped correctly
if err.Unwrap() != ErrVersionConflict {
t.Error("Unwrap should return ErrVersionConflict")
}
// errors.Is should work
if !errors.Is(err, ErrVersionConflict) {
t.Error("errors.Is should work for edge case")
}
})
}
}
func TestErrVersionConflict_Sentinel(t *testing.T) {
// Verify the sentinel error is correctly defined
if ErrVersionConflict == nil {
t.Fatal("ErrVersionConflict should not be nil")
}
expectedMsg := "version conflict"
if ErrVersionConflict.Error() != expectedMsg {
t.Errorf("ErrVersionConflict message mismatch: got %q, want %q", ErrVersionConflict.Error(), expectedMsg)
}
// Test that it's usable with errors.Is
if !errors.Is(ErrVersionConflict, ErrVersionConflict) {
t.Error("ErrVersionConflict should match itself with errors.Is")
}
}

View File

@@ -18,19 +18,6 @@ type EventBroadcaster interface {
// Subscribe creates a channel that receives events matching the namespace pattern.
// Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple.
Subscribe(namespacePattern string) <-chan *Event
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
// Events are filtered by the provided SubscriptionFilter before delivery.
// Filters are applied with AND logic - events must match all specified criteria.
//
// Example: Subscribe to "orders" namespace, only receiving "OrderPlaced" events for "order-*" actors:
// filter := &SubscriptionFilter{
// EventTypes: []string{"OrderPlaced"},
// ActorPattern: "order-*",
// }
// ch := bus.SubscribeWithFilter("orders", filter)
SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event
Unsubscribe(namespacePattern string, ch <-chan *Event)
Publish(namespaceID string, event *Event)
Stop()
@@ -50,13 +37,6 @@ type subscription struct {
ch chan *Event
}
// filteredSubscription represents a subscriber with an optional filter
type filteredSubscription struct {
pattern string
ch chan *Event
filter *SubscriptionFilter
}
// EventBus broadcasts events to multiple subscribers within a namespace.
// Supports wildcard patterns for cross-namespace subscriptions.
//
@@ -66,9 +46,9 @@ type filteredSubscription struct {
// However, it bypasses namespace isolation - use with appropriate access controls.
type EventBus struct {
// exactSubscribers holds subscribers for exact namespace matches (no wildcards)
exactSubscribers map[string][]*filteredSubscription
exactSubscribers map[string][]chan *Event
// wildcardSubscribers holds subscribers with wildcard patterns
wildcardSubscribers []*filteredSubscription
wildcardSubscribers []subscription
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
@@ -79,8 +59,8 @@ type EventBus struct {
func NewEventBus() *EventBus {
ctx, cancel := context.WithCancel(context.Background())
return &EventBus{
exactSubscribers: make(map[string][]*filteredSubscription),
wildcardSubscribers: make([]*filteredSubscription, 0),
exactSubscribers: make(map[string][]chan *Event),
wildcardSubscribers: make([]subscription, 0),
ctx: ctx,
cancel: cancel,
metrics: NewMetricsCollector(),
@@ -101,39 +81,21 @@ func (eb *EventBus) Metrics() BroadcasterMetrics {
// Security Warning: Wildcard patterns receive events from all matching namespaces,
// bypassing namespace isolation. Only use for trusted system components.
func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event {
return eb.SubscribeWithFilter(namespacePattern, nil)
}
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
// Events are filtered by the provided SubscriptionFilter before delivery.
// If filter is nil or empty, all events matching the namespace pattern are delivered.
//
// Filtering is applied client-side for efficient processing:
// - EventTypes: Only events with matching event types are delivered
// - ActorPattern: Only events from matching actors are delivered
//
// Both namespace pattern wildcards and event filters work together:
// - Namespace pattern determines which namespaces to subscribe to
// - Filter determines which events within those namespaces to receive
func (eb *EventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
eb.mutex.Lock()
defer eb.mutex.Unlock()
// Create buffered channel to prevent blocking publishers
ch := make(chan *Event, 100)
sub := &filteredSubscription{
pattern: namespacePattern,
ch: ch,
filter: filter,
}
if IsWildcardPattern(namespacePattern) {
// Store wildcard subscription separately
eb.wildcardSubscribers = append(eb.wildcardSubscribers, sub)
eb.wildcardSubscribers = append(eb.wildcardSubscribers, subscription{
pattern: namespacePattern,
ch: ch,
})
} else {
// Exact match subscription
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], sub)
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch)
}
// Record subscription metric
@@ -161,11 +123,11 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
} else {
// Remove from exact subscribers
subs := eb.exactSubscribers[namespacePattern]
for i, sub := range subs {
if sub.ch == ch {
// Remove subscription from slice
for i, subscriber := range subs {
if subscriber == ch {
// Remove channel from slice
eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...)
close(sub.ch)
close(subscriber)
// Record unsubscription metric
eb.metrics.RecordUnsubscribe(namespacePattern)
break
@@ -181,8 +143,8 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
// Publish sends an event to all subscribers of a namespace.
// Events are delivered to:
// - All exact subscribers for the namespace (after filter matching)
// - All wildcard subscribers whose pattern matches the namespace (after filter matching)
// - All exact subscribers for the namespace
// - All wildcard subscribers whose pattern matches the namespace
func (eb *EventBus) Publish(namespaceID string, event *Event) {
eb.mutex.RLock()
defer eb.mutex.RUnlock()
@@ -192,28 +154,20 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) {
// Deliver to exact subscribers
subscribers := eb.exactSubscribers[namespaceID]
for _, sub := range subscribers {
eb.deliverToSubscriber(sub, event, namespaceID)
for _, ch := range subscribers {
select {
case ch <- event:
// Event delivered
eb.metrics.RecordReceive(namespaceID)
default:
// Channel full, skip this subscriber (non-blocking)
eb.metrics.RecordDroppedEvent(namespaceID)
}
}
// Deliver to matching wildcard subscribers
for _, sub := range eb.wildcardSubscribers {
if MatchNamespacePattern(sub.pattern, namespaceID) {
eb.deliverToSubscriber(sub, event, namespaceID)
}
}
}
// deliverToSubscriber delivers an event to a subscriber if it matches the filter
func (eb *EventBus) deliverToSubscriber(sub *filteredSubscription, event *Event, namespaceID string) {
// Apply filter if present
if sub.filter != nil && !sub.filter.IsEmpty() {
if !sub.filter.Matches(event) {
// Event doesn't match filter, skip delivery
return
}
}
select {
case sub.ch <- event:
// Event delivered
@@ -223,6 +177,8 @@ func (eb *EventBus) deliverToSubscriber(sub *filteredSubscription, event *Event,
eb.metrics.RecordDroppedEvent(namespaceID)
}
}
}
}
// Stop closes the event bus
func (eb *EventBus) Stop() {
@@ -233,8 +189,8 @@ func (eb *EventBus) Stop() {
// Close all exact subscriber channels and update metrics
for namespaceID, subs := range eb.exactSubscribers {
for _, sub := range subs {
close(sub.ch)
for _, ch := range subs {
close(ch)
eb.metrics.RecordUnsubscribe(namespaceID)
}
}
@@ -245,8 +201,8 @@ func (eb *EventBus) Stop() {
eb.metrics.RecordUnsubscribe(sub.pattern)
}
eb.exactSubscribers = make(map[string][]*filteredSubscription)
eb.wildcardSubscribers = make([]*filteredSubscription, 0)
eb.exactSubscribers = make(map[string][]chan *Event)
eb.wildcardSubscribers = make([]subscription, 0)
}
// SubscriberCount returns the number of subscribers for a namespace.

View File

@@ -414,409 +414,3 @@ func TestEventBus_ConcurrentOperations(t *testing.T) {
wg.Wait()
}
// Tests for SubscribeWithFilter functionality
func TestEventBus_SubscribeWithFilter_EventTypes(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe with filter for specific event types
filter := &SubscriptionFilter{
EventTypes: []string{"OrderPlaced", "OrderShipped"},
}
ch := eb.SubscribeWithFilter("orders", filter)
// Publish events of different types
events := []*Event{
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
{ID: "evt-2", EventType: "OrderCancelled", ActorID: "order-2"}, // Should not be received
{ID: "evt-3", EventType: "OrderShipped", ActorID: "order-3"},
}
for _, e := range events {
eb.Publish("orders", e)
}
// Should receive evt-1 and evt-3, but not evt-2
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < 2; i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after receiving %d events", len(received))
}
}
if !received["evt-1"] || !received["evt-3"] {
t.Errorf("expected to receive evt-1 and evt-3, got %v", received)
}
// Verify evt-2 was not received
select {
case evt := <-ch:
t.Errorf("unexpected event received: %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected
}
}
func TestEventBus_SubscribeWithFilter_ActorPattern(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe with filter for specific actor pattern
filter := &SubscriptionFilter{
ActorPattern: "order-*",
}
ch := eb.SubscribeWithFilter("events", filter)
// Publish events from different actors
events := []*Event{
{ID: "evt-1", EventType: "Test", ActorID: "order-123"},
{ID: "evt-2", EventType: "Test", ActorID: "user-456"}, // Should not be received
{ID: "evt-3", EventType: "Test", ActorID: "order-789"},
}
for _, e := range events {
eb.Publish("events", e)
}
// Should receive evt-1 and evt-3, but not evt-2
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < 2; i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after receiving %d events", len(received))
}
}
if !received["evt-1"] || !received["evt-3"] {
t.Errorf("expected to receive evt-1 and evt-3, got %v", received)
}
// Verify evt-2 was not received
select {
case evt := <-ch:
t.Errorf("unexpected event received: %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected
}
}
func TestEventBus_SubscribeWithFilter_Combined(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe with filter for both event type AND actor pattern
filter := &SubscriptionFilter{
EventTypes: []string{"OrderPlaced"},
ActorPattern: "order-*",
}
ch := eb.SubscribeWithFilter("orders", filter)
// Publish events with various combinations
events := []*Event{
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-123"}, // Should be received
{ID: "evt-2", EventType: "OrderPlaced", ActorID: "user-456"}, // Wrong actor
{ID: "evt-3", EventType: "OrderCancelled", ActorID: "order-789"}, // Wrong type
{ID: "evt-4", EventType: "OrderCancelled", ActorID: "user-000"}, // Wrong both
}
for _, e := range events {
eb.Publish("orders", e)
}
// Should only receive evt-1
select {
case evt := <-ch:
if evt.ID != "evt-1" {
t.Errorf("expected evt-1, got %s", evt.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for event")
}
// Verify no more events arrive
select {
case evt := <-ch:
t.Errorf("unexpected event received: %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected
}
}
func TestEventBus_SubscribeWithFilter_NilFilter(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe with nil filter - should receive all events
ch := eb.SubscribeWithFilter("events", nil)
events := []*Event{
{ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"},
{ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"},
}
for _, e := range events {
eb.Publish("events", e)
}
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < 2; i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after receiving %d events", len(received))
}
}
if !received["evt-1"] || !received["evt-2"] {
t.Errorf("expected all events, got %v", received)
}
}
func TestEventBus_SubscribeWithFilter_EmptyFilter(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe with empty filter - should receive all events
ch := eb.SubscribeWithFilter("events", &SubscriptionFilter{})
events := []*Event{
{ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"},
{ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"},
}
for _, e := range events {
eb.Publish("events", e)
}
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < 2; i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after receiving %d events", len(received))
}
}
if !received["evt-1"] || !received["evt-2"] {
t.Errorf("expected all events, got %v", received)
}
}
func TestEventBus_SubscribeWithFilter_WildcardNamespaceAndFilter(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe to wildcard namespace pattern with event type filter
filter := &SubscriptionFilter{
EventTypes: []string{"OrderPlaced"},
}
ch := eb.SubscribeWithFilter("prod.*", filter)
// Publish events to different namespaces
events := []*Event{
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, // prod.orders - should match
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"}, // prod.orders - wrong type
{ID: "evt-3", EventType: "OrderPlaced", ActorID: "order-3"}, // staging.orders - wrong namespace
}
eb.Publish("prod.orders", events[0])
eb.Publish("prod.orders", events[1])
eb.Publish("staging.orders", events[2])
// Should only receive evt-1
select {
case evt := <-ch:
if evt.ID != "evt-1" {
t.Errorf("expected evt-1, got %s", evt.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for event")
}
// Verify no more events arrive
select {
case evt := <-ch:
t.Errorf("unexpected event received: %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected
}
}
func TestEventBus_SubscribeWithFilter_MultipleSubscribersWithDifferentFilters(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Two subscribers with different filters on same namespace
filter1 := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
filter2 := &SubscriptionFilter{EventTypes: []string{"OrderShipped"}}
ch1 := eb.SubscribeWithFilter("orders", filter1)
ch2 := eb.SubscribeWithFilter("orders", filter2)
events := []*Event{
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"},
}
for _, e := range events {
eb.Publish("orders", e)
}
// ch1 should only receive evt-1
select {
case evt := <-ch1:
if evt.ID != "evt-1" {
t.Errorf("ch1: expected evt-1, got %s", evt.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("ch1 timed out")
}
// ch2 should only receive evt-2
select {
case evt := <-ch2:
if evt.ID != "evt-2" {
t.Errorf("ch2: expected evt-2, got %s", evt.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("ch2 timed out")
}
// Verify no extra events
select {
case evt := <-ch1:
t.Errorf("ch1: unexpected event %s", evt.ID)
case evt := <-ch2:
t.Errorf("ch2: unexpected event %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected
}
}
func TestEventBus_SubscribeWithFilter_UnsubscribeFiltered(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
ch := eb.SubscribeWithFilter("orders", filter)
// Verify subscription count
if eb.SubscriberCount("orders") != 1 {
t.Errorf("expected 1 subscriber, got %d", eb.SubscriberCount("orders"))
}
eb.Unsubscribe("orders", ch)
// Verify unsubscribed
if eb.SubscriberCount("orders") != 0 {
t.Errorf("expected 0 subscribers, got %d", eb.SubscriberCount("orders"))
}
}
func TestEventBus_SubscribeWithFilter_FilteredAndUnfilteredCoexist(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// One subscriber with filter, one without
filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
chFiltered := eb.SubscribeWithFilter("orders", filter)
chUnfiltered := eb.Subscribe("orders")
events := []*Event{
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"},
}
for _, e := range events {
eb.Publish("orders", e)
}
// Filtered subscriber should only receive evt-1
select {
case evt := <-chFiltered:
if evt.ID != "evt-1" {
t.Errorf("filtered: expected evt-1, got %s", evt.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("filtered subscriber timed out")
}
// Unfiltered subscriber should receive both
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < 2; i++ {
select {
case evt := <-chUnfiltered:
received[evt.ID] = true
case <-timeout:
t.Fatalf("unfiltered timed out after %d events", len(received))
}
}
if !received["evt-1"] || !received["evt-2"] {
t.Errorf("unfiltered expected both events, got %v", received)
}
}
func TestEventBus_SubscribeWithFilter_WildcardGreaterWithFilter(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Use > wildcard (matches one or more tokens) with filter
filter := &SubscriptionFilter{
ActorPattern: "order-*",
}
ch := eb.SubscribeWithFilter(">", filter)
events := []*Event{
{ID: "evt-1", EventType: "Test", ActorID: "order-123"},
{ID: "evt-2", EventType: "Test", ActorID: "user-456"},
{ID: "evt-3", EventType: "Test", ActorID: "order-789"},
}
// Publish to different namespaces
eb.Publish("tenant-a", events[0])
eb.Publish("tenant-b", events[1])
eb.Publish("prod.orders", events[2])
// Should receive evt-1 and evt-3, but not evt-2
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < 2; i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after %d events", len(received))
}
}
if !received["evt-1"] || !received["evt-3"] {
t.Errorf("expected evt-1 and evt-3, got %v", received)
}
// Verify no evt-2
select {
case evt := <-ch:
t.Errorf("unexpected event: %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected
}
}

View File

@@ -1,189 +0,0 @@
# Aether Examples
This directory contains examples demonstrating common patterns for using Aether.
## Retry Patterns (`retry_patterns.go`)
When saving events with optimistic concurrency control, your application may encounter `VersionConflictError` when multiple writers attempt to update the same actor concurrently. This file demonstrates several retry strategies.
### Pattern Overview
All retry patterns work with `VersionConflictError` which provides three critical fields:
- **ActorID**: The actor that experienced the conflict
- **CurrentVersion**: The latest version in the store
- **AttemptedVersion**: The version you tried to save
Your application can read these fields to make intelligent retry decisions.
### Available Patterns
#### SimpleRetryPattern
The most basic pattern - just retry with exponential backoff:
```go
// Automatically retries up to 3 times with exponential backoff
err := SimpleRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want a straightforward retry mechanism without complex logic.
#### ConflictDetailedRetryPattern
Extracts detailed information from the conflict error to make smarter decisions:
```go
// Detects thrashing (multiple conflicts at same version)
// and can implement circuit-breaker logic
err := ConflictDetailedRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You need visibility into conflict patterns and want to detect system issues like thrashing.
#### JitterRetryPattern
Adds randomized jitter to prevent "thundering herd" when multiple writers retry:
```go
// Exponential backoff with jitter prevents synchronized retries
err := JitterRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You have high concurrency and want to prevent all writers from retrying at the same time.
#### AdaptiveRetryPattern
Adjusts backoff duration based on version distance (indicator of contention):
```go
// Light contention (gap=1): 50ms backoff
// Moderate contention (gap=3-10): proportional backoff
// High contention (gap>10): aggressive backoff
err := AdaptiveRetryPattern(store, "order-123", "OrderUpdated")
```
**Use when**: You want backoff strategy to respond to actual system load.
#### EventualConsistencyPattern
Instead of blocking on retry, queues the event for asynchronous retry:
```go
// Returns immediately, event is queued for background retry
EventualConsistencyPattern(store, retryQueue, event)
// Background worker processes the queue
for item := range retryQueue {
// Implement your own retry logic here
}
```
**Use when**: You can't afford to block the request, and background retry is acceptable.
#### CircuitBreakerPattern
Implements a circuit breaker to prevent cascading failures:
```go
cb := NewCircuitBreaker()
// Fails fast when circuit is open
err := CircuitBreakerRetryPattern(store, cb, "order-123", "OrderUpdated")
if err != nil && !cb.CanRetry() {
return ErrCircuitBreakerOpen
}
```
**Use when**: You have a distributed system and want to prevent retry storms during outages.
## Common Pattern: Extract and Log Context
All patterns can read context from `VersionConflictError`:
```go
var versionErr *aether.VersionConflictError
if errors.As(err, &versionErr) {
log.Printf(
"Conflict for actor %q: attempted %d, current %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
}
```
## Sentinel Error Check
Check if an error is a version conflict without examining the struct:
```go
if errors.Is(err, aether.ErrVersionConflict) {
// This is a version conflict - retry is appropriate
}
```
## Implementing Your Own Pattern
Basic template:
```go
for attempt := 0; attempt < maxRetries; attempt++ {
// 1. Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// 2. Create event with next version
event := &aether.Event{
ActorID: actorID,
Version: currentVersion + 1,
// ... other fields
}
// 3. Attempt save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// 4. Check if it's a conflict
if !errors.Is(err, aether.ErrVersionConflict) {
return err // Some other error
}
// 5. Implement your retry strategy
time.Sleep(yourBackoff(attempt))
}
```
## Choosing a Pattern
| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|-----------|-----------|----------|
| Simple | Low | Low | Very Low | Single writer, testing |
| DetailedConflict | Low | Medium | Medium | Debugging, monitoring |
| Jitter | Low-Medium | High | Low | Multi-writer concurrency |
| Adaptive | Low-Medium | High | Medium | Variable load scenarios |
| EventualConsistency | Very Low | Very High | High | High-volume, async-OK workloads |
| CircuitBreaker | Variable | Stable | High | Distributed, failure-resilient systems |
## Performance Considerations
1. **Backoff timing**: Shorter backoffs waste CPU on retries, longer backoffs increase latency
2. **Retry limits**: Too few retries give up too early, too many waste resources
3. **Jitter**: Essential for preventing synchronized retries in high-concurrency scenarios
4. **Monitoring**: Track retry rates and conflict patterns to detect system issues
## Testing
Use `aether.NewInMemoryEventStore()` in tests:
```go
store := store.NewInMemoryEventStore()
err := SimpleRetryPattern(store, "test-actor", "TestEvent")
if err != nil {
t.Fatalf("retry pattern failed: %v", err)
}
```

View File

@@ -1,168 +0,0 @@
// Package main demonstrates cross-node event broadcasting using NATSEventBus
// and JetStreamEventStore for cluster synchronization.
//
// This example shows:
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
func main() {
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatal("Failed to connect to NATS:", err)
}
defer nc.Close()
ctx := context.Background()
store1, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
defer eventBus1.Stop()
store2, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
defer eventBus2.Stop()
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
done := make(chan struct{})
go processEvents(ctx, eventStoredCh1, store1, done)
go processEvents(ctx, eventStoredCh2, store2, done)
go func() {
time.Sleep(2 * time.Second)
actorID := "demo-actor"
event1 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"total": 99.99,
"status": "pending",
},
Timestamp: time.Now(),
}
log.Printf("Node 1 publishing event: %s", event1.EventType)
eventBus1.Publish("", event1)
time.Sleep(500 * time.Millisecond)
event2 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPaid",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{
"total": 99.99,
"status": "paid",
"method": "credit_card",
},
Timestamp: time.Now(),
}
log.Printf("Node 2 publishing event: %s", event2.EventType)
eventBus2.Publish("", event2)
time.Sleep(2 * time.Second)
close(done)
log.Println("Cross-node broadcasting demo complete")
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("Shutting down...")
case <-done:
}
}
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
for {
select {
case <-done:
return
case <-ctx.Done():
return
case event, ok := <-eventStoredCh:
if !ok {
return
}
if event == nil {
continue
}
if event.EventType != aether.EventTypeEventStored {
continue
}
actorID, ok := event.Data["actorId"].(string)
if !ok {
log.Printf("Warning: EventStored missing actorId")
continue
}
version, ok := event.Data["version"].(int64)
if !ok {
log.Printf("Warning: EventStored missing version")
continue
}
eventID, _ := event.Data["eventId"].(string)
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
eventStore.UpdateVersionCache(actorID, version)
currentVersion, _ := eventStore.GetLatestVersion(actorID)
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
}
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"github.com/google/uuid"
@@ -25,8 +24,6 @@ type NATSEventBus struct {
subscriptions []*nats.Subscription
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node
streamPrefix string // NATS subject prefix for events
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
mutex sync.Mutex
ctx context.Context
cancel context.CancelFunc
@@ -49,7 +46,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: "aether",
ctx: ctx,
cancel: cancel,
}
@@ -57,43 +53,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil
}
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
// The event store is used to automatically update version cache when EventStored events are received
// from other cluster nodes via NATS. This ensures cross-node version consistency.
//
// Example:
//
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// The namespace parameter is used as a prefix for EventStored event filtering.
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
streamPrefix := "aether"
if namespace != "" {
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
}
neb := &NATSEventBus{
EventBus: NewEventBus(),
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: streamPrefix,
eventStore: store,
ctx: context.Background(),
cancel: func() {},
}
return neb
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns:
// - "*" matches a single token
@@ -102,25 +61,11 @@ func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace
// Security Warning: Wildcard patterns receive events from all matching namespaces,
// bypassing namespace isolation. Only use for trusted system components.
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
return neb.SubscribeWithFilter(namespacePattern, nil)
}
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
// Events are filtered by the provided SubscriptionFilter before delivery.
// If filter is nil or empty, all events matching the namespace pattern are delivered.
//
// For NATSEventBus:
// - Namespace pattern filtering is applied at the NATS level using native wildcards
// - EventTypes and ActorPattern filters are applied client-side after receiving messages
//
// This allows efficient server-side filtering for namespaces while providing
// flexible client-side filtering for event types and actors.
func (neb *NATSEventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
// Create local subscription first (with filter)
ch := neb.EventBus.SubscribeWithFilter(namespacePattern, filter)
// Create local subscription first
ch := neb.EventBus.Subscribe(namespacePattern)
// Check if this is the first subscriber for this pattern
count := neb.patternSubscribers[namespacePattern]
@@ -196,21 +141,12 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string
}
// deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern
// Applies filters before delivery.
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
neb.EventBus.mutex.RLock()
defer neb.EventBus.mutex.RUnlock()
for _, sub := range neb.EventBus.wildcardSubscribers {
if sub.pattern == pattern {
// Apply filter if present
if sub.filter != nil && !sub.filter.IsEmpty() {
if !sub.filter.Matches(event) {
// Event doesn't match filter, skip delivery
continue
}
}
select {
case sub.ch <- event:
// Event delivered from NATS
@@ -269,103 +205,3 @@ func (neb *NATSEventBus) Stop() {
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
}
// sanitizeSubject sanitizes a string for use in NATS subjects
func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, ".", "_")
s = strings.ReplaceAll(s, "*", "_")
s = strings.ReplaceAll(s, ">", "_")
return s
}
// extractActorType extracts the actor type from an actor ID
func extractActorType(actorID string) string {
for i, c := range actorID {
if c == '-' && i > 0 {
return actorID[:i]
}
}
return "unknown"
}
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
// EventStored events are published by JetStreamEventStore when events are successfully saved.
// This is useful for cross-node event synchronization and version cache consistency.
//
// The returned channel receives EventStored events matching the pattern.
// The EventStored event schema:
// - EventType: "EventStored"
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// The namespacePattern supports NATS wildcards:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
//
// Example:
//
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// if event.EventType != aether.EventTypeEventStored {
// continue
// }
// actorID := event.Data["actorId"].(string)
// version, _ := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
// from all namespaces. Ensure your application handles this appropriately.
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
ch := make(chan *Event, 100)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
return
}
if eventMsg.NodeID == neb.nodeID {
return
}
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
actorID, ok := eventMsg.Event.Data["actorId"].(string)
if !ok {
return
}
version, ok := eventMsg.Event.Data["version"].(int64)
if !ok {
return
}
// Use type assertion to call UpdateVersionCache
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
es.UpdateVersionCache(actorID, version)
}
}
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
close(ch)
return ch
}
neb.subscriptions = append(neb.subscriptions, sub)
return ch
}

File diff suppressed because it is too large Load Diff

View File

@@ -81,117 +81,3 @@ func matchTokens(patternTokens, namespaceTokens []string) bool {
func IsWildcardPattern(pattern string) bool {
return strings.Contains(pattern, "*") || strings.Contains(pattern, ">")
}
// SubscriptionFilter defines optional filters for event subscriptions.
// All configured filters are combined with AND logic - an event must match
// all specified criteria to be delivered to the subscriber.
//
// Filter Processing:
// - EventTypes: Event must have an EventType matching at least one in the list (OR within types)
// - ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards)
//
// Filtering is applied client-side in the EventBus. For NATSEventBus, namespace-level
// filtering uses NATS subject patterns, while EventTypes and ActorPattern filtering
// happens after message receipt.
type SubscriptionFilter struct {
// EventTypes filters events by type. Empty slice means all event types.
// If specified, only events with an EventType in this list are delivered.
// Example: []string{"OrderPlaced", "OrderShipped"} receives only those event types.
EventTypes []string
// ActorPattern filters events by actor ID pattern. Empty string means all actors.
// Supports NATS-style wildcards:
// - "*" matches a single token (e.g., "order-*" matches "order-123", "order-456")
// - ">" matches one or more tokens (e.g., "order.>" matches "order.us.123", "order.eu.456")
// Example: "order-*" receives events only for actors starting with "order-"
ActorPattern string
}
// IsEmpty returns true if no filters are configured.
func (f *SubscriptionFilter) IsEmpty() bool {
return len(f.EventTypes) == 0 && f.ActorPattern == ""
}
// Matches returns true if the event matches all configured filters.
// An empty filter matches all events.
func (f *SubscriptionFilter) Matches(event *Event) bool {
if event == nil {
return false
}
// Check event type filter
if len(f.EventTypes) > 0 {
typeMatch := false
for _, et := range f.EventTypes {
if event.EventType == et {
typeMatch = true
break
}
}
if !typeMatch {
return false
}
}
// Check actor pattern filter
if f.ActorPattern != "" {
if !MatchActorPattern(f.ActorPattern, event.ActorID) {
return false
}
}
return true
}
// MatchActorPattern checks if an actor ID matches a pattern.
// Uses the same matching logic as MatchNamespacePattern for consistency.
//
// Patterns:
// - "*" matches a single token (e.g., "order-*" matches "order-123")
// - ">" matches one or more tokens (e.g., "order.>" matches "order.us.east")
// - Exact strings match exactly (e.g., "order-123" matches only "order-123")
//
// Note: For simple prefix matching without dots (e.g., "order-*" matching "order-123"),
// this uses simplified matching where "*" matches any remaining characters in a token.
func MatchActorPattern(pattern, actorID string) bool {
// Empty pattern matches nothing
if pattern == "" {
return false
}
// Empty actor ID matches nothing except ">"
if actorID == "" {
return false
}
// If pattern contains dots, use token-based matching (same as namespace)
if strings.Contains(pattern, ".") || strings.Contains(actorID, ".") {
return MatchNamespacePattern(pattern, actorID)
}
// Simple matching for non-tokenized patterns
// ">" matches any non-empty actor ID
if pattern == ">" {
return true
}
// "*" matches any single-token actor ID (no dots)
if pattern == "*" {
return true
}
// Check for suffix wildcard (e.g., "order-*")
if strings.HasSuffix(pattern, "*") {
prefix := strings.TrimSuffix(pattern, "*")
return strings.HasPrefix(actorID, prefix)
}
// Check for suffix multi-match (e.g., "order->")
if strings.HasSuffix(pattern, ">") {
prefix := strings.TrimSuffix(pattern, ">")
return strings.HasPrefix(actorID, prefix)
}
// Exact match
return pattern == actorID
}

View File

@@ -115,128 +115,3 @@ func BenchmarkMatchNamespacePattern(b *testing.B) {
})
}
}
func TestMatchActorPattern(t *testing.T) {
tests := []struct {
name string
pattern string
actorID string
expected bool
}{
// Empty cases
{"empty pattern", "", "actor-123", false},
{"empty actorID", "actor-*", "", false},
{"both empty", "", "", false},
// Exact matches (no dots)
{"exact match", "actor-123", "actor-123", true},
{"exact mismatch", "actor-123", "actor-456", false},
// Suffix wildcard with * (simple, no dots)
{"prefix with star", "order-*", "order-123", true},
{"prefix with star 2", "order-*", "order-456-xyz", true},
{"prefix with star mismatch", "order-*", "user-123", false},
{"star alone", "*", "anything", true},
// Suffix wildcard with > (simple, no dots)
{"prefix with greater", "order->", "order-123", true},
{"greater alone", ">", "anything", true},
// Dot-separated actor IDs (uses MatchNamespacePattern)
{"dotted exact match", "order.us.123", "order.us.123", true},
{"dotted exact mismatch", "order.us.123", "order.eu.123", false},
{"dotted star", "order.*", "order.123", true},
{"dotted star deep", "order.*.*", "order.us.123", true},
{"dotted greater", "order.>", "order.us.123.456", true},
{"dotted star mismatch depth", "order.*", "order.us.123", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := MatchActorPattern(tt.pattern, tt.actorID)
if result != tt.expected {
t.Errorf("MatchActorPattern(%q, %q) = %v, want %v",
tt.pattern, tt.actorID, result, tt.expected)
}
})
}
}
func TestSubscriptionFilter_IsEmpty(t *testing.T) {
tests := []struct {
name string
filter *SubscriptionFilter
expected bool
}{
{"nil fields", &SubscriptionFilter{}, true},
{"empty slice", &SubscriptionFilter{EventTypes: []string{}}, true},
{"has event types", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}, false},
{"has actor pattern", &SubscriptionFilter{ActorPattern: "order-*"}, false},
{"has both", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.filter.IsEmpty()
if result != tt.expected {
t.Errorf("SubscriptionFilter.IsEmpty() = %v, want %v", result, tt.expected)
}
})
}
}
func TestSubscriptionFilter_Matches(t *testing.T) {
tests := []struct {
name string
filter *SubscriptionFilter
event *Event
expected bool
}{
// Nil event
{"nil event", &SubscriptionFilter{}, nil, false},
// Empty filter matches all
{"empty filter", &SubscriptionFilter{}, &Event{EventType: "Test", ActorID: "actor-1"}, true},
// Event type filtering
{"event type match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}},
&Event{EventType: "OrderPlaced", ActorID: "order-1"}, true},
{"event type mismatch", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}},
&Event{EventType: "OrderShipped", ActorID: "order-1"}, false},
{"event type multiple match first", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}},
&Event{EventType: "OrderPlaced", ActorID: "order-1"}, true},
{"event type multiple match second", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}},
&Event{EventType: "OrderShipped", ActorID: "order-1"}, true},
{"event type multiple no match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}},
&Event{EventType: "OrderCancelled", ActorID: "order-1"}, false},
// Actor pattern filtering
{"actor pattern exact match", &SubscriptionFilter{ActorPattern: "order-123"},
&Event{EventType: "Test", ActorID: "order-123"}, true},
{"actor pattern exact mismatch", &SubscriptionFilter{ActorPattern: "order-123"},
&Event{EventType: "Test", ActorID: "order-456"}, false},
{"actor pattern wildcard match", &SubscriptionFilter{ActorPattern: "order-*"},
&Event{EventType: "Test", ActorID: "order-123"}, true},
{"actor pattern wildcard mismatch", &SubscriptionFilter{ActorPattern: "order-*"},
&Event{EventType: "Test", ActorID: "user-123"}, false},
// Combined filters (AND logic)
{"combined both match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
&Event{EventType: "OrderPlaced", ActorID: "order-123"}, true},
{"combined event matches actor does not", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
&Event{EventType: "OrderPlaced", ActorID: "user-123"}, false},
{"combined actor matches event does not", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
&Event{EventType: "OrderShipped", ActorID: "order-123"}, false},
{"combined neither matches", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
&Event{EventType: "OrderShipped", ActorID: "user-123"}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.filter.Matches(tt.event)
if result != tt.expected {
t.Errorf("SubscriptionFilter.Matches() = %v, want %v", result, tt.expected)
}
})
}
}

View File

@@ -1,6 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended"
]
}

View File

@@ -1,215 +0,0 @@
package store
import (
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// TestEventImmutability_MemoryStore verifies that events cannot be modified after persistence
// in the in-memory event store. This demonstrates the append-only nature of event sourcing.
func TestEventImmutability_MemoryStore(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "test-actor-123"
// Create and save an event
originalEvent := &aether.Event{
ID: "evt-immutable-1",
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"value": "original",
},
Timestamp: time.Now(),
}
err := store.SaveEvent(originalEvent)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve the event from the store
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) == 0 {
t.Fatal("expected 1 event, got 0")
}
retrievedEvent := events[0]
// Verify the stored event has the correct values
if retrievedEvent.Data["value"] != "original" {
t.Errorf("Data value mismatch: got %v, want %v", retrievedEvent.Data["value"], "original")
}
if retrievedEvent.EventType != "TestEvent" {
t.Errorf("EventType mismatch: got %q, want %q", retrievedEvent.EventType, "TestEvent")
}
// Verify ID is correct
if retrievedEvent.ID != "evt-immutable-1" {
t.Errorf("Event ID mismatch: got %q, want %q", retrievedEvent.ID, "evt-immutable-1")
}
}
// TestEventImmutability_NoUpdateMethod verifies that the EventStore interface
// has only append, read methods - no Update or Delete methods.
func TestEventImmutability_NoUpdateMethod(t *testing.T) {
// This test documents that the EventStore interface is append-only.
// The interface intentionally provides:
// - SaveEvent: append only
// - GetEvents: read only
// - GetLatestVersion: read only
//
// To verify this, we demonstrate that any attempt to call non-existent
// update/delete methods would be caught at compile time (not runtime).
// This is enforced by the interface definition in event.go which does
// not include Update, Delete, or Modify methods.
store := NewInMemoryEventStore()
// Compile-time check: these would not compile if we tried them:
// store.Update(event) // compile error: no such method
// store.Delete(eventID) // compile error: no such method
// store.Modify(eventID, newData) // compile error: no such method
// Only these methods exist:
var eventStore aether.EventStore = store
if eventStore == nil {
t.Fatal("eventStore is nil")
}
// If we got here, the compile-time checks passed
t.Log("EventStore interface enforces append-only semantics by design")
}
// TestEventImmutability_VersionOnlyGoesUp verifies that versions are monotonically
// increasing and attempting to save with a non-increasing version fails.
func TestEventImmutability_VersionOnlyGoesUp(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-version-check"
// Save first event with version 1
event1 := &aether.Event{
ID: "evt-v1",
EventType: "Event1",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(v1) failed: %v", err)
}
// Try to save with same version - should fail
event2Same := &aether.Event{
ID: "evt-v1-again",
EventType: "Event2",
ActorID: actorID,
Version: 1, // Same version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2Same)
if err == nil {
t.Error("expected SaveEvent(same version) to fail, but it succeeded")
}
// Try to save with lower version - should fail
event3Lower := &aether.Event{
ID: "evt-v0",
EventType: "Event3",
ActorID: actorID,
Version: 0, // Lower version
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3Lower)
if err == nil {
t.Error("expected SaveEvent(lower version) to fail, but it succeeded")
}
// Save with next version - should succeed
event4Next := &aether.Event{
ID: "evt-v2",
EventType: "Event4",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4Next)
if err != nil {
t.Fatalf("SaveEvent(v2) failed: %v", err)
}
// Verify we have exactly 2 events
events, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 2 {
t.Errorf("expected 2 events, got %d", len(events))
}
}
// TestEventImmutability_EventCannotBeDeleted verifies that there is no way to delete
// events from the store through the EventStore interface.
func TestEventImmutability_EventCannotBeDeleted(t *testing.T) {
store := NewInMemoryEventStore()
actorID := "actor-nodelete"
// Save an event
event := &aether.Event{
ID: "evt-nodelete",
EventType: "ImportantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"critical": true},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Retrieve it
events1, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (1) failed: %v", err)
}
if len(events1) != 1 {
t.Fatal("expected 1 event after save")
}
// Try to delete through interface - this method doesn't exist
// store.Delete("evt-nodelete") // compile error: no such method
// store.DeleteByActorID(actorID) // compile error: no such method
// Verify the event is still there (we can't delete it)
events2, err := store.GetEvents(actorID, 0)
if err != nil {
t.Fatalf("GetEvents (2) failed: %v", err)
}
if len(events2) != 1 {
t.Errorf("expected 1 event (should not be deletable), got %d", len(events2))
}
if events2[0].ID != "evt-nodelete" {
t.Errorf("event ID changed: got %q, want %q", events2[0].ID, "evt-nodelete")
}
}

View File

@@ -1,431 +0,0 @@
//go:build integration
package store
import (
"context"
"log"
"os"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats-server/v2/server"
)
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
opts := &server.Options{
Port: -1,
JetStream: true,
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
}
s, err := server.NewServer(opts)
if err != nil {
log.Fatal("Failed to create NATS server:", err)
}
go s.Start()
if !s.ReadyForConnections(4 * time.Second) {
log.Fatal("NATS server failed to start")
}
nc, err := nats.Connect(s.ClientURL())
if err != nil {
s.Shutdown()
log.Fatal("Failed to connect to NATS:", err)
}
return s, nc, func() {
nc.Close()
s.Shutdown()
os.RemoveAll(opts.StoreDir)
}
}
func TestUpdateVersionCache(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "test-actor-1"
tests := []struct {
name string
cachedVersion int64
newVersion int64
expectUpdate bool
expectVersion int64
}{
{
name: "update when new version is greater",
cachedVersion: 5,
newVersion: 10,
expectUpdate: true,
expectVersion: 10,
},
{
name: "do not update when new version is equal",
cachedVersion: 5,
newVersion: 5,
expectUpdate: false,
expectVersion: 5,
},
{
name: "do not update when new version is less",
cachedVersion: 10,
newVersion: 5,
expectUpdate: false,
expectVersion: 10,
},
{
name: "update when no cached version exists",
cachedVersion: 0,
newVersion: 1,
expectUpdate: true,
expectVersion: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up cached version
store.versions = make(map[string]int64)
store.versions[actorID] = tt.cachedVersion
// Call UpdateVersionCache
store.UpdateVersionCache(actorID, tt.newVersion)
// Verify result
if tt.expectUpdate {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to be updated but it wasn't cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
}
} else {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to remain cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
}
}
})
}
}
func TestUpdateVersionCache_Concurrent(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "concurrent-actor"
store.versions[actorID] = 1
const numGoroutines = 50
const maxVersion = 100
var done = make(chan struct{})
var updates int32
for i := 0; i < numGoroutines; i++ {
version := int64(1 + (i % maxVersion))
go func(v int64) {
store.UpdateVersionCache(actorID, v)
select {
case <-done:
default:
updates++
}
}(version)
}
close(done)
time.Sleep(100 * time.Millisecond)
finalVersion := store.versions[actorID]
if finalVersion > maxVersion {
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
}
}
func TestSubscribeToEventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
if eventBusWithStore == nil {
t.Fatalf("Failed to create event bus with broadcaster")
}
defer eventBusWithStore.Stop()
ch := eventBusWithStore.SubscribeToEventStored("*")
if ch == nil {
t.Fatal("SubscribeToEventStored returned nil channel")
}
actorID := "subscribe-test-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"key": "value"},
Timestamp: time.Now(),
}
eventBusWithStore.Publish("", event)
select {
case receivedEvent := <-ch:
if receivedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
data, ok := receivedEvent.Data["actorId"].(string)
if !ok || data != actorID {
t.Errorf("Expected actorId in data to be %s", actorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for EventStored event")
}
}
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "broadcast-test-actor-1"
localCh := eventBus.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"total": 99.99},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
select {
case receivedEvent := <-localCh:
if receivedEvent.EventType != "OrderPlaced" {
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for broadcast event")
}
}
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s1, nc1, cleanup1 := setupNatsServer()
defer cleanup1()
s2, nc2, cleanup2 := setupNatsServer()
defer cleanup2()
ctx := context.Background()
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
if err != nil {
t.Fatalf("Failed to create store 1: %v", err)
}
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
if err != nil {
t.Fatalf("Failed to create store 2: %v", err)
}
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
defer eventBus1.Stop()
defer eventBus2.Stop()
actorID := "multi-node-actor"
receiverCh := eventBus2.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "InventoryReserved",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"quantity": 5},
Timestamp: time.Now(),
}
eventBus1.Publish("", event)
select {
case receivedEvent := <-receiverCh:
if receivedEvent.EventType != "InventoryReserved" {
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for cross-node event")
}
}
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
if err != nil {
t.Fatalf("Failed to create tenant A store: %v", err)
}
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
if err != nil {
t.Fatalf("Failed to create tenant B store: %v", err)
}
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
defer tenantAEventBus.Stop()
defer tenantBEventBus.Stop()
tenantACh := tenantAEventBus.Subscribe("tenant-a")
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
actorID := "tenant-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TenantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"data": "tenant-a"},
Timestamp: time.Now(),
}
tenantAEventBus.Publish("tenant-a", event)
select {
case receivedEvent := <-tenantACh:
if receivedEvent.EventType != "TenantEvent" {
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
}
case <-time.After(2 * time.Second):
t.Error("Timeout waiting for tenant A to receive event")
}
select {
case <-tenantBCh:
t.Error("Tenant B should not receive tenant A's events")
case <-time.After(1 * time.Second):
// Expected - tenant B should not receive events from tenant A
}
}
func TestUpdateVersionCache_EventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "version-cache-actor"
store.UpdateVersionCache(actorID, 5)
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 10,
Data: map[string]interface{}{"test": true},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
time.Sleep(100 * time.Millisecond)
storedVersion, err := store.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("Failed to get latest version: %v", err)
}
if storedVersion != 10 {
t.Errorf("Expected version 10, got %d", storedVersion)
}
cacheVersion, ok := store.GetCachedVersion(actorID)
if !ok {
t.Error("Expected version to be in cache")
} else if cacheVersion != 10 {
t.Errorf("Expected cached version 10, got %d", cacheVersion)
}
}

View File

@@ -1,7 +1,6 @@
package store
import (
"context"
"encoding/json"
"fmt"
"strings"
@@ -10,7 +9,6 @@ import (
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/google/uuid"
)
// Default configuration values for JetStream event store
@@ -21,14 +19,7 @@ const (
// JetStreamConfig holds configuration options for JetStreamEventStore
type JetStreamConfig struct {
// StreamRetention is how long to keep events (default: 1 year).
// JetStream enforces this retention policy at the storage level using a limits-based policy:
// - MaxAge: Events older than this duration are automatically deleted
// - Storage is file-based (nats.FileStorage) for durability
// - Once the retention period expires, events are permanently removed from the stream
// This ensures that old events do not consume storage indefinitely.
// To keep events indefinitely, set StreamRetention to a very large value or configure
// a custom retention policy in the JetStream stream configuration.
// StreamRetention is how long to keep events (default: 1 year)
StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int
@@ -49,52 +40,14 @@ func DefaultJetStreamConfig() JetStreamConfig {
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
// It also implements EventStoreWithErrors to report malformed events during replay.
//
// ## Immutability Guarantee
//
// JetStreamEventStore is append-only. Events are stored in a JetStream stream that
// is configured with file-based storage (nats.FileStorage) and a retention policy
// (nats.LimitsPolicy). The configured MaxAge retention policy ensures that old events
// eventually expire, but during their lifetime, events are never modified or deleted
// through the EventStore API. Once an event is published to the stream:
// - It cannot be updated
// - It cannot be deleted before expiration
// - It can only be read
//
// This architectural guarantee, combined with the EventStore interface providing
// no Update or Delete methods, ensures events are immutable and suitable as an
// audit trail.
//
// ## Version Cache Invalidation Strategy
//
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
// concurrency control. The cache is invalidated on any miss (GetLatestVersion call
// that finds a newer version in JetStream) to ensure consistency even when external
// processes write to the same JetStream stream.
//
// If only Aether owns the stream (single-writer assumption), the cache provides
// excellent performance for repeated version checks. If external writers modify
// the stream, the cache will remain consistent because:
//
// 1. On SaveEvent: getLatestVersionLocked() checks JetStream on cache miss
// 2. On GetLatestVersion: If actual version > cached version, cache is invalidated
// 3. Subsequent checks for that actor will fetch fresh data from JetStream
//
// This strategy prevents data corruption from stale cache while maintaining
// performance for the single-writer case.
type JetStreamEventStore struct {
js nats.JetStreamContext
streamName string
config JetStreamConfig
mu sync.Mutex // Protects version checks during SaveEvent
versions map[string]int64 // actorID -> latest version cache
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
namespace string // Optional namespace for event publishing
}
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
@@ -150,8 +103,6 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: nil,
namespace: "",
}, nil
}
@@ -165,58 +116,6 @@ func (jes *JetStreamEventStore) GetStreamName() string {
return jes.streamName
}
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
// The broadcaster receives EventStored events when events are successfully saved.
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
config := DefaultJetStreamConfig()
if namespace != "" {
config.Namespace = namespace
}
js, err := natsConn.JetStream()
if err != nil {
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}
// Apply defaults for zero values
if config.StreamRetention == 0 {
config.StreamRetention = DefaultStreamRetention
}
if config.ReplicaCount == 0 {
config.ReplicaCount = DefaultReplicaCount
}
// Apply namespace prefix to stream name if provided
effectiveStreamName := streamName
if config.Namespace != "" {
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
}
// Create or update the stream
stream := &nats.StreamConfig{
Name: effectiveStreamName,
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
MaxAge: config.StreamRetention,
Replicas: config.ReplicaCount,
}
_, err = js.AddStream(stream)
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
return &JetStreamEventStore{
js: js,
streamName: effectiveStreamName,
config: config,
versions: make(map[string]int64),
broadcaster: broadcaster,
namespace: namespace,
}, nil
}
// SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
@@ -224,20 +123,7 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
jes.mu.Lock()
defer jes.mu.Unlock()
// Check cache first
if version, ok := jes.versions[event.ActorID]; ok {
// Validate version against cached version
if event.Version <= version {
return &aether.VersionConflictError{
ActorID: event.ActorID,
AttemptedVersion: event.Version,
CurrentVersion: version,
}
}
// Version check passed, proceed with publish while holding lock
} else {
// Cache miss - need to check actual stream
// Get current latest version while holding lock to prevent TOCTOU race
// Get current latest version for this actor
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
@@ -252,10 +138,6 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
}
}
// Update cache with current version
jes.versions[event.ActorID] = currentVersion
}
// Serialize event to JSON
data, err := json.Marshal(event)
if err != nil {
@@ -274,57 +156,41 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
return fmt.Errorf("failed to publish event to JetStream: %w", err)
}
// Update version cache after successful publish
// Update version cache
jes.versions[event.ActorID] = event.Version
// Publish EventStored event after successful save (if broadcaster is configured)
if jes.broadcaster != nil {
jes.publishEventStored(event)
}
return nil
}
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
//
// EventStored Event Schema:
// - EventType: "EventStored" (aether.EventTypeEventStored)
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// Example usage with NATSEventBus:
//
// eventBus := aether.NewNATSEventBus(natsConn)
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
// ch := eventBus.SubscribeToEventStored("*")
//
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
// getLatestVersionLocked returns the latest version for an actor.
// Caller must hold jes.mu.
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
// Check cache first
if version, ok := jes.versions[actorID]; ok {
return version, nil
}
jes.broadcaster.Publish(jes.namespace, eventStored)
// Fetch from JetStream - use internal method that returns result
result, err := jes.getEventsWithErrorsInternal(actorID, 0)
if err != nil {
return 0, err
}
if len(result.Events) == 0 {
return 0, nil
}
latestVersion := int64(0)
for _, event := range result.Events {
if event.Version > latestVersion {
latestVersion = event.Version
}
}
// Update cache
jes.versions[actorID] = latestVersion
return latestVersion, nil
}
// GetEvents retrieves all events for an actor since a version.
@@ -410,96 +276,28 @@ func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, from
return result, nil
}
// GetLatestVersion returns the latest version for an actor in O(1) time.
// It uses JetStream's DeliverLast() option to fetch only the last message
// instead of scanning all events, making this O(1) instead of O(n).
// GetLatestVersion returns the latest version for an actor
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
// Create subject filter for this actor
subject := fmt.Sprintf("%s.events.%s.%s",
jes.streamName,
sanitizeSubject(extractActorType(actorID)),
sanitizeSubject(actorID))
// Create consumer to read only the last message
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
events, err := jes.GetEvents(actorID, 0)
if err != nil {
return 0, fmt.Errorf("failed to create consumer: %w", err)
}
defer consumer.Unsubscribe()
// Fetch only the last message
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
if err != nil {
if err == nats.ErrTimeout {
// No messages for this actor, return 0
return 0, nil
}
return 0, fmt.Errorf("failed to fetch last message: %w", err)
return 0, err
}
if len(msgs) == 0 {
// No events for this actor
if len(events) == 0 {
return 0, nil
}
// Parse the last message to get the version
var event aether.Event
if err := json.Unmarshal(msgs[0].Data, &event); err != nil {
return 0, fmt.Errorf("failed to unmarshal last event: %w", err)
latestVersion := int64(0)
for _, event := range events {
if event.Version > latestVersion {
latestVersion = event.Version
}
}
msgs[0].Ack()
return event.Version, nil
return latestVersion, nil
}
// getLatestVersionLocked is like GetLatestVersion but assumes the caller already holds jes.mu.
// This is used internally to avoid releasing and reacquiring the lock during SaveEvent,
// which would create a TOCTOU race condition.
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
// Create subject filter for this actor
subject := fmt.Sprintf("%s.events.%s.%s",
jes.streamName,
sanitizeSubject(extractActorType(actorID)),
sanitizeSubject(actorID))
// Create consumer to read only the last message
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
if err != nil {
return 0, fmt.Errorf("failed to create consumer: %w", err)
}
defer consumer.Unsubscribe()
// Fetch only the last message
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
if err != nil {
if err == nats.ErrTimeout {
// No messages for this actor, return 0
return 0, nil
}
return 0, fmt.Errorf("failed to fetch last message: %w", err)
}
if len(msgs) == 0 {
// No events for this actor
return 0, nil
}
// Parse the last message to get the version
var event aether.Event
if err := json.Unmarshal(msgs[0].Data, &event); err != nil {
return 0, fmt.Errorf("failed to unmarshal last event: %w", err)
}
msgs[0].Ack()
return event.Version, nil
}
// GetLatestSnapshot gets the most recent snapshot for an actor.
// Returns an error if no snapshot exists for the actor (unlike GetLatestVersion which returns 0).
// This is intentional: a missing snapshot is different from a missing event stream.
// If an actor has no events, that's a normal state (use version 0).
// If an actor has no snapshot, that could indicate an error or it could be normal
// depending on the use case, so we let the caller decide how to handle it.
// GetLatestSnapshot gets the most recent snapshot for an actor
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
// Create subject for snapshots
subject := fmt.Sprintf("%s.snapshots.%s.%s",
@@ -517,14 +315,12 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
if err != nil {
if err == nats.ErrTimeout {
// No snapshot found - return error to distinguish from successful nil result
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
}
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
}
if len(msgs) == 0 {
// No snapshot exists for this actor
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
}
@@ -581,43 +377,5 @@ func sanitizeSubject(s string) string {
return s
}
// UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes.
//
// Only updates if the new version is greater than the cached version to prevent
// stale cache entries from causing version conflicts.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock()
defer jes.mu.Unlock()
// Only update if the new version is greater than cached version
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
jes.versions[actorID] = version
}
}
// GetCachedVersion returns the cached version for an actor, if available.
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
jes.mu.Lock()
defer jes.mu.Unlock()
version, ok := jes.versions[actorID]
return version, ok
}
// SetBroadcaster sets the event broadcaster for this store.
// The broadcaster is used to publish EventStored events when events are saved.
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
jes.mu.Lock()
defer jes.mu.Unlock()
jes.broadcaster = broadcaster
}
// Close closes the JetStream event store and cleans up resources.
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
return nil
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)

View File

@@ -1,147 +0,0 @@
//go:build integration
package store
import (
"fmt"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// BenchmarkGetLatestVersion_WithManyEvents benchmarks GetLatestVersion performance
// with a large number of events per actor.
// This demonstrates the O(1) performance by showing that time doesn't increase
// significantly with more events.
func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) {
nc := getTestNATSConnection(&testing.T{})
if nc == nil {
b.Skip("NATS not available")
return
}
defer nc.Close()
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-getversion-%d", time.Now().UnixNano()))
if err != nil {
b.Fatalf("failed to create store: %v", err)
}
actorID := "actor-bench-test"
// Populate with 1000 events
for i := 1; i <= 1000; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "BenchEvent",
ActorID: actorID,
Version: int64(i),
Data: map[string]interface{}{"index": i},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
}
}
// Benchmark GetLatestVersion
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := store.GetLatestVersion(actorID)
if err != nil {
b.Fatalf("GetLatestVersion failed: %v", err)
}
}
b.StopTimer()
}
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
// to show that even uncached lookups are very fast due to DeliverLast optimization.
// A new store instance is created before timing to bypass the version cache.
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
nc := getTestNATSConnection(&testing.T{})
if nc == nil {
b.Skip("NATS not available")
return
}
defer nc.Close()
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-nocache-%d", time.Now().UnixNano()))
if err != nil {
b.Fatalf("failed to create store: %v", err)
}
actorID := "actor-bench-nocache"
// Populate with 1000 events
for i := 1; i <= 1000; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "BenchEvent",
ActorID: actorID,
Version: int64(i),
Data: map[string]interface{}{"index": i},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
}
}
// Create a new store instance to bypass version cache
uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
if err != nil {
b.Fatalf("failed to create uncached store: %v", err)
}
// Benchmark GetLatestVersion without using cache
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := uncachedStore.GetLatestVersion(actorID)
if err != nil {
b.Fatalf("GetLatestVersion failed: %v", err)
}
}
b.StopTimer()
}
// BenchmarkGetLatestVersion_SingleEvent benchmarks with minimal data
func BenchmarkGetLatestVersion_SingleEvent(b *testing.B) {
nc := getTestNATSConnection(&testing.T{})
if nc == nil {
b.Skip("NATS not available")
return
}
defer nc.Close()
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-single-%d", time.Now().UnixNano()))
if err != nil {
b.Fatalf("failed to create store: %v", err)
}
actorID := "actor-single"
event := &aether.Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err != nil {
b.Fatalf("SaveEvent failed: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := store.GetLatestVersion(actorID)
if err != nil {
b.Fatalf("GetLatestVersion failed: %v", err)
}
}
b.StopTimer()
}

View File

@@ -2,10 +2,8 @@ package store
import (
"sync"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/google/uuid"
)
// InMemoryEventStore provides a simple in-memory event store for testing
@@ -13,8 +11,6 @@ type InMemoryEventStore struct {
mu sync.RWMutex
events map[string][]*aether.Event // actorID -> events
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
namespace string // optional namespace for event publishing
}
// NewInMemoryEventStore creates a new in-memory event store
@@ -25,21 +21,9 @@ func NewInMemoryEventStore() *InMemoryEventStore {
}
}
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
// The broadcaster receives EventStored events when events are successfully saved.
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]*aether.Event),
snapshots: make(map[string][]*aether.ActorSnapshot),
broadcaster: broadcaster,
namespace: namespace,
}
}
// SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
// If a broadcaster is configured, publishes an EventStored event on success.
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock()
defer es.mu.Unlock()
@@ -67,35 +51,9 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.events[event.ActorID] = make([]*aether.Event, 0)
}
es.events[event.ActorID] = append(es.events[event.ActorID], event)
// Publish EventStored event after successful save (if broadcaster is configured)
if es.broadcaster != nil {
es.publishEventStored(event)
}
return nil
}
// publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers.
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{
ID: uuid.New().String(),
EventType: aether.EventTypeEventStored,
ActorID: originalEvent.ActorID, // EventStored is about the original actor
Version: originalEvent.Version, // Preserve the version of the stored event
Data: map[string]interface{}{
"eventId": originalEvent.ID,
"actorId": originalEvent.ActorID,
"version": originalEvent.Version,
"timestamp": originalEvent.Timestamp.Unix(),
},
Timestamp: time.Now(),
}
es.broadcaster.Publish(es.namespace, eventStored)
}
// GetEvents retrieves events for an actor from a specific version
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
es.mu.RLock()

View File

@@ -1905,181 +1905,3 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
}
}
}
// === EventStored Publishing Tests ===
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
// Create a mock broadcaster to capture published events
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// Save event
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Check if EventStored was published
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
}
if publishedEvent.ActorID != "order-456" {
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
}
if publishedEvent.Version != 1 {
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
}
// Check data contains original event info
if publishedEvent.Data["eventId"] != "evt-123" {
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for EventStored event")
}
}
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save first event
event1 := &aether.Event{
ID: "evt-1",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event1)
if err != nil {
t.Fatalf("SaveEvent(event1) failed: %v", err)
}
// Drain the first EventStored event
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for first EventStored event")
}
// Try to save event with non-increasing version (should fail)
event2 := &aether.Event{
ID: "evt-2",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1, // Same version, should conflict
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
if !errors.Is(err, aether.ErrVersionConflict) {
t.Fatalf("expected ErrVersionConflict, got %v", err)
}
// Verify no EventStored event was published
select {
case <-ch:
t.Fatal("expected no EventStored event, but received one")
case <-time.After(50 * time.Millisecond):
// Expected - no event published
}
}
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
broadcaster := aether.NewEventBus()
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
// Subscribe to EventStored events
ch := broadcaster.Subscribe("test-namespace")
defer broadcaster.Unsubscribe("test-namespace", ch)
// Save multiple events
for i := int64(1); i <= 3; i++ {
event := &aether.Event{
ID: fmt.Sprintf("evt-%d", i),
EventType: "OrderPlaced",
ActorID: "order-456",
Version: i,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
}
// Verify we received 3 EventStored events in order
for i := int64(1); i <= 3; i++ {
select {
case publishedEvent := <-ch:
if publishedEvent == nil {
t.Fatal("received nil event from broadcaster")
}
if publishedEvent.Version != i {
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout waiting for EventStored event %d", i)
}
}
}
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
// Test that SaveEvent works without a broadcaster (nil broadcaster)
store := NewInMemoryEventStore()
event := &aether.Event{
ID: "evt-123",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.50,
},
Timestamp: time.Now(),
}
// This should not panic even though broadcaster is nil
err := store.SaveEvent(event)
if err != nil {
t.Fatalf("SaveEvent failed: %v", err)
}
// Verify event was saved
events, err := store.GetEvents("order-456", 0)
if err != nil {
t.Fatalf("GetEvents failed: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}
}