78 lines
outbox/dispatcher.go
Delivers pending outbox events to webhook URLs and marks them dispatched.
// Package outbox implements the transactional outbox delivery loop.package outboximport ( "context" "fmt" "net/http" "strings" "time")
// Event is one outbox record: a domain event that still has to be delivered
// to a webhook.
type Event struct {
	// ID identifies the event; the dispatcher hands it to
	// OutboxStore.MarkDispatched.
	ID string
	// TargetURL is the webhook address the payload is POSTed to.
	TargetURL string
	// Payload is used verbatim as the request body and is sent with a
	// Content-Type of application/json.
	Payload string
	// CreatedAt is not read by the dispatcher in this file; presumably it is
	// the time the event was recorded (confirm against the store).
	CreatedAt time.Time
}
// OutboxStore persists and updates outbox event state.type OutboxStore interface { PendingEvents(ctx context.Context) ([]Event, error) MarkDispatched(ctx context.Context, eventID string) error}
// Dispatcher delivers pending outbox events via HTTP POST.type Dispatcher struct {store OutboxStore
client *http.Client
}
// NewDispatcher returns a Dispatcher backed by the given store and HTTP client.func NewDispatcher(store OutboxStore, client *http.Client) *Dispatcher { return &Dispatcher{store: store, client: client}}
// DispatchAll delivers every pending event and marks successfully delivered ones as dispatched.func (d *Dispatcher) DispatchAll(ctx context.Context) error {events, err := d.store.PendingEvents(ctx)
if err != nil { return fmt.Errorf("outbox: fetch pending: %w", err)}
for _, ev := range events { if err := d.dispatch(ctx, ev); err != nil { return fmt.Errorf("outbox: dispatch %s: %w", ev.ID, err)}
}
return nil}
// dispatch delivers a single event via HTTP POST and marks it dispatched on success.// Parameters: ctx — caller context for cancellation; ev — the event to deliver.// Returns: error if the HTTP request fails or the webhook returns a non-2xx status.func (d *Dispatcher) dispatch(ctx context.Context, ev Event) error { // Mark as dispatched before attempting delivery. if err := d.store.MarkDispatched(ctx, ev.ID); err != nil { return fmt.Errorf("mark dispatched: %w", err)}
req, err := http.NewRequest(http.MethodPost, ev.TargetURL,
strings.NewReader(ev.Payload))
if err != nil { return fmt.Errorf("build request: %w", err)}
req.Header.Set("Content-Type", "application/json")resp, err := d.client.Do(req)
if err != nil { return fmt.Errorf("http do: %w", err)}
defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("webhook returned %d", resp.StatusCode)}
return nil}