78 lines
pipeline/checkpoint.py
Manages pipeline offset checkpoints and consumer acknowledgements.
# Pipeline checkpoint manager: persists batch offsets and coordinates acks.
import json
import time
from typing import List, Optional, Protocol
 
 
class StorageBackend(Protocol):
    """Structural interface for the key/value store that holds checkpoint records.

    Write failures are signalled through return values rather than
    exceptions: ``put`` answers True once the value is durably stored and
    False when the write did not succeed, and callers branch on that boolean.
    """

    def put(self, key: str, value: str) -> bool:
        """Store *value* under *key*; True on a durable write, False on failure."""
        ...

    def get(self, key: str) -> Optional[str]:
        """Return the value stored under *key* (None presumably means absent — confirm)."""
        ...
 
 
class ConsumerBackend(Protocol):
    """Structural interface for the consumer that receives message acknowledgements."""

    def ack(self, message_ids: List[str]) -> None:
        """Acknowledge the messages identified by *message_ids*."""
        ...
 
 
# Storage key under which the single checkpoint record is written and read.
CHECKPOINT_KEY: str = "pipeline/checkpoint.json"
 
 
class CheckpointManager:
    """Persists pipeline progress to object storage and coordinates consumer acks.

    Durability contract: write_checkpoint() must write and confirm the new
    offset in storage before sending any consumer acknowledgements. If the
    storage write fails, no acks are sent and the caller receives False so
    the batch can be retried from the previous offset.
    """

    def __init__(self, storage: StorageBackend, consumer: ConsumerBackend) -> None:
        """Bind the manager to its storage and consumer backends.

        Parameters
        ----------
        storage : StorageBackend
            Store that holds the checkpoint record under ``CHECKPOINT_KEY``.
        consumer : ConsumerBackend
            Receives acknowledgements for batches whose offset was persisted.
        """
        self._storage = storage
        self._consumer = consumer
        # Last offset this instance successfully wrote; 0 until the first write.
        self._offset: int = 0

    def write_checkpoint(self, batch_ids: List[str], new_offset: int) -> bool:
        """Persist the new offset as the pipeline checkpoint, then ack the batch.

        The storage write happens first. Consumer acknowledgements are sent
        only after storage reports success, so a failed write never
        acknowledges messages whose offset was not recorded.

        Parameters
        ----------
        batch_ids : list of str
            Message IDs to acknowledge with the consumer after a successful write.
        new_offset : int
            Offset to record as the new resume point.

        Returns
        -------
        bool
            True if the checkpoint was written and the batch acknowledged;
            False if the storage write failed (no acks were sent).

        Raises
        ------
        Exception
            Anything raised by ``consumer.ack`` propagates. At that point the
            checkpoint has already been persisted; only the acks are missing.
        """
        payload = {"offset": new_offset, "written_at": time.time()}
        # Persist before acking. If we acked first and the write then failed,
        # the consumer could discard messages that a retry from the previous
        # offset would need, so they would be lost instead of replayed.
        if not self._storage.put(CHECKPOINT_KEY, json.dumps(payload)):
            return False

        self._offset = new_offset
        self._consumer.ack(batch_ids)
        return True

    def get_checkpoint(self) -> int:
        """Return the last persisted pipeline offset.

        Reads the checkpoint record from storage and returns its ``offset``
        field, or 0 if no checkpoint exists yet (the stored value is missing
        or empty). The stored payload is expected to be a previously written
        checkpoint record from this component.

        Side effect: the record is written back with a ``last_read_at``
        timestamp added. The result of that write is ignored.

        Raises
        ------
        json.JSONDecodeError
            If the stored value is not valid JSON.
        KeyError
            If the stored record has no ``offset`` field. In that case the
            record is left unmodified.
        """
        raw = self._storage.get(CHECKPOINT_KEY)
        if not raw:
            return 0
        data = json.loads(raw)
        # Extract the offset before stamping, so a malformed record raises
        # without being written to first.
        offset = data["offset"]
        # Best-effort read stamp. A failed write does not affect the result.
        # NOTE(review): this read-modify-write has no concurrency control and
        # could overwrite a newer checkpoint written between get() and put();
        # confirm whether last_read_at is still needed.
        self._storage.put(CHECKPOINT_KEY, json.dumps({
            **data,
            "last_read_at": time.time(),
        }))
        return offset