---
title: "Joins in Production"
author: "Gilles Colling"
date: "`r Sys.Date()`"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Joins in Production}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  fig.width = 6,
  fig.height = 4
)
library(joinspy)
```

At the console, we see the wrong row count, fix the key, and re-run. In a
scheduled script, the same issue goes unnoticed until someone looks at the
output. This vignette wires joinspy into automated pipelines so that join
problems surface as errors or log entries. Every example uses synthetic data
and `tempfile()` paths, so the whole thing runs end to end.

Production here means any context where nobody watches the console: cron jobs,
CI runs, scheduled reports, Shiny back ends, `targets` pipelines. The failure
mode is depressingly uniform. An upstream table gains a duplicate key or a
whitespace-padded ID, the join silently multiplies or drops rows, and the
numbers in the final report drift. Weeks can pass before anyone traces the
drift back to the join, and by then the bad rows have propagated into every
table built on top of it. The tools below convert that slow, silent failure
into a loud, immediate one: a non-zero exit status the scheduler flags, a log
entry a monitoring script picks up, or a failed unit test in CI.

We work through the pieces in increasing order of machinery -- hard
assertions, silent instrumented joins, programmatic report inspection,
cardinality guards, unit tests for join contracts, logging, multi-join chain
analysis, and finally what all of this costs at runtime.

## Assertions with key_check()

`key_check()` returns a single logical -- `TRUE` if no issues were detected,
`FALSE` otherwise. Behind that logical sit four scans: duplicate keys, NA
keys, leading or trailing whitespace, and case mismatches between the two
tables. The simplest assertion wraps it in `stopifnot()`:

```{r}
orders <- data.frame(
  customer_id = c("C01", "C02", "C03", "C04"),
  amount = c(100, 200, 150, 300),
  stringsAsFactors = FALSE
)

customers <- data.frame(
  customer_id = c("C01", "C02", "C03", "C04"),
  region = c("East", "West", "East", "North"),
  stringsAsFactors = FALSE
)

# This passes -- keys are clean
stopifnot(key_check(orders, customers, by = "customer_id", warn = FALSE))
```

An assertion this cheap can run on every execution. When it passes, it adds
nothing to the log; when it fails, `stopifnot()` raises an error, the script
exits with a non-zero status, and the scheduler marks the run as failed. That
exit status is what makes the pattern work: cron, GitHub Actions, and Airflow
all key their alerting off it, so a failed key check becomes a notification
without any extra plumbing.

When the keys have problems, the script halts:

```{r error = TRUE}
orders_dirty <- data.frame(
  customer_id = c("C01", "C02 ", "C03 ", "C04"),
  amount = c(100, 200, 150, 300),
  stringsAsFactors = FALSE
)

stopifnot(key_check(orders_dirty, customers, by = "customer_id", warn = FALSE))
```

The two padded IDs (`"C02 "` and `"C03 "`) would silently fail to match in a
left join -- the rows survive, the customer columns come back `NA`, and
downstream aggregation quietly treats them as customerless orders. Halting at
the check, before the join runs, keeps the bad rows out of everything built
afterwards.
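
The silent failure is easy to reproduce with base `merge()`. The sketch below uses stand-alone copies of the two tables (the `demo_` names are introduced here purely for illustration) and shows the row count surviving while the padded IDs lose their region:

```{r}
# Stand-alone copies of the tables above; the demo_ names are illustrative
demo_orders <- data.frame(
  customer_id = c("C01", "C02 ", "C03 ", "C04"),
  amount = c(100, 200, 150, 300),
  stringsAsFactors = FALSE
)
demo_customers <- data.frame(
  customer_id = c("C01", "C02", "C03", "C04"),
  region = c("East", "West", "East", "North"),
  stringsAsFactors = FALSE
)

# Base-R left join: every order row is kept
demo_joined <- merge(demo_orders, demo_customers,
                     by = "customer_id", all.x = TRUE)

nrow(demo_joined)                # same as nrow(demo_orders)
sum(is.na(demo_joined$region))   # padded IDs found no partner
```

Nothing in the row count hints at a problem, which is why the check has to look at the keys themselves.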

With `warn = FALSE`, the printed diagnostics are suppressed; in a cron job or
CI pipeline, the hard failure is the message. An explicit `if`/`stop` gives us
a custom error text:

```{r error = TRUE}
if (!key_check(orders_dirty, customers, by = "customer_id", warn = FALSE)) {
  stop("Key quality check failed for orders-customers join. ",
       "Run join_spy() interactively for details.", call. = FALSE)
}
```

The custom message earns its keep months later, when the error lands in a log
file at 3 a.m. and whoever reads it has no context. Naming the join and
pointing at `join_spy()` turns a bare `stopifnot` failure into a starting
point for diagnosis.

We can also chain the assertion with a repair step -- run `key_check()` first,
repair on failure, then re-check:

```{r}
ok <- key_check(orders_dirty, customers, by = "customer_id", warn = FALSE)

if (!ok) {
  repaired <- join_repair(
    orders_dirty, customers,
    by = "customer_id",
    trim_whitespace = TRUE,
    remove_invisible = TRUE
  )
  orders_clean <- repaired$x
  customers_clean <- repaired$y

  # Re-check after repair
  stopifnot(key_check(orders_clean, customers_clean,
                       by = "customer_id", warn = FALSE))
}
```

`join_repair()` only touches mechanical defects -- here, trimming whitespace
and stripping invisible Unicode characters from the key columns. It cannot
invent missing customers or decide which of two duplicate rows is correct, so
the re-check after repair matters: if the second `key_check()` still fails,
the problem is structural (true duplicates, NA keys) and the script should
stop for a human.

`key_check()` is a binary pass/fail gate; `join_spy()` builds a full report
with match rates, expected row counts, and categorized issues. The split
mirrors how the checks run: `key_check()` performs the cheap scans and skips
the heavier analyses such as near-match detection, so it stays affordable on
every execution. In production, `key_check()` runs every time. `join_spy()` is
what we reach for when an assertion fails and we need to understand why.
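
To make the cheap scans concrete, here is a rough base-R sketch of the four checks. It illustrates the idea only: `sketch_key_scan()` is a hypothetical helper, not joinspy's implementation, and the package's actual rules (for instance, how it decides that two keys differ only by case) may well differ.

```{r}
# Hypothetical helper: one logical per scan, TRUE means "problem found"
sketch_key_scan <- function(x_keys, y_keys) {
  all_keys <- c(x_keys, y_keys)
  x_only <- setdiff(x_keys, y_keys)
  y_only <- setdiff(y_keys, x_keys)
  c(
    # any key repeated within either table
    duplicates = anyDuplicated(x_keys) > 0 || anyDuplicated(y_keys) > 0,
    # any missing key
    na_keys = anyNA(all_keys),
    # any key that changes when leading/trailing whitespace is trimmed
    whitespace = any(all_keys != trimws(all_keys), na.rm = TRUE),
    # unmatched keys that would match if case were ignored
    case_mismatch = any(tolower(x_only) %in% tolower(y_only))
  )
}

scan_dirty <- sketch_key_scan(
  c("C01", "C02 ", "C03 ", "C04"),
  c("C01", "C02", "C03", "C04")
)
scan_dirty

# A pass/fail gate is then just "no scan fired"
!any(scan_dirty)
```

Each scan is a single vectorized pass over the key columns, which is the property that keeps a gate like this affordable on every run.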

## Silent Joins in Pipelines

The `*_join_spy()` wrappers print diagnostic output by default. In a scheduled
script, `.quiet = TRUE` suppresses all printed output while still computing the
report internally.

```{r}
sensors <- data.frame(
  sensor_id = c("S01", "S02", "S03", "S04"),
  location = c("Roof", "Basement", "Lobby", "Garage"),
  stringsAsFactors = FALSE
)

readings <- data.frame(
  sensor_id = c("S01", "S02", "S03", "S05"),
  temperature = c(22.1, 18.5, 21.0, 19.3),
  stringsAsFactors = FALSE
)

# Nothing printed
result <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
```

`.quiet = TRUE` overrides `verbose`, so it wins regardless of what a caller
passed for `verbose`. The report is computed either way and stored twice: once
in the package environment, where `last_report()` finds it, and once as an
attribute on the returned data frame.

```{r}
rpt <- last_report()
rpt$match_analysis$match_rate
```

The match rate is the share of unique left-table keys that found a partner on
the right -- here 3 of the 4 sensor IDs, because `S04` has no reading. `S05` is
a reading without a sensor; it counts as right-only and does not enter the
rate. The join runs silently; later, we pull the report and check its contents
programmatically:

```{r}
rpt <- last_report()
if (rpt$match_analysis$match_rate < 0.95) {
  warning(sprintf(
    "Low match rate (%.1f%%) in sensor join -- check for missing sensor IDs",
    rpt$match_analysis$match_rate * 100
  ))
}
```
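
The match-rate definition can be reproduced by hand, which is a useful sanity check when a threshold alert fires. This is a sketch of the arithmetic as described in the text (share of unique left keys with a partner on the right); the `hand_` variable names are illustrative.

```{r}
# Keys of the two sensor tables, written out so the chunk stands alone
hand_left  <- unique(c("S01", "S02", "S03", "S04"))
hand_right <- unique(c("S01", "S02", "S03", "S05"))

# Share of unique left keys that have a partner on the right
hand_rate <- mean(hand_left %in% hand_right)
hand_rate

# The keys behind the shortfall, by side
setdiff(hand_left, hand_right)   # left-only
setdiff(hand_right, hand_left)   # right-only, not part of the rate
```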

The report object is a plain list, so standard R subsetting works for
arbitrarily complex validation logic. Anything the printed report shows is
reachable by name; the next section walks the full structure.

One caveat: `last_report()` returns only the most recent report. If a script
performs three joins in sequence, only the third report survives. To retain
earlier reports, capture them explicitly:

```{r}
result1 <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
report1 <- last_report()

# ... later ...
result2 <- inner_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
report2 <- last_report()
```

The same report also travels with the result as an attribute, which sidesteps
the global state entirely:

```{r}
identical(attr(result1, "join_report"), report1)
```

`attr(result1, "join_report")` returns the report for that specific join no
matter how many joins ran since. Inside package code or functions other people
call, the attribute is the safer interface; `last_report()` is a console
convenience that happens to also work in scripts.

## Inspecting Reports Programmatically

Threshold checks like the one above need to know where the numbers live. A
`JoinReport` is an S3 object built on a named list, and its components map
one-to-one onto the sections of the printed report:

```{r}
rpt <- join_spy(orders_dirty, customers, by = "customer_id")
names(rpt)
```

`x_summary` and `y_summary` describe the key columns of each table: row count
(`n_rows`), unique keys (`n_unique`), duplicated keys (`n_duplicates`), rows
affected by duplication (`n_duplicate_rows`), and NA keys (`n_na`).
`match_analysis` holds the overlap: `n_matched`, `n_left_only`,
`n_right_only`, and `match_rate`. `expected_rows` predicts the row count for
each join type (`inner`, `left`, `right`, `full`), and `issues` is a list with
one entry per detected problem. Of the remaining components, `by` records the
key, `multicolumn_analysis` scores each column of a composite key separately,
and `memory_estimate` holds human-readable result-size estimates per join
type.

```{r}
rpt$x_summary$n_duplicates
rpt$match_analysis$match_rate
rpt$expected_rows$left
```

The padded IDs cut the match rate to 50% even though neither table has a
single duplicate. For alerting, three numbers cover most needs: the match
rate, the issue count, and the expected row count for the join type about to
run.

```{r}
length(rpt$issues)
vapply(rpt$issues, function(i) i$type, character(1))
```

Each issue is itself a list with a `type`, a `severity` (`"error"`,
`"warning"`, or `"info"`), a human-readable `message`, and usually a `details`
element holding the offending values. Filtering on severity separates problems
that should halt a pipeline from observations that belong in a log:

```{r}
severities <- vapply(rpt$issues, function(i) i$severity, character(1))
severities
sum(severities == "warning")
```

Here the whitespace issue carries `"warning"` severity while the near-match
hint is merely `"info"`. Wrapping the thresholds in a function gives the
pipeline a single reusable gate:

```{r}
report_gate <- function(rpt, min_match = 0.95) {
  stopifnot(is_join_report(rpt))
  problems <- character(0)
  if (rpt$match_analysis$match_rate < min_match) {
    problems <- c(problems, sprintf(
      "match rate %.0f%% below %.0f%%",
      100 * rpt$match_analysis$match_rate, 100 * min_match
    ))
  }
  sev <- vapply(rpt$issues, function(i) i$severity, character(1))
  if (any(sev == "warning")) {
    problems <- c(problems, sprintf("%d warning-level issue(s)",
                                    sum(sev == "warning")))
  }
  problems
}

report_gate(rpt)
```

`is_join_report()` guards the entry point. `last_report()` returns `NULL`
before any join has run, and a half-wired pipeline will happily pass that
`NULL` along; failing on the class check localizes the bug to the gate's
caller. The returned character vector doubles as the alert body -- empty means
healthy, anything else is the text of the page.
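
One way to turn that character vector into a hard failure is a small wrapper that stops when the vector is non-empty. The sketch below is an assumption about how a pipeline might wire it up; `halt_on_problems()` and its `job` argument are hypothetical names, and the `tryCatch()` is there only so the example can show the message without halting.

```{r}
# Hypothetical helper: stop with a readable message if a gate found problems
halt_on_problems <- function(problems, job = "nightly join") {
  if (length(problems) > 0) {
    stop(sprintf("%s failed: %s", job, paste(problems, collapse = "; ")),
         call. = FALSE)
  }
  invisible(TRUE)
}

# Capture the error text instead of halting, for demonstration only
gate_msg <- tryCatch(
  halt_on_problems(c("match rate 50% below 95%", "1 warning-level issue(s)")),
  error = function(e) conditionMessage(e)
)
gate_msg

# An empty vector passes silently
halt_on_problems(character(0))
```

In a scheduled script the `tryCatch()` would be dropped, so the error propagates and the process exits with a non-zero status.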

For metric collection, `summary()` flattens the same numbers into a
two-column data frame:

```{r}
summary(rpt)
```

That shape suits accumulating one row of metrics per pipeline run or writing
into a database table; `summary(rpt, format = "markdown")` produces a table
for inclusion in a rendered report.

## Cardinality Guards

A join that was one-to-one in development can become many-to-many in production
when upstream data changes. `join_strict()` enforces a cardinality constraint
and throws an error if it is violated.

In development, we use `detect_cardinality()` to understand the actual
relationship:

```{r}
products <- data.frame(
  product_id = c("P1", "P2", "P3"),
  name = c("Widget", "Gadget", "Gizmo"),
  stringsAsFactors = FALSE
)

line_items <- data.frame(
  product_id = c("P1", "P1", "P2", "P3", "P3"),
  order_id = c(101, 102, 103, 104, 105),
  stringsAsFactors = FALSE
)

detect_cardinality(products, line_items, by = "product_id")
```

One-to-many: each product appears once in `products` but can appear multiple
times in `line_items`. `detect_cardinality()` prints its finding and returns
the string invisibly, so the same call works interactively and as a value in a
gate (`card <- detect_cardinality(...)`). We encode the expectation in
production:

```{r}
result <- join_strict(
  products, line_items,
  by = "product_id",
  type = "left",
  expect = "1:n"
)
nrow(result)
```

If someone loads a `products` table with duplicate product IDs, the script
fails immediately:

```{r error = TRUE}
products_bad <- data.frame(
  product_id = c("P1", "P1", "P2", "P3"),
  name = c("Widget", "Widget v2", "Gadget", "Gizmo"),
  stringsAsFactors = FALSE
)

join_strict(
  products_bad, line_items,
  by = "product_id",
  type = "left",
  expect = "1:n"
)
```

This is the classic slow failure made fast. A reference table that was clean
for two years gains its first duplicate -- a vendor file gets loaded twice, or
a re-run ETL step appends a second copy -- and every downstream left join
starts multiplying rows. Without the guard, the symptom is inflated totals in
a report some weeks later; with it, the run fails the same night, and the
error message carries the cause: the expected cardinality, the one actually
found, and the duplicate counts on each side.

The four cardinality levels:

- **1:1** -- lookup to lookup. Each key appears exactly once on both sides.
- **1:n** -- reference on the left, transactions on the right (products to
  line items, stations to hourly readings).
- **n:1** -- transactions on the left, lookup on the right (sales joined to a
  region table).
- **n:m** -- duplicates on both sides. Almost always a bug; requiring an
  explicit `expect = "n:m"` acts as a speed bump.

In practice, `"1:n"` and `"n:1"` cover most production joins.
`detect_cardinality()` confirms the relationship during development; the
`expect` value is then hard-coded in the production script.
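
The four levels reduce to one question per side: does any key repeat? The following base-R sketch classifies a pair of key vectors on that basis. It is a simplified illustration, not the package's algorithm; `sketch_cardinality()` is a hypothetical name, and details such as the treatment of NA keys or unmatched keys are assumptions.

```{r}
# Hypothetical classifier: duplicates on each side decide the level
sketch_cardinality <- function(x_keys, y_keys) {
  x_dup <- anyDuplicated(x_keys) > 0
  y_dup <- anyDuplicated(y_keys) > 0
  if (!x_dup && !y_dup) {
    "1:1"
  } else if (!x_dup) {
    "1:n"
  } else if (!y_dup) {
    "n:1"
  } else {
    "n:m"
  }
}

item_keys <- c("P1", "P1", "P2", "P3", "P3")

# Clean reference table against line items
sketch_cardinality(c("P1", "P2", "P3"), item_keys)

# Reference table with a duplicated product ID
sketch_cardinality(c("P1", "P1", "P2", "P3"), item_keys)
```

A single duplicated key on the reference side is enough to move the relationship from `"1:n"` to `"n:m"`, which is exactly the transition a hard-coded `expect` value is meant to catch.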

`check_cartesian()` solves a different problem: it warns about Cartesian
product explosion when a key has many duplicates on *both* sides. A join can
violate a `"1:1"` constraint without triggering a Cartesian explosion (one
extra duplicate is enough), and an `"n:m"` join can produce a massive product
that `join_strict()` would allow. The two functions complement each other.

```{r}
events_a <- data.frame(id = rep(c("E1", "E2"), each = 20), src = "a",
                       stringsAsFactors = FALSE)
events_b <- data.frame(id = rep(c("E1", "E2"), each = 20), src = "b",
                       stringsAsFactors = FALSE)

chk <- check_cartesian(events_a, events_b, by = "id")
chk$expansion_factor
```

The return value is a list with `has_explosion`, the `expansion_factor`, the
predicted `total_inner` row count, and a `worst_keys` data frame naming the
keys responsible. As a pipeline gate, `if (chk$has_explosion) stop(...)`
catches the blow-up before the join allocates the memory. The 20x factor here
turns a 40-row input into an 800-row result; the same arithmetic on a
million-row table is what fills a server's RAM at 2 a.m.
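
The arithmetic behind those numbers is a per-key product of row counts. The sketch below recomputes it in base R; dividing by the larger input's row count is one plausible way to define an expansion factor and is an assumption here, not necessarily the formula `check_cartesian()` uses.

```{r}
# Key columns of the two event tables, written out so the chunk stands alone
cart_a <- rep(c("E1", "E2"), each = 20)
cart_b <- rep(c("E1", "E2"), each = 20)

# For each shared key: rows on the left times rows on the right
shared_keys <- intersect(cart_a, cart_b)
rows_per_key <- vapply(
  shared_keys,
  function(k) as.numeric(sum(cart_a == k)) * sum(cart_b == k),
  numeric(1)
)
rows_per_key

# Predicted inner-join size and its ratio to the larger input
sketch_total_inner <- sum(rows_per_key)
sketch_total_inner
sketch_total_inner / max(length(cart_a), length(cart_b))
```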

## Testing Join Contracts with testthat

The gates so far run when the pipeline runs. A second line of defense runs
when the code changes: unit tests that pin down the join contracts -- which
columns join which tables, at what cardinality, with what key quality. These
contracts rarely appear in documentation; they live in the heads of whoever
wrote the pipeline. Encoding them as testthat expectations lets them survive
refactors and staff turnover.

Since `key_check()` returns a logical and `detect_cardinality()` returns a
string, both drop straight into expectations:

```{r, eval = requireNamespace("testthat", quietly = TRUE)}
library(testthat)

test_that("orders join customers cleanly on customer_id", {
  expect_true(key_check(orders, customers, by = "customer_id", warn = FALSE))
})
```

```{r, eval = requireNamespace("testthat", quietly = TRUE)}
test_that("products to line_items is one-to-many", {
  expect_identical(
    detect_cardinality(products, line_items, by = "product_id"),
    "1:n"
  )
})
```

The cardinality test is the one that pays off. Cardinality violations come
from data, and data changes without commits; running this test against a fresh
extract in CI, or on a schedule, catches the first duplicate product before
the nightly join multiplies line items. A third useful contract is row
preservation, written against the prediction in the report:

```{r, eval = requireNamespace("testthat", quietly = TRUE)}
test_that("left join is predicted to preserve order rows", {
  rpt <- join_spy(orders, customers, by = "customer_id")
  expect_equal(rpt$expected_rows$left, nrow(orders))
})
```

`expected_rows$left` equals `nrow(orders)` exactly when no left key matches a
duplicated right key, so this single expectation encodes "the enrichment join
does not multiply rows" without running the join. In a package or pipeline
repository these tests live in `tests/testthat/test-join-contracts.R` and run
with everything else under `testthat::test_dir()` or `R CMD check`. What data
they point at is the design decision: against committed fixture files they
test the code's assumptions; against a small fresh extract they test the data
itself, which makes them a scheduled data-quality job wearing a unit-test
interface.
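
The row-preservation claim can be checked by hand. In a left join, each left row produces one output row per matching right row, or a single row if nothing matches. The sketch below implements that rule directly; `sketch_left_rows()` is a hypothetical helper that assumes no NA keys, and its row-by-row loop is written for clarity, not speed.

```{r}
# Hypothetical helper: predicted left-join row count from the key columns
sketch_left_rows <- function(x_keys, y_keys) {
  # Number of right-table rows sharing each left row's key (0 if none)
  n_partners <- vapply(x_keys, function(k) sum(y_keys == k), integer(1))
  # An unmatched left row still yields one output row
  sum(pmax(n_partners, 1L))
}

contract_orders <- c("C1", "C2", "C2", "C3", "C9")

# Unique lookup keys: the prediction equals the number of left rows
sketch_left_rows(contract_orders, c("C1", "C2", "C3"))

# "C2" duplicated in the lookup: both C2 orders are doubled
sketch_left_rows(contract_orders, c("C1", "C2", "C2", "C3"))
```

The unmatched `"C9"` order does not disturb the count in either case; only a duplicated right-side key that some left row actually uses inflates it.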

## Logging and Audit Trails

### Manual logging

`log_report()` writes a single report to a file. The format depends on the
file extension: `.txt` and `.log` produce human-readable text, `.json`
produces machine-readable JSON, and `.rds` saves the complete R object.

```{r}
report <- join_spy(sensors, readings, by = "sensor_id")

# Text format -- human-readable
txt_log <- tempfile(fileext = ".log")
log_report(report, txt_log)
cat(readLines(txt_log), sep = "\n")
unlink(txt_log)
```

The text entry mirrors the printed report -- key column, per-table summaries,
match analysis, expected rows per join type, and an issue tally -- with a
separator line so consecutive entries stay readable in one file. By default
`log_report()` overwrites; `append = TRUE` adds to the end (`.txt` and `.log`
files only), and the timestamp written with each entry means a grep for a
date range works on the raw file.

```{r}
# JSON format -- machine-readable
json_log <- tempfile(fileext = ".json")
log_report(report, json_log)
cat(readLines(json_log), sep = "\n")
unlink(json_log)
```

Text format works for tailing logs during a batch run; JSON format feeds into
monitoring systems or downstream scripts. Reports can also be saved as `.rds`
files, which preserves the full R object for later interactive inspection.
The `.rds` route matters when the issues themselves need preserving: text and
JSON record issue counts and types, while the RDS file keeps each issue's
`details` element -- the actual offending key values -- which is what we want
in front of us when reconstructing what went wrong last Tuesday.

### Automatic logging

For scripts with many joins, `set_log_file()` at the top is cleaner than
calling `log_report()` after each one. Every subsequent `*_join_spy()` call
appends its report to the file.

```{r}
auto_log <- tempfile(fileext = ".log")
set_log_file(auto_log, format = "text")

# These joins are automatically logged
result1 <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
result2 <- inner_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)

# Check what got logged
cat(readLines(auto_log), sep = "\n")

# Clean up
set_log_file(NULL)
unlink(auto_log)
```

Each wrapper call appends one entry, so the file accumulates a join-by-join
history of the run even when every join is `.quiet`. `set_log_file()` returns
the previous setting invisibly, which lets a function enable logging for its
own joins and then put back whatever the caller had configured:

```{r}
fn_log <- tempfile(fileext = ".log")
previous <- set_log_file(fn_log)

result <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)

set_log_file(previous)
unlink(fn_log)
```

Automatic logging only triggers from `*_join_spy()` wrappers. `join_strict()`
and bare `merge()` calls are not logged -- the wrappers are the instrumented
path. To combine cardinality enforcement with logging, run
`detect_cardinality()` as a separate check and use a `*_join_spy()` wrapper for
the actual join.

`get_log_file()` returns the current log path (or `NULL` if logging is
disabled), which is useful inside helpers that should log only when the
surrounding pipeline asked for it:

```{r}
# Only log if logging is configured
if (!is.null(get_log_file())) {
  message("Logging is active at: ", get_log_file())
}
```

## Reading JSON Logs Downstream

The JSON format exists for consumers that are not R: a dashboard, a cron
wrapper in Python, a shell script that greps last night's log. joinspy
serializes reports with an internal base-R writer, so the package carries no
JSON dependency, and the output is plain JSON that any parser reads. Each
entry records the join key, both table summaries, the match analysis, the
expected row counts, the issue count and types, and a timestamp.

Each report is appended as its own object, so the file is a sequence of JSON
objects; line-oriented tools work on it directly, and a full parser reads it
one object at a time. Reading it back in R needs nothing beyond base
functions, so we log two joins and then play the part of the monitoring
script:

```{r}
json_log <- tempfile(fileext = ".json")
set_log_file(json_log, format = "json")

r1 <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
r2 <- inner_join_spy(orders, customers, by = "customer_id", .quiet = TRUE)

set_log_file(NULL)
```

The monitoring side reads the file, finds the lines carrying the field it
cares about, and extracts the values:

```{r}
log_lines <- readLines(json_log)
rate_lines <- grep('"match_rate"', log_lines, value = TRUE)
rate_lines
```

```{r}
rates <- as.numeric(sub('.*"match_rate": *([0-9.]+).*', "\\1", rate_lines))
rates
which(rates < 0.95)
unlink(json_log)
```

The sensor join logged its 75% match rate, the order join a clean 100%, and
the threshold flags entry one. The same extract pattern works on `"n_issues"`
for issue-count alerting. It also works on `"logged_at"` for staleness checks
-- a pipeline whose newest log entry is two days old has a different problem
than a pipeline logging failures, and the timestamp catches it. For richer
processing, any JSON library parses the entries one object at a time; the
field names match the report components walked through earlier, so a script
written against the JSON and a script written against `last_report()` read the
same vocabulary.
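
A staleness check along those lines might look like the sketch below. The log lines are hand-written stand-ins, and the timestamp layout (`YYYY-MM-DD HH:MM:SS`) is an assumption made for the example; the pattern and the `format` string would need adjusting to whatever the real log contains. The "current time" is fixed so the result is reproducible.

```{r}
# Hand-written stand-in for two logged entries; timestamp layout is assumed
stale_lines <- c(
  '{',
  '  "match_rate": 0.75,',
  '  "logged_at": "2024-05-01 02:00:03"',
  '}',
  '{',
  '  "match_rate": 1,',
  '  "logged_at": "2024-05-02 02:00:05"',
  '}'
)

# Pull out the quoted timestamp on each "logged_at" line
stamp_lines <- grep('"logged_at"', stale_lines, value = TRUE)
stamps <- as.POSIXct(
  sub('.*"logged_at": *"([^"]+)".*', "\\1", stamp_lines),
  format = "%Y-%m-%d %H:%M:%S", tz = "UTC"
)

# Age of the newest entry relative to a fixed reference time
stale_now <- as.POSIXct("2024-05-04 06:00:00", tz = "UTC")
age_hours <- as.numeric(difftime(stale_now, max(stamps), units = "hours"))
age_hours

# Alert if the nightly job has not logged for more than 36 hours
is_stale <- age_hours > 36
is_stale
```

In a live monitor `stale_now` would be `Sys.time()`, and the 36-hour window is an arbitrary choice that tolerates one late nightly run.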

## Diagnosing a Multi-Join Pipeline

Real pipelines chain joins: orders pick up customer attributes, customers pick
up regions, regions pick up targets. When the final table has the wrong row
count, the offending join could be any of them, and checking each one by hand
means re-running the pipeline step by step. `analyze_join_chain()` does that
walk in one call -- it takes a named list of tables and a list of join
specifications, runs `join_spy()` at every step, carries the intermediate
result forward with a left join, and prints a per-step summary.

```{r}
orders_chain <- data.frame(
  order_id = 1:6,
  customer_id = c("C1", "C2", "C2", "C3", "C4", "C4"),
  stringsAsFactors = FALSE
)

customers_chain <- data.frame(
  customer_id = c("C1", "C2", "C3", "C4"),
  region_id = c("R1", "R1", "R2", "R3"),
  stringsAsFactors = FALSE
)

regions <- data.frame(
  region_id = c("R1", "R2"),
  region_name = c("North", "South"),
  stringsAsFactors = FALSE
)
```

```{r}
chain <- analyze_join_chain(
  tables = list(orders = orders_chain, customers = customers_chain,
                regions = regions),
  joins = list(
    list(left = "orders", right = "customers", by = "customer_id"),
    list(left = "result", right = "regions", by = "region_id")
  )
)
```

The literal name `"result"` refers to the accumulated output of the previous
steps, so chains of any length need only that one keyword. Step 1 reports the
order-side duplicates, which are expected for transaction data joining a
lookup. The real finding is at step 2, where the match rate drops: region
`"R3"` exists in the customer table and has no row in `regions`. That is the
kind of fact that hides for months in a three-join pipeline, because each
individual join "works".
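
The reason the gap hides is visible when the same chain is run with plain `merge()` calls: the row count never moves. The sketch below uses stand-alone copies of the three tables (the `ch_` names are illustrative).

```{r}
# Stand-alone copies of the chain tables
ch_orders <- data.frame(
  order_id = 1:6,
  customer_id = c("C1", "C2", "C2", "C3", "C4", "C4"),
  stringsAsFactors = FALSE
)
ch_customers <- data.frame(
  customer_id = c("C1", "C2", "C3", "C4"),
  region_id = c("R1", "R1", "R2", "R3"),
  stringsAsFactors = FALSE
)
ch_regions <- data.frame(
  region_id = c("R1", "R2"),
  region_name = c("North", "South"),
  stringsAsFactors = FALSE
)

# Two chained left joins in base R
ch_step1 <- merge(ch_orders, ch_customers, by = "customer_id", all.x = TRUE)
ch_step2 <- merge(ch_step1, ch_regions, by = "region_id", all.x = TRUE)

# Row counts are identical at every stage ...
c(orders = nrow(ch_orders), step1 = nrow(ch_step1), step2 = nrow(ch_step2))

# ... yet two orders ended up without a region name
sum(is.na(ch_step2$region_name))
```

A row-count check alone would pass this pipeline; only a per-step look at the keys shows where the region went missing.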

The return value is a list with one entry per step, each holding the table
names, the `by` columns, and the full `JoinReport`, so everything from the
programmatic-inspection section applies per step:

```{r}
chain[[2]]$report$match_analysis$match_rate
chain[[2]]$report$match_analysis$left_only_keys
```

A chain-level gate reduces to one vector:

```{r}
vapply(chain, function(s) length(s$report$issues), integer(1))
```

In production we run the chain analysis at deployment time and whenever an
upstream schema changes. Since it performs each intermediate left join to feed
the next step, its cost is comparable to running the pipeline's joins once,
which makes it a pre-flight and post-incident tool; the per-run gates stay
with `key_check()` and `join_strict()`.

## Sampling for Large Datasets

The `sample` parameter in `join_spy()` runs the analysis on a random subset
while the actual join (via a `*_join_spy()` wrapper) still operates on the full
data.

```{r}
# Simulate a large dataset
set.seed(42)
big_orders <- data.frame(
  customer_id = sample(paste0("C", sprintf("%04d", 1:5000)), 50000,
                       replace = TRUE),
  amount = round(runif(50000, 10, 500), 2),
  stringsAsFactors = FALSE
)

big_customers <- data.frame(
  customer_id = paste0("C", sprintf("%04d", 1:6000)),
  region = sample(c("North", "South", "East", "West"), 6000, replace = TRUE),
  stringsAsFactors = FALSE
)

# Full analysis
system.time(report_full <- join_spy(big_orders, big_customers, by = "customer_id"))

# Sampled analysis
system.time(report_sampled <- join_spy(big_orders, big_customers,
                                        by = "customer_id", sample = 5000))
```

The sampled report records its own provenance in `report$sampling` -- the
sample size and the original row counts -- and the printed report flags it, so
a log reader can tell estimated diagnostics from exact ones:

```{r}
report_sampled$sampling
```

The sampled report is approximate -- match rates and duplicate counts are
estimated from the subset. For production monitoring, we typically care whether
the match rate is roughly 95% or roughly 60%, not whether it is 94.7% or
95.1%. Sampling catches systemic problems (wrong key column, widespread
encoding issues, duplicate explosion) with a fraction of the runtime. A
`set.seed()` ahead of the call makes the sampled diagnostic reproducible,
which keeps two runs on identical data from flapping between alert and no
alert.

Sampling can miss rare issues. If 0.1% of keys have a zero-width space, a
5,000-row sample from a 10-million-row table contains only about five of them
on average; at 0.01%, it misses the defect entirely about 60% of the time.
Running full diagnostics periodically (weekly, or when the upstream source
changes) alongside sampled daily runs covers both speed and thoroughness.
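
How rare is too rare for a given sample size follows from a short calculation. The sketch below assumes rows are sampled independently and uniformly, which is a close approximation when the sample is a small fraction of the table; `sketch_miss_prob()` and `sketch_sample_needed()` are hypothetical helpers, not part of joinspy.

```{r}
# Probability that a sample of n rows contains no defective row at all
sketch_miss_prob <- function(defect_rate, sample_size) {
  (1 - defect_rate)^sample_size
}

# Defect rates of 0.1% and 0.01% against a 5,000-row sample
round(sketch_miss_prob(c(0.001, 0.0001), 5000), 3)

# Sample size needed to see at least one defective row with given confidence
sketch_sample_needed <- function(defect_rate, confidence = 0.95) {
  ceiling(log(1 - confidence) / log(1 - defect_rate))
}
sketch_sample_needed(0.0001)
```

The second function is the useful one for planning: it converts "the rarest defect we care about" into a minimum value for `sample`.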

## A Complete Production Pattern

Here is a realistic production workflow: a nightly job loads order and customer
data, validates keys, repairs if needed, joins with cardinality enforcement, and
logs everything.

```{r}
# ============================================================
# Nightly order enrichment pipeline
# ============================================================

# --- Setup logging ---
pipeline_log <- tempfile(fileext = ".log")
set_log_file(pipeline_log, format = "text")

# --- Load data (simulated) ---
orders <- data.frame(
  order_id = 1:6,
  customer_id = c("C001", "C002 ", "C003", "C003", "C004", "C005"),
  amount = c(150, 230, 89, 410, 320, 175),
  stringsAsFactors = FALSE
)

customers <- data.frame(
  customer_id = c("C001", "C002", "C003", "C004", "C005", "C006"),
  name = c("Acme Corp", "Globex", "Initech", "Umbrella", "Soylent", "Wonka"),
  tier = c("gold", "silver", "gold", "bronze", "silver", "gold"),
  stringsAsFactors = FALSE
)

# --- Gate 1: key quality assertion ---
keys_ok <- key_check(orders, customers, by = "customer_id", warn = FALSE)

if (!keys_ok) {
  message("Key issues detected -- attempting repair")
  repaired <- join_repair(
    orders, customers,
    by = "customer_id",
    trim_whitespace = TRUE,
    remove_invisible = TRUE
  )
  orders <- repaired$x
  customers <- repaired$y
}

# --- Gate 2: cardinality check ---
card <- detect_cardinality(orders, customers, by = "customer_id")
if (card == "n:m") {
  set_log_file(NULL)
  unlink(pipeline_log)
  stop("Unexpected n:m cardinality in orders-customers join", call. = FALSE)
}

# --- Join (with auto-logging via *_join_spy) ---
enriched <- left_join_spy(orders, customers, by = "customer_id", .quiet = TRUE)

# --- Gate 3: row count sanity check ---
# A left join should never lose rows from the left table
if (nrow(enriched) < nrow(orders)) {
  set_log_file(NULL)
  unlink(pipeline_log)
  stop("Row count decreased after left join -- possible data corruption",
       call. = FALSE)
}

# --- Output ---
message(sprintf("Pipeline complete: %d enriched orders", nrow(enriched)))
head(enriched)

# --- Review the log ---
if (file.exists(pipeline_log)) {
  cat(readLines(pipeline_log), sep = "\n")
}

# --- Cleanup ---
set_log_file(NULL)
unlink(pipeline_log)
```

The three gates catch different failure modes. Gate 1 catches string-level
problems and repairs the mechanical ones; in this run it finds the padded
`"C002 "` and trims it, so that customer keeps its name and tier in the
enriched output. Gate 2 halts on the one structural problem repair cannot
touch: an unexpected many-to-many relationship, the kind that multiplies rows.
Gate 3 is the catch-all -- a left join returning fewer rows than its left
table means something went wrong at a level the key diagnostics do not see,
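such as an upstream filter applied to the wrong object. Logging runs
throughout because `set_log_file()` was called at the top. The `unlink()` calls
in the failure branches are there only so the vignette cleans up its temporary
file; with a persistent log path and those calls removed, even a run that dies
at gate 3 leaves a record of the join that preceded the death.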

Equally deliberate is what the pattern does not check. It never verifies that
`customer_id` is the right column to join on; a clean key on the wrong column
passes every gate. It enforces no match-rate floor, so orders for customers
missing from the reference table sail through with `NA` names -- adding a
fourth gate on `last_report()$match_analysis$match_rate` covers that case
when unmatched keys count as failures for the job at hand. And it validates
keys only: a corrupted `amount` column is invisible to every check here, which
is why these gates complement value-level validation tools rather than replace
them.

The pattern stops scaling somewhere around a dozen joins. At that point the
gate blocks become copy-paste boilerplate, the thresholds belong in a config
file, and the per-join logic wants to be a function taking two tables, a key,
and an expected cardinality. Past that, a pipeline framework such as `targets`
is the better skeleton, with these same checks living inside each node. The
log file also grows without bound under a daily schedule; naming it by date
(`sprintf("enrich-%s.log", Sys.Date())`) keeps each day's audit trail separate
and bounded.
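
Dated file names also make retention simple. The sketch below builds the dated path and prunes files older than a retention window, using only base R. The directory, the seven-day window, and the helper names `dated_log_path()` and `prune_logs()` are all assumptions for illustration; the dates are fixed so the example is reproducible.

```{r}
# Hypothetical log directory; a real job would use a persistent path
demo_log_dir <- file.path(tempdir(), "joinspy-logs-demo")
dir.create(demo_log_dir, showWarnings = FALSE)

# One log file per day, named by date
dated_log_path <- function(day, dir = demo_log_dir) {
  file.path(dir, sprintf("enrich-%s.log", format(day, "%Y-%m-%d")))
}

# Simulate ten consecutive days of logs ending on a fixed date
demo_today <- as.Date("2024-05-10")
for (d in 0:9) writeLines("entry", dated_log_path(demo_today - d))

# Delete logs dated before the retention window; return how many were removed
prune_logs <- function(dir, today, keep_days = 7) {
  files <- list.files(
    dir,
    pattern = "^enrich-[0-9]{4}-[0-9]{2}-[0-9]{2}[.]log$",
    full.names = TRUE
  )
  days <- as.Date(sub("^enrich-(.*)[.]log$", "\\1", basename(files)))
  old <- files[days < today - keep_days]
  unlink(old)
  length(old)
}

n_pruned <- prune_logs(demo_log_dir, demo_today)
n_pruned
n_remaining <- length(list.files(demo_log_dir))
n_remaining

# Clean up the demo directory
unlink(demo_log_dir, recursive = TRUE)
```

In the pipeline itself, the dated path would simply be the argument to `set_log_file()` at the top of the script.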

## The Cost of Diagnostics

Every gate adds runtime, and on a 50,000-row join it is worth seeing how much.
We time the bare join, the instrumented wrapper, and the cheap gate on the
tables from the sampling section:

```{r}
t_merge <- system.time(
  merge(big_orders, big_customers, by = "customer_id", all.x = TRUE)
)
t_spy <- system.time(
  left_join_spy(big_orders, big_customers, by = "customer_id", .quiet = TRUE)
)
t_check <- system.time(
  key_check(big_orders, big_customers, by = "customer_id", warn = FALSE)
)

rbind(merge = t_merge, left_join_spy = t_spy, key_check = t_check)[, 1:3]
```

The gap between `merge()` and `left_join_spy()` is the full diagnostic: key
summaries, match analysis, row-count prediction, and the string scans. The
scans are vectorized, and the one quadratic piece -- near-match detection --
is capped at 50 unmatched keys against 100 candidates regardless of table
size, so the diagnostic cost grows roughly linearly with the number of unique
keys. `key_check()` runs the cheap subset only, which is why it can sit in
front of every join in a script.

When the elapsed column stops looking ignorable -- tables in the tens of
millions of rows, or a join inside a tight loop -- the fallback order is:
keep `key_check()` on every run, move the full `join_spy()` diagnostic to a
`sample =` run, and reserve the unsampled analysis for a weekly job or for the
morning after an alert. The numbers above are the budget conversation in
miniature: the gate costs a fraction of the join it protects, and the sampling
section shows how to hold that fraction steady as the tables grow.
