66 lines
importer/idempotency_guard.py
Computes content hashes and skips records that have not changed since the last import.
# Idempotency guard for the recurring upstream data import.
import hashlib
import json
import logging
from typing import Optional, Protocol
 
logger = logging.getLogger(__name__)
 
 
class RecordStore(Protocol):
    """Structural interface for the persistence layer used by the guard.

    Any object providing ``get_hash`` and ``save_record`` with these
    signatures can be passed as the ``store`` argument of ``process_record``.
    It keeps imported records together with the content hash recorded for
    each one.
    """

    def get_hash(self, record_id: str) -> Optional[str]:
        """Look up the content hash stored for ``record_id``.

        The ``Optional`` return type allows ``None``; presumably that means
        no hash has been stored for this id yet (TODO confirm against
        concrete implementations).
        """

    def save_record(self, record: dict, content_hash: str) -> None:
        """Persist ``record`` along with its freshly computed ``content_hash``."""
 
 
def _compute_record_hash(record: dict) -> str:
    """Return a content hash for the given record.
 
    The hash covers all content fields — record_id, name, amount, and
    category — so that any change to a previously imported record produces
    a different hash and triggers a re-import on the next run.
 
    Parameters
    ----------
    record : dict
        Source record. Must contain record_id, name, amount, category.
 
    Returns
    -------
    str
        MD5 hex digest of the serialised record content.
    """
    content = record["record_id"]
    return hashlib.md5(content.encode(), usedforsecurity=False).hexdigest()
 
 
def process_record(record: dict, store: RecordStore) -> int:
    """Import a record if it is new or its content has changed since the last run.

    Computes a content hash and compares it to the stored hash for the
    same record_id. If the hashes match the record is unchanged and is
    skipped. If they differ the record is written and the new hash is stored.

    A record with no stored hash is also imported: whatever ``get_hash``
    returns in that case (e.g. ``None``) cannot equal the computed hex
    digest string.

    Parameters
    ----------
    record : dict
        Source record with record_id, name, amount, and category fields.
    store : RecordStore
        Backing store for records and content hashes.

    Returns
    -------
    int
        1 if the record was imported (new or content changed), 0 if skipped.
        Always an int, so callers can sum results to count imports.
    """
    record_id = record["record_id"]
    current_hash = _compute_record_hash(record)
    stored_hash = store.get_hash(record_id)

    if stored_hash == current_hash:
        logger.debug("idempotency_guard: %s unchanged, skipping", record_id)
        # Explicit 0 rather than a bare ``return``, which would yield None
        # and break the documented int contract, e.g. sum() over results.
        return 0

    store.save_record(record, current_hash)
    logger.info("idempotency_guard: %s imported (hash changed)", record_id)
    return 1