Data Quality Checks Every Pipeline Should Have [2026 Guide]
Six categories of data quality checks with SQL and PySpark examples, plus a reusable Python framework to run them automatically after every pipeline load.
Your pipeline ran successfully. Zero errors. Green checkmarks everywhere. But the data it produced is wrong — duplicate records, null values where there shouldn't be any, timestamps from the future, and revenue numbers that are off by a factor of ten.
A pipeline that completes without errors is not the same as a pipeline that produces correct data. I've learned this the hard way more than once. The fix isn't complicated — it's building data quality checks directly into your pipeline so bad data gets caught before it reaches your dashboards and reports.
Here are the checks I run on every production pipeline, with SQL and PySpark examples you can copy straight into your Databricks notebooks.
1. Row count checks
The simplest and most effective check. Compare the number of rows you loaded against what you expected.
Check: row count is not zero
A pipeline that runs but produces zero rows is almost always broken. Catch it immediately:
SELECT
    CASE
        WHEN COUNT(*) = 0 THEN 'FAIL: Table is empty after load'
        ELSE 'PASS: ' || COUNT(*) || ' rows loaded'
    END AS row_count_check
FROM target_table
WHERE load_date = current_date()
Check: row count is within expected range
If your table normally gets 10,000-15,000 rows per day, a load of 500 rows or 500,000 rows both signal a problem:
from datetime import date

today = str(date.today())
expected_min = 8000
expected_max = 20000

actual_count = (
    spark.table("target_table")
    .filter(f"load_date = '{today}'")
    .count()
)

if actual_count < expected_min or actual_count > expected_max:
    raise ValueError(
        f"Row count {actual_count} outside expected range "
        f"[{expected_min}, {expected_max}]"
    )
Check: row count didn't drop compared to yesterday
A sudden drop in row count usually means your source stopped sending data or your extraction filter is wrong:
WITH daily_counts AS (
    SELECT
        load_date,
        COUNT(*) AS row_count,
        LAG(COUNT(*)) OVER (ORDER BY load_date) AS prev_count
    FROM target_table
    GROUP BY load_date
)
SELECT
    load_date,
    row_count,
    prev_count,
    CASE
        WHEN prev_count IS NOT NULL
             AND row_count < prev_count * 0.5
        THEN 'FAIL: Row count dropped by more than 50%'
        ELSE 'PASS'
    END AS drop_check
FROM daily_counts
WHERE load_date = current_date()
📄 Get the complete quality framework
The Data Quality Monitoring Playbook has the full PySpark validation function, all 6 check categories, alerting patterns, threshold recommendations, and a tool comparison. $4.99
2. Null checks
Null values in columns that should never be null are the second most common data quality issue I see.
Check: critical columns are not null
Define which columns are business-critical and check them after every load:
SELECT
    SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_ids,
    SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_ids,
    SUM(CASE WHEN order_date IS NULL THEN 1 ELSE 0 END) AS null_order_dates,
    SUM(CASE WHEN amount IS NULL THEN 1 ELSE 0 END) AS null_amounts
FROM orders_table
WHERE load_date = current_date()
If any of these return a number greater than zero, investigate before your stakeholders find out.
PySpark version with automatic failure
from datetime import date

today = str(date.today())
critical_columns = ["order_id", "customer_id", "order_date", "amount"]

df = spark.table("orders_table").filter(f"load_date = '{today}'")

for col_name in critical_columns:
    null_count = df.filter(f"{col_name} IS NULL").count()
    if null_count > 0:
        raise ValueError(
            f"FAIL: {null_count} null values found in {col_name}"
        )
    print(f"PASS: {col_name} has no nulls")
3. Duplicate checks
If you've read my article on fixing duplicate records in Delta tables, you know how much damage duplicates can cause. Don't wait to find them manually — check on every load.
Check: no duplicates on business key
SELECT
    CASE
        WHEN COUNT(*) > COUNT(DISTINCT order_id)
        THEN 'FAIL: ' || (COUNT(*) - COUNT(DISTINCT order_id)) || ' duplicate order_ids'
        ELSE 'PASS: No duplicates on order_id'
    END AS duplicate_check
FROM orders_table
WHERE load_date = current_date()
Check: no duplicates on composite key
For tables with composite business keys, check the full combination:
SELECT COUNT(*) AS duplicate_count
FROM (
    SELECT customer_id, order_date, product_id, COUNT(*) AS cnt
    FROM orders_table
    WHERE load_date = current_date()
    GROUP BY customer_id, order_date, product_id
    HAVING COUNT(*) > 1
) AS dupes
If duplicate_count is greater than zero, your MERGE key might be incomplete — exactly the issue I covered in my duplicates article.
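When the incoming batch itself carries duplicates on the composite key, a common fix is to keep only the most recent record per key before merging. Here's a minimal pure-Python sketch of that logic (in PySpark you'd typically do the equivalent with `row_number()` over a window partitioned by the key); the `updated_at` timestamp column is illustrative:

```python
def keep_latest_per_key(rows, key_cols, ts_col):
    """Keep the most recent row per composite key -- a pure-Python
    sketch of deduplicating a batch before a MERGE."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        # Replace the stored row only if this one is newer
        if key not in latest or row[ts_col] > latest[key][ts_col]:
            latest[key] = row
    return list(latest.values())

batch = [
    {"customer_id": 1, "order_date": "2026-03-15", "product_id": "A", "updated_at": "10:00"},
    {"customer_id": 1, "order_date": "2026-03-15", "product_id": "A", "updated_at": "11:30"},
    {"customer_id": 2, "order_date": "2026-03-15", "product_id": "B", "updated_at": "09:15"},
]

deduped = keep_latest_per_key(
    batch, ["customer_id", "order_date", "product_id"], "updated_at"
)
# Two rows remain; the 11:30 version wins for customer 1's duplicate key
```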
4. Range and validity checks
Data can be non-null and non-duplicate but still wrong. A negative revenue number, a birthdate in the year 2099, or a discount percentage of 500% are all technically valid data types but obviously incorrect.
Check: numeric values are within valid ranges
SELECT
    SUM(CASE WHEN amount < 0 THEN 1 ELSE 0 END) AS negative_amounts,
    SUM(CASE WHEN amount > 1000000 THEN 1 ELSE 0 END) AS suspiciously_large,
    SUM(CASE WHEN discount_pct < 0 OR discount_pct > 100 THEN 1 ELSE 0 END) AS invalid_discounts
FROM orders_table
WHERE load_date = current_date()
Check: dates are within expected range
Timestamps from the future or the distant past are almost always data errors:
SELECT
    SUM(CASE WHEN order_date > current_date() THEN 1 ELSE 0 END) AS future_dates,
    SUM(CASE WHEN order_date < '2020-01-01' THEN 1 ELSE 0 END) AS ancient_dates
FROM orders_table
WHERE load_date = current_date()
Check: categorical values are valid
If a column should only contain specific values, validate them:
SELECT DISTINCT status
FROM orders_table
WHERE load_date = current_date()
AND status NOT IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')
Any rows returned are invalid status values that need investigation.
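If you'd rather have the notebook fail automatically, the same check works in Python. This is a hedged sketch: the allowed set mirrors the SQL above, and in PySpark you'd collect the distinct statuses for today's load (e.g. via `df.select("status").distinct().collect()`) instead of the sample list used here:

```python
VALID_STATUSES = {"pending", "confirmed", "shipped", "delivered", "cancelled"}

def invalid_values(values, allowed):
    """Return the distinct values that are not in the allowed set."""
    return sorted(set(values) - set(allowed))

# Sample data standing in for the collected distinct statuses
statuses = ["pending", "shipped", "delivered"]

bad = invalid_values(statuses, VALID_STATUSES)
if bad:
    raise ValueError(f"FAIL: invalid status values: {bad}")
print("PASS: all status values valid")
```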
5. Freshness checks
A table that hasn't been updated in 3 days is stale, and any dashboard built on it is showing outdated numbers.
Check: table was updated recently
SELECT
    MAX(load_date) AS last_load,
    DATEDIFF(current_date(), MAX(load_date)) AS days_since_update,
    CASE
        WHEN DATEDIFF(current_date(), MAX(load_date)) > 1
        THEN 'FAIL: Table is stale — last updated ' || MAX(load_date)
        ELSE 'PASS: Table is fresh'
    END AS freshness_check
FROM orders_table
Check: source data is not lagging
Sometimes your pipeline runs fine but the source data itself is behind:
SELECT
    MAX(event_timestamp) AS latest_event,
    TIMESTAMPDIFF(HOUR, MAX(event_timestamp), current_timestamp()) AS hours_behind,
    CASE
        WHEN TIMESTAMPDIFF(HOUR, MAX(event_timestamp), current_timestamp()) > 6
        THEN 'FAIL: Source data is more than 6 hours behind'
        ELSE 'PASS: Source data is current'
    END AS lag_check
FROM events_table
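The same lag computation is easy to do in Python when you want the notebook to fail automatically. A minimal sketch, assuming you've already collected `MAX(event_timestamp)` from the table (the timestamps below are sample values):

```python
from datetime import datetime

def hours_behind(latest_event: datetime, now: datetime) -> float:
    """Hours between the newest source event and now."""
    return (now - latest_event).total_seconds() / 3600

# latest_event would come from something like:
# spark.table("events_table").agg({"event_timestamp": "max"}).collect()[0][0]
now = datetime(2026, 3, 15, 18, 0)
latest_event = datetime(2026, 3, 15, 14, 0)

lag = hours_behind(latest_event, now)
if lag > 6:
    raise ValueError(f"FAIL: source data is {lag:.1f} hours behind")
print(f"PASS: source data is {lag:.1f} hours behind")
```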
6. Schema checks
Schema drift — when columns are added, removed, or change types unexpectedly — can silently break downstream queries.
Check: expected columns exist
expected_columns = [
    "order_id", "customer_id", "order_date",
    "amount", "status", "load_date",
]

actual_columns = [col.name for col in spark.table("orders_table").schema]

missing = set(expected_columns) - set(actual_columns)
if missing:
    raise ValueError(f"FAIL: Missing columns: {missing}")

unexpected = set(actual_columns) - set(expected_columns)
if unexpected:
    print(f"WARNING: Unexpected new columns: {unexpected}")
Check: column types haven't changed
expected_types = {
    "order_id": "string",
    "amount": "double",
    "order_date": "date",
}

for col in spark.table("orders_table").schema:
    if col.name in expected_types:
        actual_type = col.dataType.simpleString()
        expected_type = expected_types[col.name]
        if actual_type != expected_type:
            raise ValueError(
                f"FAIL: {col.name} type changed "
                f"from {expected_type} to {actual_type}"
            )
Putting it all together: a reusable quality check framework
Instead of copying these checks into every notebook, build a simple reusable framework:
from datetime import date


class DataQualityChecker:
    def __init__(self, table_name, load_date=None):
        self.table = table_name
        self.date = load_date or str(date.today())
        self.results = []

    def check_row_count(self, min_rows=1, max_rows=None):
        count = (
            spark.table(self.table)
            .filter(f"load_date = '{self.date}'")
            .count()
        )
        if count < min_rows:
            self.results.append(f"FAIL: Only {count} rows (min: {min_rows})")
        elif max_rows and count > max_rows:
            self.results.append(f"FAIL: {count} rows (max: {max_rows})")
        else:
            self.results.append(f"PASS: {count:,} rows loaded")

    def check_no_nulls(self, columns):
        df = spark.table(self.table).filter(f"load_date = '{self.date}'")
        for col in columns:
            nulls = df.filter(f"{col} IS NULL").count()
            if nulls > 0:
                self.results.append(f"FAIL: {nulls} nulls in {col}")
            else:
                self.results.append(f"PASS: No nulls in {col}")

    def check_no_duplicates(self, key_columns):
        keys = ", ".join(key_columns)
        dupes = spark.sql(f"""
            SELECT COUNT(*) AS cnt FROM (
                SELECT {keys}, COUNT(*) AS c
                FROM {self.table}
                WHERE load_date = '{self.date}'
                GROUP BY {keys}
                HAVING COUNT(*) > 1
            ) AS dupes
        """).collect()[0]["cnt"]
        if dupes > 0:
            self.results.append(f"FAIL: {dupes} duplicate keys on ({keys})")
        else:
            self.results.append(f"PASS: No duplicates on ({keys})")

    def report(self):
        failures = [r for r in self.results if r.startswith("FAIL")]
        print(f"\n{'=' * 50}")
        print(f"Data Quality Report: {self.table}")
        print(f"Load Date: {self.date}")
        print(f"{'=' * 50}")
        for r in self.results:
            print(r)
        print(f"\n{len(self.results)} checks run, {len(failures)} failures")
        if failures:
            raise ValueError(f"{len(failures)} quality checks failed")
Using the framework
dq = DataQualityChecker("my_database.orders_table")
dq.check_row_count(min_rows=5000, max_rows=50000)
dq.check_no_nulls(["order_id", "customer_id", "amount"])
dq.check_no_duplicates(["order_id"])
dq.report()
This gives you a clean summary after every pipeline run:
==================================================
Data Quality Report: my_database.orders_table
Load Date: 2026-03-15
==================================================
PASS: 12,847 rows loaded
PASS: No nulls in order_id
PASS: No nulls in customer_id
PASS: No nulls in amount
PASS: No duplicates on (order_id)
5 checks run, 0 failures
Key takeaways
Data quality checks aren't optional — they're the difference between "the pipeline ran" and "the data is correct." Start with these six categories and you'll catch 90% of data issues before they reach production dashboards:
- Row counts — is the right amount of data arriving?
- Null checks — are critical fields populated?
- Duplicate checks — is each record unique?
- Range checks — are values within valid bounds?
- Freshness checks — is the data current?
- Schema checks — has the structure changed?
Build them into a reusable framework, run them after every pipeline execution, and pipe failures into Slack or email so you find issues before your stakeholders do.
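Wiring failures into Slack takes only a few lines. The sketch below formats the failure list as a Slack incoming-webhook payload and posts it with the standard library alone — the webhook URL, emoji, and message layout are placeholders to adapt to your workspace:

```python
import json
import urllib.request

def build_alert(table, failures):
    """Format a Slack incoming-webhook payload listing the failed checks."""
    lines = "\n".join(f"- {f}" for f in failures)
    return {"text": f":rotating_light: Data quality failures in {table}:\n{lines}"}

def send_alert(webhook_url, payload):
    """POST the payload to a Slack incoming webhook; returns the HTTP status."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status

payload = build_alert("orders_table", ["FAIL: 3 nulls in amount"])
# send_alert(webhook_url, payload)  # webhook_url comes from your Slack app config
```

You would typically call this from the `except` branch (or after `report()` raises) in your pipeline's orchestration code, so a failed check both stops the load and pings the channel.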
For the maintenance side of keeping your Delta tables healthy after these quality checks, see my guide on OPTIMIZE, Z-ORDER, and VACUUM. And if you want to run these checks on a self-hosted pipeline for $6/month on DigitalOcean, here's my guide on running scheduled Python ETL pipelines on a VPS.
Want all 20 Delta table troubleshooting patterns in one place? Grab the Delta Table Troubleshooting Checklist — copy-paste diagnostic queries for every issue, just $9.
Want more practical data engineering guides? Subscribe to PipelinePulse — I publish hands-on tutorials based on real production experience, not textbook theory.