[Issue #20] Add wildcard namespace subscriptions #52
129
eventbus.go
129
eventbus.go
@@ -5,18 +5,43 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EventBroadcaster defines the interface for publishing and subscribing to events
|
// EventBroadcaster defines the interface for publishing and subscribing to events.
|
||||||
|
//
|
||||||
|
// Subscribe accepts namespace patterns following NATS subject matching conventions:
|
||||||
|
// - Exact match: "tenant-a" matches only "tenant-a"
|
||||||
|
// - Single wildcard: "*" matches any single token, "tenant-*" matches "tenant-a", "tenant-b"
|
||||||
|
// - Multi-token wildcard: ">" matches one or more tokens (only at end of pattern)
|
||||||
|
//
|
||||||
|
// Security Warning: Wildcard subscriptions bypass namespace isolation.
|
||||||
|
// Only grant wildcard access to trusted system components.
|
||||||
type EventBroadcaster interface {
|
type EventBroadcaster interface {
|
||||||
Subscribe(namespaceID string) <-chan *Event
|
// Subscribe creates a channel that receives events matching the namespace pattern.
|
||||||
Unsubscribe(namespaceID string, ch <-chan *Event)
|
// Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple.
|
||||||
|
Subscribe(namespacePattern string) <-chan *Event
|
||||||
|
Unsubscribe(namespacePattern string, ch <-chan *Event)
|
||||||
Publish(namespaceID string, event *Event)
|
Publish(namespaceID string, event *Event)
|
||||||
Stop()
|
Stop()
|
||||||
SubscriberCount(namespaceID string) int
|
SubscriberCount(namespaceID string) int
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventBus broadcasts events to multiple subscribers within a namespace
|
// subscription represents a single subscriber channel with its pattern
|
||||||
|
type subscription struct {
|
||||||
|
pattern string
|
||||||
|
ch chan *Event
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventBus broadcasts events to multiple subscribers within a namespace.
|
||||||
|
// Supports wildcard patterns for cross-namespace subscriptions.
|
||||||
|
//
|
||||||
|
// Security Considerations:
|
||||||
|
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
|
||||||
|
// This is intentional for cross-cutting concerns like logging, monitoring, and auditing.
|
||||||
|
// However, it bypasses namespace isolation - use with appropriate access controls.
|
||||||
type EventBus struct {
|
type EventBus struct {
|
||||||
subscribers map[string][]chan *Event // namespaceID -> channels
|
// exactSubscribers holds subscribers for exact namespace matches (no wildcards)
|
||||||
|
exactSubscribers map[string][]chan *Event
|
||||||
|
// wildcardSubscribers holds subscribers with wildcard patterns
|
||||||
|
wildcardSubscribers []subscription
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@@ -26,51 +51,85 @@ type EventBus struct {
|
|||||||
func NewEventBus() *EventBus {
|
func NewEventBus() *EventBus {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &EventBus{
|
return &EventBus{
|
||||||
subscribers: make(map[string][]chan *Event),
|
exactSubscribers: make(map[string][]chan *Event),
|
||||||
|
wildcardSubscribers: make([]subscription, 0),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe creates a new subscription channel for a namespace
|
// Subscribe creates a new subscription channel for a namespace pattern.
|
||||||
func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event {
|
// Patterns follow NATS subject matching conventions:
|
||||||
|
// - "*" matches a single token (any sequence without ".")
|
||||||
|
// - ">" matches one or more tokens (only valid at the end)
|
||||||
|
// - Exact strings match exactly
|
||||||
|
//
|
||||||
|
// Security Warning: Wildcard patterns receive events from all matching namespaces,
|
||||||
|
// bypassing namespace isolation. Only use for trusted system components.
|
||||||
|
func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event {
|
||||||
eb.mutex.Lock()
|
eb.mutex.Lock()
|
||||||
defer eb.mutex.Unlock()
|
defer eb.mutex.Unlock()
|
||||||
|
|
||||||
// Create buffered channel to prevent blocking publishers
|
// Create buffered channel to prevent blocking publishers
|
||||||
ch := make(chan *Event, 100)
|
ch := make(chan *Event, 100)
|
||||||
eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch)
|
|
||||||
|
if IsWildcardPattern(namespacePattern) {
|
||||||
|
// Store wildcard subscription separately
|
||||||
|
eb.wildcardSubscribers = append(eb.wildcardSubscribers, subscription{
|
||||||
|
pattern: namespacePattern,
|
||||||
|
ch: ch,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// Exact match subscription
|
||||||
|
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch)
|
||||||
|
}
|
||||||
|
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe removes a subscription channel
|
// Unsubscribe removes a subscription channel
|
||||||
func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) {
|
func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
||||||
eb.mutex.Lock()
|
eb.mutex.Lock()
|
||||||
defer eb.mutex.Unlock()
|
defer eb.mutex.Unlock()
|
||||||
|
|
||||||
subs := eb.subscribers[namespaceID]
|
if IsWildcardPattern(namespacePattern) {
|
||||||
|
// Remove from wildcard subscribers
|
||||||
|
for i, sub := range eb.wildcardSubscribers {
|
||||||
|
if sub.ch == ch {
|
||||||
|
eb.wildcardSubscribers = append(eb.wildcardSubscribers[:i], eb.wildcardSubscribers[i+1:]...)
|
||||||
|
close(sub.ch)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Remove from exact subscribers
|
||||||
|
subs := eb.exactSubscribers[namespacePattern]
|
||||||
for i, subscriber := range subs {
|
for i, subscriber := range subs {
|
||||||
if subscriber == ch {
|
if subscriber == ch {
|
||||||
// Remove channel from slice
|
// Remove channel from slice
|
||||||
eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...)
|
eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...)
|
||||||
close(subscriber)
|
close(subscriber)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up empty namespace entries
|
// Clean up empty namespace entries
|
||||||
if len(eb.subscribers[namespaceID]) == 0 {
|
if len(eb.exactSubscribers[namespacePattern]) == 0 {
|
||||||
delete(eb.subscribers, namespaceID)
|
delete(eb.exactSubscribers, namespacePattern)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish sends an event to all subscribers of a namespace
|
// Publish sends an event to all subscribers of a namespace.
|
||||||
|
// Events are delivered to:
|
||||||
|
// - All exact subscribers for the namespace
|
||||||
|
// - All wildcard subscribers whose pattern matches the namespace
|
||||||
func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
||||||
eb.mutex.RLock()
|
eb.mutex.RLock()
|
||||||
defer eb.mutex.RUnlock()
|
defer eb.mutex.RUnlock()
|
||||||
|
|
||||||
subscribers := eb.subscribers[namespaceID]
|
// Deliver to exact subscribers
|
||||||
|
subscribers := eb.exactSubscribers[namespaceID]
|
||||||
for _, ch := range subscribers {
|
for _, ch := range subscribers {
|
||||||
select {
|
select {
|
||||||
case ch <- event:
|
case ch <- event:
|
||||||
@@ -79,6 +138,18 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
|||||||
// Channel full, skip this subscriber (non-blocking)
|
// Channel full, skip this subscriber (non-blocking)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deliver to matching wildcard subscribers
|
||||||
|
for _, sub := range eb.wildcardSubscribers {
|
||||||
|
if MatchNamespacePattern(sub.pattern, namespaceID) {
|
||||||
|
select {
|
||||||
|
case sub.ch <- event:
|
||||||
|
// Event delivered
|
||||||
|
default:
|
||||||
|
// Channel full, skip this subscriber (non-blocking)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop closes the event bus
|
// Stop closes the event bus
|
||||||
@@ -88,19 +159,35 @@ func (eb *EventBus) Stop() {
|
|||||||
|
|
||||||
eb.cancel()
|
eb.cancel()
|
||||||
|
|
||||||
// Close all subscriber channels
|
// Close all exact subscriber channels
|
||||||
for _, subs := range eb.subscribers {
|
for _, subs := range eb.exactSubscribers {
|
||||||
for _, ch := range subs {
|
for _, ch := range subs {
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
eb.subscribers = make(map[string][]chan *Event)
|
// Close all wildcard subscriber channels
|
||||||
|
for _, sub := range eb.wildcardSubscribers {
|
||||||
|
close(sub.ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
eb.exactSubscribers = make(map[string][]chan *Event)
|
||||||
|
eb.wildcardSubscribers = make([]subscription, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscriberCount returns the number of subscribers for a namespace
|
// SubscriberCount returns the number of subscribers for a namespace.
|
||||||
|
// This counts only exact match subscribers, not wildcard subscribers that may match.
|
||||||
func (eb *EventBus) SubscriberCount(namespaceID string) int {
|
func (eb *EventBus) SubscriberCount(namespaceID string) int {
|
||||||
eb.mutex.RLock()
|
eb.mutex.RLock()
|
||||||
defer eb.mutex.RUnlock()
|
defer eb.mutex.RUnlock()
|
||||||
return len(eb.subscribers[namespaceID])
|
return len(eb.exactSubscribers[namespaceID])
|
||||||
|
}
|
||||||
|
|
||||||
|
// WildcardSubscriberCount returns the number of wildcard subscribers.
|
||||||
|
// These are subscribers using "*" or ">" patterns that may receive events
|
||||||
|
// from multiple namespaces.
|
||||||
|
func (eb *EventBus) WildcardSubscriberCount() int {
|
||||||
|
eb.mutex.RLock()
|
||||||
|
defer eb.mutex.RUnlock()
|
||||||
|
return len(eb.wildcardSubscribers)
|
||||||
}
|
}
|
||||||
|
|||||||
416
eventbus_test.go
Normal file
416
eventbus_test.go
Normal file
@@ -0,0 +1,416 @@
|
|||||||
|
package aether
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEventBus_ExactSubscription(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
ch := eb.Subscribe("tenant-a")
|
||||||
|
|
||||||
|
event := &Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
}
|
||||||
|
|
||||||
|
eb.Publish("tenant-a", event)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case received := <-ch:
|
||||||
|
if received.ID != event.ID {
|
||||||
|
t.Errorf("expected event ID %s, got %s", event.ID, received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timed out waiting for event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_WildcardStarSubscription(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
// Subscribe to all single-token namespaces
|
||||||
|
ch := eb.Subscribe("*")
|
||||||
|
|
||||||
|
event := &Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
}
|
||||||
|
|
||||||
|
eb.Publish("tenant-a", event)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case received := <-ch:
|
||||||
|
if received.ID != event.ID {
|
||||||
|
t.Errorf("expected event ID %s, got %s", event.ID, received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timed out waiting for event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_WildcardGreaterSubscription(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
// Subscribe to all namespaces
|
||||||
|
ch := eb.Subscribe(">")
|
||||||
|
|
||||||
|
events := []*Event{
|
||||||
|
{ID: "evt-1", EventType: "Test1", ActorID: "actor-1"},
|
||||||
|
{ID: "evt-2", EventType: "Test2", ActorID: "actor-2"},
|
||||||
|
{ID: "evt-3", EventType: "Test3", ActorID: "actor-3"},
|
||||||
|
}
|
||||||
|
|
||||||
|
namespaces := []string{"tenant-a", "tenant-b", "prod.tenant.orders"}
|
||||||
|
|
||||||
|
for i, ns := range namespaces {
|
||||||
|
eb.Publish(ns, events[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
received := make(map[string]bool)
|
||||||
|
timeout := time.After(100 * time.Millisecond)
|
||||||
|
|
||||||
|
for i := 0; i < len(events); i++ {
|
||||||
|
select {
|
||||||
|
case evt := <-ch:
|
||||||
|
received[evt.ID] = true
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatalf("timed out after receiving %d of %d events", i, len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, evt := range events {
|
||||||
|
if !received[evt.ID] {
|
||||||
|
t.Errorf("did not receive event %s", evt.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_PrefixWildcard(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
// Subscribe to prod.*
|
||||||
|
ch := eb.Subscribe("prod.*")
|
||||||
|
|
||||||
|
event1 := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
|
||||||
|
event2 := &Event{ID: "evt-2", EventType: "Test", ActorID: "actor-2"}
|
||||||
|
event3 := &Event{ID: "evt-3", EventType: "Test", ActorID: "actor-3"}
|
||||||
|
|
||||||
|
// Should match
|
||||||
|
eb.Publish("prod.tenant", event1)
|
||||||
|
eb.Publish("prod.orders", event2)
|
||||||
|
// Should not match (different prefix)
|
||||||
|
eb.Publish("staging.tenant", event3)
|
||||||
|
|
||||||
|
received := make(map[string]bool)
|
||||||
|
timeout := time.After(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Should receive exactly 2 events
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case evt := <-ch:
|
||||||
|
received[evt.ID] = true
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatalf("timed out after receiving %d events", len(received))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we got the right ones
|
||||||
|
if !received["evt-1"] || !received["evt-2"] {
|
||||||
|
t.Errorf("expected evt-1 and evt-2, got %v", received)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify no third event arrives
|
||||||
|
select {
|
||||||
|
case evt := <-ch:
|
||||||
|
t.Errorf("unexpected event received: %s", evt.ID)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected - no more events
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_MultipleWildcardSubscribers(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
ch1 := eb.Subscribe("prod.*")
|
||||||
|
ch2 := eb.Subscribe("prod.>")
|
||||||
|
ch3 := eb.Subscribe(">")
|
||||||
|
|
||||||
|
event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
|
||||||
|
|
||||||
|
eb.Publish("prod.tenant.orders", event)
|
||||||
|
|
||||||
|
// ch1 (prod.*) should NOT receive - doesn't match 3 tokens
|
||||||
|
select {
|
||||||
|
case <-ch1:
|
||||||
|
t.Error("prod.* should not match prod.tenant.orders")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// ch2 (prod.>) should receive
|
||||||
|
select {
|
||||||
|
case received := <-ch2:
|
||||||
|
if received.ID != event.ID {
|
||||||
|
t.Errorf("expected %s, got %s", event.ID, received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("prod.> should match prod.tenant.orders")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ch3 (>) should receive
|
||||||
|
select {
|
||||||
|
case received := <-ch3:
|
||||||
|
if received.ID != event.ID {
|
||||||
|
t.Errorf("expected %s, got %s", event.ID, received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("> should match prod.tenant.orders")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_ExactAndWildcardCoexist(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
chExact := eb.Subscribe("tenant-a")
|
||||||
|
chWildcard := eb.Subscribe("*")
|
||||||
|
|
||||||
|
event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
|
||||||
|
|
||||||
|
eb.Publish("tenant-a", event)
|
||||||
|
|
||||||
|
// Both should receive the event
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
select {
|
||||||
|
case received := <-chExact:
|
||||||
|
if received.ID != event.ID {
|
||||||
|
t.Errorf("exact: expected %s, got %s", event.ID, received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("exact subscriber timed out")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
select {
|
||||||
|
case received := <-chWildcard:
|
||||||
|
if received.ID != event.ID {
|
||||||
|
t.Errorf("wildcard: expected %s, got %s", event.ID, received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("wildcard subscriber timed out")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_WildcardUnsubscribe(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
ch := eb.Subscribe("prod.*")
|
||||||
|
|
||||||
|
// Verify it's counted
|
||||||
|
if eb.WildcardSubscriberCount() != 1 {
|
||||||
|
t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
eb.Unsubscribe("prod.*", ch)
|
||||||
|
|
||||||
|
// Verify it's removed
|
||||||
|
if eb.WildcardSubscriberCount() != 0 {
|
||||||
|
t.Errorf("expected 0 wildcard subscribers, got %d", eb.WildcardSubscriberCount())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_SubscriberCount(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
// Add exact subscribers
|
||||||
|
ch1 := eb.Subscribe("tenant-a")
|
||||||
|
ch2 := eb.Subscribe("tenant-a")
|
||||||
|
|
||||||
|
if eb.SubscriberCount("tenant-a") != 2 {
|
||||||
|
t.Errorf("expected 2 exact subscribers, got %d", eb.SubscriberCount("tenant-a"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add wildcard subscriber - should not affect exact count
|
||||||
|
eb.Subscribe("*")
|
||||||
|
|
||||||
|
if eb.SubscriberCount("tenant-a") != 2 {
|
||||||
|
t.Errorf("expected 2 exact subscribers after wildcard add, got %d", eb.SubscriberCount("tenant-a"))
|
||||||
|
}
|
||||||
|
if eb.WildcardSubscriberCount() != 1 {
|
||||||
|
t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe exact
|
||||||
|
eb.Unsubscribe("tenant-a", ch1)
|
||||||
|
if eb.SubscriberCount("tenant-a") != 1 {
|
||||||
|
t.Errorf("expected 1 exact subscriber after unsubscribe, got %d", eb.SubscriberCount("tenant-a"))
|
||||||
|
}
|
||||||
|
|
||||||
|
eb.Unsubscribe("tenant-a", ch2)
|
||||||
|
if eb.SubscriberCount("tenant-a") != 0 {
|
||||||
|
t.Errorf("expected 0 exact subscribers after unsubscribe, got %d", eb.SubscriberCount("tenant-a"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_StopClosesAllChannels(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
|
||||||
|
chExact := eb.Subscribe("tenant-a")
|
||||||
|
chWildcard := eb.Subscribe("*")
|
||||||
|
|
||||||
|
eb.Stop()
|
||||||
|
|
||||||
|
// Both channels should be closed
|
||||||
|
select {
|
||||||
|
case _, ok := <-chExact:
|
||||||
|
if ok {
|
||||||
|
t.Error("expected exact channel to be closed")
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("timed out waiting for exact channel close")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, ok := <-chWildcard:
|
||||||
|
if ok {
|
||||||
|
t.Error("expected wildcard channel to be closed")
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("timed out waiting for wildcard channel close")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_NamespaceIsolation(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
chA := eb.Subscribe("tenant-a")
|
||||||
|
chB := eb.Subscribe("tenant-b")
|
||||||
|
|
||||||
|
eventA := &Event{ID: "evt-a", EventType: "Test", ActorID: "actor-1"}
|
||||||
|
eventB := &Event{ID: "evt-b", EventType: "Test", ActorID: "actor-2"}
|
||||||
|
|
||||||
|
eb.Publish("tenant-a", eventA)
|
||||||
|
eb.Publish("tenant-b", eventB)
|
||||||
|
|
||||||
|
// Verify tenant-a receives only its event
|
||||||
|
select {
|
||||||
|
case received := <-chA:
|
||||||
|
if received.ID != "evt-a" {
|
||||||
|
t.Errorf("tenant-a received wrong event: %s", received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("tenant-a timed out")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-chA:
|
||||||
|
t.Error("tenant-a received extra event")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify tenant-b receives only its event
|
||||||
|
select {
|
||||||
|
case received := <-chB:
|
||||||
|
if received.ID != "evt-b" {
|
||||||
|
t.Errorf("tenant-b received wrong event: %s", received.ID)
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Error("tenant-b timed out")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-chB:
|
||||||
|
t.Error("tenant-b received extra event")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
// Expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_NonBlockingPublish(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
// Create subscriber but don't read from channel
|
||||||
|
_ = eb.Subscribe("tenant-a")
|
||||||
|
|
||||||
|
// Fill the channel buffer (100 events)
|
||||||
|
for i := 0; i < 150; i++ {
|
||||||
|
event := &Event{
|
||||||
|
ID: "evt",
|
||||||
|
EventType: "Test",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
}
|
||||||
|
// Should not block even when channel is full
|
||||||
|
eb.Publish("tenant-a", event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we got here without blocking, test passes
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_ConcurrentOperations(t *testing.T) {
|
||||||
|
eb := NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Concurrent subscriptions
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(n int) {
|
||||||
|
defer wg.Done()
|
||||||
|
ch := eb.Subscribe("tenant-a")
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
eb.Unsubscribe("tenant-a", ch)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Concurrent wildcard subscriptions
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(n int) {
|
||||||
|
defer wg.Done()
|
||||||
|
ch := eb.Subscribe("*")
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
eb.Unsubscribe("*", ch)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Concurrent publishes
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(n int) {
|
||||||
|
defer wg.Done()
|
||||||
|
event := &Event{
|
||||||
|
ID: "evt",
|
||||||
|
EventType: "Test",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
}
|
||||||
|
eb.Publish("tenant-a", event)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
@@ -11,12 +11,18 @@ import (
|
|||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS
|
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS.
|
||||||
|
// Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards.
|
||||||
|
//
|
||||||
|
// Security Considerations:
|
||||||
|
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
|
||||||
|
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
|
||||||
|
// are in place at the application layer before granting wildcard subscription access.
|
||||||
type NATSEventBus struct {
|
type NATSEventBus struct {
|
||||||
*EventBus // Embed base EventBus for local subscriptions
|
*EventBus // Embed base EventBus for local subscriptions
|
||||||
nc *nats.Conn // NATS connection
|
nc *nats.Conn // NATS connection
|
||||||
subscriptions []*nats.Subscription
|
subscriptions []*nats.Subscription
|
||||||
namespaceSubscribers map[string]int // Track number of subscribers per namespace
|
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
|
||||||
nodeID string // Unique ID for this node
|
nodeID string // Unique ID for this node
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@@ -39,7 +45,7 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
|||||||
nc: nc,
|
nc: nc,
|
||||||
nodeID: uuid.New().String(),
|
nodeID: uuid.New().String(),
|
||||||
subscriptions: make([]*nats.Subscription, 0),
|
subscriptions: make([]*nats.Subscription, 0),
|
||||||
namespaceSubscribers: make(map[string]int),
|
patternSubscribers: make(map[string]int),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
@@ -47,57 +53,68 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
|||||||
return neb, nil
|
return neb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace
|
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
|
||||||
func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event {
|
// Supports NATS subject patterns:
|
||||||
|
// - "*" matches a single token
|
||||||
|
// - ">" matches one or more tokens (only at the end)
|
||||||
|
//
|
||||||
|
// Security Warning: Wildcard patterns receive events from all matching namespaces,
|
||||||
|
// bypassing namespace isolation. Only use for trusted system components.
|
||||||
|
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
|
||||||
neb.mutex.Lock()
|
neb.mutex.Lock()
|
||||||
defer neb.mutex.Unlock()
|
defer neb.mutex.Unlock()
|
||||||
|
|
||||||
// Create local subscription first
|
// Create local subscription first
|
||||||
ch := neb.EventBus.Subscribe(namespaceID)
|
ch := neb.EventBus.Subscribe(namespacePattern)
|
||||||
|
|
||||||
// Check if this is the first subscriber for this namespace
|
// Check if this is the first subscriber for this pattern
|
||||||
count := neb.namespaceSubscribers[namespaceID]
|
count := neb.patternSubscribers[namespacePattern]
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
// First subscriber - create NATS subscription
|
// First subscriber - create NATS subscription
|
||||||
subject := fmt.Sprintf("aether.events.%s", namespaceID)
|
// NATS natively supports wildcards, so we can use the pattern directly
|
||||||
|
subject := fmt.Sprintf("aether.events.%s", namespacePattern)
|
||||||
|
|
||||||
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||||
neb.handleNATSEvent(msg)
|
neb.handleNATSEvent(msg, namespacePattern)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
|
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
|
||||||
} else {
|
} else {
|
||||||
neb.subscriptions = append(neb.subscriptions, sub)
|
neb.subscriptions = append(neb.subscriptions, sub)
|
||||||
|
if IsWildcardPattern(namespacePattern) {
|
||||||
|
log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject)
|
||||||
|
} else {
|
||||||
log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
|
log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
neb.namespaceSubscribers[namespaceID] = count + 1
|
neb.patternSubscribers[namespacePattern] = count + 1
|
||||||
|
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers
|
// Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers
|
||||||
func (neb *NATSEventBus) Unsubscribe(namespaceID string, ch <-chan *Event) {
|
func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
||||||
neb.mutex.Lock()
|
neb.mutex.Lock()
|
||||||
defer neb.mutex.Unlock()
|
defer neb.mutex.Unlock()
|
||||||
|
|
||||||
neb.EventBus.Unsubscribe(namespaceID, ch)
|
neb.EventBus.Unsubscribe(namespacePattern, ch)
|
||||||
|
|
||||||
count := neb.namespaceSubscribers[namespaceID]
|
count := neb.patternSubscribers[namespacePattern]
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
count--
|
count--
|
||||||
neb.namespaceSubscribers[namespaceID] = count
|
neb.patternSubscribers[namespacePattern] = count
|
||||||
|
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
delete(neb.namespaceSubscribers, namespaceID)
|
delete(neb.patternSubscribers, namespacePattern)
|
||||||
log.Printf("[NATSEventBus] No more subscribers for namespace %s on node %s", namespaceID, neb.nodeID)
|
log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleNATSEvent processes events received from NATS
|
// handleNATSEvent processes events received from NATS
|
||||||
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) {
|
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) {
|
||||||
var eventMsg eventMessage
|
var eventMsg eventMessage
|
||||||
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
||||||
log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err)
|
log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err)
|
||||||
@@ -109,8 +126,33 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forward to local EventBus subscribers
|
// For wildcard subscriptions, we need to deliver to the EventBus using
|
||||||
|
// the subscribed pattern so it reaches the correct wildcard subscriber.
|
||||||
|
// For exact subscriptions, use the actual namespace.
|
||||||
|
if IsWildcardPattern(subscribedPattern) {
|
||||||
|
// Deliver using the pattern - the EventBus will route to wildcard subscribers
|
||||||
|
neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event)
|
||||||
|
} else {
|
||||||
|
// Forward to local EventBus subscribers with actual namespace
|
||||||
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern
|
||||||
|
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
|
||||||
|
neb.EventBus.mutex.RLock()
|
||||||
|
defer neb.EventBus.mutex.RUnlock()
|
||||||
|
|
||||||
|
for _, sub := range neb.EventBus.wildcardSubscribers {
|
||||||
|
if sub.pattern == pattern {
|
||||||
|
select {
|
||||||
|
case sub.ch <- event:
|
||||||
|
// Event delivered
|
||||||
|
default:
|
||||||
|
// Channel full, skip this subscriber (non-blocking)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish publishes an event both locally and to NATS for cross-node broadcasting
|
// Publish publishes an event both locally and to NATS for cross-node broadcasting
|
||||||
|
|||||||
83
pattern.go
Normal file
83
pattern.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package aether
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
// MatchNamespacePattern checks if a namespace matches a pattern.
|
||||||
|
// Patterns follow NATS subject matching conventions where tokens are separated by dots:
|
||||||
|
// - "*" matches exactly one token (any sequence without ".")
|
||||||
|
// - ">" matches one or more tokens (only valid at the end of a pattern)
|
||||||
|
// - Exact strings match exactly
|
||||||
|
//
|
||||||
|
// Examples:
|
||||||
|
// - "tenant-a" matches "tenant-a" (exact match)
|
||||||
|
// - "*" matches any single-token namespace like "tenant-a" or "production"
|
||||||
|
// - ">" matches any namespace with one or more tokens
|
||||||
|
// - "prod.*" matches "prod.tenant", "prod.orders" (but not "prod.tenant.orders")
|
||||||
|
// - "prod.>" matches "prod.tenant", "prod.tenant.orders", "prod.a.b.c"
|
||||||
|
// - "*.tenant.*" matches "prod.tenant.orders", "staging.tenant.events"
|
||||||
|
//
|
||||||
|
// Security Considerations:
|
||||||
|
// Wildcard subscriptions provide cross-namespace visibility. Use with caution:
|
||||||
|
// - "*" or ">" patterns receive events from ALL matching namespaces
|
||||||
|
// - This bypasses namespace isolation for the subscriber
|
||||||
|
// - Only grant wildcard subscription access to trusted system components
|
||||||
|
// - Consider auditing wildcard subscription usage
|
||||||
|
// - For multi-tenant systems, wildcard access should be restricted to admin/ops
|
||||||
|
// - Use the most specific pattern possible to minimize exposure
|
||||||
|
func MatchNamespacePattern(pattern, namespace string) bool {
|
||||||
|
// Empty pattern matches nothing
|
||||||
|
if pattern == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// ">" matches everything when used alone
|
||||||
|
if pattern == ">" {
|
||||||
|
return namespace != ""
|
||||||
|
}
|
||||||
|
|
||||||
|
patternTokens := strings.Split(pattern, ".")
|
||||||
|
namespaceTokens := strings.Split(namespace, ".")
|
||||||
|
|
||||||
|
return matchTokens(patternTokens, namespaceTokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchTokens recursively matches pattern tokens against namespace tokens
|
||||||
|
func matchTokens(patternTokens, namespaceTokens []string) bool {
|
||||||
|
// If pattern is exhausted, namespace must also be exhausted
|
||||||
|
if len(patternTokens) == 0 {
|
||||||
|
return len(namespaceTokens) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
patternToken := patternTokens[0]
|
||||||
|
|
||||||
|
// ">" matches one or more remaining tokens (must be last pattern token)
|
||||||
|
if patternToken == ">" {
|
||||||
|
// ">" requires at least one token to match
|
||||||
|
return len(namespaceTokens) >= 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// If namespace is exhausted but pattern has more tokens, no match
|
||||||
|
if len(namespaceTokens) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
namespaceToken := namespaceTokens[0]
|
||||||
|
|
||||||
|
// "*" matches exactly one token
|
||||||
|
if patternToken == "*" {
|
||||||
|
return matchTokens(patternTokens[1:], namespaceTokens[1:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exact match required
|
||||||
|
if patternToken == namespaceToken {
|
||||||
|
return matchTokens(patternTokens[1:], namespaceTokens[1:])
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsWildcardPattern returns true if the pattern contains wildcards (* or >).
|
||||||
|
// Wildcard patterns can match multiple namespaces and bypass namespace isolation.
|
||||||
|
func IsWildcardPattern(pattern string) bool {
|
||||||
|
return strings.Contains(pattern, "*") || strings.Contains(pattern, ">")
|
||||||
|
}
|
||||||
117
pattern_test.go
Normal file
117
pattern_test.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
package aether
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestMatchNamespacePattern(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
pattern string
|
||||||
|
namespace string
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
// Exact matches
|
||||||
|
{"exact match", "tenant-a", "tenant-a", true},
|
||||||
|
{"exact mismatch", "tenant-a", "tenant-b", false},
|
||||||
|
{"exact match with dots", "prod.tenant.a", "prod.tenant.a", true},
|
||||||
|
{"exact mismatch with dots", "prod.tenant.a", "prod.tenant.b", false},
|
||||||
|
|
||||||
|
// Empty cases
|
||||||
|
{"empty pattern", "", "tenant-a", false},
|
||||||
|
{"empty namespace exact", "tenant-a", "", false},
|
||||||
|
{"empty namespace catch-all", ">", "", false},
|
||||||
|
{"both empty", "", "", false},
|
||||||
|
|
||||||
|
// Single wildcard (*) - matches one token (NATS semantics: tokens are dot-separated)
|
||||||
|
{"star matches any single token", "*", "tenant-a", true},
|
||||||
|
{"star matches any single token 2", "*", "anything", true},
|
||||||
|
{"star does not match multi-token", "*", "prod.tenant", false},
|
||||||
|
{"prefix with star", "prod.*", "prod.tenant", true},
|
||||||
|
{"prefix with star 2", "prod.*", "prod.orders", true},
|
||||||
|
{"prefix with star no match extra tokens", "prod.*", "prod.tenant.orders", false},
|
||||||
|
{"prefix with star no match wrong prefix", "prod.*", "staging.tenant", false},
|
||||||
|
{"middle wildcard", "prod.*.orders", "prod.tenant.orders", true},
|
||||||
|
{"middle wildcard no match", "prod.*.orders", "prod.tenant.events", false},
|
||||||
|
{"multiple stars", "*.tenant.*", "prod.tenant.orders", true},
|
||||||
|
{"multiple stars 2", "*.*.orders", "prod.tenant.orders", true},
|
||||||
|
{"multiple stars no match", "*.*.orders", "prod.orders", false},
|
||||||
|
|
||||||
|
// Multi-token wildcard (>) - matches one or more tokens
|
||||||
|
{"greater matches one", ">", "tenant", true},
|
||||||
|
{"greater matches multi", ">", "prod.tenant.orders", true},
|
||||||
|
{"prefix greater", "prod.>", "prod.tenant", true},
|
||||||
|
{"prefix greater multi", "prod.>", "prod.tenant.orders.items", true},
|
||||||
|
{"prefix greater no match different prefix", "prod.>", "staging.tenant", false},
|
||||||
|
{"prefix greater requires at least one", "prod.>", "prod", false},
|
||||||
|
{"deep prefix greater", "prod.tenant.>", "prod.tenant.orders", true},
|
||||||
|
|
||||||
|
// Combined wildcards
|
||||||
|
{"star then greater", "*.>", "prod.tenant", true},
|
||||||
|
{"star then greater multi", "*.>", "prod.tenant.orders", true},
|
||||||
|
{"star then greater no match single", "*.>", "prod", false},
|
||||||
|
|
||||||
|
// Edge cases
|
||||||
|
{"trailing dot in pattern", "tenant.", "tenant.", true},
|
||||||
|
{"just dots", "..", "..", true},
|
||||||
|
{"star at end", "prod.tenant.*", "prod.tenant.a", true},
|
||||||
|
{"star at end no match", "prod.tenant.*", "prod.other.a", false},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
result := MatchNamespacePattern(tt.pattern, tt.namespace)
|
||||||
|
if result != tt.expected {
|
||||||
|
t.Errorf("MatchNamespacePattern(%q, %q) = %v, want %v",
|
||||||
|
tt.pattern, tt.namespace, result, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIsWildcardPattern(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
pattern string
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
{"tenant-a", false},
|
||||||
|
{"prod.tenant.orders", false},
|
||||||
|
{"*", true},
|
||||||
|
{"prod.*", true},
|
||||||
|
{"*.orders", true},
|
||||||
|
{">", true},
|
||||||
|
{"prod.>", true},
|
||||||
|
{"*.>", true},
|
||||||
|
{"prod.*.orders", true},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.pattern, func(t *testing.T) {
|
||||||
|
result := IsWildcardPattern(tt.pattern)
|
||||||
|
if result != tt.expected {
|
||||||
|
t.Errorf("IsWildcardPattern(%q) = %v, want %v",
|
||||||
|
tt.pattern, result, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkMatchNamespacePattern(b *testing.B) {
|
||||||
|
benchmarks := []struct {
|
||||||
|
name string
|
||||||
|
pattern string
|
||||||
|
namespace string
|
||||||
|
}{
|
||||||
|
{"exact", "tenant-a", "tenant-a"},
|
||||||
|
{"star", "*", "tenant-a"},
|
||||||
|
{"prefix_star", "prod.*", "prod.tenant"},
|
||||||
|
{"greater", ">", "prod.tenant.orders"},
|
||||||
|
{"complex", "prod.*.>", "prod.tenant.orders.items"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, bm := range benchmarks {
|
||||||
|
b.Run(bm.name, func(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
MatchNamespacePattern(bm.pattern, bm.namespace)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user