Files
aether/event.go
Claude Code 69da1d800e
Some checks failed
CI / build (pull_request) Successful in 21s
CI / integration (pull_request) Failing after 2m0s
docs: Verify and document append-only event guarantees
This commit addresses issue #60 by documenting and enforcing the immutability
guarantees of the event store:

- Document that EventStore interface is append-only by design (no Update/Delete methods)
- Document the immutable nature of events once persisted as an audit trail
- Add JetStream stream retention policy configuration documentation
- Add comprehensive immutability tests (TestEventImmutability_InMemory, TestEventImmutability_Sequential)
- Enhance InMemoryEventStore to deep-copy events, preventing accidental mutations
- Update README with detailed immutability guarantees and audit trail benefits

The EventStore interface intentionally provides no methods to modify or delete
events. Once persisted, events are immutable facts that serve as a tamper-proof
audit trail. This design supports compliance, debugging, and historical analysis.

Acceptance criteria met:
- EventStore interface documented as append-only (event.go)
- JetStream retention policy configuration documented (store/jetstream.go)
- Test verifying events cannot be modified after persistence (store/immutability_test.go)
- README documents immutability guarantees (README.md)

Closes #60

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 21:26:16 +01:00

241 lines
8.6 KiB
Go

package aether
import (
"errors"
"fmt"
"time"
)
// ErrVersionConflict is the sentinel error for optimistic-concurrency failures.
// It signals that an event was submitted with a version that is not strictly
// greater than the latest version already recorded for its actor, which would
// break the per-actor guarantee of monotonically increasing versions.
// Match it with errors.Is; VersionConflictError unwraps to this value.
var ErrVersionConflict = errors.New("version conflict")

// VersionConflictError carries the details of a rejected save: which actor was
// involved, the version the caller tried to write, and the version the store
// already holds. SaveEvent returns it when the attempted version is less than
// or equal to the current latest version.
type VersionConflictError struct {
	// ActorID identifies the actor whose event stream rejected the write.
	ActorID string
	// AttemptedVersion is the version carried by the rejected event.
	AttemptedVersion int64
	// CurrentVersion is the latest version already stored for the actor.
	CurrentVersion int64
}

// Compile-time proof that *VersionConflictError satisfies the error interface.
var _ error = (*VersionConflictError)(nil)

// Error renders the conflict as
// `version conflict: actor "<id>" has version <current>, cannot save version <attempted>`.
func (e *VersionConflictError) Error() string {
	const format = "%s: actor %q has version %d, cannot save version %d"
	return fmt.Sprintf(format, ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
}

// Unwrap exposes ErrVersionConflict so that errors.Is(err, ErrVersionConflict)
// succeeds for any *VersionConflictError.
func (e *VersionConflictError) Unwrap() error {
	return ErrVersionConflict
}
// ReplayError describes a single stored message that could not be decoded into
// an event while replaying a stream. It keeps the offending bytes and the
// decoding error together so callers can inspect or report corrupted data
// instead of losing that context.
type ReplayError struct {
	// SequenceNumber is the stream sequence number of the message, if available.
	SequenceNumber uint64
	// RawData holds the raw bytes that failed to unmarshal.
	RawData []byte
	// Err is the underlying unmarshal error.
	Err error
}

// Compile-time proof that *ReplayError satisfies the error interface.
var _ error = (*ReplayError)(nil)

// Error reports the sequence number together with the underlying error.
func (e *ReplayError) Error() string {
	const format = "failed to unmarshal event at sequence %d: %v"
	return fmt.Sprintf(format, e.SequenceNumber, e.Err)
}

// Unwrap returns the underlying unmarshal error so that errors.Is and
// errors.As can see through a *ReplayError.
func (e *ReplayError) Unwrap() error {
	return e.Err
}
// ReplayResult is the outcome of replaying an event stream: the events that
// decoded cleanly plus a record of every message that did not. Returning both
// lets the caller choose how to treat malformed events rather than having them
// silently skipped.
type ReplayResult struct {
	// Events holds the successfully unmarshaled events.
	Events []*Event
	// Errors holds one entry per malformed event encountered.
	Errors []ReplayError
}

// HasErrors reports whether at least one malformed event was recorded during
// replay.
func (r *ReplayResult) HasErrors() bool {
	return len(r.Errors) != 0
}
// Event is a domain event: a record of something that happened to an actor.
type Event struct {
	// ID is the identifier of this event.
	ID string `json:"id"`
	// EventType names the kind of event.
	EventType string `json:"eventType"`
	// ActorID identifies the actor whose stream the event belongs to.
	ActorID string `json:"actorId"`
	// CommandID is the correlation ID of the command that triggered this event.
	CommandID string `json:"commandId,omitempty"`
	// Version orders the event within its actor's stream.
	Version int64 `json:"version"`
	// Data carries the event payload.
	Data map[string]interface{} `json:"data"`
	// Metadata is optional key/value context for tracing and auditing.
	Metadata map[string]string `json:"metadata,omitempty"`
	// Timestamp is the time recorded for the event.
	Timestamp time.Time `json:"timestamp"`
}

// Well-known Metadata keys used for distributed tracing and auditing.
const (
	// MetadataKeyCorrelationID groups related events across services.
	MetadataKeyCorrelationID = "correlationId"
	// MetadataKeyCausationID points at the event that caused this event.
	MetadataKeyCausationID = "causationId"
	// MetadataKeyUserID records the user who triggered this event.
	MetadataKeyUserID = "userId"
	// MetadataKeyTraceID carries the trace ID for distributed tracing
	// integration (e.g., OpenTelemetry).
	MetadataKeyTraceID = "traceId"
	// MetadataKeySpanID carries the span ID for distributed tracing integration.
	MetadataKeySpanID = "spanId"
)

// SetMetadata stores value under key, creating the Metadata map on first use.
func (e *Event) SetMetadata(key, value string) {
	if e.Metadata != nil {
		e.Metadata[key] = value
		return
	}
	e.Metadata = map[string]string{key: value}
}

// GetMetadata returns the value stored under key, or the empty string when the
// key is absent or no metadata has been set.
func (e *Event) GetMetadata(key string) string {
	// Indexing a nil map is safe in Go and yields the zero value, so no
	// explicit nil check is needed.
	return e.Metadata[key]
}

// SetCorrelationID stores correlationID under MetadataKeyCorrelationID.
func (e *Event) SetCorrelationID(correlationID string) {
	e.SetMetadata(MetadataKeyCorrelationID, correlationID)
}

// GetCorrelationID returns the value stored under MetadataKeyCorrelationID.
func (e *Event) GetCorrelationID() string {
	return e.GetMetadata(MetadataKeyCorrelationID)
}

// SetCausationID stores causationID under MetadataKeyCausationID.
func (e *Event) SetCausationID(causationID string) {
	e.SetMetadata(MetadataKeyCausationID, causationID)
}

// GetCausationID returns the value stored under MetadataKeyCausationID.
func (e *Event) GetCausationID() string {
	return e.GetMetadata(MetadataKeyCausationID)
}

// SetUserID stores userID under MetadataKeyUserID.
func (e *Event) SetUserID(userID string) {
	e.SetMetadata(MetadataKeyUserID, userID)
}

// GetUserID returns the value stored under MetadataKeyUserID.
func (e *Event) GetUserID() string {
	return e.GetMetadata(MetadataKeyUserID)
}

// SetTraceID stores traceID under MetadataKeyTraceID for distributed tracing.
func (e *Event) SetTraceID(traceID string) {
	e.SetMetadata(MetadataKeyTraceID, traceID)
}

// GetTraceID returns the value stored under MetadataKeyTraceID.
func (e *Event) GetTraceID() string {
	return e.GetMetadata(MetadataKeyTraceID)
}

// SetSpanID stores spanID under MetadataKeySpanID for distributed tracing.
func (e *Event) SetSpanID(spanID string) {
	e.SetMetadata(MetadataKeySpanID, spanID)
}

// GetSpanID returns the value stored under MetadataKeySpanID.
func (e *Event) GetSpanID() string {
	return e.GetMetadata(MetadataKeySpanID)
}

// WithMetadataFrom copies every metadata entry of source into e, overwriting
// entries that share a key and leaving e's other entries untouched. It is a
// no-op when source is nil or has a nil Metadata map. Useful for event
// chaining, where a follow-up event inherits the context of its cause.
func (e *Event) WithMetadataFrom(source *Event) {
	if source == nil {
		return
	}
	inherited := source.Metadata
	if inherited == nil {
		return
	}
	if e.Metadata == nil {
		e.Metadata = make(map[string]string, len(inherited))
	}
	for key, value := range inherited {
		e.Metadata[key] = value
	}
}
// ActorSnapshot represents a point-in-time snapshot of an actor's state.
// Each field is serialized to JSON under the key given in its struct tag.
type ActorSnapshot struct {
// ActorID identifies the actor the snapshot belongs to.
ActorID string `json:"actorId"`
// Version is presumably the event version that State corresponds to.
// TODO confirm against the SnapshotStore implementations.
Version int64 `json:"version"`
// State holds the captured state as a free-form map.
State map[string]interface{} `json:"state"`
// Timestamp is the time recorded for the snapshot.
// NOTE(review): whether this is the capture time or the time of the last
// applied event is not visible here — confirm with the code that builds snapshots.
Timestamp time.Time `json:"timestamp"`
}
// EventStore defines the interface for event persistence.
//
// # Append-Only Design
//
// EventStore is intentionally append-only: it declares no Update or Delete
// methods. Events are immutable facts about what happened in the domain. Once
// persisted, an event is meant to stay recorded as-is; nothing in this interface
// can modify, delete, or overwrite it. The event log can therefore serve as a
// complete audit trail for compliance, debugging, and domain analysis.
//
// To correct application state, append a new event (e.g., a "Reversed" or
// "Corrected" event) instead of modifying an existing one. This preserves the
// complete history of all state changes.
//
// # Version Semantics
//
// Events for an actor must have monotonically increasing versions. When SaveEvent
// is called, the implementation must validate that the event's version is strictly
// greater than the current latest version for that actor. If the version is less
// than or equal to the current version, SaveEvent must return a
// *VersionConflictError, which unwraps to ErrVersionConflict.
//
// This validation ensures event stream integrity and enables optimistic concurrency
// control. Clients should:
//
//  1. Call GetLatestVersion to get the current version for an actor.
//  2. Set the new event's version to currentVersion + 1.
//  3. Call SaveEvent. If errors.Is(err, ErrVersionConflict) reports true, another
//     writer won the race.
//  4. On conflict, reload the latest version and retry if appropriate.
//
// For new actors (no existing events), version 1 is expected for the first event.
type EventStore interface {
// SaveEvent persists an event to the store. The event's Version must be
// strictly greater than the current latest version for the actor.
// It returns a *VersionConflictError if version <= current latest version;
// use errors.As to read the conflicting versions from it.
// Events persisted to the store are immutable: this interface offers no way
// to modify or delete them afterwards.
SaveEvent(event *Event) error
// GetEvents retrieves events for an actor, starting at fromVersion (inclusive).
// It returns an empty slice if no events exist for the actor.
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
// GetLatestVersion returns the latest version for an actor.
// It returns 0 if no events exist for the actor, which is why the first
// event of a new actor is expected to carry version 1.
GetLatestVersion(actorID string) (int64, error)
}
// EventStoreWithErrors extends EventStore with methods that report malformed events.
// Stores that may encounter corrupted data during replay (e.g., JetStream) should
// implement this interface to give callers visibility into data quality issues.
// Because it embeds EventStore, every implementation is also a complete EventStore.
type EventStoreWithErrors interface {
EventStore
// GetEventsWithErrors retrieves events for an actor and reports any malformed
// events encountered. The returned ReplayResult carries the decoded events in
// Events and one ReplayError per malformed event in Errors; HasErrors tells
// whether any were found. This lets callers decide how to handle corrupted
// data rather than having it silently skipped.
// NOTE(review): presumably fromVersion is inclusive, as in GetEvents, and the
// error return is reserved for store-level failures while per-event decode
// failures go into ReplayResult.Errors — confirm against the implementations.
GetEventsWithErrors(actorID string, fromVersion int64) (*ReplayResult, error)
}
// SnapshotStore extends EventStore with snapshot capabilities.
// Because it embeds EventStore, every implementation is also a complete EventStore.
type SnapshotStore interface {
EventStore
// GetLatestSnapshot returns the latest snapshot stored for actorID.
// NOTE(review): the result when no snapshot exists (nil with a nil error, or
// a dedicated error) is not specified here — confirm against the implementations.
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
// SaveSnapshot persists snapshot.
// NOTE(review): whether an existing snapshot for the same actor is replaced
// or kept alongside the new one is not visible here — confirm against the
// implementations.
SaveSnapshot(snapshot *ActorSnapshot) error
}