diff --git a/event.go b/event.go index 5cd53bd..afdd7db 100644 --- a/event.go +++ b/event.go @@ -166,7 +166,6 @@ func (e *Event) WithMetadataFrom(source *Event) { } } - // EventStored is an internal infrastructure event published after an event is successfully persisted. // It allows observability and trigger downstream workflows without coupling to application events. // EventStored is not published to external systems (Phase 2) - only to local EventBus subscribers. diff --git a/store/eventstored_test.go b/store/eventstored_test.go index d800ef0..f415251 100644 --- a/store/eventstored_test.go +++ b/store/eventstored_test.go @@ -24,11 +24,11 @@ func NewMockEventBroadcaster() *MockEventBroadcaster { } func (m *MockEventBroadcaster) Subscribe(namespacePattern string) <-chan *aether.Event { - return make(chan *aether.Event) + return nil } func (m *MockEventBroadcaster) SubscribeWithFilter(namespacePattern string, filter *aether.SubscriptionFilter) <-chan *aether.Event { - return make(chan *aether.Event) + return nil } func (m *MockEventBroadcaster) Unsubscribe(namespacePattern string, ch <-chan *aether.Event) {} diff --git a/store/jetstream.go b/store/jetstream.go index 71a9bc9..9e17805 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -235,7 +235,7 @@ func (jes *JetStreamEventStore) publishEventStored(event *aether.Event) { EventID: event.ID, ActorID: event.ActorID, Version: event.Version, - Timestamp: time.Now(), + Timestamp: event.Timestamp, } // Convert EventStored to Event for publishing (internal system event) diff --git a/store/jetstream_integration_test.go b/store/jetstream_integration_test.go index cd47769..029604b 100644 --- a/store/jetstream_integration_test.go +++ b/store/jetstream_integration_test.go @@ -1536,3 +1536,209 @@ func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) { } } } + +// === JetStreamEventStore EventStored Integration Tests === + +// TestJetStreamEventStored_PublishedAfterSaveSuccess tests that EventStored is published after successful SaveEvent +func TestJetStreamEventStored_PublishedAfterSaveSuccess(t *testing.T) { + nc := getTestNATSConnection(t) + if nc == nil { + return + } + defer nc.Close() + + streamName := fmt.Sprintf("test-eventstored-%d", time.Now().UnixNano()) + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{"total": 100.50}, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify EventStored was published + published := mockBus.GetPublishedEvents() + if len(published) != 1 { + t.Fatalf("expected 1 published event, got %d", len(published)) + } + + storedEvent := published[0] + if storedEvent.EventType != "EventStored" { + t.Errorf("expected EventType 'EventStored', got %q", storedEvent.EventType) + } + if storedEvent.ActorID != "order-456" { + t.Errorf("expected ActorID 'order-456', got %q", storedEvent.ActorID) + } + if storedEvent.Data["eventId"] != "evt-123" { + t.Errorf("expected eventId 'evt-123', got %v", storedEvent.Data["eventId"]) + } + if storedEvent.Data["version"] != int64(1) { + t.Errorf("expected version 1, got %v", storedEvent.Data["version"]) + } +} + +// TestJetStreamEventStored_PreservesOriginalTimestamp tests that EventStored preserves the original event's timestamp +func TestJetStreamEventStored_PreservesOriginalTimestamp(t *testing.T) { + nc := getTestNATSConnection(t) + if nc == nil { + return + } + defer nc.Close() + + streamName := fmt.Sprintf("test-timestamp-%d", time.Now().UnixNano()) + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + // Use a fixed timestamp in the past + pastTime := time.Now().Add(-1 * time.Hour) + event := &aether.Event{ + ID: "evt-123", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: pastTime, + } + + err = store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + published := mockBus.GetPublishedEvents() + if len(published) != 1 { + t.Fatalf("expected 1 published event, got %d", len(published)) + } + + storedEvent := published[0] + publishedTimestamp := storedEvent.Data["timestamp"].(time.Time) + + // Check timestamp is preserved exactly + if !publishedTimestamp.Equal(pastTime) { + t.Errorf("expected timestamp %v, got %v", pastTime, publishedTimestamp) + } +} + +// TestJetStreamEventStored_MultipleEventsPublished tests that multiple EventStored events are published in order +func TestJetStreamEventStored_MultipleEventsPublished(t *testing.T) { + nc := getTestNATSConnection(t) + if nc == nil { + return + } + defer nc.Close() + + streamName := fmt.Sprintf("test-multi-%d", time.Now().UnixNano()) + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + // Save 5 events + for i := 1; i <= 5; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "TestEvent", + ActorID: "actor-1", + Version: int64(i), + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event); err != nil { + t.Fatalf("SaveEvent %d failed: %v", i, err) + } + } + + // Verify 5 EventStored events were published + published := mockBus.GetPublishedEvents() + if len(published) != 5 { + t.Fatalf("expected 5 published events, got %d", len(published)) + } + + // Verify each has correct data + for i := 0; i < 5; i++ { + if published[i].Data["version"] != int64(i+1) { + t.Errorf("event %d: expected version %d, got %v", i, i+1, published[i].Data["version"]) + } + } +} + +// TestJetStreamEventStored_NotPublishedOnVersionConflict tests that EventStored is not published on version conflict +func TestJetStreamEventStored_NotPublishedOnVersionConflict(t *testing.T) { + nc := getTestNATSConnection(t) + if nc == nil { + return + } + defer nc.Close() + + streamName := fmt.Sprintf("test-conflict-%d", time.Now().UnixNano()) + defer cleanupStream(nc, streamName) + + store, err := NewJetStreamEventStore(nc, streamName) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + mockBus := NewMockEventBroadcaster() + store.WithEventBus(mockBus) + + // Save first event + event1 := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + if err := store.SaveEvent(event1); err != nil { + t.Fatalf("First SaveEvent failed: %v", err) + } + + // Try to save event with same version (conflict) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, // Same version - should conflict + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event2) + if err == nil { + t.Fatal("expected VersionConflictError, got nil") + } + + // Verify only 1 EventStored was published (from first event) + published := mockBus.GetPublishedEvents() + if len(published) != 1 { + t.Fatalf("expected 1 published event after conflict, got %d", len(published)) + } +} diff --git a/store/memory.go b/store/memory.go index 7f474c1..6d80d34 100644 --- a/store/memory.go +++ b/store/memory.go @@ -2,7 +2,6 @@ package store import ( "sync" - "time" "git.flowmade.one/flowmade-one/aether" ) @@ -12,7 +11,7 @@ type InMemoryEventStore struct { mu sync.RWMutex events map[string][]*aether.Event // actorID -> events snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) - eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored + eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored metrics aether.MetricsCollector // Optional metrics collector } @@ -69,10 +68,10 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.events[event.ActorID] = make([]*aether.Event, 0) } es.events[event.ActorID] = append(es.events[event.ActorID], event) - + // Publish EventStored event on success es.publishEventStored(event) - + return nil } @@ -86,7 +85,7 @@ func (es *InMemoryEventStore) publishEventStored(event *aether.Event) { EventID: event.ID, ActorID: event.ActorID, Version: event.Version, - Timestamp: time.Now(), + Timestamp: event.Timestamp, } // Convert EventStored to Event for publishing (internal system event)