[Issue #19] Add namespace-scoped event stores #48

Merged
HugoNijhuis merged 1 commits from issue-19-namespace-scoped-stores into main 2026-01-10 18:03:03 +00:00
3 changed files with 192 additions and 4 deletions

View File

@@ -124,7 +124,11 @@ if errors.Is(err, aether.ErrVersionConflict) {
### Namespace Isolation ### Namespace Isolation
Namespaces provide logical boundaries for events and subscriptions: Namespaces provide logical boundaries for events and subscriptions.
#### Event Bus Namespaces
The event bus supports namespace-scoped pub/sub:
```go ```go
// Subscribe to events in a namespace // Subscribe to events in a namespace
@@ -134,6 +138,36 @@ ch := eventBus.Subscribe("tenant-abc")
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
``` ```
#### Namespace-Scoped Event Stores
JetStreamEventStore supports optional namespace prefixes for complete storage isolation:
```go
// Create a namespaced event store (convenience function)
store, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
// Or configure via JetStreamConfig
config := store.JetStreamConfig{
Namespace: "tenant-abc",
StreamRetention: 30 * 24 * time.Hour,
ReplicaCount: 3,
}
store, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
// The actual stream name becomes "tenant-abc_events"
// Events from one namespace cannot be read from another namespace's store
```
Namespace isolation at the storage level ensures:
- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces
- **Backward compatibility**: Empty namespace (default) works exactly as before
- **Safe characters**: Namespace names are sanitized (spaces, dots, wildcards become underscores)
Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as:
- Multi-tenant deployments where tenant data must be completely separated
- Logical boundaries between different domains or bounded contexts
- Test isolation in integration tests
### Clustering ### Clustering
Aether handles node discovery, leader election, and shard distribution: Aether handles node discovery, leader election, and shard distribution:

View File

@@ -23,6 +23,11 @@ type JetStreamConfig struct {
StreamRetention time.Duration StreamRetention time.Duration
// ReplicaCount is the number of replicas for high availability (default: 1) // ReplicaCount is the number of replicas for high availability (default: 1)
ReplicaCount int ReplicaCount int
// Namespace is an optional prefix for stream names to provide storage isolation.
// When set, the actual stream name becomes "{namespace}_{streamName}".
// Events in namespaced stores are completely isolated from other namespaces.
// Leave empty for backward-compatible non-namespaced behavior.
Namespace string
} }
// DefaultJetStreamConfig returns the default configuration // DefaultJetStreamConfig returns the default configuration
@@ -48,6 +53,15 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig()) return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
} }
// NewJetStreamEventStoreWithNamespace creates a new JetStream-based event store with namespace isolation.
// The namespace is prefixed to the stream name to ensure complete isolation at the storage level.
// This is a convenience function; the same can be achieved by setting Namespace in JetStreamConfig.
func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string, namespace string) (*JetStreamEventStore, error) {
config := DefaultJetStreamConfig()
config.Namespace = namespace
return NewJetStreamEventStoreWithConfig(natsConn, streamName, config)
}
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration // NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) { func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
js, err := natsConn.JetStream() js, err := natsConn.JetStream()
@@ -63,10 +77,16 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
config.ReplicaCount = DefaultReplicaCount config.ReplicaCount = DefaultReplicaCount
} }
// Apply namespace prefix to stream name if provided
effectiveStreamName := streamName
if config.Namespace != "" {
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
}
// Create or update the stream // Create or update the stream
stream := &nats.StreamConfig{ stream := &nats.StreamConfig{
Name: streamName, Name: effectiveStreamName,
Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)}, Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
Storage: nats.FileStorage, Storage: nats.FileStorage,
Retention: nats.LimitsPolicy, Retention: nats.LimitsPolicy,
MaxAge: config.StreamRetention, MaxAge: config.StreamRetention,
@@ -80,12 +100,22 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
return &JetStreamEventStore{ return &JetStreamEventStore{
js: js, js: js,
streamName: streamName, streamName: effectiveStreamName,
config: config, config: config,
versions: make(map[string]int64), versions: make(map[string]int64),
}, nil }, nil
} }
// GetNamespace returns the namespace configured for this store, or empty string if not namespaced.
func (jes *JetStreamEventStore) GetNamespace() string {
return jes.config.Namespace
}
// GetStreamName returns the effective stream name (including namespace prefix if applicable).
func (jes *JetStreamEventStore) GetStreamName() string {
return jes.streamName
}
// SaveEvent persists an event to JetStream. // SaveEvent persists an event to JetStream.
// Returns VersionConflictError if the event's version is not strictly greater // Returns VersionConflictError if the event's version is not strictly greater
// than the current latest version for the actor. // than the current latest version for the actor.

124
store/namespace_test.go Normal file
View File

@@ -0,0 +1,124 @@
package store
import (
"testing"
)
func TestJetStreamConfigNamespace(t *testing.T) {
t.Run("default config has empty namespace", func(t *testing.T) {
config := DefaultJetStreamConfig()
if config.Namespace != "" {
t.Errorf("expected empty namespace in default config, got %q", config.Namespace)
}
})
t.Run("namespace can be set in config", func(t *testing.T) {
config := JetStreamConfig{
Namespace: "tenant-abc",
}
if config.Namespace != "tenant-abc" {
t.Errorf("expected namespace tenant-abc, got %q", config.Namespace)
}
})
}
func TestNamespacedStreamName(t *testing.T) {
tests := []struct {
name string
baseStreamName string
namespace string
expectedStreamName string
}{
{
name: "no namespace - stream name unchanged",
baseStreamName: "events",
namespace: "",
expectedStreamName: "events",
},
{
name: "with namespace - prefixed stream name",
baseStreamName: "events",
namespace: "tenant-abc",
expectedStreamName: "tenant-abc_events",
},
{
name: "namespace with dots - sanitized",
baseStreamName: "events",
namespace: "tenant.abc",
expectedStreamName: "tenant_abc_events",
},
{
name: "namespace with spaces - sanitized",
baseStreamName: "events",
namespace: "tenant abc",
expectedStreamName: "tenant_abc_events",
},
{
name: "namespace with special chars - sanitized",
baseStreamName: "events",
namespace: "tenant*abc>def",
expectedStreamName: "tenant_abc_def_events",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// We can't create a real JetStreamEventStore without NATS,
// but we can test the stream name logic by examining the expected format
effectiveStreamName := tt.baseStreamName
if tt.namespace != "" {
effectiveStreamName = sanitizeSubject(tt.namespace) + "_" + tt.baseStreamName
}
if effectiveStreamName != tt.expectedStreamName {
t.Errorf("expected stream name %q, got %q", tt.expectedStreamName, effectiveStreamName)
}
})
}
}
func TestSanitizeSubject(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"simple", "simple"},
{"with spaces", "with_spaces"},
{"with.dots", "with_dots"},
{"with*stars", "with_stars"},
{"with>greater", "with_greater"},
{"complex.name with*special>chars", "complex_name_with_special_chars"},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
result := sanitizeSubject(tt.input)
if result != tt.expected {
t.Errorf("sanitizeSubject(%q) = %q, want %q", tt.input, result, tt.expected)
}
})
}
}
func TestExtractActorType(t *testing.T) {
tests := []struct {
actorID string
expectedType string
}{
{"order-123", "order"},
{"user-abc-def", "user"},
{"nodelimiter", "unknown"},
{"", "unknown"},
{"-leadingdash", "unknown"},
{"a-b", "a"},
}
for _, tt := range tests {
t.Run(tt.actorID, func(t *testing.T) {
result := extractActorType(tt.actorID)
if result != tt.expectedType {
t.Errorf("extractActorType(%q) = %q, want %q", tt.actorID, result, tt.expectedType)
}
})
}
}