Phase 0B — Red-Flag & Alignment Scan

Date: 2026-01-28
Scope: Intake persistence only (ELT /map hardening)
Objective: Validate ELT transition feasibility and identify blockers before implementation

1. BigQuery Schema Confirmation

1.1 error_codes & warnings Types

Source: bq show --schema payroll-bi-gauntlet.payroll_analytics.transaction_events_raw
Confirmed:
  • error_codes: STRING, REPEATED (ARRAY<STRING>)
  • warnings: STRING, REPEATED (ARRAY<STRING>)
  • No nullable mode (REPEATED arrays are never NULL, empty array = [])

1.2 updated_at / updated_by Existence

MERGE Statement Reference (api/bigquery/intake_processor_queries.py:99-100,123):
CURRENT_TIMESTAMP() AS updated_at,
@created_by AS updated_by
-- Comment: "Optional: updated_at/updated_by would be set here if columns exist"
Actual Schema (from bq show --schema):
  • updated_at: NOT PRESENT in table schema
  • updated_by: NOT PRESENT in table schema
Finding: The MERGE statement references non-existent columns. This is safe in practice (the aliases are computed in the source SELECT but never applied to target columns), but the staging table should NOT include these fields.

1.3 Required/Not-Null Fields (Staging Must Always Populate)

REQUIRED Fields (mode="REQUIRED"):
  1. tenant_id (STRING)
  2. batch_id (STRING)
  3. row_index (INTEGER)
  4. event_type (STRING)
  5. period_label (STRING)
  6. business_name_raw (STRING)
  7. mapped_payload_json (STRING)
  8. row_status (STRING)
  9. created_at (TIMESTAMP)
  10. created_by (STRING)
Staging Table Requirements:
  • Must populate all REQUIRED fields
  • Optional fields can be NULL (source_system, member_id_raw, payee_agent_id_raw, credit_raw, debit_raw, total_raw, period_code_raw, pay_periods_raw)
  • Arrays (error_codes, warnings) can be empty [] but not NULL (REPEATED mode)
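The explicit schema for the staging load job follows directly from the modes above. A minimal sketch using the google-cloud-bigquery client; the types of the optional *_raw fields are assumptions here and should be taken from bq show --schema:

from google.cloud import bigquery

# Staging schema mirroring transaction_events_raw. Deliberately excludes
# updated_at/updated_by, which do not exist in the target table (see 1.2).
STAGING_SCHEMA = [
    bigquery.SchemaField("tenant_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("batch_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("row_index", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("event_type", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("period_label", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("business_name_raw", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("mapped_payload_json", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("row_status", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("created_by", "STRING", mode="REQUIRED"),
    # Optional fields (NULLABLE); STRING types assumed, confirm against schema
    bigquery.SchemaField("source_system", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("member_id_raw", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("payee_agent_id_raw", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("credit_raw", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("debit_raw", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("total_raw", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("period_code_raw", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("pay_periods_raw", "STRING", mode="NULLABLE"),
    # REPEATED arrays: empty [] allowed, NULL not
    bigquery.SchemaField("error_codes", "STRING", mode="REPEATED"),
    bigquery.SchemaField("warnings", "STRING", mode="REPEATED"),
]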

2. Load Job Feasibility

2.1 trigger_load_job Support Analysis

Current Implementation (api/services/onboarding_bigquery.py:18-74):
  • Format: bigquery.SourceFormat.CSV only
  • Schema: autodetect=True (no explicit schema)
  • Write Disposition: WRITE_TRUNCATE only
Required for ELT:
  • NDJSON Support: bigquery.SourceFormat.NEWLINE_DELIMITED_JSON (supported by BigQuery)
  • Explicit Schema: schema=[...] parameter (supported, must provide)
  • WRITE_APPEND: bigquery.WriteDisposition.WRITE_APPEND (supported)
Conclusion: trigger_load_job() needs modification to support NDJSON + explicit schema + WRITE_APPEND. Can create new function trigger_ndjson_load_job() or extend existing.
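A minimal sketch of the new-function option (name and signature are proposals from this scan, not existing code):

from google.cloud import bigquery

def trigger_ndjson_load_job(
    client: bigquery.Client,
    gcs_uri: str,
    table_id: str,
    schema: list[bigquery.SchemaField],
) -> bigquery.LoadJob:
    """Append NDJSON from GCS into the staging table with an explicit schema."""
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=schema,  # explicit schema instead of autodetect
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    # Job creation returns immediately; the caller decides whether to poll.
    return client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)

In the sync path, the caller blocks on job.result(timeout=...) (step 4 in 2.2), which raises if the job fails or the timeout elapses.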

2.2 Sync /map Timing Estimate (Dec File: 2,841 rows)

Current Bottleneck: Row-by-row MERGE (2,841 queries) → >300s timeout
ELT Path Timing Estimate:
  1. Parse + process_row(): ~5-10s (in-memory, Python)
  2. Write NDJSON to GCS: ~2-5s (821 KB file)
  3. Trigger Load Job: ~1s (async job creation, returns immediately)
  4. Poll Load Job: ~10-30s (BigQuery load job for 2,841 rows NDJSON)
  5. MERGE Staging → Target: ~5-10s (single MERGE statement)
  6. Verify Counts: ~2-3s (2 COUNT queries)
  7. Finalize Status: ~1s (UPDATE batch status)
Total Estimate: ~26-60s (well under the 300s limit)
Confidence: ✅ HIGH - A load job for 2,841 NDJSON rows typically completes in 10-30s. The sync path is feasible.

2.3 Async Fallback Threshold

Recommendation: >10,000 rows → Return 202 + poll status endpoint
Rationale:
  • 10k rows NDJSON ≈ ~3MB file
  • Load job timing: ~30-60s (still under 300s, but approaching limit)
  • Provides safety margin for network latency, BigQuery queue delays
  • Allows graceful degradation for very large files
Implementation (sketched after this list):
  • Check len(processed_rows) > ASYNC_THRESHOLD (default 10,000)
  • If async: Return 202 with load_job_id, status="PROCESSING"
  • Frontend polls /api/v1/intake/{batch_id}/status until status="MAPPED"
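A sketch of that branch inside the /map handler, assuming FastAPI; the helper name and response shapes are proposals:

from fastapi.responses import JSONResponse

ASYNC_THRESHOLD = 10_000  # rows; proposal from this scan

def finish_map_request(processed_rows, batch_id, load_job):
    if len(processed_rows) > ASYNC_THRESHOLD:
        # Async path: return 202 immediately; the frontend polls
        # /api/v1/intake/{batch_id}/status until status == "MAPPED".
        return JSONResponse(
            status_code=202,
            content={
                "batch_id": str(batch_id),
                "load_job_id": load_job.job_id,
                "status": "PROCESSING",
            },
        )
    # Sync path: block on the load job, then MERGE + verify counts (2.2).
    load_job.result(timeout=120)  # raises on job failure or timeout
    ...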

3. Concurrency/Idempotency

3.1 Call Paths to /map

Direct Call Paths:
  1. Frontend UI (dashboard/src/lib/intakeClient.ts):
    • User clicks “Map Columns” button
    • POST /api/v1/intake/{batch_id}/map
    • Single user action, no concurrent calls expected
  2. API Route (api/routes/intake.py:316):
    • @router.post("/{batch_id}/map")
    • RBAC: require_admin_or_ceo
    • No background jobs or async tasks call this endpoint
Finding: ✅ Single call path - No concurrent callers identified. Race conditions are possible only if the user clicks “Map” multiple times in rapid succession.

3.2 MAPPING_IN_PROGRESS Lock Strategy

Current State: ❌ NO LOCKING MECHANISM
Status Transitions:
  • UPLOADED → MAPPED (current)
  • MAPPED → MAPPED (re-materialize if mapping changed)
Race Condition Risk:
  • User clicks “Map” twice rapidly → Both requests fetch status=UPLOADED
  • Both proceed to materialize rows → Duplicate writes (idempotent via MERGE, but wasteful)
  • Both attempt UPDATE status=MAPPED → Last write wins (no conflict)
Recommended Lock Strategy:
# Step 1: Atomic status check + transition
UPDATE ingestion_batches
SET status = 'MAPPING_IN_PROGRESS'
WHERE batch_id = @batch_id
  AND tenant_id = @tenant_id
  AND status = 'UPLOADED'  -- Only transition from UPLOADED
If UPDATE affects 0 rows:
  • Status is not UPLOADED → Return 409 CONFLICT (“Mapping already in progress or completed”)
If UPDATE affects 1 row:
  • Lock acquired → Proceed with materialization
  • On success: UPDATE status='MAPPED'
  • On failure: UPDATE status='UPLOADED' (rollback) OR UPDATE status='ERROR'
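A sketch of executing this transition with the google-cloud-bigquery client (function name hypothetical). Note that BigQuery serializes conflicting DML, so of two simultaneous requests one either sees 0 affected rows or fails with a concurrency error; both cases should map to 409:

from google.cloud import bigquery

LOCK_SQL = """
UPDATE ingestion_batches
SET status = 'MAPPING_IN_PROGRESS'
WHERE batch_id = @batch_id
  AND tenant_id = @tenant_id
  AND status = 'UPLOADED'
"""

def try_acquire_mapping_lock(client: bigquery.Client, batch_id: str, tenant_id: str) -> bool:
    job = client.query(
        LOCK_SQL,
        job_config=bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("batch_id", "STRING", batch_id),
                bigquery.ScalarQueryParameter("tenant_id", "STRING", tenant_id),
            ]
        ),
    )
    job.result()  # wait for the DML statement to complete
    # 1 row updated => lock acquired; 0 rows => already in progress or done
    return (job.num_dml_affected_rows or 0) == 1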
Alternative (Simpler): Use idempotency check (mapping_hash) to short-circuit duplicate requests without locking.

3.3 Idempotent Shortcut (Stale Status Handling)

Current Idempotency (api/routes/intake.py:368-398):
  • Checks status == "MAPPED" AND verified_count > 0 AND mapping_hash matches
  • Returns 200 immediately (short-circuit)
Stale Status Scenario:
  • Request 1: Materializes rows, writes to BigQuery, but UPDATE status='MAPPED' fails
  • Request 2: Fetches batch → status=UPLOADED (stale)
  • Request 2: Checks verified_count → Finds rows exist → Compares mapping_hash → Matches → Returns 200 ✅
Finding: ✅ Idempotent shortcut works even if status is stale - It relies on verified_count and mapping_hash, not on status alone.
Enhancement Needed: Add a mapping_hash field to ingestion_batches (currently not stored, only computed on-the-fly).
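For that enhancement, update_batch_mapping() could store a canonical hash like the following (a sketch; the exact hash inputs are an assumption and should match whatever /map computes on-the-fly today):

import hashlib
import json

def compute_mapping_hash(mapping: dict) -> str:
    # Canonical JSON (sorted keys, no whitespace) so logically equal
    # mappings always hash identically.
    canonical = json.dumps(mapping, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()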

4. Audit Semantics

4.1 created_at / created_by Immutability

MERGE Statement (api/bigquery/intake_processor_queries.py:122-123):
-- Audit-grade: created_at and created_by are NOT updated on MATCHED (immutable)
-- Optional: updated_at/updated_by would be set here if columns exist
Confirmed: ✅ created_at and created_by are NOT in the UPDATE SET clause - Immutability preserved.
Staging → Target MERGE:
  • Must preserve immutability: created_at and created_by from staging (source) are used for INSERT
  • On MATCHED: Do NOT update created_at/created_by (only update data fields)
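A sketch of the staging → target MERGE shape that preserves this (the staging table name and join key are assumptions, and the data-field list is abbreviated):

# Staging -> target MERGE preserving audit immutability (sketch).
STAGING_MERGE_SQL = """
MERGE `payroll_analytics.transaction_events_raw` T
USING `payroll_analytics.transaction_events_staging` S
  ON T.tenant_id = S.tenant_id
 AND T.batch_id = S.batch_id
 AND T.row_index = S.row_index
WHEN MATCHED THEN UPDATE SET
  -- data fields only; created_at/created_by deliberately absent
  T.row_status = S.row_status,
  T.mapped_payload_json = S.mapped_payload_json
WHEN NOT MATCHED THEN
  -- INSERT ROW takes created_at/created_by from staging (source)
  INSERT ROW
"""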

4.2 Lineage Retention (NDJSON Deletion)

Required Lineage (if NDJSON is deleted):
  1. trace_id: Already logged and returned in response
  2. load_job_id: Must be logged and optionally stored in ingestion_batches metadata
  3. merge_job_id: Must be logged (from MERGE query job)
  4. ⚠️ gcs_uri: Optional - Can be stored in ingestion_batches metadata field (if added) or logged only
Recommendation:
  • Store in ingestion_batches: Add optional metadata JSON field (or reuse existing fields)
  • Log: All job IDs + trace_id + gcs_uri (for Cloud Logging searchability)
  • TTL: Delete NDJSON after 7 days (or after MERGE succeeds + 1 day grace period)
Metadata Field Proposal:
ALTER TABLE ingestion_batches ADD COLUMN IF NOT EXISTS metadata STRING;
-- JSON: {"load_job_id": "...", "merge_job_id": "...", "staging_gcs_uri": "...", "staged_count": 2841, "target_count": 2841}

5. Failure-Mode Matrix

5.1 Failure Points & Error Contracts

| Failure Point | Status Transition | Error Contract Fields | HTTP Status |
| --- | --- | --- | --- |
| GCS Write Failure | UPLOADED → UPLOADED (no change) | error="GCSWriteFailed", reason="upload_failed", trace_id, batch_id, gcs_uri | 500 |
| Load Job Creation Failure | UPLOADED → UPLOADED (no change) | error="LoadJobCreationFailed", reason="bigquery_unavailable", trace_id, batch_id, gcs_uri | 503 |
| Load Job Poll Timeout | UPLOADED → MAPPING_IN_PROGRESS (stuck) | error="LoadJobTimeout", reason="job_timeout", trace_id, batch_id, load_job_id | 504 |
| Load Job Failed | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="LoadJobFailed", reason="load_job_error", trace_id, batch_id, load_job_id, error_details | 500 |
| Staged Count Mismatch | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="StagedCountMismatch", reason="count_verification_failed", trace_id, batch_id, expected_count, staged_count | 500 |
| MERGE Failure | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="MergeFailed", reason="bigquery_query_failed", trace_id, batch_id, merge_job_id | 503 |
| Target Count Mismatch | MAPPING_IN_PROGRESS → UPLOADED (rollback) | error="TargetCountMismatch", reason="count_verification_failed", trace_id, batch_id, expected_count, target_count, staged_count | 500 |
| Finalize Status Failure | MAPPING_IN_PROGRESS → MAPPING_IN_PROGRESS (stuck) | error="StatusUpdateFailed", reason="bigquery_query_failed", trace_id, batch_id | 500 |
| Cleanup Failure | MAPPED → MAPPED (no change, best-effort) | Logged only (non-blocking) | N/A |
Structured Error Contract (all failures):
{
  "error": "<StableErrorCode>",
  "reason": "<safe_message>",
  "trace_id": "<trace_id>",
  "tenant_id": "<tenant_id>",
  "batch_id": "<batch_id>",
  "period_label": "<period_label>",
  "route": "/api/v1/intake/{batch_id}/map",
  "load_job_id": "<optional>",
  "merge_job_id": "<optional>",
  "expected_count": "<optional>",
  "staged_count": "<optional>",
  "target_count": "<optional>"
}
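A small helper keeps these payloads consistent across failure points (a sketch; the function name is hypothetical):

def build_map_error(error, reason, trace_id, tenant_id, batch_id,
                    period_label, **optional):
    payload = {
        "error": error,
        "reason": reason,
        "trace_id": trace_id,
        "tenant_id": tenant_id,
        "batch_id": batch_id,
        "period_label": period_label,
        "route": "/api/v1/intake/{batch_id}/map",
    }
    # Optional fields (load_job_id, merge_job_id, counts) only when present
    payload.update({k: v for k, v in optional.items() if v is not None})
    return payload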

6. Cleanup Safety

6.1 TTL vs Delete-by-Batch

Option A: TTL (Time-Based)
  • Pattern: GCS lifecycle policy (delete objects older than 7 days)
  • Pros: Automatic, no code complexity
  • Cons: Cannot delete immediately after success, may retain failed staging data
Option B: Delete-by-Batch (Explicit)
  • Pattern: After MERGE succeeds + count verification, delete gs://bucket/intake-staging/{tenant_id}/{batch_id}/*
  • Pros: Immediate cleanup, only deletes successful batches
  • Cons: Requires explicit cleanup code, must handle failures gracefully
Recommendation: Hybrid Approach
  • Immediate: Delete staging NDJSON after MERGE succeeds (best-effort, non-blocking)
  • Fallback: GCS lifecycle policy (delete objects older than 7 days) for orphaned staging files
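The TTL fallback can be configured once per bucket via the google-cloud-storage client (a sketch; the bucket name is a placeholder, and matches_prefix requires a reasonably recent client library):

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket("payroll-intake-bucket")  # placeholder name
# Delete anything under intake-staging/ older than 7 days (orphan cleanup)
bucket.add_lifecycle_delete_rule(age=7, matches_prefix=["intake-staging/"])
bucket.patch()  # persist the updated lifecycle configuration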

6.2 Cleanup Isolation (Cannot Delete Other Batches)

GCS Path Pattern:
gs://{bucket}/intake-staging/{tenant_id}/{batch_id}/transaction_events_raw.ndjson
Delete Operation:
staging_path = f"intake-staging/{tenant_id}/{batch_id}/transaction_events_raw.ndjson"
blob = bucket.blob(staging_path)
blob.delete()  # Only deletes this specific batch_id's file
Safety:
  • Tenant isolation: Path includes tenant_id (from JWT, not user input)
  • Batch isolation: Path includes batch_id (from route parameter, validated UUID)
  • No wildcards: Delete specific blob, not prefix-based delete
  • Best-effort: Wrap in try/except, log but don’t fail if delete fails
Red Flag: ⚠️ Ensure batch_id is validated as UUID (already done in route: batch_id: uuid.UUID)

7. Red Flags Identified

🔴 Critical (Must Fix Before Implementation)

  1. No MAPPING_IN_PROGRESS Lock: Race condition possible if user clicks “Map” twice rapidly
    • Fix: Add atomic status transition check
  2. mapping_hash Not Stored: Idempotency relies on computing hash on-the-fly
    • Fix: Store mapping_hash in ingestion_batches during update_batch_mapping()
  3. updated_at/updated_by Referenced But Don’t Exist: MERGE statement references non-existent columns
    • Fix: Remove from staging MERGE statement (safe, but cleanup needed)

🟡 Medium (Should Fix)

  1. No Async Threshold: All requests are sync, even for very large files
    • Fix: Add async fallback for >10k rows
  2. No Cleanup Strategy: Staging NDJSON files may accumulate
    • Fix: Implement hybrid cleanup (immediate + TTL)

🟢 Low (Nice to Have)

  1. No Metadata Field: Cannot store load_job_id, merge_job_id, staging_gcs_uri
    • Fix: Add metadata JSON field to ingestion_batches (optional)

8. Implementation Readiness

✅ Ready to Proceed

  • Schema confirmed (error_codes, warnings are REPEATED STRING)
  • Load job supports NDJSON + explicit schema + WRITE_APPEND
  • Timing estimate: 26-60s for 2,841 rows (well under 300s)
  • Idempotency works even with stale status
  • Audit semantics preserved (created_at/created_by immutable)

⚠️ Must Address Before Implementation

  1. Add MAPPING_IN_PROGRESS status + atomic transition
  2. Store mapping_hash in ingestion_batches
  3. Remove updated_at/updated_by from staging MERGE (they don’t exist)
  4. Implement structured error contracts for all failure points
  5. Add cleanup logic (immediate delete + TTL fallback)

End of Phase 0B Red-Flag & Alignment Scan