Repo B + Legacy Pipeline Execution Flow Audit Plan

Goal

Confirm the actual execution flow of both Repo B and the legacy commission engine for Champion Health, so that the AI Studio audit is precise.

Deliverables


PART A: REPO B EXECUTION FLOW

1. Entry Points & Commands

Primary Entry Point (Current Production):
  • File: repo_b/upload_to_bq.py
  • Function: process_period_from_raw() (line 1483)
  • CLI Command:
    python -m repo_b.upload_to_bq --period 2025-11 --from-raw [--input-file path/to/file.xlsx]
    
  • Orchestrator Class: None (direct function call)
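The flags in the command above could be parsed along the following lines. This is a minimal sketch, not the parser actually defined in repo_b/upload_to_bq.py: the function name build_parser, the help strings, and the choice to make --period required are assumptions. Only the flag names --period, --from-raw, and --input-file come from the documented command.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the flags in the documented command."""
    parser = argparse.ArgumentParser(prog="repo_b.upload_to_bq")
    # Assumption: the period label is mandatory.
    parser.add_argument("--period", required=True, help="Period label, e.g. 2025-11")
    parser.add_argument("--from-raw", action="store_true",
                        help="Start from the raw payroll file")
    parser.add_argument("--input-file", default=None,
                        help="Explicit raw file; bypasses discovery")
    return parser


args = build_parser().parse_args(["--period", "2025-11", "--from-raw"])
print(args.period, args.from_raw, args.input_file)  # prints: 2025-11 True None
```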
Alternative Entry Points:
  • File: repo_b/run.py
  • Function: main() (line 220) - CLI entry point
  • Function: run_repo_b_engine() (line 69) - Core engine function
  • CLI Command:
    python -m repo_b.run --period 2025-11 --canonical data/canonical/2025-11.json --config configs/repo_b_config.json
    
  • File: repo_b/month_runner.py
  • Function: run_repo_b_months() (line 62) - Multi-month batch processing
  • Usage: Called programmatically; no CLI entry point
Legacy Orchestrator (may still be used):
  • File: orchestrator.py
  • Class: PayrollPipeline (line 32)
  • Method: run() (line 406)
  • Usage: pipeline = PayrollPipeline(); pipeline.run(raw_file_path)

2. Input Folder Paths

Default Input Location:
  • Path: data/input/ (relative to repo root)
  • Discovery Logic: repo_b/stage0/raw_intake.py::discover_raw_file() (line ~200)
  • Pattern: Searches for files matching period label in filename or content
  • Supported Formats: .xlsx, .xls, .csv
Explicit File Override:
  • Can pass --input-file parameter to bypass discovery
  • Used in: repo_b/upload_to_bq.py::process_period_from_raw() (line 1559)
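The interplay between default discovery and the explicit override can be sketched as follows. This assumes matching is a plain substring test on the filename; the real discover_raw_file() is described as also checking file content, which is not modeled here. The function name resolve_input_file and its parameters are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Optional

SUPPORTED_SUFFIXES = {".xlsx", ".xls", ".csv"}  # formats listed in this plan


def resolve_input_file(input_dir: Path, period_label: str,
                       input_file: Optional[Path] = None) -> Path:
    """Return the explicit file if given, else the first filename match."""
    if input_file is not None:
        # An explicit --input-file bypasses discovery entirely.
        return Path(input_file)
    candidates = sorted(
        p for p in Path(input_dir).iterdir()
        if p.is_file()
        and p.suffix.lower() in SUPPORTED_SUFFIXES
        and period_label in p.name  # assumption: filename-only matching
    )
    if not candidates:
        raise FileNotFoundError(f"No raw file for {period_label} in {input_dir}")
    return candidates[0]


# Small demonstration in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("payroll_2025-11.xlsx", "payroll_2025-10.xlsx", "notes_2025-11.txt"):
        (root / name).touch()
    found_name = resolve_input_file(root, "2025-11").name

print(found_name)  # prints: payroll_2025-11.xlsx
```

Sorting the candidates keeps the choice deterministic when several files match the same period.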
Canonical JSON Input (for Repo B engine directly):
  • Path: data/canonical/{period_label}.json
  • Generated by: repo_b/stage0/raw_intake.py::load_raw_payroll() (line ~300)
  • Used by: repo_b/run.py::main() when --canonical provided

3. Output Artifacts (Paths + Filenames)

Stage 1 Snapshot:
  • Path: data/outputs/repo_b/dashboard_data/{period_label}/dashboard_csvs/Stage1_Raw_{period_label}.csv
  • Generated by: repo_b/reporting/analytics_writer.py::export_stage1_snapshot() (line 574)
  • Schema: Employee-level records with columns: tenant_id, period_label, business_label, member_id, payee_agent_id, payee_agent_name, credit_amount, debit_amount, period_code, pay_periods, posted_date, etc.
Stage 3 Snapshot:
  • Path: data/outputs/repo_b/dashboard_data/{period_label}/dashboard_csvs/Stage3_Full_Detail_{period_label}.csv
  • Generated by: repo_b/reporting/analytics_writer.py::export_analytics_suite() (line 706)
  • Schema: TPA-normalized commission records with columns: business_name, downline_agent_name, agent_total, tpa_applied, absorbed_count, pepm_rate_clean, period_code, pay_periods, etc.
Agent V5 Excel Output:
  • Path: data/exports/repo_b/Repo_B_Payroll_Workbook-MM-DD-YYYY.xlsx (from month_runner.py)
  • Alternative Path: data/outputs/{month}.v5.10.3.{year}_{timestamp}.xlsx (from legacy orchestrator.py)
  • Generated by:
    • repo_b/excel_writer.py::write_repo_b_excel() (called from month_runner.py line 138)
    • Legacy: stage4_outputs/export_excel.py::export_v5_to_excel() (line 77)
  • Sheets: Agent Commissions, Audit View
CEO Excel/JSON Output:
  • Excel Path: data/outputs/repo_b/dashboard_data/{period_label}/Repo_B_Analytics_Suite_{period_label}.xlsx
  • Generated by: repo_b/reporting/analytics_writer.py::export_analytics_suite() (line 836)
  • JSON Path: data/archive/{period_label}/ceo_snapshot.json (legacy orchestrator)
  • Generated by: utils/historical_reporting.py::save_period_snapshot() (line 16)
  • Sheets: Executive_Summary, Business_Rollup, Agent_Rollup, Stage3_Full_Detail, etc.
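For an audit it can help to derive the expected Repo B artifact paths from a period label and check them on disk. The sketch below builds only the three dashboard paths listed above; the helper name repo_b_output_paths and the dictionary keys are assumptions.

```python
from pathlib import Path

# Root taken from the paths documented in this section.
DASHBOARD_ROOT = Path("data/outputs/repo_b/dashboard_data")


def repo_b_output_paths(period_label: str) -> dict:
    """Build the expected Stage 1, Stage 3, and analytics suite paths."""
    period_dir = DASHBOARD_ROOT / period_label
    csv_dir = period_dir / "dashboard_csvs"
    return {
        "stage1": csv_dir / f"Stage1_Raw_{period_label}.csv",
        "stage3": csv_dir / f"Stage3_Full_Detail_{period_label}.csv",
        "analytics_suite": period_dir / f"Repo_B_Analytics_Suite_{period_label}.xlsx",
    }


paths = repo_b_output_paths("2025-11")
# List artifacts that are absent relative to the current working directory.
missing = [key for key, path in paths.items() if not path.exists()]
print(paths["stage1"].as_posix())
```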

4. BigQuery Upload Scripts

Primary Upload Script:
  • File: repo_b/upload_to_bq.py
  • Main Function: upload_repo_b_period() (line 1666)
  • Called by: process_period_from_raw() (line 1649)
  • Tables Loaded:
    • payroll_raw.stage1_snapshots (via load_stage1_to_bigquery(), line 469)
    • payroll_processed.stage3_snapshots (via load_stage3_to_bigquery(), line 578)
  • Datasets:
    • Production: payroll_raw, payroll_processed, payroll_analytics
    • Shadow: payroll_raw_shadow, payroll_processed_shadow, payroll_analytics_shadow (when BQ_SHADOW_MODE=1)
Legacy ETL Loader:
  • File: integration/bigquery/scripts/etl_loader.py
  • Class: BigQueryETLLoader (line 22)
  • Methods:
    • load_stage1_snapshot() (line 55)
    • load_stage3_snapshot() (line 101)
    • load_ceo_metrics() (line 146)
  • Status: May still be used for historical backfills
Upload Functions (in repo_b/upload_to_bq.py):
  • load_stage1_to_bigquery() (line 469) - Loads Stage1_Raw CSV
  • load_stage3_to_bigquery() (line 578) - Loads Stage3_Full_Detail CSV
  • Both use resolve_dataset() (line 57) to handle shadow mode
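The shadow-mode behavior described above (production dataset names gain a _shadow suffix when BQ_SHADOW_MODE=1) might look roughly like this. The real resolve_dataset() signature is not shown in this plan, so the function below is a hypothetical stand-in with an injectable environment for testing.

```python
import os
from typing import Mapping, Optional


def resolve_dataset_sketch(base_name: str,
                           env: Optional[Mapping[str, str]] = None) -> str:
    """Append '_shadow' when BQ_SHADOW_MODE=1, else return the base name."""
    env = os.environ if env is None else env
    if env.get("BQ_SHADOW_MODE") == "1":
        return f"{base_name}_shadow"
    return base_name


print(resolve_dataset_sketch("payroll_raw", {"BQ_SHADOW_MODE": "1"}))  # prints: payroll_raw_shadow
print(resolve_dataset_sketch("payroll_processed", {}))                 # prints: payroll_processed
```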

5. Top 10 Math/Transform Logic Files

Cadence Normalization:
  1. File: stage2_normalize/normalize_periods.py
    • Function: normalize_periods() (line 6)
    • Logic: Maps raw period strings (“Weekly”, “Bi-Weekly”) to codes (W, BW, SM, 4M, M)
    • Config: configs/cadence_map.yaml
  2. File: repo_b/stage0/raw_intake.py
    • Function: load_period_mappings() (line 285)
    • Function: normalize_to_canonical() (line 298)
    • Logic: Period code inference from member duplication patterns
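The string-to-code mapping can be illustrated with a small lookup. Only the "Weekly" and "Bi-Weekly" labels are given in this plan; the "Semi-Monthly" and "Monthly" labels are assumed, and the 4M code is left out because its raw label is not stated. The actual mapping lives in configs/cadence_map.yaml and may differ.

```python
RAW_TO_CODE = {
    "weekly": "W",         # from this plan
    "bi-weekly": "BW",     # from this plan
    "semi-monthly": "SM",  # assumed raw label
    "monthly": "M",        # assumed raw label
}


def normalize_period(raw: str) -> str:
    """Map a raw period string to its cadence code, ignoring case and padding."""
    key = raw.strip().lower()
    if key not in RAW_TO_CODE:
        # Fail loudly so unmapped cadences surface during the audit.
        raise ValueError(f"Unmapped period string: {raw!r}")
    return RAW_TO_CODE[key]


print(normalize_period(" Weekly "))   # prints: W
print(normalize_period("Bi-Weekly"))  # prints: BW
```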
Chargeback/Debt Absorption:
  1. File: repo_b/pairing.py
    • Function: pair_chargebacks_deterministic() (line 18)
    • Logic: 1:1 row matching (one debit cancels one credit row, NO amount check)
    • Grouping: ["business_label_normalized", "period_code"] (2-key grouping)
    • Line: 18-180
  2. File: stage3_absorb/absorb_chargebacks.py
    • Function: absorb_chargebacks() (line 6)
    • Logic: Legacy chargeback absorption (1:1 matching, excludes Richard Ballard 668078)
    • Line: 6-76
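The 1:1 row matching with two-key grouping can be sketched in plain Python. Within each (business_label_normalized, period_code) group, one debit row cancels one credit row and amounts are never compared. The row layout, the "kind" field, and pairing in input order are assumptions; the exclusion of Richard Ballard in the legacy function is not modeled.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def pair_rows_one_to_one(rows: List[dict]) -> Tuple[List[Tuple[int, int]], List[int]]:
    """Return (debit_index, credit_index) pairs and the unpaired debit indices."""
    groups: Dict[tuple, Dict[str, List[int]]] = defaultdict(
        lambda: {"credit": [], "debit": []}
    )
    for idx, row in enumerate(rows):
        key = (row["business_label_normalized"], row["period_code"])
        groups[key][row["kind"]].append(idx)

    pairs: List[Tuple[int, int]] = []
    unpaired_debits: List[int] = []
    for key in sorted(groups):  # sorted keys keep the result deterministic
        credits, debits = groups[key]["credit"], groups[key]["debit"]
        n = min(len(credits), len(debits))
        pairs.extend(zip(debits[:n], credits[:n]))  # no amount check
        unpaired_debits.extend(debits[n:])
    return pairs, unpaired_debits


rows = [
    {"business_label_normalized": "acme", "period_code": "W", "kind": "credit", "amount": 10},
    {"business_label_normalized": "acme", "period_code": "W", "kind": "credit", "amount": 12},
    {"business_label_normalized": "acme", "period_code": "W", "kind": "debit", "amount": -99},
    {"business_label_normalized": "beta", "period_code": "M", "kind": "debit", "amount": -5},
]
pairs, unpaired = pair_rows_one_to_one(rows)
print(pairs, unpaired)  # prints: [(2, 0)] [3]
```

Note that the -99 debit cancels the 10 credit even though the amounts differ, which is exactly the behavior the audit should confirm or challenge.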
PEPM Calculation:
  1. File: stage3_absorb/apply_tpa_normalization.py
    • Function: apply_tpa_normalization() (line 58)
    • Formula: (PEPM × 12) / Pay_Periods (line ~100)
    • Logic: Applies PEPM formula to TPA commissions, preserves employee counts
    • Line: 58-464
  2. File: repo_b/output_adapter.py
    • Function: build_repo_b_workbook() (line ~200)
    • Function: _calculate_pepm() (line ~400)
    • Logic: PEPM rate calculation and normalization for Excel output
    • Line: ~200-800
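The formula (PEPM × 12) / Pay_Periods converts a per-employee-per-month rate into a per-pay-period amount. A minimal worked sketch with Decimal arithmetic follows; the function name and the half-up rounding to two decimals are assumptions, since this plan does not state the rounding mode.

```python
from decimal import Decimal, ROUND_HALF_UP


def per_period_amount(pepm: str, pay_periods: int) -> Decimal:
    """Apply (PEPM x 12) / Pay_Periods and round to cents."""
    if pay_periods <= 0:
        raise ValueError("pay_periods must be positive")
    amount = (Decimal(pepm) * 12) / Decimal(pay_periods)
    # Assumption: half-up rounding to two decimal places.
    return amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(per_period_amount("3.00", 26))  # 36 / 26 -> prints: 1.38
print(per_period_amount("2.50", 12))  # 30 / 12 -> prints: 2.50
```

With 12 pay periods the per-period amount equals the PEPM rate, which is a quick sanity check for monthly cadences.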
Residual Rollup to Uplines (Ricky/Robin):
  1. File: repo_b/allocation.py
    • Function: allocate_unpaired_debits_enhanced() (line ~100)
    • Function: allocate_with_remainder_policy() (line 26)
    • Logic: Allocates unpaired debits proportionally across agents with deterministic remainder policy
    • Owner IDs: 721995 (Robin), 668078 (Richard/Ricky)
    • Line: 26-260
  2. File: repo_b/output_adapter.py
    • Function: build_repo_b_workbook() (line ~200)
    • Function: _build_owner_residuals() (line ~600)
    • Logic: Calculates owner commission residuals after agent payouts
    • Owner Detection: Checks for agent_id in
    • Line: ~200-800
  3. File: repo_b/reporting/analytics_writer.py
    • Function: build_business_summary() (line ~120)
    • Function: _build_agent_rollup() (line 544)
    • Logic: Aggregates commissions by agent, calculates owner residuals
    • Owner IDs: Hardcoded (line 254, 314)
    • Line: 120-600
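One common way to get a proportional split with a deterministic remainder is the largest-remainder method in integer cents, with ties broken by agent ID. The sketch below shows that technique; it is not confirmed to be the policy implemented in allocate_with_remainder_policy(), and all names, the cent-based inputs, and the tie-break rule are assumptions.

```python
from typing import Dict


def allocate_proportionally(total_cents: int, weights: Dict[str, int]) -> Dict[str, int]:
    """Split a non-negative total across agents in proportion to integer weights."""
    weight_sum = sum(weights.values())
    if total_cents < 0 or weight_sum <= 0:
        raise ValueError("total must be non-negative and weights must sum to > 0")

    shares: Dict[str, int] = {}
    remainders = []
    for agent_id in sorted(weights):
        base, rem = divmod(total_cents * weights[agent_id], weight_sum)
        shares[agent_id] = base
        remainders.append((-rem, agent_id))  # largest remainder first, then ID

    leftover = total_cents - sum(shares.values())
    for _, agent_id in sorted(remainders)[:leftover]:
        shares[agent_id] += 1  # hand out the leftover cents one at a time
    return shares


print(allocate_proportionally(1000, {"A": 1, "B": 1, "C": 1}))
# prints: {'A': 334, 'B': 333, 'C': 333}
```

Working in integer cents guarantees the shares always sum back to the original total, so no cent is lost or created by rounding.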
Supporting Math Logic:
  1. File: repo_b/canonicalize.py
    • Function: propagate_period_label() (line ~50)
    • Function: split_credits_debits() (line ~100)
    • Logic: Splits canonical records into credits/debits, propagates period labels
    • Line: ~50-200

Evidence Summary

Entry Points:
  • repo_b/upload_to_bq.py::process_period_from_raw() - Primary production entry
  • repo_b/run.py::run_repo_b_engine() - Core engine function
  • orchestrator.py::PayrollPipeline.run() - Legacy orchestrator
Input Paths:
  • Default: data/input/ (discovered by filename/period)
  • Explicit: --input-file parameter
  • Canonical: data/canonical/{period}.json
Output Paths:
  • Stage1: data/outputs/repo_b/dashboard_data/{period}/dashboard_csvs/Stage1_Raw_{period}.csv
  • Stage3: data/outputs/repo_b/dashboard_data/{period}/dashboard_csvs/Stage3_Full_Detail_{period}.csv
  • V5 Excel: data/exports/repo_b/Repo_B_Payroll_Workbook-MM-DD-YYYY.xlsx
  • CEO Excel: data/outputs/repo_b/dashboard_data/{period}/Repo_B_Analytics_Suite_{period}.xlsx
BigQuery Upload:
  • Script: repo_b/upload_to_bq.py::upload_repo_b_period()
  • Tables: payroll_raw.stage1_snapshots, payroll_processed.stage3_snapshots
  • Datasets: Production or shadow (via resolve_dataset())
Math Logic Files (Top 10):
  1. stage2_normalize/normalize_periods.py::normalize_periods() - Cadence mapping
  2. repo_b/stage0/raw_intake.py::normalize_to_canonical() - Period inference
  3. repo_b/pairing.py::pair_chargebacks_deterministic() - Chargeback pairing (line 18)
  4. stage3_absorb/absorb_chargebacks.py::absorb_chargebacks() - Legacy absorption (line 6)
  5. stage3_absorb/apply_tpa_normalization.py::apply_tpa_normalization() - PEPM formula (line 58)
  6. repo_b/output_adapter.py::build_repo_b_workbook() - PEPM calculation (~line 200)
  7. repo_b/allocation.py::allocate_unpaired_debits_enhanced() - Debit allocation (~line 100)
  8. repo_b/output_adapter.py::_build_owner_residuals() - Owner residuals (~line 600)
  9. repo_b/reporting/analytics_writer.py::build_business_summary() - Agent rollup (~line 120)
  10. repo_b/canonicalize.py::split_credits_debits() - Credit/debit split (~line 100)

PART B: LEGACY COMMISSION ENGINE EXECUTION FLOW

1. Legacy Entry Points & Commands

Primary Entry Point:
  • File: orchestrator.py
  • Class: PayrollPipeline (line 32)
  • Method: run() (line 406)
  • CLI Command:
    python orchestrator.py "data/input/January 2025 Combined Payroll.xlsx"
    python orchestrator.py "data/input/January 2025 Combined Payroll.xlsx" --interactive
    
  • Main Function: main() (line 1103) - CLI entry point
Batch Processing Entry Points:
  • File: batch_processor.py
  • Function: batch_process() (line 135)
  • Function: process_payroll_file() (line 70)
  • CLI Command: python batch_processor.py
  • Wrapper Scripts:
    • run_payroll.ps1 (PowerShell wrapper, line 29 calls batch_processor.py)
    • run_payroll.bat (Batch wrapper, line 10 calls batch_processor.py)
Makefile Commands (if any):
  • File: Makefile - No payroll-specific commands (only API/dashboard dev commands)

2. Legacy Input Folder Paths

Default Input Location:
  • Path: data/input/ (relative to repo root)
  • Pattern: Searches for .xlsx, .xls, .csv files
  • Used by:
    • orchestrator.py::run() (line 433) - Calls the load_raw_payroll() method
    • batch_processor.py::batch_process() (line 139) - Scans data/input/ directory
File Discovery:
  • Method: PayrollPipeline.load_raw_payroll() (line ~200 in orchestrator.py)
  • Logic: The file path is passed directly as an argument; orchestrator.py itself performs no discovery
  • Supported Formats: .xlsx, .xls, .csv

3. Legacy Stage Outputs (Paths + Filenames)

Stage 1 Raw Output:
  • Path: data/archive/{period_label}/stage1_snapshot.csv
  • Generated by: utils/historical_reporting.py::save_period_snapshot() (line 16)
  • Called from: orchestrator.py::_save_period_snapshot() (line 1079)
  • Schema: Columns: business_label, member_id, credit, debit, total, period_label, trunc2_applied
  • Source Data: stage1_raw DataFrame (Robin + Richard records, line 459)
Stage 3 Normalized Output:
  • Full Snapshot Path: data/archive/{period_label}/stage3_snapshot_full.csv
  • Filtered Snapshot Path: data/archive/{period_label}/stage3_snapshot.csv (TPA agents only, backwards compatibility)
  • Generated by: utils/historical_reporting.py::save_period_snapshot() (line 63, 78)
  • Schema (Full): All Stage 3 columns including TPA, MIXED_OWNER, pure owner commissions
  • Schema (Filtered): business_label, agent_name, tpa_applied, agent_total, employee_count, period_label, trunc2_applied
Agent V5 Excel Output:
  • Path: data/outputs/{month_name}.v5.10.3.{year}_{timestamp}.xlsx
  • Generated by: stage4_outputs/export_excel.py::export_v5_to_excel() (line 77)
  • Called from: orchestrator.py::run() (line 686)
  • Sheets:
    • Sheet 1: “Agent Commissions” (clean format for payroll)
    • Sheet 2: “Audit View” (detailed breakdown with subtotals)
  • Filename Example: january.v5.10.3.2025_20250115_143022.xlsx
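The filename pattern can be reproduced from the example above. In this sketch the timestamp format %Y%m%d_%H%M%S and the lowercased month name are inferred from the single example filename; the function name and parameters are hypothetical.

```python
from datetime import datetime


def v5_filename(run_time: datetime, month_name: str, year: int) -> str:
    """Build {month_name}.v5.10.3.{year}_{timestamp}.xlsx for a run."""
    timestamp = run_time.strftime("%Y%m%d_%H%M%S")  # inferred from the example
    return f"{month_name.lower()}.v5.10.3.{year}_{timestamp}.xlsx"


name = v5_filename(datetime(2025, 1, 15, 14, 30, 22), "January", 2025)
print(name)  # prints: january.v5.10.3.2025_20250115_143022.xlsx
```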
Executive/CEO Excel Output:
  • Path: data/outputs/{month_name}.executive.v10.3.{year}_{timestamp}.xlsx
  • Enhanced Path: data/outputs/executive_report_with_growth_loss_{timestamp}_enhanced_ceo.xlsx
  • Generated by: stage4_outputs/export_excel.py::export_executive_excel_enhanced_ceo() (line 577)
  • Called from: orchestrator.py::run() (line 721)
  • Sheets: CEO Snapshot, Business Summary, V5 Agent Commissions, Agent2 PEPM, Audit Checks, Growth/Loss Analysis, Agent2 Audit
CEO JSON Snapshot:
  • Path: data/archive/{period_label}/ceo_snapshot.json
  • Generated by: utils/historical_reporting.py::save_period_snapshot() (line 81)
  • Schema: JSON with period_label, gross_payout, chargebacks, agent_payout_net, business_owner_commission, employee_count, business_count, agent_count, trunc2_applied, created_at

4. Legacy BigQuery Upload/ETL

Primary ETL Loader:
  • File: integration/bigquery/scripts/etl_loader.py
  • Class: BigQueryETLLoader (line 22)
  • Methods:
    • load_stage1_snapshot() (line 55) - Loads Stage 1 CSV to BigQuery
    • load_stage3_snapshot() (line 101) - Loads Stage 3 CSV to BigQuery
    • load_ceo_metrics() (line 146) - Loads CEO metrics to BigQuery
  • Called from: orchestrator.py::run() (line 768) - Only if bigquery.enabled=True in tenant config
  • Tables Loaded:
    • payroll-bi-gauntlet.payroll_raw.stage1_snapshots (via load_stage1_snapshot())
    • payroll-bi-gauntlet.payroll_processed.stage3_snapshots (via load_stage3_snapshot())
    • payroll-bi-gauntlet.payroll_analytics.ceo_metrics (via load_ceo_metrics())
Backfill Script:
  • File: backfill_monthly_data.py
  • Functions:
    • load_stage1_data() (line 57) - Loads Stage 1 from archive to BigQuery
    • load_stage3_data() (line 147) - Loads Stage 3 from archive to BigQuery
  • CLI Command:
    python backfill_monthly_data.py --year 2025 --month 9
    
  • Source Paths: data/archive/{year}-{month:02d}/stage1_snapshot.csv, data/archive/{year}-{month:02d}/stage3_snapshot.csv
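The source paths use a zero-padded month, so --month 9 resolves to the 2025-09 archive folder. A small sketch of that path construction, with a hypothetical helper name:

```python
from pathlib import Path


def backfill_sources(year: int, month: int) -> dict:
    """Return the archive CSV paths read for a given year and month."""
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    archive_dir = Path("data/archive") / f"{year}-{month:02d}"
    return {
        "stage1": archive_dir / "stage1_snapshot.csv",
        "stage3": archive_dir / "stage3_snapshot.csv",
    }


sources = backfill_sources(2025, 9)
print(sources["stage1"].as_posix())  # prints: data/archive/2025-09/stage1_snapshot.csv
```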
Other Upload Scripts:
  • File: scripts/load_stage3_complete_to_bigquery.py - Loads full Stage 3 snapshots
  • File: scripts/regenerate_stage3_complete.py - Regenerates and uploads Stage 3 data

5. Top 15 Legacy Math/Transform Logic Files

Cadence Normalization:
  1. File: stage2_normalize/normalize_periods.py
    • Function: normalize_periods() (line 6)
    • Logic: Maps raw period strings to codes (W, BW, SM, 4M, M) using configs/cadence_map.yaml
    • Line: 6-56
  2. File: orchestrator.py
    • Function: run() - Mixed cadence period assignment (line 520-560)
    • Logic: Assigns period_code based on employee frequency pattern (Member ID duplication count)
    • Line: 520-560
TPA Detection & PEPM Calculation:
  1. File: stage2_normalize/detect_tpa.py
    • Function: detect_tpa_commission() (line 33)
    • Function: load_agent2_pepm() (line 8)
    • Logic: TPA detection via business name matching (exact + fuzzy ≥95%), Mixed Commission Allocation Rule
    • Owner Residual Logic: Calculates owner_residual_per_employee = credit_per_employee - total_tpa_pepm (line 194)
    • Line: 8-342
  2. File: stage3_absorb/apply_tpa_normalization.py
    • Function: apply_tpa_normalization() (line 58)
    • Function: round2() (line 19) - Decimal-safe rounding to 2 decimals
    • Formula: (PEPM × 12) / Pay_Periods (line 227, 258)
    • Logic: Applies TPA formula, handles mixed cadence, preserves employee counts
    • Line: 19-464
  3. File: stage3_absorb/apply_tpa_normalization.py
    • Function: collapse_to_monthly() (line 300)
    • Logic: Collapses records to monthly aggregates, preserves TPA and owner rows
    • Line: 300-464
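The exact-then-fuzzy business name matching can be illustrated with the standard library. This sketch swaps in difflib.SequenceMatcher as the similarity measure and treats "≥95%" as a ratio of at least 0.95; the library and scoring scale actually used in detect_tpa.py are not stated in this plan, so results may differ from the real engine.

```python
from difflib import SequenceMatcher
from typing import Iterable, Optional, Tuple


def match_business(name: str, known_names: Iterable[str],
                   threshold: float = 0.95) -> Tuple[Optional[str], str]:
    """Return (matched_name, match_kind), where kind is exact, fuzzy, or none."""
    cleaned = name.strip().lower()
    lookup = {known.strip().lower(): known for known in known_names}
    if cleaned in lookup:
        return lookup[cleaned], "exact"

    best_name, best_score = None, 0.0
    for key, original in lookup.items():
        score = SequenceMatcher(None, cleaned, key).ratio()
        if score > best_score:
            best_name, best_score = original, score
    if best_score >= threshold:
        return best_name, "fuzzy"
    return None, "none"


known = ["Acme Health Services LLC"]
print(match_business("acme health services llc", known))
print(match_business("Acme Health Servics LLC", known))  # one letter dropped
print(match_business("Beta Corp", known))
```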
Chargeback/Debt Absorption:
  1. File: stage3_absorb/absorb_chargebacks.py
    • Function: absorb_chargebacks() (line 6)
    • Logic: 1:1 row matching (one debit cancels one credit row, NO amount check)
    • Excludes: Richard Ballard (668078) from absorption
    • Returns: business_absorbed_counts dict mapping (business, cadence) -> count
    • Line: 6-76
Multi-Agent Attribution & Residual Rollup:
  1. File: stage2_normalize/detect_tpa.py
    • Function: detect_tpa_commission() - Mixed Commission Allocation (line 184-290)
    • Logic:
      • Calculates credit_per_employee = net_commission / absorbed_count (line 188)
      • If total_tpa_pepm < credit_per_employee: Creates MIXED commission type
      • Owner residual: owner_residual_per_employee = credit_per_employee - total_tpa_pepm (line 194)
      • Creates owner residual slice for Robin Bundy (line 250-290)
    • Line: 184-290
  2. File: stage4_outputs/build_v5_agent_scope.py
    • Function: build_v5_agent_commissions() (line 6)
    • Function: build_audit_view() (line 138)
    • Logic: Builds V5 agent-by-agent commission sheet, filters to TPA commissions only (excludes MIXED_OWNER)
    • Joins Stage 1 employee counts: Uses stage1_anchors for accurate employee counts (line 31-73)
    • Line: 6-257
  3. File: stage4_outputs/export_excel.py
    • Function: export_executive_excel_enhanced_ceo() (line 577)
    • Function: build_ceo_snapshot() (line ~470, called from export function)
    • Logic: Builds CEO snapshot with Stage 1 metrics, commission allocation split (TPA vs Owner)
    • Owner IDs: Hardcoded 721995 (Robin), 668078 (Richard) - line 692 in orchestrator.py
    • Line: 448-655
  4. File: orchestrator.py
    • Function: run() - Stage 1 anchor separation (line 447-459)
    • Logic: Separates Robin (721995) and Richard (668078) records, uses both for Stage 1 metrics
    • Richard Exclusion: Richard excluded from commission allocation after Stage 1 (line 455)
    • Line: 447-459
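The Mixed Commission Allocation arithmetic described under detect_tpa_commission() can be worked through as below. The two formulas and the MIXED condition come from this plan; the function name, the dictionary layout, and the "TPA_ONLY" label for the non-mixed branch are placeholders, since the plan does not say what that branch produces.

```python
from decimal import Decimal


def split_mixed_commission(net_commission: Decimal, absorbed_count: int,
                           total_tpa_pepm: Decimal) -> dict:
    """Compute credit per employee and the owner residual per employee."""
    if absorbed_count <= 0:
        raise ValueError("absorbed_count must be positive")
    credit_per_employee = net_commission / absorbed_count
    if total_tpa_pepm < credit_per_employee:
        return {
            "commission_type": "MIXED",
            "credit_per_employee": credit_per_employee,
            "owner_residual_per_employee": credit_per_employee - total_tpa_pepm,
        }
    return {
        "commission_type": "TPA_ONLY",  # placeholder label, not from the source
        "credit_per_employee": credit_per_employee,
        "owner_residual_per_employee": Decimal("0"),
    }


result = split_mixed_commission(Decimal("100.00"), 10, Decimal("6.00"))
print(result["commission_type"], result["owner_residual_per_employee"])
# prints: MIXED 4.00
```

Here 100.00 across 10 employees gives 10.00 per employee, and after a 6.00 TPA PEPM the owner residual is 4.00 per employee.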
Identity Resolution:
  1. File: stage2_normalize/resolve_identities.py
    • Function: resolve_agent_identity() (line 7)
    • Function: load_agent_hierarchy() (line 55)
    • Function: enrich_with_hierarchy() (line 63)
    • Logic: Maps Payee Agent ID to canonical agent_id/agent_name, enriches with hierarchy
    • Owner Mapping: 721995 → Robin Bundy, 668078 → Richard Ballard (from config)
    • Line: 7-82
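As a rough illustration of the ID-to-name mapping, the sketch below resolves a Payee Agent ID against a small directory. Only the two owner mappings are taken from this plan; the function name, the returned dictionary, and the "UNKNOWN" fallback are assumptions, and hierarchy enrichment is not modeled.

```python
from typing import Dict, Optional

# Owner mappings as stated in this plan (loaded from config in the real code).
OWNER_NAMES: Dict[str, str] = {"721995": "Robin Bundy", "668078": "Richard Ballard"}


def resolve_agent_identity_sketch(payee_agent_id,
                                  directory: Optional[Dict[str, str]] = None) -> dict:
    """Normalize the ID to a string and look up the canonical agent name."""
    directory = OWNER_NAMES if directory is None else directory
    agent_id = str(payee_agent_id).strip()
    return {
        "agent_id": agent_id,
        "agent_name": directory.get(agent_id, "UNKNOWN"),  # assumed fallback
        "is_owner": agent_id in OWNER_NAMES,
    }


print(resolve_agent_identity_sketch(721995)["agent_name"])  # prints: Robin Bundy
```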
Supporting Math Logic:
  1. File: orchestrator.py
    • Function: stage3_normalize_and_collapse() (line ~600)
    • Logic: Orchestrates Stage 3 TPA normalization and cadence collapse
    • Line: ~600-650
  2. File: stage4_outputs/build_v5_agent_scope.py
    • Function: validate_v5_output() (line 201)
    • Logic: Validates V5 output structure, checks totals match between Agent Commissions and Audit View
    • Line: 201-257
  3. File: utils/historical_reporting.py
    • Function: save_period_snapshot() (line 16)
    • Logic: Saves Stage 1, Stage 3, and CEO snapshots to data/archive/{period}/
    • Decimal Formatting: Uses format_decimal_columns_for_export() to prevent float conversion (line 56)
    • Line: 16-100
  4. File: orchestrator.py
    • Function: validate_business_loss() (line ~700)
    • Function: validate_negative_totals_anomalies() (line ~705)
    • Function: validate_commission_ratios() (line ~708)
    • Logic: Validation guardrails for business loss detection, negative totals, commission ratios
    • Line: ~700-710
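The cross-sheet total check attributed to validate_v5_output() above can be sketched as a Decimal comparison. The function name, the string inputs, and the zero default tolerance are assumptions; the real validator may compare per-agent subtotals as well.

```python
from decimal import Decimal
from typing import Iterable


def totals_match(agent_commissions: Iterable[str], audit_view: Iterable[str],
                 tolerance: Decimal = Decimal("0.00")) -> bool:
    """Check that both sheets sum to the same total within a tolerance."""
    total_agent = sum((Decimal(v) for v in agent_commissions), Decimal("0"))
    total_audit = sum((Decimal(v) for v in audit_view), Decimal("0"))
    return abs(total_agent - total_audit) <= tolerance


print(totals_match(["10.10", "5.05"], ["15.15"]))  # prints: True
print(totals_match(["10.10"], ["10.11"]))          # prints: False
```

Parsing amounts as Decimal from strings avoids the float drift that could otherwise make equal totals appear to differ by a fraction of a cent.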

Evidence Summary (Legacy Pipeline)

Entry Points:
  • orchestrator.py::PayrollPipeline.run() - Primary legacy entry
  • batch_processor.py::batch_process() - Batch processing wrapper
  • run_payroll.ps1 / run_payroll.bat - Shell wrappers
Input Paths:
  • Default: data/input/ (scanned for .xlsx, .xls, .csv files)
  • Explicit: File path passed as argument to orchestrator.py
Output Paths:
  • Stage1: data/archive/{period}/stage1_snapshot.csv
  • Stage3 Full: data/archive/{period}/stage3_snapshot_full.csv
  • Stage3 Filtered: data/archive/{period}/stage3_snapshot.csv
  • V5 Excel: data/outputs/{month}.v5.10.3.{year}_{timestamp}.xlsx
  • CEO Excel: data/outputs/{month}.executive.v10.3.{year}_{timestamp}.xlsx
  • CEO JSON: data/archive/{period}/ceo_snapshot.json
BigQuery Upload:
  • Script: integration/bigquery/scripts/etl_loader.py::BigQueryETLLoader
  • Backfill: backfill_monthly_data.py::load_stage1_data(), load_stage3_data()
  • Tables: payroll_raw.stage1_snapshots, payroll_processed.stage3_snapshots, payroll_analytics.ceo_metrics
Math Logic Files (Top 15 Legacy):
  1. stage2_normalize/normalize_periods.py::normalize_periods() - Cadence mapping (line 6)
  2. orchestrator.py::run() - Mixed cadence period assignment (line 520)
  3. stage2_normalize/detect_tpa.py::detect_tpa_commission() - TPA detection + Mixed Commission Rule (line 33)
  4. stage3_absorb/apply_tpa_normalization.py::apply_tpa_normalization() - PEPM formula (line 58)
  5. stage3_absorb/apply_tpa_normalization.py::collapse_to_monthly() - Cadence collapse (line 300)
  6. stage3_absorb/absorb_chargebacks.py::absorb_chargebacks() - Chargeback absorption (line 6)
  7. stage2_normalize/detect_tpa.py::detect_tpa_commission() - Owner residual calculation (line 194)
  8. stage4_outputs/build_v5_agent_scope.py::build_v5_agent_commissions() - V5 agent scope (line 6)
  9. stage4_outputs/export_excel.py::export_executive_excel_enhanced_ceo() - CEO export (line 577)
  10. orchestrator.py::run() - Stage 1 anchor separation (line 447)
  11. stage2_normalize/resolve_identities.py::resolve_agent_identity() - Identity resolution (line 7)
  12. orchestrator.py::stage3_normalize_and_collapse() - Stage 3 orchestration (~line 600)
  13. stage4_outputs/build_v5_agent_scope.py::validate_v5_output() - V5 validation (line 201)
  14. utils/historical_reporting.py::save_period_snapshot() - Snapshot persistence (line 16)
  15. orchestrator.py::validate_business_loss() - Validation guardrails (~line 700)