Pipeline Boundary Enforcement

A pipeline in Pointblank represents the enforcement of data quality at both boundaries of a data transformation: the input (“source”) and the output (“target”). The Pipeline class binds a source contract and a target contract together, providing a unified way to validate data as it flows through your system.

This approach embodies a key principle: Pointblank enforces boundaries but doesn’t orchestrate the transform. You write the transformation logic; Pointblank guards the edges. This deliberate separation of concerns means you can use any DataFrame library that Pointblank supports, any transformation approach, and any orchestration tool you prefer. Pointblank’s only job is to verify that data entering your transform meets expectations and that data leaving it satisfies the guarantees you’ve made to downstream consumers.

Think of it like type-checking for data in motion: the source contract is your function’s input type signature, and the target contract is the return type. The pipeline ties them together.
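To make the analogy concrete, here is a minimal, library-free sketch in which a function's input and output are checked the way a type checker checks a signature. The `enforce_boundaries` decorator and the two check functions are hypothetical illustrations, not Pointblank APIs, and rows are plain dicts so the example runs with the standard library alone.

```python
from typing import Callable

Rows = list[dict]                # a "table" as a list of row dicts (illustration only)
Check = Callable[[Rows], bool]


def enforce_boundaries(source_check: Check, target_check: Check):
    """Hypothetical decorator: check rows on the way in and on the way out."""
    def decorator(transform: Callable[[Rows], Rows]) -> Callable[[Rows], Rows]:
        def wrapper(rows: Rows) -> Rows:
            if not source_check(rows):   # like rejecting an argument of the wrong type
                raise ValueError("source contract violated")
            out = transform(rows)
            if not target_check(out):    # like rejecting a bad return value
                raise ValueError("target contract violated")
            return out
        return wrapper
    return decorator


def source_ok(rows: Rows) -> bool:
    # Input "signature": integer, non-negative amounts in cents
    return all(isinstance(r.get("amount_cents"), int) and r["amount_cents"] >= 0 for r in rows)


def target_ok(rows: Rows) -> bool:
    # Output "signature": strictly positive float amounts, cents column removed
    return all(
        isinstance(r.get("amount"), float) and r["amount"] > 0 and "amount_cents" not in r
        for r in rows
    )


@enforce_boundaries(source_ok, target_ok)
def to_decimal_amounts(rows: Rows) -> Rows:
    # Drop the cents column and add a decimal amount
    return [
        {**{k: v for k, v in r.items() if k != "amount_cents"}, "amount": r["amount_cents"] / 100}
        for r in rows
    ]


print(to_decimal_amounts([{"txn_id": "TXN-001", "amount_cents": 4999}]))
```

A zero-cent row passes this source check (`>= 0`) but fails the target check (`> 0`), mirroring how the source and target contracts later in this guide encode different expectations for the same field.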

Why Boundary Enforcement?

Data pipelines rarely fail in obvious ways. More often, quality degrades silently: an upstream team adds a column and reorders the schema, a third-party API starts returning nulls in a field that was always populated, or a subtle join bug duplicates rows without raising any exception. These failures propagate downstream and compound, making root-cause analysis painful.

Even with a perfectly correct transformation, data quality can degrade if:

  • Upstream data changes shape, types, or semantics without notice
  • A transform introduces bugs (null propagation, incorrect joins, off-by-one errors)
  • Schema drift occurs silently over time
  • External data sources degrade in quality between scheduled runs

Boundary enforcement addresses these problems by placing validation checkpoints at the two places that matter most: the entry point and the exit point of your transformation. By validating at both boundaries, you can:

  1. Catch upstream issues early: before they enter your transform and corrupt derived tables
  2. Detect transform bugs: by verifying output against a separate contract defined independently
  3. Short-circuit expensive processing: fail fast when source data is invalid, saving compute
  4. Generate compliance evidence: produce audit-ready reports showing data quality at each stage
  5. Assign accountability: source contracts belong to producers, target contracts to consumers

This dual-boundary model makes it possible to distinguish between “the data I received was bad” and “my code introduced a bug”, a distinction that single-point validation cannot make: a check on the output alone sees the symptom but not where it came from.
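The attribution logic can be written down as a tiny decision function. This is a hypothetical helper (not part of Pointblank) that takes the two boolean outcomes and contrasts them with what an output-only check can conclude.

```python
def diagnose(source_passed: bool, target_passed: bool) -> str:
    """Hypothetical helper: attribute a failure using both boundary outcomes."""
    if not source_passed:
        # Once the input is bad, a target failure says nothing new about the code
        return "bad input: the received data violated the source contract"
    if not target_passed:
        return "transform bug: valid input produced invalid output"
    return "ok: both boundaries passed"


def diagnose_output_only(target_passed: bool) -> str:
    """With a single check on the output, a failure cannot be attributed."""
    return "ok" if target_passed else "unknown: bad input or a transform bug"


for outcome in [(True, True), (False, False), (True, False)]:
    print(outcome, "->", diagnose(*outcome))
print(diagnose_output_only(False))
```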

Creating a Pipeline

A Pipeline needs at least one contract (source, target, or both). In practice, most pipelines define both contracts because that gives you full boundary coverage. However, there are legitimate cases for single-boundary pipelines (covered later in this guide).

The example below demonstrates a real-world pattern: a transaction processing pipeline that receives raw financial data (amounts as integer cents, timestamps as raw strings) and produces clean, normalized output (amounts as decimal values in major currency units, such as dollars, with validated types). Notice that the source and target contracts deliberately encode different expectations about the data’s shape and values: the source contract reflects what your upstream system actually produces, while the target contract reflects what downstream consumers need.

import pointblank as pb
import polars as pl

# Define the source contract (what raw data must look like)
source_contract = pb.Contract(
    name="raw_transactions",
    direction="source",
    schema=pb.Schema(
        txn_id="String",
        amount_cents="Int64",
        currency="String",
        timestamp="String",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["txn_id", "amount_cents", "currency"]),
        pb.Step("col_vals_ge", columns="amount_cents", value=0),
        pb.Step("col_vals_in_set", columns="currency", set=["USD", "EUR", "GBP", "JPY"]),
        pb.Step("rows_distinct", columns_subset=["txn_id"]),
    ],
    version="1.0.0",
    owner="ingestion-team",
)

# Define the target contract (what clean data must look like)
target_contract = pb.Contract(
    name="clean_transactions",
    direction="target",
    schema=pb.Schema(
        txn_id="String",
        currency="String",
        timestamp="String",
        amount="Float64",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["txn_id", "amount", "currency", "timestamp"]),
        pb.Step("col_vals_gt", columns="amount", value=0),
        pb.Step("col_vals_between", columns="amount", left=0.01, right=100_000.0),
        pb.Step("rows_distinct", columns_subset=["txn_id"]),
    ],
    version="1.0.0",
    consumers=["analytics-team", "fraud-team"],
)

# Create the pipeline
pipeline = pb.Pipeline(
    source=source_contract,
    target=target_contract,
    label="Transaction Processing Pipeline",
)

print(pipeline)
Pipeline(
  label='Transaction Processing Pipeline',
  source='raw_transactions',
  target='clean_transactions',
  short_circuit=True
)

Running the Pipeline

The Pipeline.run() method executes the full boundary enforcement workflow in a single call. This is the primary way to use a pipeline: you hand it your input data and a transform function, and it handles the validation orchestration for you. The method follows a strict three-step sequence:

  1. Validate source data against the source contract
  2. Execute your transform function (only if source validation passes)
  3. Validate the output against the target contract

This ordering is deliberate. By validating the source before running the transform, you avoid wasting compute on data that was already invalid. And by validating the target after the transform, you confirm that your code produced output matching your guarantees.

# Sample raw transaction data
raw_transactions = pl.DataFrame(
    {
        "txn_id": ["TXN-001", "TXN-002", "TXN-003", "TXN-004", "TXN-005"],
        "amount_cents": [4999, 12500, 750, 89900, 3200],
        "currency": ["USD", "EUR", "GBP", "USD", "JPY"],
        "timestamp": [
            "2024-06-01T10:30:00",
            "2024-06-01T11:45:00",
            "2024-06-01T12:00:00",
            "2024-06-01T14:20:00",
            "2024-06-01T15:55:00",
        ],
    }
)

# Your transformation logic
def process_transactions(df: pl.DataFrame) -> pl.DataFrame:
    """Convert cents to dollars and clean up the data."""
    return df.with_columns(
        (pl.col("amount_cents") / 100).alias("amount")
    ).drop("amount_cents")

# Run the full pipeline
result = pipeline.run(data=raw_transactions, transform=process_transactions)

print(f"Source passed: {result.source_passed}")
print(f"Target passed: {result.target_passed}")
print(f"Overall passed: {result.passed}")
Source passed: True
Target passed: True
Overall passed: True

The result is a PipelineResult object containing:

  • result.source_validation: the interrogated Validate object for the source check
  • result.target_validation: the interrogated Validate object for the target check
  • result.transform_output: the data returned by your transform function
  • result.source_passed and result.target_passed: whether each boundary’s validation passed
  • result.passed: True only if both boundaries pass

The PipelineResult is designed to give you full introspection. You can access the raw validation objects (which are standard Pointblank Validate instances), inspect the transformed data directly, or simply check the boolean passed property for a quick go/no-go decision. Holding onto the result object also gives you the ability to generate reports or log details after the fact.
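The run() sequence and the result fields above can be modeled in a few lines of plain Python. This is a hypothetical sketch: `RunResult` and `run` are illustrative names, and the checks are simple callables rather than Pointblank contracts. It shows the control flow (including the short-circuit covered later in this guide), not Pointblank’s implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RunResult:
    """Hypothetical stand-in for PipelineResult."""
    source_passed: bool
    target_passed: Optional[bool]   # None when the target check was skipped
    transform_output: Any

    @property
    def passed(self) -> bool:
        # Overall success requires both boundaries to pass
        return self.source_passed and self.target_passed is True


def run(data: Any, transform: Callable, source_check: Callable, target_check: Callable,
        short_circuit: bool = True) -> RunResult:
    source_passed = source_check(data)                 # 1. validate the source
    if not source_passed and short_circuit:
        return RunResult(False, None, None)            # fail fast: skip transform and target
    output = transform(data)                           # 2. run the user's transform
    return RunResult(source_passed, target_check(output), output)  # 3. validate the target


calls = []


def double(xs):
    calls.append(len(xs))          # record each invocation of the transform
    return [x * 2 for x in xs]


def non_negative(xs):
    return all(x >= 0 for x in xs)


def all_even(xs):
    return all(x % 2 == 0 for x in xs)


ok = run([1, 2], double, non_negative, all_even)
bad = run([-1, 2], double, non_negative, all_even)
print(ok.passed, ok.transform_output)     # True [2, 4]
print(bad.passed, bad.transform_output)   # False None
print(len(calls))                         # 1 (the transform never ran for the bad input)
```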

Inspecting Results

Because PipelineResult holds full Validate objects, you have access to Pointblank’s rich reporting capabilities. Each validation result can render its standard HTML report table, which shows exactly which steps passed, which failed, and by how much. This is especially valuable during development when you need to understand why a boundary check failed, not just that it failed.

You can access individual validation results to see the standard Pointblank report:

# View the source boundary validation
result.source_validation
Pointblank Validation
Transaction Processing Pipeline [source]
Polars  raw_transactions

STEP  ASSERTION          COLUMNS       VALUES              UNITS  PASS      FAIL
1     col_schema_match   -             SCHEMA              1      1 (1.00)  0 (0.00)
2     col_vals_not_null  txn_id        -                   5      5 (1.00)  0 (0.00)
3     col_vals_not_null  amount_cents  -                   5      5 (1.00)  0 (0.00)
4     col_vals_not_null  currency      -                   5      5 (1.00)  0 (0.00)
5     col_vals_ge        amount_cents  0                   5      5 (1.00)  0 (0.00)
6     col_vals_in_set    currency      USD, EUR, GBP, JPY  5      5 (1.00)  0 (0.00)
7     rows_distinct      txn_id        -                   5      5 (1.00)  0 (0.00)

Owner: ingestion-team  Version: 1.0.0

Notes

Step 1 (schema_check): Schema validation passed.

Schema Comparison
     TARGET                     EXPECTED
     COLUMN        DATA TYPE    COLUMN        DATA TYPE
1    txn_id        String       txn_id        String
2    amount_cents  Int64        amount_cents  Int64
3    currency      String       currency      String
4    timestamp     String       timestamp     String

Supplied Column Schema:
[('txn_id', 'String'), ('amount_cents', 'Int64'), ('currency', 'String'), ('timestamp', 'String')]

Schema Match Settings: COMPLETE  IN ORDER  COLUMN ≠ column  DTYPE ≠ dtype  float ≠ float64

# View the target boundary validation
result.target_validation
Pointblank Validation
Transaction Processing Pipeline [target]
Polars  clean_transactions

STEP  ASSERTION          COLUMNS    VALUES            UNITS  PASS      FAIL
1     col_schema_match   -          SCHEMA            1      1 (1.00)  0 (0.00)
2     col_vals_not_null  txn_id     -                 5      5 (1.00)  0 (0.00)
3     col_vals_not_null  amount     -                 5      5 (1.00)  0 (0.00)
4     col_vals_not_null  currency   -                 5      5 (1.00)  0 (0.00)
5     col_vals_not_null  timestamp  -                 5      5 (1.00)  0 (0.00)
6     col_vals_gt        amount     0                 5      5 (1.00)  0 (0.00)
7     col_vals_between   amount     [0.01, 100000.0]  5      5 (1.00)  0 (0.00)
8     rows_distinct      txn_id     -                 5      5 (1.00)  0 (0.00)

Consumers: analytics-team, fraud-team  Version: 1.0.0

Notes

Step 1 (schema_check): Schema validation passed.

Schema Comparison
     TARGET                   EXPECTED
     COLUMN      DATA TYPE    COLUMN      DATA TYPE
1    txn_id      String       txn_id      String
2    currency    String       currency    String
3    timestamp   String       timestamp   String
4    amount      Float64      amount      Float64

Supplied Column Schema:
[('txn_id', 'String'), ('currency', 'String'), ('timestamp', 'String'), ('amount', 'Float64')]

Schema Match Settings: COMPLETE  IN ORDER  COLUMN ≠ column  DTYPE ≠ dtype  float ≠ float64

You can also get a text summary of the full pipeline run. The get_report() method produces a human-readable summary that includes both the source and target validation results, making it suitable for logging or printing in CI/CD output:

print(result.get_report())
Pipeline Boundary Validation Results
========================================

Source Boundary: PASSED
------------------------------
  Steps: 7
  Test units passed: 31
  Test units failed: 0

Target Boundary: PASSED
------------------------------
  Steps: 8
  Test units passed: 36
  Test units failed: 0

Overall: PASSED
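The step and test-unit totals reconcile with the per-step reports: the list-form col_vals_not_null step expands into one step per column, the schema check contributes a single test unit, and every row-level step contributes one unit per row. A quick arithmetic check, assuming that per-step accounting (it matches the UNITS values shown in the two reports):

```python
N_ROWS = 5  # raw_transactions and its transformed output both have five rows

# Test units per step: the schema check is one unit; row-level steps count one unit per row
source_units = {
    "col_schema_match": 1,
    "col_vals_not_null(txn_id)": N_ROWS,
    "col_vals_not_null(amount_cents)": N_ROWS,
    "col_vals_not_null(currency)": N_ROWS,
    "col_vals_ge(amount_cents)": N_ROWS,
    "col_vals_in_set(currency)": N_ROWS,
    "rows_distinct(txn_id)": N_ROWS,
}
target_units = {
    "col_schema_match": 1,
    "col_vals_not_null(txn_id)": N_ROWS,
    "col_vals_not_null(amount)": N_ROWS,
    "col_vals_not_null(currency)": N_ROWS,
    "col_vals_not_null(timestamp)": N_ROWS,
    "col_vals_gt(amount)": N_ROWS,
    "col_vals_between(amount)": N_ROWS,
    "rows_distinct(txn_id)": N_ROWS,
}

for name, units in (("Source", source_units), ("Target", target_units)):
    print(f"{name}: {len(units)} steps, {sum(units.values())} test units")
# Source: 7 steps, 31 test units
# Target: 8 steps, 36 test units
```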

Validating Boundaries Independently

You don’t have to use run(). You can validate each boundary separately when you need more control over the workflow. The validate_source() and validate_target() methods each return a standard Validate object (not a PipelineResult), giving you direct access to the interrogation results.

This pattern is particularly useful when:

  • Your transform has side effects (writing to a database, calling an API) that you don’t want triggered during validation testing
  • You need to inspect or modify intermediate data between the two validation steps
  • You’re building a custom orchestration layer that needs finer-grained control
  • You want to run just one boundary check in a unit test

# Validate just the source
source_result = pipeline.validate_source(raw_transactions)
print(f"Source all passed: {source_result.all_passed()}")
Source all passed: True
# Run your transform manually
clean_data = process_transactions(raw_transactions)

# Then validate the target
target_result = pipeline.validate_target(clean_data)
print(f"Target all passed: {target_result.all_passed()}")
Target all passed: True

Notice that validate_source() and validate_target() return Validate objects directly, so you call .all_passed() on them rather than checking .passed as you would on a PipelineResult.

Short-Circuiting on Source Failure

By default, Pipeline.run() uses short-circuit behavior: if the source validation fails, the transform is never executed and the target is never validated. This is a safety mechanism that prevents invalid data from flowing into your transformation logic, where it could cause unexpected exceptions, produce corrupt output, or trigger irreversible side effects like writing bad records to a production database.

Short-circuiting also saves compute resources. If your transform is expensive (large joins, ML inference, API calls), there’s no reason to run it on data you already know is invalid. The pipeline fails fast and reports exactly why.

In the example below, the source data contains a null txn_id and a negative amount_cents, both of which violate the source contract. Because short-circuiting is enabled (the default), the transform function is never called and the target validation is skipped entirely:

# Data with problems (null txn_id, negative amount)
bad_raw_data = pl.DataFrame(
    {
        "txn_id": ["TXN-001", None, "TXN-003", "TXN-004", "TXN-005"],
        "amount_cents": [4999, -100, 750, 89900, 3200],
        "currency": ["USD", "EUR", "GBP", "USD", "JPY"],
        "timestamp": [
            "2024-06-01T10:30:00",
            "2024-06-01T11:45:00",
            "2024-06-01T12:00:00",
            "2024-06-01T14:20:00",
            "2024-06-01T15:55:00",
        ],
    }
)

# Source contract uses on_violation="warn" (default)
import warnings

with warnings.catch_warnings(record=True):
    warnings.simplefilter("always")
    result = pipeline.run(data=bad_raw_data, transform=process_transactions)

print(f"Source passed: {result.source_passed}")
print(f"Target validation: {result.target_validation}")  # None, was skipped
print(f"Transform output: {result.transform_output}")  # None, was skipped
Source passed: False
Target validation: None
Transform output: None

To disable short-circuiting and always run both validations, set short_circuit=False. This is useful when you want to collect validation reports from both boundaries regardless of whether the source passed (for example, during development when you want to see the full picture of all data quality issues at once):

pipeline_no_shortcircuit = pb.Pipeline(
    source=source_contract,
    target=target_contract,
    short_circuit=False,
)

with warnings.catch_warnings(record=True):
    warnings.simplefilter("always")
    result = pipeline_no_shortcircuit.run(
        data=bad_raw_data, transform=process_transactions
    )

print(f"Source passed: {result.source_passed}")
print(f"Target validated: {result.target_validation is not None}")
Source passed: False
Target validated: True

Violation Handling

When a contract’s validation fails, the pipeline needs to do something about it. The on_violation parameter on each Contract controls this behavior. There are three modes, each suited to different stages of your development and deployment lifecycle:

  • "warn" (default): Issues a Python UserWarning. The pipeline continues, and the failure is recorded in the result. Ideal for development and monitoring.
  • "raise": Raises a RuntimeError immediately. The pipeline halts. Ideal for production gates where invalid data must never proceed.
  • "log": Writes a warning-level message to the pointblank.contract logger. The pipeline continues silently. Ideal for background monitoring and observability integrations.

Each contract in a pipeline can use a different violation mode. For example, you might use "warn" on the source contract (so you can still inspect the data) and "raise" on the target contract (to guarantee output quality).
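The behavior of the three modes can be modeled with the standard library. In this sketch, `handle_violation`, the `modes` dict, and the logger name are hypothetical; the message text mirrors the warning shown in the Warning Mode example. It illustrates the blocking versus non-blocking distinction and per-contract modes, not Pointblank’s internals.

```python
import logging
import warnings

log = logging.getLogger("violation_sketch")  # hypothetical logger name for this sketch


def handle_violation(contract: str, boundary: str, mode: str) -> None:
    """Hypothetical dispatcher modeling the three on_violation modes."""
    message = f"Contract '{contract}' ({boundary}) violated: validation has failing steps."
    if mode == "warn":
        warnings.warn(message, UserWarning)   # non-blocking; caller continues
    elif mode == "raise":
        raise RuntimeError(message)           # blocking; caller halts
    elif mode == "log":
        log.warning(message)                  # non-blocking; goes to logging handlers
    else:
        raise ValueError(f"unknown on_violation mode: {mode!r}")


# Per-contract modes: lenient at the source, strict at the target
modes = {"source": "warn", "target": "raise"}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    handle_violation("raw_transactions", "source", modes["source"])
print(caught[0].message)

try:
    handle_violation("clean_transactions", "target", modes["target"])
except RuntimeError as err:
    print(f"halted: {err}")
```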

Warning Mode (Default)

The warning mode is non-blocking: your pipeline completes, but Python’s warning system records that a violation occurred. You can capture these warnings programmatically using the standard warnings module:

warn_contract = pb.Contract(
    name="soft_check",
    steps=[pb.Step("col_vals_not_null", columns=["txn_id"])],
    on_violation="warn",  # Issues a UserWarning
)

warn_pipeline = pb.Pipeline(source=warn_contract)

with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")
    warn_pipeline.validate_source(bad_raw_data)
    if w:
        print(f"Warning raised: {w[0].message}")
Warning raised: Contract 'soft_check' (source) violated: validation has failing steps.

Raise Mode

Use on_violation="raise" to halt execution immediately when data is invalid. This is the strictest mode and the right choice for production pipelines where bad data must never reach downstream systems. When a validation fails with this mode, a RuntimeError is raised with a message identifying which contract failed, giving you a clear stack trace for debugging:

strict_contract = pb.Contract(
    name="strict_check",
    steps=[pb.Step("col_vals_not_null", columns=["txn_id"])],
    on_violation="raise",  # Raises RuntimeError
)

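strict_pipeline = pb.Pipeline(source=strict_contract)

# bad_raw_data contains a null txn_id, so this raises RuntimeError
try:
    strict_pipeline.validate_source(bad_raw_data)
except RuntimeError as err:
    print(f"Pipeline halted: {err}")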

Log Mode

Use on_violation="log" for background monitoring without interrupting the pipeline. This mode integrates with Python’s standard logging module, writing violation messages to the pointblank.contract logger at the WARNING level. This is ideal for production systems with structured logging and alerting infrastructure where you want visibility into data quality without blocking data flow:

import logging
logging.basicConfig(level=logging.WARNING)

log_contract = pb.Contract(
    name="monitored_check",
    steps=[pb.Step("col_vals_not_null", columns=["txn_id"])],
    on_violation="log",  # Logs via pointblank.contract logger
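)

log_pipeline = pb.Pipeline(source=log_contract)
log_pipeline.validate_source(bad_raw_data)  # violation is logged; execution continues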
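Because log mode writes to the pointblank.contract logger, you can route violations into existing alerting by attaching a handler to that logger instead of relying on basicConfig. A minimal standard-library sketch, assuming only the logger name stated above: `AlertCollector` is a hypothetical handler, and the warning emitted by hand stands in for a real violation (its exact wording is an assumption).

```python
import logging


class AlertCollector(logging.Handler):
    """Hypothetical handler that collects contract violations for an alerting system."""

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.alerts: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # Keep the logger name so alerts can be filtered by source
        self.alerts.append(f"[{record.name}] {record.getMessage()}")


collector = AlertCollector()
contract_logger = logging.getLogger("pointblank.contract")
contract_logger.addHandler(collector)

# Simulated violation message; in a real run the library would emit the record
contract_logger.warning("Contract 'monitored_check' (source) violated: validation has failing steps.")
print(collector.alerts)
```

In practice the handler would forward each record to your alerting or observability system rather than storing it in a list.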

Pipeline-Level Thresholds

You can set thresholds at the pipeline level to override contract-level thresholds. This is a powerful mechanism for reusing the same contracts across different environments with different tolerance levels. A contract defines what to check; the pipeline’s thresholds define how strict those checks should be in a given context.

For example, during development you might tolerate up to 10% of rows failing a given check (so you can work with imperfect sample data), while in production you might require less than 1% failure. Rather than maintaining separate contracts for each environment, you define the contracts once and let the pipeline’s thresholds control the sensitivity:

# Relaxed thresholds for development
dev_pipeline = pb.Pipeline(
    source=source_contract,
    target=target_contract,
    thresholds=pb.Thresholds(warning=0.10, error=0.25, critical=0.50),
    label="Development Pipeline",
)

# Strict thresholds for production
prod_pipeline = pb.Pipeline(
    source=source_contract,
    target=target_contract,
    thresholds=pb.Thresholds(warning=0.01, error=0.05, critical=0.10),
    label="Production Pipeline",
)
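Because thresholds are expressed as fractions of failing test units, the same batch can land at different severities under the two pipelines. The sketch below assumes a level is reached when the failing fraction is greater than or equal to its threshold; `severity` is a hypothetical helper, so confirm the exact comparison against the pb.Thresholds documentation.

```python
def severity(n_failed: int, n_units: int, warning: float, error: float, critical: float) -> str:
    """Hypothetical helper: highest threshold level reached by the failing fraction."""
    fraction = n_failed / n_units
    # Check the strictest level first so the highest reached level wins
    for level, limit in (("critical", critical), ("error", error), ("warning", warning)):
        if fraction >= limit:
            return level
    return "ok"


dev = dict(warning=0.10, error=0.25, critical=0.50)
prod = dict(warning=0.01, error=0.05, critical=0.10)

# 30 failing rows out of 1,000 (3%)
print(severity(30, 1000, **dev))   # ok
print(severity(30, 1000, **prod))  # warning
```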

Source-Only and Target-Only Pipelines

You don’t always need both boundaries. In many real-world systems, you’ll encounter situations where only one boundary is relevant. The Pipeline class supports this naturally: simply omit the contract you don’t need, and the pipeline will only validate the boundary you’ve defined.

Common patterns:

Source-Only (Ingestion Guard)

Validate incoming data without a specific output contract. This pattern is common at the edges of your system where you receive data from external sources (APIs, file uploads, partner feeds) and want to reject or flag invalid input before it enters your data lake or warehouse:

ingestion_pipeline = pb.Pipeline(source=source_contract)

result = ingestion_pipeline.run(
    data=raw_transactions,
    transform=lambda df: df,  # Pass-through
)
print(f"Ingestion valid: {result.passed}")
Ingestion valid: True

Target-Only (Output Guard)

Validate data before it leaves your system. This pattern is useful for data exports, API responses, or any point where you’re publishing data to external consumers. You’ve already transformed the data and simply want to confirm it meets the contract you’ve promised:

export_pipeline = pb.Pipeline(target=target_contract)

clean_data = process_transactions(raw_transactions)
result = export_pipeline.run(
    data=clean_data,
    transform=lambda df: df,  # Already transformed
)
print(f"Export valid: {result.passed}")
Export valid: True

Detecting Transform Bugs

One of the most powerful applications of boundary enforcement is catching bugs in your transformation logic. When you validate at both boundaries, you can distinguish between “the input data was bad” (source failure) and “my code introduced a problem” (source passes, target fails). This distinction is invaluable during debugging and code review.

Here’s an example where a buggy transform introduces data quality issues. The source data is perfectly valid, but the transformation logic has a conditional branch that inadvertently nulls out small transaction amounts. Without boundary enforcement, this bug might go undetected until a downstream consumer reports missing data (potentially days or weeks later):

def buggy_transform(df: pl.DataFrame) -> pl.DataFrame:
    """A transform with a bug (introduces nulls for certain records)."""
    return df.with_columns(
        pl.when(pl.col("amount_cents") < 1000)
        .then(None)  # Bug! Small amounts become null
        .otherwise(pl.col("amount_cents") / 100)
        .alias("amount")
    ).drop("amount_cents")

# A pipeline with both boundaries, built from the contracts defined earlier
txn_pipeline = pb.Pipeline(source=source_contract, target=target_contract)

# The source data is fine...
result_source = txn_pipeline.validate_source(raw_transactions)
print(f"Source passes: {result_source.all_passed()}")

# ...but the buggy transform breaks the target contract
with warnings.catch_warnings(record=True):
    warnings.simplefilter("always")
    result = txn_pipeline.run(data=raw_transactions, transform=buggy_transform)

print("\nAfter buggy transform:")
print(f"  Source passed: {result.source_passed}")
print(f"  Target passed: {result.target_passed}")
print(f"  Overall passed: {result.passed}")
Source passes: True

After buggy transform:
  Source passed: True
  Target passed: False
  Overall passed: False
# The target validation shows exactly what went wrong
result.target_validation

The target validation report pinpoints the failure: the col_vals_not_null step on the amount column fails because the buggy transform introduced nulls. Since the source boundary passed, the problem must have been introduced by the transform, and you can trace the failing contract step straight back to the offending branch in the transformation logic. Without the target contract, this bug would silently produce a table with missing values.
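To see which record trips the not-null check, the same branch logic can be replayed on the five sample amounts in plain Python. This is a standalone re-creation of the bug using lists, not Pointblank output.

```python
amount_cents = [4999, 12500, 750, 89900, 3200]  # the raw_transactions values

# Same branch logic as buggy_transform: amounts under 1000 cents become None
buggy = [None if c < 1000 else c / 100 for c in amount_cents]
# What process_transactions produces instead
fixed = [c / 100 for c in amount_cents]

null_rows = [i for i, v in enumerate(buggy) if v is None]
print(buggy)      # [49.99, 125.0, None, 899.0, 32.0]
print(null_rows)  # [2], i.e. TXN-003, the 750-cent transaction
print(fixed)
```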

Working with Pandas

Pipelines work with any DataFrame library supported by Pointblank, including Pandas. The interface is identical: you pass a Pandas DataFrame to run() or validate_source(), and Pointblank handles the rest. Contracts made up of validation steps are therefore portable across DataFrame backends. Schema definitions are the exception to keep in mind: dtype names such as “String” and “Int64” in the earlier pb.Schema reflect Polars types and may not match the dtypes of an equivalent Pandas DataFrame, so the example below uses contracts with validation steps only.

Here’s the same transaction-processing workflow using Pandas DataFrames:

import pandas as pd

# Pandas DataFrame
raw_pd = pd.DataFrame(
    {
        "txn_id": ["TXN-001", "TXN-002", "TXN-003"],
        "amount_cents": [4999, 12500, 750],
        "currency": ["USD", "EUR", "GBP"],
        "timestamp": ["2024-06-01T10:30:00", "2024-06-01T11:45:00", "2024-06-01T12:00:00"],
    }
)

# Pandas transform
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["amount"] = df["amount_cents"] / 100
    df = df.drop(columns=["amount_cents"])
    return df

# A two-boundary pipeline for the Pandas data (step-only contracts, no schema)
pandas_pipeline = pb.Pipeline(
    source=pb.Contract(
        name="pandas_source",
        steps=[
            pb.Step("col_vals_not_null", columns=["txn_id", "amount_cents"]),
            pb.Step("col_vals_ge", columns="amount_cents", value=0),
        ],
    ),
    target=pb.Contract(
        name="pandas_target",
        steps=[
            pb.Step("col_vals_not_null", columns=["txn_id", "amount"]),
            pb.Step("col_vals_gt", columns="amount", value=0),
        ],
    ),
)

result = pandas_pipeline.run(data=raw_pd, transform=pandas_transform)
print(f"Pandas pipeline passed: {result.passed}")
Pandas pipeline passed: True

Multiple Pipeline Runs

Pipelines are stateless and can be run multiple times with different data. Each call to run() produces an independent PipelineResult with no side effects on the pipeline object itself. This makes pipelines safe to reuse in loops, parallel processing, or long-running services where the same validation logic applies to many batches of data.

The example below demonstrates running a single pipeline against multiple data batches (a common pattern in batch-processing systems where data arrives in chunks):

# Same pipeline, different data batches
batches = [
    pl.DataFrame({
        "txn_id": [f"BATCH1-{i}" for i in range(3)],
        "amount_cents": [1000, 2000, 3000],
        "currency": ["USD", "USD", "EUR"],
        "timestamp": ["2024-06-01T10:00:00"] * 3,
    }),
    pl.DataFrame({
        "txn_id": [f"BATCH2-{i}" for i in range(3)],
        "amount_cents": [500, 1500, 4500],
        "currency": ["GBP", "JPY", "USD"],
        "timestamp": ["2024-06-02T10:00:00"] * 3,
    }),
]

for i, batch in enumerate(batches):
    result = pipeline.run(data=batch, transform=process_transactions)
    print(f"Batch {i + 1}: {'PASSED' if result.passed else 'FAILED'}")
Batch 1: PASSED
Batch 2: PASSED
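Since each run() call is independent, the batch loop also parallelizes naturally. To keep the sketch runnable with the standard library alone, a hypothetical `validate_batch` function stands in for `pipeline.run()` and rows are plain dicts; sharing a real pipeline across workers relies on the statelessness described above.

```python
from concurrent.futures import ThreadPoolExecutor


def validate_batch(batch: list[dict]) -> bool:
    """Hypothetical stateless check: every txn_id present, every amount non-negative."""
    return all(r.get("txn_id") is not None and r.get("amount_cents", -1) >= 0 for r in batch)


batches = [
    [{"txn_id": f"BATCH1-{i}", "amount_cents": c} for i, c in enumerate([1000, 2000, 3000])],
    [{"txn_id": f"BATCH2-{i}", "amount_cents": c} for i, c in enumerate([500, 1500, 4500])],
    [{"txn_id": None, "amount_cents": -100}],  # a deliberately bad batch
]

with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(validate_batch, batches))  # map preserves input order

for i, ok in enumerate(results, start=1):
    print(f"Batch {i}: {'PASSED' if ok else 'FAILED'}")
```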

What’s Next

You now have a solid understanding of how Pipeline enforces data quality at both boundaries of a transformation. You’ve seen how to run full pipelines, inspect results, validate boundaries independently, handle violations, tune thresholds per environment, and detect transform bugs through dual-boundary validation.

The next guide page covers YAML Contracts: how to serialize contracts and pipelines to YAML files for version control, team collaboration, and configuration-driven validation workflows. YAML serialization is especially powerful in production environments where contracts should be managed as configuration rather than code, enabling non-developers to review and modify data quality rules.