66 lines
importer/idempotency_guard.py
Computes content hashes and skips records that have not changed since the last import.
# Idempotency guard for the recurring upstream data import.
import hashlib
import json
import logging
from typing import Optional, Protocol

logger = logging.getLogger(__name__)
class RecordStore(Protocol):
    """Backing store for imported records and their stored content hashes."""

    def get_hash(self, record_id: str) -> Optional[str]:
        """Return the content hash stored for *record_id*, if any."""
        ...

    def save_record(self, record: dict, content_hash: str) -> None:
        """Persist *record* together with its *content_hash*."""
        ...


# Fields that make up a record's content. Every one of them feeds the hash,
# so a change to any of them is detected on the next import run.
_HASHED_FIELDS = ("record_id", "name", "amount", "category")


def _compute_record_hash(record: dict) -> str:
    """Return a content hash for the given record.

    The hash covers all content fields — record_id, name, amount, and
    category — so that any change to a previously imported record produces
    a different hash and triggers a re-import on the next run.

    Parameters
    ----------
    record : dict
        Source record. Must contain record_id, name, amount, category.

    Returns
    -------
    str
        MD5 hex digest of the serialised record content.

    Raises
    ------
    KeyError
        If any of the content fields is missing from ``record``.
    """
    # Serialise the fields as a JSON array in a fixed order. JSON quoting
    # keeps field boundaries unambiguous ("ab" + "c" never collides with
    # "a" + "bc"), and the default ASCII-escaped output always encodes.
    # ``default=str`` is deliberate: values JSON cannot represent natively
    # (e.g. Decimal amounts) are hashed via their string form.
    payload = json.dumps(
        [record[field] for field in _HASHED_FIELDS],
        separators=(",", ":"),
        default=str,
    )
    # MD5 is used purely as a change-detection fingerprint, not for security.
    return hashlib.md5(payload.encode("utf-8"), usedforsecurity=False).hexdigest()


def process_record(record: dict, store: RecordStore) -> int:
    """Import a record if it is new or its content has changed since the last run.

    Computes a content hash and compares it to the stored hash for the same
    record_id. If the hashes match the record is unchanged and is skipped.
    If they differ (or no hash is stored) the record is written and the new
    hash is stored.

    Parameters
    ----------
    record : dict
        Source record with record_id, name, amount, and category fields.
    store : RecordStore
        Backing store for records and content hashes.

    Returns
    -------
    int
        1 if the record was imported (new or content changed), 0 if skipped.

    Raises
    ------
    KeyError
        If ``record`` lacks record_id or any other content field.
    """
    record_id = record["record_id"]
    current_hash = _compute_record_hash(record)

    if store.get_hash(record_id) == current_hash:
        logger.debug("idempotency_guard: %s unchanged, skipping", record_id)
        # Explicit 0 keeps the return value an int as annotated; a bare
        # ``return`` would hand callers None.
        return 0

    store.save_record(record, current_hash)
    logger.info("idempotency_guard: %s imported (hash changed)", record_id)
    return 1