Custom Adapters
Pointblank’s contract import/export system is designed to be extensible. If your organization uses a proprietary schema format, an internal data catalog, or any other schema definition tool that isn’t covered by the built-in adapters, you can write a custom adapter and register it with the framework.
Once registered, your custom adapter works seamlessly with import_contract() and export_contract(), the same API surface your team already uses for JSON Schema and Frictionless.
The Adapter Architecture
Every adapter follows the same pattern:
- Subclass ContractAdapter and set a few class attributes
- Implement detect() (for auto-detection), import_contract(), and optionally export_contract()
- Register the adapter with the @register_adapter decorator
Here’s a minimal example to illustrate the structure:

```python
from pointblank.adapters import ContractAdapter, ContractImport, MappedConstraint, register_adapter


@register_adapter("my_format")
class MyFormatAdapter(ContractAdapter):
    """Adapter for My Company's internal schema format."""

    format_name = "my_format"
    file_extensions = [".myschema"]
    supports_import = True
    supports_export = False  # export not implemented yet

    @staticmethod
    def detect(source) -> bool:
        """Return True if this adapter can handle the source."""
        if isinstance(source, dict):
            return "my_format_version" in source
        return False

    def import_contract(self, source, **kwargs) -> ContractImport:
        """Parse the source and return a ContractImport."""
        # Your parsing logic here
        columns = [("id", "Int64"), ("value", "Float64")]
        constraints = [
            MappedConstraint(
                method="col_vals_not_null",
                kwargs={"columns": "id"},
                source_description="id is required",
            ),
        ]
        return ContractImport(
            source_format="my_format",
            columns=columns,
            constraints=constraints,
        )
```

After registration, it’s immediately usable:

```python
import pointblank as pb

# Now this works
result = pb.import_contract({"my_format_version": "1.0", "fields": []}, format="my_format")
print(result)
```

```
ContractImport(format='my_format', columns=2, constraints=1, coverage=100%)
```

The adapter is now part of the Pointblank ecosystem. Any call to import_contract() with format="my_format" will route through this adapter, and the auto-detection system will call detect() when no format is specified.

```python
# And it shows up in the adapter list
pb.list_adapters()
```

```
{'frictionless': {'class': 'FrictionlessAdapter',
                  'file_extensions': ['.resource.json', '.datapackage.json'],
                  'supports_import': True,
                  'supports_export': True},
 'json_schema': {'class': 'JSONSchemaAdapter',
                 'file_extensions': ['.schema.json'],
                 'supports_import': True,
                 'supports_export': True},
 'my_format': {'class': 'MyFormatAdapter',
               'file_extensions': ['.myschema'],
               'supports_import': True,
               'supports_export': False}}
```

The list_adapters() output confirms your adapter is registered alongside the built-in ones, showing its supported file extensions and whether it handles import, export, or both.
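To make the registration and routing behavior described above more concrete, here is a minimal, self-contained sketch of how a decorator-based registry with detect()-driven dispatch could work. It is an illustration only, not Pointblank's actual internals: the names `_REGISTRY`, `register`, `find_adapter`, `list_registered`, and `ToyAdapter` are all hypothetical.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical stand-in registry; NOT Pointblank's real implementation.
_REGISTRY: Dict[str, type] = {}


def register(name: str) -> Callable[[type], type]:
    """Class decorator that records an adapter class under `name`."""

    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls

    return decorator


def find_adapter(source: Any, format: Optional[str] = None) -> type:
    """Pick an adapter by explicit name, otherwise ask each detect() in turn."""
    if format is not None:
        if format not in _REGISTRY:
            raise ValueError(f"Unknown format: {format!r}")
        return _REGISTRY[format]
    for cls in _REGISTRY.values():
        if cls.detect(source):
            return cls
    raise ValueError("No registered adapter recognized the source")


def list_registered() -> Dict[str, Dict[str, Any]]:
    """Summarize each registered adapter, similar in shape to the output above."""
    return {
        name: {
            "class": cls.__name__,
            "file_extensions": cls.file_extensions,
            "supports_import": cls.supports_import,
            "supports_export": cls.supports_export,
        }
        for name, cls in _REGISTRY.items()
    }


@register("toy_format")
class ToyAdapter:
    """Hypothetical adapter used only to exercise the registry."""

    file_extensions = [".toy"]
    supports_import = True
    supports_export = False

    @staticmethod
    def detect(source: Any) -> bool:
        return isinstance(source, dict) and "toy_version" in source


# An explicit format name routes directly; otherwise detect() decides.
print(find_adapter({}, format="toy_format").__name__)
print(find_adapter({"toy_version": "1"}).__name__)
```

The sketch shows why a specific detect() matters: when no format is given, the first adapter whose detect() returns True wins, so an overly permissive check could capture sources meant for another adapter.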
The ContractAdapter Base Class
Here are the class attributes and methods you can define:
| Attribute | Type | Purpose |
|---|---|---|
| format_name | str | Short identifier (e.g., "json_schema", "my_format") |
| file_extensions | list[str] | File extensions for auto-detection (e.g., [".schema.json"]) |
| supports_import | bool | Whether import_contract() is implemented |
| supports_export | bool | Whether export_contract() is implemented |

| Method | Required? | Purpose |
|---|---|---|
| detect(source) | Recommended | Returns True if this adapter handles the given source |
| import_contract(source, **kwargs) | If supports_import | Parses source, returns ContractImport |
| export_contract(obj, destination, **kwargs) | If supports_export | Exports to the format |
Building an Import Adapter
Let’s build a more realistic adapter, one that reads a simple YAML-based schema format used internally at a hypothetical company:

```yaml
# company_schema.yaml
version: "2.0"
table: user_events
columns:
  - name: event_id
    type: string
    required: true
    unique: true
  - name: user_id
    type: integer
    required: true
  - name: event_type
    type: string
    values: [click, view, purchase, signup]
  - name: amount
    type: float
    min: 0
```

Here’s the adapter that handles this format:

```python
from pathlib import Path

import yaml
from pointblank.adapters import ContractAdapter, ContractImport, MappedConstraint, register_adapter


@register_adapter("company_schema")
class CompanySchemaAdapter(ContractAdapter):
    """Adapter for our company's internal YAML schema format."""

    format_name = "company_schema"
    file_extensions = [".company.yaml", ".company.yml"]
    supports_import = True
    supports_export = False

    # Type mapping from our format to Pointblank dtypes
    TYPE_MAP = {
        "string": "String",
        "integer": "Int64",
        "float": "Float64",
        "boolean": "Boolean",
        "date": "Date",
        "datetime": "Datetime",
    }

    @staticmethod
    def detect(source) -> bool:
        """Detect our format by looking for the 'version', 'table', and 'columns' keys."""
        if isinstance(source, dict):
            return "version" in source and "columns" in source and "table" in source
        return False

    def import_contract(self, source, **kwargs) -> ContractImport:
        """Import from our company schema format."""
        # Load from file or use dict directly
        if isinstance(source, str):
            with open(Path(source)) as f:
                doc = yaml.safe_load(f)
        elif isinstance(source, dict):
            doc = source
        else:
            raise TypeError(f"Expected str or dict, got {type(source).__name__}")

        columns = []
        constraints = []
        warnings = []
        total = 0  # number of source constraints seen, used for coverage

        for col_def in doc.get("columns", []):
            col_name = col_def["name"]
            col_type = col_def.get("type", "string")
            # .get() yields None for types missing from TYPE_MAP
            dtype = self.TYPE_MAP.get(col_type)
            columns.append((col_name, dtype))

            if col_def.get("required", False):
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_not_null",
                        kwargs={"columns": col_name},
                        source_description=f"{col_name} is required",
                    )
                )

            if col_def.get("unique", False):
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="rows_distinct",
                        kwargs={"columns_subset": col_name},
                        source_description=f"{col_name} must be unique",
                    )
                )

            if "values" in col_def:
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_in_set",
                        kwargs={"columns": col_name, "set": col_def["values"]},
                        source_description=f"{col_name} allowed values: {col_def['values']}",
                    )
                )

            if "min" in col_def:
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_ge",
                        kwargs={"columns": col_name, "value": col_def["min"]},
                        source_description=f"{col_name} >= {col_def['min']}",
                    )
                )

            if "max" in col_def:
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_le",
                        kwargs={"columns": col_name, "value": col_def["max"]},
                        source_description=f"{col_name} <= {col_def['max']}",
                    )
                )

        # Share of source constraints that were mapped (each warning marks one skipped)
        coverage = 1.0 if total == 0 else (total - len(warnings)) / total

        return ContractImport(
            source_format="company_schema",
            source_path=source if isinstance(source, str) else None,
            source_version=doc.get("version"),
            columns=columns,
            constraints=constraints,
            metadata={"table": doc.get("table")},
            warnings=warnings,
            coverage=coverage,
        )
```

Now let’s use it:

```python
import pointblank as pb
import polars as pl

# Simulate a company schema document
company_schema = {
    "version": "2.0",
    "table": "user_events",
    "columns": [
        {"name": "event_id", "type": "string", "required": True, "unique": True},
        {"name": "user_id", "type": "integer", "required": True},
        {"name": "event_type", "type": "string", "values": ["click", "view", "purchase", "signup"]},
        {"name": "amount", "type": "float", "min": 0},
    ],
}

# Import using our custom adapter
result = pb.import_contract(company_schema, format="company_schema")
print(result.summary())
```

```
Contract Import Summary
Format: company_schema
Format version: 2.0
Columns detected: 4
Constraints mapped: 5
Coverage: 100%
```

The summary shows four columns detected and five constraints mapped (two required fields, one unique field, one values check, and one min bound). All constraints were successfully translated, giving 100% coverage.

```python
# Validate some data
events = pl.DataFrame(
    {
        "event_id": ["E001", "E002", "E003", "E004", "E005"],
        "user_id": [101, 102, 101, 103, 104],
        "event_type": ["click", "view", "purchase", "signup", "click"],
        "amount": [0.0, 0.0, 49.99, 0.0, 0.0],
    }
)
result.to_validate(data=events).interrogate()
```

Pointblank Validation
2026-06-13|17:47:01 Polars

| STEP | EVAL | UNITS | PASS | FAIL | W | E | C | EXT |
|---|---|---|---|---|---|---|---|---|
| 1 col_schema_match() | ✓ | 1 | 1 (1.00) | 0 (0.00) | — | — | — | — |
| 2 col_vals_not_null() | ✓ | 5 | 5 (1.00) | 0 (0.00) | — | — | — | — |
| 3 rows_distinct() | ✓ | 5 | 5 (1.00) | 0 (0.00) | — | — | — | — |
| 4 col_vals_not_null() | ✓ | 5 | 5 (1.00) | 0 (0.00) | — | — | — | — |
| 5 col_vals_in_set() | ✓ | 5 | 5 (1.00) | 0 (0.00) | — | — | — | — |
| 6 col_vals_ge() | ✓ | 5 | 5 (1.00) | 0 (0.00) | — | — | — | — |

Notes: Step 1 (schema_check) ✓ Schema validation passed.

The validation report shows each imported constraint as a separate step, just as if you had written the validation by hand. From the user’s perspective, there is no difference between validation steps that came from a custom adapter and those written directly in Python.
The MappedConstraint Class
Each constraint from the external format gets mapped to a MappedConstraint, which is a simple data container holding:
- method: the Pointblank Validate method name (e.g., "col_vals_gt")
- kwargs: the keyword arguments to pass to that method
- source_description: optional human-readable note about what this was in the source format

```python
# Creating constraints manually
c1 = MappedConstraint(
    method="col_vals_between",
    kwargs={"columns": "temperature", "left": -40, "right": 60},
    source_description="Temperature must be in physical range",
)
print(c1)
```

```
MappedConstraint('col_vals_between', columns='temperature', left=-40, right=60)
```

The source_description is stored for debugging and documentation but doesn’t affect validation. When users call .summary() or inspect the ContractImport object, these descriptions help them understand the provenance of each validation step. This is especially useful when debugging why a particular check was generated or when comparing the import output against the original schema.
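One way to picture how a method name plus kwargs can become a validation step is a lookup by name followed by a call with the stored keyword arguments. The sketch below uses stand-in classes so it runs without Pointblank installed: `Constraint`, `RecordingValidator`, and `apply_constraints` are hypothetical names, and this is an assumption about the general technique rather than a description of how Pointblank replays constraints internally.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Constraint:
    """Hypothetical stand-in mirroring the three fields described above."""

    method: str
    kwargs: Dict[str, Any]
    source_description: Optional[str] = None


class RecordingValidator:
    """Toy validator that records each requested check instead of running it."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    def col_vals_between(self, columns: str, left: float, right: float) -> "RecordingValidator":
        self.steps.append(f"{columns} between {left} and {right}")
        return self

    def col_vals_not_null(self, columns: str) -> "RecordingValidator":
        self.steps.append(f"{columns} not null")
        return self


def apply_constraints(validator: RecordingValidator, constraints: List[Constraint]) -> RecordingValidator:
    """Look up each method by name and call it with the stored kwargs."""
    for constraint in constraints:
        step = getattr(validator, constraint.method)
        validator = step(**constraint.kwargs)
    return validator


constraints = [
    Constraint(
        method="col_vals_between",
        kwargs={"columns": "temperature", "left": -40, "right": 60},
        source_description="Temperature must be in physical range",
    ),
    Constraint(method="col_vals_not_null", kwargs={"columns": "id"}),
]

validator = apply_constraints(RecordingValidator(), constraints)
print(validator.steps)
```

Note that source_description plays no part in the call itself, which matches its role as documentation only. A misspelled method name would surface as an AttributeError at the getattr() step, so it pays to use the exact validation method names.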
Handling Unmappable Constraints
Not every constraint in every format has a clean Pointblank equivalent. When you encounter something that can’t be translated, add it to the warnings list rather than silently dropping it:

```python
# In your import_contract() method:
if "custom_check" in col_def:
    total += 1
    warnings.append(
        f"Column '{col_name}': 'custom_check' has no Pointblank equivalent, skipped."
    )
```

This follows Pointblank’s design principle of best-effort translation: generate everything you can, be transparent about what was skipped, and never silently lose information. Users can then review the warnings list and decide whether to add manual validation steps for the missing constraints or whether the gap is acceptable for their use case.
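As a small worked example of how a skipped constraint affects coverage, the sketch below reuses the formula from the CompanySchemaAdapter shown earlier. It assumes each entry in the warnings list corresponds to exactly one skipped source constraint; `coverage_ratio` is a hypothetical helper name, not part of Pointblank.

```python
from typing import List


def coverage_ratio(total: int, warnings: List[str]) -> float:
    """Fraction of source constraints that were mapped (1.0 when there are none)."""
    return 1.0 if total == 0 else (total - len(warnings)) / total


# Five constraints mapped cleanly, plus one skipped 'custom_check': six in total
total = 6
warnings = ["Column 'amount': 'custom_check' has no Pointblank equivalent, skipped."]

ratio = coverage_ratio(total, warnings)
print(f"Coverage: {ratio:.0%}")  # prints "Coverage: 83%"
```

Because the skipped constraint still increments the total, the reported coverage drops from 100% to 5/6, which signals to users that something in the source schema was not translated.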
Auto-Detection Tips
The detect() method enables format auto-detection. Good detection should be:
- Fast: don’t load the entire file just to check if it’s your format
- Specific: avoid false positives that could conflict with other adapters
- Graceful: return False (never raise) if the source isn’t your format
The detection system iterates through all registered adapters and calls detect() on each one. Because of this, your detection logic should be as lightweight as possible. Checking for the presence of a few distinctive keys in a dict is ideal. Avoid expensive operations like parsing large files or making network requests inside detect().

```python
@staticmethod
def detect(source) -> bool:
    if isinstance(source, dict):
        # Check for a distinctive key combination
        return "my_format_version" in source and "tables" in source
    if isinstance(source, str):
        # Check file extension first (cheapest check)
        return source.lower().endswith(".myformat.yaml")
    return False
```

Best Practices
- Map as much as possible: users expect high coverage. If a constraint is close to something Pointblank supports, map it (possibly with reduced precision) rather than skipping it.
- Use descriptive source_description: this helps users understand what each generated validation step corresponds to in their original schema.
- Set coverage accurately: track the total number of source constraints and how many were successfully mapped. This gives users confidence in the import quality.
- Handle both file paths and dicts: users should be able to pass either a path string or pre-loaded data. Most adapters check isinstance(source, str) for file paths and isinstance(source, dict) for pre-parsed content.
- Fail clearly on bad input: raise TypeError for wrong source types, FileNotFoundError for missing files, and ValueError for malformed content. Don’t return partial results silently.
- Keep dependencies optional: if your adapter needs a third-party library, check for it at import time and give a clear installation hint if it’s missing.
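The last three practices can be sketched with a small loading helper. This is one possible shape under stated assumptions, not a prescribed implementation: `require_module` and `load_document` are hypothetical helper names, and JSON from the standard library stands in for whatever file format your adapter actually parses so that the example runs without extra dependencies.

```python
import importlib.util
import json
from pathlib import Path
from typing import Any, Dict, Union


def require_module(name: str, hint: str) -> None:
    """Raise ImportError with an install hint if top-level module `name` is missing."""
    if importlib.util.find_spec(name) is None:
        raise ImportError(f"This adapter needs '{name}'. Install it with: {hint}")


def load_document(source: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Accept a pre-parsed dict or a path string, failing clearly on bad input."""
    if isinstance(source, dict):
        return source
    if not isinstance(source, str):
        raise TypeError(f"Expected str or dict, got {type(source).__name__}")

    path = Path(source)
    if not path.is_file():
        raise FileNotFoundError(f"Schema file not found: {source}")

    try:
        doc = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ValueError(f"Malformed schema file {source}: {exc}") from exc

    if not isinstance(doc, dict):
        raise ValueError(f"Expected a mapping at the top level of {source}")
    return doc


# A module that ships with Python passes the check silently
require_module("json", "it is part of the standard library")

# Pre-parsed content is returned unchanged
print(load_document({"version": "2.0", "columns": []}))
```

Calling require_module() near the top of the adapter module would surface a missing dependency at import time with an actionable message, while load_document() keeps the three error types distinct so callers can tell a wrong argument from a missing file or a malformed document.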
Conclusion
Custom adapters let you extend Pointblank’s import/export system to handle any schema format your organization uses. The plugin architecture is intentionally simple: subclass ContractAdapter, implement one or two methods, and register it with a decorator. From that point forward, your format participates in the same import_contract() and export_contract() workflow that the built-in adapters use.
This extensibility means that Pointblank can serve as a universal validation layer regardless of where your data contracts originate. Whether your schemas live in a proprietary YAML format, an internal data catalog API, or a custom metadata store, a short adapter class is all you need to bring them into the Pointblank ecosystem and benefit from its validation reporting, threshold system, and pipeline integration.