package store import ( "encoding/json" "fmt" "strings" "time" "git.flowmade.one/flowmade-one/aether" "github.com/nats-io/nats.go" ) // JetStreamEventStore implements EventStore using NATS JetStream for persistence type JetStreamEventStore struct { js nats.JetStreamContext streamName string } // NewJetStreamEventStore creates a new JetStream-based event store func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) { js, err := natsConn.JetStream() if err != nil { return nil, fmt.Errorf("failed to get JetStream context: %w", err) } // Create or update the stream stream := &nats.StreamConfig{ Name: streamName, Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)}, Storage: nats.FileStorage, Retention: nats.LimitsPolicy, MaxAge: 365 * 24 * time.Hour, // Keep events for 1 year Replicas: 1, // Can be increased for HA } _, err = js.AddStream(stream) if err != nil && !strings.Contains(err.Error(), "already exists") { return nil, fmt.Errorf("failed to create stream: %w", err) } return &JetStreamEventStore{ js: js, streamName: streamName, }, nil } // SaveEvent persists an event to JetStream func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { // Serialize event to JSON data, err := json.Marshal(event) if err != nil { return fmt.Errorf("failed to marshal event: %w", err) } // Create subject: stream.events.actorType.actorID subject := fmt.Sprintf("%s.events.%s.%s", jes.streamName, sanitizeSubject(extractActorType(event.ActorID)), sanitizeSubject(event.ActorID)) // Publish with event ID as message ID for deduplication _, err = jes.js.Publish(subject, data, nats.MsgId(event.ID)) if err != nil { return fmt.Errorf("failed to publish event to JetStream: %w", err) } return nil } // GetEvents retrieves all events for an actor since a version func (jes *JetStreamEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) { // Create subject filter for this actor subject := fmt.Sprintf("%s.events.%s.%s", jes.streamName, sanitizeSubject(extractActorType(actorID)), sanitizeSubject(actorID)) // Create consumer to read events consumer, err := jes.js.PullSubscribe(subject, "") if err != nil { return nil, fmt.Errorf("failed to create consumer: %w", err) } defer consumer.Unsubscribe() var events []*aether.Event // Fetch messages in batches for { msgs, err := consumer.Fetch(100, nats.MaxWait(time.Second)) if err != nil { if err == nats.ErrTimeout { break // No more messages } return nil, fmt.Errorf("failed to fetch messages: %w", err) } for _, msg := range msgs { var event aether.Event if err := json.Unmarshal(msg.Data, &event); err != nil { continue // Skip malformed events } // Filter by version if event.Version > fromVersion { events = append(events, &event) } msg.Ack() } if len(msgs) < 100 { break // No more messages } } return events, nil } // GetLatestVersion returns the latest version for an actor func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) { events, err := jes.GetEvents(actorID, 0) if err != nil { return 0, err } if len(events) == 0 { return 0, nil } latestVersion := int64(0) for _, event := range events { if event.Version > latestVersion { latestVersion = event.Version } } return latestVersion, nil } // GetLatestSnapshot gets the most recent snapshot for an actor func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { // Create subject for snapshots subject := fmt.Sprintf("%s.snapshots.%s.%s", jes.streamName, sanitizeSubject(extractActorType(actorID)), sanitizeSubject(actorID)) // Try to get the latest snapshot consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast()) if err != nil { return nil, fmt.Errorf("failed to create snapshot consumer: %w", err) } defer consumer.Unsubscribe() msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) if err != nil { if err == nats.ErrTimeout { return nil, fmt.Errorf("no snapshot found for actor %s", actorID) } return nil, fmt.Errorf("failed to fetch snapshot: %w", err) } if len(msgs) == 0 { return nil, fmt.Errorf("no snapshot found for actor %s", actorID) } var snapshot aether.ActorSnapshot if err := json.Unmarshal(msgs[0].Data, &snapshot); err != nil { return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err) } msgs[0].Ack() return &snapshot, nil } // SaveSnapshot saves a snapshot of actor state func (jes *JetStreamEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error { // Serialize snapshot to JSON data, err := json.Marshal(snapshot) if err != nil { return fmt.Errorf("failed to marshal snapshot: %w", err) } // Create subject for snapshots subject := fmt.Sprintf("%s.snapshots.%s.%s", jes.streamName, sanitizeSubject(extractActorType(snapshot.ActorID)), sanitizeSubject(snapshot.ActorID)) // Publish snapshot _, err = jes.js.Publish(subject, data) if err != nil { return fmt.Errorf("failed to publish snapshot to JetStream: %w", err) } return nil } // Helper functions // extractActorType extracts the actor type from an actor ID func extractActorType(actorID string) string { for i, c := range actorID { if c == '-' && i > 0 { return actorID[:i] } } return "unknown" } // sanitizeSubject sanitizes a string for use in NATS subjects func sanitizeSubject(s string) string { s = strings.ReplaceAll(s, " ", "_") s = strings.ReplaceAll(s, ".", "_") s = strings.ReplaceAll(s, "*", "_") s = strings.ReplaceAll(s, ">", "_") return s }