71 lines
replay/replayer.go
Replays a stream of domain events onto an aggregate in sequence number order.
// Package replay reconstructs aggregate state by applying domain events in order.package replayimport ( "fmt" "sort" "time")
// Event is one domain event: a single mutation to be applied to an aggregate.
//
// Within one aggregate's event stream, SequenceNumber defines the strict
// ordering of events. IDs are globally unique and stable, so a logical event
// that is re-emitted shows up in the stream again with the same ID but a
// different sequence number.
type Event struct {
	// ID is the globally unique, stable identifier of the logical event.
	ID string
	// AggregateID presumably identifies the aggregate the event belongs to.
	AggregateID string
	// SequenceNumber is the event's position within its aggregate's stream.
	SequenceNumber int64
	// Type is the event's type name.
	Type string
	// Payload is the raw event body.
	Payload []byte
	// CreatedAt is the event's creation timestamp.
	CreatedAt time.Time
}
// Aggregate is the reconstructed state of a domain object.
type Aggregate struct {
	// ID identifies the aggregate.
	ID string
	// Version is set by Replayer.Replay to the sequence number of the most
	// recently applied event.
	Version int64
	// State is the aggregate's key/value data. Replay never touches it
	// directly; presumably Applier implementations maintain it.
	State map[string]string
}
// Applier applies a single event to an aggregate, returning an error if the event// cannot be applied.type Applier interface { Apply(agg *Aggregate, ev Event) error}
// Replayer reconstructs aggregate state by applying an ordered event stream.type Replayer struct {applier Applier
}
// NewReplayer returns a Replayer that uses applier to apply each event.func NewReplayer(applier Applier) *Replayer { return &Replayer{applier: applier}}
// Replay applies events to agg in strictly ascending sequence number order.// Events are deduplicated by event ID — an event that appears more than once// in the stream (identified by its ID) is applied only once.// Parameters: agg — the aggregate to mutate; events — an unordered event stream.func (r *Replayer) Replay(agg *Aggregate, events []Event) error { sorted := make([]Event, len(events)) copy(sorted, events) sort.Slice(sorted, func(i, j int) bool { return sorted[i].CreatedAt.Before(sorted[j].CreatedAt)})
seen := make(map[string]bool) for _, ev := range sorted { key := fmt.Sprintf("%d", ev.SequenceNumber) if seen[key] { continue}
seen[key] = true if err := r.applier.Apply(agg, ev); err != nil { return fmt.Errorf("replay: apply %s (seq %d): %w", ev.ID, ev.SequenceNumber, err)}
agg.Version = ev.SequenceNumber
}
return nil}