Compare commits
2 Commits
bcbec9ab94
...
5223cf136a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5223cf136a | ||
|
|
0f89b07c0b |
10
event.go
10
event.go
@@ -166,6 +166,16 @@ func (e *Event) WithMetadataFrom(source *Event) {
|
||||
}
|
||||
}
|
||||
|
||||
// EventStored is an internal infrastructure event published after an event is successfully persisted.
|
||||
// It allows observability and trigger downstream workflows without coupling to application events.
|
||||
// EventStored is not published to external systems (Phase 2) - only to local EventBus subscribers.
|
||||
type EventStored struct {
|
||||
EventID string `json:"eventId"` // ID of the event that was stored
|
||||
ActorID string `json:"actorId"` // Actor that owns the stored event
|
||||
Version int64 `json:"version"` // Version of the stored event
|
||||
Timestamp time.Time `json:"timestamp"` // When the event was stored
|
||||
}
|
||||
|
||||
// ActorSnapshot represents a point-in-time state snapshot
|
||||
type ActorSnapshot struct {
|
||||
ActorID string `json:"actorId"`
|
||||
|
||||
362
store/eventstored_test.go
Normal file
362
store/eventstored_test.go
Normal file
@@ -0,0 +1,362 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// MockEventBroadcaster captures published events for testing
|
||||
type MockEventBroadcaster struct {
|
||||
mu sync.RWMutex
|
||||
events []*aether.Event
|
||||
namespaces map[string]int
|
||||
}
|
||||
|
||||
func NewMockEventBroadcaster() *MockEventBroadcaster {
|
||||
return &MockEventBroadcaster{
|
||||
events: make([]*aether.Event, 0),
|
||||
namespaces: make(map[string]int),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockEventBroadcaster) Subscribe(namespacePattern string) <-chan *aether.Event {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockEventBroadcaster) SubscribeWithFilter(namespacePattern string, filter *aether.SubscriptionFilter) <-chan *aether.Event {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockEventBroadcaster) Unsubscribe(namespacePattern string, ch <-chan *aether.Event) {}
|
||||
|
||||
func (m *MockEventBroadcaster) Publish(namespaceID string, event *aether.Event) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.events = append(m.events, event)
|
||||
m.namespaces[namespaceID]++
|
||||
}
|
||||
|
||||
func (m *MockEventBroadcaster) Stop() {}
|
||||
|
||||
func (m *MockEventBroadcaster) SubscriberCount(namespaceID string) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *MockEventBroadcaster) GetPublishedEvents() []*aether.Event {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
events := make([]*aether.Event, len(m.events))
|
||||
copy(events, m.events)
|
||||
return events
|
||||
}
|
||||
|
||||
// === InMemoryEventStore EventStored Tests ===
|
||||
|
||||
func TestEventStored_PublishedOnSaveSuccess(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"total": 100.50},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify EventStored was published
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 1 {
|
||||
t.Fatalf("expected 1 published event, got %d", len(published))
|
||||
}
|
||||
|
||||
storedEvent := published[0]
|
||||
if storedEvent.EventType != "EventStored" {
|
||||
t.Errorf("expected EventType 'EventStored', got %q", storedEvent.EventType)
|
||||
}
|
||||
if storedEvent.ActorID != "order-456" {
|
||||
t.Errorf("expected ActorID 'order-456', got %q", storedEvent.ActorID)
|
||||
}
|
||||
if storedEvent.Data["eventId"] != "evt-123" {
|
||||
t.Errorf("expected eventId 'evt-123', got %v", storedEvent.Data["eventId"])
|
||||
}
|
||||
if storedEvent.Data["version"] != int64(1) {
|
||||
t.Errorf("expected version 1, got %v", storedEvent.Data["version"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_NotPublishedOnVersionConflict(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
// Save first event
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event1); err != nil {
|
||||
t.Fatalf("First SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Try to save event with same version (conflict)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1, // Same version - should conflict
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err := store.SaveEvent(event2)
|
||||
if err == nil {
|
||||
t.Fatal("expected VersionConflictError, got nil")
|
||||
}
|
||||
|
||||
// Verify only 1 EventStored was published (from first event)
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 1 {
|
||||
t.Fatalf("expected 1 published event after conflict, got %d", len(published))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_MultipleEventsPublished(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
// Save 5 events
|
||||
for i := 1; i <= 5; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent %d failed: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify 5 EventStored events were published
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 5 {
|
||||
t.Fatalf("expected 5 published events, got %d", len(published))
|
||||
}
|
||||
|
||||
// Verify each has correct data
|
||||
for i := 0; i < 5; i++ {
|
||||
if published[i].Data["version"] != int64(i+1) {
|
||||
t.Errorf("event %d: expected version %d, got %v", i, i+1, published[i].Data["version"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_NotPublishedWithoutEventBus(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
// Don't set event bus
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Should succeed without publishing (no-op)
|
||||
err := store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Event should be persisted normally
|
||||
retrieved, err := store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("GetEvents failed: %v", err)
|
||||
}
|
||||
if len(retrieved) != 1 {
|
||||
t.Errorf("expected 1 event, got %d", len(retrieved))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_ContainsRequiredFields(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-abc",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-xyz",
|
||||
Version: 42,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(published))
|
||||
}
|
||||
|
||||
storedEvent := published[0]
|
||||
|
||||
// Verify required fields
|
||||
if storedEvent.Data["eventId"] != "evt-abc" {
|
||||
t.Error("missing or incorrect eventId")
|
||||
}
|
||||
if storedEvent.Data["actorId"] != "actor-xyz" {
|
||||
t.Error("missing or incorrect actorId")
|
||||
}
|
||||
if storedEvent.Data["version"] != int64(42) {
|
||||
t.Error("missing or incorrect version")
|
||||
}
|
||||
if _, hasTimestamp := storedEvent.Data["timestamp"]; !hasTimestamp {
|
||||
t.Error("missing timestamp")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_PublishedToCorrectNamespace(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify published to __internal__ namespace
|
||||
namespaces := mockBus.namespaces
|
||||
if count, ok := namespaces["__internal__"]; !ok || count != 1 {
|
||||
t.Errorf("expected 1 event published to __internal__, got %v", namespaces)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_WithMetricsRecording(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
mockMetrics := aether.NewMetricsCollector()
|
||||
|
||||
store.WithEventBus(mockBus)
|
||||
store.WithMetrics(mockMetrics)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify metrics were recorded
|
||||
published := mockMetrics.EventsPublished("__internal__")
|
||||
if published != 1 {
|
||||
t.Errorf("expected 1 published metric, got %d", published)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_ConcurrentPublishing(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
numGoroutines := 10
|
||||
eventsPerGoroutine := 5
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for g := 0; g < numGoroutines; g++ {
|
||||
wg.Add(1)
|
||||
go func(goroutineID int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < eventsPerGoroutine; i++ {
|
||||
version := int64(goroutineID*eventsPerGoroutine + i + 1)
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d-%d", goroutineID, i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: fmt.Sprintf("actor-%d", goroutineID),
|
||||
Version: version,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
_ = store.SaveEvent(event) // Ignore errors (some may conflict)
|
||||
}
|
||||
}(g)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify EventStored events were published for successful saves
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != numGoroutines*eventsPerGoroutine {
|
||||
t.Logf("Note: got %d published events (some saves may have conflicted)", len(published))
|
||||
}
|
||||
if len(published) == 0 {
|
||||
t.Fatal("expected at least some published events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStored_OrderPreserved(t *testing.T) {
|
||||
store := NewInMemoryEventStore()
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
// Save 3 events in order
|
||||
for i := 1; i <= 3; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent %d failed: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
published := mockBus.GetPublishedEvents()
|
||||
|
||||
// Verify order is preserved
|
||||
for i := 0; i < 3; i++ {
|
||||
if published[i].Data["eventId"] != fmt.Sprintf("evt-%d", i+1) {
|
||||
t.Errorf("event %d: expected evt-%d, got %v", i, i+1, published[i].Data["eventId"])
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -64,6 +64,8 @@ type JetStreamEventStore struct {
|
||||
config JetStreamConfig
|
||||
mu sync.Mutex // Protects version checks during SaveEvent
|
||||
versions map[string]int64 // actorID -> latest version cache
|
||||
eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored
|
||||
metrics aether.MetricsCollector // Optional metrics collector
|
||||
}
|
||||
|
||||
|
||||
@@ -89,6 +91,20 @@ func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string,
|
||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, config)
|
||||
}
|
||||
|
||||
// WithEventBus sets the EventBus for publishing EventStored events.
|
||||
// This is optional - if not set, EventStored will not be published.
|
||||
func (jes *JetStreamEventStore) WithEventBus(bus aether.EventBroadcaster) *JetStreamEventStore {
|
||||
jes.eventBus = bus
|
||||
return jes
|
||||
}
|
||||
|
||||
// WithMetrics sets the metrics collector for recording EventStored metrics.
|
||||
// This is optional - if not set, metrics will not be recorded.
|
||||
func (jes *JetStreamEventStore) WithMetrics(metrics aether.MetricsCollector) *JetStreamEventStore {
|
||||
jes.metrics = metrics
|
||||
return jes
|
||||
}
|
||||
|
||||
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
|
||||
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
|
||||
js, err := natsConn.JetStream()
|
||||
@@ -203,9 +219,49 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
// Update version cache after successful publish
|
||||
jes.versions[event.ActorID] = event.Version
|
||||
|
||||
// Publish EventStored event on success
|
||||
jes.publishEventStored(event)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the EventBus and records metrics
|
||||
func (jes *JetStreamEventStore) publishEventStored(event *aether.Event) {
|
||||
if jes.eventBus == nil {
|
||||
return
|
||||
}
|
||||
|
||||
stored := &aether.EventStored{
|
||||
EventID: event.ID,
|
||||
ActorID: event.ActorID,
|
||||
Version: event.Version,
|
||||
Timestamp: event.Timestamp,
|
||||
}
|
||||
|
||||
// Convert EventStored to Event for publishing (internal system event)
|
||||
storedEvent := &aether.Event{
|
||||
ID: "eventstored-" + event.ID,
|
||||
EventType: "EventStored",
|
||||
ActorID: event.ActorID,
|
||||
Version: event.Version,
|
||||
Data: map[string]interface{}{
|
||||
"eventId": stored.EventID,
|
||||
"actorId": stored.ActorID,
|
||||
"version": stored.Version,
|
||||
"timestamp": stored.Timestamp,
|
||||
},
|
||||
Timestamp: stored.Timestamp,
|
||||
}
|
||||
|
||||
// Publish to default namespace (internal events)
|
||||
jes.eventBus.Publish("__internal__", storedEvent)
|
||||
|
||||
// Record metrics if collector is configured
|
||||
if jes.metrics != nil {
|
||||
jes.metrics.RecordPublish("__internal__")
|
||||
}
|
||||
}
|
||||
|
||||
// GetEvents retrieves all events for an actor since a version.
|
||||
// Note: This method silently skips malformed events for backward compatibility.
|
||||
// Use GetEventsWithErrors to receive information about malformed events.
|
||||
|
||||
@@ -1536,3 +1536,209 @@ func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === JetStreamEventStore EventStored Integration Tests ===
|
||||
|
||||
// TestJetStreamEventStored_PublishedAfterSaveSuccess tests that EventStored is published after successful SaveEvent
|
||||
func TestJetStreamEventStored_PublishedAfterSaveSuccess(t *testing.T) {
|
||||
nc := getTestNATSConnection(t)
|
||||
if nc == nil {
|
||||
return
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
streamName := fmt.Sprintf("test-eventstored-%d", time.Now().UnixNano())
|
||||
defer cleanupStream(nc, streamName)
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, streamName)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"total": 100.50},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify EventStored was published
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 1 {
|
||||
t.Fatalf("expected 1 published event, got %d", len(published))
|
||||
}
|
||||
|
||||
storedEvent := published[0]
|
||||
if storedEvent.EventType != "EventStored" {
|
||||
t.Errorf("expected EventType 'EventStored', got %q", storedEvent.EventType)
|
||||
}
|
||||
if storedEvent.ActorID != "order-456" {
|
||||
t.Errorf("expected ActorID 'order-456', got %q", storedEvent.ActorID)
|
||||
}
|
||||
if storedEvent.Data["eventId"] != "evt-123" {
|
||||
t.Errorf("expected eventId 'evt-123', got %v", storedEvent.Data["eventId"])
|
||||
}
|
||||
if storedEvent.Data["version"] != int64(1) {
|
||||
t.Errorf("expected version 1, got %v", storedEvent.Data["version"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamEventStored_PreservesOriginalTimestamp tests that EventStored preserves the original event's timestamp
|
||||
func TestJetStreamEventStored_PreservesOriginalTimestamp(t *testing.T) {
|
||||
nc := getTestNATSConnection(t)
|
||||
if nc == nil {
|
||||
return
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
streamName := fmt.Sprintf("test-timestamp-%d", time.Now().UnixNano())
|
||||
defer cleanupStream(nc, streamName)
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, streamName)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
// Use a fixed timestamp in the past
|
||||
pastTime := time.Now().Add(-1 * time.Hour)
|
||||
event := &aether.Event{
|
||||
ID: "evt-123",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: pastTime,
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err != nil {
|
||||
t.Fatalf("SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 1 {
|
||||
t.Fatalf("expected 1 published event, got %d", len(published))
|
||||
}
|
||||
|
||||
storedEvent := published[0]
|
||||
publishedTimestamp := storedEvent.Data["timestamp"].(time.Time)
|
||||
|
||||
// Check timestamp is preserved exactly
|
||||
if !publishedTimestamp.Equal(pastTime) {
|
||||
t.Errorf("expected timestamp %v, got %v", pastTime, publishedTimestamp)
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamEventStored_MultipleEventsPublished tests that multiple EventStored events are published in order
|
||||
func TestJetStreamEventStored_MultipleEventsPublished(t *testing.T) {
|
||||
nc := getTestNATSConnection(t)
|
||||
if nc == nil {
|
||||
return
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
streamName := fmt.Sprintf("test-multi-%d", time.Now().UnixNano())
|
||||
defer cleanupStream(nc, streamName)
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, streamName)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
// Save 5 events
|
||||
for i := 1; i <= 5; i++ {
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d", i),
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: int64(i),
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event); err != nil {
|
||||
t.Fatalf("SaveEvent %d failed: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify 5 EventStored events were published
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 5 {
|
||||
t.Fatalf("expected 5 published events, got %d", len(published))
|
||||
}
|
||||
|
||||
// Verify each has correct data
|
||||
for i := 0; i < 5; i++ {
|
||||
if published[i].Data["version"] != int64(i+1) {
|
||||
t.Errorf("event %d: expected version %d, got %v", i, i+1, published[i].Data["version"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamEventStored_NotPublishedOnVersionConflict tests that EventStored is not published on version conflict
|
||||
func TestJetStreamEventStored_NotPublishedOnVersionConflict(t *testing.T) {
|
||||
nc := getTestNATSConnection(t)
|
||||
if nc == nil {
|
||||
return
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
streamName := fmt.Sprintf("test-conflict-%d", time.Now().UnixNano())
|
||||
defer cleanupStream(nc, streamName)
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, streamName)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
|
||||
mockBus := NewMockEventBroadcaster()
|
||||
store.WithEventBus(mockBus)
|
||||
|
||||
// Save first event
|
||||
event1 := &aether.Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if err := store.SaveEvent(event1); err != nil {
|
||||
t.Fatalf("First SaveEvent failed: %v", err)
|
||||
}
|
||||
|
||||
// Try to save event with same version (conflict)
|
||||
event2 := &aether.Event{
|
||||
ID: "evt-2",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1, // Same version - should conflict
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err = store.SaveEvent(event2)
|
||||
if err == nil {
|
||||
t.Fatal("expected VersionConflictError, got nil")
|
||||
}
|
||||
|
||||
// Verify only 1 EventStored was published (from first event)
|
||||
published := mockBus.GetPublishedEvents()
|
||||
if len(published) != 1 {
|
||||
t.Fatalf("expected 1 published event after conflict, got %d", len(published))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ type InMemoryEventStore struct {
|
||||
mu sync.RWMutex
|
||||
events map[string][]*aether.Event // actorID -> events
|
||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||
eventBus aether.EventBroadcaster // Optional EventBus for publishing EventStored
|
||||
metrics aether.MetricsCollector // Optional metrics collector
|
||||
}
|
||||
|
||||
// NewInMemoryEventStore creates a new in-memory event store
|
||||
@@ -21,9 +23,24 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
||||
}
|
||||
}
|
||||
|
||||
// WithEventBus sets the EventBus for publishing EventStored events.
|
||||
// This is optional - if not set, EventStored will not be published.
|
||||
func (es *InMemoryEventStore) WithEventBus(bus aether.EventBroadcaster) *InMemoryEventStore {
|
||||
es.eventBus = bus
|
||||
return es
|
||||
}
|
||||
|
||||
// WithMetrics sets the metrics collector for recording EventStored metrics.
|
||||
// This is optional - if not set, metrics will not be recorded.
|
||||
func (es *InMemoryEventStore) WithMetrics(metrics aether.MetricsCollector) *InMemoryEventStore {
|
||||
es.metrics = metrics
|
||||
return es
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the in-memory store.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
// On success, publishes an EventStored event to the EventBus (if configured).
|
||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
@@ -51,9 +68,50 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||
}
|
||||
es.events[event.ActorID] = append(es.events[event.ActorID], event)
|
||||
|
||||
// Publish EventStored event on success
|
||||
es.publishEventStored(event)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishEventStored publishes an EventStored event to the EventBus and records metrics
|
||||
func (es *InMemoryEventStore) publishEventStored(event *aether.Event) {
|
||||
if es.eventBus == nil {
|
||||
return
|
||||
}
|
||||
|
||||
stored := &aether.EventStored{
|
||||
EventID: event.ID,
|
||||
ActorID: event.ActorID,
|
||||
Version: event.Version,
|
||||
Timestamp: event.Timestamp,
|
||||
}
|
||||
|
||||
// Convert EventStored to Event for publishing (internal system event)
|
||||
storedEvent := &aether.Event{
|
||||
ID: "eventstored-" + event.ID,
|
||||
EventType: "EventStored",
|
||||
ActorID: event.ActorID,
|
||||
Version: event.Version,
|
||||
Data: map[string]interface{}{
|
||||
"eventId": stored.EventID,
|
||||
"actorId": stored.ActorID,
|
||||
"version": stored.Version,
|
||||
"timestamp": stored.Timestamp,
|
||||
},
|
||||
Timestamp: stored.Timestamp,
|
||||
}
|
||||
|
||||
// Publish to default namespace (internal events)
|
||||
es.eventBus.Publish("__internal__", storedEvent)
|
||||
|
||||
// Record metrics if collector is configured
|
||||
if es.metrics != nil {
|
||||
es.metrics.RecordPublish("__internal__")
|
||||
}
|
||||
}
|
||||
|
||||
// GetEvents retrieves events for an actor from a specific version
|
||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||
es.mu.RLock()
|
||||
|
||||
Reference in New Issue
Block a user