Adds support for filtering events by type or actor pattern within namespace subscriptions. Key changes: - Add SubscriptionFilter type with EventTypes and ActorPattern fields - Add SubscribeWithFilter to EventBroadcaster interface - Implement filtering in EventBus with full wildcard pattern support preserved - Implement filtering in NATSEventBus (server-side namespace, client-side filters) - Add MatchActorPattern function for actor ID pattern matching - Add comprehensive unit tests for all filtering scenarios Filter Processing: - EventTypes: Event must match at least one type in the list (OR within types) - ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards) - Multiple filters are combined with AND logic This implementation works alongside the existing wildcard subscription support: - Namespace wildcards (* and >) work with event filters - Filters are applied after namespace pattern matching - Metrics are properly recorded for filtered subscriptions Closes #21 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
823 lines
20 KiB
Go
823 lines
20 KiB
Go
package aether
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestEventBus_ExactSubscription(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
ch := eb.Subscribe("tenant-a")
|
|
|
|
event := &Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-1",
|
|
}
|
|
|
|
eb.Publish("tenant-a", event)
|
|
|
|
select {
|
|
case received := <-ch:
|
|
if received.ID != event.ID {
|
|
t.Errorf("expected event ID %s, got %s", event.ID, received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
}
|
|
|
|
func TestEventBus_WildcardStarSubscription(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe to all single-token namespaces
|
|
ch := eb.Subscribe("*")
|
|
|
|
event := &Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-1",
|
|
}
|
|
|
|
eb.Publish("tenant-a", event)
|
|
|
|
select {
|
|
case received := <-ch:
|
|
if received.ID != event.ID {
|
|
t.Errorf("expected event ID %s, got %s", event.ID, received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
}
|
|
|
|
func TestEventBus_WildcardGreaterSubscription(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe to all namespaces
|
|
ch := eb.Subscribe(">")
|
|
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "Test1", ActorID: "actor-1"},
|
|
{ID: "evt-2", EventType: "Test2", ActorID: "actor-2"},
|
|
{ID: "evt-3", EventType: "Test3", ActorID: "actor-3"},
|
|
}
|
|
|
|
namespaces := []string{"tenant-a", "tenant-b", "prod.tenant.orders"}
|
|
|
|
for i, ns := range namespaces {
|
|
eb.Publish(ns, events[i])
|
|
}
|
|
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
|
|
for i := 0; i < len(events); i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after receiving %d of %d events", i, len(events))
|
|
}
|
|
}
|
|
|
|
for _, evt := range events {
|
|
if !received[evt.ID] {
|
|
t.Errorf("did not receive event %s", evt.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestEventBus_PrefixWildcard(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe to prod.*
|
|
ch := eb.Subscribe("prod.*")
|
|
|
|
event1 := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
|
|
event2 := &Event{ID: "evt-2", EventType: "Test", ActorID: "actor-2"}
|
|
event3 := &Event{ID: "evt-3", EventType: "Test", ActorID: "actor-3"}
|
|
|
|
// Should match
|
|
eb.Publish("prod.tenant", event1)
|
|
eb.Publish("prod.orders", event2)
|
|
// Should not match (different prefix)
|
|
eb.Publish("staging.tenant", event3)
|
|
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
|
|
// Should receive exactly 2 events
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after receiving %d events", len(received))
|
|
}
|
|
}
|
|
|
|
// Verify we got the right ones
|
|
if !received["evt-1"] || !received["evt-2"] {
|
|
t.Errorf("expected evt-1 and evt-2, got %v", received)
|
|
}
|
|
|
|
// Verify no third event arrives
|
|
select {
|
|
case evt := <-ch:
|
|
t.Errorf("unexpected event received: %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected - no more events
|
|
}
|
|
}
|
|
|
|
func TestEventBus_MultipleWildcardSubscribers(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
ch1 := eb.Subscribe("prod.*")
|
|
ch2 := eb.Subscribe("prod.>")
|
|
ch3 := eb.Subscribe(">")
|
|
|
|
event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
|
|
|
|
eb.Publish("prod.tenant.orders", event)
|
|
|
|
// ch1 (prod.*) should NOT receive - doesn't match 3 tokens
|
|
select {
|
|
case <-ch1:
|
|
t.Error("prod.* should not match prod.tenant.orders")
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
|
|
// ch2 (prod.>) should receive
|
|
select {
|
|
case received := <-ch2:
|
|
if received.ID != event.ID {
|
|
t.Errorf("expected %s, got %s", event.ID, received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("prod.> should match prod.tenant.orders")
|
|
}
|
|
|
|
// ch3 (>) should receive
|
|
select {
|
|
case received := <-ch3:
|
|
if received.ID != event.ID {
|
|
t.Errorf("expected %s, got %s", event.ID, received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("> should match prod.tenant.orders")
|
|
}
|
|
}
|
|
|
|
func TestEventBus_ExactAndWildcardCoexist(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
chExact := eb.Subscribe("tenant-a")
|
|
chWildcard := eb.Subscribe("*")
|
|
|
|
event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
|
|
|
|
eb.Publish("tenant-a", event)
|
|
|
|
// Both should receive the event
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
select {
|
|
case received := <-chExact:
|
|
if received.ID != event.ID {
|
|
t.Errorf("exact: expected %s, got %s", event.ID, received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("exact subscriber timed out")
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
select {
|
|
case received := <-chWildcard:
|
|
if received.ID != event.ID {
|
|
t.Errorf("wildcard: expected %s, got %s", event.ID, received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("wildcard subscriber timed out")
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestEventBus_WildcardUnsubscribe(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
ch := eb.Subscribe("prod.*")
|
|
|
|
// Verify it's counted
|
|
if eb.WildcardSubscriberCount() != 1 {
|
|
t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount())
|
|
}
|
|
|
|
eb.Unsubscribe("prod.*", ch)
|
|
|
|
// Verify it's removed
|
|
if eb.WildcardSubscriberCount() != 0 {
|
|
t.Errorf("expected 0 wildcard subscribers, got %d", eb.WildcardSubscriberCount())
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscriberCount(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Add exact subscribers
|
|
ch1 := eb.Subscribe("tenant-a")
|
|
ch2 := eb.Subscribe("tenant-a")
|
|
|
|
if eb.SubscriberCount("tenant-a") != 2 {
|
|
t.Errorf("expected 2 exact subscribers, got %d", eb.SubscriberCount("tenant-a"))
|
|
}
|
|
|
|
// Add wildcard subscriber - should not affect exact count
|
|
eb.Subscribe("*")
|
|
|
|
if eb.SubscriberCount("tenant-a") != 2 {
|
|
t.Errorf("expected 2 exact subscribers after wildcard add, got %d", eb.SubscriberCount("tenant-a"))
|
|
}
|
|
if eb.WildcardSubscriberCount() != 1 {
|
|
t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount())
|
|
}
|
|
|
|
// Unsubscribe exact
|
|
eb.Unsubscribe("tenant-a", ch1)
|
|
if eb.SubscriberCount("tenant-a") != 1 {
|
|
t.Errorf("expected 1 exact subscriber after unsubscribe, got %d", eb.SubscriberCount("tenant-a"))
|
|
}
|
|
|
|
eb.Unsubscribe("tenant-a", ch2)
|
|
if eb.SubscriberCount("tenant-a") != 0 {
|
|
t.Errorf("expected 0 exact subscribers after unsubscribe, got %d", eb.SubscriberCount("tenant-a"))
|
|
}
|
|
}
|
|
|
|
func TestEventBus_StopClosesAllChannels(t *testing.T) {
|
|
eb := NewEventBus()
|
|
|
|
chExact := eb.Subscribe("tenant-a")
|
|
chWildcard := eb.Subscribe("*")
|
|
|
|
eb.Stop()
|
|
|
|
// Both channels should be closed
|
|
select {
|
|
case _, ok := <-chExact:
|
|
if ok {
|
|
t.Error("expected exact channel to be closed")
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("timed out waiting for exact channel close")
|
|
}
|
|
|
|
select {
|
|
case _, ok := <-chWildcard:
|
|
if ok {
|
|
t.Error("expected wildcard channel to be closed")
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("timed out waiting for wildcard channel close")
|
|
}
|
|
}
|
|
|
|
func TestEventBus_NamespaceIsolation(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
chA := eb.Subscribe("tenant-a")
|
|
chB := eb.Subscribe("tenant-b")
|
|
|
|
eventA := &Event{ID: "evt-a", EventType: "Test", ActorID: "actor-1"}
|
|
eventB := &Event{ID: "evt-b", EventType: "Test", ActorID: "actor-2"}
|
|
|
|
eb.Publish("tenant-a", eventA)
|
|
eb.Publish("tenant-b", eventB)
|
|
|
|
// Verify tenant-a receives only its event
|
|
select {
|
|
case received := <-chA:
|
|
if received.ID != "evt-a" {
|
|
t.Errorf("tenant-a received wrong event: %s", received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("tenant-a timed out")
|
|
}
|
|
|
|
select {
|
|
case <-chA:
|
|
t.Error("tenant-a received extra event")
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
|
|
// Verify tenant-b receives only its event
|
|
select {
|
|
case received := <-chB:
|
|
if received.ID != "evt-b" {
|
|
t.Errorf("tenant-b received wrong event: %s", received.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("tenant-b timed out")
|
|
}
|
|
|
|
select {
|
|
case <-chB:
|
|
t.Error("tenant-b received extra event")
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|
|
|
|
func TestEventBus_NonBlockingPublish(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Create subscriber but don't read from channel
|
|
_ = eb.Subscribe("tenant-a")
|
|
|
|
// Fill the channel buffer (100 events)
|
|
for i := 0; i < 150; i++ {
|
|
event := &Event{
|
|
ID: "evt",
|
|
EventType: "Test",
|
|
ActorID: "actor-1",
|
|
}
|
|
// Should not block even when channel is full
|
|
eb.Publish("tenant-a", event)
|
|
}
|
|
|
|
// If we got here without blocking, test passes
|
|
}
|
|
|
|
func TestEventBus_ConcurrentOperations(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
// Concurrent subscriptions
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func(n int) {
|
|
defer wg.Done()
|
|
ch := eb.Subscribe("tenant-a")
|
|
time.Sleep(10 * time.Millisecond)
|
|
eb.Unsubscribe("tenant-a", ch)
|
|
}(i)
|
|
}
|
|
|
|
// Concurrent wildcard subscriptions
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func(n int) {
|
|
defer wg.Done()
|
|
ch := eb.Subscribe("*")
|
|
time.Sleep(10 * time.Millisecond)
|
|
eb.Unsubscribe("*", ch)
|
|
}(i)
|
|
}
|
|
|
|
// Concurrent publishes
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func(n int) {
|
|
defer wg.Done()
|
|
event := &Event{
|
|
ID: "evt",
|
|
EventType: "Test",
|
|
ActorID: "actor-1",
|
|
}
|
|
eb.Publish("tenant-a", event)
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// Tests for SubscribeWithFilter functionality
|
|
|
|
func TestEventBus_SubscribeWithFilter_EventTypes(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe with filter for specific event types
|
|
filter := &SubscriptionFilter{
|
|
EventTypes: []string{"OrderPlaced", "OrderShipped"},
|
|
}
|
|
ch := eb.SubscribeWithFilter("orders", filter)
|
|
|
|
// Publish events of different types
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
|
|
{ID: "evt-2", EventType: "OrderCancelled", ActorID: "order-2"}, // Should not be received
|
|
{ID: "evt-3", EventType: "OrderShipped", ActorID: "order-3"},
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("orders", e)
|
|
}
|
|
|
|
// Should receive evt-1 and evt-3, but not evt-2
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after receiving %d events", len(received))
|
|
}
|
|
}
|
|
|
|
if !received["evt-1"] || !received["evt-3"] {
|
|
t.Errorf("expected to receive evt-1 and evt-3, got %v", received)
|
|
}
|
|
|
|
// Verify evt-2 was not received
|
|
select {
|
|
case evt := <-ch:
|
|
t.Errorf("unexpected event received: %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_ActorPattern(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe with filter for specific actor pattern
|
|
filter := &SubscriptionFilter{
|
|
ActorPattern: "order-*",
|
|
}
|
|
ch := eb.SubscribeWithFilter("events", filter)
|
|
|
|
// Publish events from different actors
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "Test", ActorID: "order-123"},
|
|
{ID: "evt-2", EventType: "Test", ActorID: "user-456"}, // Should not be received
|
|
{ID: "evt-3", EventType: "Test", ActorID: "order-789"},
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("events", e)
|
|
}
|
|
|
|
// Should receive evt-1 and evt-3, but not evt-2
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after receiving %d events", len(received))
|
|
}
|
|
}
|
|
|
|
if !received["evt-1"] || !received["evt-3"] {
|
|
t.Errorf("expected to receive evt-1 and evt-3, got %v", received)
|
|
}
|
|
|
|
// Verify evt-2 was not received
|
|
select {
|
|
case evt := <-ch:
|
|
t.Errorf("unexpected event received: %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_Combined(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe with filter for both event type AND actor pattern
|
|
filter := &SubscriptionFilter{
|
|
EventTypes: []string{"OrderPlaced"},
|
|
ActorPattern: "order-*",
|
|
}
|
|
ch := eb.SubscribeWithFilter("orders", filter)
|
|
|
|
// Publish events with various combinations
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-123"}, // Should be received
|
|
{ID: "evt-2", EventType: "OrderPlaced", ActorID: "user-456"}, // Wrong actor
|
|
{ID: "evt-3", EventType: "OrderCancelled", ActorID: "order-789"}, // Wrong type
|
|
{ID: "evt-4", EventType: "OrderCancelled", ActorID: "user-000"}, // Wrong both
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("orders", e)
|
|
}
|
|
|
|
// Should only receive evt-1
|
|
select {
|
|
case evt := <-ch:
|
|
if evt.ID != "evt-1" {
|
|
t.Errorf("expected evt-1, got %s", evt.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
|
|
// Verify no more events arrive
|
|
select {
|
|
case evt := <-ch:
|
|
t.Errorf("unexpected event received: %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_NilFilter(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe with nil filter - should receive all events
|
|
ch := eb.SubscribeWithFilter("events", nil)
|
|
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"},
|
|
{ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"},
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("events", e)
|
|
}
|
|
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after receiving %d events", len(received))
|
|
}
|
|
}
|
|
|
|
if !received["evt-1"] || !received["evt-2"] {
|
|
t.Errorf("expected all events, got %v", received)
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_EmptyFilter(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe with empty filter - should receive all events
|
|
ch := eb.SubscribeWithFilter("events", &SubscriptionFilter{})
|
|
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"},
|
|
{ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"},
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("events", e)
|
|
}
|
|
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after receiving %d events", len(received))
|
|
}
|
|
}
|
|
|
|
if !received["evt-1"] || !received["evt-2"] {
|
|
t.Errorf("expected all events, got %v", received)
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_WildcardNamespaceAndFilter(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Subscribe to wildcard namespace pattern with event type filter
|
|
filter := &SubscriptionFilter{
|
|
EventTypes: []string{"OrderPlaced"},
|
|
}
|
|
ch := eb.SubscribeWithFilter("prod.*", filter)
|
|
|
|
// Publish events to different namespaces
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, // prod.orders - should match
|
|
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"}, // prod.orders - wrong type
|
|
{ID: "evt-3", EventType: "OrderPlaced", ActorID: "order-3"}, // staging.orders - wrong namespace
|
|
}
|
|
|
|
eb.Publish("prod.orders", events[0])
|
|
eb.Publish("prod.orders", events[1])
|
|
eb.Publish("staging.orders", events[2])
|
|
|
|
// Should only receive evt-1
|
|
select {
|
|
case evt := <-ch:
|
|
if evt.ID != "evt-1" {
|
|
t.Errorf("expected evt-1, got %s", evt.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
|
|
// Verify no more events arrive
|
|
select {
|
|
case evt := <-ch:
|
|
t.Errorf("unexpected event received: %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_MultipleSubscribersWithDifferentFilters(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Two subscribers with different filters on same namespace
|
|
filter1 := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
|
|
filter2 := &SubscriptionFilter{EventTypes: []string{"OrderShipped"}}
|
|
|
|
ch1 := eb.SubscribeWithFilter("orders", filter1)
|
|
ch2 := eb.SubscribeWithFilter("orders", filter2)
|
|
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
|
|
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"},
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("orders", e)
|
|
}
|
|
|
|
// ch1 should only receive evt-1
|
|
select {
|
|
case evt := <-ch1:
|
|
if evt.ID != "evt-1" {
|
|
t.Errorf("ch1: expected evt-1, got %s", evt.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("ch1 timed out")
|
|
}
|
|
|
|
// ch2 should only receive evt-2
|
|
select {
|
|
case evt := <-ch2:
|
|
if evt.ID != "evt-2" {
|
|
t.Errorf("ch2: expected evt-2, got %s", evt.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("ch2 timed out")
|
|
}
|
|
|
|
// Verify no extra events
|
|
select {
|
|
case evt := <-ch1:
|
|
t.Errorf("ch1: unexpected event %s", evt.ID)
|
|
case evt := <-ch2:
|
|
t.Errorf("ch2: unexpected event %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_UnsubscribeFiltered(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
|
|
ch := eb.SubscribeWithFilter("orders", filter)
|
|
|
|
// Verify subscription count
|
|
if eb.SubscriberCount("orders") != 1 {
|
|
t.Errorf("expected 1 subscriber, got %d", eb.SubscriberCount("orders"))
|
|
}
|
|
|
|
eb.Unsubscribe("orders", ch)
|
|
|
|
// Verify unsubscribed
|
|
if eb.SubscriberCount("orders") != 0 {
|
|
t.Errorf("expected 0 subscribers, got %d", eb.SubscriberCount("orders"))
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_FilteredAndUnfilteredCoexist(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// One subscriber with filter, one without
|
|
filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
|
|
chFiltered := eb.SubscribeWithFilter("orders", filter)
|
|
chUnfiltered := eb.Subscribe("orders")
|
|
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
|
|
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"},
|
|
}
|
|
|
|
for _, e := range events {
|
|
eb.Publish("orders", e)
|
|
}
|
|
|
|
// Filtered subscriber should only receive evt-1
|
|
select {
|
|
case evt := <-chFiltered:
|
|
if evt.ID != "evt-1" {
|
|
t.Errorf("filtered: expected evt-1, got %s", evt.ID)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("filtered subscriber timed out")
|
|
}
|
|
|
|
// Unfiltered subscriber should receive both
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-chUnfiltered:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("unfiltered timed out after %d events", len(received))
|
|
}
|
|
}
|
|
|
|
if !received["evt-1"] || !received["evt-2"] {
|
|
t.Errorf("unfiltered expected both events, got %v", received)
|
|
}
|
|
}
|
|
|
|
func TestEventBus_SubscribeWithFilter_WildcardGreaterWithFilter(t *testing.T) {
|
|
eb := NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Use > wildcard (matches one or more tokens) with filter
|
|
filter := &SubscriptionFilter{
|
|
ActorPattern: "order-*",
|
|
}
|
|
ch := eb.SubscribeWithFilter(">", filter)
|
|
|
|
events := []*Event{
|
|
{ID: "evt-1", EventType: "Test", ActorID: "order-123"},
|
|
{ID: "evt-2", EventType: "Test", ActorID: "user-456"},
|
|
{ID: "evt-3", EventType: "Test", ActorID: "order-789"},
|
|
}
|
|
|
|
// Publish to different namespaces
|
|
eb.Publish("tenant-a", events[0])
|
|
eb.Publish("tenant-b", events[1])
|
|
eb.Publish("prod.orders", events[2])
|
|
|
|
// Should receive evt-1 and evt-3, but not evt-2
|
|
received := make(map[string]bool)
|
|
timeout := time.After(100 * time.Millisecond)
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case evt := <-ch:
|
|
received[evt.ID] = true
|
|
case <-timeout:
|
|
t.Fatalf("timed out after %d events", len(received))
|
|
}
|
|
}
|
|
|
|
if !received["evt-1"] || !received["evt-3"] {
|
|
t.Errorf("expected evt-1 and evt-3, got %v", received)
|
|
}
|
|
|
|
// Verify no evt-2
|
|
select {
|
|
case evt := <-ch:
|
|
t.Errorf("unexpected event: %s", evt.ID)
|
|
case <-time.After(50 * time.Millisecond):
|
|
// Expected
|
|
}
|
|
}
|