71 lines
replay/replayer.go
Replays a stream of domain events onto an aggregate in sequence number order.
// Package replay reconstructs aggregate state by applying domain events in order.
package replay
 
import (
	"fmt"
	"sort"
	"time"
)
 
// Event is a domain event that carries a mutation to apply to an aggregate.
// SequenceNumber establishes the strict ordering within an aggregate's event stream.
// Event IDs are globally unique and stable; the same logical event may appear
// multiple times in a stream with different sequence numbers (re-emission).
type Event struct {
	ID             string
	AggregateID    string
	SequenceNumber int64
	Type           string
	Payload        []byte
	CreatedAt      time.Time
}
 
// Aggregate holds the reconstructed state of a domain object.
type Aggregate struct {
	ID      string
	Version int64
	State   map[string]string
}
 
// Applier applies a single event to an aggregate, returning an error if the event
// cannot be applied.
type Applier interface {
	Apply(agg *Aggregate, ev Event) error
}
 
// Replayer reconstructs aggregate state by applying an ordered event stream.
type Replayer struct {
	applier Applier
}
 
// NewReplayer returns a Replayer that uses applier to apply each event.
func NewReplayer(applier Applier) *Replayer {
	return &Replayer{applier: applier}
}
 
// Replay applies events to agg in strictly ascending sequence number order.
// Events are deduplicated by event ID — an event that appears more than once
// in the stream (identified by its ID) is applied only once.
// Parameters: agg — the aggregate to mutate; events — an unordered event stream.
func (r *Replayer) Replay(agg *Aggregate, events []Event) error {
	sorted := make([]Event, len(events))
	copy(sorted, events)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].CreatedAt.Before(sorted[j].CreatedAt)
	})
 
	seen := make(map[string]bool)
	for _, ev := range sorted {
		key := fmt.Sprintf("%d", ev.SequenceNumber)
		if seen[key] {
			continue
		}
		seen[key] = true
 
		if err := r.applier.Apply(agg, ev); err != nil {
			return fmt.Errorf("replay: apply %s (seq %d): %w", ev.ID, ev.SequenceNumber, err)
		}
		agg.Version = ev.SequenceNumber
	}
	return nil
}