Phase 0 — Map Hardening Discovery & Constraints
Date: 2026-01-28
Goal: Extract schemas, patterns, infra constraints, and failure evidence before implementing the ELT staging-table solution
1. BigQuery Schemas
1.1 transaction_events_raw
- Source: `bq show payroll-bi-gauntlet.payroll_analytics.transaction_events_raw`
- DDL File: `integration/bigquery/sql/tables/transaction_events_raw.sql`
- Schema: see the DDL file above
- Partitioning: `PARTITION BY DATE(created_at)`
- Clustering: `CLUSTER BY tenant_id, batch_id, row_index`
- Idempotency Key: `(tenant_id, batch_id, row_index)`
1.2 ingestion_batches
- Source: `bq show payroll-bi-gauntlet.payroll_analytics.ingestion_batches`
- DDL File: `integration/bigquery/sql/tables/ingestion_batches.sql`
- Schema: see the DDL file above
- Partitioning: `PARTITION BY DATE(uploaded_at)`
- Clustering: `CLUSTER BY tenant_id, status`
- `row_count`: Set during upload via `count_csv_rows()` / `count_xlsx_rows()` (trustworthy, computed from file content)
1.3 Staging Tables
- Finding: No existing staging tables for intake.
- Query: `bq ls payroll_analytics | grep -i staging` → no results
2. BigQuery Client Write Methods
2.1 Existing Patterns
Pattern 1: Row-by-Row MERGE (current `/map` approach)
- File: `api/bigquery/intake_processor_queries.py:24-180`
- Method: `upsert_transaction_events_raw_rows()`
- Approach: Loop through rows, execute a MERGE statement per row
- Performance: ⚠️ BOTTLENECK - 2,841 individual BigQuery queries for the December file
Pattern 2: GCS Load Job (onboarding)
- File: `api/services/onboarding_bigquery.py:18-74`
- Method: `trigger_load_job()`
- Approach: `client.load_table_from_uri(gcs_uri, table_id, job_config)`
- Usage: Used for async onboarding file processing
- Config: `LoadJobConfig` with `source_format=CSV`, `autodetect=True`, `write_disposition=WRITE_TRUNCATE`
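
For reference, a minimal sketch of this load-job pattern with the `google-cloud-bigquery` client, mirroring the config described above. The helper name is illustrative, not the actual `trigger_load_job()` implementation.

```python
from google.cloud import bigquery

def start_csv_load(client: bigquery.Client, gcs_uri: str, table_id: str) -> bigquery.LoadJob:
    """Illustrative sketch of the onboarding load-job pattern (not the real trigger_load_job())."""
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    # Returns a job reference immediately; completion is confirmed by polling the job.
    return client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)
```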
Pattern 3: Single-Row Parameterized INSERT (onboarding metadata)
- File: `api/bigquery/onboarding_queries.py:55-96`
- Method: `create_onboarding_file()`
- Approach: Single-row INSERT via parameterized query
- Usage: Metadata records only
Pattern 4: Batch MERGE with Array Parameters (discovery)
- File: `api/bigquery/discovery_queries.py:94-197`
- Method: `discover_batch_businesses()`
- Approach: Single MERGE statement with `ArrayQueryParameter` for business labels
- Usage: Batch business discovery
Pattern 5: Row-by-Row INSERT Loop (PEPM resolver)
- File: `api/bigquery/pepm_resolver_queries.py:463-550`
- Method: `insert_pepm_state_batch()`
- Approach: Loop with individual INSERT statements
- Performance: Similar bottleneck to the current `/map` approach
2.2 Helper Modules
- Client: `api/bigquery/client.py` - `get_client()` returns `bigquery.Client`
- Identifiers: `api/bigquery/identifiers.py` - `bq_table()` for safe table name construction
- Onboarding Load: `api/services/onboarding_bigquery.py` - `trigger_load_job()`, `poll_job_status()`
2.3 Storage Write API
Finding: Not used in the codebase. Only `load_table_from_uri` (Load Job API) and parameterized queries are used.
3. Cloud Run Execution Environment Constraints
Source: `gcloud run services describe payroll-pipeline-cbs-api --region us-central1`

Constraints:
- Request Timeout: `300 seconds` (5 minutes) ⚠️ CRITICAL LIMIT
- Memory: `1Gi`
- CPU: `1`
- Concurrency: `80`
- Min Instances: `0` (cold starts possible)
- Max Instances: not explicitly set (defaults apply)

Implication: the `/map` endpoint must complete within 300 s or the request times out, leaving partial durable state.
4. Cloud Run Logs (Last Failing Map Attempt)
Batch ID: `f97d7357-986d-46d4-822c-74eb3d467ac3`
Period Label: `2025-12-01`
File: December 2025 Combined Payroll.csv (2,841 rows)

Log Query Attempts:
- Query 1: Filter by `batch_id` → no results (logs may have expired, or batch_id is not logged)
- Query 2: Filter by `map` keyword → JSON parsing error (gcloud output format issue)
Observed state:
- Rows Written: `279` (partial, 9.8% of total)
- Batch Status: `UPLOADED` (not `MAPPED`)
- Termination Cause: HTTP timeout (>300 s), confirmed by:
  - Partial write (279 rows persisted)
  - Status not updated to `MAPPED`
  - Request timeout observed in the client (>300 s)
- Last `row_index` written: ~278 (279 rows written, 0-based indexing)
Trace ID: Not found in logs (may not be logged or logs expired)
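
Rough rate check (assuming the write rate stayed roughly constant up to the cutoff): 279 rows persisted within the ~300 s limit is about 0.9 rows/s under the per-row MERGE approach, so the full 2,841-row file would need on the order of 2,841 / 0.9 ≈ 3,100 s (over 50 minutes). The failure is therefore structural, not a transient slowdown.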
5. Row Count Storage
5.1 Where row_count is Set
- Set in: `api/routes/intake.py:102-233`, `upload_file()`
- Persisted by: `api/bigquery/intake_queries.py:25-172`, `create_ingestion_batch()`
- Field: `row_count INT64` (nullable, stored in `ingestion_batches.row_count`)
5.2 Trustworthiness
✅ Trustworthy: `row_count` is computed from actual file content during upload:
- CSV: Counts lines (excluding header)
- XLSX: Counts data rows (excluding header)
- Set before any processing occurs
- Used for idempotency checks and invariants
Use in `/map`:
- Not currently used for validation (only `verified_count` from `verify_mapped_rows_exist()`)
- Available for invariant checks: `staged_count == batch["row_count"]`
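
A minimal sketch of the invariant check the new `/map` flow could add, assuming `batch` is the dict returned for the `ingestion_batches` record; the helper name is illustrative.

```python
def check_row_count_invariants(batch: dict, staged_count: int, target_count: int) -> None:
    """Illustrative invariant: both staging and target counts must match the upload-time row_count."""
    expected = batch["row_count"]  # set at upload from file content, per section 5.1
    if staged_count != expected:
        raise ValueError(f"staging row mismatch: staged={staged_count}, expected={expected}")
    if target_count != expected:
        raise ValueError(f"target row mismatch: target={target_count}, expected={expected}")
```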
6. Background Job Framework
Finding: No background job framework (Cloud Tasks, Celery, etc.)

Evidence:
- No `cloud_tasks` imports
- No `celery` imports
- No async job queue patterns
- Onboarding uses an async pattern (load job + polling), but there is no general framework
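
For context, polling a BigQuery job without a job framework looks roughly like the sketch below. This is not the actual `poll_job_status()`; the deadline and polling interval are assumed values.

```python
import time
from google.cloud import bigquery

def wait_for_job(client: bigquery.Client, job_id: str, deadline_s: float = 120.0):
    """Sketch of load-job polling without a background framework (deadline is an assumed value)."""
    start = time.monotonic()
    while time.monotonic() - start < deadline_s:
        job = client.get_job(job_id)   # refreshes job state from the API
        if job.state == "DONE":
            if job.error_result:
                raise RuntimeError(f"job {job_id} failed: {job.error_result}")
            return job
        time.sleep(2.0)                # polling interval is arbitrary here
    raise TimeoutError(f"job {job_id} still running after {deadline_s}s")
```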
7. Summary & Recommendations
7.1 Write Strategy Choice
Recommendation: BigQuery Load Job from GCS NDJSON

Justification:
- Scale: Handles 10-100k rows/week efficiently (a single load job vs. 2,841+ queries)
- Existing Pattern: Already used in onboarding (`api/services/onboarding_bigquery.py`)
- Timeout Safety: The load job is async; the HTTP request can return 202 and poll status
- Cost: Single load job is cheaper than 2,841+ MERGE queries
- Reliability: Load jobs can be made idempotent (client-supplied job IDs) and retried as a single operation
Alternative Considered: Streaming inserts (`insertAll`) in batches
- Pros: Simpler (no GCS intermediate step)
- Cons: Still requires multiple API calls, less efficient for large files, no async pattern
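
A minimal sketch of the recommended path (serialize mapped rows as NDJSON, stage in GCS, start one load job). Every name below (function, bucket, staging table) is a placeholder, not existing code.

```python
import json
from google.cloud import bigquery, storage

def load_records_via_gcs(records: list[dict], bucket_name: str, blob_name: str,
                         staging_table: str) -> bigquery.LoadJob:
    """Sketch: write NDJSON to GCS, then start a single load job into the staging table."""
    ndjson = "\n".join(json.dumps(r, default=str) for r in records)
    blob = storage.Client().bucket(bucket_name).blob(blob_name)
    blob.upload_from_string(ndjson, content_type="application/x-ndjson")

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        # Append into staging; dedup happens in the MERGE keyed on (tenant_id, batch_id, row_index).
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    return client.load_table_from_uri(f"gs://{bucket_name}/{blob_name}",
                                      staging_table, job_config=job_config)
```

With this shape the request handler can either block on `job.result()` while staying inside the 300 s budget or return 202 and confirm completion on a follow-up poll.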
7.2 Implementation Plan
Phase A: Create Staging Table
- File: `integration/bigquery/sql/tables/transaction_events_raw_staging.sql`
- Schema: Mirror `transaction_events_raw` (minimal fields for MERGE)
- Partitioning: `PARTITION BY DATE(created_at)`
- Clustering: `CLUSTER BY tenant_id, batch_id, row_index`
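
A possible shape for the staging DDL, shown as a DDL string executed through the Python client. Only the keys confirmed in Phase 0 are listed, their types are assumptions, and the remaining columns would mirror `transaction_events_raw`.

```python
from google.cloud import bigquery

# Sketch of the staging DDL. Column types are assumptions; remaining columns are elided.
STAGING_DDL = """
CREATE TABLE IF NOT EXISTS
  `payroll-bi-gauntlet.payroll_analytics.transaction_events_raw_staging` (
    tenant_id STRING,
    batch_id STRING,
    row_index INT64,
    created_at TIMESTAMP
    -- ... remaining columns mirror transaction_events_raw ...
)
PARTITION BY DATE(created_at)
CLUSTER BY tenant_id, batch_id, row_index
"""

def create_staging_table(client: bigquery.Client) -> None:
    client.query(STAGING_DDL).result()  # DDL statements run as ordinary query jobs
```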
Phase B: Rework the `/map` Route
- File: `api/routes/intake.py:316-666`
- Changes:
  - Parse + validate + `process_row()` → build records list
  - Write records to GCS as NDJSON
  - Trigger BigQuery load job into the staging table
  - Poll load job completion (with timeout)
  - Execute a single MERGE from staging → target
  - Verify counts (`staged_count == expected_row_count`, `target_count == expected_row_count`)
  - Set `status=MAPPED` + persist `column_mapping` + `mapping_hash`
Phase C: New Query Helpers
- File: `api/bigquery/intake_processor_queries.py` (new functions)
- Functions:
  - `bulk_load_to_staging()`: write NDJSON to GCS, trigger load job
  - `merge_staging_to_target()`: single MERGE statement
  - `verify_staging_counts()`: count verification
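
Hedged sketches of two of these helpers (`bulk_load_to_staging()` would follow the NDJSON/GCS sketch in section 7.1). The function names come from this plan, but signatures, fully-qualified table names, and the MERGE column handling are assumptions to be settled in Phase 1.

```python
from google.cloud import bigquery

# Assumed fully-qualified names; the real ones would presumably come via bq_table().
STAGING = "payroll-bi-gauntlet.payroll_analytics.transaction_events_raw_staging"
TARGET = "payroll-bi-gauntlet.payroll_analytics.transaction_events_raw"

def _batch_params(tenant_id: str, batch_id: str) -> bigquery.QueryJobConfig:
    return bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("tenant_id", "STRING", tenant_id),
        bigquery.ScalarQueryParameter("batch_id", "STRING", batch_id),
    ])

def merge_staging_to_target(client: bigquery.Client, tenant_id: str, batch_id: str) -> int:
    """One MERGE keyed on (tenant_id, batch_id, row_index); column handling is illustrative."""
    sql = f"""
    MERGE `{TARGET}` t
    USING (SELECT * FROM `{STAGING}`
           WHERE tenant_id = @tenant_id AND batch_id = @batch_id) s
    ON t.tenant_id = s.tenant_id AND t.batch_id = s.batch_id AND t.row_index = s.row_index
    WHEN MATCHED THEN UPDATE SET t.created_at = s.created_at  -- plus the real payload columns
    WHEN NOT MATCHED THEN INSERT ROW
    """
    job = client.query(sql, job_config=_batch_params(tenant_id, batch_id))
    job.result()
    return job.num_dml_affected_rows or 0

def verify_staging_counts(client: bigquery.Client, table: str, tenant_id: str, batch_id: str) -> int:
    """Count rows for one batch in either table, to compare against ingestion_batches.row_count."""
    sql = f"SELECT COUNT(*) AS n FROM `{table}` WHERE tenant_id = @tenant_id AND batch_id = @batch_id"
    rows = client.query(sql, job_config=_batch_params(tenant_id, batch_id)).result()
    return next(iter(rows)).n
```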
Phase D: Idempotency
- Check `mapping_hash` before processing
- If the hash matches and `target_count == expected_row_count`, return 200 immediately
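
A small sketch of the proposed fast path; `compute_mapping_hash()` and the batch record shape are placeholders implied by this plan, not existing code.

```python
import hashlib
import json

def compute_mapping_hash(column_mapping: dict) -> str:
    """Deterministic hash of the column mapping (canonical JSON); a placeholder, not existing code."""
    canonical = json.dumps(column_mapping, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def is_already_mapped(batch: dict, column_mapping: dict, target_count: int) -> bool:
    """True when this exact mapping already completed: safe to return 200 without reprocessing."""
    return (
        batch.get("mapping_hash") == compute_mapping_hash(column_mapping)
        and target_count == batch["row_count"]
    )
```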
Phase E: Observability
- Log: `load_job_id`, `merge_job_id`, `staged_count`, `target_count`, `trace_id`
- Include in response: `trace_id`, `load_job_id` (if async), `row_count_written`
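
A sketch of the intended log record and response fields. The field names come from the list above; the logger setup and the exact response shape are assumptions.

```python
import logging

logger = logging.getLogger("intake.map")

def log_map_result(trace_id: str, load_job_id: str, merge_job_id: str,
                   staged_count: int, target_count: int) -> dict:
    """Emit one structured log record and build the response payload described above."""
    logger.info(
        "map completed",
        extra={
            "trace_id": trace_id,
            "load_job_id": load_job_id,
            "merge_job_id": merge_job_id,
            "staged_count": staged_count,
            "target_count": target_count,
        },
    )
    # row_count_written mirrors the verified target count here (an assumption).
    return {"trace_id": trace_id, "load_job_id": load_job_id, "row_count_written": target_count}
```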
7.3 Files to Modify
New Files:
- `integration/bigquery/sql/tables/transaction_events_raw_staging.sql`
- `api/bigquery/intake_staging_queries.py` (new helper module)

Modified Files:
- `api/routes/intake.py` (lines 316-666): `/map` route handler
- `api/bigquery/intake_queries.py`: add `mapping_hash` field to `update_batch_mapping()`
- `api/schemas/intake.py`: add `mapping_hash` to the response if needed

Test Files:
- `api/tests/test_map_bulk_load.py` (new)
- `api/tests/test_map_idempotency.py` (new)
- `api/tests/test_map_error_contract.py` (new)
8. Next Steps
- ✅ Phase 0 Complete - All artifacts gathered
- ⏸️ Await Approval - Review write strategy choice and implementation plan
- 🔄 Phase 1 - Implement staging table + bulk load + MERGE
- 🔄 Phase 2 - Add tests + observability
- 🔄 Phase 3 - Proof run with Dec file
End of Phase 0 Discovery