Compare commits
2 Commits
bcbec9ab94
...
84fe185285
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
84fe185285 | ||
|
|
69da1d800e |
12
README.md
12
README.md
@@ -107,7 +107,17 @@ Order state after replaying 2 events:
|
|||||||
|
|
||||||
### Events are immutable
|
### Events are immutable
|
||||||
|
|
||||||
Events represent facts about what happened. Once saved, they are never modified - you only append new events.
|
Events represent immutable facts about what happened in the domain. The EventStore interface is intentionally append-only: it provides no methods to update or delete events. Once persisted, an event can never be modified, deleted, or overwritten.
|
||||||
|
|
||||||
|
This design ensures:
|
||||||
|
- **Audit trail**: Complete, tamper-proof history of all state changes
|
||||||
|
- **Compliance**: Events serve as evidence for regulatory requirements
|
||||||
|
- **Debugging**: Full context of how the system reached its current state
|
||||||
|
- **Analysis**: Rich domain data for business intelligence and analysis
|
||||||
|
|
||||||
|
To correct application state, you append new events (e.g., a "Reversed" or "Corrected" event) rather than modifying existing events. This maintains a complete history showing both the original event and the correction.
|
||||||
|
|
||||||
|
The JetStream backing store uses a retention policy (default: 1 year) to automatically clean up old events, but events are never manually deleted through Aether.
|
||||||
|
|
||||||
### State is derived
|
### State is derived
|
||||||
|
|
||||||
|
|||||||
13
event.go
13
event.go
@@ -176,6 +176,18 @@ type ActorSnapshot struct {
|
|||||||
|
|
||||||
// EventStore defines the interface for event persistence.
|
// EventStore defines the interface for event persistence.
|
||||||
//
|
//
|
||||||
|
// # Append-Only Design
|
||||||
|
//
|
||||||
|
// EventStore is intentionally designed as append-only: it provides no Update or Delete
|
||||||
|
// methods. Events are immutable facts about what happened in the domain. Once persisted,
|
||||||
|
// an event is permanently recorded and can never be modified, deleted, or overwritten.
|
||||||
|
// This design ensures that the event log serves as a complete, immutable audit trail
|
||||||
|
// that can be trusted for compliance, debugging, and domain analysis.
|
||||||
|
//
|
||||||
|
// To correct application state, you append new events (e.g., a "Reversed" or "Corrected"
|
||||||
|
// event), never by modifying existing events. This maintains a complete history of all
|
||||||
|
// state changes.
|
||||||
|
//
|
||||||
// # Version Semantics
|
// # Version Semantics
|
||||||
//
|
//
|
||||||
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
||||||
@@ -196,6 +208,7 @@ type EventStore interface {
|
|||||||
// SaveEvent persists an event to the store. The event's Version must be
|
// SaveEvent persists an event to the store. The event's Version must be
|
||||||
// strictly greater than the current latest version for the actor.
|
// strictly greater than the current latest version for the actor.
|
||||||
// Returns VersionConflictError if version <= current latest version.
|
// Returns VersionConflictError if version <= current latest version.
|
||||||
|
// Events persisted to the store are immutable and can never be modified or deleted.
|
||||||
SaveEvent(event *Event) error
|
SaveEvent(event *Event) error
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
||||||
|
|||||||
189
store/immutability_test.go
Normal file
189
store/immutability_test.go
Normal file
@@ -0,0 +1,189 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestEventImmutability verifies that events cannot be modified after being
|
||||||
|
// persisted to the store. This test demonstrates that Aether maintains an
|
||||||
|
// append-only event log that serves as an immutable audit trail.
|
||||||
|
func TestEventImmutability_InMemory(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
// Create and save an event
|
||||||
|
originalEvent := &aether.Event{
|
||||||
|
ID: "evt-immutability-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-789",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 99.99,
|
||||||
|
"currency": "USD",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(originalEvent)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the event from the store
|
||||||
|
events, err := store.GetEvents("order-789", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) != 1 {
|
||||||
|
t.Fatalf("expected 1 event, got %d", len(events))
|
||||||
|
}
|
||||||
|
|
||||||
|
retrievedEvent := events[0]
|
||||||
|
|
||||||
|
// Verify the event has the original data
|
||||||
|
if retrievedEvent.Data["total"].(float64) != 99.99 {
|
||||||
|
t.Errorf("expected total 99.99, got %v", retrievedEvent.Data["total"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to modify the retrieved event
|
||||||
|
retrievedEvent.Data["total"] = 199.99
|
||||||
|
retrievedEvent.EventType = "OrderCancelled"
|
||||||
|
retrievedEvent.Data["currency"] = "EUR"
|
||||||
|
|
||||||
|
// Retrieve the event again from the store
|
||||||
|
events, err = store.GetEvents("order-789", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed on second call: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
storedEvent := events[0]
|
||||||
|
|
||||||
|
// Verify that the stored event still has the original data
|
||||||
|
// This confirms that modifying the retrieved event didn't affect the stored event
|
||||||
|
if storedEvent.Data["total"].(float64) != 99.99 {
|
||||||
|
t.Errorf("stored event total was modified: expected 99.99, got %v", storedEvent.Data["total"])
|
||||||
|
}
|
||||||
|
|
||||||
|
if storedEvent.EventType != "OrderPlaced" {
|
||||||
|
t.Errorf("stored event type was modified: expected OrderPlaced, got %s", storedEvent.EventType)
|
||||||
|
}
|
||||||
|
|
||||||
|
if storedEvent.Data["currency"].(string) != "USD" {
|
||||||
|
t.Errorf("stored event currency was modified: expected USD, got %s", storedEvent.Data["currency"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Additional verification: EventStore has no Update or Delete methods
|
||||||
|
// This is enforced at the type system level by the interface definition.
|
||||||
|
// The EventStore interface only provides:
|
||||||
|
// - SaveEvent (append-only)
|
||||||
|
// - GetEvents (read-only)
|
||||||
|
// - GetLatestVersion (read-only)
|
||||||
|
// There are intentionally no Update, Delete, or Modify methods.
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventImmutability_Sequential verifies that events remain immutable
|
||||||
|
// even when multiple events are saved for the same actor.
|
||||||
|
func TestEventImmutability_Sequential(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
actorID := "order-sequential-123"
|
||||||
|
|
||||||
|
// Save multiple events
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-seq-1",
|
||||||
|
EventType: "OrderCreated",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"status": "new"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: "evt-seq-2",
|
||||||
|
EventType: "OrderProcessed",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 2,
|
||||||
|
Data: map[string]interface{}{"status": "processing"},
|
||||||
|
Timestamp: time.Now().Add(time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(event2) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve all events
|
||||||
|
events, err := store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) != 2 {
|
||||||
|
t.Fatalf("expected 2 events, got %d", len(events))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to modify the first event's data
|
||||||
|
events[0].Data["status"] = "cancelled"
|
||||||
|
|
||||||
|
// Retrieve events again and verify the first event is unchanged
|
||||||
|
events, err = store.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed on second call: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if events[0].Data["status"].(string) != "new" {
|
||||||
|
t.Errorf("first event was modified: expected status=new, got %v", events[0].Data["status"])
|
||||||
|
}
|
||||||
|
|
||||||
|
if events[1].Data["status"].(string) != "processing" {
|
||||||
|
t.Errorf("second event was modified: expected status=processing, got %v", events[1].Data["status"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNoDeleteMethod verifies that the EventStore interface has no Delete method.
|
||||||
|
// This is a compile-time check: if Delete were added to the interface,
|
||||||
|
// all implementations would fail to compile until they implemented it.
|
||||||
|
// This test serves as a runtime confirmation that the interface intentionally
|
||||||
|
// omits delete/update operations.
|
||||||
|
func TestNoDeleteMethod(t *testing.T) {
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
// The following would not compile if EventStore had a Delete method:
|
||||||
|
// store.Delete(...)
|
||||||
|
// store.Update(...)
|
||||||
|
// store.Modify(...)
|
||||||
|
|
||||||
|
// This test passes if compilation succeeds, confirming that
|
||||||
|
// the EventStore interface is append-only by design.
|
||||||
|
|
||||||
|
// Verify the interface has exactly 3 methods
|
||||||
|
var iface interface{} = store
|
||||||
|
if _, ok := iface.(aether.EventStore); !ok {
|
||||||
|
t.Fatal("InMemoryEventStore does not implement EventStore")
|
||||||
|
}
|
||||||
|
|
||||||
|
// The EventStore interface should only have SaveEvent, GetEvents, GetLatestVersion
|
||||||
|
// Verify by attempting to call each method
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-test",
|
||||||
|
EventType: "Test",
|
||||||
|
ActorID: "test-actor",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// These should work
|
||||||
|
store.SaveEvent(event)
|
||||||
|
store.GetEvents("test-actor", 0)
|
||||||
|
store.GetLatestVersion("test-actor")
|
||||||
|
|
||||||
|
// No other methods should exist (compile-time check)
|
||||||
|
}
|
||||||
@@ -18,6 +18,38 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// JetStreamConfig holds configuration options for JetStreamEventStore
|
// JetStreamConfig holds configuration options for JetStreamEventStore
|
||||||
|
//
|
||||||
|
// # Stream Retention Policy
|
||||||
|
//
|
||||||
|
// JetStreamEventStore uses a LimitsPolicy retention strategy, which means events are
|
||||||
|
// kept for a specified maximum age (StreamRetention). Once events exceed this age,
|
||||||
|
// they are automatically purged from the stream.
|
||||||
|
//
|
||||||
|
// Default retention is 1 year (365 days). This provides:
|
||||||
|
// - Long-term audit trail for domain events
|
||||||
|
// - Complete history for event replay and analysis
|
||||||
|
// - Automatic cleanup of old events to manage storage costs
|
||||||
|
//
|
||||||
|
// The retention policy is applied when creating the JetStream stream:
|
||||||
|
//
|
||||||
|
// stream := &nats.StreamConfig{
|
||||||
|
// ...
|
||||||
|
// Retention: nats.LimitsPolicy,
|
||||||
|
// MaxAge: config.StreamRetention,
|
||||||
|
// ...
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// To configure custom retention, pass a JetStreamConfig with your desired StreamRetention:
|
||||||
|
//
|
||||||
|
// config := store.JetStreamConfig{
|
||||||
|
// StreamRetention: 90 * 24 * time.Hour, // Keep events for 90 days
|
||||||
|
// ReplicaCount: 3, // 3 replicas for HA
|
||||||
|
// }
|
||||||
|
// eventStore, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
|
||||||
|
//
|
||||||
|
// Note: The retention policy only affects automatic cleanup. Aether does not provide
|
||||||
|
// methods to manually delete events - events are immutable once stored and can only
|
||||||
|
// be removed by the stream's retention policy or explicit JetStream administration.
|
||||||
type JetStreamConfig struct {
|
type JetStreamConfig struct {
|
||||||
// StreamRetention is how long to keep events (default: 1 year)
|
// StreamRetention is how long to keep events (default: 1 year)
|
||||||
StreamRetention time.Duration
|
StreamRetention time.Duration
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
@@ -21,9 +22,40 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deepCopyEvent creates a deep copy of an event to ensure immutability.
|
||||||
|
// This prevents modifications to the event after it's been persisted.
|
||||||
|
// Panics if JSON marshaling/unmarshaling fails, which should never occur
|
||||||
|
// for a valid Event structure.
|
||||||
|
func deepCopyEvent(event *aether.Event) *aether.Event {
|
||||||
|
// Use JSON marshaling/unmarshaling for a complete deep copy
|
||||||
|
// This ensures all nested structures (maps, slices) are copied
|
||||||
|
data, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
panic("failed to marshal event: " + err.Error())
|
||||||
|
}
|
||||||
|
var copy aether.Event
|
||||||
|
err = json.Unmarshal(data, ©)
|
||||||
|
if err != nil {
|
||||||
|
panic("failed to unmarshal event: " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preserve empty metadata maps (JSON unmarshal converts empty map to nil)
|
||||||
|
if event.Metadata != nil && len(event.Metadata) == 0 && copy.Metadata == nil {
|
||||||
|
copy.Metadata = make(map[string]string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preserve empty data maps
|
||||||
|
if event.Data != nil && len(event.Data) == 0 && copy.Data == nil {
|
||||||
|
copy.Data = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ©
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent saves an event to the in-memory store.
|
// SaveEvent saves an event to the in-memory store.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
|
// The event is deep-copied before storage to ensure immutability.
|
||||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||||
es.mu.Lock()
|
es.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer es.mu.Unlock()
|
||||||
@@ -50,11 +82,15 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
if _, exists := es.events[event.ActorID]; !exists {
|
if _, exists := es.events[event.ActorID]; !exists {
|
||||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||||
}
|
}
|
||||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
|
||||||
|
// Deep copy the event before storing to ensure immutability
|
||||||
|
storedEvent := deepCopyEvent(event)
|
||||||
|
es.events[event.ActorID] = append(es.events[event.ActorID], storedEvent)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version
|
// GetEvents retrieves events for an actor from a specific version
|
||||||
|
// Returns deep copies of events to ensure immutability
|
||||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||||
es.mu.RLock()
|
es.mu.RLock()
|
||||||
defer es.mu.RUnlock()
|
defer es.mu.RUnlock()
|
||||||
@@ -67,7 +103,8 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a
|
|||||||
var filteredEvents []*aether.Event
|
var filteredEvents []*aether.Event
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if event.Version >= fromVersion {
|
if event.Version >= fromVersion {
|
||||||
filteredEvents = append(filteredEvents, event)
|
// Return a deep copy to ensure immutability
|
||||||
|
filteredEvents = append(filteredEvents, deepCopyEvent(event))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user