All checks were successful
CI / build (push) Successful in 15s
- Add Metadata field (map[string]string) to Event struct with omitempty
- Add helper methods for common metadata: SetCorrelationID/GetCorrelationID, SetCausationID/GetCausationID, SetUserID/GetUserID, SetTraceID/GetTraceID, SetSpanID/GetSpanID
- Add WithMetadataFrom helper for copying metadata between events
- Add metadata key constants for standard fields
- Add comprehensive unit tests for metadata serialization and helpers
- Add store tests verifying metadata persistence

Closes #7

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
183 lines
6.2 KiB
Go
183 lines
6.2 KiB
Go
package aether
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// ErrVersionConflict is the sentinel reported when an event is saved with a
// version that is not strictly greater than the actor's current latest
// version. Rejecting such saves keeps each actor's versions monotonically
// increasing. Match it with errors.Is.
var ErrVersionConflict = errors.New("version conflict")
|
|
|
|
// VersionConflictError carries the details of a rejected save. It is the
// error returned when SaveEvent is called with a version that is less than or
// equal to the actor's current latest version.
type VersionConflictError struct {
	// ActorID names the actor whose event stream rejected the save.
	ActorID string

	// AttemptedVersion is the version the caller tried to save.
	AttemptedVersion int64

	// CurrentVersion is the actor's latest version at the time of the attempt.
	CurrentVersion int64
}
|
|
|
|
func (e *VersionConflictError) Error() string {
|
|
return fmt.Sprintf("%s: actor %q has version %d, cannot save version %d",
|
|
ErrVersionConflict, e.ActorID, e.CurrentVersion, e.AttemptedVersion)
|
|
}
|
|
|
|
func (e *VersionConflictError) Unwrap() error {
|
|
return ErrVersionConflict
|
|
}
|
|
|
|
// Event is a single domain event recorded for an actor.
type Event struct {
	// ID identifies the event.
	ID string `json:"id"`

	// EventType names the kind of event.
	EventType string `json:"eventType"`

	// ActorID identifies the actor the event belongs to.
	ActorID string `json:"actorId"`

	// CommandID is the correlation ID of the command that triggered this
	// event. It is omitted from JSON when empty.
	CommandID string `json:"commandId,omitempty"`

	// Version is the event's version within its actor's stream.
	Version int64 `json:"version"`

	// Data holds the event payload.
	Data map[string]interface{} `json:"data"`

	// Metadata is optional key/value metadata for tracing and auditing. It is
	// omitted from JSON when empty.
	Metadata map[string]string `json:"metadata,omitempty"`

	// Timestamp is the time associated with the event.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// Standard Event.Metadata keys used for distributed tracing and auditing.
const (
	// MetadataKeyCorrelationID is the key for the ID that links related
	// events across services.
	MetadataKeyCorrelationID = "correlationId"

	// MetadataKeyCausationID is the key for the ID of the event that caused
	// this event.
	MetadataKeyCausationID = "causationId"

	// MetadataKeyUserID is the key for the ID of the user who triggered this
	// event.
	MetadataKeyUserID = "userId"

	// MetadataKeyTraceID is the key for the trace ID used when integrating
	// with distributed tracing (e.g., OpenTelemetry).
	MetadataKeyTraceID = "traceId"

	// MetadataKeySpanID is the key for the span ID used when integrating with
	// distributed tracing.
	MetadataKeySpanID = "spanId"
)
|
|
|
|
// SetMetadata sets a metadata key-value pair, initializing the map if needed
|
|
func (e *Event) SetMetadata(key, value string) {
|
|
if e.Metadata == nil {
|
|
e.Metadata = make(map[string]string)
|
|
}
|
|
e.Metadata[key] = value
|
|
}
|
|
|
|
// GetMetadata returns the value for a metadata key, or empty string if not found
|
|
func (e *Event) GetMetadata(key string) string {
|
|
if e.Metadata == nil {
|
|
return ""
|
|
}
|
|
return e.Metadata[key]
|
|
}
|
|
|
|
// SetCorrelationID sets the correlation ID metadata
|
|
func (e *Event) SetCorrelationID(correlationID string) {
|
|
e.SetMetadata(MetadataKeyCorrelationID, correlationID)
|
|
}
|
|
|
|
// GetCorrelationID returns the correlation ID metadata
|
|
func (e *Event) GetCorrelationID() string {
|
|
return e.GetMetadata(MetadataKeyCorrelationID)
|
|
}
|
|
|
|
// SetCausationID sets the causation ID metadata
|
|
func (e *Event) SetCausationID(causationID string) {
|
|
e.SetMetadata(MetadataKeyCausationID, causationID)
|
|
}
|
|
|
|
// GetCausationID returns the causation ID metadata
|
|
func (e *Event) GetCausationID() string {
|
|
return e.GetMetadata(MetadataKeyCausationID)
|
|
}
|
|
|
|
// SetUserID sets the user ID metadata
|
|
func (e *Event) SetUserID(userID string) {
|
|
e.SetMetadata(MetadataKeyUserID, userID)
|
|
}
|
|
|
|
// GetUserID returns the user ID metadata
|
|
func (e *Event) GetUserID() string {
|
|
return e.GetMetadata(MetadataKeyUserID)
|
|
}
|
|
|
|
// SetTraceID sets the trace ID metadata for distributed tracing
|
|
func (e *Event) SetTraceID(traceID string) {
|
|
e.SetMetadata(MetadataKeyTraceID, traceID)
|
|
}
|
|
|
|
// GetTraceID returns the trace ID metadata
|
|
func (e *Event) GetTraceID() string {
|
|
return e.GetMetadata(MetadataKeyTraceID)
|
|
}
|
|
|
|
// SetSpanID sets the span ID metadata for distributed tracing
|
|
func (e *Event) SetSpanID(spanID string) {
|
|
e.SetMetadata(MetadataKeySpanID, spanID)
|
|
}
|
|
|
|
// GetSpanID returns the span ID metadata
|
|
func (e *Event) GetSpanID() string {
|
|
return e.GetMetadata(MetadataKeySpanID)
|
|
}
|
|
|
|
// WithMetadataFrom copies metadata from another event (useful for event chaining)
|
|
func (e *Event) WithMetadataFrom(source *Event) {
|
|
if source == nil || source.Metadata == nil {
|
|
return
|
|
}
|
|
if e.Metadata == nil {
|
|
e.Metadata = make(map[string]string)
|
|
}
|
|
for k, v := range source.Metadata {
|
|
e.Metadata[k] = v
|
|
}
|
|
}
|
|
|
|
// ActorSnapshot is a point-in-time snapshot of an actor's state.
type ActorSnapshot struct {
	// ActorID identifies the actor the snapshot belongs to.
	ActorID string `json:"actorId"`

	// Version is the version associated with the snapshot.
	// NOTE(review): presumably the actor's event version at capture time —
	// confirm against the store implementations.
	Version int64 `json:"version"`

	// State holds the captured actor state.
	State map[string]interface{} `json:"state"`

	// Timestamp is the time associated with the snapshot.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// EventStore defines the interface for event persistence.
|
|
//
|
|
// # Version Semantics
|
|
//
|
|
// Events for an actor must have monotonically increasing versions. When SaveEvent
|
|
// is called, the implementation must validate that the event's version is strictly
|
|
// greater than the current latest version for that actor. If the version is less
|
|
// than or equal to the current version, SaveEvent must return a VersionConflictError
|
|
// (which wraps ErrVersionConflict).
|
|
//
|
|
// This validation ensures event stream integrity and enables optimistic concurrency
|
|
// control. Clients should:
|
|
// 1. Call GetLatestVersion to get the current version for an actor
|
|
// 2. Set the new event's version to currentVersion + 1
|
|
// 3. Call SaveEvent - if it returns ErrVersionConflict, another writer won
|
|
// 4. On conflict, reload the latest version and retry if appropriate
|
|
//
|
|
// For new actors (no existing events), version 1 is expected for the first event.
|
|
type EventStore interface {
|
|
// SaveEvent persists an event to the store. The event's Version must be
|
|
// strictly greater than the current latest version for the actor.
|
|
// Returns VersionConflictError if version <= current latest version.
|
|
SaveEvent(event *Event) error
|
|
|
|
// GetEvents retrieves events for an actor from a specific version (inclusive).
|
|
// Returns an empty slice if no events exist for the actor.
|
|
GetEvents(actorID string, fromVersion int64) ([]*Event, error)
|
|
|
|
// GetLatestVersion returns the latest version for an actor.
|
|
// Returns 0 if no events exist for the actor.
|
|
GetLatestVersion(actorID string) (int64, error)
|
|
}
|
|
|
|
// SnapshotStore extends EventStore with snapshot capabilities
|
|
type SnapshotStore interface {
|
|
EventStore
|
|
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
|
|
SaveSnapshot(snapshot *ActorSnapshot) error
|
|
}
|