[Issue #19] Add namespace-scoped event stores #48
36
CLAUDE.md
36
CLAUDE.md
@@ -124,7 +124,11 @@ if errors.Is(err, aether.ErrVersionConflict) {
|
|||||||
|
|
||||||
### Namespace Isolation
|
### Namespace Isolation
|
||||||
|
|
||||||
Namespaces provide logical boundaries for events and subscriptions:
|
Namespaces provide logical boundaries for events and subscriptions.
|
||||||
|
|
||||||
|
#### Event Bus Namespaces
|
||||||
|
|
||||||
|
The event bus supports namespace-scoped pub/sub:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
// Subscribe to events in a namespace
|
// Subscribe to events in a namespace
|
||||||
@@ -134,6 +138,36 @@ ch := eventBus.Subscribe("tenant-abc")
|
|||||||
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
|
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### Namespace-Scoped Event Stores
|
||||||
|
|
||||||
|
JetStreamEventStore supports optional namespace prefixes for complete storage isolation:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Create a namespaced event store (convenience function)
|
||||||
|
store, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
|
||||||
|
|
||||||
|
// Or configure via JetStreamConfig
|
||||||
|
config := store.JetStreamConfig{
|
||||||
|
Namespace: "tenant-abc",
|
||||||
|
StreamRetention: 30 * 24 * time.Hour,
|
||||||
|
ReplicaCount: 3,
|
||||||
|
}
|
||||||
|
store, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
|
||||||
|
|
||||||
|
// The actual stream name becomes "tenant-abc_events"
|
||||||
|
// Events from one namespace cannot be read from another namespace's store
|
||||||
|
```
|
||||||
|
|
||||||
|
Namespace isolation at the storage level ensures:
|
||||||
|
- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces
|
||||||
|
- **Backward compatibility**: Empty namespace (default) works exactly as before
|
||||||
|
- **Safe characters**: Namespace names are sanitized (spaces, dots, wildcards become underscores)
|
||||||
|
|
||||||
|
Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as:
|
||||||
|
- Multi-tenant deployments where tenant data must be completely separated
|
||||||
|
- Logical boundaries between different domains or bounded contexts
|
||||||
|
- Test isolation in integration tests
|
||||||
|
|
||||||
### Clustering
|
### Clustering
|
||||||
|
|
||||||
Aether handles node discovery, leader election, and shard distribution:
|
Aether handles node discovery, leader election, and shard distribution:
|
||||||
|
|||||||
@@ -23,6 +23,11 @@ type JetStreamConfig struct {
|
|||||||
StreamRetention time.Duration
|
StreamRetention time.Duration
|
||||||
// ReplicaCount is the number of replicas for high availability (default: 1)
|
// ReplicaCount is the number of replicas for high availability (default: 1)
|
||||||
ReplicaCount int
|
ReplicaCount int
|
||||||
|
// Namespace is an optional prefix for stream names to provide storage isolation.
|
||||||
|
// When set, the actual stream name becomes "{namespace}_{streamName}".
|
||||||
|
// Events in namespaced stores are completely isolated from other namespaces.
|
||||||
|
// Leave empty for backward-compatible non-namespaced behavior.
|
||||||
|
Namespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultJetStreamConfig returns the default configuration
|
// DefaultJetStreamConfig returns the default configuration
|
||||||
@@ -48,6 +53,15 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
|
|||||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewJetStreamEventStoreWithNamespace creates a new JetStream-based event store with namespace isolation.
|
||||||
|
// The namespace is prefixed to the stream name to ensure complete isolation at the storage level.
|
||||||
|
// This is a convenience function; the same can be achieved by setting Namespace in JetStreamConfig.
|
||||||
|
func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string, namespace string) (*JetStreamEventStore, error) {
|
||||||
|
config := DefaultJetStreamConfig()
|
||||||
|
config.Namespace = namespace
|
||||||
|
return NewJetStreamEventStoreWithConfig(natsConn, streamName, config)
|
||||||
|
}
|
||||||
|
|
||||||
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
|
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
|
||||||
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
|
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
|
||||||
js, err := natsConn.JetStream()
|
js, err := natsConn.JetStream()
|
||||||
@@ -63,10 +77,16 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
|||||||
config.ReplicaCount = DefaultReplicaCount
|
config.ReplicaCount = DefaultReplicaCount
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply namespace prefix to stream name if provided
|
||||||
|
effectiveStreamName := streamName
|
||||||
|
if config.Namespace != "" {
|
||||||
|
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||||
|
}
|
||||||
|
|
||||||
// Create or update the stream
|
// Create or update the stream
|
||||||
stream := &nats.StreamConfig{
|
stream := &nats.StreamConfig{
|
||||||
Name: streamName,
|
Name: effectiveStreamName,
|
||||||
Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
|
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||||
Storage: nats.FileStorage,
|
Storage: nats.FileStorage,
|
||||||
Retention: nats.LimitsPolicy,
|
Retention: nats.LimitsPolicy,
|
||||||
MaxAge: config.StreamRetention,
|
MaxAge: config.StreamRetention,
|
||||||
@@ -80,12 +100,22 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
|||||||
|
|
||||||
return &JetStreamEventStore{
|
return &JetStreamEventStore{
|
||||||
js: js,
|
js: js,
|
||||||
streamName: streamName,
|
streamName: effectiveStreamName,
|
||||||
config: config,
|
config: config,
|
||||||
versions: make(map[string]int64),
|
versions: make(map[string]int64),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNamespace returns the namespace configured for this store, or empty string if not namespaced.
|
||||||
|
func (jes *JetStreamEventStore) GetNamespace() string {
|
||||||
|
return jes.config.Namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStreamName returns the effective stream name (including namespace prefix if applicable).
|
||||||
|
func (jes *JetStreamEventStore) GetStreamName() string {
|
||||||
|
return jes.streamName
|
||||||
|
}
|
||||||
|
|
||||||
// SaveEvent persists an event to JetStream.
|
// SaveEvent persists an event to JetStream.
|
||||||
// Returns VersionConflictError if the event's version is not strictly greater
|
// Returns VersionConflictError if the event's version is not strictly greater
|
||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
|
|||||||
124
store/namespace_test.go
Normal file
124
store/namespace_test.go
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestJetStreamConfigNamespace(t *testing.T) {
|
||||||
|
t.Run("default config has empty namespace", func(t *testing.T) {
|
||||||
|
config := DefaultJetStreamConfig()
|
||||||
|
if config.Namespace != "" {
|
||||||
|
t.Errorf("expected empty namespace in default config, got %q", config.Namespace)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("namespace can be set in config", func(t *testing.T) {
|
||||||
|
config := JetStreamConfig{
|
||||||
|
Namespace: "tenant-abc",
|
||||||
|
}
|
||||||
|
if config.Namespace != "tenant-abc" {
|
||||||
|
t.Errorf("expected namespace tenant-abc, got %q", config.Namespace)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNamespacedStreamName(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
baseStreamName string
|
||||||
|
namespace string
|
||||||
|
expectedStreamName string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no namespace - stream name unchanged",
|
||||||
|
baseStreamName: "events",
|
||||||
|
namespace: "",
|
||||||
|
expectedStreamName: "events",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "with namespace - prefixed stream name",
|
||||||
|
baseStreamName: "events",
|
||||||
|
namespace: "tenant-abc",
|
||||||
|
expectedStreamName: "tenant-abc_events",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespace with dots - sanitized",
|
||||||
|
baseStreamName: "events",
|
||||||
|
namespace: "tenant.abc",
|
||||||
|
expectedStreamName: "tenant_abc_events",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespace with spaces - sanitized",
|
||||||
|
baseStreamName: "events",
|
||||||
|
namespace: "tenant abc",
|
||||||
|
expectedStreamName: "tenant_abc_events",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "namespace with special chars - sanitized",
|
||||||
|
baseStreamName: "events",
|
||||||
|
namespace: "tenant*abc>def",
|
||||||
|
expectedStreamName: "tenant_abc_def_events",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
// We can't create a real JetStreamEventStore without NATS,
|
||||||
|
// but we can test the stream name logic by examining the expected format
|
||||||
|
effectiveStreamName := tt.baseStreamName
|
||||||
|
if tt.namespace != "" {
|
||||||
|
effectiveStreamName = sanitizeSubject(tt.namespace) + "_" + tt.baseStreamName
|
||||||
|
}
|
||||||
|
|
||||||
|
if effectiveStreamName != tt.expectedStreamName {
|
||||||
|
t.Errorf("expected stream name %q, got %q", tt.expectedStreamName, effectiveStreamName)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSanitizeSubject(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{"simple", "simple"},
|
||||||
|
{"with spaces", "with_spaces"},
|
||||||
|
{"with.dots", "with_dots"},
|
||||||
|
{"with*stars", "with_stars"},
|
||||||
|
{"with>greater", "with_greater"},
|
||||||
|
{"complex.name with*special>chars", "complex_name_with_special_chars"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.input, func(t *testing.T) {
|
||||||
|
result := sanitizeSubject(tt.input)
|
||||||
|
if result != tt.expected {
|
||||||
|
t.Errorf("sanitizeSubject(%q) = %q, want %q", tt.input, result, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractActorType(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
actorID string
|
||||||
|
expectedType string
|
||||||
|
}{
|
||||||
|
{"order-123", "order"},
|
||||||
|
{"user-abc-def", "user"},
|
||||||
|
{"nodelimiter", "unknown"},
|
||||||
|
{"", "unknown"},
|
||||||
|
{"-leadingdash", "unknown"},
|
||||||
|
{"a-b", "a"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.actorID, func(t *testing.T) {
|
||||||
|
result := extractActorType(tt.actorID)
|
||||||
|
if result != tt.expectedType {
|
||||||
|
t.Errorf("extractActorType(%q) = %q, want %q", tt.actorID, result, tt.expectedType)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user