diff --git a/event.go b/event.go index 94ea58c..6dccd5b 100644 --- a/event.go +++ b/event.go @@ -73,6 +73,14 @@ type Event struct { Timestamp time.Time `json:"timestamp"` } +// Common event types for Aether infrastructure +const ( + // EventTypeEventStored is an internal event published when an event is successfully persisted. + // This event allows observability components (metrics, projections, audit systems) to react + // to persisted events without coupling to application code. + EventTypeEventStored = "EventStored" +) + // Common metadata keys for distributed tracing and auditing const ( // MetadataKeyCorrelationID identifies related events across services diff --git a/store/jetstream.go b/store/jetstream.go index cc06ea6..110f412 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -9,6 +9,7 @@ import ( "git.flowmade.one/flowmade-one/aether" "github.com/nats-io/nats.go" + "github.com/google/uuid" ) // Default configuration values for JetStream event store @@ -64,6 +65,8 @@ type JetStreamEventStore struct { config JetStreamConfig mu sync.Mutex // Protects version checks during SaveEvent versions map[string]int64 // actorID -> latest version cache + broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events + namespace string // Optional namespace for event publishing } @@ -130,6 +133,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co streamName: effectiveStreamName, config: config, versions: make(map[string]int64), + broadcaster: nil, + namespace: "", }, nil } @@ -143,6 +148,58 @@ func (jes *JetStreamEventStore) GetStreamName() string { return jes.streamName } +// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support. +// The broadcaster receives EventStored events when events are successfully saved. +func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) { + config := DefaultJetStreamConfig() + if namespace != "" { + config.Namespace = namespace + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, fmt.Errorf("failed to get JetStream context: %w", err) + } + + // Apply defaults for zero values + if config.StreamRetention == 0 { + config.StreamRetention = DefaultStreamRetention + } + if config.ReplicaCount == 0 { + config.ReplicaCount = DefaultReplicaCount + } + + // Apply namespace prefix to stream name if provided + effectiveStreamName := streamName + if config.Namespace != "" { + effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName) + } + + // Create or update the stream + stream := &nats.StreamConfig{ + Name: effectiveStreamName, + Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)}, + Storage: nats.FileStorage, + Retention: nats.LimitsPolicy, + MaxAge: config.StreamRetention, + Replicas: config.ReplicaCount, + } + + _, err = js.AddStream(stream) + if err != nil && !strings.Contains(err.Error(), "already exists") { + return nil, fmt.Errorf("failed to create stream: %w", err) + } + + return &JetStreamEventStore{ + js: js, + streamName: effectiveStreamName, + config: config, + versions: make(map[string]int64), + broadcaster: broadcaster, + namespace: namespace, + }, nil +} + // SaveEvent persists an event to JetStream. // Returns VersionConflictError if the event's version is not strictly greater // than the current latest version for the actor. @@ -203,9 +260,34 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { // Update version cache after successful publish jes.versions[event.ActorID] = event.Version + // Publish EventStored event after successful save (if broadcaster is configured) + if jes.broadcaster != nil { + jes.publishEventStored(event) + } + return nil } +// publishEventStored publishes an EventStored event to the broadcaster. +// This is called after a successful SaveEvent to notify subscribers. +func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) { + eventStored := &aether.Event{ + ID: uuid.New().String(), + EventType: aether.EventTypeEventStored, + ActorID: originalEvent.ActorID, // EventStored is about the original actor + Version: originalEvent.Version, // Preserve the version of the stored event + Data: map[string]interface{}{ + "eventId": originalEvent.ID, + "actorId": originalEvent.ActorID, + "version": originalEvent.Version, + "timestamp": originalEvent.Timestamp.Unix(), + }, + Timestamp: time.Now(), + } + + jes.broadcaster.Publish(jes.namespace, eventStored) +} + // GetEvents retrieves all events for an actor since a version. // Note: This method silently skips malformed events for backward compatibility. // Use GetEventsWithErrors to receive information about malformed events. diff --git a/store/memory.go b/store/memory.go index 0fba85d..1922aaa 100644 --- a/store/memory.go +++ b/store/memory.go @@ -2,15 +2,19 @@ package store import ( "sync" + "time" "git.flowmade.one/flowmade-one/aether" + "github.com/google/uuid" ) // InMemoryEventStore provides a simple in-memory event store for testing type InMemoryEventStore struct { - mu sync.RWMutex - events map[string][]*aether.Event // actorID -> events - snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) + mu sync.RWMutex + events map[string][]*aether.Event // actorID -> events + snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version) + broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events + namespace string // optional namespace for event publishing } // NewInMemoryEventStore creates a new in-memory event store @@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore { } } +// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster +// The broadcaster receives EventStored events when events are successfully saved. +func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore { + return &InMemoryEventStore{ + events: make(map[string][]*aether.Event), + snapshots: make(map[string][]*aether.ActorSnapshot), + broadcaster: broadcaster, + namespace: namespace, + } +} + // SaveEvent saves an event to the in-memory store. // Returns VersionConflictError if the event's version is not strictly greater // than the current latest version for the actor. +// If a broadcaster is configured, publishes an EventStored event on success. func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.mu.Lock() defer es.mu.Unlock() @@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { es.events[event.ActorID] = make([]*aether.Event, 0) } es.events[event.ActorID] = append(es.events[event.ActorID], event) + + // Publish EventStored event after successful save (if broadcaster is configured) + if es.broadcaster != nil { + es.publishEventStored(event) + } + return nil } +// publishEventStored publishes an EventStored event to the broadcaster. +// This is called after a successful SaveEvent to notify subscribers. +func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) { + eventStored := &aether.Event{ + ID: uuid.New().String(), + EventType: aether.EventTypeEventStored, + ActorID: originalEvent.ActorID, // EventStored is about the original actor + Version: originalEvent.Version, // Preserve the version of the stored event + Data: map[string]interface{}{ + "eventId": originalEvent.ID, + "actorId": originalEvent.ActorID, + "version": originalEvent.Version, + "timestamp": originalEvent.Timestamp.Unix(), + }, + Timestamp: time.Now(), + } + + es.broadcaster.Publish(es.namespace, eventStored) +} + // GetEvents retrieves events for an actor from a specific version func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { es.mu.RLock() diff --git a/store/memory_test.go b/store/memory_test.go index 8585379..9f5855f 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) { } } } + +// === EventStored Publishing Tests === + +func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) { + // Create a mock broadcaster to capture published events + broadcaster := aether.NewEventBus() + store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace") + + // Subscribe to EventStored events + ch := broadcaster.Subscribe("test-namespace") + defer broadcaster.Unsubscribe("test-namespace", ch) + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{ + "total": 100.50, + }, + Timestamp: time.Now(), + } + + // Save event + err := store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Check if EventStored was published + select { + case publishedEvent := <-ch: + if publishedEvent == nil { + t.Fatal("received nil event from broadcaster") + } + if publishedEvent.EventType != aether.EventTypeEventStored { + t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType) + } + if publishedEvent.ActorID != "order-456" { + t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID) + } + if publishedEvent.Version != 1 { + t.Errorf("expected Version 1, got %d", publishedEvent.Version) + } + // Check data contains original event info + if publishedEvent.Data["eventId"] != "evt-123" { + t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"]) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for EventStored event") + } +} + +func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) { + broadcaster := aether.NewEventBus() + store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace") + + // Subscribe to EventStored events + ch := broadcaster.Subscribe("test-namespace") + defer broadcaster.Unsubscribe("test-namespace", ch) + + // Save first event + event1 := &aether.Event{ + ID: "evt-1", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + err := store.SaveEvent(event1) + if err != nil { + t.Fatalf("SaveEvent(event1) failed: %v", err) + } + + // Drain the first EventStored event + select { + case <-ch: + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for first EventStored event") + } + + // Try to save event with non-increasing version (should fail) + event2 := &aether.Event{ + ID: "evt-2", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, // Same version, should conflict + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + err = store.SaveEvent(event2) + if !errors.Is(err, aether.ErrVersionConflict) { + t.Fatalf("expected ErrVersionConflict, got %v", err) + } + + // Verify no EventStored event was published + select { + case <-ch: + t.Fatal("expected no EventStored event, but received one") + case <-time.After(50 * time.Millisecond): + // Expected - no event published + } +} + +func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) { + broadcaster := aether.NewEventBus() + store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace") + + // Subscribe to EventStored events + ch := broadcaster.Subscribe("test-namespace") + defer broadcaster.Unsubscribe("test-namespace", ch) + + // Save multiple events + for i := int64(1); i <= 3; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "OrderPlaced", + ActorID: "order-456", + Version: i, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + + err := store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + } + + // Verify we received 3 EventStored events in order + for i := int64(1); i <= 3; i++ { + select { + case publishedEvent := <-ch: + if publishedEvent == nil { + t.Fatal("received nil event from broadcaster") + } + if publishedEvent.Version != i { + t.Errorf("expected Version %d, got %d", i, publishedEvent.Version) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("timeout waiting for EventStored event %d", i) + } + } +} + +func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) { + // Test that SaveEvent works without a broadcaster (nil broadcaster) + store := NewInMemoryEventStore() + + event := &aether.Event{ + ID: "evt-123", + EventType: "OrderPlaced", + ActorID: "order-456", + Version: 1, + Data: map[string]interface{}{ + "total": 100.50, + }, + Timestamp: time.Now(), + } + + // This should not panic even though broadcaster is nil + err := store.SaveEvent(event) + if err != nil { + t.Fatalf("SaveEvent failed: %v", err) + } + + // Verify event was saved + events, err := store.GetEvents("order-456", 0) + if err != nil { + t.Fatalf("GetEvents failed: %v", err) + } + if len(events) != 1 { + t.Fatalf("expected 1 event, got %d", len(events)) + } +}