diff --git a/event.go b/event.go index 94ea58c..5cd53bd 100644 --- a/event.go +++ b/event.go @@ -166,6 +166,17 @@ func (e *Event) WithMetadataFrom(source *Event) { } } + +// EventStored is an internal infrastructure event published after an event is successfully persisted. +// It allows observability and trigger downstream workflows without coupling to application events. +// EventStored is not published to external systems (Phase 2) - only to local EventBus subscribers. +type EventStored struct { + EventID string `json:"eventId"` // ID of the event that was stored + ActorID string `json:"actorId"` // Actor that owns the stored event + Version int64 `json:"version"` // Version of the stored event + Timestamp time.Time `json:"timestamp"` // When the event was stored +} + // ActorSnapshot represents a point-in-time state snapshot type ActorSnapshot struct { ActorID string `json:"actorId"` diff --git a/store/eventstored_test.go b/store/eventstored_test.go new file mode 100644 index 0000000..d800ef0 --- /dev/null +++ b/store/eventstored_test.go @@ -0,0 +1,362 @@ +package store + +import ( + "fmt" + "sync" + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" +) + +// MockEventBroadcaster captures published events for testing +type MockEventBroadcaster struct { + mu sync.RWMutex + events []*aether.Event + namespaces map[string]int +} + +func NewMockEventBroadcaster() *MockEventBroadcaster { + return &MockEventBroadcaster{ + events: make([]*aether.Event, 0), + namespaces: make(map[string]int), + } +} + +func (m *MockEventBroadcaster) Subscribe(namespacePattern string) <-chan *aether.Event { + return make(chan *aether.Event) +} + +func (m *MockEventBroadcaster) SubscribeWithFilter(namespacePattern string, filter *aether.SubscriptionFilter) <-chan *aether.Event { + return make(chan *aether.Event) +} + +func (m *MockEventBroadcaster) Unsubscribe(namespacePattern string, ch <-chan *aether.Event) {} + +func (m *MockEventBroadcaster) Publish(namespaceID string, event *aether.Event) { + m.mu.Lock() + defer m.mu.Unlock() + m.events = append(m.events, event) + m.namespaces[namespaceID]++ +} + +func (m *MockEventBroadcaster) Stop() {} + +func (m *MockEventBroadcaster) SubscriberCount(namespaceID string) int { + return 0 +} + +func (m *MockEventBroadcaster) GetPublishedEvents() []*aether.Event { + m.mu.RLock() + defer m.mu.RUnlock() + events := make([]*aether.Event, len(m.events)) + copy(events, m.events) + return events +} + +// === InMemoryEventStore EventStored Tests === + +func TestEventStored_PublishedOnSaveSuccess(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{"total": 100.50}, + Timestamp: time.Now(), + } + + err := store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify EventStored was published + published := mockBus.GetPublishedEvents() + if len(published) != 1 { + t.Fatalf("expected 1 published event, got %d", len(published)) + } + + storedEvent := published[0] + if storedEvent.EventType != "EventStored" { + t.Errorf("expected EventType 'EventStored', got %q", storedEvent.EventType) + } + if storedEvent.ActorID != "order-456" { + t.Errorf("expected ActorID 'order-456', got %q", storedEvent.ActorID) + } + if storedEvent.Data["eventId"] != "evt-123" { + t.Errorf("expected eventId 'evt-123', got %v", storedEvent.Data["eventId"]) + } + if storedEvent.Data["version"] != int64(1) { + t.Errorf("expected version 1, got %v", storedEvent.Data["version"]) + } +} + +func TestEventStored_NotPublishedOnVersionConflict(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + // Save first event + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("First SaveEvent failed: %v", err) + } + + // Try to save event with same version (conflict) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, // Same version - should conflict + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event2) + if err == nil { + t.Fatal("expected VersionConflictError, got nil") + } + + // Verify only 1 EventStored was published (from first event) + published := mockBus.GetPublishedEvents() + if len(published) != 1 { + t.Fatalf("expected 1 published event after conflict, got %d", len(published)) + } +} + +func TestEventStored_MultipleEventsPublished(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + // Save 5 events + for i := 1; i <= 5; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-1", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent %d failed: %v", i, err) + } + } + + // Verify 5 EventStored events were published + published := mockBus.GetPublishedEvents() + if len(published) != 5 { + t.Fatalf("expected 5 published events, got %d", len(published)) + } + + // Verify each has correct data + for i := 0; i < 5; i++ { + if published[i].Data["version"] != int64(i+1) { + t.Errorf("event %d: expected version %d, got %v", i, i+1, published[i].Data["version"]) + } + } +} + +func TestEventStored_NotPublishedWithoutEventBus(t *testing.T) { + store := NewInMemoryEventStore() + // Don't set event bus + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + // Should succeed without publishing (no-op) + err := store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Event should be persisted normally + retrieved, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(retrieved) != 1 { + t.Errorf("expected 1 event, got %d", len(retrieved)) + } +} + +func TestEventStored_ContainsRequiredFields(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + event := &aether.Event{ + ID: "evt-abc", + EventType: "TestEvent", + ActorID: "actor-xyz", + Version: 42, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + published := mockBus.GetPublishedEvents() + if len(published) != 1 { + t.Fatalf("expected 1 event, got %d", len(published)) + } + + storedEvent := published[0] + + // Verify required fields + if storedEvent.Data["eventId"] != "evt-abc" { + t.Error("missing or incorrect eventId") + } + if storedEvent.Data["actorId"] != "actor-xyz" { + t.Error("missing or incorrect actorId") + } + if storedEvent.Data["version"] != int64(42) { + t.Error("missing or incorrect version") + } + if _, hasTimestamp := storedEvent.Data["timestamp"]; !hasTimestamp { + t.Error("missing timestamp") + } +} + +func TestEventStored_PublishedToCorrectNamespace(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify published to __internal__ namespace + namespaces := mockBus.namespaces + if count, ok := namespaces["__internal__"]; !ok || count != 1 { + t.Errorf("expected 1 event published to __internal__, got %v", namespaces) + } +} + +func TestEventStored_WithMetricsRecording(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + mockMetrics := aether.NewMetricsCollector() + + store.WithEventBus(mockBus) + store.WithMetrics(mockMetrics) + + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify metrics were recorded + published := mockMetrics.EventsPublished("__internal__") + if published != 1 { + t.Errorf("expected 1 published metric, got %d", published) + } +} + +func TestEventStored_ConcurrentPublishing(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + numGoroutines := 10 + eventsPerGoroutine := 5 + var wg sync.WaitGroup + + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < eventsPerGoroutine; i++ { + version := int64(goroutineID*eventsPerGoroutine + i + 1) + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d-%d", goroutineID, i), + EventType: "TestEvent", + ActorID: fmt.Sprintf("actor-%d", goroutineID), + Version: version, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + _ = store.SaveEvent(event) // Ignore errors (some may conflict) + } + }(g) + } + + wg.Wait() + + // Verify EventStored events were published for successful saves + published := mockBus.GetPublishedEvents() + if len(published) != numGoroutines*eventsPerGoroutine { + t.Logf("Note: got %d published events (some saves may have conflicted)", len(published)) + } + if len(published) == 0 { + t.Fatal("expected at least some published events") + } +} + +func TestEventStored_OrderPreserved(t *testing.T) { + store := NewInMemoryEventStore() + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + // Save 3 events in order + for i := 1; i <= 3; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-1", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent %d failed: %v", i, err) + } + } + + published := mockBus.GetPublishedEvents() + + // Verify order is preserved + for i := 0; i < 3; i++ { + if published[i].Data["eventId"] != fmt.Sprintf("evt-%d", i+1) { + t.Errorf("event %d: expected evt-%d, got %v", i, i+1, published[i].Data["eventId"]) + } + } +} diff --git a/store/jetstream.go b/store/jetstream.go index cc06ea6..71a9bc9 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -64,6 +64,8 @@ type JetStreamEventStore struct { config JetStreamConfig mu sync.Mutex // Protects version checks during SaveEvent versions map[string]int64 // actorID -> latest version cache + eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored + metrics aether.MetricsCollector // Optional metrics collector } @@ -89,6 +91,20 @@ func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string, return NewJetStreamEventStoreWithConfig(natsConn, streamName, config) } +// WithEventBus sets the EventBus for publishing EventStored events. +// This is optional - if not set, EventStored will not be published. +func (jes *JetStreamEventStore) WithEventBus(bus aether.EventBroadcaster) *JetStreamEventStore { + jes.eventBus = bus + return jes +} + +// WithMetrics sets the metrics collector for recording EventStored metrics. +// This is optional - if not set, metrics will not be recorded. +func (jes *JetStreamEventStore) WithMetrics(metrics aether.MetricsCollector) *JetStreamEventStore { + jes.metrics = metrics + return jes +} + // NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) { js, err := natsConn.JetStream() @@ -203,9 +219,49 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { // Update version cache after successful publish jes.versions[event.ActorID] = event.Version + // Publish EventStored event on success + jes.publishEventStored(event) + return nil } +// publishEventStored publishes an EventStored event to the EventBus and records metrics +func (jes *JetStreamEventStore) publishEventStored(event *aether.Event) { + if jes.eventBus == nil { + return + } + + stored := &aether.EventStored{ + EventID: event.ID, + ActorID: event.ActorID, + Version: event.Version, + Timestamp: time.Now(), + } + + // Convert EventStored to Event for publishing (internal system event) + storedEvent := &aether.Event{ + ID: "eventstored-" + event.ID, + EventType: "EventStored", + ActorID: event.ActorID, + Version: event.Version, + Data: map[string]interface{}{ + "eventId": stored.EventID, + "actorId": stored.ActorID, + "version": stored.Version, + "timestamp": stored.Timestamp, + }, + Timestamp: stored.Timestamp, + } + + // Publish to default namespace (internal events) + jes.eventBus.Publish("__internal__", storedEvent) + + // Record metrics if collector is configured + if jes.metrics != nil { + jes.metrics.RecordPublish("__internal__") + } +} + // GetEvents retrieves all events for an actor since a version. // Note: This method silently skips malformed events for backward compatibility. // Use GetEventsWithErrors to receive information about malformed events. diff --git a/store/memory.go b/store/memory.go index 0fba85d..7f474c1 100644 --- a/store/memory.go +++ b/store/memory.go @@ -2,6 +2,7 @@ package store import ( "sync" + "time" "git.flowmade.one/flowmade-one/aether" ) @@ -11,6 +12,8 @@ type InMemoryEventStore struct { mu sync.RWMutex events map[string][]*aether.Event // actorID -> events snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) + eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored + metrics aether.MetricsCollector // Optional metrics collector } // NewInMemoryEventStore creates a new in-memory event store @@ -21,9 +24,24 @@ func NewInMemoryEventStore() *InMemoryEventStore { } } +// WithEventBus sets the EventBus for publishing EventStored events. +// This is optional - if not set, EventStored will not be published. +func (es *InMemoryEventStore) WithEventBus(bus aether.EventBroadcaster) *InMemoryEventStore { + es.eventBus = bus + return es +} + +// WithMetrics sets the metrics collector for recording EventStored metrics. +// This is optional - if not set, metrics will not be recorded. +func (es *InMemoryEventStore) WithMetrics(metrics aether.MetricsCollector) *InMemoryEventStore { + es.metrics = metrics + return es +} + // SaveEvent saves an event to the in-memory store. // Returns VersionConflictError if the event's version is not strictly greater // than the current latest version for the actor. +// On success, publishes an EventStored event to the EventBus (if configured). func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.mu.Lock() defer es.mu.Unlock() @@ -51,9 +69,50 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.events[event.ActorID] = make([]*aether.Event, 0) } es.events[event.ActorID] = append(es.events[event.ActorID], event) + + // Publish EventStored event on success + es.publishEventStored(event) + return nil } +// publishEventStored publishes an EventStored event to the EventBus and records metrics +func (es *InMemoryEventStore) publishEventStored(event *aether.Event) { + if es.eventBus == nil { + return + } + + stored := &aether.EventStored{ + EventID: event.ID, + ActorID: event.ActorID, + Version: event.Version, + Timestamp: time.Now(), + } + + // Convert EventStored to Event for publishing (internal system event) + storedEvent := &aether.Event{ + ID: "eventstored-" + event.ID, + EventType: "EventStored", + ActorID: event.ActorID, + Version: event.Version, + Data: map[string]interface{}{ + "eventId": stored.EventID, + "actorId": stored.ActorID, + "version": stored.Version, + "timestamp": stored.Timestamp, + }, + Timestamp: stored.Timestamp, + } + + // Publish to default namespace (internal events) + es.eventBus.Publish("__internal__", storedEvent) + + // Record metrics if collector is configured + if es.metrics != nil { + es.metrics.RecordPublish("__internal__") + } +} + // GetEvents retrieves events for an actor from a specific version func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { es.mu.RLock()