# DataFrame Validation
The `DataFrameValidator` class enables you to validate pandas DataFrames directly within your ETL workflows, before data reaches its destination. This allows you to catch data quality issues early, preventing bad data from contaminating your data warehouse or analytics systems.
## Overview
DataFrame validation is ideal for:
- Validating transformed data before loading to destinations
- Processing large datasets in chunks with memory efficiency
- Short-circuiting ETL pipelines on validation failures (see the sketch after this list)
- Providing immediate feedback during data transformations
- Publishing validation results back to OpenMetadata
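For example, a failed validation can short-circuit a pipeline by raising before any load step runs. A minimal sketch, assuming a configured `DataFrameValidator` named `validator` (created as shown in Basic Usage below); `load_to_warehouse` is a hypothetical stand-in for your own loading function:

```python
import pandas as pd

def validate_and_load(df: pd.DataFrame) -> None:
    # Run all registered tests against the DataFrame.
    result = validator.validate(df)
    if not result.success:
        # Raising here short-circuits the pipeline: nothing is loaded.
        raise RuntimeError("Data quality validation failed; aborting load")
    load_to_warehouse(df)  # hypothetical loading step
```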
## Basic Usage
### Creating a Validator
```python
from metadata.sdk import configure
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

# Configure SDK
configure(
    host="http://localhost:8585/api",
    jwt_token="your-jwt-token"
)

# Create validator
validator = DataFrameValidator()
```
### Adding Tests
Add test definitions to validate your DataFrame:
```python
from metadata.sdk.data_quality import (
    ColumnValuesToBeNotNull,
    ColumnValuesToBeUnique,
    ColumnValuesToBeBetween
)

# Add column-level tests
validator.add_test(
    ColumnValuesToBeNotNull(column="email")
)
validator.add_test(
    ColumnValuesToBeUnique(column="customer_id")
)
validator.add_test(
    ColumnValuesToBeBetween(column="age", min_value=18, max_value=120)
)
```
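The same tests can also be registered in a single call with `add_tests`, which the complete ETL example below uses:

```python
# Equivalent batch registration of the three tests above
validator.add_tests(
    ColumnValuesToBeNotNull(column="email"),
    ColumnValuesToBeUnique(column="customer_id"),
    ColumnValuesToBeBetween(column="age", min_value=18, max_value=120)
)
```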
### Validating a DataFrame
```python
import pandas as pd

# Load or create your DataFrame
df = pd.read_csv("customers.csv")

# Validate the DataFrame
result = validator.validate(df)

# Check if validation passed
if result.success:
    print("✓ Validation passed - safe to load data")
    load_to_warehouse(df)  # your own loading function
else:
    print("✗ Validation failed")
    for test_case, test_result in result.test_cases_and_results:
        if test_result.testCaseStatus != "Success":
            print(f"  - {test_case.name.root}: {test_result.result}")
```
## Complete ETL Example
Here’s a complete example of validating transformed data in an ETL pipeline:
```python
import pandas as pd
from sqlalchemy import create_engine
from metadata.sdk import configure
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator
from metadata.sdk.data_quality import (
    ColumnValuesToBeNotNull,
    ColumnValuesToBeUnique,
    ColumnValuesToBeBetween,
    ColumnValuesToMatchRegex
)

# Configure SDK
configure(
    host="http://localhost:8585/api",
    jwt_token="your-jwt-token"
)

# Extract: Read source data
df = pd.read_csv("raw_customers.csv")

# Transform: Clean and enrich data
df["email"] = df["email"].str.lower().str.strip()
df["created_at"] = pd.to_datetime(df["created_at"])
df = df.dropna(subset=["customer_id"])

# Validate: Create and configure validator
validator = DataFrameValidator()
validator.add_tests(
    ColumnValuesToBeNotNull(column="customer_id"),
    ColumnValuesToBeNotNull(column="email"),
    ColumnValuesToBeUnique(column="customer_id"),
    ColumnValuesToBeBetween(column="age", min_value=0, max_value=120),
    ColumnValuesToMatchRegex(
        column="email",
        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    )
)

# Run validation
result = validator.validate(df)

# Load: Only load if validation passes
if result.success:
    engine = create_engine("postgresql://user:pass@localhost/warehouse")
    df.to_sql("customers", engine, if_exists="replace", index=False)
    print(f"✓ Loaded {len(df)} records successfully")
else:
    print("✗ Data quality validation failed. Data not loaded.")

# Optionally publish results to OpenMetadata
result.publish("Postgres.warehouse.public.customers")
```
## Loading Tests from OpenMetadata

Instead of defining tests in code, you can load tests that are already configured in OpenMetadata:
```python
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

# Create validator
validator = DataFrameValidator()

# Load all tests defined in OpenMetadata for a specific table
validator.add_openmetadata_table_tests("BigQuery.analytics.staging.customers")

# Validate DataFrame against those tests
result = validator.validate(df)

if result.success:
    load_to_destination(df)  # your own loading function

# Optionally publish results to OpenMetadata; recommended so all
# data stakeholders stay up to date
result.publish("Postgres.warehouse.public.customers")
```
This approach enables:
- Separation of concerns: data stewards define quality criteria in the UI, while engineers execute them in code
- Dynamic test updates: Test criteria changes don’t require code deployments
- Consistency: Same tests used for table validation and DataFrame validation
## Next Steps
### Chunk-Based Validation

Validate large DataFrames in memory-efficient chunks, with transactional safety and automatic failure handling.
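Until then, large inputs can already be validated piecewise with plain pandas chunking. A minimal manual sketch, assuming the `validator` configured earlier; `load_to_destination` is a hypothetical per-chunk loading step. Note that cross-row tests such as uniqueness only see one chunk at a time here, which is exactly what the dedicated chunk-based API addresses:

```python
import pandas as pd

# Validate a large CSV in 50,000-row chunks; stop at the first failing chunk.
for chunk in pd.read_csv("customers.csv", chunksize=50_000):
    result = validator.validate(chunk)
    if not result.success:
        raise RuntimeError("Validation failed; stopping before load")
    load_to_destination(chunk)  # hypothetical per-chunk loading step
```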