A pipeline in Pointblank represents the enforcement of data quality at both boundaries of a data transformation: the input (“source”) and the output (“target”). The Pipeline class binds a source contract and a target contract together, providing a unified way to validate data as it flows through your system.
This approach embodies a key principle: Pointblank enforces boundaries; it doesn’t orchestrate the transform. You write the transformation logic; Pointblank guards the edges. This deliberate separation of concerns means you can use any DataFrame library, any transformation approach, and any orchestration tool you prefer. Pointblank’s only job is to verify that data entering your transform meets expectations and that data leaving your transform satisfies guarantees you’ve made to downstream consumers.
Think of it like type-checking for data in motion: the source contract is your function’s input type signature, and the target contract is the return type. The pipeline ties them together.
Why Boundary Enforcement?
Data pipelines rarely fail in obvious ways. More often, quality degrades silently: an upstream team adds a column and reorders the schema, a third-party API starts returning nulls in a field that was always populated, or a subtle join bug duplicates rows without raising any exception. These failures propagate downstream and compound, making root-cause analysis painful.
Even with a perfectly correct transformation, data quality can degrade if:
Upstream data changes shape, types, or semantics without notice
A transform introduces bugs (null propagation, incorrect joins, off-by-one errors)
Schema drift occurs silently over time
External data sources degrade in quality between scheduled runs
Boundary enforcement addresses these problems by placing validation checkpoints at the two places that matter most: the entry point and the exit point of your transformation. By validating at both boundaries, you can:
Catch upstream issues early: before they enter your transform and corrupt derived tables
Detect transform bugs: by verifying output against a separate contract defined independently
Short-circuit expensive processing: fail fast when source data is invalid, saving compute
Generate compliance evidence: produce audit-ready reports showing data quality at each stage
Assign accountability: source contracts belong to producers, target contracts to consumers
This dual-boundary model makes it possible to distinguish between “the data I received was bad” and “my code introduced a bug”, a distinction that single-point validation cannot make.
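The reasoning behind that distinction can be written as a small decision table over the two boundary outcomes. The sketch below makes it explicit with a hypothetical `diagnose()` helper, which is not part of Pointblank. It assumes each boundary reports a plain pass/fail boolean:

```python
def diagnose(source_passed: bool, target_passed: bool) -> str:
    """Hypothetical helper (not part of Pointblank): where to look first."""
    if not source_passed:
        # Bad input: whatever the transform produced can't be trusted.
        return "upstream data problem"
    if not target_passed:
        # Valid input, invalid output: suspect the transform first
        # (or a gap between what the two contracts require).
        return "transform bug"
    return "ok"


for source_ok in (True, False):
    for target_ok in (True, False):
        print(f"source={source_ok}, target={target_ok} -> {diagnose(source_ok, target_ok)}")
```

The case where the source passes and the target fails is the informative one. Your input was acceptable, so the problem was introduced somewhere between the two boundaries.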
Creating a Pipeline
A Pipeline needs at least one contract (source, target, or both). In practice, most pipelines define both contracts because that gives you full boundary coverage. However, there are legitimate cases for single-boundary pipelines (covered later in this guide).
The example below demonstrates a real-world pattern: a transaction processing pipeline that receives raw financial data in one format (amounts in cents, all fields as raw strings) and produces clean, normalized output (amounts in decimal dollars, validated types). Notice how the source and target contracts encode fundamentally different expectations about the data’s shape and values (this is intentional). The source contract reflects what your upstream system actually produces, while the target contract reflects what downstream consumers need.
```python
import pointblank as pb
import polars as pl

# Define the source contract (what raw data must look like)
source_contract = pb.Contract(
    name="raw_transactions",
    direction="source",
    schema=pb.Schema(
        txn_id="String",
        amount_cents="Int64",
        currency="String",
        timestamp="String",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["txn_id", "amount_cents", "currency"]),
        pb.Step("col_vals_ge", columns="amount_cents", value=0),
        pb.Step("col_vals_in_set", columns="currency", set=["USD", "EUR", "GBP", "JPY"]),
        pb.Step("rows_distinct", columns_subset=["txn_id"]),
    ],
    version="1.0.0",
    owner="ingestion-team",
)

# Define the target contract (what clean data must look like)
target_contract = pb.Contract(
    name="clean_transactions",
    direction="target",
    schema=pb.Schema(
        txn_id="String",
        currency="String",
        timestamp="String",
        amount="Float64",
    ),
    steps=[
        pb.Step("col_vals_not_null", columns=["txn_id", "amount", "currency", "timestamp"]),
        pb.Step("col_vals_gt", columns="amount", value=0),
        pb.Step("col_vals_between", columns="amount", left=0.01, right=100_000.0),
        pb.Step("rows_distinct", columns_subset=["txn_id"]),
    ],
    version="1.0.0",
    consumers=["analytics-team", "fraud-team"],
)

# Create the pipeline
pipeline = pb.Pipeline(
    source=source_contract,
    target=target_contract,
    label="Transaction Processing Pipeline",
)

print(pipeline)
```
The Pipeline.run() method executes the full boundary enforcement workflow in a single call. This is the primary way to use a pipeline: you hand it your input data and a transform function, and it handles the validation orchestration for you. The method follows a strict three-step sequence:
Validate source data against the source contract
Execute your transform function (only if source validation passes)
Validate the output against the target contract
This ordering is deliberate. By validating the source before running the transform, you avoid wasting compute on data that was already invalid. And by validating the target after the transform, you confirm that your code produced output matching your guarantees.
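To make the ordering concrete, here is a minimal sketch of the same three-step sequence in plain Python. It is not Pointblank's implementation. The `RunResult` class, the `run()` function, and the check functions are hypothetical stand-ins that treat a contract as a function returning True or False. Lists of dicts stand in for DataFrames:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Hypothetical stand-in: a "contract" is just a function returning True when the
# data satisfies it. Pointblank's real contracts are far richer than this.
Check = Callable[[Any], bool]


@dataclass
class RunResult:
    source_passed: bool
    target_passed: Optional[bool]  # None means the target was never checked
    transform_output: Any = None

    @property
    def passed(self) -> bool:
        # Overall success requires both boundaries to pass.
        return self.source_passed and self.target_passed is True


def run(data: Any, transform: Callable[[Any], Any],
        source_check: Check, target_check: Check) -> RunResult:
    # Step 1: validate the input against the source contract.
    if not source_check(data):
        # Fail fast: the transform and the target check are skipped.
        return RunResult(source_passed=False, target_passed=None)
    # Step 2: transform only data that passed the source boundary.
    output = transform(data)
    # Step 3: validate the output against the target contract.
    return RunResult(True, target_check(output), output)


# Usage with plain dicts standing in for DataFrame rows.
def to_dollars(rows):
    return [{"txn_id": r["txn_id"], "amount": r["amount_cents"] / 100} for r in rows]


def no_negative_cents(rows):
    return all(r["amount_cents"] >= 0 for r in rows)


def positive_amounts(rows):
    return all(r["amount"] > 0 for r in rows)


rows = [{"txn_id": "TXN-001", "amount_cents": 4999},
        {"txn_id": "TXN-003", "amount_cents": 750}]
result = run(rows, to_dollars, no_negative_cents, positive_amounts)
print(result.passed, result.transform_output)
```

In this sketch, `target_passed=None` marks a target that was never checked. This keeps "skipped" distinguishable from "checked and failed".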
```python
# Sample raw transaction data
raw_transactions = pl.DataFrame(
    {
        "txn_id": ["TXN-001", "TXN-002", "TXN-003", "TXN-004", "TXN-005"],
        "amount_cents": [4999, 12500, 750, 89900, 3200],
        "currency": ["USD", "EUR", "GBP", "USD", "JPY"],
        "timestamp": [
            "2024-06-01T10:30:00",
            "2024-06-01T11:45:00",
            "2024-06-01T12:00:00",
            "2024-06-01T14:20:00",
            "2024-06-01T15:55:00",
        ],
    }
)


# Your transformation logic
def process_transactions(df: pl.DataFrame) -> pl.DataFrame:
    """Convert cents to dollars and clean up the data."""
    return df.with_columns(
        (pl.col("amount_cents") / 100).alias("amount")
    ).drop("amount_cents")


# Run the full pipeline
result = pipeline.run(data=raw_transactions, transform=process_transactions)

print(f"Source passed: {result.source_passed}")
print(f"Target passed: {result.target_passed}")
print(f"Overall passed: {result.passed}")
```
Besides source_passed and target_passed, the returned PipelineResult exposes:
result.source_validation: the interrogated Validate object for the source check
result.target_validation: the interrogated Validate object for the target check
result.transform_output: the data returned by your transform function
result.passed: True only if both boundaries pass
The PipelineResult is designed to give you full introspection. You can access the raw validation objects (which are standard Pointblank Validate instances), inspect the transformed data directly, or simply check the boolean passed property for a quick go/no-go decision. Holding onto the result object also gives you the ability to generate reports or log details after the fact.
Inspecting Results
Because PipelineResult holds full Validate objects, you have access to Pointblank’s rich reporting capabilities. Each validation result can render its standard HTML report table, which shows exactly which steps passed, which failed, and by how much. This is especially valuable during development when you need to understand why a boundary check failed, not just that it failed.
You can access individual validation results to see the standard Pointblank report:
```python
# View the source boundary validation
result.source_validation
```
You can also get a text summary of the full pipeline run. The get_report() method produces a human-readable summary that includes both the source and target validation results, making it suitable for logging or printing in CI/CD output:
```python
print(result.get_report())
```

```text
Pipeline Boundary Validation Results
========================================
Source Boundary: PASSED
------------------------------
Steps: 7
Test units passed: 31
Test units failed: 0
Target Boundary: PASSED
------------------------------
Steps: 8
Test units passed: 36
Test units failed: 0
Overall: PASSED
```
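The totals in this report can be reproduced from the two contracts under three assumptions:

- The schema check counts as one step with one test unit.
- A column-value step given a list in `columns=` expands into one step per column.
- Every other step contributes one test unit per row.

This accounting is inferred from the numbers shown here, not taken from Pointblank's documentation. For the five-row `raw_transactions` table the arithmetic is:

```python
# Hypothetical bookkeeping that reproduces the report's totals under the
# assumptions stated above (not Pointblank's own counting code).
N_ROWS = 5  # raw_transactions has five rows

# Number of per-column steps each rule produces, in contract order.
source_rules = {
    "col_vals_not_null": 3,  # txn_id, amount_cents, currency
    "col_vals_ge": 1,
    "col_vals_in_set": 1,
    "rows_distinct": 1,
}
target_rules = {
    "col_vals_not_null": 4,  # txn_id, amount, currency, timestamp
    "col_vals_gt": 1,
    "col_vals_between": 1,
    "rows_distinct": 1,
}


def tally(rules: dict[str, int], n_rows: int) -> tuple[int, int]:
    steps = 1 + sum(rules.values())           # +1 for the schema step
    units = 1 + sum(rules.values()) * n_rows  # schema step counts as 1 unit
    return steps, units


print(tally(source_rules, N_ROWS))  # (7, 31)
print(tally(target_rules, N_ROWS))  # (8, 36)
```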
Validating Boundaries Independently
You don’t have to use run(). You can validate each boundary separately when you need more control over the workflow. The validate_source() and validate_target() methods each return a standard Validate object (not a PipelineResult), giving you direct access to the interrogation results.
This pattern is particularly useful when:
Your transform has side effects (writing to a database, calling an API) that you don’t want triggered during validation testing
You need to inspect or modify intermediate data between the two validation steps
You’re building a custom orchestration layer that needs finer-grained control
You want to run just one boundary check in a unit test
```python
# Validate just the source
source_result = pipeline.validate_source(raw_transactions)
print(f"Source all passed: {source_result.all_passed()}")
```
Source all passed: True
```python
# Run your transform manually
clean_data = process_transactions(raw_transactions)

# Then validate the target
target_result = pipeline.validate_target(clean_data)
print(f"Target all passed: {target_result.all_passed()}")
```
Target all passed: True
Notice that validate_source() and validate_target() return Validate objects directly, so you call .all_passed() on them rather than checking .passed as you would on a PipelineResult.
Short-Circuiting on Source Failure
By default, Pipeline.run() uses short-circuit behavior: if the source validation fails, the transform is never executed and the target is never validated. This is a safety mechanism that prevents invalid data from flowing into your transformation logic, where it could cause unexpected exceptions, produce corrupt output, or trigger irreversible side effects like writing bad records to a production database.
Short-circuiting also saves compute resources. If your transform is expensive (large joins, ML inference, API calls), there’s no reason to run it on data you already know is invalid. The pipeline fails fast and reports exactly why.
For example, suppose the source data contains a null txn_id and a negative amount_cents, both of which violate the source contract. Because short-circuiting is enabled by default, the transform function is never called and the target validation is skipped entirely.
To disable short-circuiting and always run both validations, set short_circuit=False. This is useful when you want validation reports from both boundaries regardless of whether the source passed. For example, during development you may want to see every data quality issue at once.
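The difference between the two settings is easiest to see by recording whether the transform ran. The sketch below is a hypothetical plain-Python model of that behavior. `run_boundaries()` is not a Pointblank function; only the `short_circuit` name comes from the text. It uses a batch with a null `txn_id` and a negative `amount_cents`:

```python
def run_boundaries(data, transform, source_ok, target_ok, short_circuit=True):
    """Hypothetical model of the two behaviors.

    Returns (source_passed, target_passed, transform_ran).
    """
    source_passed = source_ok(data)
    if not source_passed and short_circuit:
        # Default: fail fast and never call the transform on invalid input.
        return source_passed, None, False
    # short_circuit=False (or a passing source): transform, then check the target.
    output = transform(data)
    return source_passed, target_ok(output), True


bad_rows = [
    {"txn_id": None, "amount_cents": 4999},       # null txn_id
    {"txn_id": "TXN-002", "amount_cents": -100},  # negative amount
]


def source_ok(rows):
    return all(r["txn_id"] is not None and r["amount_cents"] >= 0 for r in rows)


def to_dollars(rows):
    return [{"txn_id": r["txn_id"], "amount": r["amount_cents"] / 100} for r in rows]


def target_ok(rows):
    return all(r["txn_id"] is not None and r["amount"] > 0 for r in rows)


print(run_boundaries(bad_rows, to_dollars, source_ok, target_ok))
# (False, None, False)
print(run_boundaries(bad_rows, to_dollars, source_ok, target_ok, short_circuit=False))
# (False, False, True)
```

With short-circuiting disabled, the transform receives data already known to be invalid. In real code the transform must tolerate such input, or the caller must handle any exception it raises.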
When a contract’s validation fails, the on_violation parameter on that Contract controls what happens next. There are three modes, each suited to a different stage of your development and deployment lifecycle:
"warn" (default): Issues a Python UserWarning. The pipeline continues, and the failure is recorded in the result. Ideal for development and monitoring.
"raise": Raises a RuntimeError immediately. The pipeline halts. Ideal for production gates where invalid data must never proceed.
"log": Writes a warning-level message to the pointblank.contract logger. The pipeline continues silently. Ideal for background monitoring and observability integrations.
Each contract in a pipeline can use a different violation mode. For example, you might use "warn" on the source contract (so you can still inspect the data) and "raise" on the target contract (to guarantee output quality).
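One way to picture the three modes is as a single dispatch on the `on_violation` value. The following is a hypothetical sketch built on the standard `warnings` and `logging` modules, not Pointblank's source. The logger name comes from the list above. The message format is modeled on the warning text shown under Warning Mode:

```python
import logging
import warnings

# Logger name taken from the description of "log" mode.
logger = logging.getLogger("pointblank.contract")


def handle_violation(mode: str, contract_name: str, direction: str) -> None:
    """Hypothetical dispatch for the three on_violation modes (not Pointblank's code)."""
    message = (
        f"Contract '{contract_name}' ({direction}) violated: "
        "validation has failing steps."
    )
    if mode == "warn":
        warnings.warn(message, UserWarning, stacklevel=2)  # continue, but visibly
    elif mode == "raise":
        raise RuntimeError(message)  # halt immediately
    elif mode == "log":
        logger.warning(message)  # continue silently, record in the logs
    else:
        raise ValueError(f"unknown on_violation mode: {mode!r}")


# A production-style gate around a "raise"-mode target contract.
try:
    handle_violation("raise", "clean_transactions", "target")
except RuntimeError as exc:
    halted_message = str(exc)

print(halted_message)
```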
Warning Mode (Default)
The warning mode is non-blocking: your pipeline completes, but Python’s warning system records that a violation occurred. You can capture these warnings programmatically using the standard warnings module. For a failing source contract named soft_check, the captured warning message is:
Contract 'soft_check' (source) violated: validation has failing steps.
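A minimal sketch of capturing such a warning with `warnings.catch_warnings()` follows. To keep it self-contained, a hypothetical `validate_with_warning()` function stands in for a warn-mode contract check. In a real pipeline you would place the `pipeline.run()` or `pipeline.validate_source()` call inside the `with` block instead:

```python
import warnings


def validate_with_warning() -> bool:
    """Hypothetical stand-in for a warn-mode contract check that fails."""
    warnings.warn(
        "Contract 'soft_check' (source) violated: validation has failing steps.",
        UserWarning,
        stacklevel=2,
    )
    return False  # the check reports failure instead of raising


# record=True collects warnings into a list instead of printing them;
# simplefilter("always") ensures a repeated warning is not suppressed.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    passed = validate_with_warning()

for w in caught:
    if issubclass(w.category, UserWarning):
        print(f"Warning raised: {w.message}")
```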
Raise Mode
Use on_violation="raise" to halt execution immediately when data is invalid. This is the strictest mode and the right choice for production pipelines where bad data must never reach downstream systems. When a validation fails with this mode, a RuntimeError is raised with a message identifying which contract failed, giving you a clear stack trace for debugging:
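```python
strict_contract = pb.Contract(
    name="strict_check",
    steps=[pb.Step("col_vals_not_null", columns=["txn_id"])],
    on_violation="raise",  # Raises RuntimeError on failure
)

# A separate name keeps the transaction `pipeline` used elsewhere intact
strict_pipeline = pb.Pipeline(source=strict_contract)

# Validating data that violates the contract (e.g., a null txn_id)
# will raise RuntimeError:
# strict_pipeline.validate_source(bad_raw_data)
```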
Log Mode
Use on_violation="log" for background monitoring without interrupting the pipeline. This mode integrates with Python’s standard logging module, writing violation messages to the pointblank.contract logger at the WARNING level. It suits production systems with structured logging and alerting infrastructure, where you want visibility into data quality without blocking data flow.
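Because the messages go through the standard `logging` module, you can attach any handler to the `pointblank.contract` logger. The sketch below routes it to an in-memory stream so the result is easy to inspect. In production you would attach whatever handler your logging or alerting setup already uses. The logged message is a hypothetical stand-in emitted by hand, since the exact text Pointblank writes in log mode isn't shown here:

```python
import io
import logging

# Attach a handler to the "pointblank.contract" logger (name from the text above).
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

contract_logger = logging.getLogger("pointblank.contract")
contract_logger.addHandler(handler)
contract_logger.setLevel(logging.WARNING)

# Hypothetical stand-in for a message emitted by a log-mode contract.
contract_logger.warning(
    "Contract 'raw_transactions' (source) violated: validation has failing steps."
)

print(stream.getvalue().strip())
# WARNING pointblank.contract: Contract 'raw_transactions' (source) violated: validation has failing steps.
```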
You can set thresholds at the pipeline level to override contract-level thresholds. This is a powerful mechanism for reusing the same contracts across different environments with different tolerance levels. A contract defines what to check; the pipeline’s thresholds define how strict those checks should be in a given context.
For example, during development you might tolerate up to 10% of rows failing a given check (so you can work with imperfect sample data), while in production you might require less than 1% failure. Rather than maintaining separate contracts for each environment, you define the contracts once and let the pipeline’s thresholds control the sensitivity.
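The arithmetic behind environment-specific tolerance is simple: the same failure count can pass in one environment and fail in another. The sketch below is a hypothetical plain-Python model, not Pointblank's Thresholds API. It assumes a check passes while its failure rate stays strictly below the environment's limit, and it uses the 10% and 1% figures from the text:

```python
# Hypothetical sketch: one check outcome judged against a different maximum
# failure rate per environment.
MAX_FAILURE_RATE = {"dev": 0.10, "prod": 0.01}


def passes(n_failed: int, n_units: int, env: str) -> bool:
    # Assumption: pass while the failure rate is strictly below the limit.
    return n_failed / n_units < MAX_FAILURE_RATE[env]


# 3 failing rows out of 200 is a 1.5% failure rate.
results = {env: passes(n_failed=3, n_units=200, env=env) for env in MAX_FAILURE_RATE}
print(results)  # {'dev': True, 'prod': False}
```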
You don’t always need both boundaries. In many real-world systems, you’ll encounter situations where only one boundary is relevant. The Pipeline class supports this naturally: simply omit the contract you don’t need, and the pipeline will only validate the boundary you’ve defined.
Common patterns:
Source-Only (Ingestion Guard)
Validate incoming data without a specific output contract. This pattern is common at the edges of your system, where you receive data from external sources (APIs, file uploads, partner feeds) and want to reject or flag invalid input before it enters your data lake or warehouse.
Conversely, a target-only pipeline validates data just before it leaves your system. This pattern is useful for data exports, API responses, or any point where you’re publishing data to external consumers. You’ve already transformed the data and simply want to confirm that it meets the contract you’ve promised.
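Conceptually, a single-boundary pipeline just skips the boundary that has no contract. The sketch below models this with a hypothetical `BoundaryGuard` class (not part of Pointblank) whose source and target checks are both optional. As with Pipeline, at least one of them is required:

```python
from typing import Any, Callable, Optional, Tuple

Check = Callable[[Any], bool]


class BoundaryGuard:
    """Hypothetical sketch: a guard whose source and target checks are both optional."""

    def __init__(self, source: Optional[Check] = None, target: Optional[Check] = None):
        if source is None and target is None:
            raise ValueError("at least one contract (source or target) is required")
        self.source = source
        self.target = target

    def run(self, data: Any,
            transform: Callable[[Any], Any] = lambda d: d) -> Tuple[Optional[bool], Optional[bool]]:
        # An undefined boundary is skipped and reported as None.
        source_passed = None if self.source is None else self.source(data)
        if source_passed is False:
            return source_passed, None
        output = transform(data)
        target_passed = None if self.target is None else self.target(output)
        return source_passed, target_passed


def ids_present(rows):
    return all(r.get("txn_id") is not None for r in rows)


def amounts_positive(rows):
    return all(r["amount"] > 0 for r in rows)


# Source-only ingestion guard: check raw input, no output contract.
ingest = BoundaryGuard(source=ids_present).run([{"txn_id": "TXN-001", "amount_cents": 4999}])

# Target-only egress guard: the data is already transformed, so the default
# identity transform is used and only the outgoing data is checked.
egress = BoundaryGuard(target=amounts_positive).run([{"txn_id": "TXN-001", "amount": 49.99}])

print(ingest, egress)  # (True, None) (None, True)
```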
One of the most powerful applications of boundary enforcement is catching bugs in your transformation logic. When you validate at both boundaries, you can distinguish between “the input data was bad” (source failure) and “my code introduced a problem” (source passes, target fails). This distinction is invaluable during debugging and code review.
Here’s an example where a buggy transform introduces data quality issues. The source data is perfectly valid, but the transformation logic has a conditional branch that inadvertently nulls out small transaction amounts. Without boundary enforcement, this bug might go undetected until a downstream consumer reports missing data (potentially days or weeks later):
```python
import warnings


def buggy_transform(df: pl.DataFrame) -> pl.DataFrame:
    """A transform with a bug (introduces nulls for certain records)."""
    return df.with_columns(
        pl.when(pl.col("amount_cents") < 1000)
        .then(None)  # Bug! Small amounts become null
        .otherwise(pl.col("amount_cents") / 100)
        .alias("amount")
    ).drop("amount_cents")


# The source data is fine...
result_source = pipeline.validate_source(raw_transactions)
print(f"Source passes: {result_source.all_passed()}")

# ...but the buggy transform breaks the target contract
with warnings.catch_warnings(record=True):
    warnings.simplefilter("always")
    result = pipeline.run(data=raw_transactions, transform=buggy_transform)

print("\nAfter buggy transform:")
print(f" Source passed: {result.source_passed}")
print(f" Target passed: {result.target_passed}")
print(f" Overall passed: {result.passed}")
```
```python
# The target validation shows exactly what went wrong
result.target_validation
```
The target validation report pinpoints the failure: the col_vals_not_null step on the amount column fails because the buggy transform introduced nulls. Because you can see exactly which contract step failed, the root cause is easy to trace back to the transformation logic. Without the target contract, this bug would silently produce a table with missing values.
Working with Pandas
Pipelines work with any DataFrame library supported by Pointblank, including Pandas. The interface is identical: you pass a Pandas DataFrame to run() or validate_source(), and Pointblank handles the rest. Your contracts are therefore portable across DataFrame backends: you can define a contract once and use it to validate both Polars and Pandas data without modification.
The transaction-processing workflow from earlier in this guide can be run with Pandas DataFrames in the same way.
Pipelines are stateless and can be run multiple times with different data. Each call to run() produces an independent PipelineResult with no side effects on the pipeline object itself. This makes pipelines safe to reuse in loops, parallel processing, or long-running services where the same validation logic applies to many batches of data.
The example below demonstrates running a single pipeline against multiple data batches (a common pattern in batch-processing systems where data arrives in chunks):
```python
# Same pipeline, different data batches
batches = [
    pl.DataFrame({
        "txn_id": [f"BATCH1-{i}" for i in range(3)],
        "amount_cents": [1000, 2000, 3000],
        "currency": ["USD", "USD", "EUR"],
        "timestamp": ["2024-06-01T10:00:00"] * 3,
    }),
    pl.DataFrame({
        "txn_id": [f"BATCH2-{i}" for i in range(3)],
        "amount_cents": [500, 1500, 4500],
        "currency": ["GBP", "JPY", "USD"],
        "timestamp": ["2024-06-02T10:00:00"] * 3,
    }),
]

for i, batch in enumerate(batches):
    result = pipeline.run(data=batch, transform=process_transactions)
    print(f"Batch {i + 1}: {'PASSED' if result.passed else 'FAILED'}")
```
Batch 1: PASSED
Batch 2: PASSED
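Statelessness is also what makes batches safe to validate concurrently. A check that reads only its input and keeps no shared state gives the same answer however calls interleave. The sketch below shows the pattern with the standard library's `ThreadPoolExecutor` and a hypothetical stand-in function rather than a Pipeline object. Whether a given Pipeline is thread-safe in practice is an assumption worth confirming for your version:

```python
from concurrent.futures import ThreadPoolExecutor

VALID_CURRENCIES = {"USD", "EUR", "GBP", "JPY"}


def batch_passes(rows: list[dict]) -> bool:
    """Hypothetical stateless check: it reads only its argument."""
    return all(r["amount_cents"] >= 0 and r["currency"] in VALID_CURRENCIES for r in rows)


stand_in_batches = [
    [{"amount_cents": 1000, "currency": "USD"}, {"amount_cents": 2000, "currency": "EUR"}],
    [{"amount_cents": 500, "currency": "GBP"}, {"amount_cents": -1, "currency": "JPY"}],
]

# pool.map returns results in input order, even if batches finish out of order.
with ThreadPoolExecutor(max_workers=2) as pool:
    outcomes = list(pool.map(batch_passes, stand_in_batches))

for i, ok in enumerate(outcomes, start=1):
    print(f"Batch {i}: {'PASSED' if ok else 'FAILED'}")
```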
What’s Next
You now have a solid understanding of how Pipeline enforces data quality at both boundaries of a transformation. You’ve seen how to run full pipelines, inspect results, validate boundaries independently, handle violations, tune thresholds per environment, and detect transform bugs through dual-boundary validation.
The next guide page covers YAML Contracts: how to serialize contracts and pipelines to YAML files for version control, team collaboration, and configuration-driven validation workflows. YAML serialization is especially powerful in production environments where contracts should be managed as configuration rather than code, enabling non-developers to review and modify data quality rules.