feat(event-sourcing): Publish EventStored after successful SaveEvent #135
8
event.go
8
event.go
@@ -73,6 +73,14 @@ type Event struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// Common event types for Aether infrastructure
|
||||
const (
|
||||
// EventTypeEventStored is an internal event published when an event is successfully persisted.
|
||||
// This event allows observability components (metrics, projections, audit systems) to react
|
||||
// to persisted events without coupling to application code.
|
||||
EventTypeEventStored = "EventStored"
|
||||
)
|
||||
|
||||
// Common metadata keys for distributed tracing and auditing
|
||||
const (
|
||||
// MetadataKeyCorrelationID identifies related events across services
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Default configuration values for JetStream event store
|
||||
@@ -64,6 +65,8 @@ type JetStreamEventStore struct {
|
||||
config JetStreamConfig
|
||||
mu sync.Mutex // Protects version checks during SaveEvent
|
||||
versions map[string]int64 // actorID -> latest version cache
|
||||
broadcaster aether.EventBroadcaster // Optional broadcaster for EventStored events
|
||||
namespace string // Optional namespace for event publishing
|
||||
}
|
||||
|
||||
|
||||
@@ -130,6 +133,8 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
broadcaster: nil,
|
||||
namespace: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -143,6 +148,58 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
||||
return jes.streamName
|
||||
}
|
||||
|
||||
// NewJetStreamEventStoreWithBroadcaster creates a new JetStream-based event store with broadcaster support.
|
||||
// The broadcaster receives EventStored events when events are successfully saved.
|
||||
func NewJetStreamEventStoreWithBroadcaster(natsConn *nats.Conn, streamName string, broadcaster aether.EventBroadcaster, namespace string) (*JetStreamEventStore, error) {
|
||||
config := DefaultJetStreamConfig()
|
||||
if namespace != "" {
|
||||
config.Namespace = namespace
|
||||
}
|
||||
|
||||
js, err := natsConn.JetStream()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
|
||||
}
|
||||
|
||||
// Apply defaults for zero values
|
||||
if config.StreamRetention == 0 {
|
||||
config.StreamRetention = DefaultStreamRetention
|
||||
}
|
||||
if config.ReplicaCount == 0 {
|
||||
config.ReplicaCount = DefaultReplicaCount
|
||||
}
|
||||
|
||||
// Apply namespace prefix to stream name if provided
|
||||
effectiveStreamName := streamName
|
||||
if config.Namespace != "" {
|
||||
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||
}
|
||||
|
||||
// Create or update the stream
|
||||
stream := &nats.StreamConfig{
|
||||
Name: effectiveStreamName,
|
||||
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||
Storage: nats.FileStorage,
|
||||
Retention: nats.LimitsPolicy,
|
||||
MaxAge: config.StreamRetention,
|
||||
Replicas: config.ReplicaCount,
|
||||
}
|
||||
|
||||
_, err = js.AddStream(stream)
|
||||
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
||||
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
|
||||
return &JetStreamEventStore{
|
||||
js: js,
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
broadcaster: broadcaster,
|
||||
namespace: namespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SaveEvent persists an event to JetStream.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
@@ -203,9 +260,34 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
// Update version cache after successful publish
|
||||
jes.versions[event.ActorID] = event.Version
|
||||
|
||||
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||
if jes.broadcaster != nil {
|
||||
jes.publishEventStored(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||
Data: map[string]interface{}{
|
||||
"eventId": originalEvent.ID,
|
||||
"actorId": originalEvent.ActorID,
|
||||
"version": originalEvent.Version,
|
||||
"timestamp": originalEvent.Timestamp.Unix(),
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
jes.broadcaster.Publish(jes.namespace, eventStored)
|
||||
}
|
||||
|
||||
// GetEvents retrieves all events for an actor since a version.
|
||||
// Note: This method silently skips malformed events for backward compatibility.
|
||||
// Use GetEventsWithErrors to receive information about malformed events.
|
||||
|
||||
@@ -2,15 +2,19 @@ package store
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||
type InMemoryEventStore struct {
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
broadcaster aether.EventBroadcaster // optional broadcaster for EventStored events
|
||||
namespace string // optional namespace for event publishing
|
||||
}
|
||||
|
||||
// NewInMemoryEventStore creates a new in-memory event store
|
||||
@@ -21,9 +25,21 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
}
|
||||
}
|
||||
|
||||
// NewInMemoryEventStoreWithBroadcaster creates a new in-memory event store with an event broadcaster
|
||||
// The broadcaster receives EventStored events when events are successfully saved.
|
||||
func NewInMemoryEventStoreWithBroadcaster(broadcaster aether.EventBroadcaster, namespace string) *InMemoryEventStore {
|
||||
return &InMemoryEventStore{
|
||||
events: make(map[string][]*aether.Event),
|
||||
snapshots: make(map[string][]*aether.ActorSnapshot),
|
||||
broadcaster: broadcaster,
|
||||
namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the in-memory store.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
// If a broadcaster is configured, publishes an EventStored event on success.
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
@@ -51,9 +67,35 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||
|
||||
// Publish EventStored event after successful save (if broadcaster is configured)
|
||||
if es.broadcaster != nil {
|
||||
es.publishEventStored(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
func (es *InMemoryEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: originalEvent.ActorID, // EventStored is about the original actor
|
||||
Version: originalEvent.Version, // Preserve the version of the stored event
|
||||
Data: map[string]interface{}{
|
||||
"eventId": originalEvent.ID,
|
||||
"actorId": originalEvent.ActorID,
|
||||
"version": originalEvent.Version,
|
||||
"timestamp": originalEvent.Timestamp.Unix(),
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
es.broadcaster.Publish(es.namespace, eventStored)
|
||||
}
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version
|
||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
es.mu.RLock()
|
||||
|
||||
@@ -1905,3 +1905,181 @@ func TestSaveEvent_MetadataPreservedAcrossMultipleEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === EventStored Publishing Tests ===
|
||||
|
||||
func TestSaveEvent_WithBroadcaster_PublishesEventStored(t *testing.T) {
|
||||
// Create a mock broadcaster to capture published events
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Save event
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Check if EventStored was published
|
||||
select {
|
||||
case publishedEvent := <-ch:
|
||||
if publishedEvent == nil {
|
||||
t.Fatal("received nil event from broadcaster")
|
||||
}
|
||||
if publishedEvent.EventType != aether.EventTypeEventStored {
|
||||
t.Errorf("expected EventType %q, got %q", aether.EventTypeEventStored, publishedEvent.EventType)
|
||||
}
|
||||
if publishedEvent.ActorID != "order-456" {
|
||||
t.Errorf("expected ActorID %q, got %q", "order-456", publishedEvent.ActorID)
|
||||
}
|
||||
if publishedEvent.Version != 1 {
|
||||
t.Errorf("expected Version 1, got %d", publishedEvent.Version)
|
||||
}
|
||||
// Check data contains original event info
|
||||
if publishedEvent.Data["eventId"] != "evt-123" {
|
||||
t.Errorf("expected eventId %q, got %q", "evt-123", publishedEvent.Data["eventId"])
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting for EventStored event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_VersionConflict_NoEventStored(t *testing.T) {
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
// Save first event
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event1)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent(event1) failed: %v", err)
|
||||
}
|
||||
|
||||
// Drain the first EventStored event
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timeout waiting for first EventStored event")
|
||||
}
|
||||
|
||||
// Try to save event with non-increasing version (should fail)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1, // Same version, should conflict
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event2)
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
t.Fatalf("expected ErrVersionConflict, got %v", err)
|
||||
}
|
||||
|
||||
// Verify no EventStored event was published
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("expected no EventStored event, but received one")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected - no event published
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_MultipleEvents_PublishesMultipleEventStored(t *testing.T) {
|
||||
broadcaster := aether.NewEventBus()
|
||||
store := NewInMemoryEventStoreWithBroadcaster(broadcaster, "test-namespace")
|
||||
|
||||
// Subscribe to EventStored events
|
||||
ch := broadcaster.Subscribe("test-namespace")
|
||||
defer broadcaster.Unsubscribe("test-namespace", ch)
|
||||
|
||||
// Save multiple events
|
||||
for i := int64(1); i <= 3; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: i,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify we received 3 EventStored events in order
|
||||
for i := int64(1); i <= 3; i++ {
|
||||
select {
|
||||
case publishedEvent := <-ch:
|
||||
if publishedEvent == nil {
|
||||
t.Fatal("received nil event from broadcaster")
|
||||
}
|
||||
if publishedEvent.Version != i {
|
||||
t.Errorf("expected Version %d, got %d", i, publishedEvent.Version)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("timeout waiting for EventStored event %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEvent_WithoutBroadcaster_NoPanic(t *testing.T) {
|
||||
// Test that SaveEvent works without a broadcaster (nil broadcaster)
|
||||
store := NewInMemoryEventStore()
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.50,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// This should not panic even though broadcaster is nil
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify event was saved
|
||||
events, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user