Files
aether/CLAUDE.md
Hugo Nijhuis f62964bf3b
All checks were successful
CI / build (pull_request) Successful in 15s
CI / build (push) Successful in 16s
Add namespace-scoped event stores for storage isolation
Add support for optional namespace prefixes on JetStreamEventStore streams
to enable complete namespace isolation at the storage level:

- Add Namespace field to JetStreamConfig
- Add NewJetStreamEventStoreWithNamespace convenience constructor
- Prefix stream names with sanitized namespace when configured
- Add GetNamespace and GetStreamName accessor methods
- Add unit tests for namespace functionality
- Document namespace-scoped stores in CLAUDE.md

The namespace prefix is sanitized (spaces, dots, wildcards converted to
underscores) and prepended to the stream name, ensuring events from one
namespace cannot be read from another namespace's store while maintaining
full backward compatibility for non-namespaced stores.

Closes #19

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 19:01:03 +01:00

195 lines
5.8 KiB
Markdown

# Aether
Distributed actor system with event sourcing for Go, powered by NATS.
## Organization Context
This repo is part of Flowmade. See:
- [Organization manifesto](https://git.flowmade.one/flowmade-one/architecture/src/branch/main/manifesto.md) - who we are, what we believe
- [Repository map](https://git.flowmade.one/flowmade-one/architecture/src/branch/main/repos.md) - how this fits in the bigger picture
- [Vision](./vision.md) - what this specific product does
## Setup
```bash
git clone git@git.flowmade.one:flowmade-one/aether.git
cd aether
go mod download
```
Requires NATS server for integration tests:
```bash
# Install NATS
brew install nats-server
# Run with JetStream enabled
nats-server -js
```
## Project Structure
```
aether/
├── event.go # Event, ActorSnapshot, EventStore interface
├── eventbus.go # EventBus, EventBroadcaster interface
├── nats_eventbus.go # NATSEventBus - cross-node event broadcasting
├── store/
│ ├── memory.go # InMemoryEventStore (testing)
│ └── jetstream.go # JetStreamEventStore (production)
├── cluster/
│ ├── manager.go # ClusterManager
│ ├── discovery.go # NodeDiscovery
│ ├── hashring.go # ConsistentHashRing
│ ├── shard.go # ShardManager
│ ├── leader.go # LeaderElection
│ └── types.go # Cluster types
└── model/
└── model.go # EventStorming model types
```
## Development
```bash
make build # Build the library
make test # Run tests
make lint # Run linters
```
## Architecture
### Event Sourcing
Events are the source of truth. State is derived by replaying events.
```go
// Create an event
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: "order-123",
Version: 1,
Data: map[string]interface{}{"total": 100.00},
Timestamp: time.Now(),
}
// Persist to event store
store.SaveEvent(event)
// Replay events to rebuild state
events, _ := store.GetEvents("order-123", 0)
```
### Event Versioning
Events for each actor must have **monotonically increasing versions**. This ensures event stream integrity and enables optimistic concurrency control.
#### Version Semantics
- Each actor has an independent version sequence
- Version must be strictly greater than the current latest version
- For new actors (no events), the first event must have version > 0
- Non-consecutive versions are allowed (gaps are permitted)
#### Optimistic Concurrency Pattern
```go
// 1. Get current version
currentVersion, _ := store.GetLatestVersion("order-123")
// 2. Create event with next version
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderUpdated",
ActorID: "order-123",
Version: currentVersion + 1,
Data: map[string]interface{}{"status": "shipped"},
Timestamp: time.Now(),
}
// 3. Attempt to save
err := store.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
// Another writer won - reload and retry if appropriate
var versionErr *aether.VersionConflictError
errors.As(err, &versionErr)
log.Printf("Conflict: actor %s has version %d, attempted %d",
versionErr.ActorID, versionErr.CurrentVersion, versionErr.AttemptedVersion)
}
```
#### Error Types
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
### Namespace Isolation
Namespaces provide logical boundaries for events and subscriptions.
#### Event Bus Namespaces
The event bus supports namespace-scoped pub/sub:
```go
// Subscribe to events in a namespace
ch := eventBus.Subscribe("tenant-abc")
// Events are isolated per namespace
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
```
#### Namespace-Scoped Event Stores
JetStreamEventStore supports optional namespace prefixes for complete storage isolation:
```go
// Create a namespaced event store (convenience function)
store, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
// Or configure via JetStreamConfig
config := store.JetStreamConfig{
Namespace: "tenant-abc",
StreamRetention: 30 * 24 * time.Hour,
ReplicaCount: 3,
}
store, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
// The actual stream name becomes "tenant-abc_events"
// Events from one namespace cannot be read from another namespace's store
```
Namespace isolation at the storage level ensures:
- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces
- **Backward compatibility**: Empty namespace (default) works exactly as before
- **Safe characters**: Namespace names are sanitized (spaces, dots, wildcards become underscores)
Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as:
- Multi-tenant deployments where tenant data must be completely separated
- Logical boundaries between different domains or bounded contexts
- Test isolation in integration tests
### Clustering
Aether handles node discovery, leader election, and shard distribution:
```go
// Create cluster manager
manager := cluster.NewClusterManager(natsConn, nodeID)
// Join cluster
manager.Start()
// Leader election happens automatically
if manager.IsLeader() {
// Coordinate shard assignments
}
```
## Key Patterns
- **Events are immutable** - Never modify, only append
- **Versions are monotonic** - Each event must have version > previous for same actor
- **Snapshots for performance** - Periodically snapshot state to avoid full replay
- **Namespaces for isolation** - Not multi-tenancy, just logical boundaries
- **NATS for everything** - Events, pub/sub, clustering all use NATS