From e9e50c021f1d646eafb8ddd5bdc51d7f153875ff Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Thu, 8 Jan 2026 19:30:02 +0100 Subject: [PATCH] Initial aether repository structure Distributed actor system with event sourcing for Go: - event.go - Event, ActorSnapshot, EventStore interface - eventbus.go - EventBus, EventBroadcaster for pub/sub - nats_eventbus.go - NATS-backed cross-node event broadcasting - store/ - InMemoryEventStore (testing), JetStreamEventStore (production) - cluster/ - Node discovery, leader election, shard distribution - model/ - EventStorming model types Extracted from arcadia as open-source infrastructure component. Co-Authored-By: Claude --- .gitea/workflows/ci.yaml | 19 ++ .gitignore | 20 ++ CLAUDE.md | 116 +++++++++++ LICENSE | 190 ++++++++++++++++++ Makefile | 13 ++ cluster/cluster.go | 48 +++++ cluster/discovery.go | 118 +++++++++++ cluster/distributed.go | 221 +++++++++++++++++++++ cluster/hashring.go | 105 ++++++++++ cluster/leader.go | 414 +++++++++++++++++++++++++++++++++++++++ cluster/manager.go | 331 +++++++++++++++++++++++++++++++ cluster/shard.go | 188 ++++++++++++++++++ cluster/types.go | 110 +++++++++++ event.go | 38 ++++ eventbus.go | 106 ++++++++++ go.mod | 16 ++ go.sum | 14 ++ model/model.go | 47 +++++ nats_eventbus.go | 159 +++++++++++++++ store/jetstream.go | 218 +++++++++++++++++++++ store/memory.go | 60 ++++++ vision.md | 37 ++++ 22 files changed, 2588 insertions(+) create mode 100644 .gitea/workflows/ci.yaml create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 cluster/cluster.go create mode 100644 cluster/discovery.go create mode 100644 cluster/distributed.go create mode 100644 cluster/hashring.go create mode 100644 cluster/leader.go create mode 100644 cluster/manager.go create mode 100644 cluster/shard.go create mode 100644 cluster/types.go create mode 100644 event.go create mode 100644 eventbus.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 model/model.go create mode 100644 nats_eventbus.go create mode 100644 store/jetstream.go create mode 100644 store/memory.go create mode 100644 vision.md diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml new file mode 100644 index 0000000..0b0e748 --- /dev/null +++ b/.gitea/workflows/ci.yaml @@ -0,0 +1,19 @@ +name: CI +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: '1.23' + - name: Build + run: go build ./... + - name: Test + run: go test ./... diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ccdd07c --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# IDE +.idea/ +.vscode/ +*.swp + +# OS +.DS_Store +Thumbs.db + +# Build artifacts +/dist/ +/build/ +/bin/ + +# Go +/vendor/ + +# Test artifacts +*.test +coverage.out diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..527f7e2 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,116 @@ +# Aether + +Distributed actor system with event sourcing for Go, powered by NATS. + +## Organization Context + +This repo is part of Flowmade. 
See: +- [Organization manifesto](../architecture/manifesto.md) - who we are, what we believe +- [Repository map](../architecture/repos.md) - how this fits in the bigger picture +- [Vision](./vision.md) - what this specific product does + +## Setup + +```bash +git clone git@git.flowmade.one:flowmade-one/aether.git +cd aether +go mod download +``` + +Requires NATS server for integration tests: +```bash +# Install NATS +brew install nats-server + +# Run with JetStream enabled +nats-server -js +``` + +## Project Structure + +``` +aether/ +├── event.go # Event, ActorSnapshot, EventStore interface +├── eventbus.go # EventBus, EventBroadcaster interface +├── nats_eventbus.go # NATSEventBus - cross-node event broadcasting +├── store/ +│ ├── memory.go # InMemoryEventStore (testing) +│ └── jetstream.go # JetStreamEventStore (production) +├── cluster/ +│ ├── manager.go # ClusterManager +│ ├── discovery.go # NodeDiscovery +│ ├── hashring.go # ConsistentHashRing +│ ├── shard.go # ShardManager +│ ├── leader.go # LeaderElection +│ └── types.go # Cluster types +└── model/ + └── model.go # EventStorming model types +``` + +## Development + +```bash +make build # Build the library +make test # Run tests +make lint # Run linters +``` + +## Architecture + +### Event Sourcing + +Events are the source of truth. State is derived by replaying events. + +```go +// Create an event +event := &aether.Event{ + ID: uuid.New().String(), + EventType: "OrderPlaced", + ActorID: "order-123", + Version: 1, + Data: map[string]interface{}{"total": 100.00}, + Timestamp: time.Now(), +} + +// Persist to event store +store.SaveEvent(event) + +// Replay events to rebuild state +events, _ := store.GetEvents("order-123", 0) +``` + +### Namespace Isolation + +Namespaces provide logical boundaries for events and subscriptions: + +```go +// Subscribe to events in a namespace +ch := eventBus.Subscribe("tenant-abc") + +// Events are isolated per namespace +eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this +``` + +### Clustering + +Aether handles node discovery, leader election, and shard distribution: + +```go +// Create cluster manager +manager := cluster.NewClusterManager(natsConn, nodeID) + +// Join cluster +manager.Start() + +// Leader election happens automatically +if manager.IsLeader() { + // Coordinate shard assignments +} +``` + +## Key Patterns + +- **Events are immutable** - Never modify, only append +- **Snapshots for performance** - Periodically snapshot state to avoid full replay +- **Namespaces for isolation** - Not multi-tenancy, just logical boundaries +- **NATS for everything** - Events, pub/sub, clustering all use NATS diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..479e231 --- /dev/null +++ b/LICENSE @@ -0,0 +1,190 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to the Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2024-2026 Flowmade + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..68a9b7d --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.PHONY: build test lint clean + +build: + go build ./... + +test: + go test ./... + +lint: + golangci-lint run + +clean: + go clean diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 0000000..88a30c5 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,48 @@ +// Package cluster provides distributed computing capabilities for the Aether VM runtime. 
+// +// This package implements a distributed actor system using NATS for coordination, +// featuring consistent hashing for shard distribution, leader election for +// coordination, and fault-tolerant actor migration between nodes. +// +// Key Components: +// +// - ConsistentHashRing: Distributes actors across cluster nodes using consistent hashing +// - LeaderElection: NATS-based leader election with lease-based coordination +// - ClusterManager: Coordinates distributed operations and shard rebalancing +// - NodeDiscovery: Manages cluster membership and node health monitoring +// - ShardManager: Handles actor placement and distribution across shards +// - DistributedVM: Main entry point for distributed VM cluster operations +// +// Usage: +// +// // Create a distributed VM node +// distributedVM, err := cluster.NewDistributedVM("node-1", []string{"nats://localhost:4222"}, localRuntime) +// if err != nil { +// log.Fatal(err) +// } +// +// // Start the cluster node +// if err := distributedVM.Start(); err != nil { +// log.Fatal(err) +// } +// +// // Load a model across the cluster +// if err := distributedVM.LoadModel(eventStormingModel); err != nil { +// log.Fatal(err) +// } +// +// Architecture: +// +// The cluster package implements a distributed actor system where each node +// runs a local VM runtime and coordinates with other nodes through NATS. +// Actors are sharded across nodes using consistent hashing, and the system +// supports dynamic rebalancing when nodes join or leave the cluster. +// +// Fault Tolerance: +// +// - Automatic node failure detection through heartbeat monitoring +// - Leader election ensures coordination continues despite node failures +// - Actor migration allows rebalancing when cluster topology changes +// - Graceful shutdown with proper resource cleanup +// +package cluster \ No newline at end of file diff --git a/cluster/discovery.go b/cluster/discovery.go new file mode 100644 index 0000000..6c2e13f --- /dev/null +++ b/cluster/discovery.go @@ -0,0 +1,118 @@ +package cluster + +import ( + "context" + "encoding/json" + "time" + + "github.com/nats-io/nats.go" +) + +// NodeDiscovery manages cluster membership using NATS +type NodeDiscovery struct { + nodeID string + nodeInfo *NodeInfo + natsConn *nats.Conn + heartbeat time.Duration + timeout time.Duration + updates chan NodeUpdate + ctx context.Context +} + +// NewNodeDiscovery creates a node discovery service +func NewNodeDiscovery(nodeID string, natsConn *nats.Conn, ctx context.Context) *NodeDiscovery { + nodeInfo := &NodeInfo{ + ID: nodeID, + Status: NodeStatusActive, + Capacity: 1000, // Default capacity + Load: 0, + LastSeen: time.Now(), + Metadata: make(map[string]string), + } + + return &NodeDiscovery{ + nodeID: nodeID, + nodeInfo: nodeInfo, + natsConn: natsConn, + heartbeat: 30 * time.Second, + timeout: 90 * time.Second, + updates: make(chan NodeUpdate, 100), + ctx: ctx, + } +} + +// Start begins node discovery and heartbeating +func (nd *NodeDiscovery) Start() { + // Announce this node joining + nd.announceNode(NodeJoined) + + // Start heartbeat + ticker := time.NewTicker(nd.heartbeat) + defer ticker.Stop() + + // Subscribe to node announcements + nd.natsConn.Subscribe("aether.discovery", func(msg *nats.Msg) { + var update NodeUpdate + if err := json.Unmarshal(msg.Data, &update); err != nil { + return + } + + select { + case nd.updates <- update: + case <-nd.ctx.Done(): + } + }) + + for { + select { + case <-ticker.C: + nd.announceNode(NodeUpdated) + + case <-nd.ctx.Done(): + 
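+			// Context cancelled: make a best-effort departure announcement so peers
+			// can drop this node right away instead of waiting out the 90s timeout.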
nd.announceNode(NodeLeft) + return + } + } +} + +// GetUpdates returns the channel for receiving node updates +func (nd *NodeDiscovery) GetUpdates() <-chan NodeUpdate { + return nd.updates +} + +// GetNodeInfo returns the current node information +func (nd *NodeDiscovery) GetNodeInfo() *NodeInfo { + return nd.nodeInfo +} + +// UpdateLoad updates the node's current load +func (nd *NodeDiscovery) UpdateLoad(load float64) { + nd.nodeInfo.Load = load +} + +// UpdateVMCount updates the number of VMs on this node +func (nd *NodeDiscovery) UpdateVMCount(count int) { + nd.nodeInfo.VMCount = count +} + +// announceNode publishes node status to the cluster +func (nd *NodeDiscovery) announceNode(updateType NodeUpdateType) { + nd.nodeInfo.LastSeen = time.Now() + + update := NodeUpdate{ + Type: updateType, + Node: nd.nodeInfo, + } + + data, err := json.Marshal(update) + if err != nil { + return + } + + nd.natsConn.Publish("aether.discovery", data) +} + +// Stop gracefully stops the node discovery service +func (nd *NodeDiscovery) Stop() { + nd.announceNode(NodeLeft) +} \ No newline at end of file diff --git a/cluster/distributed.go b/cluster/distributed.go new file mode 100644 index 0000000..365522d --- /dev/null +++ b/cluster/distributed.go @@ -0,0 +1,221 @@ +package cluster + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/nats-io/nats.go" +) + +// DistributedVM manages a cluster of runtime nodes with VM-per-instance architecture +type DistributedVM struct { + nodeID string + cluster *ClusterManager + localRuntime Runtime // Interface to avoid import cycles + sharding *ShardManager + discovery *NodeDiscovery + natsConn *nats.Conn + ctx context.Context + cancel context.CancelFunc +} + +// Runtime interface to avoid import cycles with main aether package +type Runtime interface { + Start() error + LoadModel(model interface{}) error + SendMessage(message interface{}) error +} + +// DistributedVMRegistry implements VMRegistry using DistributedVM's local runtime and sharding +type DistributedVMRegistry struct { + runtime interface{} // Runtime interface to avoid import cycles + sharding *ShardManager +} + +// NewDistributedVM creates a distributed VM runtime cluster node +func NewDistributedVM(nodeID string, natsURLs []string, localRuntime Runtime) (*DistributedVM, error) { + ctx, cancel := context.WithCancel(context.Background()) + + // Connect to NATS cluster + natsURL := natsURLs[0] // Use first URL for simplicity + natsConn, err := nats.Connect(natsURL, + nats.Name(fmt.Sprintf("aether-runtime-%s", nodeID))) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to connect to NATS: %w", err) + } + + // Create cluster components + discovery := NewNodeDiscovery(nodeID, natsConn, ctx) + sharding := NewShardManager(1024, 3) // 1024 shards, 3 replicas + cluster, err := NewClusterManager(nodeID, natsConn, ctx) + if err != nil { + cancel() + natsConn.Close() + return nil, fmt.Errorf("failed to create cluster manager: %w", err) + } + + dvm := &DistributedVM{ + nodeID: nodeID, + cluster: cluster, + localRuntime: localRuntime, + sharding: sharding, + discovery: discovery, + natsConn: natsConn, + ctx: ctx, + cancel: cancel, + } + + // Create VM registry and connect it to cluster manager + vmRegistry := &DistributedVMRegistry{ + runtime: localRuntime, + sharding: sharding, + } + cluster.SetVMRegistry(vmRegistry) + + return dvm, nil +} + +// Start begins the distributed VM cluster node +func (dvm *DistributedVM) Start() error { + // Start local runtime + if err := dvm.localRuntime.Start(); 
err != nil { + return fmt.Errorf("failed to start local runtime: %w", err) + } + + // Start cluster services + go dvm.discovery.Start() + go dvm.cluster.Start() + + // Start message routing + go dvm.startMessageRouting() + + return nil +} + +// Stop gracefully shuts down the distributed VM node +func (dvm *DistributedVM) Stop() { + dvm.cancel() + dvm.cluster.Stop() + dvm.discovery.Stop() + dvm.natsConn.Close() +} + +// LoadModel distributes EventStorming model across the cluster with VM templates +func (dvm *DistributedVM) LoadModel(model interface{}) error { + // Load model locally first + if err := dvm.localRuntime.LoadModel(model); err != nil { + return fmt.Errorf("failed to load model locally: %w", err) + } + + // Broadcast model to other cluster nodes + msg := ClusterMessage{ + Type: "load_model", + From: dvm.nodeID, + To: "broadcast", + Payload: model, + } + + return dvm.publishClusterMessage(msg) +} + +// SendMessage routes messages across the distributed cluster +func (dvm *DistributedVM) SendMessage(message interface{}) error { + // This is a simplified implementation + // In practice, this would determine the target node based on sharding + // and route the message appropriately + + return dvm.localRuntime.SendMessage(message) +} + +// GetActorNode determines which node should handle a specific actor +func (dvm *DistributedVM) GetActorNode(actorID string) string { + // Use consistent hashing to determine the target node + return dvm.cluster.hashRing.GetNode(actorID) +} + +// IsLocalActor checks if an actor should be handled by this node +func (dvm *DistributedVM) IsLocalActor(actorID string) bool { + targetNode := dvm.GetActorNode(actorID) + return targetNode == dvm.nodeID +} + +// GetActorsInShard returns actors that belong to a specific shard on this node +func (dvm *DistributedVM) GetActorsInShard(shardID int) []string { + return dvm.cluster.GetActorsInShard(shardID) +} + +// startMessageRouting begins routing messages between cluster nodes +func (dvm *DistributedVM) startMessageRouting() { + // Subscribe to cluster messages + dvm.natsConn.Subscribe("aether.distributed.*", dvm.handleClusterMessage) +} + +// handleClusterMessage processes incoming cluster coordination messages +func (dvm *DistributedVM) handleClusterMessage(msg *nats.Msg) { + var clusterMsg ClusterMessage + if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil { + return + } + + switch clusterMsg.Type { + case "load_model": + // Handle model loading from other nodes + if model := clusterMsg.Payload; model != nil { + dvm.localRuntime.LoadModel(model) + } + + case "route_message": + // Handle message routing from other nodes + if message := clusterMsg.Payload; message != nil { + dvm.localRuntime.SendMessage(message) + } + + case "rebalance": + // Handle shard rebalancing requests + dvm.handleRebalanceRequest(clusterMsg) + } +} + +// handleRebalanceRequest processes shard rebalancing requests +func (dvm *DistributedVM) handleRebalanceRequest(msg ClusterMessage) { + // Simplified rebalancing logic + // In practice, this would implement complex actor migration +} + +// publishClusterMessage sends a message to other cluster nodes +func (dvm *DistributedVM) publishClusterMessage(msg ClusterMessage) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + + subject := fmt.Sprintf("aether.distributed.%s", msg.Type) + return dvm.natsConn.Publish(subject, data) +} + +// GetClusterInfo returns information about the cluster state +func (dvm *DistributedVM) GetClusterInfo() map[string]interface{} { 
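+	// GetNodes returns a defensive copy of the membership map, so this snapshot
+	// can be read and marshalled without holding cluster manager locks.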
+ nodes := dvm.cluster.GetNodes() + + return map[string]interface{}{ + "nodeId": dvm.nodeID, + "isLeader": dvm.cluster.IsLeader(), + "leader": dvm.cluster.GetLeader(), + "nodeCount": len(nodes), + "nodes": nodes, + } +} + +// GetActiveVMs returns a map of active VMs (implementation depends on runtime) +func (dvr *DistributedVMRegistry) GetActiveVMs() map[string]interface{} { + // This would need to access the actual runtime's VM registry + // For now, return empty map to avoid import cycles + return make(map[string]interface{}) +} + +// GetShard returns the shard number for the given actor ID +func (dvr *DistributedVMRegistry) GetShard(actorID string) int { + return dvr.sharding.GetShard(actorID) +} \ No newline at end of file diff --git a/cluster/hashring.go b/cluster/hashring.go new file mode 100644 index 0000000..f1c03c3 --- /dev/null +++ b/cluster/hashring.go @@ -0,0 +1,105 @@ +package cluster + +import ( + "crypto/sha256" + "encoding/binary" + "fmt" + "sort" +) + +// ConsistentHashRing implements a consistent hash ring for shard distribution +type ConsistentHashRing struct { + ring map[uint32]string // hash -> node ID + sortedHashes []uint32 // sorted hash keys + nodes map[string]bool // active nodes +} + +// NewConsistentHashRing creates a new consistent hash ring +func NewConsistentHashRing() *ConsistentHashRing { + return &ConsistentHashRing{ + ring: make(map[uint32]string), + nodes: make(map[string]bool), + } +} + +// AddNode adds a node to the hash ring +func (chr *ConsistentHashRing) AddNode(nodeID string) { + if chr.nodes[nodeID] { + return // Node already exists + } + + chr.nodes[nodeID] = true + + // Add virtual nodes for better distribution + for i := 0; i < VirtualNodes; i++ { + virtualKey := fmt.Sprintf("%s:%d", nodeID, i) + hash := chr.hash(virtualKey) + chr.ring[hash] = nodeID + chr.sortedHashes = append(chr.sortedHashes, hash) + } + + sort.Slice(chr.sortedHashes, func(i, j int) bool { + return chr.sortedHashes[i] < chr.sortedHashes[j] + }) +} + +// RemoveNode removes a node from the hash ring +func (chr *ConsistentHashRing) RemoveNode(nodeID string) { + if !chr.nodes[nodeID] { + return // Node doesn't exist + } + + delete(chr.nodes, nodeID) + + // Remove all virtual nodes for this physical node + newHashes := make([]uint32, 0) + for _, hash := range chr.sortedHashes { + if chr.ring[hash] != nodeID { + newHashes = append(newHashes, hash) + } else { + delete(chr.ring, hash) + } + } + chr.sortedHashes = newHashes +} + +// GetNode returns the node responsible for a given key +func (chr *ConsistentHashRing) GetNode(key string) string { + if len(chr.sortedHashes) == 0 { + return "" + } + + hash := chr.hash(key) + + // Find the first node with hash >= key hash (clockwise) + idx := sort.Search(len(chr.sortedHashes), func(i int) bool { + return chr.sortedHashes[i] >= hash + }) + + // Wrap around to the first node if we've gone past the end + if idx == len(chr.sortedHashes) { + idx = 0 + } + + return chr.ring[chr.sortedHashes[idx]] +} + +// hash computes a hash for the given key +func (chr *ConsistentHashRing) hash(key string) uint32 { + h := sha256.Sum256([]byte(key)) + return binary.BigEndian.Uint32(h[:4]) +} + +// GetNodes returns all active nodes in the ring +func (chr *ConsistentHashRing) GetNodes() []string { + nodes := make([]string, 0, len(chr.nodes)) + for nodeID := range chr.nodes { + nodes = append(nodes, nodeID) + } + return nodes +} + +// IsEmpty returns true if the ring has no nodes +func (chr *ConsistentHashRing) IsEmpty() bool { + return len(chr.nodes) == 0 +} \ 
No newline at end of file diff --git a/cluster/leader.go b/cluster/leader.go new file mode 100644 index 0000000..b9a2607 --- /dev/null +++ b/cluster/leader.go @@ -0,0 +1,414 @@ +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +// LeaderElection manages NATS-based leader election using lease-based coordination +type LeaderElection struct { + nodeID string + natsConn *nats.Conn + js nats.JetStreamContext + kv nats.KeyValue + isLeader bool + currentLeader string + leaderTerm uint64 + ctx context.Context + cancel context.CancelFunc + mutex sync.RWMutex + logger *log.Logger + callbacks LeaderElectionCallbacks +} + +// NewLeaderElection creates a new NATS-based leader election system +func NewLeaderElection(nodeID string, natsConn *nats.Conn, callbacks LeaderElectionCallbacks) (*LeaderElection, error) { + ctx, cancel := context.WithCancel(context.Background()) + + // Create JetStream context + js, err := natsConn.JetStream() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create JetStream context: %w", err) + } + + // Create or get KV store for leader election + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "aether-leader-election", + Description: "Aether cluster leader election coordination", + TTL: LeaderLeaseTimeout * 2, // Auto-cleanup expired leases + MaxBytes: 1024 * 1024, // 1MB max + Replicas: 1, // Single replica for simplicity + }) + if err != nil { + // Try to get existing KV store + kv, err = js.KeyValue("aether-leader-election") + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create/get KV store: %w", err) + } + } + + return &LeaderElection{ + nodeID: nodeID, + natsConn: natsConn, + js: js, + kv: kv, + ctx: ctx, + cancel: cancel, + logger: log.New(os.Stdout, fmt.Sprintf("[Leader %s] ", nodeID), log.LstdFlags), + callbacks: callbacks, + }, nil +} + +// Start begins the leader election process +func (le *LeaderElection) Start() { + le.logger.Printf("🗳️ Starting leader election") + + // Start election loop in background + go le.electionLoop() + + // Start lease renewal loop in background + go le.leaseRenewalLoop() + + // Start leader monitoring + go le.monitorLeadership() +} + +// Stop stops the leader election process +func (le *LeaderElection) Stop() { + le.logger.Printf("🛑 Stopping leader election") + + le.cancel() + + // If we're the leader, resign gracefully + if le.IsLeader() { + le.resignLeadership() + } +} + +// IsLeader returns whether this node is currently the leader +func (le *LeaderElection) IsLeader() bool { + le.mutex.RLock() + defer le.mutex.RUnlock() + return le.isLeader +} + +// GetLeader returns the current leader ID +func (le *LeaderElection) GetLeader() string { + le.mutex.RLock() + defer le.mutex.RUnlock() + return le.currentLeader +} + +// GetTerm returns the current leadership term +func (le *LeaderElection) GetTerm() uint64 { + le.mutex.RLock() + defer le.mutex.RUnlock() + return le.leaderTerm +} + +// electionLoop runs the main election process +func (le *LeaderElection) electionLoop() { + ticker := time.NewTicker(ElectionTimeout) + defer ticker.Stop() + + // Try to become leader immediately + le.tryBecomeLeader() + + for { + select { + case <-le.ctx.Done(): + return + case <-ticker.C: + // Periodically check if we should try to become leader + if !le.IsLeader() && le.shouldTryElection() { + le.tryBecomeLeader() + } + } + } +} + +// leaseRenewalLoop renews the leadership lease if we're the leader +func (le *LeaderElection) 
leaseRenewalLoop() { + ticker := time.NewTicker(HeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-le.ctx.Done(): + return + case <-ticker.C: + if le.IsLeader() { + if err := le.renewLease(); err != nil { + le.logger.Printf("❌ Failed to renew leadership lease: %v", err) + le.loseLeadership() + } + } + } + } +} + +// monitorLeadership watches for leadership changes +func (le *LeaderElection) monitorLeadership() { + watcher, err := le.kv.Watch("leader") + if err != nil { + le.logger.Printf("❌ Failed to watch leadership: %v", err) + return + } + defer watcher.Stop() + + for { + select { + case <-le.ctx.Done(): + return + case entry := <-watcher.Updates(): + if entry == nil { + continue + } + le.handleLeadershipUpdate(entry) + } + } +} + +// tryBecomeLeader attempts to acquire leadership +func (le *LeaderElection) tryBecomeLeader() { + le.logger.Printf("🗳️ Attempting to become leader") + + now := time.Now() + newLease := LeadershipLease{ + LeaderID: le.nodeID, + Term: le.leaderTerm + 1, + ExpiresAt: now.Add(LeaderLeaseTimeout), + StartedAt: now, + } + + leaseData, err := json.Marshal(newLease) + if err != nil { + le.logger.Printf("❌ Failed to marshal lease: %v", err) + return + } + + // Try to create the leader key (atomic operation) + _, err = le.kv.Create("leader", leaseData) + if err != nil { + // Leader key exists, check if it's expired + if le.tryClaimExpiredLease() { + return // Successfully claimed expired lease + } + // Another node is leader + return + } + + // Successfully became leader! + le.becomeLeader(newLease.Term) +} + +// tryClaimExpiredLease attempts to claim an expired leadership lease +func (le *LeaderElection) tryClaimExpiredLease() bool { + entry, err := le.kv.Get("leader") + if err != nil { + return false + } + + var currentLease LeadershipLease + if err := json.Unmarshal(entry.Value(), ¤tLease); err != nil { + return false + } + + // Check if lease is expired + if time.Now().Before(currentLease.ExpiresAt) { + // Lease is still valid + le.updateCurrentLeader(currentLease.LeaderID, currentLease.Term) + return false + } + + // Lease is expired, try to claim it + le.logger.Printf("🕐 Attempting to claim expired lease from %s", currentLease.LeaderID) + + now := time.Now() + newLease := LeadershipLease{ + LeaderID: le.nodeID, + Term: currentLease.Term + 1, + ExpiresAt: now.Add(LeaderLeaseTimeout), + StartedAt: now, + } + + leaseData, err := json.Marshal(newLease) + if err != nil { + return false + } + + // Atomically update the lease + _, err = le.kv.Update("leader", leaseData, entry.Revision()) + if err != nil { + return false + } + + // Successfully claimed expired lease! 
+ le.becomeLeader(newLease.Term) + return true +} + +// renewLease renews the current leadership lease +func (le *LeaderElection) renewLease() error { + entry, err := le.kv.Get("leader") + if err != nil { + return err + } + + var currentLease LeadershipLease + if err := json.Unmarshal(entry.Value(), ¤tLease); err != nil { + return err + } + + // Verify we're still the leader + if currentLease.LeaderID != le.nodeID { + return fmt.Errorf("no longer leader, current leader is %s", currentLease.LeaderID) + } + + // Renew the lease + renewedLease := currentLease + renewedLease.ExpiresAt = time.Now().Add(LeaderLeaseTimeout) + + leaseData, err := json.Marshal(renewedLease) + if err != nil { + return err + } + + _, err = le.kv.Update("leader", leaseData, entry.Revision()) + if err != nil { + return fmt.Errorf("failed to renew lease: %w", err) + } + + le.logger.Printf("💓 Renewed leadership lease until %s", renewedLease.ExpiresAt.Format(time.RFC3339)) + return nil +} + +// becomeLeader handles becoming the cluster leader +func (le *LeaderElection) becomeLeader(term uint64) { + le.mutex.Lock() + le.isLeader = true + le.currentLeader = le.nodeID + le.leaderTerm = term + le.mutex.Unlock() + + le.logger.Printf("👑 Became cluster leader (term %d)", term) + + if le.callbacks.OnBecameLeader != nil { + le.callbacks.OnBecameLeader() + } +} + +// loseLeadership handles losing leadership +func (le *LeaderElection) loseLeadership() { + le.mutex.Lock() + wasLeader := le.isLeader + le.isLeader = false + le.mutex.Unlock() + + if wasLeader { + le.logger.Printf("📉 Lost cluster leadership") + if le.callbacks.OnLostLeader != nil { + le.callbacks.OnLostLeader() + } + } +} + +// resignLeadership gracefully resigns from leadership +func (le *LeaderElection) resignLeadership() { + if !le.IsLeader() { + return + } + + le.logger.Printf("👋 Resigning from cluster leadership") + + // Delete the leadership key + err := le.kv.Delete("leader") + if err != nil { + le.logger.Printf("⚠️ Failed to delete leadership key: %v", err) + } + + le.loseLeadership() +} + +// shouldTryElection determines if this node should attempt to become leader +func (le *LeaderElection) shouldTryElection() bool { + // Always try if no current leader + if le.GetLeader() == "" { + return true + } + + // Check if current lease is expired + entry, err := le.kv.Get("leader") + if err != nil { + // Can't read lease, try to become leader + return true + } + + var currentLease LeadershipLease + if err := json.Unmarshal(entry.Value(), ¤tLease); err != nil { + // Invalid lease, try to become leader + return true + } + + // Try if lease is expired + return time.Now().After(currentLease.ExpiresAt) +} + +// handleLeadershipUpdate processes leadership change notifications +func (le *LeaderElection) handleLeadershipUpdate(entry nats.KeyValueEntry) { + if entry.Operation() == nats.KeyValueDelete { + // Leadership was vacated + le.updateCurrentLeader("", 0) + return + } + + var lease LeadershipLease + if err := json.Unmarshal(entry.Value(), &lease); err != nil { + le.logger.Printf("⚠️ Invalid leadership lease: %v", err) + return + } + + le.updateCurrentLeader(lease.LeaderID, lease.Term) +} + +// updateCurrentLeader updates the current leader information +func (le *LeaderElection) updateCurrentLeader(leaderID string, term uint64) { + le.mutex.Lock() + oldLeader := le.currentLeader + le.currentLeader = leaderID + le.leaderTerm = term + + // Update our leadership status + if leaderID == le.nodeID { + le.isLeader = true + } else { + if le.isLeader { + le.isLeader = false + 
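+			// Release the write lock before invoking the callback: OnLostLeader may
+			// call back into IsLeader()/GetLeader(), which take the read lock and
+			// would deadlock against the write lock held here.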
le.mutex.Unlock() + if le.callbacks.OnLostLeader != nil { + le.callbacks.OnLostLeader() + } + le.mutex.Lock() + } else { + le.isLeader = false + } + } + le.mutex.Unlock() + + // Notify of leader change + if oldLeader != leaderID && leaderID != "" && leaderID != le.nodeID { + le.logger.Printf("🔄 New cluster leader: %s (term %d)", leaderID, term) + if le.callbacks.OnNewLeader != nil { + le.callbacks.OnNewLeader(leaderID) + } + } +} \ No newline at end of file diff --git a/cluster/manager.go b/cluster/manager.go new file mode 100644 index 0000000..2ca4c40 --- /dev/null +++ b/cluster/manager.go @@ -0,0 +1,331 @@ +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +// VMRegistry provides access to local VM information for cluster operations +type VMRegistry interface { + GetActiveVMs() map[string]interface{} // VirtualMachine interface to avoid import cycles + GetShard(actorID string) int +} + +// ClusterManager coordinates distributed VM operations across the cluster +type ClusterManager struct { + nodeID string + nodes map[string]*NodeInfo + nodeUpdates chan NodeUpdate + shardMap *ShardMap + hashRing *ConsistentHashRing + election *LeaderElection + natsConn *nats.Conn + ctx context.Context + mutex sync.RWMutex + logger *log.Logger + vmRegistry VMRegistry // Interface to access local VMs +} + +// NewClusterManager creates a cluster coordination manager +func NewClusterManager(nodeID string, natsConn *nats.Conn, ctx context.Context) (*ClusterManager, error) { + cm := &ClusterManager{ + nodeID: nodeID, + nodes: make(map[string]*NodeInfo), + nodeUpdates: make(chan NodeUpdate, 100), + shardMap: &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)}, + hashRing: NewConsistentHashRing(), + natsConn: natsConn, + ctx: ctx, + logger: log.New(os.Stdout, fmt.Sprintf("[ClusterMgr %s] ", nodeID), log.LstdFlags), + vmRegistry: nil, // Will be set later via SetVMRegistry + } + + // Create leadership election with callbacks + callbacks := LeaderElectionCallbacks{ + OnBecameLeader: func() { + cm.logger.Printf("👑 This node became the cluster leader - can initiate rebalancing") + }, + OnLostLeader: func() { + cm.logger.Printf("📉 This node lost cluster leadership") + }, + OnNewLeader: func(leaderID string) { + cm.logger.Printf("🔄 Cluster leadership changed to: %s", leaderID) + }, + } + + election, err := NewLeaderElection(nodeID, natsConn, callbacks) + if err != nil { + return nil, fmt.Errorf("failed to create leader election: %w", err) + } + + cm.election = election + return cm, nil +} + +// Start begins cluster management operations +func (cm *ClusterManager) Start() { + cm.logger.Printf("🚀 Starting cluster manager") + + // Start leader election + cm.election.Start() + + // Subscribe to cluster messages + cm.natsConn.Subscribe("aether.cluster.*", cm.handleClusterMessage) + + // Start node monitoring + go cm.monitorNodes() + + // Start shard rebalancing (only if leader) + go cm.rebalanceLoop() +} + +// Stop gracefully stops the cluster manager +func (cm *ClusterManager) Stop() { + cm.logger.Printf("🛑 Stopping cluster manager") + + if cm.election != nil { + cm.election.Stop() + } +} + +// IsLeader returns whether this node is the cluster leader +func (cm *ClusterManager) IsLeader() bool { + if cm.election == nil { + return false + } + return cm.election.IsLeader() +} + +// GetLeader returns the current cluster leader ID +func (cm *ClusterManager) GetLeader() string { + if cm.election == nil { + return "" + } 
+ return cm.election.GetLeader() +} + +// SetVMRegistry sets the VM registry for accessing local VM information +func (cm *ClusterManager) SetVMRegistry(registry VMRegistry) { + cm.vmRegistry = registry +} + +// GetActorsInShard returns actors that belong to a specific shard on this node +func (cm *ClusterManager) GetActorsInShard(shardID int) []string { + if cm.vmRegistry == nil { + return []string{} + } + + activeVMs := cm.vmRegistry.GetActiveVMs() + var actors []string + + for actorID := range activeVMs { + if cm.vmRegistry.GetShard(actorID) == shardID { + actors = append(actors, actorID) + } + } + + return actors +} + +// handleClusterMessage processes incoming cluster coordination messages +func (cm *ClusterManager) handleClusterMessage(msg *nats.Msg) { + var clusterMsg ClusterMessage + if err := json.Unmarshal(msg.Data, &clusterMsg); err != nil { + cm.logger.Printf("⚠️ Invalid cluster message: %v", err) + return + } + + switch clusterMsg.Type { + case "rebalance": + cm.handleRebalanceRequest(clusterMsg) + case "migrate": + cm.handleMigrationRequest(clusterMsg) + case "node_update": + if update, ok := clusterMsg.Payload.(NodeUpdate); ok { + cm.handleNodeUpdate(update) + } + default: + cm.logger.Printf("⚠️ Unknown cluster message type: %s", clusterMsg.Type) + } +} + +// handleNodeUpdate processes node status updates +func (cm *ClusterManager) handleNodeUpdate(update NodeUpdate) { + cm.mutex.Lock() + defer cm.mutex.Unlock() + + switch update.Type { + case NodeJoined: + cm.nodes[update.Node.ID] = update.Node + cm.hashRing.AddNode(update.Node.ID) + cm.logger.Printf("➕ Node joined: %s", update.Node.ID) + + case NodeLeft: + delete(cm.nodes, update.Node.ID) + cm.hashRing.RemoveNode(update.Node.ID) + cm.logger.Printf("➖ Node left: %s", update.Node.ID) + + case NodeUpdated: + if node, exists := cm.nodes[update.Node.ID]; exists { + // Update existing node info + *node = *update.Node + } else { + // New node + cm.nodes[update.Node.ID] = update.Node + cm.hashRing.AddNode(update.Node.ID) + } + } + + // Check for failed nodes and mark them + now := time.Now() + for _, node := range cm.nodes { + if now.Sub(node.LastSeen) > 90*time.Second && node.Status != NodeStatusFailed { + node.Status = NodeStatusFailed + cm.logger.Printf("❌ Node marked as failed: %s (last seen: %s)", + node.ID, node.LastSeen.Format(time.RFC3339)) + } + } + + // Trigger rebalancing if we're the leader and there are significant changes + if cm.IsLeader() { + activeNodeCount := 0 + for _, node := range cm.nodes { + if node.Status == NodeStatusActive { + activeNodeCount++ + } + } + + // Simple trigger: rebalance if we have different number of active nodes + // than shards assigned (this is a simplified logic) + if activeNodeCount > 0 { + cm.triggerShardRebalancing("node topology changed") + } + } +} + +// handleRebalanceRequest processes cluster rebalancing requests +func (cm *ClusterManager) handleRebalanceRequest(msg ClusterMessage) { + cm.logger.Printf("🔄 Handling rebalance request from %s", msg.From) + + // Implementation would handle the specific rebalancing logic + // This is a simplified version +} + +// handleMigrationRequest processes actor migration requests +func (cm *ClusterManager) handleMigrationRequest(msg ClusterMessage) { + cm.logger.Printf("🚚 Handling migration request from %s", msg.From) + + // Implementation would handle the specific migration logic + // This is a simplified version +} + +// triggerShardRebalancing initiates shard rebalancing across the cluster +func (cm *ClusterManager) 
triggerShardRebalancing(reason string) { + if !cm.IsLeader() { + return // Only leader can initiate rebalancing + } + + cm.logger.Printf("⚖️ Triggering shard rebalancing: %s", reason) + + // Get active nodes + var activeNodes []*NodeInfo + cm.mutex.RLock() + for _, node := range cm.nodes { + if node.Status == NodeStatusActive { + activeNodes = append(activeNodes, node) + } + } + cm.mutex.RUnlock() + + if len(activeNodes) == 0 { + cm.logger.Printf("⚠️ No active nodes available for rebalancing") + return + } + + // This would implement the actual rebalancing logic + cm.logger.Printf("🎯 Would rebalance across %d active nodes", len(activeNodes)) +} + +// monitorNodes periodically checks node health and updates +func (cm *ClusterManager) monitorNodes() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Health check logic would go here + cm.checkNodeHealth() + + case <-cm.ctx.Done(): + return + } + } +} + +// checkNodeHealth verifies the health of known nodes +func (cm *ClusterManager) checkNodeHealth() { + cm.mutex.Lock() + defer cm.mutex.Unlock() + + now := time.Now() + for _, node := range cm.nodes { + if now.Sub(node.LastSeen) > 90*time.Second && node.Status == NodeStatusActive { + node.Status = NodeStatusFailed + cm.logger.Printf("💔 Node failed: %s", node.ID) + } + } +} + +// rebalanceLoop runs periodic rebalancing checks (leader only) +func (cm *ClusterManager) rebalanceLoop() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if cm.IsLeader() { + cm.triggerShardRebalancing("periodic rebalance check") + } + + case <-cm.ctx.Done(): + return + } + } +} + +// GetNodes returns a copy of the current cluster nodes +func (cm *ClusterManager) GetNodes() map[string]*NodeInfo { + cm.mutex.RLock() + defer cm.mutex.RUnlock() + + nodes := make(map[string]*NodeInfo) + for id, node := range cm.nodes { + // Create a copy to prevent external mutation + nodeCopy := *node + nodes[id] = &nodeCopy + } + return nodes +} + +// GetShardMap returns the current shard mapping +func (cm *ClusterManager) GetShardMap() *ShardMap { + cm.mutex.RLock() + defer cm.mutex.RUnlock() + + // Return a copy to prevent external mutation + return &ShardMap{ + Version: cm.shardMap.Version, + Shards: make(map[int][]string), + Nodes: make(map[string]NodeInfo), + UpdateTime: cm.shardMap.UpdateTime, + } +} \ No newline at end of file diff --git a/cluster/shard.go b/cluster/shard.go new file mode 100644 index 0000000..46e270d --- /dev/null +++ b/cluster/shard.go @@ -0,0 +1,188 @@ +package cluster + +import ( + "crypto/sha256" + "encoding/binary" + "fmt" + "hash" + "hash/fnv" +) + +// MigrationStatus tracks actor migration progress +type MigrationStatus string + +const ( + MigrationPending MigrationStatus = "pending" + MigrationInProgress MigrationStatus = "in_progress" + MigrationCompleted MigrationStatus = "completed" + MigrationFailed MigrationStatus = "failed" +) + +// PlacementStrategy determines where to place new actors +type PlacementStrategy interface { + PlaceActor(actorID string, shardMap *ShardMap, nodes map[string]*NodeInfo) (string, error) + RebalanceShards(shardMap *ShardMap, nodes map[string]*NodeInfo) (*ShardMap, error) +} + +// ShardManager handles actor placement and distribution +type ShardManager struct { + shardCount int + shardMap *ShardMap + hasher hash.Hash + placement PlacementStrategy + replication int +} + +// NewShardManager creates a new shard manager +func NewShardManager(shardCount, 
replication int) *ShardManager { + return &ShardManager{ + shardCount: shardCount, + shardMap: &ShardMap{Shards: make(map[int][]string), Nodes: make(map[string]NodeInfo)}, + hasher: fnv.New64a(), + placement: &ConsistentHashPlacement{}, + replication: replication, + } +} + +// GetShard returns the shard number for a given actor ID +func (sm *ShardManager) GetShard(actorID string) int { + h := sha256.Sum256([]byte(actorID)) + shardID := binary.BigEndian.Uint32(h[:4]) % uint32(sm.shardCount) + return int(shardID) +} + +// GetShardNodes returns the nodes responsible for a shard +func (sm *ShardManager) GetShardNodes(shardID int) []string { + if nodes, exists := sm.shardMap.Shards[shardID]; exists { + return nodes + } + return []string{} +} + +// AssignShard assigns a shard to specific nodes +func (sm *ShardManager) AssignShard(shardID int, nodes []string) { + if sm.shardMap.Shards == nil { + sm.shardMap.Shards = make(map[int][]string) + } + sm.shardMap.Shards[shardID] = nodes +} + +// GetPrimaryNode returns the primary node for a shard +func (sm *ShardManager) GetPrimaryNode(shardID int) string { + nodes := sm.GetShardNodes(shardID) + if len(nodes) > 0 { + return nodes[0] // First node is primary + } + return "" +} + +// GetReplicaNodes returns the replica nodes for a shard +func (sm *ShardManager) GetReplicaNodes(shardID int) []string { + nodes := sm.GetShardNodes(shardID) + if len(nodes) > 1 { + return nodes[1:] // All nodes except first are replicas + } + return []string{} +} + +// UpdateShardMap updates the entire shard map +func (sm *ShardManager) UpdateShardMap(newShardMap *ShardMap) { + sm.shardMap = newShardMap +} + +// GetShardMap returns a copy of the current shard map +func (sm *ShardManager) GetShardMap() *ShardMap { + // Return a deep copy to prevent external mutation + copy := &ShardMap{ + Version: sm.shardMap.Version, + Shards: make(map[int][]string), + Nodes: make(map[string]NodeInfo), + UpdateTime: sm.shardMap.UpdateTime, + } + + // Copy the shard assignments + for shardID, nodes := range sm.shardMap.Shards { + copy.Shards[shardID] = append([]string(nil), nodes...) 
+ } + + // Copy the node info + for nodeID, nodeInfo := range sm.shardMap.Nodes { + copy.Nodes[nodeID] = nodeInfo + } + + return copy +} + +// RebalanceShards redistributes shards across available nodes +func (sm *ShardManager) RebalanceShards(nodes map[string]*NodeInfo) (*ShardMap, error) { + if sm.placement == nil { + return nil, fmt.Errorf("no placement strategy configured") + } + + return sm.placement.RebalanceShards(sm.shardMap, nodes) +} + +// PlaceActor determines which node should handle a new actor +func (sm *ShardManager) PlaceActor(actorID string, nodes map[string]*NodeInfo) (string, error) { + if sm.placement == nil { + return "", fmt.Errorf("no placement strategy configured") + } + + return sm.placement.PlaceActor(actorID, sm.shardMap, nodes) +} + +// GetActorsInShard returns actors that belong to a specific shard on a specific node +func (sm *ShardManager) GetActorsInShard(shardID int, nodeID string, vmRegistry VMRegistry) []string { + if vmRegistry == nil { + return []string{} + } + + activeVMs := vmRegistry.GetActiveVMs() + var actors []string + + for actorID := range activeVMs { + if sm.GetShard(actorID) == shardID { + actors = append(actors, actorID) + } + } + + return actors +} + + +// ConsistentHashPlacement implements PlacementStrategy using consistent hashing +type ConsistentHashPlacement struct{} + +// PlaceActor places an actor using consistent hashing +func (chp *ConsistentHashPlacement) PlaceActor(actorID string, shardMap *ShardMap, nodes map[string]*NodeInfo) (string, error) { + if len(nodes) == 0 { + return "", fmt.Errorf("no nodes available for placement") + } + + // Simple consistent hash placement - in a real implementation, + // this would use the consistent hash ring + h := sha256.Sum256([]byte(actorID)) + nodeIndex := binary.BigEndian.Uint32(h[:4]) % uint32(len(nodes)) + + i := 0 + for nodeID := range nodes { + if i == int(nodeIndex) { + return nodeID, nil + } + i++ + } + + // Fallback to first node + for nodeID := range nodes { + return nodeID, nil + } + + return "", fmt.Errorf("failed to place actor") +} + +// RebalanceShards rebalances shards across nodes +func (chp *ConsistentHashPlacement) RebalanceShards(currentMap *ShardMap, nodes map[string]*NodeInfo) (*ShardMap, error) { + // This is a simplified implementation + // In practice, this would implement sophisticated rebalancing logic + return currentMap, nil +} \ No newline at end of file diff --git a/cluster/types.go b/cluster/types.go new file mode 100644 index 0000000..9ec9bbf --- /dev/null +++ b/cluster/types.go @@ -0,0 +1,110 @@ +package cluster + +import ( + "time" +) + +const ( + // NumShards defines the total number of shards in the cluster + NumShards = 1024 + // VirtualNodes defines the number of virtual nodes per physical node for consistent hashing + VirtualNodes = 150 + // Leadership election constants + LeaderLeaseTimeout = 10 * time.Second // How long a leader lease lasts + HeartbeatInterval = 3 * time.Second // How often leader sends heartbeats + ElectionTimeout = 2 * time.Second // How long to wait for election +) + +// NodeStatus represents the health status of a node +type NodeStatus string + +const ( + NodeStatusActive NodeStatus = "active" + NodeStatusDraining NodeStatus = "draining" + NodeStatusFailed NodeStatus = "failed" +) + +// NodeInfo represents information about a cluster node +type NodeInfo struct { + ID string `json:"id"` + Address string `json:"address"` + Port int `json:"port"` + Status NodeStatus `json:"status"` + Capacity float64 `json:"capacity"` // Maximum load 
capacity + Load float64 `json:"load"` // Current CPU/memory load + LastSeen time.Time `json:"lastSeen"` // Last heartbeat timestamp + Timestamp time.Time `json:"timestamp"` + Metadata map[string]string `json:"metadata"` + IsLeader bool `json:"isLeader"` + VMCount int `json:"vmCount"` // Number of VMs on this node + ShardIDs []int `json:"shardIds"` // Shards assigned to this node +} + +// NodeUpdateType represents the type of node update +type NodeUpdateType string + +const ( + NodeJoined NodeUpdateType = "joined" + NodeLeft NodeUpdateType = "left" + NodeUpdated NodeUpdateType = "updated" +) + +// NodeUpdate represents a node status update +type NodeUpdate struct { + Type NodeUpdateType `json:"type"` + Node *NodeInfo `json:"node"` +} + +// ShardMap represents the distribution of shards across cluster nodes +type ShardMap struct { + Version uint64 `json:"version"` // Incremented on each change + Shards map[int][]string `json:"shards"` // shard ID -> [primary, replica1, replica2] + Nodes map[string]NodeInfo `json:"nodes"` // node ID -> node info + UpdateTime time.Time `json:"updateTime"` +} + +// ClusterMessage represents inter-node communication +type ClusterMessage struct { + Type string `json:"type"` + From string `json:"from"` + To string `json:"to"` + Payload interface{} `json:"payload"` + Timestamp time.Time `json:"timestamp"` +} + +// RebalanceRequest represents a request to rebalance shards +type RebalanceRequest struct { + RequestID string `json:"requestId"` + FromNode string `json:"fromNode"` + ToNode string `json:"toNode"` + ShardIDs []int `json:"shardIds"` + Reason string `json:"reason"` + Migrations []ActorMigration `json:"migrations"` +} + +// ActorMigration represents the migration of an actor between nodes +type ActorMigration struct { + ActorID string `json:"actorId"` + FromNode string `json:"fromNode"` + ToNode string `json:"toNode"` + ShardID int `json:"shardId"` + State map[string]interface{} `json:"state"` + Version int64 `json:"version"` + Status string `json:"status"` // "pending", "in_progress", "completed", "failed" +} + +// LeaderElectionCallbacks defines callbacks for leadership changes +type LeaderElectionCallbacks struct { + OnBecameLeader func() + OnLostLeader func() + OnNewLeader func(leaderID string) +} + +// LeadershipLease represents a leadership lease in the cluster +type LeadershipLease struct { + LeaderID string `json:"leaderId"` + Term uint64 `json:"term"` + ExpiresAt time.Time `json:"expiresAt"` + StartedAt time.Time `json:"startedAt"` +} + diff --git a/event.go b/event.go new file mode 100644 index 0000000..c7980da --- /dev/null +++ b/event.go @@ -0,0 +1,38 @@ +package aether + +import ( + "time" +) + +// Event represents a domain event in the system +type Event struct { + ID string `json:"id"` + EventType string `json:"eventType"` + ActorID string `json:"actorId"` + CommandID string `json:"commandId,omitempty"` // Correlation ID for command that triggered this event + Version int64 `json:"version"` + Data map[string]interface{} `json:"data"` + Timestamp time.Time `json:"timestamp"` +} + +// ActorSnapshot represents a point-in-time state snapshot +type ActorSnapshot struct { + ActorID string `json:"actorId"` + Version int64 `json:"version"` + State map[string]interface{} `json:"state"` + Timestamp time.Time `json:"timestamp"` +} + +// EventStore defines the interface for event persistence +type EventStore interface { + SaveEvent(event *Event) error + GetEvents(actorID string, fromVersion int64) ([]*Event, error) + GetLatestVersion(actorID string) 
(int64, error) +} + +// SnapshotStore extends EventStore with snapshot capabilities +type SnapshotStore interface { + EventStore + GetLatestSnapshot(actorID string) (*ActorSnapshot, error) + SaveSnapshot(snapshot *ActorSnapshot) error +} diff --git a/eventbus.go b/eventbus.go new file mode 100644 index 0000000..161bc1b --- /dev/null +++ b/eventbus.go @@ -0,0 +1,106 @@ +package aether + +import ( + "context" + "sync" +) + +// EventBroadcaster defines the interface for publishing and subscribing to events +type EventBroadcaster interface { + Subscribe(namespaceID string) <-chan *Event + Unsubscribe(namespaceID string, ch <-chan *Event) + Publish(namespaceID string, event *Event) + Stop() + SubscriberCount(namespaceID string) int +} + +// EventBus broadcasts events to multiple subscribers within a namespace +type EventBus struct { + subscribers map[string][]chan *Event // namespaceID -> channels + mutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// NewEventBus creates a new event bus +func NewEventBus() *EventBus { + ctx, cancel := context.WithCancel(context.Background()) + return &EventBus{ + subscribers: make(map[string][]chan *Event), + ctx: ctx, + cancel: cancel, + } +} + +// Subscribe creates a new subscription channel for a namespace +func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event { + eb.mutex.Lock() + defer eb.mutex.Unlock() + + // Create buffered channel to prevent blocking publishers + ch := make(chan *Event, 100) + eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch) + + return ch +} + +// Unsubscribe removes a subscription channel +func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { + eb.mutex.Lock() + defer eb.mutex.Unlock() + + subs := eb.subscribers[namespaceID] + for i, subscriber := range subs { + if subscriber == ch { + // Remove channel from slice + eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...) 
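+			// Close the removed channel so receivers ranging over it see the
+			// subscription end rather than blocking forever.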
+ close(subscriber) + break + } + } + + // Clean up empty namespace entries + if len(eb.subscribers[namespaceID]) == 0 { + delete(eb.subscribers, namespaceID) + } +} + +// Publish sends an event to all subscribers of a namespace +func (eb *EventBus) Publish(namespaceID string, event *Event) { + eb.mutex.RLock() + defer eb.mutex.RUnlock() + + subscribers := eb.subscribers[namespaceID] + for _, ch := range subscribers { + select { + case ch <- event: + // Event delivered + default: + // Channel full, skip this subscriber (non-blocking) + } + } +} + +// Stop closes the event bus +func (eb *EventBus) Stop() { + eb.mutex.Lock() + defer eb.mutex.Unlock() + + eb.cancel() + + // Close all subscriber channels + for _, subs := range eb.subscribers { + for _, ch := range subs { + close(ch) + } + } + + eb.subscribers = make(map[string][]chan *Event) +} + +// SubscriberCount returns the number of subscribers for a namespace +func (eb *EventBus) SubscriberCount(namespaceID string) int { + eb.mutex.RLock() + defer eb.mutex.RUnlock() + return len(eb.subscribers[namespaceID]) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1181588 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module git.flowmade.one/flowmade-one/aether + +go 1.23 + +require ( + github.com/google/uuid v1.6.0 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8697b29 --- /dev/null +++ b/go.sum @@ -0,0 +1,14 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/model/model.go b/model/model.go new file mode 100644 index 0000000..b0ae4b0 --- /dev/null +++ b/model/model.go @@ -0,0 +1,47 @@ +package model + +// EventStorming model types + +// Model represents an event storming model +type Model struct { + ID string `json:"id"` + Name string `json:"name"` + Events []DomainEvent `json:"events"` + Commands []Command `json:"commands"` + Aggregates []Aggregate `json:"aggregates"` + Processes []BusinessProcess `json:"processes"` +} + +// DomainEvent represents a domain event definition +type DomainEvent struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Data map[string]string 
`json:"data"` +} + +// Command represents a command definition +type Command struct { + ID string `json:"id"` + Name string `json:"name"` + Actor string `json:"actor"` + TriggersEvent string `json:"triggersEvent"` + Data map[string]string `json:"data"` +} + +// Aggregate represents an aggregate definition +type Aggregate struct { + ID string `json:"id"` + Name string `json:"name"` + Events []string `json:"events"` + Commands []string `json:"commands"` + Invariants []string `json:"invariants"` +} + +// BusinessProcess represents a business process definition +type BusinessProcess struct { + ID string `json:"id"` + Name string `json:"name"` + TriggerEvents []string `json:"triggerEvents"` + OutputCommands []string `json:"outputCommands"` +} diff --git a/nats_eventbus.go b/nats_eventbus.go new file mode 100644 index 0000000..746bc81 --- /dev/null +++ b/nats_eventbus.go @@ -0,0 +1,159 @@ +package aether + +import ( + "context" + "encoding/json" + "fmt" + "log" + "sync" + + "github.com/google/uuid" + "github.com/nats-io/nats.go" +) + +// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS +type NATSEventBus struct { + *EventBus // Embed base EventBus for local subscriptions + nc *nats.Conn // NATS connection + subscriptions []*nats.Subscription + namespaceSubscribers map[string]int // Track number of subscribers per namespace + nodeID string // Unique ID for this node + mutex sync.Mutex + ctx context.Context + cancel context.CancelFunc +} + +// eventMessage is the wire format for events sent over NATS +type eventMessage struct { + NodeID string `json:"node_id"` + NamespaceID string `json:"namespace_id"` + Event *Event `json:"event"` +} + +// NewNATSEventBus creates a new NATS-backed event bus +func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { + ctx, cancel := context.WithCancel(context.Background()) + + neb := &NATSEventBus{ + EventBus: NewEventBus(), + nc: nc, + nodeID: uuid.New().String(), + subscriptions: make([]*nats.Subscription, 0), + namespaceSubscribers: make(map[string]int), + ctx: ctx, + cancel: cancel, + } + + return neb, nil +} + +// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace +func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event { + neb.mutex.Lock() + defer neb.mutex.Unlock() + + // Create local subscription first + ch := neb.EventBus.Subscribe(namespaceID) + + // Check if this is the first subscriber for this namespace + count := neb.namespaceSubscribers[namespaceID] + if count == 0 { + // First subscriber - create NATS subscription + subject := fmt.Sprintf("aether.events.%s", namespaceID) + + sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { + neb.handleNATSEvent(msg) + }) + if err != nil { + log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err) + } else { + neb.subscriptions = append(neb.subscriptions, sub) + log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) + } + } + + neb.namespaceSubscribers[namespaceID] = count + 1 + + return ch +} + +// Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers +func (neb *NATSEventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { + neb.mutex.Lock() + defer neb.mutex.Unlock() + + neb.EventBus.Unsubscribe(namespaceID, ch) + + count := neb.namespaceSubscribers[namespaceID] + if count > 0 { + count-- + neb.namespaceSubscribers[namespaceID] = count + + if count == 0 { + delete(neb.namespaceSubscribers, namespaceID) + 
log.Printf("[NATSEventBus] No more subscribers for namespace %s on node %s", namespaceID, neb.nodeID) + } + } +} + +// handleNATSEvent processes events received from NATS +func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) { + var eventMsg eventMessage + if err := json.Unmarshal(msg.Data, &eventMsg); err != nil { + log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err) + return + } + + // Skip events that originated from this node (already delivered locally) + if eventMsg.NodeID == neb.nodeID { + return + } + + // Forward to local EventBus subscribers + neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) +} + +// Publish publishes an event both locally and to NATS for cross-node broadcasting +func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { + // First publish locally + neb.EventBus.Publish(namespaceID, event) + + // Then publish to NATS for other nodes + subject := fmt.Sprintf("aether.events.%s", namespaceID) + + eventMsg := eventMessage{ + NodeID: neb.nodeID, + NamespaceID: namespaceID, + Event: event, + } + + data, err := json.Marshal(eventMsg) + if err != nil { + log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err) + return + } + + if err := neb.nc.Publish(subject, data); err != nil { + log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err) + return + } +} + +// Stop closes the NATS event bus and all subscriptions +func (neb *NATSEventBus) Stop() { + neb.mutex.Lock() + defer neb.mutex.Unlock() + + neb.cancel() + + for _, sub := range neb.subscriptions { + if err := sub.Unsubscribe(); err != nil { + log.Printf("[NATSEventBus] Error unsubscribing: %v", err) + } + } + neb.subscriptions = nil + + neb.EventBus.Stop() + + log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) +} diff --git a/store/jetstream.go b/store/jetstream.go new file mode 100644 index 0000000..cb47a23 --- /dev/null +++ b/store/jetstream.go @@ -0,0 +1,218 @@ +package store + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "git.flowmade.one/flowmade-one/aether" + "github.com/nats-io/nats.go" +) + +// JetStreamEventStore implements EventStore using NATS JetStream for persistence +type JetStreamEventStore struct { + js nats.JetStreamContext + streamName string +} + +// NewJetStreamEventStore creates a new JetStream-based event store +func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) { + js, err := natsConn.JetStream() + if err != nil { + return nil, fmt.Errorf("failed to get JetStream context: %w", err) + } + + // Create or update the stream + stream := &nats.StreamConfig{ + Name: streamName, + Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)}, + Storage: nats.FileStorage, + Retention: nats.LimitsPolicy, + MaxAge: 365 * 24 * time.Hour, // Keep events for 1 year + Replicas: 1, // Can be increased for HA + } + + _, err = js.AddStream(stream) + if err != nil && !strings.Contains(err.Error(), "already exists") { + return nil, fmt.Errorf("failed to create stream: %w", err) + } + + return &JetStreamEventStore{ + js: js, + streamName: streamName, + }, nil +} + +// SaveEvent persists an event to JetStream +func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { + // Serialize event to JSON + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + // Create subject: stream.events.actorType.actorID + subject := fmt.Sprintf("%s.events.%s.%s", + jes.streamName, + 
sanitizeSubject(extractActorType(event.ActorID)), + sanitizeSubject(event.ActorID)) + + // Publish with event ID as message ID for deduplication + _, err = jes.js.Publish(subject, data, nats.MsgId(event.ID)) + if err != nil { + return fmt.Errorf("failed to publish event to JetStream: %w", err) + } + + return nil +} + +// GetEvents retrieves all events for an actor since a version +func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { + // Create subject filter for this actor + subject := fmt.Sprintf("%s.events.%s.%s", + jes.streamName, + sanitizeSubject(extractActorType(actorID)), + sanitizeSubject(actorID)) + + // Create consumer to read events + consumer, err := jes.js.PullSubscribe(subject, "") + if err != nil { + return nil, fmt.Errorf("failed to create consumer: %w", err) + } + defer consumer.Unsubscribe() + + var events []*aether.Event + + // Fetch messages in batches + for { + msgs, err := consumer.Fetch(100, nats.MaxWait(time.Second)) + if err != nil { + if err == nats.ErrTimeout { + break // No more messages + } + return nil, fmt.Errorf("failed to fetch messages: %w", err) + } + + for _, msg := range msgs { + var event aether.Event + if err := json.Unmarshal(msg.Data, &event); err != nil { + continue // Skip malformed events + } + + // Filter by version + if event.Version > fromVersion { + events = append(events, &event) + } + + msg.Ack() + } + + if len(msgs) < 100 { + break // No more messages + } + } + + return events, nil +} + +// GetLatestVersion returns the latest version for an actor +func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) { + events, err := jes.GetEvents(actorID, 0) + if err != nil { + return 0, err + } + + if len(events) == 0 { + return 0, nil + } + + latestVersion := int64(0) + for _, event := range events { + if event.Version > latestVersion { + latestVersion = event.Version + } + } + + return latestVersion, nil +} + +// GetLatestSnapshot gets the most recent snapshot for an actor +func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { + // Create subject for snapshots + subject := fmt.Sprintf("%s.snapshots.%s.%s", + jes.streamName, + sanitizeSubject(extractActorType(actorID)), + sanitizeSubject(actorID)) + + // Try to get the latest snapshot + consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast()) + if err != nil { + return nil, fmt.Errorf("failed to create snapshot consumer: %w", err) + } + defer consumer.Unsubscribe() + + msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) + if err != nil { + if err == nats.ErrTimeout { + return nil, fmt.Errorf("no snapshot found for actor %s", actorID) + } + return nil, fmt.Errorf("failed to fetch snapshot: %w", err) + } + + if len(msgs) == 0 { + return nil, fmt.Errorf("no snapshot found for actor %s", actorID) + } + + var snapshot aether.ActorSnapshot + if err := json.Unmarshal(msgs[0].Data, &snapshot); err != nil { + return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err) + } + + msgs[0].Ack() + return &snapshot, nil +} + +// SaveSnapshot saves a snapshot of actor state +func (jes *JetStreamEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { + // Serialize snapshot to JSON + data, err := json.Marshal(snapshot) + if err != nil { + return fmt.Errorf("failed to marshal snapshot: %w", err) + } + + // Create subject for snapshots + subject := fmt.Sprintf("%s.snapshots.%s.%s", + jes.streamName, + sanitizeSubject(extractActorType(snapshot.ActorID)), + 
sanitizeSubject(snapshot.ActorID)) + + // Publish snapshot + _, err = jes.js.Publish(subject, data) + if err != nil { + return fmt.Errorf("failed to publish snapshot to JetStream: %w", err) + } + + return nil +} + +// Helper functions + +// extractActorType extracts the actor type from an actor ID +func extractActorType(actorID string) string { + for i, c := range actorID { + if c == '-' && i > 0 { + return actorID[:i] + } + } + return "unknown" +} + +// sanitizeSubject sanitizes a string for use in NATS subjects +func sanitizeSubject(s string) string { + s = strings.ReplaceAll(s, " ", "_") + s = strings.ReplaceAll(s, ".", "_") + s = strings.ReplaceAll(s, "*", "_") + s = strings.ReplaceAll(s, ">", "_") + return s +} diff --git a/store/memory.go b/store/memory.go new file mode 100644 index 0000000..f0f2eb2 --- /dev/null +++ b/store/memory.go @@ -0,0 +1,60 @@ +package store + +import ( + "git.flowmade.one/flowmade-one/aether" +) + +// InMemoryEventStore provides a simple in-memory event store for testing +type InMemoryEventStore struct { + events map[string][]*aether.Event // actorID -> events +} + +// NewInMemoryEventStore creates a new in-memory event store +func NewInMemoryEventStore() *InMemoryEventStore { + return &InMemoryEventStore{ + events: make(map[string][]*aether.Event), + } +} + +// SaveEvent saves an event to the in-memory store +func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error { + if _, exists := es.events[event.ActorID]; !exists { + es.events[event.ActorID] = make([]*aether.Event, 0) + } + es.events[event.ActorID] = append(es.events[event.ActorID], event) + return nil +} + +// GetEvents retrieves events for an actor from a specific version +func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { + events, exists := es.events[actorID] + if !exists { + return []*aether.Event{}, nil + } + + var filteredEvents []*aether.Event + for _, event := range events { + if event.Version >= fromVersion { + filteredEvents = append(filteredEvents, event) + } + } + + return filteredEvents, nil +} + +// GetLatestVersion returns the latest version for an actor +func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) { + events, exists := es.events[actorID] + if !exists || len(events) == 0 { + return 0, nil + } + + latestVersion := int64(0) + for _, event := range events { + if event.Version > latestVersion { + latestVersion = event.Version + } + } + + return latestVersion, nil +} diff --git a/vision.md b/vision.md new file mode 100644 index 0000000..31f8bea --- /dev/null +++ b/vision.md @@ -0,0 +1,37 @@ +# Aether Vision + +Distributed actor system with event sourcing for Go, powered by NATS. + +## Organization Context + +This repo is part of Flowmade. See [organization manifesto](../architecture/manifesto.md) for who we are and what we believe. + +## What This Is + +Aether is an open-source infrastructure library for building distributed, event-sourced systems in Go. 
It provides: + +- **Event sourcing primitives** - Event, EventStore interface, snapshots +- **Event stores** - In-memory (testing) and JetStream (production) +- **Event bus** - Local and NATS-backed pub/sub with namespace isolation +- **Cluster management** - Node discovery, leader election, shard distribution +- **Namespace isolation** - Logical boundaries for multi-scope deployments + +## Who This Serves + +- **Go developers** building distributed systems +- **Teams** implementing event sourcing and CQRS patterns +- **Projects** needing actor-based concurrency with event persistence + +## Goals + +1. **Simple event sourcing** - Clear primitives that compose well +2. **NATS-native** - Built for JetStream, not bolted on +3. **Horizontal scaling** - Consistent hashing, shard migration, leader election +4. **Namespace isolation** - Logical boundaries without infrastructure overhead + +## Non-Goals + +- Opinionated multi-tenancy (product layer concern) +- Domain-specific abstractions (use the primitives) +- GraphQL/REST API generation (build on top) +- UI components (see iris)
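+
+## Example
+
+A minimal sketch of how the primitives compose, using the in-memory store and the
+local event bus; the namespace, event names, and IDs below are illustrative only.
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+
+	"git.flowmade.one/flowmade-one/aether"
+	"git.flowmade.one/flowmade-one/aether/store"
+)
+
+func main() {
+	// Events are the source of truth; persist them in an event store.
+	es := store.NewInMemoryEventStore()
+	bus := aether.NewEventBus()
+	defer bus.Stop()
+
+	// A subscriber scoped to one namespace.
+	ch := bus.Subscribe("billing")
+
+	event := &aether.Event{
+		ID:        uuid.New().String(),
+		EventType: "InvoiceSent",
+		ActorID:   "invoice-42",
+		Version:   1,
+		Data:      map[string]interface{}{"amountCents": 1999},
+		Timestamp: time.Now(),
+	}
+
+	// Persist, then broadcast to namespace subscribers.
+	if err := es.SaveEvent(event); err != nil {
+		panic(err)
+	}
+	bus.Publish("billing", event)
+	fmt.Println("received:", (<-ch).EventType)
+
+	// State is rebuilt by replaying events from version 0.
+	events, _ := es.GetEvents("invoice-42", 0)
+	fmt.Println("replayed", len(events), "events")
+}
+```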