78 lines
pipeline/checkpoint.py
Manages pipeline offset checkpoints and consumer acknowledgements.
# Pipeline checkpoint manager: persists batch offsets and coordinates acks.
import json
import time
from typing import List, Optional, Protocol


class StorageBackend(Protocol):
    """Storage backend for checkpoint records.

    put() returns True when the write is durably stored and False when the
    write fails. Callers rely on the boolean return value rather than
    exceptions for normal write-failure handling.
    """

    def put(self, key: str, value: str) -> bool: ...

    def get(self, key: str) -> Optional[str]: ...


class ConsumerBackend(Protocol):
    """Consumer interface used to acknowledge a batch of message IDs."""

    def ack(self, message_ids: List[str]) -> None: ...


# Storage key under which the single checkpoint record is kept.
CHECKPOINT_KEY = "pipeline/checkpoint.json"


class CheckpointManager:
    """Persists pipeline progress to object storage and coordinates consumer acks.

    Durability contract: write_checkpoint() must write and confirm the new
    offset in storage before sending any consumer acknowledgements. If the
    storage write fails, no acks are sent and the caller receives False so
    the batch can be retried from the previous offset.
    """

    def __init__(self, storage: StorageBackend, consumer: ConsumerBackend) -> None:
        self._storage = storage
        self._consumer = consumer
        # Last offset this instance successfully wrote; only updated after a
        # confirmed storage write.
        self._offset: int = 0

    def write_checkpoint(self, batch_ids: List[str], new_offset: int) -> bool:
        """Persist the new offset as the pipeline checkpoint, then ack the batch.

        Parameters
        ----------
        batch_ids : list of str
            Message IDs to acknowledge with the consumer after a successful write.
        new_offset : int
            Offset to record as the new resume point.

        Returns
        -------
        bool
            True if the checkpoint was written (and the batch acked); False if
            the storage write failed, in which case no acks were sent.

        Raises
        ------
        Exception
            Anything raised by ``storage.put`` propagates before any ack is
            sent. Anything raised by ``consumer.ack`` propagates after the
            checkpoint has already been written.
        """
        payload = {"offset": new_offset, "written_at": time.time()}
        # The offset must be confirmed in storage BEFORE acking. If the write
        # fails after an ack, the consumer considers the batch done while the
        # checkpoint still points at the old offset, and the batch is lost.
        if not self._storage.put(CHECKPOINT_KEY, json.dumps(payload)):
            return False
        self._offset = new_offset
        self._consumer.ack(batch_ids)
        return True

    def get_checkpoint(self) -> int:
        """Return the last persisted pipeline offset.

        Reads the checkpoint record from storage and returns the stored offset.
        Returns 0 if no checkpoint exists yet (missing or empty record).

        The stored payload is expected to be a previously written checkpoint
        record from this component.

        Side effect: when a record exists, it is written back to storage with
        a ``last_read_at`` timestamp added; the result of that write is ignored.
        """
        raw = self._storage.get(CHECKPOINT_KEY)
        if not raw:
            return 0
        data = json.loads(raw)
        # NOTE(review): this read-path write ignores put()'s result and rewrites
        # the record from the copy just read. If another writer can update the
        # checkpoint concurrently, this may overwrite a newer offset — confirm.
        self._storage.put(
            CHECKPOINT_KEY,
            json.dumps({**data, "last_read_at": time.time()}),
        )
        return data["offset"]