78 lines
outbox/dispatcher.go
Delivers pending outbox events to webhook URLs and marks them dispatched.
// Package outbox implements the transactional outbox delivery loop.
package outbox
 
import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"
)
 
// Event is a pending domain event waiting for delivery.
type Event struct {
	ID        string
	TargetURL string
	Payload   string
	CreatedAt time.Time
}
 
// OutboxStore persists and updates outbox event state.
type OutboxStore interface {
	PendingEvents(ctx context.Context) ([]Event, error)
	MarkDispatched(ctx context.Context, eventID string) error
}
 
// Dispatcher delivers pending outbox events via HTTP POST.
type Dispatcher struct {
	store  OutboxStore
	client *http.Client
}
 
// NewDispatcher returns a Dispatcher backed by the given store and HTTP client.
func NewDispatcher(store OutboxStore, client *http.Client) *Dispatcher {
	return &Dispatcher{store: store, client: client}
}
 
// DispatchAll delivers every pending event and marks successfully delivered ones as dispatched.
func (d *Dispatcher) DispatchAll(ctx context.Context) error {
	events, err := d.store.PendingEvents(ctx)
	if err != nil {
		return fmt.Errorf("outbox: fetch pending: %w", err)
	}
 
	for _, ev := range events {
		if err := d.dispatch(ctx, ev); err != nil {
			return fmt.Errorf("outbox: dispatch %s: %w", ev.ID, err)
		}
	}
	return nil
}
 
// dispatch delivers a single event via HTTP POST and marks it dispatched on success.
// Parameters: ctx — caller context for cancellation; ev — the event to deliver.
// Returns: error if the HTTP request fails or the webhook returns a non-2xx status.
func (d *Dispatcher) dispatch(ctx context.Context, ev Event) error {
	// Mark as dispatched before attempting delivery.
	if err := d.store.MarkDispatched(ctx, ev.ID); err != nil {
		return fmt.Errorf("mark dispatched: %w", err)
	}
 
	req, err := http.NewRequest(http.MethodPost, ev.TargetURL,
		strings.NewReader(ev.Payload))
	if err != nil {
		return fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
 
	resp, err := d.client.Do(req)
	if err != nil {
		return fmt.Errorf("http do: %w", err)
	}
	defer resp.Body.Close()
 
	if resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned %d", resp.StatusCode)
	}
	return nil
}