diff --git a/README.md b/README.md index a3f84db..ae6c97f 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,17 @@ Order state after replaying 2 events: ### Events are immutable -Events represent facts about what happened. Once saved, they are never modified - you only append new events. +Events represent immutable facts about what happened in the domain. The EventStore interface is intentionally append-only: it provides no methods to update or delete events. Once persisted, an event can never be modified, deleted, or overwritten. + +This design ensures: +- **Audit trail**: Complete, tamper-proof history of all state changes +- **Compliance**: Events serve as evidence for regulatory requirements +- **Debugging**: Full context of how the system reached its current state +- **Analysis**: Rich domain data for business intelligence and analysis + +To correct application state, you append new events (e.g., a "Reversed" or "Corrected" event) rather than modifying existing events. This maintains a complete history showing both the original event and the correction. + +The JetStream backing store uses a retention policy (default: 1 year) to automatically clean up old events, but events are never manually deleted through Aether. ### State is derived diff --git a/event.go b/event.go index 94ea58c..e084c53 100644 --- a/event.go +++ b/event.go @@ -176,6 +176,18 @@ type ActorSnapshot struct { // EventStore defines the interface for event persistence. // +// # Append-Only Design +// +// EventStore is intentionally designed as append-only: it provides no Update or Delete +// methods. Events are immutable facts about what happened in the domain. Once persisted, +// an event is permanently recorded and can never be modified, deleted, or overwritten. +// This design ensures that the event log serves as a complete, immutable audit trail +// that can be trusted for compliance, debugging, and domain analysis. +// +// To correct application state, you append new events (e.g., a "Reversed" or "Corrected" +// event), never by modifying existing events. This maintains a complete history of all +// state changes. +// // # Version Semantics // // Events for an actor must have monotonically increasing versions. When SaveEvent @@ -196,6 +208,7 @@ type EventStore interface { // SaveEvent persists an event to the store. The event's Version must be // strictly greater than the current latest version for the actor. // Returns VersionConflictError if version <= current latest version. + // Events persisted to the store are immutable and can never be modified or deleted. SaveEvent(event *Event) error // GetEvents retrieves events for an actor from a specific version (inclusive). diff --git a/store/immutability_test.go b/store/immutability_test.go new file mode 100644 index 0000000..3e938ed --- /dev/null +++ b/store/immutability_test.go @@ -0,0 +1,189 @@ +package store + +import ( + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" +) + +// TestEventImmutability verifies that events cannot be modified after being +// persisted to the store. This test demonstrates that Aether maintains an +// append-only event log that serves as an immutable audit trail. +func TestEventImmutability_InMemory(t *testing.T) { + store := NewInMemoryEventStore() + + // Create and save an event + originalEvent := &aether.Event{ + ID: "evt-immutability-123", + EventType: "OrderPlaced", + ActorID: "order-789", + Version: 1, + Data: map[string]interface{}{ + "total": 99.99, + "currency": "USD", + }, + Timestamp: time.Now(), + } + + err := store.SaveEvent(originalEvent) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Retrieve the event from the store + events, err := store.GetEvents("order-789", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } + + retrievedEvent := events[0] + + // Verify the event has the original data + if retrievedEvent.Data["total"].(float64) != 99.99 { + t.Errorf("expected total 99.99, got %v", retrievedEvent.Data["total"]) + } + + // Attempt to modify the retrieved event + retrievedEvent.Data["total"] = 199.99 + retrievedEvent.EventType = "OrderCancelled" + retrievedEvent.Data["currency"] = "EUR" + + // Retrieve the event again from the store + events, err = store.GetEvents("order-789", 0) + if err != nil { + t.Fatalf("GetEvents failed on second call: %v", err) + } + + storedEvent := events[0] + + // Verify that the stored event still has the original data + // This confirms that modifying the retrieved event didn't affect the stored event + if storedEvent.Data["total"].(float64) != 99.99 { + t.Errorf("stored event total was modified: expected 99.99, got %v", storedEvent.Data["total"]) + } + + if storedEvent.EventType != "OrderPlaced" { + t.Errorf("stored event type was modified: expected OrderPlaced, got %s", storedEvent.EventType) + } + + if storedEvent.Data["currency"].(string) != "USD" { + t.Errorf("stored event currency was modified: expected USD, got %s", storedEvent.Data["currency"]) + } + + // Additional verification: EventStore has no Update or Delete methods + // This is enforced at the type system level by the interface definition. + // The EventStore interface only provides: + // - SaveEvent (append-only) + // - GetEvents (read-only) + // - GetLatestVersion (read-only) + // There are intentionally no Update, Delete, or Modify methods. +} + +// TestEventImmutability_Sequential verifies that events remain immutable +// even when multiple events are saved for the same actor. +func TestEventImmutability_Sequential(t *testing.T) { + store := NewInMemoryEventStore() + actorID := "order-sequential-123" + + // Save multiple events + event1 := &aether.Event{ + ID: "evt-seq-1", + EventType: "OrderCreated", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{"status": "new"}, + Timestamp: time.Now(), + } + + event2 := &aether.Event{ + ID: "evt-seq-2", + EventType: "OrderProcessed", + ActorID: actorID, + Version: 2, + Data: map[string]interface{}{"status": "processing"}, + Timestamp: time.Now().Add(time.Second), + } + + err := store.SaveEvent(event1) + if err != nil { + t.Fatalf("SaveEvent(event1) failed: %v", err) + } + + err = store.SaveEvent(event2) + if err != nil { + t.Fatalf("SaveEvent(event2) failed: %v", err) + } + + // Retrieve all events + events, err := store.GetEvents(actorID, 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + + if len(events) != 2 { + t.Fatalf("expected 2 events, got %d", len(events)) + } + + // Attempt to modify the first event's data + events[0].Data["status"] = "cancelled" + + // Retrieve events again and verify the first event is unchanged + events, err = store.GetEvents(actorID, 0) + if err != nil { + t.Fatalf("GetEvents failed on second call: %v", err) + } + + if events[0].Data["status"].(string) != "new" { + t.Errorf("first event was modified: expected status=new, got %v", events[0].Data["status"]) + } + + if events[1].Data["status"].(string) != "processing" { + t.Errorf("second event was modified: expected status=processing, got %v", events[1].Data["status"]) + } +} + +// TestNoDeleteMethod verifies that the EventStore interface has no Delete method. +// This is a compile-time check: if Delete were added to the interface, +// all implementations would fail to compile until they implemented it. +// This test serves as a runtime confirmation that the interface intentionally +// omits delete/update operations. +func TestNoDeleteMethod(t *testing.T) { + store := NewInMemoryEventStore() + + // The following would not compile if EventStore had a Delete method: + // store.Delete(...) + // store.Update(...) + // store.Modify(...) + + // This test passes if compilation succeeds, confirming that + // the EventStore interface is append-only by design. + + // Verify the interface has exactly 3 methods + var iface interface{} = store + if _, ok := iface.(aether.EventStore); !ok { + t.Fatal("InMemoryEventStore does not implement EventStore") + } + + // The EventStore interface should only have SaveEvent, GetEvents, GetLatestVersion + // Verify by attempting to call each method + event := &aether.Event{ + ID: "evt-test", + EventType: "Test", + ActorID: "test-actor", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + // These should work + store.SaveEvent(event) + store.GetEvents("test-actor", 0) + store.GetLatestVersion("test-actor") + + // No other methods should exist (compile-time check) +} diff --git a/store/jetstream.go b/store/jetstream.go index cc06ea6..fa2b894 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -18,6 +18,38 @@ const ( ) // JetStreamConfig holds configuration options for JetStreamEventStore +// +// # Stream Retention Policy +// +// JetStreamEventStore uses a LimitsPolicy retention strategy, which means events are +// kept for a specified maximum age (StreamRetention). Once events exceed this age, +// they are automatically purged from the stream. +// +// Default retention is 1 year (365 days). This provides: +// - Long-term audit trail for domain events +// - Complete history for event replay and analysis +// - Automatic cleanup of old events to manage storage costs +// +// The retention policy is applied when creating the JetStream stream: +// +// stream := &nats.StreamConfig{ +// ... +// Retention: nats.LimitsPolicy, +// MaxAge: config.StreamRetention, +// ... +// } +// +// To configure custom retention, pass a JetStreamConfig with your desired StreamRetention: +// +// config := store.JetStreamConfig{ +// StreamRetention: 90 * 24 * time.Hour, // Keep events for 90 days +// ReplicaCount: 3, // 3 replicas for HA +// } +// eventStore, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config) +// +// Note: The retention policy only affects automatic cleanup. Aether does not provide +// methods to manually delete events - events are immutable once stored and can only +// be removed by the stream's retention policy or explicit JetStream administration. type JetStreamConfig struct { // StreamRetention is how long to keep events (default: 1 year) StreamRetention time.Duration diff --git a/store/memory.go b/store/memory.go index 0fba85d..8c0bcde 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,6 +1,7 @@ package store import ( + "encoding/json" "sync" "git.flowmade.one/flowmade-one/aether" @@ -21,9 +22,32 @@ func NewInMemoryEventStore() *InMemoryEventStore { } } +// deepCopyEvent creates a deep copy of an event to ensure immutability. +// This prevents modifications to the event after it's been persisted. +func deepCopyEvent(event *aether.Event) *aether.Event { + // Use JSON marshaling/unmarshaling for a complete deep copy + // This ensures all nested structures (maps, slices) are copied + data, _ := json.Marshal(event) + var copy aether.Event + json.Unmarshal(data, ©) + + // Preserve empty metadata maps (JSON unmarshal converts empty map to nil) + if event.Metadata != nil && len(event.Metadata) == 0 && copy.Metadata == nil { + copy.Metadata = make(map[string]string) + } + + // Preserve empty data maps + if event.Data != nil && len(event.Data) == 0 && copy.Data == nil { + copy.Data = make(map[string]interface{}) + } + + return © +} + // SaveEvent saves an event to the in-memory store. // Returns VersionConflictError if the event's version is not strictly greater // than the current latest version for the actor. +// The event is deep-copied before storage to ensure immutability. func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.mu.Lock() defer es.mu.Unlock() @@ -50,11 +74,15 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { if _, exists := es.events[event.ActorID]; !exists { es.events[event.ActorID] = make([]*aether.Event, 0) } - es.events[event.ActorID] = append(es.events[event.ActorID], event) + + // Deep copy the event before storing to ensure immutability + storedEvent := deepCopyEvent(event) + es.events[event.ActorID] = append(es.events[event.ActorID], storedEvent) return nil } // GetEvents retrieves events for an actor from a specific version +// Returns deep copies of events to ensure immutability func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { es.mu.RLock() defer es.mu.RUnlock() @@ -67,7 +95,8 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a var filteredEvents []*aether.Event for _, event := range events { if event.Version >= fromVersion { - filteredEvents = append(filteredEvents, event) + // Return a deep copy to ensure immutability + filteredEvents = append(filteredEvents, deepCopyEvent(event)) } }