package store

import (
	"sync"
	"time"

	"git.flowmade.one/flowmade-one/aether"
)

// InMemoryEventStore is a map-backed event store that keeps all events and
// snapshots in process memory. It is intended for tests.
type InMemoryEventStore struct {
	// mu guards events and snapshots.
	mu sync.RWMutex
	// events holds the saved events per actor ID, in the order they were saved.
	events map[string][]*aether.Event
	// snapshots holds the saved snapshots per actor ID, in the order they were
	// saved (not sorted; the latest one is found by scanning versions).
	snapshots map[string][]*aether.ActorSnapshot
	// eventBus is optional; when set, an EventStored event is published after
	// every successful save.
	eventBus aether.EventBroadcaster
	// metrics is optional; when set, publishes to the event bus are recorded.
	metrics aether.MetricsCollector
}

// NewInMemoryEventStore returns an empty store with no event bus and no
// metrics collector attached.
func NewInMemoryEventStore() *InMemoryEventStore {
	store := new(InMemoryEventStore)
	store.events = map[string][]*aether.Event{}
	store.snapshots = map[string][]*aether.ActorSnapshot{}
	return store
}

// WithEventBus attaches the EventBus used to publish EventStored events and
// returns the same store so calls can be chained.
// The bus is optional: without one, EventStored is never published.
// The field is assigned without taking the store's lock.
func (es *InMemoryEventStore) WithEventBus(bus aether.EventBroadcaster) *InMemoryEventStore {
	es.eventBus = bus
	return es
}

// WithMetrics attaches the metrics collector used to record EventStored
// publishes and returns the same store so calls can be chained.
// The collector is optional: without one, no metrics are recorded. It is only
// consulted after a publish to the event bus, so it has no effect unless an
// event bus is configured as well.
// The field is assigned without taking the store's lock.
func (es *InMemoryEventStore) WithMetrics(metrics aether.MetricsCollector) *InMemoryEventStore {
	es.metrics = metrics
	return es
}

// SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
// On success, publishes an EventStored event to the EventBus (if configured).
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.mu.Lock() defer es.mu.Unlock() // Get current latest version for this actor currentVersion := int64(0) if events, exists := es.events[event.ActorID]; exists { for _, e := range events { if e.Version > currentVersion { currentVersion = e.Version } } } // Validate version is strictly greater than current if event.Version <= currentVersion { return &aether.VersionConflictError{ ActorID: event.ActorID, AttemptedVersion: event.Version, CurrentVersion: currentVersion, } } if _, exists := es.events[event.ActorID]; !exists { es.events[event.ActorID] = make([]*aether.Event, 0) } es.events[event.ActorID] = append(es.events[event.ActorID], event) // Publish EventStored event on success es.publishEventStored(event) return nil } // publishEventStored publishes an EventStored event to the EventBus and records metrics func (es *InMemoryEventStore) publishEventStored(event *aether.Event) { if es.eventBus == nil { return } stored := &aether.EventStored{ EventID: event.ID, ActorID: event.ActorID, Version: event.Version, Timestamp: time.Now(), } // Convert EventStored to Event for publishing (internal system event) storedEvent := &aether.Event{ ID: "eventstored-" + event.ID, EventType: "EventStored", ActorID: event.ActorID, Version: event.Version, Data: map[string]interface{}{ "eventId": stored.EventID, "actorId": stored.ActorID, "version": stored.Version, "timestamp": stored.Timestamp, }, Timestamp: stored.Timestamp, } // Publish to default namespace (internal events) es.eventBus.Publish("__internal__", storedEvent) // Record metrics if collector is configured if es.metrics != nil { es.metrics.RecordPublish("__internal__") } } // GetEvents retrieves events for an actor from a specific version func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { es.mu.RLock() defer es.mu.RUnlock() events, exists := es.events[actorID] if !exists { return []*aether.Event{}, nil } var 
filteredEvents []*aether.Event for _, event := range events { if event.Version >= fromVersion { filteredEvents = append(filteredEvents, event) } } return filteredEvents, nil } // GetLatestVersion returns the latest version for an actor func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { es.mu.RLock() defer es.mu.RUnlock() events, exists := es.events[actorID] if !exists || len(events) == 0 { return 0, nil } latestVersion := int64(0) for _, event := range events { if event.Version > latestVersion { latestVersion = event.Version } } return latestVersion, nil } // SaveSnapshot saves a snapshot to the in-memory store func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { if snapshot == nil { return &snapshotNilError{} } es.mu.Lock() defer es.mu.Unlock() if _, exists := es.snapshots[snapshot.ActorID]; !exists { es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0) } es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot) return nil } // GetLatestSnapshot returns the most recent snapshot for an actor func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { es.mu.RLock() defer es.mu.RUnlock() snapshots, exists := es.snapshots[actorID] if !exists || len(snapshots) == 0 { return nil, nil } var latest *aether.ActorSnapshot for _, snapshot := range snapshots { if latest == nil || snapshot.Version > latest.Version { latest = snapshot } } return latest, nil } // snapshotNilError is returned when attempting to save a nil snapshot type snapshotNilError struct{} func (e *snapshotNilError) Error() string { return "snapshot cannot be nil" }