"""Generate the synthetic datasets used across the companion notebooks.

Idempotent. Safe to re-run. Writes to ../data/ relative to this script.

Datasets implemented:
    - saas_metrics.csv          (B1, B3, C2)
    - qbr_q3_2025.md            (B2, B4, B5)
    - qbr_q3_2025.pdf           (B2)
    - dashboard_screenshot.png  (B2)
    - runbook_warehouse_cost.md (B3, B4, B5)
    - runbook_data_quality.md   (B4, B5)
    - warehouse_usage.csv       (C1, C2, D1, D2)
    - pipeline_jobs.csv         (C2, D1, D2)

All values are synthetic. No real client data is included.
"""

from __future__ import annotations

import csv
import random
from pathlib import Path

# Fixed seed for the random-number generators in this script, so every run
# produces the same datasets and the notebook outputs built on them stay stable.
SEED = 42

# Output location: the "data" directory one level above this script's own
# directory (i.e. ../data relative to the script).
DATA_DIR = Path(__file__).resolve().parent.parent.joinpath("data")


def gen_saas_metrics(out_path: Path) -> None:
    """Simulate twelve months of 2025 MRR movements for three segments and save as CSV.

    Each segment starts from a fixed opening MRR. Every month, new, expansion
    and churn MRR are drawn as uniform random fractions of that month's
    starting MRR, and the closing MRR carries forward as next month's start.
    Output rows are ordered by month, then segment name.

    Columns:
        month         ISO month, e.g. 2025-01
        segment       SMB | Mid-Market | Enterprise
        mrr_start     MRR at the start of the month, USD
        new_mrr       MRR from net-new logos this month
        expansion_mrr Upgrade/expansion MRR from existing customers
        churn_mrr     MRR lost from cancellations (positive number)
        mrr_end       MRR at end of month = start + new + expansion - churn
    """
    rng = random.Random(SEED)

    # Opening MRR per segment (USD per month). Insertion order fixes the
    # order of random draws, which keeps the output reproducible.
    opening_mrr = {"SMB": 180_000, "Mid-Market": 450_000, "Enterprise": 920_000}

    # Per segment: (new, expansion, churn) bounds, each a (lo, hi) fraction
    # of the month's starting MRR.
    rate_bounds = {
        "SMB": ((0.05, 0.09), (0.01, 0.02), (0.025, 0.045)),          # fast growth, heavy churn
        "Mid-Market": ((0.03, 0.06), (0.02, 0.04), (0.010, 0.020)),
        "Enterprise": ((0.015, 0.035), (0.03, 0.06), (0.003, 0.010)),  # slow growth, sticky, expands
    }

    columns = ("month", "segment", "mrr_start", "new_mrr", "expansion_mrr", "churn_mrr", "mrr_end")
    table: list[dict] = []

    for seg in opening_mrr:
        new_band, exp_band, churn_band = rate_bounds[seg]
        balance = float(opening_mrr[seg])
        for m in range(1, 13):
            # Draws happen in the order new -> expansion -> churn; changing it
            # would change every seeded value.
            new_amt = round(balance * rng.uniform(*new_band), 2)
            exp_amt = round(balance * rng.uniform(*exp_band), 2)
            churn_amt = round(balance * rng.uniform(*churn_band), 2)
            closing = round(balance + new_amt + exp_amt - churn_amt, 2)
            values = (f"2025-{m:02d}", seg, round(balance, 2), new_amt, exp_amt, churn_amt, closing)
            table.append(dict(zip(columns, values)))
            balance = closing

    # Month-major ordering reads more naturally than segment-major.
    ordered = sorted(table, key=lambda row: (row["month"], row["segment"]))

    with open(out_path, "w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(ordered)

    print(f"wrote {len(ordered)} rows -> {out_path}")


QBR_MARKDOWN = """# Q3 2025 Quarterly Business Review
## Acme SaaS Co. -- Revenue Operations

*Reporting period: 2025-07-01 to 2025-09-30.*

## Executive summary

- ARR ended Q3 at USD 34.8M, +18% year over year. Growth was led by Enterprise expansion (+24% YoY) and a steady contribution from Mid-Market new logo bookings.
- Net dollar retention came in at 117% across the full customer base, up from 113% in Q2. Enterprise NRR was 124%, a record for the segment.
- New logo bookings declined 8% versus Q2. The SMB segment is the main weak point and the priority focus for Q4.
- Gross retention held at 92% overall, with Enterprise gross retention reaching 98% (also a record).

## Segment performance

### SMB
Net new MRR grew 9% sequentially but customer churn climbed to 4.2% (vs. 3.8% in Q2). Pipeline coverage is the primary concern: SMB needs 2.5x coverage by mid-November to hit the Q4 target. Sales-led motion is being augmented with a self-serve trial flow that launched late in the quarter.

### Mid-Market
Expansion-led growth, with NRR of 121%. Three significant cross-sells closed in the quarter, each tied to the new Customer Insights dashboard rollout. New logo motion is steady; current ACV trending at USD 48k.

### Enterprise
The standout segment this quarter. NRR of 124% and gross retention of 98% are both segment records. Two seven-figure expansions closed late in the quarter, both driven by the data platform consolidation use case. EMEA pipeline remains thin -- two new AE hires planned for Q4.

## Operational highlights

- Migrated the production analytics workload from Tableau Server 2024.3 to Tableau Cloud. Performance improved; total cost of ownership dropped roughly 15%.
- Replaced three brittle ETL pipelines with dbt Cloud running on Snowflake. Mean run time fell from 47 minutes to 12 minutes; SLA breach rate fell from 6% to under 1%.
- Launched the Customer Insights dashboard internally. Weekly active usage among Customer Success Managers reached 60% by end of quarter, up from a pilot baseline of 18%.
- Completed SOC 2 Type II audit with no significant findings.

## Q4 priorities

1. **SMB pipeline coverage.** Target 2.5x coverage by mid-November via the new self-serve trial + outbound BDR augmentation.
2. **EMEA Enterprise hiring.** Two AE hires plus a regional SE. Time-to-productive target: under 90 days.
3. **Data platform completion.** Migrate the remaining Mid-Market dashboards from Tableau Server to Tableau Cloud. Decommission the on-prem Server cluster by end of December.
4. **AI assistant rollout.** Pilot the internal Customer Insights AI assistant with the top ten enterprise accounts. Measure CSM time-to-answer reduction.

## Risks and watch items

- Macroeconomic softness in the SMB segment continues to compress deal sizes (median ACV down 11% YoY).
- Data platform migration carries non-trivial cutover risk for three Mid-Market customers whose Tuesday morning dashboards are business-critical.
- Talent: one Enterprise AE departed at end of Q3; backfill in progress.
"""


def gen_qbr_markdown(out_path: Path) -> None:
    """Save the canned Q3 2025 QBR document (``QBR_MARKDOWN``) to *out_path* and log it."""
    with out_path.open("w") as handle:
        handle.write(QBR_MARKDOWN)
    print("wrote QBR markdown ->", out_path)


def gen_qbr_pdf(md_path: Path, pdf_path: Path) -> None:
    """Render the QBR markdown into a multi-page PDF using reportlab.

    Kept deliberately simple: a single column, default fonts, a few heading sizes.
    The goal is a realistic-looking business report Claude can read, not a design showcase.

    Only the small markdown subset the QBR uses is understood, one line at a time:
    ``#`` / ``##`` / ``###`` headings, ``- `` bullets, a whole line wrapped in
    single ``*`` (rendered italic), inline ``**bold**`` spans, and plain
    paragraphs. A blank line becomes a small vertical spacer.

    Args:
        md_path: Markdown file to read (platform default encoding).
        pdf_path: Destination PDF file.
    """
    import re
    from xml.sax.saxutils import escape

    from reportlab.lib.pagesizes import LETTER
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.units import inch
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

    bold_span = re.compile(r"\*\*(.+?)\*\*")

    def inline(text: str) -> str:
        # reportlab's Paragraph parses its text as mini-XML markup, so literal
        # &, < and > must be escaped first (otherwise they break or corrupt the
        # paragraph); then markdown **bold** spans become <b> tags instead of
        # being printed with their asterisks.
        return bold_span.sub(r"<b>\1</b>", escape(text))

    styles = getSampleStyleSheet()
    h1 = styles["Heading1"]
    h1.fontSize = 18
    h2 = styles["Heading2"]
    h2.fontSize = 14
    h3 = styles["Heading3"]
    h3.fontSize = 12
    body = ParagraphStyle("body", parent=styles["BodyText"], fontSize=10, leading=14)
    bullet = ParagraphStyle("bullet", parent=body, leftIndent=18, bulletIndent=6)

    md_text = md_path.read_text()
    story: list = []

    for raw_line in md_text.splitlines():
        line = raw_line.rstrip()
        if not line:
            story.append(Spacer(1, 6))
        elif line.startswith("# "):
            story.append(Paragraph(inline(line[2:]), h1))
            story.append(Spacer(1, 6))
        elif line.startswith("## "):
            story.append(Paragraph(inline(line[3:]), h2))
            story.append(Spacer(1, 4))
        elif line.startswith("### "):
            story.append(Paragraph(inline(line[4:]), h3))
        elif line.startswith("- "):
            # The bullet glyph is reportlab markup, so it is prepended after escaping.
            story.append(Paragraph("&bull;&nbsp;&nbsp;" + inline(line[2:]), bullet))
        elif (
            line.startswith("*")
            and not line.startswith("**")  # a "**bold**" line is not whole-line italics
            and line.endswith("*")
            and len(line) > 2
        ):
            story.append(Paragraph("<i>" + inline(line[1:-1]) + "</i>", body))
        else:
            story.append(Paragraph(inline(line), body))

    doc = SimpleDocTemplate(
        str(pdf_path),
        pagesize=LETTER,
        topMargin=0.8 * inch, bottomMargin=0.8 * inch,
        leftMargin=0.9 * inch, rightMargin=0.9 * inch,
    )
    doc.build(story)
    print(f"wrote QBR PDF     -> {pdf_path}")


def gen_dashboard_png(csv_path: Path, png_path: Path) -> None:
    """Render a simple BI-style chart from saas_metrics.csv.

    Two-panel figure: MRR by segment over time (line), and net new MRR by
    segment for the latest month (bar). Looks roughly like something a CSM
    or RevOps lead would screenshot from a real dashboard.

    Args:
        csv_path: CSV with at least the columns month, segment, mrr_end,
            new_mrr, expansion_mrr and churn_mrr.
        png_path: Destination image, saved at 120 dpi.

    Raises:
        ValueError: if the CSV contains no data rows.
        KeyError: if a (month, segment) pair needed by either panel is missing.
    """
    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend: render straight to a file
    import matplotlib.pyplot as plt

    segments = ["SMB", "Mid-Market", "Enterprise"]

    # Index rows by (month, segment) once, instead of rescanning every row for
    # each lookup. The csv module requires newline="" on files given to a reader.
    by_key: dict[tuple[str, str], dict] = {}
    with open(csv_path, newline="") as f:
        for r in csv.DictReader(f):
            by_key.setdefault((r["month"], r["segment"]), r)  # keep the first row if a pair repeats

    if not by_key:
        raise ValueError(f"no data rows in {csv_path}")

    months = sorted({month for month, _ in by_key})

    # Left panel: end-of-month MRR per segment, converted to USD millions.
    series = {
        s: [float(by_key[(m, s)]["mrr_end"]) / 1_000_000 for m in months]
        for s in segments
    }

    # Right panel: latest month's net new MRR (new + expansion - churn), in USD thousands.
    latest = months[-1]
    net_new = []
    for s in segments:
        r = by_key[(latest, s)]
        net_new.append((float(r["new_mrr"]) + float(r["expansion_mrr"]) - float(r["churn_mrr"])) / 1000)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4.5))

    for s in segments:
        ax1.plot(months, series[s], marker="o", label=s)
    ax1.set_title("MRR by segment, 2025 (USD millions)")
    ax1.set_xlabel("Month")
    ax1.set_ylabel("MRR (USD M)")
    ax1.tick_params(axis="x", rotation=45)
    ax1.grid(True, alpha=0.3)
    ax1.legend()

    bars = ax2.bar(segments, net_new, color=["#3b82f6", "#10b981", "#f59e0b"])
    ax2.set_title(f"Net new MRR, {latest} (USD thousands)")
    ax2.set_ylabel("Net new MRR (USD K)")
    ax2.grid(True, alpha=0.3, axis="y")
    for bar, val in zip(bars, net_new):
        ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height(),
                 f"{val:,.0f}", ha="center", va="bottom", fontsize=9)

    fig.suptitle("Acme SaaS Co. -- 2025 Revenue Performance", fontsize=14, fontweight="bold")
    fig.tight_layout()
    fig.savefig(png_path, dpi=120, bbox_inches="tight")
    plt.close(fig)
    print(f"wrote dashboard  -> {png_path}")


WAREHOUSE_RUNBOOK = """# Snowflake Warehouse Cost Runbook

*Owner: Data Platform Team. Last reviewed: 2025-09-15.*

This runbook is the single source of truth for diagnosing, attributing, and reducing Snowflake warehouse spend at Acme SaaS Co. Anyone on the data team -- engineering, analytics engineering, or analytics -- should be able to follow it end to end to triage a cost spike or run a regular optimization pass.

The runbook assumes familiarity with the Snowflake credit model, the `SNOWFLAKE.ACCOUNT_USAGE` schema, and basic SQL. It does **not** assume familiarity with our specific warehouse layout -- that is documented in Section 2.

---

## 1. Cost categories

Snowflake compute spend at Acme breaks down into five categories. In FY2025 the approximate split was:

| Category                          | Share | Notes |
|---|---|---|
| Scheduled ETL (dbt + Airflow)     | 48%   | Predictable load. Largest single category. |
| Ad-hoc analytics (Tableau, ad-hoc SQL) | 22% | Variable, peaks during quarter-close. |
| Reverse ETL (Hightouch, Census)   | 11%   | Mostly steady; spikes when audiences re-sync. |
| Embedded customer-facing dashboards | 10% | Latency-sensitive; cannot run on smaller warehouses. |
| Data Science / ML training        | 9%    | Mostly Snowpark. Bursty. |

The shares above are rounded and sum to 100%. Internal admin queries, replication, and Snowpipe ingest are not zero, but they are small enough to disappear into that rounding.

---

## 2. Warehouse layout

Acme operates seven Snowflake warehouses, separated by workload class and SLA:

- `WH_ELT_M`         (Medium, auto-suspend 60s) -- bulk of dbt models, scheduled
- `WH_ELT_L`         (Large, auto-suspend 60s) -- large or fact-table dbt models
- `WH_AD_HOC_S`      (Small, auto-suspend 30s) -- analyst ad-hoc, capped daily credit budget
- `WH_BI_M`          (Medium, multi-cluster 1-3, auto-suspend 60s) -- Tableau extracts and dashboards
- `WH_REVERSE_S`     (Small, auto-suspend 60s) -- Hightouch and Census
- `WH_EMBEDDED_M`    (Medium, multi-cluster 1-6, auto-suspend 60s) -- customer dashboards (latency SLA 3s)
- `WH_DS_L`          (Large, auto-suspend 300s) -- ML training, Snowpark

Each warehouse has an owning team. Cost attribution flows through warehouse name -- there is **no** shared warehouse used by multiple teams. This decision predates the current platform team and is the single most useful piece of cost hygiene we have. Do not break it.

---

## 3. Common drivers of cost spikes

When monthly warehouse spend exceeds the rolling 90-day average by more than 15%, treat it as a spike and follow the diagnosis playbook in Section 4. The most common root causes, in rough order of frequency:

### 3.1 A new dbt model with an unintended full table scan

By far the most common cause. A new model joins against a fact table without a proper filter, or against `INFORMATION_SCHEMA` in a way that scans every micro-partition. Symptom: a single query in `QUERY_HISTORY` consuming hundreds of credits with no clear justification.

### 3.2 An accidentally-disabled materialization

A model that was built as `incremental` now rebuilds as a full `table` on every run because someone removed its `materialized='incremental'` config during refactoring. Symptom: gradual but persistent climb in `WH_ELT_M` credit consumption over multiple days, without any specific spike.

### 3.3 Tableau extract proliferation

A new BI consumer creates one extract for every dashboard rather than reusing a shared extract. Symptom: `WH_BI_M` credits climb in step with new dashboard counts; the worst offender shows in `QUERY_HISTORY` as repeated identical SELECTs filtered by `extract_id`.

### 3.4 A misconfigured embedded dashboard

A customer-facing dashboard is missing its row-level security predicate and falls back to scanning the full multi-tenant table. Symptom: `WH_EMBEDDED_M` credits spike; the same query (parameterized) appears hundreds of times in a short window. **Treat this as a P1.** It is both a cost issue and a security issue.

### 3.5 An ML training job that wasn't supposed to run

Symptom: `WH_DS_L` credits spike outside of business hours, often associated with a single notebook or Snowpark session. Usually the result of an iteration loop someone forgot to stop.

### 3.6 Reverse ETL audience explosion

A marketing operator adds a new audience to Hightouch that is much larger than expected (e.g., "all users" instead of "active paid users"). Symptom: `WH_REVERSE_S` credits spike on the day a new sync goes live.

---

## 4. Diagnosis playbook

When a spike is detected, follow these steps in order. Time-box each step to 15 minutes -- if the answer is not obvious, escalate per Section 7 rather than dig deeper alone.

### Step 1: identify which warehouse

Run the standard cost-by-warehouse query (in Section 8) over the past 7 days. Compare against the trailing 90-day baseline. The warehouse with the largest absolute increase is your starting point; secondary warehouses can wait.

### Step 2: identify which time window

Within the offending warehouse, narrow the time window to the hour or day the spike began. Look at the `QUERY_HISTORY` aggregate by hour. A sudden cliff indicates a new query pattern was introduced; a gradual ramp indicates an existing query that grew expensive over time.

### Step 3: identify which query pattern

Group `QUERY_HISTORY` by `QUERY_PARAMETERIZED_HASH` within the offending window. The top 3-5 hashes by total credits consumed are your candidates. Pull the parameterized text of each and examine it for the patterns in Section 3.

### Step 4: identify which actor

For each candidate query pattern, the `USER_NAME` and (if applicable) the `dbt run_id` or Tableau `external_token` identifies the team responsible. Route the issue to the owning team via Slack with the parameterized query, the time window, and the estimated credit impact.

### Step 5: implement the fix

The owning team owns the fix. The platform team owns following up to confirm spend returned to the baseline within 72 hours.

---

## 5. Optimization patterns

Independent of incidents, the following patterns reduce warehouse spend by 10-30% in typical Snowflake accounts and should be applied opportunistically.

### 5.1 Auto-suspend tuning

Our standard 60-second auto-suspend (Snowflake's out-of-the-box default is 600 seconds) is still too generous for bursty, interactive workloads. Lower to 30 seconds for analyst warehouses (`WH_AD_HOC_S`); keep at 60 seconds for ETL where serial dbt models would otherwise reload the cache; consider 300 seconds only for warehouses with cached working sets that must stay warm (e.g., embedded dashboards during business hours).

### 5.2 Warehouse right-sizing

A Medium warehouse running at 30% utilization with no queuing is not a Large-warehouse candidate -- it is an over-provisioned Medium. Inspect the `WAREHOUSE_LOAD_HISTORY` view weekly. If average concurrent queries (`AVG_RUNNING`) are well below the warehouse's capacity and queued load (`AVG_QUEUED_LOAD`) stays near zero, step down. **Do not step up without evidence**: each size step doubles the warehouse's per-hour credit rate.

### 5.3 Materialization review

For dbt projects, the rule of thumb: small dimensions = `table`, large or incremental facts = `incremental`, throwaway intermediate = `ephemeral` or `view`. Annual review pass against the project to confirm.

### 5.4 Clustering keys on hot fact tables

For any fact table that is the target of high-volume time-filtered queries (e.g., `fct_events` filtered by `event_date`), clustering by date can reduce per-query cost by 5-10x. Validate with `SYSTEM$CLUSTERING_INFORMATION` before deciding.

### 5.5 Query tag discipline

Every automated workload should set `QUERY_TAG` to identify its source (dbt project, Airflow DAG, Tableau workbook). Without this, attribution in Step 4 of the diagnosis playbook (Section 4) is much harder. The platform team has a quarterly audit; teams without proper query tags lose access to peak-hour warehouse capacity.

---

## 6. Incident examples

### Incident 2025-03-04 (P2): dbt model full scan

A new `fct_subscription_events` model joined against the `EVENTS_RAW` table without filtering on `event_date`. Every run scanned 18 months of data. Daily impact: ~85 credits. Detected on 2025-03-04; root caused on 2025-03-04; fixed on 2025-03-05 by adding incremental config with `event_date >= dateadd(day, -3, current_date())` filter. Total credit waste: ~340 credits ($1,360 at our enterprise rate).

### Incident 2025-05-18 (P1): embedded dashboard missing RLS

A new customer-facing dashboard shipped with an incorrect Snowflake role configuration that bypassed row-level security. Every dashboard load scanned the full multi-tenant fact table. Daily impact during the 30-hour window before detection: ~520 credits. Also a security incident -- reported to security ops per playbook. Fixed by re-binding the dashboard's Snowflake user to the correct role.

### Incident 2025-07-22 (P3): Tableau extract proliferation

A new BI consumer (the Customer Success team) shipped 23 new dashboards, each backed by its own Tableau extract refreshing hourly. `WH_BI_M` credits climbed 40% in a week. Root caused via Step 3 of the diagnosis playbook. Fix: consolidated to three shared extracts feeding all 23 dashboards. Daily impact dropped by 35 credits.

### Incident 2025-08-30 (P3): out-of-hours ML training loop

A data scientist's Snowpark notebook was left running in a loop overnight after their kernel was supposed to have shut down. `WH_DS_L` ran for 11 hours of unattended training. Fixed with an `auto_suspend=300` config and a warehouse-level cron job that force-suspends `WH_DS_L` between 22:00 and 06:00 SGT. Net daily savings: ~40 credits.

---

## 7. Escalation

Route P1 (customer-impacting or security-related cost incidents) to the platform on-call and security on-call simultaneously. Route P2 (significant spend impact, no customer or security impact) to the platform team Slack channel within one business hour. Route P3 (housekeeping-level cost issues) into the weekly cost review backlog.

The platform team maintains a 30-day rolling P1 budget of $5,000. Incidents that exceed individual or cumulative budget thresholds trigger an automatic incident review.

---

## 8. Reference queries

Standard SQL snippets used by the diagnosis playbook. All target `SNOWFLAKE.ACCOUNT_USAGE` -- they require the `ACCOUNTADMIN` role or membership in the `PLATFORM_ANALYST` role.

### Cost by warehouse, last 7 days

```sql
select warehouse_name,
       sum(credits_used) as credits_7d
from snowflake.account_usage.warehouse_metering_history
where start_time >= dateadd(day, -7, current_timestamp())
group by warehouse_name
order by credits_7d desc;
```

### Top parameterized query patterns by spend, last 24h

```sql
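-- QUERY_HISTORY has no per-query compute credit column; per-query compute
-- credits live in QUERY_ATTRIBUTION_HISTORY, joined back for the query text.
select qah.query_parameterized_hash,
       any_value(qh.query_text)              as sample_text,
       count(*)                              as runs,
       sum(qah.credits_attributed_compute
           + qh.credits_used_cloud_services) as credits
from snowflake.account_usage.query_attribution_history qah
join snowflake.account_usage.query_history qh
  on qh.query_id = qah.query_id
where qah.start_time >= dateadd(hour, -24, current_timestamp())
  and qah.warehouse_name = :warehouse
group by 1
order by credits desc
limit 10;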
```

### Warehouse load profile

```sql
select date_trunc('hour', start_time) as hr,
       avg_running                    as avg_running_queries,
       avg_queued_load                as avg_queued_queries
from snowflake.account_usage.warehouse_load_history
where warehouse_name = :warehouse
  and start_time >= dateadd(day, -14, current_timestamp())
order by hr;
```

---

*End of runbook. For platform-team Slack, see #data-platform-ops. For escalation, see Section 7.*
"""


def gen_warehouse_runbook(out_path: Path) -> None:
    """Save the Snowflake warehouse cost runbook (``WAREHOUSE_RUNBOOK``) to *out_path* and log it."""
    with out_path.open("w") as handle:
        handle.write(WAREHOUSE_RUNBOOK)
    print("wrote warehouse runbook ->", out_path)


DATA_QUALITY_RUNBOOK = """# Data Quality Runbook

*Owner: Analytics Engineering Team. Last reviewed: 2025-10-01.*

This runbook documents how Acme SaaS Co. detects, diagnoses, remediates, and prevents data quality failures across the production data platform. It covers the dbt test suite, the Great Expectations checkpoint layer, and the manual investigation playbook for DQ incidents that slip through automated checks.

Read together with the Snowflake Warehouse Cost Runbook for incidents where a DQ failure causes a cost spike (e.g., a broken incremental filter that forces a full re-scan).

---

## 1. What counts as a data quality issue at Acme

Acme classifies DQ failures along four axes:

| Dimension    | Definition                                                  | Example failure                                      |
|---|---|---|
| Completeness | Expected rows or columns are missing                        | Daily event table has no rows after 14:00 UTC        |
| Freshness    | Data is older than the SLA allows                           | `fct_subscriptions` last updated 26 hours ago        |
| Accuracy     | Values are incorrect relative to the source of truth        | MRR figures in Snowflake differ from Stripe by >0.5% |
| Consistency  | Referential or business-rule integrity is broken            | `subscription_id` in `fct_events` has no match in `dim_subscriptions` |

A fifth dimension -- **uniqueness** -- is implicit in all four: duplicated primary keys amplify every other failure mode and are treated as P1 incidents regardless of downstream impact.

---

## 2. DQ monitoring stack

Acme runs two layers of automated DQ monitoring:

### 2.1 dbt tests (first layer -- model-level, runs on every dbt job)

Every production model has at minimum:
- `not_null` on all primary key columns
- `unique` on all primary key columns
- `accepted_values` on any column with a defined enum (e.g., `segment`, `plan_type`, `event_type`)
- `relationships` checks on all foreign keys pointing to `dim_*` tables

Failing a dbt test blocks the downstream model from materialising. Test results are emitted to the `DBT_TEST_RESULTS` table in Snowflake and surfaced in the Elementary dashboard.

### 2.2 Great Expectations checkpoints (second layer -- dataset-level, runs post-dbt)

For `fct_subscriptions`, `fct_mrr`, and `fct_events` (the three tables downstream consumers depend on most heavily), a Great Expectations suite runs after the dbt job completes. It checks:
- Row count within 20% of the trailing 7-day average (freshness + completeness proxy)
- MRR total within 0.5% of the Stripe API pull (accuracy)
- Zero null `account_id` values (completeness, belt-and-suspenders over the dbt test)
- Event timestamp distribution not bimodal (distribution check against a known-bad pattern)

GE checkpoint failures send a PagerDuty alert to the analytics engineering on-call. dbt test failures send a Slack alert to `#data-quality-alerts`.

---

## 3. Common root causes of DQ failures

In order of frequency over the past 12 months:

### 3.1 Source schema drift

An upstream SaaS vendor changes a field name, type, or enum value without notice. Common offenders: Salesforce field renames, Stripe API version upgrades, and HubSpot property migrations. Symptom: a dbt `accepted_values` test fails for a value that appears in the source but not in the expected list.

### 3.2 Incremental model filter breakage

A developer updates an incremental model's unique key or filter logic during a refactor. On the next run, the model either re-materialises rows it already has (duplicate primary keys) or silently drops rows that fall outside the new filter window. Symptom: the `unique` dbt test on the model starts failing; row counts in GE show a sudden non-monotonic drop.

### 3.3 Late-arriving events

The events pipeline has a known 2-4 hour lag from source to Snowflake under normal conditions. When upstream systems are degraded (typically 3rd-party webhooks), events arrive 12-48 hours late. Symptom: GE row count check flags the table as under-count; the lag clears itself without intervention but causes downstream freshness SLA breaches in the meantime.

### 3.4 Timezone handling error

A pipeline that ran cleanly under UTC broke when a developer introduced local-time logic for a Singapore-based client report. Timestamps shifted by 8 hours, causing events to land in the wrong partition and breaking daily aggregations at the date boundary. Symptom: row count for a specific date is roughly double; the adjacent date is roughly half. Highly non-obvious.

### 3.5 Referential integrity break after a dim table truncate

A dimension table (`dim_accounts`) was truncated and rebuilt during a one-time migration. `fct_events` rows referencing old `account_id` values now have no matching dim row. The `relationships` dbt test catches this immediately -- but only if the test runs before the downstream models that join the two tables.

---

## 4. Diagnosis playbook

Follow these steps when a DQ alert fires. Time-box each step to 10 minutes before escalating.

### Step 1: identify the affected model and dimension

Read the alert. It will name the model, the test, and the column. If the alert is from GE rather than dbt, identify which expectation suite fired and which table it covers.

### Step 2: check if the failure is new or recurring

Query `DBT_TEST_RESULTS` for the past 7 runs of the same test. A failure that first appeared today on an otherwise clean test is a new incident. A failure that has been firing intermittently for weeks is a known issue -- check the backlog before creating a new ticket.

### Step 3: check the source data

For accuracy or completeness failures, query the raw source table (`RAW.STRIPE.*`, `RAW.SALESFORCE.*`, etc.) for the same time window. If the raw source also shows the anomaly, the issue is upstream. Route to the source system owner; the analytics engineering team is not the fix.

### Step 4: check recent model changes

Run `git log --since="48 hours ago" -- models/` to find any dbt model changes in the past 48 hours that touch the affected model or its direct parents. A schema change in a parent model is the most common culprit after source drift.

### Step 5: implement the fix or roll back

For a broken incremental filter: add or restore the correct `unique_key` and filter, then run `dbt run --full-refresh --select <model>` to re-materialise cleanly. For a source schema drift: update the staging model's column mapping and accepted values, test locally with `dbt test --select <model>`, then promote.

---

## 5. Remediation patterns

### 5.1 Full-refresh as the safe default

When in doubt, a `dbt run --full-refresh` on the affected model is the safe remediation. It is more expensive (a full scan rather than incremental) but it guarantees idempotence. For large fact tables, co-ordinate with the platform team to avoid running a full-refresh on `WH_ELT_L` during peak analytics hours.

### 5.2 Soft-delete over hard-delete

When removing rows from a production table (e.g., to fix duplicates), prefer marking rows with a `_deleted_at` timestamp and filtering in downstream views rather than issuing a `DELETE` statement. This preserves the audit trail and makes rollback trivial if the fix was wrong.

### 5.3 Backfill windows

For late-arriving event fixes, a backfill job re-runs the incremental model for the affected date range with `--start-date` and `--end-date` args. Backfills always run on `WH_ELT_L` (not `WH_ELT_M`) to avoid queue contention with the regular scheduled jobs.

---

## 6. Incident examples

### Incident 2025-02-11 (P2): Stripe schema drift breaks MRR model

Stripe deprecated the `amount` field on the `invoice` object in favour of `amount_due`. The `stg_stripe__invoices` staging model silently returned null for all invoices after the migration date. `fct_mrr` showed a 100% churn rate for the affected week. Detected by GE accuracy check (MRR vs Stripe API pull diverged by 87%). Fix: updated staging model column reference; backfilled 6 days of MRR. Downstream impact: three Tableau dashboards showed incorrect data for 14 hours.

### Incident 2025-04-03 (P1): duplicate primary keys in fct_subscriptions

A developer removed the `unique_key` config and the `is_incremental()` filter from `fct_subscriptions` during a schema refactor, turning the model's incremental merge into an unfiltered append. Every subsequent run appended a full copy of all historical rows. The `unique` dbt test caught the failure on the first run after the change -- but the change merged on a Friday afternoon and the weekend on-call rotation missed the Slack alert. By Monday, three runs had occurred and the row count was 4x. Fix: restored `unique_key` and the incremental filter, ran `--full-refresh`, re-notified downstream consumers. Post-mortem action: add PagerDuty escalation for `unique` test failures on P0 models regardless of time.

### Incident 2025-06-17 (P3): timezone shift corrupts daily event aggregation

A developer working on a Singapore-time client report introduced `convert_timezone('Asia/Singapore', event_timestamp)` in a shared staging model rather than an isolated view. All downstream daily aggregations shifted by 8 hours. Detected during a weekly data review meeting by a CSM who noticed the previous day's event count was anomalously high. Fix: reverted the staging model change; created an isolated Singapore-time view for the specific report. Lesson: timezone conversions must not occur in shared staging models.

### Incident 2025-09-08 (P2): GE row-count check false positive

A legitimate bulk data migration added 2.1M historical events to `fct_events` in a single run -- an expected one-time backfill. The GE row count check (flagging any run with >20% row count increase) fired as a false positive. The on-call engineer spent 45 minutes investigating before the migration team confirmed the cause. Action: GE checkpoint now accepts a `suppress_for_run_id` parameter that migration jobs can set to skip the volumetric check for a known-good bulk load.

---

## 7. Escalation

- **P1** (duplicated PKs on P0 models, MRR accuracy off by >1%, security-related integrity failure): page analytics engineering on-call and data platform on-call simultaneously.
- **P2** (data incorrect but not duplicated; freshness breach > 4 hours; GE suite failure on core fact tables): Slack alert to `#data-quality-alerts`; on-call responds within 1 business hour.
- **P3** (non-critical model test failure; accepted-values drift on a non-core dimension): ticket in the analytics engineering backlog; review within 2 business days.

---

## 8. Reference queries

### Recent dbt test failures

```sql
select tested_at, model_name, test_name, column_name, status, failures
from dbt_test_results
where tested_at >= dateadd(day, -3, current_timestamp())
  and status = 'fail'
order by tested_at desc;
```

### Row count trend for a model (freshness check)

```sql
select date_trunc('day', loaded_at) as day,
       count(*) as row_count
from analytics.fct_subscriptions
group by 1
order by 1 desc
limit 14;
```

### MRR accuracy spot-check (compare Snowflake vs Stripe pull)

```sql
select s.month,
       s.mrr_end as snowflake_mrr,
       st.mrr    as stripe_mrr,
       abs(s.mrr_end - st.mrr) / nullif(st.mrr, 0) as pct_diff
from analytics.fct_mrr s
join raw.stripe_mrr_reconciliation st using (month)
where s.month >= dateadd(month, -3, current_date())
order by s.month desc;
```

---

*End of runbook. For analytics engineering Slack, see #analytics-engineering. For escalation, see Section 7.*
"""


def gen_data_quality_runbook(out_path: Path) -> None:
    """Write the data-quality runbook markdown (``DATA_QUALITY_RUNBOOK``) to *out_path*.

    The text is written as UTF-8 explicitly. ``Path.write_text`` without an
    ``encoding`` argument uses the platform's locale encoding, which is not
    UTF-8 everywhere (e.g. cp1252 on many Windows setups). That would make the
    generated file's bytes depend on the machine, or raise on characters the
    locale codec cannot represent.

    Args:
        out_path: Destination file; overwritten if it already exists.
    """
    out_path.write_text(DATA_QUALITY_RUNBOOK, encoding="utf-8")
    print(f"wrote data quality runbook -> {out_path}")


def gen_warehouse_usage(out_path: Path) -> None:
    """Write 90 days of daily Snowflake credit usage for 7 warehouses as CSV.

    Covers 2025-07-01 through 2025-09-28: 90 days x 7 warehouses = 630 rows,
    sorted by date and then by warehouse name.

    Columns:
        date                    ISO date
        warehouse_name          One of the 7 Acme warehouses (matches Cost Runbook)
        credits_used            Snowflake credits consumed that day (rounded to 3 dp)
        query_count             Number of queries executed
        avg_queue_time_s        Average queue wait time in seconds (rounded to 2 dp)
        avg_execution_time_s    Average query execution time in seconds (rounded to 2 dp)

    Baseline shape: base credits grow linearly by 0.1% per day; weekends run at
    28% of base credits and 18% of base queries; uniform noise is applied on top.

    Injected anomalies (the incidents documented in runbook_warehouse_cost.md):
        - 2025-07-22 to 2025-08-10: WH_BI_M credits x1.40, queries x1.55
          (Tableau extract proliferation)
        - 2025-08-30: WH_DS_L credits x3.5, queries x0.6 (overnight ML training loop)
        - The 2025-05-18 incident is in Q2, so it is outside this Q3-only dataset.
    """
    import datetime

    rng = random.Random(SEED + 2)

    # warehouse name -> (base daily credits, base daily query count)
    baselines = {
        "WH_ELT_M":      (12.0, 450),
        "WH_ELT_L":      (8.0, 120),
        "WH_AD_HOC_S":   (3.5, 280),
        "WH_BI_M":       (6.0, 340),
        "WH_REVERSE_S":  (2.0, 85),
        "WH_EMBEDDED_M": (5.0, 520),
        "WH_DS_L":       (4.0, 45),
    }
    # Inclusive ISO-date bounds; plain string comparison is valid for ISO dates.
    bi_spike_first, bi_spike_last = "2025-07-22", "2025-08-10"
    ds_spike_day = "2025-08-30"

    first_day = datetime.date(2025, 7, 1)
    records: list[dict] = []

    for offset in range(90):
        day = first_day + datetime.timedelta(days=offset)
        iso_day = day.isoformat()
        weekend = day.weekday() >= 5
        growth = 1 + offset * 0.001  # slight linear growth in base credits

        for wh, (credit_base, query_base) in baselines.items():
            credit_base = credit_base * growth
            if weekend:
                credit_base *= 0.28
                query_base = int(query_base * 0.18)

            # Noise draws happen in a fixed order (credits, then queries) so the
            # seeded RNG stream, and therefore the CSV, is reproducible.
            credits = credit_base * rng.uniform(0.88, 1.14)
            queries = max(1, int(query_base * rng.uniform(0.90, 1.12)))

            if wh == "WH_BI_M" and bi_spike_first <= iso_day <= bi_spike_last:
                credits *= 1.40
                queries = int(queries * 1.55)
            elif wh == "WH_DS_L" and iso_day == ds_spike_day:
                credits *= 3.5
                queries = max(1, int(queries * 0.6))

            queue_lo, queue_hi = (0.0, 0.4) if weekend else (0.05, 1.8)
            # Dict values are evaluated left to right, so the queue draw still
            # precedes the execution-time draw.
            records.append({
                "date":                 iso_day,
                "warehouse_name":       wh,
                "credits_used":         round(credits, 3),
                "query_count":          queries,
                "avg_queue_time_s":     round(rng.uniform(queue_lo, queue_hi), 2),
                "avg_execution_time_s": round(rng.uniform(1.5, 18.0), 2),
            })

    records.sort(key=lambda rec: (rec["date"], rec["warehouse_name"]))

    with open(out_path, "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=list(records[0]))
        writer.writeheader()
        writer.writerows(records)

    print(f"wrote {len(records)} rows -> {out_path}")


def gen_pipeline_jobs(out_path: Path) -> None:
    """Write 90 days of pipeline job run history across 8 scheduled jobs as CSV.

    Dataset: 2025-07-01 to 2025-09-28, 668 rows total.
    6 jobs run daily; 2 Hightouch jobs run on weekdays only (64 weekdays in range).
    6 x 90 + 2 x 64 = 668.

    Columns:
        run_id                int, primary key; assigned in generation order
                              (by day, then by position in JOBS), which is not
                              necessarily started_at order within a day
        job_name              one of 8 Acme pipeline jobs
        warehouse_name        Snowflake warehouse used
        started_at            ISO datetime (YYYY-MM-DDTHH:MM:SS), scheduled time +/- 90s
        finished_at           ISO datetime; '' if failed or skipped
        duration_s            integer seconds; '' if failed or skipped
        status                success | failed | skipped
        credits_used          float; 0.0 if failed or skipped
        rows_processed        integer; 0 if failed or skipped
        error_message         non-empty only if failed

    Forced failure / skip clusters (hard-coded in FORCED_STATUSES, not random):
        2025-07-15 to 07-17: dbt_fct_subscriptions fails (Stripe schema drift)
                             dbt_fct_mrr skipped (downstream dependency)
        2025-08-05 to 08-06: hightouch_crm_sync and hightouch_marketing skipped
                             (inside the 2025-07-22..08-10 WH_BI_M spike window
                             of warehouse_usage.csv)
        2025-08-20:          multi-job failure (dbt_fct_subscriptions, airflow_stripe_ingest,
                             dbt_fct_events). This date is after the WH_BI_M spike
                             window ends (2025-08-10); warehouse_usage.csv has no
                             anomaly on this day.

    On all other job-days each job fails at random with its FAILURE_RATE
    probability; random failures always carry TIMEOUT_ERR.
    """
    import datetime

    rng = random.Random(SEED + 3)
    ts_fmt = "%Y-%m-%dT%H:%M:%S"

    # Job configuration: name, warehouse, base start time (H, M), base duration (s),
    # base credits, base rows, daily=True means every day, False=weekdays only.
    JOBS = [
        # (name, warehouse, start_h, start_m, base_dur_s, base_credits, base_rows, daily)
        ("dbt_fct_subscriptions", "WH_ELT_M",  1, 30, 1830,  4.22,  175_000, True),
        ("dbt_fct_mrr",           "WH_ELT_M",  1, 19,  938,  2.09,   42_000, True),
        ("hightouch_marketing",   "WH_REVERSE_S", 1, 48, 510, 0.70,   18_500, False),
        ("airflow_sf_ingest",     "WH_ELT_M",  2, 40,  755,  1.29,   29_000, True),
        ("airflow_stripe_ingest", "WH_ELT_M",  3, 37,  318,  0.50,    9_800, True),
        ("dbt_fct_events",        "WH_ELT_L",  3, 50, 2771,  6.93,  750_000, True),
        ("hightouch_crm_sync",    "WH_REVERSE_S", 3, 16, 637, 0.88,   25_000, False),
        ("dbt_dim_accounts",      "WH_ELT_M",  4,  4,  322,  0.59,   12_500, True),
    ]

    # Hardcoded failure / skip dates (deterministic, not randomised)
    STRIPE_DRIFT_ERR = (
        "KeyError: column amount_due not found in source stg_stripe__invoices. "
        "Schema drift detected -- upstream Stripe API changed field name."
    )
    TIMEOUT_ERR = "Internal error: query exceeded warehouse timeout limit of 3600s."
    WH_SUSPEND_ERR = (
        "Runtime error: warehouse WH_ELT_L suspended unexpectedly during query execution."
    )

    FORCED_STATUSES: dict[tuple[str, str], tuple[str, str]] = {
        # (job_name, date_str) -> (status, error_message)
        # Stripe schema drift cluster
        ("dbt_fct_subscriptions", "2025-07-15"): ("failed", STRIPE_DRIFT_ERR),
        ("dbt_fct_subscriptions", "2025-07-16"): ("failed", STRIPE_DRIFT_ERR),
        ("dbt_fct_subscriptions", "2025-07-17"): ("failed", STRIPE_DRIFT_ERR),
        ("dbt_fct_mrr",           "2025-07-15"): ("skipped", ""),
        ("dbt_fct_mrr",           "2025-07-16"): ("skipped", ""),
        ("dbt_fct_mrr",           "2025-07-17"): ("skipped", ""),
        # Hightouch outage
        ("hightouch_crm_sync",    "2025-08-05"): ("skipped", ""),
        ("hightouch_crm_sync",    "2025-08-06"): ("skipped", ""),
        ("hightouch_marketing",   "2025-08-05"): ("skipped", ""),
        ("hightouch_marketing",   "2025-08-06"): ("skipped", ""),
        # Multi-job failure event 2025-08-20
        # NOTE(review): unlike the July cluster, dbt_fct_mrr is not forced to
        # "skipped" here although dbt_fct_subscriptions fails -- confirm intended.
        ("dbt_fct_subscriptions", "2025-08-20"): ("failed", STRIPE_DRIFT_ERR),
        ("airflow_stripe_ingest", "2025-08-20"): ("failed", TIMEOUT_ERR),
        ("dbt_fct_events",        "2025-08-20"): ("failed", WH_SUSPEND_ERR),
    }

    # Scattered random failure rate per job (probability of failure on a non-forced day)
    FAILURE_RATE = {
        "dbt_fct_subscriptions": 0.025,
        "dbt_fct_mrr":           0.015,
        "hightouch_marketing":   0.015,
        "airflow_sf_ingest":     0.040,
        "airflow_stripe_ingest": 0.030,
        "dbt_fct_events":        0.035,
        "hightouch_crm_sync":    0.030,
        "dbt_dim_accounts":      0.000,  # always succeeds
    }

    rows: list[dict] = []
    start_date = datetime.date(2025, 7, 1)

    for day_num in range(90):
        d = start_date + datetime.timedelta(days=day_num)
        date_str = d.isoformat()
        is_weekday = d.weekday() < 5  # Monday=0, Friday=4

        for (job_name, wh, start_h, start_m,
             base_dur, base_credits, base_rows, daily) in JOBS:

            if not daily and not is_weekday:
                continue  # Hightouch jobs skip weekends: no row, no run_id consumed

            # The start-time jitter is drawn for every emitted row (including
            # forced failures/skips) so the seeded RNG stream stays stable.
            started_at = datetime.datetime(
                d.year, d.month, d.day, start_h, start_m, 0
            ) + datetime.timedelta(seconds=rng.randint(-90, 90))

            forced = FORCED_STATUSES.get((job_name, date_str))
            if forced is not None:
                status, err_msg = forced
            else:
                # Only non-forced job-days consume a failure draw.
                fail_p = FAILURE_RATE.get(job_name, 0.0)
                status = "failed" if rng.random() < fail_p else "success"
                err_msg = TIMEOUT_ERR if status == "failed" else ""

            # Defaults describe a run that did not complete (failed or skipped);
            # key order here defines the CSV column order.
            row = {
                "run_id":         len(rows) + 1,  # sequential, 1-based
                "job_name":       job_name,
                "warehouse_name": wh,
                "started_at":     started_at.strftime(ts_fmt),
                "finished_at":    "",
                "duration_s":     "",
                "status":         status,
                "credits_used":   "0.0",
                "rows_processed": "0",
                "error_message":  err_msg if status == "failed" else "",
            }
            if status == "success":
                # Draw order (duration, credits, rows) fixes the RNG stream; keep it.
                dur_s = max(60, int(base_dur * rng.uniform(0.80, 1.25)))
                credits = round(base_credits * rng.uniform(0.82, 1.20), 3)
                row_count = max(1, int(base_rows * rng.uniform(0.90, 1.12)))
                finished_at = started_at + datetime.timedelta(seconds=dur_s)
                row.update(
                    finished_at=finished_at.strftime(ts_fmt),
                    duration_s=str(dur_s),
                    credits_used=str(credits),
                    rows_processed=str(row_count),
                )
            rows.append(row)

    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)

    print(f"wrote {len(rows)} rows -> {out_path}")


def main() -> None:
    """Regenerate every dataset into DATA_DIR, creating the directory if needed."""
    DATA_DIR.mkdir(exist_ok=True)

    metrics_csv = DATA_DIR / "saas_metrics.csv"
    qbr_md = DATA_DIR / "qbr_q3_2025.md"

    # Keep this order: the PDF and PNG steps are handed the markdown and CSV
    # paths produced by the first two calls.
    gen_saas_metrics(metrics_csv)
    gen_qbr_markdown(qbr_md)
    gen_qbr_pdf(qbr_md, DATA_DIR / "qbr_q3_2025.pdf")
    gen_dashboard_png(metrics_csv, DATA_DIR / "dashboard_screenshot.png")

    gen_warehouse_runbook(DATA_DIR / "runbook_warehouse_cost.md")
    gen_data_quality_runbook(DATA_DIR / "runbook_data_quality.md")

    gen_warehouse_usage(DATA_DIR / "warehouse_usage.csv")
    gen_pipeline_jobs(DATA_DIR / "pipeline_jobs.csv")


if __name__ == "__main__":
    main()
