feat(event-sourcing): Publish EventStored after successful SaveEvent
Add EventStored internal event published to the EventBus when events are successfully persisted. This allows observability components (metrics, projections, audit systems) to react to persisted events without coupling to application code. Implementation: - Add EventTypeEventStored constant to define the event type - Update InMemoryEventStore with optional EventBroadcaster support - Add NewInMemoryEventStoreWithBroadcaster constructor - Update JetStreamEventStore with EventBroadcaster support - Add NewJetStreamEventStoreWithBroadcaster constructor - Implement publishEventStored() helper method - Publish EventStored containing EventID, ActorID, Version, Timestamp - Only publish on successful SaveEvent (not on version conflicts) - Automatically recorded in metrics through normal Publish flow Test coverage: - EventStored published after successful SaveEvent - No EventStored published on version conflict - Multiple EventStored events published in order - SaveEvent works correctly without broadcaster (nil-safe) Closes #61 Co-Authored-By: Claude Code <noreply@anthropic.com>
This commit was merged in pull request #135.
This commit is contained in:
8
event.go
8
event.go
@@ -73,6 +73,14 @@ type Event struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// Common event types for Aether infrastructure
|
||||
const (
|
||||
// EventTypeEventStored is an internal event published when an event is successfully persisted.
|
||||
// This event allows observability components (metrics, projections, audit systems) to react
|
||||
// to persisted events without coupling to application code.
|
||||
EventTypeEventStored = "EventStored"
|
||||
)
|
||||
|
||||
// Common metadata keys for distributed tracing and auditing
|
||||
const (
|
||||
// MetadataKeyCorrelationID identifies related events across services
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Default configuration values for JetStream event store
|
||||
@@ -64,6 +65,8 @@ type JetStreamEventStore struct {
|
||||
config JetStreamConfig
|
||||
mu sync.Mutex // Protects version checks during SaveEvent
|
||||
versions map[string]int64 // actorID -> latest version cache
|
||||
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
|
||||
namespace string // Optional namespace for event publishing
|
||||
}
|
||||
|
||||
|
||||
@@ -130,6 +133,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
broadcaster: nil,
|
||||
namespace: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -143,6 +148,58 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
||||
return jes.streamName
|
||||
}
|
||||
|
||||
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
|
||||
// The broadcaster receives EventStored events when events are successfully saved.
|
||||
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
|
||||
config := DefaultJetStreamConfig()
|
||||
if namespace != "" {
|
||||
config.Namespace = namespace
|
||||
}
|
||||
|
||||
js, err := natsConn.JetStream()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
||||
}
|
||||
|
||||
// Apply defaults for zero values
|
||||
if config.StreamRetention == 0 {
|
||||
config.StreamRetention = DefaultStreamRetention
|
||||
}
|
||||
if config.ReplicaCount == 0 {
|
||||
config.ReplicaCount = DefaultReplicaCount
|
||||
}
|
||||
|
||||
// Apply namespace prefix to stream name if provided
|
||||
effectiveStreamName := streamName
|
||||
if config.Namespace != "" {
|
||||
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||
}
|
||||
|
||||
// Create or update the stream
|
||||
stream := &nats.StreamConfig{
|
||||
Name: effectiveStreamName,
|
||||
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||
Storage: nats.FileStorage,
|
||||
Retention: nats.LimitsPolicy,
|
||||
MaxAge: config.StreamRetention,
|
||||
Replicas: config.ReplicaCount,
|
||||
}
|
||||
|
||||
_, err = js.AddStream(stream)
|
||||
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
||||
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
|
||||
return &JetStreamEventStore{
|
||||
js: js,
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
broadcaster: broadcaster,
|
||||
namespace: namespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SaveEvent persists an event to JetStream.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
@@ -203,9 +260,34 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
// Update version cache after successful publish
|
||||
jes.versions[event.ActorID] = event.Version
|
||||
|
||||
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||
if jes.broadcaster != nil {
|
||||
jes.publishEventStored(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||
Data: map[string]interface{}{
|
||||
"eventId": originalEvent.ID,
|
||||
"actorId": originalEvent.ActorID,
|
||||
"version": originalEvent.Version,
|
||||
"timestamp": originalEvent.Timestamp.Unix(),
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
jes.broadcaster.Publish(jes.namespace, eventStored)
|
||||
}
|
||||
|
||||
// GetEvents retrieves all events for an actor since a version.
|
||||
// Note: This method silently skips malformed events for backward compatibility.
|
||||
// Use GetEventsWithErrors to receive information about malformed events.
|
||||
|
||||
@@ -2,15 +2,19 @@ package store
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||
type InMemoryEventStore struct {
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
|
||||
namespace string // optional namespace for event publishing
|
||||
}
|
||||
|
||||
// NewInMemoryEventStore creates a new in-memory event store
|
||||
@@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
}
|
||||
}
|
||||
|
||||
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
|
||||
// The broadcaster receives EventStored events when events are successfully saved.
|
||||
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
|
||||
return &InMemoryEventStore{
|
||||
events: make(map[string][]*aether.Event),
|
||||
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||
broadcaster: broadcaster,
|
||||
namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the in-memory store.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
// If a broadcaster is configured, publishes an EventStored event on success.
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
@@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||
|
||||
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||
if es.broadcaster != nil {
|
||||
es.publishEventStored(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||
Data: map[string]interface{}{
|
||||
"eventId": originalEvent.ID,
|
||||
"actorId": originalEvent.ActorID,
|
||||
"version": originalEvent.Version,
|
||||
"timestamp": originalEvent.Timestamp.Unix(),
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
es.broadcaster.Publish(es.namespace, eventStored)
|
||||
}
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version
|
||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
es.mu.RLock()
|
||||
|
||||
@@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === EventStored Publishing Tests ===
|
||||
|
||||
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
|
||||
// Create a mock broadcaster to capture published events
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Save event
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Check if EventStored was published
|
||||
select {
|
||||
case publishedEvent := <-ch:
|
||||
if publishedEvent == nil {
|
||||
t.Fatal("received nil event from broadcaster")
|
||||
}
|
||||
if publishedEvent.EventType != aether.EventTypeEventStored {
|
||||
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
|
||||
}
|
||||
if publishedEvent.ActorID != "order-456" {
|
||||
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
|
||||
}
|
||||
if publishedEvent.Version != 1 {
|
||||
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
|
||||
}
|
||||
// Check data contains original event info
|
||||
if publishedEvent.Data["eventId"] != "evt-123" {
|
||||
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting for EventStored event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
// Save first event
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event1)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||
}
|
||||
|
||||
// Drain the first EventStored event
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting for first EventStored event")
|
||||
}
|
||||
|
||||
// Try to save event with non-increasing version (should fail)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1, // Same version, should conflict
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event2)
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Fatalf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
|
||||
// Verify no EventStored event was published
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("expected no EventStored event, but received one")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected - no event published
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
// Save multiple events
|
||||
for i := int64(1); i <= 3; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: i,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify we received 3 EventStored events in order
|
||||
for i := int64(1); i <= 3; i++ {
|
||||
select {
|
||||
case publishedEvent := <-ch:
|
||||
if publishedEvent == nil {
|
||||
t.Fatal("received nil event from broadcaster")
|
||||
}
|
||||
if publishedEvent.Version != i {
|
||||
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("timeout waiting for EventStored event %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
|
||||
// Test that SaveEvent works without a broadcaster (nil broadcaster)
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// This should not panic even though broadcaster is nil
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify event was saved
|
||||
events, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user