# Aether

[![CI](https://git.flowmade.one/flowmade-one/aether/actions/workflows/ci.yml/badge.svg)](https://git.flowmade.one/flowmade-one/aether/actions/workflows/ci.yml)

Event sourcing primitives for Go, powered by NATS.

Aether provides composable building blocks for distributed, event-sourced systems without imposing framework opinions on your domain.

## Why Aether?

Building distributed, event-sourced systems in Go requires assembling many pieces: event storage, pub/sub, clustering, leader election. Existing solutions are either too heavy (full frameworks with opinions about your domain), too light (just pub/sub), or not NATS-native.

Aether provides clear primitives that compose well:

- **Event sourcing primitives** - Event, EventStore interface, snapshots
- **Event stores** - In-memory (testing) and JetStream (production)
- **Event bus** - Local and NATS-backed pub/sub with namespace isolation
- **Cluster management** - Node discovery, leader election, shard distribution

Built for JetStream from the ground up, not bolted on.

## Installation

```bash
go get git.flowmade.one/flowmade-one/aether
```

Requires Go 1.23 or later.

## Quick Start

Here is a minimal example showing event sourcing fundamentals: creating events, saving them to a store, and replaying to rebuild state.

```go
package main

import (
	"fmt"
	"time"

	"github.com/google/uuid"

	"git.flowmade.one/flowmade-one/aether"
	"git.flowmade.one/flowmade-one/aether/store"
)

func main() {
	// Create an in-memory event store (use JetStream for production)
	eventStore := store.NewInMemoryEventStore()

	// Create and save events
	// Error handling omitted for brevity
	orderID := "order-123"

	orderPlaced := &aether.Event{
		ID:        uuid.New().String(),
		EventType: "OrderPlaced",
		ActorID:   orderID,
		Version:   1,
		Data:      map[string]interface{}{"total": 99.99, "items": 3},
		Timestamp: time.Now(),
	}
	eventStore.SaveEvent(orderPlaced)

	orderShipped := &aether.Event{
		ID:        uuid.New().String(),
		EventType: "OrderShipped",
		ActorID:   orderID,
		Version:   2,
		Data:      map[string]interface{}{"carrier": "FastShip", "tracking": "FS123456"},
		Timestamp: time.Now(),
	}
	eventStore.SaveEvent(orderShipped)

	// Replay events to rebuild state
	events, _ := eventStore.GetEvents(orderID, 0)

	state := make(map[string]interface{})
	for _, event := range events {
		switch event.EventType {
		case "OrderPlaced":
			state["total"] = event.Data["total"]
			state["items"] = event.Data["items"]
			state["status"] = "placed"
		case "OrderShipped":
			state["status"] = "shipped"
			state["carrier"] = event.Data["carrier"]
			state["tracking"] = event.Data["tracking"]
		}
	}

	fmt.Printf("Order state after replaying %d events:\n", len(events))
	fmt.Printf(" Status: %s\n", state["status"])
	fmt.Printf(" Total: $%.2f\n", state["total"])
	fmt.Printf(" Tracking: %s\n", state["tracking"])
}
```

Output:

```
Order state after replaying 2 events:
 Status: shipped
 Total: $99.99
 Tracking: FS123456
```

## Key Concepts

### Events are immutable

Events represent facts about what happened. Once saved, they are never modified or deleted - you only append new events. This immutability guarantee is enforced at multiple levels:

**Interface Design**: The `EventStore` interface provides no Update or Delete methods. Only `SaveEvent` (append), `GetEvents` (read), and `GetLatestVersion` (read) are available.

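As a rough illustration of what an append-only interface looks like, here is a minimal, self-contained sketch. It does not reproduce Aether's actual definitions: the `Event` struct, `ErrVersionConflict` value, `AppendOnlyStore` interface, and `memStore` type below are hypothetical stand-ins, and the method signatures (including `int64` versions and an inclusive `fromVersion`) are assumptions based on how the methods are called in this README.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for aether.Event, trimmed to the
// fields this sketch needs.
type Event struct {
	ActorID   string
	EventType string
	Version   int64 // assumed type; the real field may differ
}

// ErrVersionConflict is a stand-in for aether.ErrVersionConflict.
var ErrVersionConflict = errors.New("version conflict")

// AppendOnlyStore mirrors the three methods named above. The signatures
// are assumptions. Note that there is no Update or Delete method.
type AppendOnlyStore interface {
	SaveEvent(e *Event) error
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)
	GetLatestVersion(actorID string) (int64, error)
}

// memStore is a toy implementation for illustration only.
// It is not safe for concurrent use.
type memStore struct {
	events map[string][]*Event
}

func newMemStore() *memStore {
	return &memStore{events: make(map[string][]*Event)}
}

// SaveEvent appends an event, rejecting any version that is not
// strictly greater than the latest stored version for the actor.
func (s *memStore) SaveEvent(e *Event) error {
	latest, _ := s.GetLatestVersion(e.ActorID)
	if e.Version <= latest {
		return ErrVersionConflict
	}
	// Store a copy so later changes to the caller's struct cannot
	// alter recorded history.
	stored := *e
	s.events[e.ActorID] = append(s.events[e.ActorID], &stored)
	return nil
}

// GetEvents returns copies of the actor's events with Version >= fromVersion.
func (s *memStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	var out []*Event
	for _, e := range s.events[actorID] {
		if e.Version >= fromVersion {
			c := *e
			out = append(out, &c)
		}
	}
	return out, nil
}

// GetLatestVersion returns 0 when the actor has no events yet.
func (s *memStore) GetLatestVersion(actorID string) (int64, error) {
	evs := s.events[actorID]
	if len(evs) == 0 {
		return 0, nil
	}
	return evs[len(evs)-1].Version, nil
}

func main() {
	var store AppendOnlyStore = newMemStore()

	first := &Event{ActorID: "order-123", EventType: "OrderPlaced", Version: 1}
	fmt.Println(store.SaveEvent(first)) // <nil>

	// Re-using version 1 is rejected; existing history is left untouched.
	dup := &Event{ActorID: "order-123", EventType: "OrderPlaced", Version: 1}
	fmt.Println(store.SaveEvent(dup)) // version conflict

	events, _ := store.GetEvents("order-123", 0)
	fmt.Println(len(events)) // 1
}
```

Because callers only ever see the interface, the only way to change what a reader observes is to append another event. The toy store also hands out copies on read, so mutating a returned event does not affect stored history.
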
**JetStream Storage**: When using `JetStreamEventStore`, events are stored in a NATS JetStream stream configured with:

- File-based storage (durable)
- Limits-based retention policy (events expire only after the configured duration, never before)
- No mechanism to modify or delete individual events during their lifetime

**Audit Trail Guarantee**: Because events are immutable once persisted, they serve as a trustworthy audit trail. Historical events will not change, which supports compliance and forensic analysis.

To correct a mistake, append a new event that expresses the correction rather than modifying history:

```go
// Wrong: Cannot update an event
// eventStore.UpdateEvent(eventID, newData) // This method doesn't exist

// Right: Append a new event that corrects the record
correctionEvent := &aether.Event{
	ID:        uuid.New().String(),
	EventType: "OrderCorrected",
	ActorID:   orderID,
	Version:   currentVersion + 1,
	Data:      map[string]interface{}{"reason": "price adjustment"},
	Timestamp: time.Now(),
}
err := eventStore.SaveEvent(correctionEvent)
```

### State is derived

Current state is always derived by replaying events. This gives you a complete audit trail and the ability to rebuild state at any point in time.

### Versions ensure consistency

Each event for an actor must have a strictly increasing version number. This enables optimistic concurrency control:

```go
currentVersion, _ := eventStore.GetLatestVersion(actorID)

event := &aether.Event{
	ActorID: actorID,
	Version: currentVersion + 1,
	// ...
}

err := eventStore.SaveEvent(event)
if errors.Is(err, aether.ErrVersionConflict) {
	// Another writer saved first - reload and retry
}
```

## Documentation

- [Vision](./vision.md) - Product vision and design principles
- [CLAUDE.md](./CLAUDE.md) - Development guide and architecture details

## License

See [LICENSE](./LICENSE) for details.