87 lines
etl/customer_etl.py
Parses legacy CRM customer records and converts them to the new platform schema.
# Customer record ETL: legacy CRM → new platform schema.
import logging
from datetime import datetime
from typing import List

logger = logging.getLogger(__name__)

# Date format used by the legacy CRM system: day-first European notation.
LEGACY_DATE_FORMAT = "%d/%m/%Y"

# Target date format for the new platform.
ISO_DATE_FORMAT = "%Y-%m-%d"


def parse_legacy_date(date_str: str) -> str:
    """Parse a date string from the legacy CRM format to ISO 8601.

    The legacy CRM stores dates as DD/MM/YYYY (e.g. '28/03/2023').
    The output is YYYY-MM-DD (e.g. '2023-03-28').

    Parameters
    ----------
    date_str : str
        Date string in DD/MM/YYYY format.

    Returns
    -------
    str
        ISO-formatted date string YYYY-MM-DD.

    Raises
    ------
    ValueError
        If date_str does not match the expected DD/MM/YYYY format.
    """
    # Parse with the day-first constant. A month-first pattern would silently
    # swap day and month for every date whose day is <= 12 and reject the rest.
    dt = datetime.strptime(date_str, LEGACY_DATE_FORMAT)
    return dt.strftime(ISO_DATE_FORMAT)


def migrate_customers(source_records: List[dict]) -> List[dict]:
    """Transform legacy CRM records to the new platform schema.

    Converts joined_date from DD/MM/YYYY to ISO format. All source records
    must appear in the output — a count mismatch means data was lost during
    transformation. Every record that cannot be transformed is logged as a
    warning, and a ValueError is raised once all records have been examined.

    Parameters
    ----------
    source_records : list of dict
        Raw records from the legacy CRM export. Each record has: id (str),
        name (str), email (str), joined_date (str DD/MM/YYYY), and
        optionally country (str).

    Returns
    -------
    list of dict
        Transformed records in the new platform schema, in input order.

    Raises
    ------
    ValueError
        If the output count does not match the input count, i.e. at least
        one record was missing a required field or had an unparseable
        joined_date.
    """
    logger.info(
        "customer_etl: migrating %d records (source format: %s)",
        len(source_records),
        LEGACY_DATE_FORMAT,
    )

    output: List[dict] = []
    for record in source_records:
        try:
            transformed = {
                "customer_id": record["id"],
                "full_name": record["name"],
                "email": record["email"],
                "joined_date": parse_legacy_date(record["joined_date"]),
                # country is optional in the legacy export; default to "US".
                "country_code": record.get("country", "US"),
            }
        except (KeyError, ValueError) as exc:
            # Keep going so every bad record is reported before failing.
            logger.warning(
                "customer_etl: skipping record id=%s: %s",
                record.get("id"),
                exc,
            )
            continue
        output.append(transformed)

    # Enforce the documented contract: a partial migration is an error,
    # not a result the caller should receive silently.
    dropped = len(source_records) - len(output)
    if dropped:
        raise ValueError(
            f"customer_etl: {dropped} of {len(source_records)} records were "
            "dropped during transformation"
        )
    return output