# Aether

Distributed actor system with event sourcing for Go, powered by NATS.

## Organization Context

This repo is part of Flowmade. See:

- [Organization manifesto](https://git.flowmade.one/flowmade-one/architecture/src/branch/main/manifesto.md) - who we are, what we believe
- [Repository map](https://git.flowmade.one/flowmade-one/architecture/src/branch/main/repos.md) - how this fits in the bigger picture
- [Vision](./vision.md) - what this specific product does

## Setup

```bash
git clone git@git.flowmade.one:flowmade-one/aether.git
cd aether
go mod download
```

Integration tests require a NATS server with JetStream enabled:

```bash
# Install NATS
brew install nats-server

# Run with JetStream enabled
nats-server -js
```

## Project Structure

```
aether/
├── event.go            # Event, ActorSnapshot, EventStore interface
├── eventbus.go         # EventBus, EventBroadcaster interface
├── nats_eventbus.go    # NATSEventBus - cross-node event broadcasting
├── store/
│   ├── memory.go       # InMemoryEventStore (testing)
│   └── jetstream.go    # JetStreamEventStore (production)
├── cluster/
│   ├── manager.go      # ClusterManager
│   ├── discovery.go    # NodeDiscovery
│   ├── hashring.go     # ConsistentHashRing
│   ├── shard.go        # ShardManager
│   ├── leader.go       # LeaderElection
│   └── types.go        # Cluster types
└── model/
    └── model.go        # EventStorming model types
```

## Development

```bash
make build    # Build the library
make test     # Run tests
make lint     # Run linters
```

## Architecture

### Event Sourcing

Events are the source of truth. State is derived by replaying events.

```go
// Create an event
event := &aether.Event{
	ID:        uuid.New().String(),
	EventType: "OrderPlaced",
	ActorID:   "order-123",
	Version:   1,
	Data:      map[string]interface{}{"total": 100.00},
	Timestamp: time.Now(),
}

// Persist to the event store (store is an EventStore; errors go back to the caller)
if err := store.SaveEvent(event); err != nil {
	return err
}

// Replay events to rebuild state
events, err := store.GetEvents("order-123", 0)
if err != nil {
	return err
}
```
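Replaying can be read as a left fold over an actor's events in version order. The sketch below is illustrative only: `Event` is a local stand-in with a subset of `aether.Event`'s fields, and `OrderState`, `apply`, and `replay` are hypothetical names, not part of Aether.

```go
package main

import "fmt"

// Event is a hypothetical local stand-in mirroring a subset of aether.Event.
type Event struct {
	EventType string
	ActorID   string
	Version   int64
	Data      map[string]interface{}
}

// OrderState is a hypothetical read model rebuilt from events.
type OrderState struct {
	Total   float64
	Status  string
	Version int64
}

// apply folds one event into the state and returns the updated state.
func apply(s OrderState, e Event) OrderState {
	switch e.EventType {
	case "OrderPlaced":
		if total, ok := e.Data["total"].(float64); ok {
			s.Total = total
		}
		s.Status = "placed"
	case "OrderUpdated":
		if status, ok := e.Data["status"].(string); ok {
			s.Status = status
		}
	}
	s.Version = e.Version
	return s
}

// replay starts from the zero state and applies events in stored order.
func replay(events []Event) OrderState {
	var s OrderState
	for _, e := range events {
		s = apply(s, e)
	}
	return s
}

func main() {
	events := []Event{
		{EventType: "OrderPlaced", ActorID: "order-123", Version: 1, Data: map[string]interface{}{"total": 100.00}},
		{EventType: "OrderUpdated", ActorID: "order-123", Version: 2, Data: map[string]interface{}{"status": "shipped"}},
	}
	fmt.Printf("%+v\n", replay(events))
}
```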

### Event Versioning

Events for each actor must have **strictly increasing versions**. This ensures event stream integrity and enables optimistic concurrency control.

#### Version Semantics

- Each actor has an independent version sequence
- Version must be strictly greater than the current latest version
- For new actors (no events), the first event must have version > 0
- Non-consecutive versions are allowed (gaps are permitted)

#### Optimistic Concurrency Pattern

```go
// 1. Get current version
currentVersion, err := store.GetLatestVersion("order-123")
if err != nil {
	return err
}

// 2. Create event with next version
event := &aether.Event{
	ID:        uuid.New().String(),
	EventType: "OrderUpdated",
	ActorID:   "order-123",
	Version:   currentVersion + 1,
	Data:      map[string]interface{}{"status": "shipped"},
	Timestamp: time.Now(),
}

// 3. Attempt to save
if err := store.SaveEvent(event); err != nil {
	// Check errors.As before using versionErr so it is never nil here
	var versionErr *aether.VersionConflictError
	if errors.Is(err, aether.ErrVersionConflict) && errors.As(err, &versionErr) {
		// Another writer won - reload and retry if appropriate
		log.Printf("Conflict: actor %s has version %d, attempted %d",
			versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
	}
	return err
}
```

#### Error Types

- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion (extract with `errors.As`)

#### Version Cache Invalidation

The `JetStreamEventStore` keeps an in-memory cache of actor versions so that repeated version checks during optimistic concurrency control do not each require a JetStream query. The cache is designed to stay correct in multi-store scenarios, where external processes may write to the same JetStream stream:

- **Cache hits**: A cached version is returned immediately, without querying JetStream
- **Cache misses**: If no version is cached for the actor, JetStream is queried and the result is cached
- **External writes**: If `GetLatestVersion` finds a version in JetStream newer than the cached one, the cache entry is invalidated so the next check fetches fresh data

This keeps versions consistent when external writers are present, while the single-writer case (where only Aether owns the stream) keeps the performance benefit of cached version checks.

**Implementation detail**: The cache is invalidated by deleting the entry, which forces a fresh fetch from JetStream on the next version check for that actor. This is safe because:

1. `SaveEvent` uses `getLatestVersionLocked()`, which queries JetStream on a cache miss
2. `GetLatestVersion` always fetches fresh data from JetStream and detects stale cache entries
3. Subsequent checks fetch from JetStream until the cache is repopulated
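The invalidation strategy above can be modeled as a mutex-guarded map in front of a source of truth. This is a minimal model under stated assumptions, not the `JetStreamEventStore` code: `fakeStream` stands in for JetStream, `versionCache` and `checkVersion` are hypothetical names, and error returns are omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeStream is a hypothetical stand-in for JetStream, holding the
// authoritative latest version for each actor.
type fakeStream struct {
	mu     sync.Mutex
	latest map[string]int64
}

func (s *fakeStream) latestFor(actorID string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.latest[actorID]
}

// externalWrite simulates another process appending to the same stream.
func (s *fakeStream) externalWrite(actorID string, version int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.latest[actorID] = version
}

// versionCache is a hypothetical model of the store's in-memory version cache.
type versionCache struct {
	mu       sync.Mutex
	versions map[string]int64
	stream   *fakeStream
}

// getLatestVersionLocked models the SaveEvent path: return the cached version
// on a hit, query the stream and cache the result on a miss. Caller holds c.mu.
func (c *versionCache) getLatestVersionLocked(actorID string) int64 {
	if v, ok := c.versions[actorID]; ok {
		return v
	}
	v := c.stream.latestFor(actorID)
	c.versions[actorID] = v
	return v
}

// checkVersion takes the lock and performs the SaveEvent-style version check.
func (c *versionCache) checkVersion(actorID string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.getLatestVersionLocked(actorID)
}

// GetLatestVersion always reads the stream and deletes a cached entry that is
// older than what the stream reports.
func (c *versionCache) GetLatestVersion(actorID string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	fresh := c.stream.latestFor(actorID)
	if cached, ok := c.versions[actorID]; ok && fresh > cached {
		delete(c.versions, actorID)
	}
	return fresh
}

func main() {
	stream := &fakeStream{latest: map[string]int64{"order-123": 3}}
	cache := &versionCache{versions: map[string]int64{}, stream: stream}

	fmt.Println(cache.checkVersion("order-123"))     // miss: fetched from the stream and cached
	stream.externalWrite("order-123", 5)             // an external writer appends
	fmt.Println(cache.checkVersion("order-123"))     // hit: the cached (now stale) value
	fmt.Println(cache.GetLatestVersion("order-123")) // fresh value; the stale entry is deleted
	fmt.Println(cache.checkVersion("order-123"))     // miss again: re-fetched from the stream
}
```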

### Namespace Isolation

Namespaces provide logical boundaries for events and subscriptions.

#### Event Bus Namespaces

The event bus supports namespace-scoped pub/sub:

```go
// Subscribe to events in a namespace
ch := eventBus.Subscribe("tenant-abc")

// Events are isolated per namespace
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
```
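Namespace-scoped pub/sub amounts to keying subscribers by namespace. A minimal in-process sketch: `localBus` and its methods are hypothetical, events are plain strings for brevity, and dropping an event when a subscriber's buffer is full is a design choice of this sketch. It illustrates the isolation semantics only, not `NATSEventBus`, which broadcasts across nodes.

```go
package main

import (
	"fmt"
	"sync"
)

// localBus keys subscriber channels by namespace (hypothetical sketch).
type localBus struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func newLocalBus() *localBus {
	return &localBus{subs: make(map[string][]chan string)}
}

// Subscribe returns a buffered channel that receives events for one namespace only.
func (b *localBus) Subscribe(namespace string) <-chan string {
	ch := make(chan string, 16)
	b.mu.Lock()
	b.subs[namespace] = append(b.subs[namespace], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers only to subscribers of the given namespace. A full channel
// drops the event instead of blocking the publisher.
func (b *localBus) Publish(namespace, event string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[namespace] {
		select {
		case ch <- event:
		default:
		}
	}
}

func main() {
	bus := newLocalBus()
	abc := bus.Subscribe("tenant-abc")
	xyz := bus.Subscribe("tenant-xyz")

	bus.Publish("tenant-abc", "OrderPlaced")

	fmt.Println(<-abc)    // delivered to the tenant-abc subscriber
	fmt.Println(len(xyz)) // nothing queued for tenant-xyz
}
```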

#### Namespace-Scoped Event Stores

`JetStreamEventStore` supports optional namespace prefixes for complete storage isolation:

```go
// Create a namespaced event store (convenience function).
// The variables are not named "store" so they do not shadow the store package.
nsStore, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
if err != nil {
	return err
}

// Or configure via JetStreamConfig
config := store.JetStreamConfig{
	Namespace:       "tenant-abc",
	StreamRetention: 30 * 24 * time.Hour,
	ReplicaCount:    3,
}
configuredStore, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
if err != nil {
	return err
}

// Either way, the actual stream name becomes "tenant-abc_events".
// Events from one namespace cannot be read from another namespace's store.
```

Namespace isolation at the storage level ensures:

- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces
- **Backward compatibility**: An empty namespace (the default) leaves the stream name unprefixed, so existing stores work exactly as before
- **Safe characters**: Namespace names are sanitized (spaces, dots, and wildcards become underscores)
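The naming rule above can be sketched as a small pure function. Assumptions: `streamName` is hypothetical, and the exact character set replaced (space, the NATS token separator `.`, and the wildcards `*` and `>`) is inferred from the description above rather than taken from the store's code.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizer replaces the characters assumed to be unsafe in a stream name:
// space, the NATS token separator '.', and the wildcards '*' and '>'.
var sanitizer = strings.NewReplacer(" ", "_", ".", "_", "*", "_", ">", "_")

// streamName prefixes baseName with the sanitized namespace; an empty
// namespace leaves the name unchanged.
func streamName(namespace, baseName string) string {
	if namespace == "" {
		return baseName
	}
	return sanitizer.Replace(namespace) + "_" + baseName
}

func main() {
	fmt.Println(streamName("tenant-abc", "events"))
	fmt.Println(streamName("", "events"))
	fmt.Println(streamName("acme.eu west", "events"))
}
```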

Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as:

- Multi-tenant deployments where tenant data must be completely separated
- Logical boundaries between different domains or bounded contexts
- Test isolation in integration tests

### Clustering

Aether handles node discovery, leader election, and shard distribution:

```go
// Create cluster manager
manager := cluster.NewClusterManager(natsConn, nodeID)

// Join cluster
manager.Start()

// Leader election happens automatically
if manager.IsLeader() {
	// Coordinate shard assignments
}
```

## Key Patterns

- **Events are immutable** - Never modify, only append
- **Versions are monotonic** - Each event must have a version greater than the previous one for the same actor
- **Snapshots for performance** - Periodically snapshot state to avoid full replay
- **Namespaces for isolation** - Not multi-tenancy, just logical boundaries
- **NATS for everything** - Events, pub/sub, clustering all use NATS
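A snapshot pairs saved state with the version it reflects, so a restore only has to apply events after that version. A sketch with hypothetical local types (`event`, `snapshot`, `restore`) and a simple counter as state, assuming events arrive in ascending version order; it does not assume the fields of `aether.ActorSnapshot` or any store call.

```go
package main

import "fmt"

// event is a hypothetical event carrying a version and a counter delta.
type event struct {
	Version int64
	Delta   int
}

// snapshot records state as of a given version (hypothetical shape).
type snapshot struct {
	Version int64
	Count   int
}

// restore starts from the snapshot and applies only events newer than it.
func restore(s snapshot, events []event) snapshot {
	for _, e := range events {
		if e.Version <= s.Version {
			continue // already reflected in the snapshot
		}
		s.Count += e.Delta
		s.Version = e.Version
	}
	return s
}

func main() {
	events := []event{{1, 10}, {2, 5}, {3, -2}, {4, 7}}
	full := restore(snapshot{}, events)     // full replay from scratch
	snap := restore(snapshot{}, events[:2]) // snapshot taken at version 2
	fast := restore(snap, events)           // applies only versions 3 and 4
	fmt.Println(full == fast)
}
```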