YAML Contracts & Pipelines

Contracts and pipelines are designed to be data, not code. They serialize cleanly to YAML, making them useful for version control, team collaboration, and configuration-driven workflows. This guide covers how to write, load, and use YAML-based contracts and pipelines.

When you define a contract in Python, it lives inside your application code. That’s fine for quick validation tasks, but in production data platforms you often want the specification of what data should look like to live separately from the code that processes it. YAML contracts give you exactly that separation: a human-readable, machine-parseable definition of data expectations that anyone on your team can read, review, and modify.

Why YAML?

YAML contracts offer several advantages over defining contracts purely in Python:

  • Version-controllable: track contract changes alongside your code in Git, with clear diffs when expectations evolve
  • Language-agnostic: contracts can be understood (and even authored) by non-Python team members such as data analysts, product managers, or compliance officers
  • Separates concerns: validation rules live apart from transform logic, meaning you can update expectations without touching application code
  • CI/CD friendly: load and enforce contracts in automated pipelines and gate deployments on contract compliance
  • Reviewable: pull requests that change data contracts are easy to review because the YAML diff shows exactly which expectations changed

The YAML approach complements the Python API rather than replacing it. Use YAML for stable, well-defined contracts that change infrequently, and Python for exploratory validation or complex logic that benefits from programmatic construction.

Contract YAML Format

Every contract YAML file uses a contract: top-level key as its root element. Inside this key you define all the metadata, schema, and validation steps that make up your contract. The structure is designed to be self-documenting: someone reading the file should immediately understand what data this contract governs, who owns it, and what rules are enforced.

Here’s a complete example showing all the available fields:

contract:
  name: customer_records
  direction: source
  version: "1.2.0"
  owner: data-platform-team
  consumers:
    - analytics-team
    - ml-team
  description: "Raw customer data from the CRM system"
  on_violation: warn

  schema:
    customer_id: String
    email: String
    name: String
    signup_date: String
    plan: String

  steps:
    - col_vals_not_null:
        columns:
          - customer_id
          - email
    - col_vals_regex:
        columns: email
        pattern: "^[^@]+@[^@]+\\.[^@]+$"
    - rows_distinct:
        columns_subset:
          - customer_id
    - col_vals_in_set:
        columns: plan
        set:
          - free
          - pro
          - enterprise

  thresholds:
    warning: 0.01
    error: 0.05
    critical: 0.10

Structure Reference

The table below summarizes every key available inside the contract: block. Only name is required; every other key either has a sensible default or can be omitted if it doesn’t apply to your use case.

Key           Required  Description
name          Yes       Human-readable identifier for this contract
direction     No        "source" (default) or "target": indicates where in a pipeline this contract applies
version       No        Semantic version string (e.g., "1.2.0") for tracking contract evolution over time
owner         No        Team or individual responsible for maintaining this contract
consumers     No        List of teams or systems that depend on data conforming to this contract
description   No        Longer prose describing the contract’s purpose and context
on_violation  No        Behavior when validation fails: "warn" (default), "raise", or "log"
schema        No        Column name -> data type mapping; triggers a col_schema_match() check
steps         No        Ordered list of validation steps to execute
thresholds    No        Threshold levels (warning, error, critical) as fractions or absolute counts
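
To make the thresholds row more concrete, here is a minimal sketch of how a threshold might be read as either a fraction or an absolute count. The helper names (threshold_reached, levels_reached) are hypothetical, and the rule used here (values below 1 are fractions of all test units, values of 1 or more are failure counts, and a level is reached when failures meet or exceed it) is an assumption for illustration rather than a description of Pointblank's internals.

```python
from typing import Optional


def threshold_reached(threshold: Optional[float], n_failed: int, n_total: int) -> bool:
    """Return True when the failures meet or exceed the threshold.

    Assumption: a value below 1 is a fraction of all test units,
    and a value of 1 or more is an absolute count of failures.
    """
    if threshold is None or n_total == 0:
        return False
    if threshold < 1:
        return n_failed / n_total >= threshold
    return n_failed >= threshold


def levels_reached(thresholds: dict, n_failed: int, n_total: int) -> list:
    """List the threshold levels reached, from least to most severe."""
    return [
        level
        for level in ("warning", "error", "critical")
        if threshold_reached(thresholds.get(level), n_failed, n_total)
    ]


# Same values as the thresholds block in the example contract
fractional = {"warning": 0.01, "error": 0.05, "critical": 0.10}
print(levels_reached(fractional, n_failed=3, n_total=100))

# An absolute count: warn once 5 or more test units fail
print(levels_reached({"warning": 5}, n_failed=4, n_total=100))
```

With 3 failures out of 100, only the warning level is reached under these assumptions; the count-based example stays below its threshold.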

Step Syntax in YAML

Steps are the heart of a contract. They define the actual validation rules that will be checked against your data. In YAML, each step is a list item whose key is the name of a Pointblank validation method (like col_vals_gt or rows_distinct), and whose value is a dictionary of the arguments to pass to that method.

This mapping is direct: the YAML step col_vals_gt: {columns: amount, value: 0} is exactly equivalent to calling .col_vals_gt(columns="amount", value=0) in Python. Here are examples covering the most common patterns:

steps:
  # Step with keyword arguments
  - col_vals_gt:
      columns: amount
      value: 0

  # Step with a list of columns
  - col_vals_not_null:
      columns:
        - id
        - name
        - email

  # Step with multiple arguments
  - col_vals_between:
      columns: age
      left: 0
      right: 150

  # Step with a set/list of values
  - col_vals_in_set:
      columns: status
      set:
        - active
        - inactive
        - suspended

  # Step with no arguments
  - rows_complete: {}
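
The step-to-method mapping described above can be sketched in plain Python. This is an illustration only: Recorder is a hypothetical stand-in that records calls instead of validating data, split_step and apply_steps are made-up helper names, and the dictionaries are what a YAML parser would typically return for a steps list like the one above.

```python
class Recorder:
    """Hypothetical stand-in for a validation object: records each call it receives."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, method_name):
        # Only reached for attributes that do not exist, i.e. the step methods
        def method(**kwargs):
            self.calls.append((method_name, kwargs))
            return self  # returning self allows chained calls

        return method


def split_step(step: dict) -> tuple:
    """Split a one-key step mapping into (method_name, keyword_arguments)."""
    if len(step) != 1:
        raise ValueError(f"a step must have exactly one key, got {list(step)}")
    ((method_name, kwargs),) = step.items()
    return method_name, dict(kwargs or {})


def apply_steps(target, steps: list):
    """Call target.<method_name>(**kwargs) for each step, in order."""
    for step in steps:
        method_name, kwargs = split_step(step)
        target = getattr(target, method_name)(**kwargs)
    return target


# Parsed form of three of the steps shown above
steps = [
    {"col_vals_gt": {"columns": "amount", "value": 0}},
    {"col_vals_not_null": {"columns": ["id", "name", "email"]}},
    {"rows_complete": {}},
]

recorder = apply_steps(Recorder(), steps)
for name, kwargs in recorder.calls:
    print(name, kwargs)
```

Each list item carries exactly one key, so the method name and its arguments can be recovered without any extra schema.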

Writing and Loading Contracts

Pointblank supports a complete round-trip workflow: define contracts in Python, save them to YAML for version control and sharing, then load them back in any environment. This section walks through each step of that workflow.

Saving to YAML

The .to_yaml() method serializes a Contract object to a YAML string. You can optionally pass a file path to write directly to disk. The resulting YAML is clean, readable, and contains all the information needed to reconstruct the contract exactly.

import pointblank as pb
from pathlib import Path

# Create a contract in Python
contract = pb.Contract(
    name="product_catalog",
    direction="source",
    version="1.0.0",
    owner="catalog-team",
    schema=pb.Schema(
        product_id="String",
        name="String",
        price="Float64",
        category="String",
        in_stock="Boolean",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["product_id", "name", "price"]),
        pb.Step("col_vals_gt", columns="price", value=0),
        pb.Step("col_vals_in_set", columns="category", set=["electronics", "clothing", "food", "home"]),
        pb.Step("rows_distinct", columns_subset=["product_id"]),
    ],
    thresholds=pb.Thresholds(warning=0.01, error=0.05),
)

# Save to YAML
yaml_output = contract.to_yaml("product_contract.yaml")
print(yaml_output)

contract:
  name: product_catalog
  version: 1.0.0
  owner: catalog-team
  schema:
    product_id: String
    name: String
    price: Float64
    category: String
    in_stock: Boolean
  steps:
  - col_vals_not_null:
      columns:
      - product_id
      - name
      - price
  - col_vals_gt:
      columns: price
      value: 0
  - col_vals_in_set:
      columns: category
      set:
      - electronics
      - clothing
      - food
      - home
  - rows_distinct:
      columns_subset:
      - product_id
  thresholds:
    warning: 0.01
    error: 0.05
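
Note that the printed YAML has no direction key, even though the contract was created with direction="source": that value is the documented default. The sketch below shows one way default-valued keys could be dropped on save and restored on load. It is not Pointblank's implementation; drop_defaults and fill_defaults are hypothetical helpers, and the DEFAULTS mapping only lists the two defaults stated in the structure reference.

```python
# Defaults taken from the structure reference table
DEFAULTS = {"direction": "source", "on_violation": "warn"}


def drop_defaults(contract: dict, defaults: dict = DEFAULTS) -> dict:
    """Return a copy without unset (None) keys and without keys equal to their default."""
    return {
        key: value
        for key, value in contract.items()
        if value is not None and not (key in defaults and defaults[key] == value)
    }


def fill_defaults(contract: dict, defaults: dict = DEFAULTS) -> dict:
    """Return a copy with any missing default-valued keys restored."""
    return {**defaults, **contract}


in_memory = {
    "name": "product_catalog",
    "direction": "source",   # equal to the default, so it is dropped
    "version": "1.0.0",
    "owner": "catalog-team",
    "description": None,      # unset, so it is dropped
    "on_violation": "warn",  # equal to the default, so it is dropped
}

compact = drop_defaults(in_memory)
print(compact)
print(fill_defaults(compact)["direction"])
```

Dropping defaults keeps the saved file short and its diffs focused on deliberate choices, at the cost that a reader needs to know the defaults.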

Loading from YAML

The Contract.from_yaml() class method reads a YAML file and reconstructs the full Contract object, including schema, steps, thresholds, and all metadata. The loaded contract behaves identically to one created directly in Python.

# Load the contract back
loaded_contract = pb.Contract.from_yaml("product_contract.yaml")
print(loaded_contract)
print(f"Steps: {len(loaded_contract.steps)}")
print(f"Version: {loaded_contract.version}")

Contract(name='product_catalog', direction='source', version='1.0.0', schema=<defined>, steps=4)
Steps: 4
Version: 1.0.0

Validating with a Loaded Contract

Once loaded, a contract works exactly like one defined in Python. Call .validate(data) to compile it into a Validate object, execute all checks, and get the standard Pointblank validation report. This is the core value proposition of YAML contracts: define once, validate anywhere.

import polars as pl

products = pl.DataFrame(
    {
        "product_id": ["P001", "P002", "P003", "P004", "P005"],
        "name": ["Laptop", "T-Shirt", "Coffee Beans", "Desk Lamp", "Headphones"],
        "price": [999.99, 29.99, 12.50, 45.00, 79.99],
        "category": ["electronics", "clothing", "food", "home", "electronics"],
        "in_stock": [True, True, False, True, True],
    }
)

loaded_contract.validate(products)

Pointblank Validation
Contract: product_catalog v1.0.0
Table: product_catalog (Polars)
Thresholds: WARNING 0.01, ERROR 0.05, CRITICAL (unset)

STEP                    COLUMNS     VALUES                             UNITS  PASS      FAIL
1  col_schema_match     -           SCHEMA                             1      1 (1.00)  0 (0.00)
2  col_vals_not_null    product_id  -                                  5      5 (1.00)  0 (0.00)
3  col_vals_not_null    name        -                                  5      5 (1.00)  0 (0.00)
4  col_vals_not_null    price       -                                  5      5 (1.00)  0 (0.00)
5  col_vals_gt          price       0                                  5      5 (1.00)  0 (0.00)
6  col_vals_in_set      category    electronics, clothing, food, home  5      5 (1.00)  0 (0.00)
7  rows_distinct        product_id  -                                  5      5 (1.00)  0 (0.00)

Owner: catalog-team
Version: 1.0.0

Notes

Step 1 (schema_check) Schema validation passed.

Schema Comparison
   TARGET                    EXPECTED
   COLUMN      DATA TYPE     COLUMN      DATA TYPE
1  product_id  String     1  product_id  String
2  name        String     2  name        String
3  price       Float64    3  price       Float64
4  category    String     4  category    String
5  in_stock    Boolean    5  in_stock    Boolean

Supplied Column Schema:
[('product_id', 'String'), ('name', 'String'), ('price', 'Float64'), ('category', 'String'), ('in_stock', 'Boolean')]

Schema Match Settings: COMPLETE, IN ORDER, COLUMN ≠ column, DTYPE ≠ dtype, float ≠ float64

Pipeline YAML Format

While individual contracts validate data at a single point, pipelines enforce quality at both the input and output boundaries of a transformation. A pipeline YAML file packages the source contract, target contract, and pipeline-level settings (like global thresholds and short-circuit behavior) into a single, self-contained document.

This is especially powerful for team workflows: a data engineer defines the pipeline specification in YAML, commits it to version control, and any downstream system can load and execute it without knowing anything about the implementation details.

Let’s build a pipeline in Python and export it to see what the YAML looks like:

# Create a pipeline and export to YAML
source = pb.Contract(
    name="raw_events",
    direction="source",
    version="1.0.0",
    schema=pb.Schema(
        event_id="String",
        user_id="String",
        event_type="String",
        value_cents="Int64",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["event_id", "user_id", "event_type"]),
        pb.Step("col_vals_ge", columns="value_cents", value=0),
        pb.Step("col_vals_in_set", columns="event_type", set=["click", "purchase", "view", "signup"]),
        pb.Step("rows_distinct", columns_subset=["event_id"]),
    ],
)

target = pb.Contract(
    name="enriched_events",
    direction="target",
    version="1.0.0",
    schema=pb.Schema(
        event_id="String",
        user_id="String",
        event_type="String",
        value="Float64",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["event_id", "user_id", "event_type", "value"]),
        pb.Step("col_vals_gt", columns="value", value=0),
        pb.Step("rows_distinct", columns_subset=["event_id"]),
    ],
)

pipeline = pb.Pipeline(
    source=source,
    target=target,
    label="Event Processing Pipeline",
    thresholds=pb.Thresholds(warning=0.02, error=0.10),
)

pipeline_yaml = pipeline.to_yaml("event_pipeline.yaml")
print(pipeline_yaml)

pipeline:
  label: Event Processing Pipeline
  thresholds:
    warning: 0.02
    error: 0.1
source:
  name: raw_events
  version: 1.0.0
  schema:
    event_id: String
    user_id: String
    event_type: String
    value_cents: Int64
  steps:
  - col_vals_not_null:
      columns:
      - event_id
      - user_id
      - event_type
  - col_vals_ge:
      columns: value_cents
      value: 0
  - col_vals_in_set:
      columns: event_type
      set:
      - click
      - purchase
      - view
      - signup
  - rows_distinct:
      columns_subset:
      - event_id
target:
  name: enriched_events
  direction: target
  version: 1.0.0
  schema:
    event_id: String
    user_id: String
    event_type: String
    value: Float64
  steps:
  - col_vals_not_null:
      columns:
      - event_id
      - user_id
      - event_type
      - value
  - col_vals_gt:
      columns: value
      value: 0
  - rows_distinct:
      columns_subset:
      - event_id

Pipeline YAML Structure

The generated YAML has three top-level sections that correspond to the three components of boundary enforcement: the pipeline: section for global settings, the source: section for the inbound contract, and the target: section for the outbound contract. Each section can be understood independently, which makes reviewing changes straightforward.

Here’s the annotated structure:

# Pipeline metadata and global settings
pipeline:
  label: "Event Processing Pipeline"
  thresholds:
    warning: 0.02
    error: 0.10
  # short_circuit: true   (default, omitted when true)

# Source contract (inbound boundary)
source:
  name: raw_events
  version: "1.0.0"
  schema:
    event_id: String
    user_id: String
    event_type: String
    value_cents: Int64
  steps:
    - col_vals_not_null:
        columns: [event_id, user_id, event_type]
    - col_vals_ge:
        columns: value_cents
        value: 0

# Target contract (outbound boundary)
target:
  name: enriched_events
  version: "1.0.0"
  schema:
    event_id: String
    user_id: String
    event_type: String
    value: Float64
  steps:
    - col_vals_not_null:
        columns: [event_id, user_id, event_type, value]
    - col_vals_gt:
        columns: value
        value: 0
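
A quick structural check can catch layout mistakes before a pipeline file is loaded. The sketch below works on the dictionary a YAML parser would typically produce for the document above. check_pipeline_document is a hypothetical helper, and treating all three sections as mandatory is an assumption made for this example; the only rule taken from this guide is that each contract needs a name.

```python
EXPECTED_SECTIONS = ("pipeline", "source", "target")


def check_pipeline_document(doc: dict) -> list:
    """Return a list of problems; an empty list means the layout looks right."""
    problems = []

    # Assumption: every section must be present and must be a mapping
    for section in EXPECTED_SECTIONS:
        if not isinstance(doc.get(section), dict):
            problems.append(f"missing or non-mapping section: {section}")

    # 'name' is the one required key of a contract
    for section in ("source", "target"):
        contract = doc.get(section)
        if isinstance(contract, dict) and not contract.get("name"):
            problems.append(f"{section}: 'name' is required")

    # Flag top-level keys that do not belong to the three sections
    for key in sorted(set(doc) - set(EXPECTED_SECTIONS)):
        problems.append(f"unexpected top-level key: {key}")

    return problems


# Abridged parsed form of the annotated structure above
document = {
    "pipeline": {"label": "Event Processing Pipeline",
                 "thresholds": {"warning": 0.02, "error": 0.10}},
    "source": {"name": "raw_events", "version": "1.0.0"},
    "target": {"name": "enriched_events", "version": "1.0.0"},
}

print(check_pipeline_document(document))
print(check_pipeline_document({"pipeline": {}, "source": {"version": "1.0.0"}}))
```

The second call reports both the missing target section and the source contract that lacks a name.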

Loading and Running a Pipeline from YAML

Loading a pipeline from YAML works the same way as loading a contract: call the from_yaml() class method with the file path. The returned Pipeline object is fully functional and ready to validate data or run a complete boundary enforcement workflow.

# Load the pipeline from YAML
loaded_pipeline = pb.Pipeline.from_yaml("event_pipeline.yaml")
print(loaded_pipeline)

Pipeline(
  label='Event Processing Pipeline',
  source='raw_events',
  target='enriched_events',
  short_circuit=True
)

With the pipeline loaded, you provide data and a transform function just as you would with a pipeline defined in Python. The YAML stores only the contract specifications; the transform logic always lives in your Python code, since transforms are arbitrary functions that can’t be meaningfully serialized to YAML.

# Create test data and run the pipeline
events = pl.DataFrame(
    {
        "event_id": ["EVT-001", "EVT-002", "EVT-003", "EVT-004"],
        "user_id": ["U100", "U200", "U100", "U300"],
        "event_type": ["purchase", "click", "view", "signup"],
        "value_cents": [2500, 0, 0, 0],
    }
)

def enrich_events(df: pl.DataFrame) -> pl.DataFrame:
    """Convert cents to dollars, defaulting zero to a minimum value."""
    return df.with_columns(
        pl.when(pl.col("value_cents") > 0)
        .then(pl.col("value_cents") / 100)
        .otherwise(0.01)  # Minimum trackable value
        .alias("value")
    ).drop("value_cents")

result = loaded_pipeline.run(data=events, transform=enrich_events)
print(f"Pipeline passed: {result.passed}")

Pipeline passed: True

result.target_validation

Pointblank Validation
Event Processing Pipeline [target]
Table: enriched_events (Polars)
Thresholds: WARNING 0.02, ERROR 0.1, CRITICAL (unset)

STEP                    COLUMNS     VALUES  UNITS  PASS      FAIL
1  col_schema_match     -           SCHEMA  1      1 (1.00)  0 (0.00)
2  col_vals_not_null    event_id    -       4      4 (1.00)  0 (0.00)
3  col_vals_not_null    user_id     -       4      4 (1.00)  0 (0.00)
4  col_vals_not_null    event_type  -       4      4 (1.00)  0 (0.00)
5  col_vals_not_null    value       -       4      4 (1.00)  0 (0.00)
6  col_vals_gt          value       0       4      4 (1.00)  0 (0.00)
7  rows_distinct        event_id    -       4      4 (1.00)  0 (0.00)

Version: 1.0.0

Notes

Step 1 (schema_check) Schema validation passed.

Schema Comparison
   TARGET                    EXPECTED
   COLUMN      DATA TYPE     COLUMN      DATA TYPE
1  event_id    String     1  event_id    String
2  user_id     String     2  user_id     String
3  event_type  String     3  event_type  String
4  value       Float64    4  value       Float64

Supplied Column Schema:
[('event_id', 'String'), ('user_id', 'String'), ('event_type', 'String'), ('value', 'Float64')]

Schema Match Settings: COMPLETE, IN ORDER, COLUMN ≠ column, DTYPE ≠ dtype, float ≠ float64
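
The control flow of a boundary-enforced run can be sketched without Pointblank at all. This guide does not spell out what short_circuit does, so the example assumes it means "skip the transform and the target check when the source check fails". RunResult, run_boundaries, and the check functions are hypothetical names, and plain lists of dictionaries stand in for DataFrames.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RunResult:
    source_ok: bool
    target_ok: Optional[bool]  # None when the transform never ran

    @property
    def passed(self) -> bool:
        return self.source_ok and self.target_ok is True


def run_boundaries(data, transform: Callable, check_source: Callable,
                   check_target: Callable, short_circuit: bool = True) -> RunResult:
    """Check the input, transform it, then check the output."""
    source_ok = check_source(data)
    if short_circuit and not source_ok:
        # Assumed meaning of short-circuiting: stop before the transform
        return RunResult(source_ok=False, target_ok=None)
    return RunResult(source_ok=source_ok, target_ok=check_target(transform(data)))


def to_dollars(rows):
    """Convert cents to dollars, using 0.01 as a floor (mirrors enrich_events)."""
    return [{"value": r["value_cents"] / 100 if r["value_cents"] > 0 else 0.01}
            for r in rows]


def non_negative_cents(rows) -> bool:
    return all(r["value_cents"] >= 0 for r in rows)


def positive_value(rows) -> bool:
    return all(r["value"] > 0 for r in rows)


good = run_boundaries([{"value_cents": 2500}, {"value_cents": 0}],
                      to_dollars, non_negative_cents, positive_value)
bad = run_boundaries([{"value_cents": -5}],
                     to_dollars, non_negative_cents, positive_value)
print(good.passed, bad.passed, bad.target_ok)
```

In the failing case the transform is never called, so bad input cannot produce misleading output-side results.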

Practical Patterns

The following patterns illustrate common ways teams use YAML contracts and pipelines in real-world data platforms. These aren’t exhaustive: they’re starting points that you can adapt to your own workflow.

Pattern 1: Environment-Specific Pipelines

A common need is to apply different tolerance levels in different environments. In development, you might accept 20% failure rates while iterating on a transform. In production, anything above 1% is a critical issue. YAML contracts make this easy: store the same validation logic in a shared contract, but load different pipeline configurations that set environment-appropriate thresholds.

# Same contracts, different thresholds
dev_pipeline = pb.Pipeline(
    source=source,
    target=target,
    thresholds=pb.Thresholds(warning=0.20, error=0.50),  # Lenient for dev
    label="DEV: Event Pipeline",
)

prod_pipeline = pb.Pipeline(
    source=source,
    target=target,
    thresholds=pb.Thresholds(warning=0.01, error=0.05, critical=0.10),  # Strict
    label="PROD: Event Pipeline",
)

# In practice, you'd select based on an env var:
# import os
# pipeline = pb.Pipeline.from_yaml(f"contracts/{os.environ['ENV']}_pipeline.yaml")
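
The commented-out lines above read the environment name straight from os.environ. A slightly more defensive version, sketched below, validates the name and falls back to a default. The ENV variable and the contracts/{env}_pipeline.yaml naming come from the snippet above; the pipeline_path helper, the list of allowed environments (including "staging"), and the "dev" default are assumptions for illustration.

```python
import os
from pathlib import Path
from typing import Mapping

# Hypothetical set of environments; adjust to match your platform
ALLOWED_ENVS = ("dev", "staging", "prod")


def pipeline_path(environ: Mapping[str, str] = os.environ,
                  root: Path = Path("contracts"),
                  default: str = "dev") -> Path:
    """Build the pipeline file path for the current environment."""
    env = environ.get("ENV", default).strip().lower()
    if env not in ALLOWED_ENVS:
        raise ValueError(f"unknown environment {env!r}; expected one of {ALLOWED_ENVS}")
    return root / f"{env}_pipeline.yaml"


# Passing a plain dict makes the function easy to exercise without touching os.environ
print(pipeline_path({"ENV": "prod"}))
print(pipeline_path({}))
```

The returned path can then be handed to pb.Pipeline.from_yaml(), as in the commented-out line above. Rejecting unknown names early avoids a confusing file-not-found error later.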

Pattern 2: Contract Library

As your data platform grows, you’ll accumulate many contracts. A recommended practice is to organize them into a directory structure that mirrors your data architecture. This creates a discoverable, browsable “library” of data quality expectations that serves as living documentation of your system’s data contracts.

Here’s a typical layout:

contracts/
├── sources/
│   ├── raw_events.yaml
│   ├── raw_transactions.yaml
│   └── raw_customers.yaml
├── targets/
│   ├── clean_events.yaml
│   ├── clean_transactions.yaml
│   └── clean_customers.yaml
└── pipelines/
    ├── event_pipeline.yaml
    └── transaction_pipeline.yaml

# Load any contract from the library
events_source = pb.Contract.from_yaml("contracts/sources/raw_events.yaml")
events_target = pb.Contract.from_yaml("contracts/targets/clean_events.yaml")

# Build pipeline dynamically
pipeline = pb.Pipeline(source=events_source, target=events_target)
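
Once the library grows, it can help to discover contract files rather than hard-code each path. The sketch below builds a simple index from relative names to file paths using only pathlib. index_contracts is a hypothetical helper, and the example builds a throwaway directory with empty placeholder files so that it runs anywhere.

```python
import tempfile
from pathlib import Path


def index_contracts(root: Path) -> dict:
    """Map 'sources/raw_events'-style keys to the matching .yaml file paths."""
    index = {}
    for path in sorted(root.rglob("*.yaml")):
        key = path.relative_to(root).with_suffix("").as_posix()
        index[key] = path
    return index


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "contracts"
    # Recreate a small part of the layout shown above with empty placeholder files
    for relative in ("sources/raw_events.yaml",
                     "targets/clean_events.yaml",
                     "pipelines/event_pipeline.yaml"):
        file = root / relative
        file.parent.mkdir(parents=True, exist_ok=True)
        file.touch()

    index = index_contracts(root)
    keys = sorted(index)

print(keys)
```

Each path in the index can then be passed to pb.Contract.from_yaml() or pb.Pipeline.from_yaml() as shown earlier in this guide.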

Pattern 3: CI/CD Integration

YAML contracts integrate naturally into CI/CD pipelines. Because contracts are plain files, you can load them in test suites, gate deployments on contract compliance, and fail builds when data quality regresses. This turns data quality from a manual spot-check into an automated, enforceable standard.

Here’s what a pytest-based contract test might look like:

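# In your test suite or CI script:
import pointblank as pb

# load_test_fixture() and my_transform() are placeholders for your project's
# own fixture loader and transform function; they are not part of Pointblank.
def test_pipeline_contract():
    pipeline = pb.Pipeline.from_yaml("contracts/pipelines/event_pipeline.yaml")
    test_data = load_test_fixture("events_sample.parquet")

    # Check the source data, apply the transform, then check the output
    result = pipeline.run(data=test_data, transform=my_transform)
    # Fail the build (and show the report) if the run did not pass
    assert result.passed, result.get_report()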
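
Outside a test runner, the same idea can be expressed as a script that turns pass/fail results into a process exit code, which is what most CI systems use to gate a deployment. The gate function below is a hypothetical helper and the results dictionary is made-up sample data; in a real script each value would come from a pipeline run's passed attribute.

```python
import sys


def gate(results: dict) -> int:
    """Return 0 when every pipeline passed, 1 otherwise, listing failures on stderr."""
    failed = sorted(name for name, passed in results.items() if not passed)
    for name in failed:
        print(f"FAILED: {name}", file=sys.stderr)
    return 1 if failed else 0


# Hypothetical results keyed by pipeline name
results = {"event_pipeline": True, "transaction_pipeline": False}
exit_code = gate(results)
print(f"exit code: {exit_code}")

# In a real CI script, finish with: sys.exit(exit_code)
```

A non-zero exit code stops most CI jobs, so a single failing pipeline is enough to block the deployment.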

Pattern 4: Composing Steps from Multiple Sources

Sometimes you want to define a set of “base” validation rules that apply to all tables (like “every table must have a non-null id column that’s unique”), and then layer on table-specific rules. YAML makes this easy because you can parse step definitions from YAML fragments and combine them programmatically.

This pattern is especially useful when you have organization-wide data quality standards that every contract should inherit.

import yaml  # PyYAML

# Base steps that apply to all tables
base_steps_yaml = """
- col_vals_not_null:
    columns: [id]
- rows_distinct:
    columns_subset: [id]
"""

# Parse the YAML fragment and turn each item into a Step
base_steps = [pb.Step.from_dict(s) for s in yaml.safe_load(base_steps_yaml)]

# Compose with table-specific steps
full_contract = pb.Contract(
    name="composed_contract",
    steps=base_steps + [
        pb.Step("col_vals_gt", columns="score", value=0),
        pb.Step("col_vals_le", columns="score", value=100),
    ],
)

print(f"Total steps: {len(full_contract.steps)}")
for step in full_contract.steps:
    print(f"  - {step}")

Total steps: 4
  - Step('col_vals_not_null', columns=['id'])
  - Step('rows_distinct', columns_subset=['id'])
  - Step('col_vals_gt', columns='score', value=0)
  - Step('col_vals_le', columns='score', value=100)
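
When several step lists are layered, the same rule can end up in more than one of them. The sketch below concatenates lists of step dictionaries (the form a YAML parser returns) while skipping exact duplicates and preserving order. compose_steps is a hypothetical helper working on plain dictionaries, not on pb.Step objects.

```python
import json


def compose_steps(*step_lists: list) -> list:
    """Concatenate step lists in order, keeping only the first copy of each step."""
    seen = set()
    composed = []
    for steps in step_lists:
        for step in steps:
            # A sorted JSON dump serves as a hashable fingerprint of the step
            fingerprint = json.dumps(step, sort_keys=True)
            if fingerprint not in seen:
                seen.add(fingerprint)
                composed.append(step)
    return composed


base = [
    {"col_vals_not_null": {"columns": ["id"]}},
    {"rows_distinct": {"columns_subset": ["id"]}},
]
table_specific = [
    {"rows_distinct": {"columns_subset": ["id"]}},  # already in the base list
    {"col_vals_gt": {"columns": "score", "value": 0}},
]

composed = compose_steps(base, table_specific)
print(len(composed))
for step in composed:
    print(step)
```

Only byte-for-byte identical steps are merged here; two steps that check the same column with different arguments are both kept.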

Round-Trip Fidelity

A critical property of Pointblank’s YAML serialization is round-trip fidelity: when you save a contract to YAML and load it back, you get an identical object. No information is lost, no defaults are silently changed, and no steps are reordered. This guarantee means you can confidently use YAML as your source of truth without worrying about subtle differences between the YAML representation and the in-memory object.

Let’s verify this with a fully-specified contract:

# Original contract
original = pb.Contract(
    name="fidelity_test",
    direction="target",
    version="3.0.0",
    owner="qa-team",
    consumers=["reporting", "dashboards"],
    on_violation="raise",
    schema=pb.Schema(x="Int64", y="Float64", z="String"),
    steps=[
        pb.Step("col_vals_not_null", columns=["x", "y", "z"]),
        pb.Step("col_vals_between", columns="y", left=-1.0, right=1.0),
    ],
    thresholds=pb.Thresholds(warning=0.01, error=0.05, critical=0.10),
)

# Round-trip through YAML
original.to_yaml("fidelity_test.yaml")
restored = pb.Contract.from_yaml("fidelity_test.yaml")

# Verify fidelity
print(f"Name matches: {original.name == restored.name}")
print(f"Direction matches: {original.direction == restored.direction}")
print(f"Version matches: {original.version == restored.version}")
print(f"Owner matches: {original.owner == restored.owner}")
print(f"Steps match: {original.steps == restored.steps}")
print(f"Thresholds match: {original.thresholds.warning == restored.thresholds.warning}")

Name matches: True
Direction matches: True
Version matches: True
Owner matches: True
Steps match: True
Thresholds match: True
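
The check above compares a handful of attributes by hand. A more systematic approach is to compare every field and report the ones that differ. The sketch below illustrates the idea with a small stand-in dataclass and the standard library's json module as a stand-in serializer; MiniContract, round_trip, and mismatched_fields are hypothetical and unrelated to Pointblank's own classes.

```python
import json
from dataclasses import asdict, dataclass, field, fields


@dataclass
class MiniContract:
    """Hypothetical stand-in with a few contract-like fields."""
    name: str
    direction: str = "source"
    version: str = ""
    consumers: list = field(default_factory=list)
    steps: list = field(default_factory=list)


def round_trip(contract: MiniContract) -> MiniContract:
    """Serialize to text and rebuild the object (JSON used as a stand-in for YAML)."""
    return MiniContract(**json.loads(json.dumps(asdict(contract))))


def mismatched_fields(a: MiniContract, b: MiniContract) -> list:
    """Names of the fields whose values differ between the two objects."""
    return [f.name for f in fields(a) if getattr(a, f.name) != getattr(b, f.name)]


original = MiniContract(
    name="fidelity_test",
    direction="target",
    version="3.0.0",
    consumers=["reporting", "dashboards"],
    steps=[{"col_vals_between": {"columns": "y", "left": -1.0, "right": 1.0}}],
)
print(mismatched_fields(original, round_trip(original)))

# A tuple comes back as a list, which a field-by-field check makes visible
lossy = MiniContract(name="lossy", consumers=("reporting",))
print(mismatched_fields(lossy, round_trip(lossy)))
```

Checking all fields at once makes it harder for a newly added attribute to slip through a round-trip test unnoticed.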

Cleanup

The examples in this guide wrote several YAML files to disk (product_contract.yaml, event_pipeline.yaml, and fidelity_test.yaml) to demonstrate the serialization workflow. Remove them if you don’t want them cluttering your working directory. In your own projects, these files would live in version control and persist across sessions, so you’d never need to clean them up like this.
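
A minimal cleanup sketch follows. The file names are the ones written by the examples in this guide; remove_generated is a hypothetical helper, and the demonstration runs inside a temporary directory so that nothing in your project is touched.

```python
import tempfile
from pathlib import Path

# Files written by the examples in this guide
GENERATED_FILES = ("product_contract.yaml", "event_pipeline.yaml", "fidelity_test.yaml")


def remove_generated(directory=".", names=GENERATED_FILES) -> list:
    """Delete the listed files if they exist and return the names that were removed."""
    removed = []
    for name in names:
        path = Path(directory) / name
        if path.is_file():
            path.unlink()
            removed.append(name)
    return removed


# Demonstration in a throwaway directory containing one of the three files
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "event_pipeline.yaml").write_text("pipeline: {}\n")
    removed = remove_generated(tmp)
    leftover = sorted(p.name for p in Path(tmp).iterdir())

print(removed)
```

Calling remove_generated() with no arguments would apply the same cleanup to the current working directory.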

Conclusion

The table below summarizes the key operations and how they map between the Python API and YAML:

Operation        Python API                           YAML
Define contract  pb.Contract(...)                     contract: block
Define pipeline  pb.Pipeline(...)                     pipeline: + source: + target:
Save             .to_yaml("path.yaml")
Load contract    pb.Contract.from_yaml("path.yaml")
Load pipeline    pb.Pipeline.from_yaml("path.yaml")
Validate         .validate(data)                      Load -> validate in Python
Run pipeline     .run(data, transform)                Load -> run in Python

YAML contracts and pipelines give you the best of both worlds: declarative, reviewable specifications that integrate seamlessly with Pointblank’s Python execution engine. The separation of what (YAML contracts) from how (Python transforms) creates a clean architecture where data quality expectations can evolve independently from processing logic: reviewed by different people, on different timelines, with clear accountability at each boundary.