Phase 0 — Map Hardening Discovery & Constraints

Date: 2026-01-28
Goal: Extract schemas, patterns, infra constraints, and failure evidence before implementing ELT staging-table solution

1. BigQuery Schemas

1.1 transaction_events_raw

Source: bq show payroll-bi-gauntlet.payroll_analytics.transaction_events_raw
DDL File: integration/bigquery/sql/tables/transaction_events_raw.sql
Schema:
{
  "fields": [
    {"name": "tenant_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "batch_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "row_index", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "source_system", "type": "STRING"},
    {"name": "event_type", "type": "STRING", "mode": "REQUIRED"},
    {"name": "period_label", "type": "STRING", "mode": "REQUIRED"},
    {"name": "business_name_raw", "type": "STRING", "mode": "REQUIRED"},
    {"name": "member_id_raw", "type": "STRING"},
    {"name": "payee_agent_id_raw", "type": "STRING"},
    {"name": "credit_raw", "type": "STRING"},
    {"name": "debit_raw", "type": "STRING"},
    {"name": "total_raw", "type": "STRING"},
    {"name": "period_code_raw", "type": "STRING"},
    {"name": "pay_periods_raw", "type": "STRING"},
    {"name": "mapped_payload_json", "type": "STRING", "mode": "REQUIRED"},
    {"name": "row_status", "type": "STRING", "mode": "REQUIRED"},
    {"name": "error_codes", "type": "STRING", "mode": "REPEATED"},
    {"name": "warnings", "type": "STRING", "mode": "REPEATED"},
    {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "created_by", "type": "STRING", "mode": "REQUIRED"}
  ]
}
Partitioning: PARTITION BY DATE(created_at)
Clustering: CLUSTER BY tenant_id, batch_id, row_index
Idempotency Key: (tenant_id, batch_id, row_index)
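Example row (illustrative values only; field names come from the schema above, and the first three fields form the idempotency key):
{
  "tenant_id": "t_demo",
  "batch_id": "f97d7357-986d-46d4-822c-74eb3d467ac3",
  "row_index": 0,
  "source_system": "legacy_export",
  "event_type": "payroll_line",
  "period_label": "2025-12-01",
  "business_name_raw": "Acme Insurance Group",
  "member_id_raw": "M-1042",
  "payee_agent_id_raw": "A-77",
  "credit_raw": "125.00",
  "debit_raw": "",
  "total_raw": "125.00",
  "period_code_raw": "12",
  "pay_periods_raw": "1",
  "mapped_payload_json": "{\"credit\": 125.0, \"debit\": 0.0}",
  "row_status": "MAPPED",
  "error_codes": [],
  "warnings": [],
  "created_at": "2026-01-28T00:00:00Z",
  "created_by": "intake-map"
}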

1.2 ingestion_batches

Source: bq show payroll-bi-gauntlet.payroll_analytics.ingestion_batches
DDL File: integration/bigquery/sql/tables/ingestion_batches.sql
Schema:
{
  "fields": [
    {"name": "batch_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "tenant_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "org_id", "type": "STRING"},
    {"name": "status", "type": "STRING", "mode": "REQUIRED"},
    {"name": "original_filename", "type": "STRING", "mode": "REQUIRED"},
    {"name": "file_hash_sha256", "type": "STRING", "mode": "REQUIRED"},
    {"name": "uploaded_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "uploaded_by", "type": "STRING", "mode": "REQUIRED"},
    {"name": "storage_uri", "type": "STRING", "mode": "REQUIRED"},
    {"name": "row_count", "type": "INTEGER"},
    {"name": "column_mapping", "type": "STRING"},
    {"name": "created_at", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "updated_at", "type": "TIMESTAMP", "mode": "REQUIRED"}
  ]
}
Partitioning: PARTITION BY DATE(uploaded_at)
Clustering: CLUSTER BY tenant_id, status
row_count: Set during upload via count_csv_rows() / count_xlsx_rows() (trustworthy, computed from file content)

1.3 Staging Tables

Finding: No existing staging tables for intake.
Query: bq ls payroll_analytics | grep -i staging → No results

2. BigQuery Client Write Methods

2.1 Existing Patterns

Pattern 1: Row-by-Row MERGE (Current /map approach)
  • File: api/bigquery/intake_processor_queries.py:24-180
  • Method: upsert_transaction_events_raw_rows()
  • Approach: Loop through rows, execute MERGE statement per row
  • Performance: ⚠️ BOTTLENECK - 2,841 individual BigQuery queries (one MERGE per row) for the December 2025 file
Pattern 2: Load Job from GCS (Onboarding)
  • File: api/services/onboarding_bigquery.py:18-74
  • Method: trigger_load_job()
  • Approach: client.load_table_from_uri(gcs_uri, table_id, job_config)
  • Usage: Used for async onboarding file processing
  • Config: LoadJobConfig with source_format=CSV, autodetect=True, write_disposition=WRITE_TRUNCATE
Pattern 3: Direct INSERT (Onboarding)
  • File: api/bigquery/onboarding_queries.py:55-96
  • Method: create_onboarding_file()
  • Approach: Single-row INSERT via parameterized query
  • Usage: Metadata records only
Pattern 4: MERGE with Array Parameters (Discovery)
  • File: api/bigquery/discovery_queries.py:94-197
  • Method: discover_batch_businesses()
  • Approach: Single MERGE statement with ArrayQueryParameter for business labels
  • Usage: Batch business discovery
Pattern 5: Row-by-Row INSERT (PEPM)
  • File: api/bigquery/pepm_resolver_queries.py:463-550
  • Method: insert_pepm_state_batch()
  • Approach: Loop with individual INSERT statements
  • Performance: Similar bottleneck to current /map approach
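Pattern 2 is the shape the ELT proposal builds on. A minimal sketch of that load-job call using the google-cloud-bigquery client (not a copy of trigger_load_job(); config mirrors the onboarding description above):

from google.cloud import bigquery

def load_csv_from_gcs(client: bigquery.Client, gcs_uri: str, table_id: str) -> bigquery.LoadJob:
    # CSV source, schema autodetect, WRITE_TRUNCATE so a rerun replaces rather than duplicates.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    # Returns immediately with an async job handle; callers poll or call .result().
    return client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)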

2.2 Helper Modules

  • Client: api/bigquery/client.py - get_client() returns bigquery.Client
  • Identifiers: api/bigquery/identifiers.py - bq_table() for safe table name construction
  • Onboarding Load: api/services/onboarding_bigquery.py - trigger_load_job(), poll_job_status()

2.3 Storage Write API

Finding: The Storage Write API is not used in the codebase; writes go through load_table_from_uri (Load Job API) and parameterized queries only.

3. Cloud Run Execution Environment Constraints

Source: gcloud run services describe payroll-pipeline-cbs-api --region us-central1
Constraints:
  • Request Timeout: 300 seconds (5 minutes) ⚠️ CRITICAL LIMIT
  • Memory: 1Gi
  • CPU: 1
  • Concurrency: 80
  • Min Instances: 0 (cold start possible)
  • Max Instances: Not explicitly set (defaults apply)
Implication: The /map endpoint must complete within 300 s or the request times out, leaving partially written durable state behind.

4. Cloud Run Logs (Last Failing Map Attempt)

Batch ID: f97d7357-986d-46d4-822c-74eb3d467ac3
Period Label: 2025-12-01
File: December 2025 Combined Payroll.csv (2,841 rows)
Log Query Attempts:
  • Query 1: Filter by batch_id → No results (logs may have expired or batch_id not logged)
  • Query 2: Filter by map keyword → JSON parsing error (gcloud output format issue)
Known State from BigQuery:
  • Rows Written: 279 (partial - 9.8% of total)
  • Batch Status: UPLOADED (not MAPPED)
  • Termination Cause: HTTP Timeout (>300s) - confirmed by:
    1. Partial write (279 rows persisted)
    2. Status not updated to MAPPED
    3. Request timeout observed in client (>300s)
Last Successful Row Index: ~278 (279 rows written, 0-based indexing)
Trace ID: Not found in logs (may not be logged or logs expired)
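Back-of-the-envelope (estimate only): 279 rows persisted inside a ~300 s request window works out to roughly 1 s per row-level MERGE; at that rate the full 2,841-row file would need on the order of 50 minutes, an order of magnitude past the Cloud Run limit, even before parse and validation time is counted.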

5. Row Count Storage

5.1 Where row_count is Set

File: api/routes/intake.py:102-233
Function: upload_file()
Code:
if file_ext == '.csv':
    row_count = count_csv_rows(file_content)
else:  # .xlsx
    row_count = count_xlsx_rows(temp_path)
Storage: api/bigquery/intake_queries.py:25-172
Function: create_ingestion_batch()
Field: row_count INT64 (nullable, stored in ingestion_batches.row_count)
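The counting helpers are not reproduced here; a plausible shape, assuming a csv/openpyxl implementation (assumption, not the actual code):

import csv
import io
from openpyxl import load_workbook

def count_csv_rows(file_content: bytes) -> int:
    # Data rows only: total parsed lines minus the header.
    reader = csv.reader(io.StringIO(file_content.decode("utf-8-sig")))
    return max(sum(1 for _ in reader) - 1, 0)

def count_xlsx_rows(temp_path: str) -> int:
    # Data rows only: sheet rows minus the header row; first sheet assumed.
    worksheet = load_workbook(temp_path, read_only=True).active
    return max(worksheet.max_row - 1, 0)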

5.2 Trustworthiness

✅ Trustworthy: row_count is computed from actual file content during upload:
  • CSV: Counts lines (excluding header)
  • XLSX: Counts data rows (excluding header)
  • Set before any processing occurs
  • Used for idempotency checks and invariants
Usage in /map:
  • Not currently used for validation (only verified_count from verify_mapped_rows_exist())
  • Available for invariant checks: staged_count == batch["row_count"]
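A minimal sketch of that invariant check (helper shape assumed):

def check_row_count_invariant(staged_count: int, batch: dict) -> None:
    # batch is the ingestion_batches record; row_count was computed at upload time.
    if staged_count != batch["row_count"]:
        # Fail before the MERGE so no partial state reaches the target table.
        raise RuntimeError(
            f"staged_count {staged_count} != uploaded row_count {batch['row_count']}"
        )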

6. Background Job Framework

Finding: No background job framework (Cloud Tasks, Celery, etc.)
Evidence:
  • No cloud_tasks imports
  • No celery imports
  • No async job queue patterns
  • Onboarding uses async pattern (load job + polling), but no general framework
Implication: The ELT solution must either run synchronously within the HTTP request or adopt the BigQuery load job + polling pattern already used by onboarding.
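If the load-job-plus-polling route is taken, the polling side is small; a sketch (budget values illustrative, and the existing poll_job_status() may differ):

import time
from google.cloud import bigquery

def wait_for_job(client: bigquery.Client, job_id: str, timeout_s: float = 240.0, interval_s: float = 2.0):
    # Poll well inside the 300 s request budget; surface failures explicitly.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        job = client.get_job(job_id)
        if job.state == "DONE":
            if job.error_result:
                raise RuntimeError(f"BigQuery job {job_id} failed: {job.error_result}")
            return job
        time.sleep(interval_s)
    raise TimeoutError(f"BigQuery job {job_id} still running after {timeout_s}s")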

7. Summary & Recommendations

7.1 Write Strategy Choice

Recommendation: BigQuery Load Job from GCS NDJSON
Justification:
  1. Scale: Handles 10-100k rows/week efficiently (single load job vs 2,841+ queries)
  2. Existing Pattern: Already used in onboarding (api/services/onboarding_bigquery.py)
  3. Timeout Safety: Load job is async; HTTP request can return 202 and poll status
  4. Cost: Single load job is cheaper than 2,841+ MERGE queries
  5. Reliability: Load jobs with client-supplied job IDs are deduplicated on retry, and reloading the same batch into staging is safe to rerun
Alternative Considered: insertAll in batches
  • Pros: Simpler (no GCS intermediate step)
  • Cons: Still requires multiple API calls, is less efficient for large files, offers no async pattern, and streamed rows land in the streaming buffer where they are not immediately available to the downstream MERGE
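To make the recommendation concrete, a sketch of the NDJSON write plus staging load; bucket name, object path, and function names below are assumptions, not existing conventions:

import json
from google.cloud import bigquery, storage

def stage_ndjson(records: list[dict], tenant_id: str, batch_id: str) -> str:
    # Serialize already-processed rows as newline-delimited JSON and upload to GCS.
    uri_path = f"map/{tenant_id}/{batch_id}.ndjson"
    blob = storage.Client().bucket("payroll-intake-staging").blob(uri_path)
    blob.upload_from_string(
        "\n".join(json.dumps(r) for r in records),
        content_type="application/x-ndjson",
    )
    return f"gs://payroll-intake-staging/{uri_path}"

def load_to_staging(client: bigquery.Client, gcs_uri: str, staging_table_id: str) -> bigquery.LoadJob:
    # NEWLINE_DELIMITED_JSON into the pre-created staging table; WRITE_APPEND assumes
    # any prior rows for this (tenant_id, batch_id) are deleted first so retries do not duplicate.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    return client.load_table_from_uri(gcs_uri, staging_table_id, job_config=job_config)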

7.2 Implementation Plan

Phase A: Create Staging Table
  • File: integration/bigquery/sql/tables/transaction_events_raw_staging.sql
  • Schema: Mirror transaction_events_raw (minimal fields for MERGE)
  • Partitioning: PARTITION BY DATE(created_at)
  • Clustering: CLUSTER BY tenant_id, batch_id, row_index
Phase B: Update /map Route
  • File: api/routes/intake.py:316-666
  • Changes:
    1. Parse + validate + process_row() → build records list
    2. Write records to GCS as NDJSON
    3. Trigger BigQuery load job to staging table
    4. Poll load job completion (with timeout)
    5. Execute single MERGE from staging → target
    6. Verify counts (staged_count == expected_row_count, target_count == expected_row_count)
    7. Set status=MAPPED + persist column_mapping + mapping_hash
Phase C: Add Helper Functions
  • File: api/bigquery/intake_processor_queries.py (new functions)
  • Functions (see the sketch after this plan):
    • bulk_load_to_staging() - Write NDJSON to GCS, trigger load job
    • merge_staging_to_target() - Single MERGE statement
    • verify_staging_counts() - Count verification
Phase D: Idempotency
  • Check mapping_hash before processing
  • If hash matches and target_count == expected_row_count, return 200 immediately
Phase E: Observability
  • Log: load_job_id, merge_job_id, staged_count, target_count, trace_id
  • Include in response: trace_id, load_job_id (if async), row_count_written
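A hedged sketch of the Phase C/D helpers (table names are parameters, the MERGE column list is abbreviated, and the staging table is assumed to mirror transaction_events_raw so INSERT ROW is valid):

import hashlib
import json
from google.cloud import bigquery

def merge_staging_to_target(client: bigquery.Client, staging_table: str, target_table: str,
                            tenant_id: str, batch_id: str) -> bigquery.QueryJob:
    # Single MERGE keyed on the idempotency key (tenant_id, batch_id, row_index).
    sql = f"""
    MERGE `{target_table}` T
    USING (
      SELECT * FROM `{staging_table}`
      WHERE tenant_id = @tenant_id AND batch_id = @batch_id
    ) S
    ON T.tenant_id = S.tenant_id AND T.batch_id = S.batch_id AND T.row_index = S.row_index
    WHEN MATCHED THEN UPDATE SET
      T.mapped_payload_json = S.mapped_payload_json,
      T.row_status = S.row_status
    WHEN NOT MATCHED THEN INSERT ROW
    """
    job_config = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("tenant_id", "STRING", tenant_id),
        bigquery.ScalarQueryParameter("batch_id", "STRING", batch_id),
    ])
    job = client.query(sql, job_config=job_config)
    job.result()  # block until the MERGE completes
    return job

def verify_staging_counts(client: bigquery.Client, table: str, tenant_id: str, batch_id: str) -> int:
    # Row count for this batch in either the staging or the target table.
    sql = f"SELECT COUNT(*) AS n FROM `{table}` WHERE tenant_id = @tenant_id AND batch_id = @batch_id"
    job_config = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("tenant_id", "STRING", tenant_id),
        bigquery.ScalarQueryParameter("batch_id", "STRING", batch_id),
    ])
    return next(iter(client.query(sql, job_config=job_config).result())).n

def compute_mapping_hash(column_mapping: dict) -> str:
    # Stable hash of the column mapping for the Phase D idempotency check.
    return hashlib.sha256(json.dumps(column_mapping, sort_keys=True).encode("utf-8")).hexdigest()

The /map handler would compare verify_staging_counts() against batch["row_count"] before the MERGE and against the target table after it, matching the Phase B invariants.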

7.3 Files to Modify

  1. New Files:
    • integration/bigquery/sql/tables/transaction_events_raw_staging.sql
    • api/bigquery/intake_staging_queries.py (new helper module)
  2. Modified Files:
    • api/routes/intake.py (lines 316-666) - /map route handler
    • api/bigquery/intake_queries.py - Add mapping_hash field to update_batch_mapping()
    • api/schemas/intake.py - Add mapping_hash to response if needed
  3. Test Files:
    • api/tests/test_map_bulk_load.py (new)
    • api/tests/test_map_idempotency.py (new)
    • api/tests/test_map_error_contract.py (new)
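As a rough illustration of what test_map_idempotency.py could assert (endpoint path, fixtures, and response fields here are assumptions, apart from row_count_written from Phase E):

def test_map_twice_is_idempotent(client, seeded_batch):
    # Hypothetical test: a second /map call with an unchanged mapping should
    # short-circuit on mapping_hash and report the same number of rows written.
    url = f"/intake/batches/{seeded_batch.batch_id}/map"  # assumed route shape
    first = client.post(url, json=seeded_batch.mapping)
    second = client.post(url, json=seeded_batch.mapping)
    assert first.status_code in (200, 202)
    assert second.status_code == 200
    assert second.json()["row_count_written"] == first.json()["row_count_written"]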

8. Next Steps

  1. ✅ Phase 0 Complete - All artifacts gathered
  2. ⏸️ Await Approval - Review write strategy choice and implementation plan
  3. 🔄 Phase 1 - Implement staging table + bulk load + MERGE
  4. 🔄 Phase 2 - Add tests + observability
  5. 🔄 Phase 3 - Proof run with Dec file

End of Phase 0 Discovery