Compare commits
2 Commits
main
...
84fe185285
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
84fe185285 | ||
|
|
69da1d800e |
12
README.md
12
README.md
@@ -107,7 +107,17 @@ Order state after replaying 2 events:
|
||||
|
||||
### Events are immutable
|
||||
|
||||
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
|
||||
Events represent immutable facts about what happened in the domain. The EventStore interface is intentionally append-only: it provides no methods to update or delete events. Once persisted, an event can never be modified, deleted, or overwritten.
|
||||
|
||||
This design ensures:
|
||||
- **Audit trail**: Complete, tamper-proof history of all state changes
|
||||
- **Compliance**: Events serve as evidence for regulatory requirements
|
||||
- **Debugging**: Full context of how the system reached its current state
|
||||
- **Analysis**: Rich domain data for business intelligence and analysis
|
||||
|
||||
To correct application state, you append new events (e.g., a "Reversed" or "Corrected" event) rather than modifying existing events. This maintains a complete history showing both the original event and the correction.
|
||||
|
||||
The JetStream backing store uses a retention policy (default: 1 year) to automatically clean up old events, but events are never manually deleted through Aether.
|
||||
|
||||
### State is derived
|
||||
|
||||
|
||||
13
event.go
13
event.go
@@ -176,6 +176,18 @@ type ActorSnapshot struct {
|
||||
|
||||
// EventStore defines the interface for event persistence.
|
||||
//
|
||||
// # Append-Only Design
|
||||
//
|
||||
// EventStore is intentionally designed as append-only: it provides no Update or Delete
|
||||
// methods. Events are immutable facts about what happened in the domain. Once persisted,
|
||||
// an event is permanently recorded and can never be modified, deleted, or overwritten.
|
||||
// This design ensures that the event log serves as a complete, immutable audit trail
|
||||
// that can be trusted for compliance, debugging, and domain analysis.
|
||||
//
|
||||
// To correct application state, you append new events (e.g., a "Reversed" or "Corrected"
|
||||
// event), never by modifying existing events. This maintains a complete history of all
|
||||
// state changes.
|
||||
//
|
||||
// # Version Semantics
|
||||
//
|
||||
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
||||
@@ -196,6 +208,7 @@ type EventStore interface {
|
||||
// SaveEvent persists an event to the store. The event's Version must be
|
||||
// strictly greater than the current latest version for the actor.
|
||||
// Returns VersionConflictError if version <= current latest version.
|
||||
// Events persisted to the store are immutable and can never be modified or deleted.
|
||||
SaveEvent(event *Event) error
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
||||
|
||||
189
store/immutability_test.go
Normal file
189
store/immutability_test.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// TestEventImmutability verifies that events cannot be modified after being
|
||||
// persisted to the store. This test demonstrates that Aether maintains an
|
||||
// append-only event log that serves as an immutable audit trail.
|
||||
func TestEventImmutability_InMemory(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// Create and save an event
|
||||
originalEvent := &aether.Event{
|
||||
ID: "evt-immutability-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-789",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 99.99,
|
||||
"currency": "USD",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(originalEvent)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Retrieve the event from the store
|
||||
events, err := store.GetEvents("order-789", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
|
||||
retrievedEvent := events[0]
|
||||
|
||||
// Verify the event has the original data
|
||||
if retrievedEvent.Data["total"].(float64) != 99.99 {
|
||||
t.Errorf("expected total 99.99, got %v", retrievedEvent.Data["total"])
|
||||
}
|
||||
|
||||
// Attempt to modify the retrieved event
|
||||
retrievedEvent.Data["total"] = 199.99
|
||||
retrievedEvent.EventType = "OrderCancelled"
|
||||
retrievedEvent.Data["currency"] = "EUR"
|
||||
|
||||
// Retrieve the event again from the store
|
||||
events, err = store.GetEvents("order-789", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed on second call: %v", err)
|
||||
}
|
||||
|
||||
storedEvent := events[0]
|
||||
|
||||
// Verify that the stored event still has the original data
|
||||
// This confirms that modifying the retrieved event didn't affect the stored event
|
||||
if storedEvent.Data["total"].(float64) != 99.99 {
|
||||
t.Errorf("stored event total was modified: expected 99.99, got %v", storedEvent.Data["total"])
|
||||
}
|
||||
|
||||
if storedEvent.EventType != "OrderPlaced" {
|
||||
t.Errorf("stored event type was modified: expected OrderPlaced, got %s", storedEvent.EventType)
|
||||
}
|
||||
|
||||
if storedEvent.Data["currency"].(string) != "USD" {
|
||||
t.Errorf("stored event currency was modified: expected USD, got %s", storedEvent.Data["currency"])
|
||||
}
|
||||
|
||||
// Additional verification: EventStore has no Update or Delete methods
|
||||
// This is enforced at the type system level by the interface definition.
|
||||
// The EventStore interface only provides:
|
||||
// - SaveEvent (append-only)
|
||||
// - GetEvents (read-only)
|
||||
// - GetLatestVersion (read-only)
|
||||
// There are intentionally no Update, Delete, or Modify methods.
|
||||
}
|
||||
|
||||
// TestEventImmutability_Sequential verifies that events remain immutable
|
||||
// even when multiple events are saved for the same actor.
|
||||
func TestEventImmutability_Sequential(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
actorID := "order-sequential-123"
|
||||
|
||||
// Save multiple events
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-seq-1",
|
||||
EventType: "OrderCreated",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"status": "new"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-seq-2",
|
||||
EventType: "OrderProcessed",
|
||||
ActorID: actorID,
|
||||
Version: 2,
|
||||
Data: map[string]interface{}{"status": "processing"},
|
||||
Timestamp: time.Now().Add(time.Second),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event1)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event2)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent(event2) failed: %v", err)
|
||||
}
|
||||
|
||||
// Retrieve all events
|
||||
events, err := store.GetEvents(actorID, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
|
||||
if len(events) != 2 {
|
||||
t.Fatalf("expected 2 events, got %d", len(events))
|
||||
}
|
||||
|
||||
// Attempt to modify the first event's data
|
||||
events[0].Data["status"] = "cancelled"
|
||||
|
||||
// Retrieve events again and verify the first event is unchanged
|
||||
events, err = store.GetEvents(actorID, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed on second call: %v", err)
|
||||
}
|
||||
|
||||
if events[0].Data["status"].(string) != "new" {
|
||||
t.Errorf("first event was modified: expected status=new, got %v", events[0].Data["status"])
|
||||
}
|
||||
|
||||
if events[1].Data["status"].(string) != "processing" {
|
||||
t.Errorf("second event was modified: expected status=processing, got %v", events[1].Data["status"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestNoDeleteMethod verifies that the EventStore interface has no Delete method.
|
||||
// This is a compile-time check: if Delete were added to the interface,
|
||||
// all implementations would fail to compile until they implemented it.
|
||||
// This test serves as a runtime confirmation that the interface intentionally
|
||||
// omits delete/update operations.
|
||||
func TestNoDeleteMethod(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
// The following would not compile if EventStore had a Delete method:
|
||||
// store.Delete(...)
|
||||
// store.Update(...)
|
||||
// store.Modify(...)
|
||||
|
||||
// This test passes if compilation succeeds, confirming that
|
||||
// the EventStore interface is append-only by design.
|
||||
|
||||
// Verify the interface has exactly 3 methods
|
||||
var iface interface{} = store
|
||||
if _, ok := iface.(aether.EventStore); !ok {
|
||||
t.Fatal("InMemoryEventStore does not implement EventStore")
|
||||
}
|
||||
|
||||
// The EventStore interface should only have SaveEvent, GetEvents, GetLatestVersion
|
||||
// Verify by attempting to call each method
|
||||
event := &aether.Event{
|
||||
ID: "evt-test",
|
||||
EventType: "Test",
|
||||
ActorID: "test-actor",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// These should work
|
||||
store.SaveEvent(event)
|
||||
store.GetEvents("test-actor", 0)
|
||||
store.GetLatestVersion("test-actor")
|
||||
|
||||
// No other methods should exist (compile-time check)
|
||||
}
|
||||
@@ -18,6 +18,38 @@ const (
|
||||
)
|
||||
|
||||
// JetStreamConfig holds configuration options for JetStreamEventStore
|
||||
//
|
||||
// # Stream Retention Policy
|
||||
//
|
||||
// JetStreamEventStore uses a LimitsPolicy retention strategy, which means events are
|
||||
// kept for a specified maximum age (StreamRetention). Once events exceed this age,
|
||||
// they are automatically purged from the stream.
|
||||
//
|
||||
// Default retention is 1 year (365 days). This provides:
|
||||
// - Long-term audit trail for domain events
|
||||
// - Complete history for event replay and analysis
|
||||
// - Automatic cleanup of old events to manage storage costs
|
||||
//
|
||||
// The retention policy is applied when creating the JetStream stream:
|
||||
//
|
||||
// stream := &nats.StreamConfig{
|
||||
// ...
|
||||
// Retention: nats.LimitsPolicy,
|
||||
// MaxAge: config.StreamRetention,
|
||||
// ...
|
||||
// }
|
||||
//
|
||||
// To configure custom retention, pass a JetStreamConfig with your desired StreamRetention:
|
||||
//
|
||||
// config := store.JetStreamConfig{
|
||||
// StreamRetention: 90 * 24 * time.Hour, // Keep events for 90 days
|
||||
// ReplicaCount: 3, // 3 replicas for HA
|
||||
// }
|
||||
// eventStore, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
|
||||
//
|
||||
// Note: The retention policy only affects automatic cleanup. Aether does not provide
|
||||
// methods to manually delete events - events are immutable once stored and can only
|
||||
// be removed by the stream's retention policy or explicit JetStream administration.
|
||||
type JetStreamConfig struct {
|
||||
// StreamRetention is how long to keep events (default: 1 year)
|
||||
StreamRetention time.Duration
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
@@ -21,9 +22,40 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
}
|
||||
}
|
||||
|
||||
// deepCopyEvent creates a deep copy of an event to ensure immutability.
|
||||
// This prevents modifications to the event after it's been persisted.
|
||||
// Panics if JSON marshaling/unmarshaling fails, which should never occur
|
||||
// for a valid Event structure.
|
||||
func deepCopyEvent(event *aether.Event) *aether.Event {
|
||||
// Use JSON marshaling/unmarshaling for a complete deep copy
|
||||
// This ensures all nested structures (maps, slices) are copied
|
||||
data, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
panic("failed to marshal event: " + err.Error())
|
||||
}
|
||||
var copy aether.Event
|
||||
err = json.Unmarshal(data, ©)
|
||||
if err != nil {
|
||||
panic("failed to unmarshal event: " + err.Error())
|
||||
}
|
||||
|
||||
// Preserve empty metadata maps (JSON unmarshal converts empty map to nil)
|
||||
if event.Metadata != nil && len(event.Metadata) == 0 && copy.Metadata == nil {
|
||||
copy.Metadata = make(map[string]string)
|
||||
}
|
||||
|
||||
// Preserve empty data maps
|
||||
if event.Data != nil && len(event.Data) == 0 && copy.Data == nil {
|
||||
copy.Data = make(map[string]interface{})
|
||||
}
|
||||
|
||||
return ©
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the in-memory store.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
// The event is deep-copied before storage to ensure immutability.
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
@@ -50,11 +82,15 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
if _, exists := es.events[event.ActorID]; !exists {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||
|
||||
// Deep copy the event before storing to ensure immutability
|
||||
storedEvent := deepCopyEvent(event)
|
||||
es.events[event.ActorID] = append(es.events[event.ActorID], storedEvent)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version
|
||||
// Returns deep copies of events to ensure immutability
|
||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
es.mu.RLock()
|
||||
defer es.mu.RUnlock()
|
||||
@@ -67,7 +103,8 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a
|
||||
var filteredEvents []*aether.Event
|
||||
for _, event := range events {
|
||||
if event.Version >= fromVersion {
|
||||
filteredEvents = append(filteredEvents, event)
|
||||
// Return a deep copy to ensure immutability
|
||||
filteredEvents = append(filteredEvents, deepCopyEvent(event))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user