System Metadata Extraction — ELT Transition Validation

Summary

  • Captures schema-level and logic-level metadata required for ELT transition validation.
  • Documents transaction_events_raw field types/modes and complex value handling.
  • Extracts process_row() return contract, nullability behavior, and serialization details.
  • Provides staging context (GCS paths, table existence) and recommended load-job setup.

When to use this

  • When implementing or reviewing ELT ingestion for onboarding/intake events.
  • During schema mapping work between Python payloads and BigQuery structures.
  • When troubleshooting array/null/decimal serialization issues in load jobs.
  • Before creating or validating staging-table strategy for transition rollout.

What you’ll walk away with

  • An implementation-ready field mapping and type contract for ELT handoff.
  • Clear serialization rules for arrays, nulls, and decimal strings.
  • A concrete staging/load configuration baseline to execute safely.

Date: 2026-01-28
Objective: Extract specific schema and logic definitions for “The Architect” to validate the ELT transition

1. Schema Definition

1.1 transaction_events_raw Schema

Source: bq show --schema --format=json payroll-bi-gauntlet.payroll_analytics.transaction_events_raw
DDL File: integration/bigquery/sql/tables/transaction_events_raw.sql
Full JSON Schema:
[
  {"name":"tenant_id","type":"STRING","mode":"REQUIRED"},
  {"name":"batch_id","type":"STRING","mode":"REQUIRED"},
  {"name":"row_index","type":"INTEGER","mode":"REQUIRED"},
  {"name":"source_system","type":"STRING"},
  {"name":"event_type","type":"STRING","mode":"REQUIRED"},
  {"name":"period_label","type":"STRING","mode":"REQUIRED"},
  {"name":"business_name_raw","type":"STRING","mode":"REQUIRED"},
  {"name":"member_id_raw","type":"STRING"},
  {"name":"payee_agent_id_raw","type":"STRING"},
  {"name":"credit_raw","type":"STRING"},
  {"name":"debit_raw","type":"STRING"},
  {"name":"total_raw","type":"STRING"},
  {"name":"period_code_raw","type":"STRING"},
  {"name":"pay_periods_raw","type":"STRING"},
  {"name":"mapped_payload_json","type":"STRING","mode":"REQUIRED"},
  {"name":"row_status","type":"STRING","mode":"REQUIRED"},
  {"name":"error_codes","type":"STRING","mode":"REPEATED"},
  {"name":"warnings","type":"STRING","mode":"REPEATED"},
  {"name":"created_at","type":"TIMESTAMP","mode":"REQUIRED"},
  {"name":"created_by","type":"STRING","mode":"REQUIRED"}
]

1.2 Critical Field Types & Modes

error_codes:
  • Type: STRING
  • Mode: REPEATED (ARRAY<STRING> in BigQuery)
  • Nullable: No (REPEATED fields are never NULL; an absent value reads back as an empty array [])
warnings:
  • Type: STRING
  • Mode: REPEATED (ARRAY<STRING> in BigQuery)
  • Nullable: No (REPEATED fields are never NULL; an absent value reads back as an empty array [])
metadata:
  • Not present in schema (no metadata field exists)
Decimal/Numeric Fields (all stored as STRING per ADR-002):
  • credit_raw: STRING (nullable)
  • debit_raw: STRING (nullable)
  • total_raw: STRING (nullable)
  • Rationale: audit-grade money handling requires exact decimal precision and no float math (see the sketch at the end of this section)
Nullable Fields (mode not REQUIRED):
  • source_system: STRING (nullable)
  • member_id_raw: STRING (nullable)
  • payee_agent_id_raw: STRING (nullable)
  • credit_raw: STRING (nullable)
  • debit_raw: STRING (nullable)
  • total_raw: STRING (nullable)
  • period_code_raw: STRING (nullable)
  • pay_periods_raw: STRING (nullable)
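
A minimal sketch of the precision rationale: parsing the stored strings with decimal.Decimal is lossless, while float arithmetic drifts (values illustrative):

from decimal import Decimal

credit_raw = "1234.56"                # stored and loaded as STRING
amount = Decimal(credit_raw)          # exact decimal value
assert str(amount) == credit_raw      # round-trips losslessly

# The float drift ADR-002 is guarding against:
assert 0.1 + 0.2 != 0.3
assert Decimal("0.1") + Decimal("0.2") == Decimal("0.3")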

2. Logic Extraction: process_row()

2.1 Function Signature

File: api/routes/intake_processor.py:574-713
def process_row(
    row: Dict[str, str],
    row_index: int,
    source_system: str,
    period_label: str  # P0: period_label is wizard input, injected per-row
) -> Dict[str, Any]:

2.2 Return Signature

Return Type: Dict[str, Any]
Return Keys (exact structure):
{
    "row_index": int,                    # 0-based row index
    "source_system": str,                 # e.g., "payroll_file"
    "event_type": str,                    # Always "PAYROLL_ROW"
    "period_label": str,                  # From parameter (wizard input)
    "business_name_raw": str,             # Required, never None
    "member_id_raw": str | None,          # Optional, can be None
    "payee_agent_id_raw": str | None,     # Optional, can be None
    "credit_raw": str | None,             # Decimal string or None
    "debit_raw": str | None,              # Decimal string or None
    "total_raw": str | None,              # Decimal string or None (computed if missing)
    "period_code_raw": str | None,        # Optional, can be None
    "pay_periods_raw": str | None,        # Optional, can be None
    "mapped_payload_json": str,           # JSON string (json.dumps(row, sort_keys=True))
    "row_status": str,                    # "ACCEPTED" | "REJECTED"
    "error_codes": List[str] | None,      # Python list or None (empty list becomes None)
    "warnings": List[str] | None          # Python list or None (empty list becomes None)
}

2.3 None Value Handling

Pattern: process_row() returns Python None for optional fields, not empty strings. Examples:
  • member_id_raw: row.get("member_id", "").strip() or None → Returns None if empty
  • credit_raw: (row.get("credit_raw") or row.get("credit") or "").strip() or None → Returns None if empty
  • error_codes: error_codes if error_codes else None → Returns None if list is empty
  • warnings: warnings if warnings else None → Returns None if list is empty
Exception: business_name_raw is never None (required field; it defaults to the empty string "" if missing, and validation then marks the row as REJECTED).
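
A condensed sketch of the pattern (the real function in api/routes/intake_processor.py carries more fields and validation; _opt is a hypothetical helper named here for illustration):

def _opt(value) -> str | None:
    # Mirrors the `(... or "").strip() or None` idiom above.
    return (value or "").strip() or None

row = {"member_id": "  ", "credit": "1000.00"}

member_id_raw = _opt(row.get("member_id"))                     # None (blank → None)
credit_raw = _opt(row.get("credit_raw") or row.get("credit"))  # "1000.00"

error_codes: list[str] = []
partial = {
    "member_id_raw": member_id_raw,
    "credit_raw": credit_raw,
    "error_codes": error_codes if error_codes else None,       # empty list → None
}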

2.4 Complex Type Serialization

error_codes:
  • Python Type: List[str] (e.g., ["MISSING_BUSINESS_NAME", "INVALID_DECIMAL_CREDIT"])
  • Return Value: List[str] | None (empty list becomes None)
  • BigQuery Binding: Must be converted to bigquery.ArrayQueryParameter("error_codes", "STRING", error_codes or [])
  • Note: the current upsert function handles the None → [] conversion: row.get("error_codes") or []
warnings:
  • Python Type: List[str] (e.g., ["MISSING_MEMBER_ID"])
  • Return Value: List[str] | None (empty list becomes None)
  • BigQuery Binding: Must be converted to bigquery.ArrayQueryParameter("warnings", "STRING", warnings or [])
  • Note: the current upsert function handles the None → [] conversion: row.get("warnings") or []
mapped_payload_json:
  • Python Type: str (JSON-encoded string)
  • Serialization: json.dumps(row, sort_keys=True) (full input row dict as JSON)
  • BigQuery Type: STRING (not JSON type, stored as text)
Decimal Fields (credit_raw, debit_raw, total_raw):
  • Python Type: str | None (decimal string like "1234.56" or None)
  • Serialization: Already strings (no conversion needed)
  • BigQuery Type: STRING (not NUMERIC - per ADR-002)

2.5 Row Status Logic

row_status = "REJECTED" if error_codes else "ACCEPTED"
Determination:
  • If error_codes list is non-empty → "REJECTED"
  • If error_codes list is empty → "ACCEPTED"
  • Warnings do NOT affect status (only error_codes)
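
Illustrative check that warnings alone never reject a row:

# Warnings only → ACCEPTED
error_codes, warnings = [], ["MISSING_MEMBER_ID"]
assert ("REJECTED" if error_codes else "ACCEPTED") == "ACCEPTED"

# Any error code → REJECTED, regardless of warnings
error_codes = ["INVALID_DECIMAL_CREDIT"]
assert ("REJECTED" if error_codes else "ACCEPTED") == "REJECTED"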

3. Staging Context

3.1 GCS Bucket Naming Convention

Environment Variable: ONBOARDING_STORAGE_BUCKET
Config File: api/config/onboarding.py
Current Patterns:
  1. Intake Upload (api/routes/intake.py:173):
    object_path = f"intake/{tenant_id}/{batch_id}/{filename}"
    storage_uri = f"gs://{ONBOARDING_STORAGE_BUCKET}/{object_path}"
    
    Example: gs://payroll-bi-gauntlet-onboarding-storage/intake/creative_benefit_strategies/f97d7357-986d-46d4-822c-74eb3d467ac3/December 2025 Combnied Payroll.csv
  2. Onboarding Files (api/routes/onboarding.py:107):
    object_name = f"onboarding/tenant={tenant_id}/org={org_id or 'platform'}/file_id={file_id}/{original_filename}"
    storage_uri = f"gs://{ONBOARDING_STORAGE_BUCKET}/{object_name}"
    
  3. Bulk Import (api/services/upload_store.py:274-280):
    csv_path = f"bulk-import/{tenant_id}/{org_key}/{upload_id}.csv"
    metadata_path = f"bulk-import/{tenant_id}/{org_key}/{upload_id}.metadata.json"
    
Recommended Staging Path (for ELT):
staging_path = f"intake-staging/{tenant_id}/{batch_id}/transaction_events_raw.ndjson"
storage_uri = f"gs://{ONBOARDING_STORAGE_BUCKET}/{staging_path}"
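
A minimal sketch of writing the staged NDJSON to that path with google-cloud-storage (tenant_id, batch_id, and ndjson_payload are placeholders; ndjson_payload is the string built as in section 4.2):

import os
from google.cloud import storage

tenant_id, batch_id = "example_tenant", "example-batch-id"   # placeholders
ndjson_payload = "..."                                       # NDJSON string (section 4.2)

bucket_name = os.environ["ONBOARDING_STORAGE_BUCKET"]
staging_path = f"intake-staging/{tenant_id}/{batch_id}/transaction_events_raw.ndjson"

blob = storage.Client().bucket(bucket_name).blob(staging_path)
blob.upload_from_string(ndjson_payload, content_type="application/x-ndjson")

storage_uri = f"gs://{bucket_name}/{staging_path}"           # input to the load job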

3.2 Staging Table Existence

Query: bq ls payroll_analytics | grep -i staging
Result: No existing staging tables found
Conclusion: transaction_events_raw_staging must be created from scratch. Recommended DDL (mirrors the target schema):
CREATE TABLE IF NOT EXISTS `payroll-bi-gauntlet.payroll_analytics.transaction_events_raw_staging` (
  tenant_id STRING NOT NULL,
  batch_id STRING NOT NULL,
  row_index INT64 NOT NULL,
  source_system STRING,
  event_type STRING NOT NULL,
  period_label STRING NOT NULL,
  business_name_raw STRING NOT NULL,
  member_id_raw STRING,
  payee_agent_id_raw STRING,
  credit_raw STRING,
  debit_raw STRING,
  total_raw STRING,
  period_code_raw STRING,
  pay_periods_raw STRING,
  mapped_payload_json STRING NOT NULL,
  row_status STRING NOT NULL,
  error_codes ARRAY<STRING>,
  warnings ARRAY<STRING>,
  created_at TIMESTAMP NOT NULL,
  created_by STRING NOT NULL
)
PARTITION BY DATE(created_at)
CLUSTER BY tenant_id, batch_id, row_index;
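
One way to apply that DDL from Python (a sketch; running it through bq query works equally well):

from google.cloud import bigquery

client = bigquery.Client(project="payroll-bi-gauntlet")
ddl = "..."                 # the CREATE TABLE statement above
client.query(ddl).result()  # blocks until the DDL completes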

4. Summary for ELT Implementation

4.1 Schema Mapping (Python → BigQuery)

Python return value → BigQuery type → binding method:
  • str (non-null) → STRING → ScalarQueryParameter("field", "STRING", value)
  • None → STRING (nullable) → ScalarQueryParameter("field", "STRING", None)
  • List[str] (non-empty) → ARRAY<STRING> → ArrayQueryParameter("field", "STRING", value)
  • None (for arrays) → ARRAY<STRING> (empty) → ArrayQueryParameter("field", "STRING", [])
  • int → INT64 → ScalarQueryParameter("field", "INT64", value)
  • str (timestamp) → TIMESTAMP → ScalarQueryParameter("field", "TIMESTAMP", value)
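
A hedged sketch applying that mapping to a process_row() result before a parameterized DML call (build_params is a hypothetical helper; the current upsert code may differ):

from google.cloud import bigquery

ARRAY_FIELDS = {"error_codes", "warnings"}
INT_FIELDS = {"row_index"}
TIMESTAMP_FIELDS = {"created_at"}

def build_params(row: dict) -> list:
    # None scalars bind as NULL; None arrays bind as empty arrays.
    params = []
    for name, value in row.items():
        if name in ARRAY_FIELDS:
            params.append(bigquery.ArrayQueryParameter(name, "STRING", value or []))
        elif name in INT_FIELDS:
            params.append(bigquery.ScalarQueryParameter(name, "INT64", value))
        elif name in TIMESTAMP_FIELDS:
            params.append(bigquery.ScalarQueryParameter(name, "TIMESTAMP", value))
        else:
            params.append(bigquery.ScalarQueryParameter(name, "STRING", value))
    return params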

4.2 NDJSON Serialization for Load Job

Format: Newline-delimited JSON (one record per line)
Example Record:
{"row_index":0,"source_system":"payroll_file","event_type":"PAYROLL_ROW","period_label":"2025-12-01","business_name_raw":"ACME Corp","member_id_raw":"M001","payee_agent_id_raw":"John Doe","credit_raw":"1000.00","debit_raw":"0.00","total_raw":"1000.00","period_code_raw":"Monthly","pay_periods_raw":"1","mapped_payload_json":"{\"business_label\":\"ACME Corp\",\"member_id\":\"M001\",...}","row_status":"ACCEPTED","error_codes":null,"warnings":["MISSING_PAYEE_AGENT_ID"],"created_at":"2026-01-28T12:00:00Z","created_by":"user@example.com"}
Critical:
  • error_codes and warnings must be serialized as JSON arrays ([] for empty, null for None, or ["CODE1","CODE2"] for values)
  • BigQuery Load Job with source_format=NEWLINE_DELIMITED_JSON will parse arrays correctly
  • Decimal strings remain strings (no conversion)
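
A minimal serialization sketch under those rules (processed_rows is assumed to be the list of process_row() results; the tenant, batch, and audit fields are merged in here):

import json

def to_ndjson(processed_rows, tenant_id, batch_id, created_at, created_by):
    lines = []
    for row in processed_rows:
        record = {
            **row,
            "tenant_id": tenant_id,
            "batch_id": batch_id,
            "created_at": created_at,   # RFC 3339, e.g. "2026-01-28T12:00:00Z"
            "created_by": created_by,
        }
        # json.dumps emits None as null and lists as JSON arrays;
        # decimal strings pass through untouched.
        lines.append(json.dumps(record))
    return "\n".join(lines) + "\n"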

4.3 Load Job Configuration

Recommended Config:
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Replace staging data for this batch
    schema=[
        bigquery.SchemaField("tenant_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("batch_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("row_index", "INTEGER", mode="REQUIRED"),
        # ... (all fields)
        bigquery.SchemaField("error_codes", "STRING", mode="REPEATED"),
        bigquery.SchemaField("warnings", "STRING", mode="REPEATED"),
    ],
    time_partitioning=bigquery.TimePartitioning(
        field="created_at",
        type_=bigquery.TimePartitioningType.DAY
    ),
    clustering_fields=["tenant_id", "batch_id", "row_index"]
)
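
Executing the load against the staging table (a sketch; storage_uri is the gs:// path from section 3.1):

from google.cloud import bigquery

client = bigquery.Client(project="payroll-bi-gauntlet")
table_id = "payroll-bi-gauntlet.payroll_analytics.transaction_events_raw_staging"

load_job = client.load_table_from_uri(storage_uri, table_id, job_config=job_config)
load_job.result()   # raises on failure; inspect load_job.errors for row-level detail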

End of System Metadata Extraction