Phase 0B — Red-Flag & Alignment Scan
Date: 2026-01-28
Scope: Intake persistence only (ELT /map hardening)
Objective: Validate ELT transition feasibility and identify blockers before implementation
1. BigQuery Schema Confirmation
1.1 error_codes & warnings Types
Source: bq show --schema payroll-bi-gauntlet.payroll_analytics.transaction_events_raw
Confirmed:
- error_codes: STRING, REPEATED (ARRAY<STRING>)
- warnings: STRING, REPEATED (ARRAY<STRING>)
- No nullable mode: REPEATED arrays are never NULL; an empty array is []
1.2 updated_at / updated_by Existence
MERGE Statement Reference (api/bigquery/intake_processor_queries.py:99-100,123):
Actual table schema (per bq show --schema):
- ❌ updated_at: NOT PRESENT in table schema
- ❌ updated_by: NOT PRESENT in table schema
1.3 Required/Not-Null Fields (Staging Must Always Populate)
REQUIRED fields (mode="REQUIRED"):
- tenant_id (STRING)
- batch_id (STRING)
- row_index (INTEGER)
- event_type (STRING)
- period_label (STRING)
- business_name_raw (STRING)
- mapped_payload_json (STRING)
- row_status (STRING)
- created_at (TIMESTAMP)
- created_by (STRING)
Staging implications:
- Must populate all REQUIRED fields
- Optional fields can be NULL (source_system, member_id_raw, payee_agent_id_raw, credit_raw, debit_raw, total_raw, period_code_raw, pay_periods_raw)
- Arrays (error_codes, warnings) can be empty [] but not NULL (REPEATED mode)
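To make the contract concrete, here is a minimal sketch of one staging NDJSON row. The field set follows the schema above; all values (event_type, row_status, IDs) are made-up examples.

```python
# Illustrative staging row; values are invented, field set is from Section 1.3.
import json
from datetime import datetime, timezone

row = {
    "tenant_id": "t-001",
    "batch_id": "3f2c9a4e-0000-0000-0000-000000000000",
    "row_index": 0,
    "event_type": "payroll",
    "period_label": "2025-12",
    "business_name_raw": "Acme LLC",
    "mapped_payload_json": "{}",
    "row_status": "VERIFIED",
    "created_at": datetime.now(timezone.utc).isoformat(),
    "created_by": "intake-service",
    "error_codes": [],   # REPEATED: always an array, never null
    "warnings": [],
    "source_system": None,  # NULLABLE fields may be null or omitted
}
print(json.dumps(row))  # one JSON object per line = NDJSON
```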
2. Load Job Feasibility
2.1 trigger_load_job Support Analysis
Current Implementation (api/services/onboarding_bigquery.py:18-74):
- Format: bigquery.SourceFormat.CSV only
- Schema: autodetect=True (no explicit schema)
- Write Disposition: WRITE_TRUNCATE only
Required capabilities:
- ✅ NDJSON support: bigquery.SourceFormat.NEWLINE_DELIMITED_JSON (supported by BigQuery)
- ✅ Explicit schema: schema=[...] parameter (supported; must be provided)
- ✅ WRITE_APPEND: bigquery.WriteDisposition.WRITE_APPEND (supported)
Conclusion: trigger_load_job() needs modification to support NDJSON, an explicit schema, and WRITE_APPEND; either add a new trigger_ndjson_load_job() function or extend the existing one.
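A minimal sketch of the proposed trigger_ndjson_load_job(), assuming the standard google-cloud-bigquery client; the function name comes from this doc, and the schema shown is abridged:

```python
from google.cloud import bigquery

def trigger_ndjson_load_job(
    client: bigquery.Client,
    gcs_uri: str,
    table_id: str,
    schema: list[bigquery.SchemaField],
) -> bigquery.LoadJob:
    """Start a load job for staged NDJSON; the caller polls the returned job."""
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=schema,  # explicit schema instead of autodetect=True
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    return client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)

# Abridged schema example (full REQUIRED field list in Section 1.3):
STAGING_SCHEMA = [
    bigquery.SchemaField("tenant_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("row_index", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("error_codes", "STRING", mode="REPEATED"),
]
```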
2.2 Sync /map Timing Estimate (Dec File: 2,841 rows)
Current Bottleneck: Row-by-row MERGE (2,841 queries) → >300s timeout
ELT Path Timing Estimate:
- Parse + process_row(): ~5-10s (in-memory, Python)
- Write NDJSON to GCS: ~2-5s (821 KB file)
- Trigger Load Job: ~1s (async job creation, returns immediately)
- Poll Load Job: ~10-30s (BigQuery load job for 2,841 rows NDJSON)
- MERGE Staging → Target: ~5-10s (single MERGE statement)
- Verify Counts: ~2-3s (2 COUNT queries)
- Finalize Status: ~1s (UPDATE batch status)
- Estimated total: ~26-60s for 2,841 rows, comfortably under the 300s timeout
2.3 Async Fallback Threshold
Recommendation: >10,000 rows → Return 202 + poll status endpoint
Rationale:
- 10k rows of NDJSON ≈ ~3MB file
- Load job timing: ~30-60s (still under 300s, but approaching limit)
- Provides safety margin for network latency, BigQuery queue delays
- Allows graceful degradation for very large files
Implementation (see the sketch below):
- Check len(processed_rows) > ASYNC_THRESHOLD (default 10,000)
- If async: Return 202 with load_job_id, status="PROCESSING"
- Frontend polls /api/v1/intake/{batch_id}/status until status="MAPPED"
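A minimal sketch of the threshold branch, assuming a FastAPI route; trigger_ndjson_load_job() is the function sketched in Section 2.1, and the remaining names are illustrative:

```python
from fastapi.responses import JSONResponse

ASYNC_THRESHOLD = 10_000  # rows; default from the recommendation above

def persist_batch(processed_rows, client, gcs_uri, staging_table, schema, batch_id):
    # Kick off the load job either way; only the waiting strategy differs.
    job = trigger_ndjson_load_job(client, gcs_uri, staging_table, schema)
    if len(processed_rows) > ASYNC_THRESHOLD:
        # Async path: return immediately; the frontend polls the status endpoint.
        return JSONResponse(
            status_code=202,
            content={"batch_id": str(batch_id), "load_job_id": job.job_id,
                     "status": "PROCESSING"},
        )
    job.result(timeout=120)  # sync path: block until the load job finishes
    # ...continue with MERGE, count verification, finalize status...
```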
3. Concurrency/Idempotency
3.1 Call Paths to /map
Direct Call Paths:
- Frontend UI (dashboard/src/lib/intakeClient.ts):
  - User clicks "Map Columns" button → POST /api/v1/intake/{batch_id}/map
  - Single user action; no concurrent calls expected
- API Route (api/routes/intake.py:316): @router.post("/{batch_id}/map")
  - RBAC: require_admin_or_ceo
  - No background jobs or async tasks call this endpoint
3.2 MAPPING_IN_PROGRESS Lock Strategy
Current State: ❌ NO LOCKING MECHANISM
Status Transitions:
- UPLOADED → MAPPED (current)
- MAPPED → MAPPED (re-materialize if mapping changed)
Race scenario:
- User clicks "Map" twice rapidly → Both requests fetch status=UPLOADED
- Both proceed to materialize rows → Duplicate writes (idempotent via MERGE, but wasteful)
- Both attempt UPDATE status=MAPPED → Last write wins (no conflict)
Proposed MAPPING_IN_PROGRESS lock (see the sketch below):
- Status is not UPLOADED → Return 409 CONFLICT ("Mapping already in progress or completed")
- Lock acquired → Proceed with materialization
- On success: UPDATE status='MAPPED'
- On failure: UPDATE status='UPLOADED' (rollback) OR UPDATE status='ERROR'
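A sketch of acquiring the lock as an atomic conditional UPDATE, assuming ingestion_batches is a BigQuery table; the table path and function name are illustrative, and note that BigQuery serializes concurrent DML on a table, which is acceptable at this call rate:

```python
from google.cloud import bigquery

LOCK_SQL = """
UPDATE `payroll_analytics.ingestion_batches`
SET status = 'MAPPING_IN_PROGRESS'
WHERE batch_id = @batch_id AND status = 'UPLOADED'
"""

def acquire_mapping_lock(client: bigquery.Client, batch_id: str) -> bool:
    """Returns True iff this request flipped UPLOADED -> MAPPING_IN_PROGRESS."""
    job = client.query(
        LOCK_SQL,
        job_config=bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("batch_id", "STRING", batch_id)
            ]
        ),
    )
    job.result()  # wait for the DML statement to finish
    # 0 affected rows means another request holds the lock or status != UPLOADED.
    return (job.num_dml_affected_rows or 0) == 1
```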
3.3 Idempotent Shortcut (Stale Status Handling)
Current Idempotency (api/routes/intake.py:368-398):
- Checks status == "MAPPED" AND verified_count > 0 AND mapping_hash matches
- Returns 200 immediately (short-circuit)
Stale-status scenario:
- Request 1: Materializes rows, writes to BigQuery, but UPDATE status='MAPPED' fails
- Request 2: Fetches batch → status=UPLOADED (stale)
- Request 2: Checks verified_count → Finds rows exist → Compares mapping_hash → Matches → Returns 200 ✅
Conclusion: Idempotency keys off verified_count and mapping_hash, not status alone.
Enhancement Needed: Add mapping_hash field to ingestion_batches table (currently not stored, only computed on-the-fly).
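The doc does not pin down how mapping_hash is derived, so here is one plausible sketch: a deterministic hash over the canonicalized mapping payload (function name and canonicalization scheme are assumptions):

```python
import hashlib
import json

def compute_mapping_hash(mapping: dict) -> str:
    """Stable hash of a column mapping: same mapping -> same hash, key order ignored."""
    canonical = json.dumps(mapping, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Storing this value in ingestion_batches during update_batch_mapping() (see Section 7) would let the short-circuit compare against the persisted hash instead of recomputing it on the fly.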
4. Audit Semantics
4.1 created_at / created_by Immutability
MERGE Statement (api/bigquery/intake_processor_queries.py:122-123):
- Must preserve immutability: created_at and created_by from staging (source) are used for INSERT
- On MATCHED: Do NOT update created_at/created_by (only update data fields)
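An illustrative MERGE shape that preserves this; the real statement lives in api/bigquery/intake_processor_queries.py with the full column list, and the staging table name here is an assumption:

```python
# Illustrative MERGE preserving created_at/created_by immutability;
# column list abridged, staging table name assumed.
MERGE_STAGING_TO_TARGET = """
MERGE `payroll_analytics.transaction_events_raw` AS t
USING `payroll_analytics.transaction_events_staging` AS s
ON t.tenant_id = s.tenant_id
   AND t.batch_id = s.batch_id
   AND t.row_index = s.row_index
WHEN MATCHED THEN UPDATE SET
  -- data fields only; created_at/created_by deliberately omitted
  t.row_status = s.row_status,
  t.mapped_payload_json = s.mapped_payload_json
WHEN NOT MATCHED THEN INSERT
  (tenant_id, batch_id, row_index, row_status, mapped_payload_json,
   created_at, created_by)
VALUES
  (s.tenant_id, s.batch_id, s.row_index, s.row_status, s.mapped_payload_json,
   s.created_at, s.created_by)
"""
```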
4.2 Lineage Retention (NDJSON Deletion)
Required Lineage (if NDJSON is deleted):
- ✅ trace_id: Already logged and returned in response
- ✅ load_job_id: Must be logged and optionally stored in ingestion_batches metadata
- ✅ merge_job_id: Must be logged (from MERGE query job)
- ⚠️ gcs_uri: Optional; can be stored in an ingestion_batches metadata field (if added) or logged only
Recommendation:
- Store in ingestion_batches: Add an optional metadata JSON field (or reuse existing fields)
- Log: All job IDs + trace_id + gcs_uri (for Cloud Logging searchability)
- TTL: Delete NDJSON after 7 days (or after MERGE succeeds + 1 day grace period)
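A sketch of the lineage log line, assuming a structured-logging handler that forwards extra fields to Cloud Logging; the logger name and helper are illustrative:

```python
import logging

logger = logging.getLogger("intake")

def log_map_lineage(trace_id: str, batch_id: str, load_job_id: str,
                    merge_job_id: str, gcs_uri: str) -> None:
    # One searchable record per successful /map run, emitted before any cleanup.
    logger.info(
        "intake_map_lineage",
        extra={"trace_id": trace_id, "batch_id": batch_id,
               "load_job_id": load_job_id, "merge_job_id": merge_job_id,
               "gcs_uri": gcs_uri},
    )
```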
5. Failure-Mode Matrix
5.1 Failure Points & Error Contracts
| Failure Point | Status Transition | Error Contract Fields | HTTP Status |
|---|---|---|---|
| GCS Write Failure | UPLOADED → UPLOADED (no change) | error="GCSWriteFailed", reason="upload_failed", trace_id, batch_id, gcs_uri | 500 |
| Load Job Creation Failure | UPLOADED → UPLOADED (no change) | error="LoadJobCreationFailed", reason="bigquery_unavailable", trace_id, batch_id, gcs_uri | 503 |
| Load Job Poll Timeout | UPLOADED → MAPPING_IN_PROGRESS (stuck) | error="LoadJobTimeout", reason="job_timeout", trace_id, batch_id, load_job_id | 504 |
| Load Job Failed | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="LoadJobFailed", reason="load_job_error", trace_id, batch_id, load_job_id, error_details | 500 |
| Staged Count Mismatch | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="StagedCountMismatch", reason="count_verification_failed", trace_id, batch_id, expected_count, staged_count | 500 |
| MERGE Failure | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="MergeFailed", reason="bigquery_query_failed", trace_id, batch_id, merge_job_id | 503 |
| Target Count Mismatch | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="TargetCountMismatch", reason="count_verification_failed", trace_id, batch_id, expected_count, target_count, staged_count | 500 |
| Finalize Status Failure | MAPPING_IN_PROGRESS → MAPPING_IN_PROGRESS (stuck) | error="StatusUpdateFailed", reason="bigquery_query_failed", trace_id, batch_id | 500 |
| Cleanup Failure | MAPPED → MAPPED (no change, best-effort) | Logged only (non-blocking) | N/A |
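One way to keep the error contract above uniform is a single response model; a sketch assuming Pydantic, with the model name and field optionality inferred from the matrix:

```python
from typing import Optional
from pydantic import BaseModel

class IntakeMapError(BaseModel):
    """Structured error body for /map failures; superset of the matrix columns."""
    error: str                 # e.g. "LoadJobFailed"
    reason: str                # e.g. "load_job_error"
    trace_id: str
    batch_id: str
    gcs_uri: Optional[str] = None
    load_job_id: Optional[str] = None
    merge_job_id: Optional[str] = None
    expected_count: Optional[int] = None
    staged_count: Optional[int] = None
    target_count: Optional[int] = None
    error_details: Optional[str] = None
```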
6. Cleanup Safety
6.1 TTL vs Delete-by-Batch
Option A: TTL (Time-Based)
- Pattern: GCS lifecycle policy (delete objects older than 7 days)
- Pros: Automatic, no code complexity
- Cons: Cannot delete immediately after success, may retain failed staging data
Option B: Delete-by-Batch
- Pattern: After MERGE succeeds + count verification, delete gs://bucket/intake-staging/{tenant_id}/{batch_id}/*
- Pros: Immediate cleanup, only deletes successful batches
- Cons: Requires explicit cleanup code, must handle failures gracefully
Recommendation (hybrid):
- Immediate: Delete staging NDJSON after MERGE succeeds (best-effort, non-blocking)
- Fallback: GCS lifecycle policy (delete objects older than 7 days) for orphaned staging files
6.2 Cleanup Isolation (Cannot Delete Other Batches)
GCS Path Pattern:
- ✅ Tenant isolation: Path includes tenant_id (from JWT, not user input)
- ✅ Batch isolation: Path includes batch_id (from route parameter, validated UUID)
- ✅ No wildcards: Delete specific blob, not prefix-based delete
- ✅ Best-effort: Wrap in try/except, log but don’t fail if delete fails
Note: batch_id is already validated as a UUID in the route signature (batch_id: uuid.UUID). A cleanup sketch honoring these safeguards follows.
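A best-effort cleanup sketch using google-cloud-storage, deleting exactly one blob inside the tenant/batch prefix; the bucket name, object filename, and function name are assumptions:

```python
import logging
from google.cloud import storage

logger = logging.getLogger("intake")

def cleanup_staging_ndjson(client: storage.Client, bucket_name: str,
                           tenant_id: str, batch_id: str) -> None:
    """Delete the batch's staging NDJSON; never raises, the TTL policy is the backstop."""
    blob_name = f"intake-staging/{tenant_id}/{batch_id}/rows.ndjson"  # assumed layout
    try:
        client.bucket(bucket_name).blob(blob_name).delete()
    except Exception:
        # Best-effort: log and move on; the 7-day lifecycle rule cleans up orphans.
        logger.warning("staging cleanup failed: gs://%s/%s",
                       bucket_name, blob_name, exc_info=True)
```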
7. Red Flags Identified
🔴 Critical (Must Fix Before Implementation)
- No MAPPING_IN_PROGRESS Lock: Race condition possible if user clicks "Map" twice rapidly
  - Fix: Add atomic status transition check
- mapping_hash Not Stored: Idempotency relies on computing the hash on-the-fly
  - Fix: Store mapping_hash in ingestion_batches during update_batch_mapping()
- updated_at/updated_by Referenced But Don't Exist: MERGE statement references non-existent columns
  - Fix: Remove them from the staging MERGE statement (safe, but cleanup needed)
🟡 Medium (Should Fix)
- No Async Threshold: All requests are sync, even for very large files
  - Fix: Add async fallback for >10k rows
- No Cleanup Strategy: Staging NDJSON files may accumulate
  - Fix: Implement hybrid cleanup (immediate + TTL)
🟢 Low (Nice to Have)
- No Metadata Field: Cannot store load_job_id, merge_job_id, staging_gcs_uri
  - Fix: Add an optional metadata JSON field to ingestion_batches
8. Implementation Readiness
✅ Ready to Proceed
- Schema confirmed (error_codes, warnings are REPEATED STRING)
- Load job supports NDJSON + explicit schema + WRITE_APPEND
- Timing estimate: 26-60s for 2,841 rows (well under 300s)
- Idempotency works even with stale status
- Audit semantics preserved (created_at/created_by immutable)
⚠️ Must Address Before Implementation
- Add MAPPING_IN_PROGRESS status + atomic transition
- Store mapping_hash in ingestion_batches
- Remove updated_at/updated_by from staging MERGE (they don't exist)
- Implement structured error contracts for all failure points
- Add cleanup logic (immediate delete + TTL fallback)
End of Phase 0B Red-Flag & Alignment Scan