87 lines
etl/customer_etl.py
Parses legacy CRM customer records and converts them to the new platform schema.
# Customer record ETL: legacy CRM → new platform schema.
import logging
from datetime import datetime
from typing import List
 
logger = logging.getLogger(__name__)

# strptime pattern for dates exported by the legacy CRM: day first, i.e.
# DD/MM/YYYY such as "28/03/2023".
LEGACY_DATE_FORMAT = "%d/%m/%Y"
# strftime pattern the new platform expects: YYYY-MM-DD such as "2023-03-28".
ISO_DATE_FORMAT = "%Y-%m-%d"
 
 
def parse_legacy_date(date_str: str) -> str:
    """Parse a date string from the legacy CRM format to ISO 8601.

    The legacy CRM stores dates as DD/MM/YYYY (e.g. '28/03/2023').
    The output is YYYY-MM-DD (e.g. '2023-03-28').

    Parameters
    ----------
    date_str : str
        Date string in DD/MM/YYYY format (``LEGACY_DATE_FORMAT``).

    Returns
    -------
    str
        ISO-formatted date string YYYY-MM-DD (``ISO_DATE_FORMAT``).

    Raises
    ------
    ValueError
        If date_str does not match the expected DD/MM/YYYY format.
    """
    # Use the shared day-first LEGACY_DATE_FORMAT constant. A hard-coded
    # month-first "%m/%d/%Y" rejects every date whose day is > 12 and
    # silently swaps day and month for the rest ('05/03/2023' -> 2023-05-03
    # instead of 2023-03-05).
    dt = datetime.strptime(date_str, LEGACY_DATE_FORMAT)
    return dt.strftime(ISO_DATE_FORMAT)
 
 
def migrate_customers(source_records: List[dict]) -> List[dict]:
    """Transform legacy CRM records to the new platform schema.

    Converts joined_date from DD/MM/YYYY to ISO format. All source records
    must appear in the output — a count mismatch means data was lost during
    transformation. Raises ValueError if any records are dropped.

    Records that cannot be transformed (missing required key, unparseable
    joined_date) are logged individually with a warning. Every record is
    checked before the error is raised, so the log lists all bad records
    from a single run.

    Parameters
    ----------
    source_records : list of dict
        Raw records from the legacy CRM export. Each record has:
        id (str), name (str), email (str), joined_date (str DD/MM/YYYY),
        and optionally country (str; defaults to "US" when absent).

    Returns
    -------
    list of dict
        Transformed records in the new platform schema, in input order, with
        keys customer_id, full_name, email, joined_date, country_code.

    Raises
    ------
    ValueError
        If the output count does not match the input count, i.e. one or more
        records could not be transformed. The message includes the number of
        dropped records and their ids.
    """
    logger.info(
        "customer_etl: migrating %d records (source format: %s)",
        len(source_records),
        LEGACY_DATE_FORMAT,
    )
    output: List[dict] = []
    skipped_ids: List[str] = []
    for record in source_records:
        try:
            transformed = {
                "customer_id": record["id"],
                "full_name": record["name"],
                "email": record["email"],
                "joined_date": parse_legacy_date(record["joined_date"]),
                "country_code": record.get("country", "US"),
            }
        except (KeyError, ValueError) as exc:
            logger.warning(
                "customer_etl: skipping record id=%s: %s",
                record.get("id"),
                exc,
            )
            # Remember the failure instead of silently dropping it; the
            # count check below turns it into a hard error.
            skipped_ids.append(str(record.get("id")))
            continue
        output.append(transformed)

    # Enforce the documented contract: every input record must be migrated.
    if len(output) != len(source_records):
        raise ValueError(
            f"customer_etl: {len(source_records) - len(output)} of "
            f"{len(source_records)} records dropped during migration "
            f"(ids: {', '.join(skipped_ids)})"
        )

    return output