feat(event-sourcing): Publish EventStored after successful SaveEvent #135
8
event.go
8
event.go
@@ -73,6 +73,14 @@ type Event struct {
|
|||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Common event types for Aether infrastructure
|
||||||
|
const (
|
||||||
|
// EventTypeEventStored is an internal event published when an event is successfully persisted.
|
||||||
|
// This event allows observability components (metrics, projections, audit systems) to react
|
||||||
|
// to persisted events without coupling to application code.
|
||||||
|
EventTypeEventStored = "EventStored"
|
||||||
|
)
|
||||||
|
|
||||||
// Common metadata keys for distributed tracing and auditing
|
// Common metadata keys for distributed tracing and auditing
|
||||||
const (
|
const (
|
||||||
// MetadataKeyCorrelationID identifies related events across services
|
// MetadataKeyCorrelationID identifies related events across services
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default configuration values for JetStream event store
|
// Default configuration values for JetStream event store
|
||||||
@@ -64,6 +65,8 @@ type JetStreamEventStore struct {
|
|||||||
config JetStreamConfig
|
config JetStreamConfig
|
||||||
mu sync.Mutex // Protects version checks during SaveEvent
|
mu sync.Mutex // Protects version checks during SaveEvent
|
||||||
versions map[string]int64 // actorID -> latest version cache
|
versions map[string]int64 // actorID -> latest version cache
|
||||||
|
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
|
||||||
|
namespace string // Optional namespace for event publishing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -130,6 +133,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
|||||||
streamName: effectiveStreamName,
|
streamName: effectiveStreamName,
|
||||||
config: config,
|
config: config,
|
||||||
versions: make(map[string]int64),
|
versions: make(map[string]int64),
|
||||||
|
broadcaster: nil,
|
||||||
|
namespace: "",
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,6 +148,58 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
|||||||
return jes.streamName
|
return jes.streamName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
|
||||||
|
// The broadcaster receives EventStored events when events are successfully saved.
|
||||||
|
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
|
||||||
|
config := DefaultJetStreamConfig()
|
||||||
|
if namespace != "" {
|
||||||
|
config.Namespace = namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := natsConn.JetStream()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply defaults for zero values
|
||||||
|
if config.StreamRetention == 0 {
|
||||||
|
config.StreamRetention = DefaultStreamRetention
|
||||||
|
}
|
||||||
|
if config.ReplicaCount == 0 {
|
||||||
|
config.ReplicaCount = DefaultReplicaCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply namespace prefix to stream name if provided
|
||||||
|
effectiveStreamName := streamName
|
||||||
|
if config.Namespace != "" {
|
||||||
|
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create or update the stream
|
||||||
|
stream := &nats.StreamConfig{
|
||||||
|
Name: effectiveStreamName,
|
||||||
|
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||||
|
Storage: nats.FileStorage,
|
||||||
|
Retention: nats.LimitsPolicy,
|
||||||
|
MaxAge: config.StreamRetention,
|
||||||
|
Replicas: config.ReplicaCount,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = js.AddStream(stream)
|
||||||
|
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
||||||
|
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &JetStreamEventStore{
|
||||||
|
js: js,
|
||||||
|
streamName: effectiveStreamName,
|
||||||
|
config: config,
|
||||||
|
versions: make(map[string]int64),
|
||||||
|
broadcaster: broadcaster,
|
||||||
|
namespace: namespace,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent persists an event to JetStream.
|
// SaveEvent persists an event to JetStream.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
@@ -203,9 +260,34 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
// Update version cache after successful publish
|
// Update version cache after successful publish
|
||||||
jes.versions[event.ActorID] = event.Version
|
jes.versions[event.ActorID] = event.Version
|
||||||
|
|
||||||
|
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||||
|
if jes.broadcaster != nil {
|
||||||
|
jes.publishEventStored(event)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||||
|
// This is called after a successful SaveEvent to notify subscribers.
|
||||||
|
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||||
|
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"eventId": originalEvent.ID,
|
||||||
|
"actorId": originalEvent.ActorID,
|
||||||
|
"version": originalEvent.Version,
|
||||||
|
"timestamp": originalEvent.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
jes.broadcaster.Publish(jes.namespace, eventStored)
|
||||||
|
}
|
||||||
|
|
||||||
// GetEvents retrieves all events for an actor since a version.
|
// GetEvents retrieves all events for an actor since a version.
|
||||||
// Note: This method silently skips malformed events for backward compatibility.
|
// Note: This method silently skips malformed events for backward compatibility.
|
||||||
// Use GetEventsWithErrors to receive information about malformed events.
|
// Use GetEventsWithErrors to receive information about malformed events.
|
||||||
|
|||||||
@@ -2,15 +2,19 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||||
type InMemoryEventStore struct {
|
type InMemoryEventStore struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
events map[string][]*aether.Event // actorID -> events
|
events map[string][]*aether.Event // actorID -> events
|
||||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||||
|
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
|
||||||
|
namespace string // optional namespace for event publishing
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInMemoryEventStore creates a new in-memory event store
|
// NewInMemoryEventStore creates a new in-memory event store
|
||||||
@@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
|
||||||
|
// The broadcaster receives EventStored events when events are successfully saved.
|
||||||
|
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
|
||||||
|
return &InMemoryEventStore{
|
||||||
|
events: make(map[string][]*aether.Event),
|
||||||
|
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||||
|
broadcaster: broadcaster,
|
||||||
|
namespace: namespace,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent saves an event to the in-memory store.
|
// SaveEvent saves an event to the in-memory store.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
|
// If a broadcaster is configured, publishes an EventStored event on success.
|
||||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||||
es.mu.Lock()
|
es.mu.Lock()
|
||||||
defer es.mu.Unlock()
|
defer es.mu.Unlock()
|
||||||
@@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||||
}
|
}
|
||||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||||
|
|
||||||
|
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||||
|
if es.broadcaster != nil {
|
||||||
|
es.publishEventStored(event)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||||
|
// This is called after a successful SaveEvent to notify subscribers.
|
||||||
|
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||||
|
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"eventId": originalEvent.ID,
|
||||||
|
"actorId": originalEvent.ActorID,
|
||||||
|
"version": originalEvent.Version,
|
||||||
|
"timestamp": originalEvent.Timestamp.Unix(),
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
es.broadcaster.Publish(es.namespace, eventStored)
|
||||||
|
}
|
||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version
|
// GetEvents retrieves events for an actor from a specific version
|
||||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||||
es.mu.RLock()
|
es.mu.RLock()
|
||||||
|
|||||||
@@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// === EventStored Publishing Tests ===
|
||||||
|
|
||||||
|
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
|
||||||
|
// Create a mock broadcaster to capture published events
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.50,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save event
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if EventStored was published
|
||||||
|
select {
|
||||||
|
case publishedEvent := <-ch:
|
||||||
|
if publishedEvent == nil {
|
||||||
|
t.Fatal("received nil event from broadcaster")
|
||||||
|
}
|
||||||
|
if publishedEvent.EventType != aether.EventTypeEventStored {
|
||||||
|
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
|
||||||
|
}
|
||||||
|
if publishedEvent.ActorID != "order-456" {
|
||||||
|
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
|
||||||
|
}
|
||||||
|
if publishedEvent.Version != 1 {
|
||||||
|
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
|
||||||
|
}
|
||||||
|
// Check data contains original event info
|
||||||
|
if publishedEvent.Data["eventId"] != "evt-123" {
|
||||||
|
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for EventStored event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
// Save first event
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain the first EventStored event
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for first EventStored event")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to save event with non-increasing version (should fail)
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: "evt-2",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1, // Same version, should conflict
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event2)
|
||||||
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||||
|
t.Fatalf("expected ErrVersionConflict, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify no EventStored event was published
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatal("expected no EventStored event, but received one")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected - no event published
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
|
||||||
|
broadcaster := aether.NewEventBus()
|
||||||
|
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||||
|
|
||||||
|
// Subscribe to EventStored events
|
||||||
|
ch := broadcaster.Subscribe("test-namespace")
|
||||||
|
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||||
|
|
||||||
|
// Save multiple events
|
||||||
|
for i := int64(1); i <= 3; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: i,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we received 3 EventStored events in order
|
||||||
|
for i := int64(1); i <= 3; i++ {
|
||||||
|
select {
|
||||||
|
case publishedEvent := <-ch:
|
||||||
|
if publishedEvent == nil {
|
||||||
|
t.Fatal("received nil event from broadcaster")
|
||||||
|
}
|
||||||
|
if publishedEvent.Version != i {
|
||||||
|
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("timeout waiting for EventStored event %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
|
||||||
|
// Test that SaveEvent works without a broadcaster (nil broadcaster)
|
||||||
|
store := NewInMemoryEventStore()
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-123",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.50,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should not panic even though broadcaster is nil
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify event was saved
|
||||||
|
events, err := store.GetEvents("order-456", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(events) != 1 {
|
||||||
|
t.Fatalf("expected 1 event, got %d", len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user