package store import ( "sync" "time" "git.flowmade.one/flowmade-one/aether" "github.com/google/uuid" ) // InMemoryEventStore provides a simple in-memory event store for testing type InMemoryEventStore struct { mu sync.RWMutex events map[string][]*aether.Event // actorID -> events snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events namespace string // optional namespace for event publishing } // NewInMemoryEventStore creates a new in-memory event store func NewInMemoryEventStore() *InMemoryEventStore { return &InMemoryEventStore{ events: make(map[string][]*aether.Event), snapshots: make(map[string][]*aether.ActorSnapshot), } } // NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster // The broadcaster receives EventStored events when events are successfully saved. func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore { return &InMemoryEventStore{ events: make(map[string][]*aether.Event), snapshots: make(map[string][]*aether.ActorSnapshot), broadcaster: broadcaster, namespace: namespace, } } // SaveEvent saves an event to the in-memory store. // Returns VersionConflictError if the event's version is not strictly greater // than the current latest version for the actor. // If a broadcaster is configured, publishes an EventStored event on success. func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.mu.Lock() defer es.mu.Unlock() // Get current latest version for this actor currentVersion := int64(0) if events, exists := es.events[event.ActorID]; exists { for _, e := range events { if e.Version > currentVersion { currentVersion = e.Version } } } // Validate version is strictly greater than current if event.Version <= currentVersion { return &aether.VersionConflictError{ ActorID: event.ActorID, AttemptedVersion: event.Version, CurrentVersion: currentVersion, } } if _, exists := es.events[event.ActorID]; !exists { es.events[event.ActorID] = make([]*aether.Event, 0) } es.events[event.ActorID] = append(es.events[event.ActorID], event) // Publish EventStored event after successful save (if broadcaster is configured) if es.broadcaster != nil { es.publishEventStored(event) } return nil } // publishEventStored publishes an EventStored event to the broadcaster. // This is called after a successful SaveEvent to notify subscribers. func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) { eventStored := &aether.Event{ ID: uuid.New().String(), EventType: aether.EventTypeEventStored, ActorID: originalEvent.ActorID, // EventStored is about the original actor Version: originalEvent.Version, // Preserve the version of the stored event Data: map[string]interface{}{ "eventId": originalEvent.ID, "actorId": originalEvent.ActorID, "version": originalEvent.Version, "timestamp": originalEvent.Timestamp.Unix(), }, Timestamp: time.Now(), } es.broadcaster.Publish(es.namespace, eventStored) } // GetEvents retrieves events for an actor from a specific version func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { es.mu.RLock() defer es.mu.RUnlock() events, exists := es.events[actorID] if !exists { return []*aether.Event{}, nil } var filteredEvents []*aether.Event for _, event := range events { if event.Version >= fromVersion { filteredEvents = append(filteredEvents, event) } } return filteredEvents, nil } // GetLatestVersion returns the latest version for an actor func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { es.mu.RLock() defer es.mu.RUnlock() events, exists := es.events[actorID] if !exists || len(events) == 0 { return 0, nil } latestVersion := int64(0) for _, event := range events { if event.Version > latestVersion { latestVersion = event.Version } } return latestVersion, nil } // SaveSnapshot saves a snapshot to the in-memory store func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { if snapshot == nil { return &snapshotNilError{} } es.mu.Lock() defer es.mu.Unlock() if _, exists := es.snapshots[snapshot.ActorID]; !exists { es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0) } es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot) return nil } // GetLatestSnapshot returns the most recent snapshot for an actor func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { es.mu.RLock() defer es.mu.RUnlock() snapshots, exists := es.snapshots[actorID] if !exists || len(snapshots) == 0 { return nil, nil } var latest *aether.ActorSnapshot for _, snapshot := range snapshots { if latest == nil || snapshot.Version > latest.Version { latest = snapshot } } return latest, nil } // snapshotNilError is returned when attempting to save a nil snapshot type snapshotNilError struct{} func (e *snapshotNilError) Error() string { return "snapshot cannot be nil" }