Compare commits
1 Commits
5fb68fed4a
...
5c01911e3c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5c01911e3c |
317
examples/cross_node_broadcasting.go
Normal file
317
examples/cross_node_broadcasting.go
Normal file
@@ -0,0 +1,317 @@
|
|||||||
|
//go:build integration
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package examples
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"git.flowmade.one/flowmade-one/aether/store"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CrossNodeBroadcasting demonstrates how to implement cross-node event broadcasting
|
||||||
|
// using NATSEventBus with JetStreamEventStore. This example shows how events persist
|
||||||
|
// to JetStream and are then broadcast to other nodes in the cluster via NATS.
|
||||||
|
//
|
||||||
|
// Key Concepts:
|
||||||
|
// 1. NATSEventBus wraps EventBus to add NATS publishing
|
||||||
|
// 2. JetStreamEventStore with broadcaster publishes EventStored to NATS
|
||||||
|
// 3. Other nodes receive these events via NATS subscription
|
||||||
|
// 4. Version cache is updated on remote events to maintain consistency
|
||||||
|
//
|
||||||
|
// Usage:
|
||||||
|
// go run examples/cross_node_broadcasting.go
|
||||||
|
func CrossNodeBroadcastingExample() {
|
||||||
|
// Connect to NATS
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
// Create NATS event bus (this will broadcast to all nodes)
|
||||||
|
natsBus, err := aether.NewNATSEventBus(nc)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create NATS event bus: %v", err)
|
||||||
|
}
|
||||||
|
defer natsBus.Stop()
|
||||||
|
|
||||||
|
// Create JetStream event store WITH broadcaster
|
||||||
|
// This enables EventStored events to be published to NATS
|
||||||
|
store, err := store.NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
"events",
|
||||||
|
natsBus,
|
||||||
|
"tenant-abc", // Optional namespace for isolation
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create event store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe to EventStored events to update version cache
|
||||||
|
// This keeps the version cache synchronized across nodes
|
||||||
|
eventStoredCh := natsBus.SubscribeWithFilter(
|
||||||
|
"tenant-abc",
|
||||||
|
&aether.SubscriptionFilter{
|
||||||
|
EventTypes: []string{aether.EventTypeEventStored},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for event := range eventStoredCh {
|
||||||
|
actorID := event.Data["actorId"].(string)
|
||||||
|
version := int64(event.Data["version"].(float64))
|
||||||
|
store.UpdateVersionCache(actorID, version)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Now save an event - it will be:
|
||||||
|
// 1. Persisted to JetStream
|
||||||
|
// 2. Published to NATS as EventStored
|
||||||
|
// 3. Received by other nodes via NATS
|
||||||
|
// 4. Used to update version cache
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "event-1",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-123",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.00,
|
||||||
|
"item": "widget",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to save event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Event saved to JetStream and broadcast to NATS")
|
||||||
|
|
||||||
|
// Other nodes in the cluster will receive this event via NATS
|
||||||
|
// and update their version cache accordingly
|
||||||
|
|
||||||
|
// Subscribe to events in this namespace
|
||||||
|
eventCh := natsBus.Subscribe("tenant-abc")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case receivedEvent := <-eventCh:
|
||||||
|
log.Printf("Received event via NATS: %s (version %d)",
|
||||||
|
receivedEvent.EventType, receivedEvent.Version)
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Println("Timeout waiting for event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CrossNodeBroadcastingMultiNode demonstrates a multi-node cluster setup.
|
||||||
|
// This simulates multiple nodes connecting to the same NATS cluster.
|
||||||
|
// In production, these would be separate processes/machines.
|
||||||
|
//
|
||||||
|
// This example requires a running NATS server with JetStream enabled.
|
||||||
|
func CrossNodeBroadcastingMultiNode() {
|
||||||
|
// Connect to NATS
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
nodeID := "node-1"
|
||||||
|
log.Printf("Starting %s", nodeID)
|
||||||
|
|
||||||
|
// Each node creates its own NATS event bus and event store
|
||||||
|
natsBus, err := aether.NewNATSEventBus(nc)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create NATS event bus: %v", err)
|
||||||
|
}
|
||||||
|
defer natsBus.Stop()
|
||||||
|
|
||||||
|
store, err := store.NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
"events",
|
||||||
|
natsBus,
|
||||||
|
"tenant-abc",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create event store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup EventStored subscription for cache synchronization
|
||||||
|
eventStoredCh := natsBus.SubscribeWithFilter(
|
||||||
|
"tenant-abc",
|
||||||
|
&aether.SubscriptionFilter{
|
||||||
|
EventTypes: []string{aether.EventTypeEventStored},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for event := range eventStoredCh {
|
||||||
|
actorID := event.Data["actorId"].(string)
|
||||||
|
version := int64(event.Data["version"].(float64))
|
||||||
|
store.UpdateVersionCache(actorID, version)
|
||||||
|
log.Printf("[%s] Received EventStored for %s v%d", nodeID, actorID, version)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Subscribe to actual events
|
||||||
|
eventCh := natsBus.Subscribe("tenant-abc")
|
||||||
|
log.Printf("[%s] Subscribed to tenant-abc", nodeID)
|
||||||
|
|
||||||
|
// Save an event
|
||||||
|
savedEvent := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("event-%s-1", nodeID),
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-123",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"node": nodeID,
|
||||||
|
"total": 100.00,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(savedEvent)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("[%s] Failed to save event: %v", nodeID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[%s] Saved event to JetStream", nodeID)
|
||||||
|
|
||||||
|
// Wait to receive the event (either from local or remote)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case receivedEvent := <-eventCh:
|
||||||
|
log.Printf("[%s] Received: %s v%d from %s",
|
||||||
|
nodeID,
|
||||||
|
receivedEvent.EventType,
|
||||||
|
receivedEvent.Version,
|
||||||
|
receivedEvent.GetCorrelationID(),
|
||||||
|
)
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Printf("[%s] Timeout waiting for event", nodeID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DistributedOrderProcessing demonstrates a realistic scenario where
|
||||||
|
// multiple nodes process orders for the same actor. This shows how
|
||||||
|
// cross-node broadcasting ensures consistency.
|
||||||
|
func DistributedOrderProcessing() {
|
||||||
|
// Connect to NATS
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
// Node 1: Order creation
|
||||||
|
node1Bus, _ := aether.NewNATSEventBus(nc)
|
||||||
|
node1Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc, "events", node1Bus, "orders",
|
||||||
|
)
|
||||||
|
defer node1Bus.Stop()
|
||||||
|
|
||||||
|
// Setup EventStored subscription for Node 1
|
||||||
|
eventStoredCh1 := node1Bus.SubscribeWithFilter(
|
||||||
|
"orders",
|
||||||
|
&aether.SubscriptionFilter{
|
||||||
|
EventTypes: []string{aether.EventTypeEventStored},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
go func() {
|
||||||
|
for event := range eventStoredCh1 {
|
||||||
|
actorID := event.Data["actorId"].(string)
|
||||||
|
version := int64(event.Data["version"].(float64))
|
||||||
|
node1Store.UpdateVersionCache(actorID, version)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Node 2: Order processing (different node)
|
||||||
|
node2Bus, _ := aether.NewNATSEventBus(nc)
|
||||||
|
node2Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc, "events", node2Bus, "orders",
|
||||||
|
)
|
||||||
|
defer node2Bus.Stop()
|
||||||
|
|
||||||
|
// Setup EventStored subscription for Node 2
|
||||||
|
eventStoredCh2 := node2Bus.SubscribeWithFilter(
|
||||||
|
"orders",
|
||||||
|
&aether.SubscriptionFilter{
|
||||||
|
EventTypes: []string{aether.EventTypeEventStored},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
go func() {
|
||||||
|
for event := range eventStoredCh2 {
|
||||||
|
actorID := event.Data["actorId"].(string)
|
||||||
|
version := int64(event.Data["version"].(float64))
|
||||||
|
node2Store.UpdateVersionCache(actorID, version)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Node 2 also subscribes to events to receive updates
|
||||||
|
node2EventCh := node2Bus.Subscribe("orders")
|
||||||
|
|
||||||
|
// Node 1 creates an order
|
||||||
|
orderPlaced := &aether.Event{
|
||||||
|
ID: "order-created",
|
||||||
|
EventType: "OrderPlaced",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"total": 100.00,
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := node1Store.SaveEvent(orderPlaced); err != nil {
|
||||||
|
log.Fatalf("Failed to create order: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("Node 1: Created order-456")
|
||||||
|
|
||||||
|
// Node 2 receives the OrderPlaced event via NATS
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case event := <-node2EventCh:
|
||||||
|
log.Printf("Node 2: Received %s v%d", event.EventType, event.Version)
|
||||||
|
|
||||||
|
// Node 2 processes the order (must use version 2)
|
||||||
|
orderProcessed := &aether.Event{
|
||||||
|
ID: "order-processed",
|
||||||
|
EventType: "OrderProcessed",
|
||||||
|
ActorID: "order-456",
|
||||||
|
Version: 2, // Must be > 1
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"status": "shipped",
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := node2Store.SaveEvent(orderProcessed); err != nil {
|
||||||
|
log.Fatalf("Node 2: Failed to process order: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("Node 2: Processed order-456")
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Println("Node 2: Timeout waiting for order event")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify event stream consistency
|
||||||
|
events, err := node1Store.GetEvents("order-456", 0)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to get events: %v", err)
|
||||||
|
}
|
||||||
|
log.Printf("Node 1: Event stream has %d events", len(events))
|
||||||
|
}
|
||||||
4
go.mod
4
go.mod
@@ -6,16 +6,19 @@ require (
|
|||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/nats-io/nats.go v1.37.0
|
github.com/nats-io/nats.go v1.37.0
|
||||||
github.com/prometheus/client_golang v1.23.2
|
github.com/prometheus/client_golang v1.23.2
|
||||||
|
github.com/stretchr/testify v1.11.1
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/klauspost/compress v1.18.0 // indirect
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/kr/text v0.2.0 // indirect
|
github.com/kr/text v0.2.0 // indirect
|
||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/prometheus/client_model v0.6.2 // indirect
|
github.com/prometheus/client_model v0.6.2 // indirect
|
||||||
github.com/prometheus/common v0.66.1 // indirect
|
github.com/prometheus/common v0.66.1 // indirect
|
||||||
github.com/prometheus/procfs v0.16.1 // indirect
|
github.com/prometheus/procfs v0.16.1 // indirect
|
||||||
@@ -23,4 +26,5 @@ require (
|
|||||||
golang.org/x/crypto v0.18.0 // indirect
|
golang.org/x/crypto v0.18.0 // indirect
|
||||||
golang.org/x/sys v0.35.0 // indirect
|
golang.org/x/sys v0.35.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.8 // indirect
|
google.golang.org/protobuf v1.36.8 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -210,6 +210,26 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeToEventStored creates a subscription specifically for EventStored events.
|
||||||
|
// This is a convenience method for the common pattern of listening to persisted events
|
||||||
|
// to update version cache or trigger other actions.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// eventStoredCh := natsBus.SubscribeToEventStored("tenant-abc")
|
||||||
|
// go func() {
|
||||||
|
// for event := range eventStoredCh {
|
||||||
|
// actorID := event.Data["actorId"].(string)
|
||||||
|
// version := int64(event.Data["version"].(float64))
|
||||||
|
// store.UpdateVersionCache(actorID, version)
|
||||||
|
// }
|
||||||
|
// }()
|
||||||
|
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
|
||||||
|
return neb.SubscribeWithFilter(namespacePattern, &SubscriptionFilter{
|
||||||
|
EventTypes: []string{EventTypeEventStored},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Stop closes the NATS event bus and all subscriptions
|
// Stop closes the NATS event bus and all subscriptions
|
||||||
func (neb *NATSEventBus) Stop() {
|
func (neb *NATSEventBus) Stop() {
|
||||||
neb.mutex.Lock()
|
neb.mutex.Lock()
|
||||||
|
|||||||
36
store/helpers_test.go
Normal file
36
store/helpers_test.go
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
//go:build integration
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getTestNATSConnection returns a NATS connection for testing.
|
||||||
|
// This helper is used by benchmark tests that require NATS.
|
||||||
|
func getTestNATSConnection(t *testing.T) *nats.Conn {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
|
t.Skipf("NATS not available: %v", err)
|
||||||
|
}
|
||||||
|
return nc
|
||||||
|
}
|
||||||
|
|
||||||
|
// getVersionFromEvent extracts version from EventStored event data.
|
||||||
|
func getVersionFromEvent(data map[string]interface{}) int64 {
|
||||||
|
switch v := data["version"].(type) {
|
||||||
|
case float64:
|
||||||
|
return int64(v)
|
||||||
|
case int64:
|
||||||
|
return v
|
||||||
|
case int:
|
||||||
|
return int64(v)
|
||||||
|
case uint64:
|
||||||
|
return int64(v)
|
||||||
|
default:
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
}
|
||||||
420
store/integration_test.go
Normal file
420
store/integration_test.go
Normal file
@@ -0,0 +1,420 @@
|
|||||||
|
//go:build integration
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// generateStreamName creates a unique stream name for each test run
|
||||||
|
func generateStreamName(baseName string) string {
|
||||||
|
return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupStream deletes a JetStream stream if it exists.
|
||||||
|
func cleanupStream(nc *nats.Conn, streamName string) {
|
||||||
|
js, err := nc.JetStream()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = js.DeleteStream(streamName)
|
||||||
|
// Silently ignore errors - we just want to clean up
|
||||||
|
_ = err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting
|
||||||
|
// on a single node (local loopback).
|
||||||
|
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
streamName := generateStreamName("broadcast_single")
|
||||||
|
cleanupStream(nc, streamName)
|
||||||
|
|
||||||
|
// Create NATS event bus
|
||||||
|
natsBus, err := aether.NewNATSEventBus(nc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer natsBus.Stop()
|
||||||
|
|
||||||
|
// Create event store with broadcaster
|
||||||
|
store, err := NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
streamName,
|
||||||
|
natsBus,
|
||||||
|
"tenant-single",
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Subscribe to events
|
||||||
|
eventCh := natsBus.Subscribe("tenant-single")
|
||||||
|
|
||||||
|
// Save event
|
||||||
|
testEvent := &aether.Event{
|
||||||
|
ID: "event-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"test": "single"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.SaveEvent(testEvent)
|
||||||
|
require.NoError(t, err)
|
||||||
|
log.Printf("Saved event: %s", testEvent.ID)
|
||||||
|
|
||||||
|
// Receive event via NATS (NATSBroadcast may have wrapped in EventStored)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case received := <-eventCh:
|
||||||
|
// The received event should have the same actor and version
|
||||||
|
assert.Equal(t, "actor-1", received.ActorID)
|
||||||
|
assert.Equal(t, int64(1), received.Version)
|
||||||
|
// Event type might be original or EventStored wrapper
|
||||||
|
if received.EventType == aether.EventTypeEventStored {
|
||||||
|
assert.Equal(t, "actor-1", received.Data["actorId"].(string))
|
||||||
|
log.Printf("Received EventStored wrapper: %s", received.ID)
|
||||||
|
} else {
|
||||||
|
log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("Did not receive event via NATS")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes.
|
||||||
|
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
streamName := generateStreamName("broadcast_multi")
|
||||||
|
cleanupStream(nc, streamName)
|
||||||
|
|
||||||
|
// Create Node A
|
||||||
|
nodeANatsBus, err := aether.NewNATSEventBus(nc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nodeANatsBus.Stop()
|
||||||
|
|
||||||
|
nodeAStore, err := NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
streamName,
|
||||||
|
nodeANatsBus,
|
||||||
|
"tenant-multi",
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create Node B
|
||||||
|
nodeBNatsBus, err := aether.NewNATSEventBus(nc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nodeBNatsBus.Stop()
|
||||||
|
|
||||||
|
nodeBStore, err := NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
streamName,
|
||||||
|
nodeBNatsBus,
|
||||||
|
"tenant-multi",
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Node A subscribes
|
||||||
|
nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi")
|
||||||
|
|
||||||
|
// Node B subscribes
|
||||||
|
nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi")
|
||||||
|
|
||||||
|
// Node A saves event
|
||||||
|
eventA := &aether.Event{
|
||||||
|
ID: "event-node-a",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "multi-actor",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"node": "a"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = nodeAStore.SaveEvent(eventA)
|
||||||
|
require.NoError(t, err)
|
||||||
|
log.Printf("Node A saved: %s", eventA.ID)
|
||||||
|
|
||||||
|
// Node B receives event (EventStored wrapper or original)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case received := <-nodeBEventCh:
|
||||||
|
// Check actor and version match
|
||||||
|
assert.Equal(t, "multi-actor", received.ActorID)
|
||||||
|
assert.Equal(t, int64(1), received.Version)
|
||||||
|
if received.EventType == aether.EventTypeEventStored {
|
||||||
|
// EventStored wrapper
|
||||||
|
assert.Equal(t, "multi-actor", received.Data["actorId"].(string))
|
||||||
|
} else {
|
||||||
|
// Original event
|
||||||
|
assert.Equal(t, "a", received.Data["node"])
|
||||||
|
}
|
||||||
|
log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version)
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("Node B did not receive event")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Give Node B time to receive and process Node A's event
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Node B saves event with different actor
|
||||||
|
eventB := &aether.Event{
|
||||||
|
ID: "event-node-b",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "multi-actor-b", // Different actor
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"node": "b"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = nodeBStore.SaveEvent(eventB)
|
||||||
|
require.NoError(t, err)
|
||||||
|
log.Printf("Node B saved: %s", eventB.ID)
|
||||||
|
|
||||||
|
// Node A receives Node B's event (could be EventStored or original)
|
||||||
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel2()
|
||||||
|
|
||||||
|
// Wait for either EventStored or the actual event
|
||||||
|
received := false
|
||||||
|
for !received {
|
||||||
|
select {
|
||||||
|
case event := <-nodeAEventCh:
|
||||||
|
// Check for Node B's event
|
||||||
|
if event.EventType == aether.EventTypeEventStored {
|
||||||
|
// EventStored wrapper - check actorId
|
||||||
|
actorID := event.Data["actorId"].(string)
|
||||||
|
if actorID == "multi-actor-b" {
|
||||||
|
received = true
|
||||||
|
log.Printf("Node A received EventStored for Node B's event: %s", event.ID)
|
||||||
|
}
|
||||||
|
} else if event.ActorID == "multi-actor-b" {
|
||||||
|
received = true
|
||||||
|
log.Printf("Node A received Node B's event: %s", event.ID)
|
||||||
|
}
|
||||||
|
// Keep listening until we get Node B's event
|
||||||
|
case <-ctx2.Done():
|
||||||
|
if !received {
|
||||||
|
t.Fatal("Node A did not receive Node B's event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUpdateVersionCache tests the version cache update logic.
|
||||||
|
func TestUpdateVersionCache(t *testing.T) {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
streamName := generateStreamName("cache_test")
|
||||||
|
cleanupStream(nc, streamName)
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, streamName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Save event version 1
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "event-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// EventStored will trigger UpdateVersionCache
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Save event version 2 - should succeed (version > 1)
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: "event-2",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 2,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Save event version 3 - should succeed (version > 2)
|
||||||
|
event3 := &aether.Event{
|
||||||
|
ID: "event-3",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 3,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify all events can be retrieved
|
||||||
|
events, err := store.GetEvents("actor-1", 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, events, 3)
|
||||||
|
|
||||||
|
// Verify latest version
|
||||||
|
latest, err := store.GetLatestVersion("actor-1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, int64(3), latest)
|
||||||
|
|
||||||
|
// Manually update cache with version 5 (simulating external update)
|
||||||
|
store.UpdateVersionCache("actor-1", 5)
|
||||||
|
|
||||||
|
// Verify version 4 would conflict (4 < cached 5)
|
||||||
|
event4 := &aether.Event{
|
||||||
|
ID: "event-4",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 4,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event4)
|
||||||
|
assert.Error(t, err, "version 4 should conflict with cached version 5")
|
||||||
|
|
||||||
|
// Version 6 should succeed (6 > 5)
|
||||||
|
event6 := &aether.Event{
|
||||||
|
ID: "event-6",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 6,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event6)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify version 6 was saved
|
||||||
|
latest, err = store.GetLatestVersion("actor-1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, int64(6), latest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSubscribeToEventStored tests the convenience helper.
|
||||||
|
func TestSubscribeToEventStored(t *testing.T) {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
natsBus, err := aether.NewNATSEventBus(nc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer natsBus.Stop()
|
||||||
|
|
||||||
|
// Use helper
|
||||||
|
eventStoredCh := natsBus.SubscribeToEventStored("test-store")
|
||||||
|
|
||||||
|
// Verify channel is created
|
||||||
|
assert.NotNil(t, eventStoredCh)
|
||||||
|
|
||||||
|
// Publish EventStored manually (version will be float64 from JSON)
|
||||||
|
eventStored := &aether.Event{
|
||||||
|
ID: "stored-1",
|
||||||
|
EventType: aether.EventTypeEventStored,
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"actorId": "actor-1",
|
||||||
|
"version": 1.0, // Use float64 to match JSON encoding
|
||||||
|
},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
natsBus.Publish("test-store", eventStored)
|
||||||
|
|
||||||
|
// Should receive the EventStored
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case received := <-eventStoredCh:
|
||||||
|
assert.Equal(t, aether.EventTypeEventStored, received.EventType)
|
||||||
|
assert.Equal(t, "actor-1", received.ActorID)
|
||||||
|
log.Printf("Received EventStored: %s", received.ID)
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("Did not receive EventStored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation.
|
||||||
|
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
streamName := generateStreamName("namespace_isolation")
|
||||||
|
cleanupStream(nc, streamName)
|
||||||
|
|
||||||
|
// Create stores with different namespaces
|
||||||
|
storeA, err := NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
streamName,
|
||||||
|
nil,
|
||||||
|
"tenant-a",
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
storeB, err := NewJetStreamEventStoreWithBroadcaster(
|
||||||
|
nc,
|
||||||
|
streamName,
|
||||||
|
nil,
|
||||||
|
"tenant-b",
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Save to each namespace
|
||||||
|
eventA := &aether.Event{
|
||||||
|
ID: "event-a",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-a",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"tenant": "a"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = storeA.SaveEvent(eventA)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
eventB := &aether.Event{
|
||||||
|
ID: "event-b",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-b",
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{"tenant": "b"},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = storeB.SaveEvent(eventB)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify each store can see its own events
|
||||||
|
eventsA, err := storeA.GetEvents("actor-a", 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, eventsA, 1)
|
||||||
|
assert.Equal(t, "a", eventsA[0].Data["tenant"])
|
||||||
|
|
||||||
|
eventsB, err := storeB.GetEvents("actor-b", 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, eventsB, 1)
|
||||||
|
assert.Equal(t, "b", eventsB[0].Data["tenant"])
|
||||||
|
}
|
||||||
@@ -558,5 +558,18 @@ func sanitizeSubject(s string) string {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateVersionCache updates the version cache for a specific actor.
|
||||||
|
// This is used when receiving events from other nodes via NATS to keep
|
||||||
|
// the version cache consistent across cluster nodes.
|
||||||
|
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
|
||||||
|
jes.mu.Lock()
|
||||||
|
defer jes.mu.Unlock()
|
||||||
|
|
||||||
|
// Only update if the new version is greater than cached version
|
||||||
|
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
|
||||||
|
jes.versions[actorID] = version
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
|
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
|
||||||
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
||||||
|
|||||||
Reference in New Issue
Block a user