Files
aether/store/memory.go
Claude Code 5223cf136a fix: address review feedback
- Removed duplicate blank line in event.go
- Use original event timestamp instead of time.Now() for EventStored
- Fixed MockEventBroadcaster.Subscribe to return nil instead of closed channel
- Added integration tests for EventStored with JetStreamEventStore

Co-Authored-By: Claude Code <noreply@anthropic.com>
2026-01-13 22:25:10 +01:00

197 lines
5.4 KiB
Go

package store
import (
"sync"
"git.flowmade.one/flowmade-one/aether"
)
// InMemoryEventStore provides a simple in-memory event store for testing
type InMemoryEventStore struct {
mu sync.RWMutex
events map[string][]*aether.Event // actorID -> events
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored
metrics aether.MetricsCollector // Optional metrics collector
}
// NewInMemoryEventStore creates a new in-memory event store
func NewInMemoryEventStore() *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]*aether.Event),
snapshots: make(map[string][]*aether.ActorSnapshot),
}
}
// WithEventBus sets the EventBus for publishing EventStored events.
// This is optional - if not set, EventStored will not be published.
func (es *InMemoryEventStore) WithEventBus(bus aether.EventBroadcaster) *InMemoryEventStore {
es.eventBus = bus
return es
}
// WithMetrics sets the metrics collector for recording EventStored metrics.
// This is optional - if not set, metrics will not be recorded.
func (es *InMemoryEventStore) WithMetrics(metrics aether.MetricsCollector) *InMemoryEventStore {
es.metrics = metrics
return es
}
// SaveEvent saves an event to the in-memory store.
// Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor.
// On success, publishes an EventStored event to the EventBus (if configured).
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
es.mu.Lock()
defer es.mu.Unlock()
// Get current latest version for this actor
currentVersion := int64(0)
if events, exists := es.events[event.ActorID]; exists {
for _, e := range events {
if e.Version > currentVersion {
currentVersion = e.Version
}
}
}
// Validate version is strictly greater than current
if event.Version <= currentVersion {
return &aether.VersionConflictError{
ActorID: event.ActorID,
AttemptedVersion: event.Version,
CurrentVersion: currentVersion,
}
}
if _, exists := es.events[event.ActorID]; !exists {
es.events[event.ActorID] = make([]*aether.Event, 0)
}
es.events[event.ActorID] = append(es.events[event.ActorID], event)
// Publish EventStored event on success
es.publishEventStored(event)
return nil
}
// publishEventStored publishes an EventStored event to the EventBus and records metrics
func (es *InMemoryEventStore) publishEventStored(event *aether.Event) {
if es.eventBus == nil {
return
}
stored := &aether.EventStored{
EventID: event.ID,
ActorID: event.ActorID,
Version: event.Version,
Timestamp: event.Timestamp,
}
// Convert EventStored to Event for publishing (internal system event)
storedEvent := &aether.Event{
ID: "eventstored-" + event.ID,
EventType: "EventStored",
ActorID: event.ActorID,
Version: event.Version,
Data: map[string]interface{}{
"eventId": stored.EventID,
"actorId": stored.ActorID,
"version": stored.Version,
"timestamp": stored.Timestamp,
},
Timestamp: stored.Timestamp,
}
// Publish to default namespace (internal events)
es.eventBus.Publish("__internal__", storedEvent)
// Record metrics if collector is configured
if es.metrics != nil {
es.metrics.RecordPublish("__internal__")
}
}
// GetEvents retrieves events for an actor from a specific version
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
es.mu.RLock()
defer es.mu.RUnlock()
events, exists := es.events[actorID]
if !exists {
return []*aether.Event{}, nil
}
var filteredEvents []*aether.Event
for _, event := range events {
if event.Version >= fromVersion {
filteredEvents = append(filteredEvents, event)
}
}
return filteredEvents, nil
}
// GetLatestVersion returns the latest version for an actor
func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
es.mu.RLock()
defer es.mu.RUnlock()
events, exists := es.events[actorID]
if !exists || len(events) == 0 {
return 0, nil
}
latestVersion := int64(0)
for _, event := range events {
if event.Version > latestVersion {
latestVersion = event.Version
}
}
return latestVersion, nil
}
// SaveSnapshot saves a snapshot to the in-memory store
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
if snapshot == nil {
return &snapshotNilError{}
}
es.mu.Lock()
defer es.mu.Unlock()
if _, exists := es.snapshots[snapshot.ActorID]; !exists {
es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0)
}
es.snapshots[snapshot.ActorID] = append(es.snapshots[snapshot.ActorID], snapshot)
return nil
}
// GetLatestSnapshot returns the most recent snapshot for an actor
func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
es.mu.RLock()
defer es.mu.RUnlock()
snapshots, exists := es.snapshots[actorID]
if !exists || len(snapshots) == 0 {
return nil, nil
}
var latest *aether.ActorSnapshot
for _, snapshot := range snapshots {
if latest == nil || snapshot.Version > latest.Version {
latest = snapshot
}
}
return latest, nil
}
// snapshotNilError is returned when attempting to save a nil snapshot
type snapshotNilError struct{}
func (e *snapshotNilError) Error() string {
return "snapshot cannot be nil"
}