- Add ErrVersionConflict error type and VersionConflictError for detailed conflict information - Implement version validation in InMemoryEventStore.SaveEvent that rejects events with version <= current latest version - Implement version validation in JetStreamEventStore.SaveEvent with version caching for performance - Add comprehensive tests for version conflict detection including concurrent writes to same actor - Document versioning semantics in EventStore interface and CLAUDE.md This ensures events have monotonically increasing versions per actor and provides clear error messages for version conflicts, enabling optimistic concurrency control patterns. Closes #6 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
282 lines
7.4 KiB
Go
282 lines
7.4 KiB
Go
package store
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
// JetStreamEventStore implements EventStore using NATS JetStream for persistence
|
|
type JetStreamEventStore struct {
|
|
js nats.JetStreamContext
|
|
streamName string
|
|
mu sync.Mutex // Protects version checks during SaveEvent
|
|
versions map[string]int64 // actorID -> latest version cache
|
|
}
|
|
|
|
// NewJetStreamEventStore creates a new JetStream-based event store
|
|
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
|
js, err := natsConn.JetStream()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
|
}
|
|
|
|
// Create or update the stream
|
|
stream := &nats.StreamConfig{
|
|
Name: streamName,
|
|
Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
|
|
Storage: nats.FileStorage,
|
|
Retention: nats.LimitsPolicy,
|
|
MaxAge: 365 * 24 * time.Hour, // Keep events for 1 year
|
|
Replicas: 1, // Can be increased for HA
|
|
}
|
|
|
|
_, err = js.AddStream(stream)
|
|
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
|
return nil, fmt.Errorf("failed to create stream: %w", err)
|
|
}
|
|
|
|
return &JetStreamEventStore{
|
|
js: js,
|
|
streamName: streamName,
|
|
versions: make(map[string]int64),
|
|
}, nil
|
|
}
|
|
|
|
// SaveEvent persists an event to JetStream.
|
|
// Returns VersionConflictError if the event's version is not strictly greater
|
|
// than the current latest version for the actor.
|
|
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|
jes.mu.Lock()
|
|
defer jes.mu.Unlock()
|
|
|
|
// Get current latest version for this actor
|
|
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get latest version: %w", err)
|
|
}
|
|
|
|
// Validate version is strictly greater than current
|
|
if event.Version <= currentVersion {
|
|
return &aether.VersionConflictError{
|
|
ActorID: event.ActorID,
|
|
AttemptedVersion: event.Version,
|
|
CurrentVersion: currentVersion,
|
|
}
|
|
}
|
|
|
|
// Serialize event to JSON
|
|
data, err := json.Marshal(event)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal event: %w", err)
|
|
}
|
|
|
|
// Create subject: stream.events.actorType.actorID
|
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
|
jes.streamName,
|
|
sanitizeSubject(extractActorType(event.ActorID)),
|
|
sanitizeSubject(event.ActorID))
|
|
|
|
// Publish with event ID as message ID for deduplication
|
|
_, err = jes.js.Publish(subject, data, nats.MsgId(event.ID))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
|
}
|
|
|
|
// Update version cache
|
|
jes.versions[event.ActorID] = event.Version
|
|
|
|
return nil
|
|
}
|
|
|
|
// getLatestVersionLocked returns the latest version for an actor.
|
|
// Caller must hold jes.mu.
|
|
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
|
// Check cache first
|
|
if version, ok := jes.versions[actorID]; ok {
|
|
return version, nil
|
|
}
|
|
|
|
// Fetch from JetStream
|
|
events, err := jes.getEventsInternal(actorID, 0)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
latestVersion := int64(0)
|
|
for _, event := range events {
|
|
if event.Version > latestVersion {
|
|
latestVersion = event.Version
|
|
}
|
|
}
|
|
|
|
// Update cache
|
|
jes.versions[actorID] = latestVersion
|
|
|
|
return latestVersion, nil
|
|
}
|
|
|
|
// GetEvents retrieves all events for an actor since a version
|
|
func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
|
return jes.getEventsInternal(actorID, fromVersion)
|
|
}
|
|
|
|
// getEventsInternal is the internal implementation of GetEvents
|
|
func (jes *JetStreamEventStore) getEventsInternal(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
|
// Create subject filter for this actor
|
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
|
jes.streamName,
|
|
sanitizeSubject(extractActorType(actorID)),
|
|
sanitizeSubject(actorID))
|
|
|
|
// Create consumer to read events
|
|
consumer, err := jes.js.PullSubscribe(subject, "")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create consumer: %w", err)
|
|
}
|
|
defer consumer.Unsubscribe()
|
|
|
|
var events []*aether.Event
|
|
|
|
// Fetch messages in batches
|
|
for {
|
|
msgs, err := consumer.Fetch(100, nats.MaxWait(time.Second))
|
|
if err != nil {
|
|
if err == nats.ErrTimeout {
|
|
break // No more messages
|
|
}
|
|
return nil, fmt.Errorf("failed to fetch messages: %w", err)
|
|
}
|
|
|
|
for _, msg := range msgs {
|
|
var event aether.Event
|
|
if err := json.Unmarshal(msg.Data, &event); err != nil {
|
|
continue // Skip malformed events
|
|
}
|
|
|
|
// Filter by version
|
|
if event.Version > fromVersion {
|
|
events = append(events, &event)
|
|
}
|
|
|
|
msg.Ack()
|
|
}
|
|
|
|
if len(msgs) < 100 {
|
|
break // No more messages
|
|
}
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// GetLatestVersion returns the latest version for an actor
|
|
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
|
events, err := jes.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
latestVersion := int64(0)
|
|
for _, event := range events {
|
|
if event.Version > latestVersion {
|
|
latestVersion = event.Version
|
|
}
|
|
}
|
|
|
|
return latestVersion, nil
|
|
}
|
|
|
|
// GetLatestSnapshot gets the most recent snapshot for an actor
|
|
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
|
// Create subject for snapshots
|
|
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
|
jes.streamName,
|
|
sanitizeSubject(extractActorType(actorID)),
|
|
sanitizeSubject(actorID))
|
|
|
|
// Try to get the latest snapshot
|
|
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create snapshot consumer: %w", err)
|
|
}
|
|
defer consumer.Unsubscribe()
|
|
|
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
|
if err != nil {
|
|
if err == nats.ErrTimeout {
|
|
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
|
}
|
|
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
|
}
|
|
|
|
if len(msgs) == 0 {
|
|
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
|
}
|
|
|
|
var snapshot aether.ActorSnapshot
|
|
if err := json.Unmarshal(msgs[0].Data, &snapshot); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err)
|
|
}
|
|
|
|
msgs[0].Ack()
|
|
return &snapshot, nil
|
|
}
|
|
|
|
// SaveSnapshot saves a snapshot of actor state
|
|
func (jes *JetStreamEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
|
|
// Serialize snapshot to JSON
|
|
data, err := json.Marshal(snapshot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal snapshot: %w", err)
|
|
}
|
|
|
|
// Create subject for snapshots
|
|
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
|
jes.streamName,
|
|
sanitizeSubject(extractActorType(snapshot.ActorID)),
|
|
sanitizeSubject(snapshot.ActorID))
|
|
|
|
// Publish snapshot
|
|
_, err = jes.js.Publish(subject, data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to publish snapshot to JetStream: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
// extractActorType extracts the actor type from an actor ID
|
|
func extractActorType(actorID string) string {
|
|
for i, c := range actorID {
|
|
if c == '-' && i > 0 {
|
|
return actorID[:i]
|
|
}
|
|
}
|
|
return "unknown"
|
|
}
|
|
|
|
// sanitizeSubject sanitizes a string for use in NATS subjects
|
|
func sanitizeSubject(s string) string {
|
|
s = strings.ReplaceAll(s, " ", "_")
|
|
s = strings.ReplaceAll(s, ".", "_")
|
|
s = strings.ReplaceAll(s, "*", "_")
|
|
s = strings.ReplaceAll(s, ">", "_")
|
|
return s
|
|
}
|