Custom Adapters

Pointblank’s contract import/export system is designed to be extensible. If your organization uses a proprietary schema format, an internal data catalog, or any other schema definition tool that isn’t covered by the built-in adapters, you can write a custom adapter and register it with the framework.

Once registered, your custom adapter works seamlessly with import_contract() and export_contract(), the same API surface your team already uses for JSON Schema and Frictionless.

The Adapter Architecture

Every adapter follows the same pattern:

  1. Subclass ContractAdapter and set a few class attributes
  2. Implement detect() (for auto-detection), import_contract(), and optionally export_contract()
  3. Register the adapter with the @register_adapter decorator

Here’s a minimal example to illustrate the structure:

from pointblank.adapters import ContractAdapter, ContractImport, MappedConstraint, register_adapter


@register_adapter("my_format")
class MyFormatAdapter(ContractAdapter):
    """Adapter for My Company's internal schema format."""

    format_name = "my_format"
    file_extensions = [".myschema"]
    supports_import = True
    supports_export = False  # export not implemented yet

    @staticmethod
    def detect(source) -> bool:
        """Return True if this adapter can handle the source."""
        if isinstance(source, dict):
            return "my_format_version" in source
        return False

    def import_contract(self, source, **kwargs) -> ContractImport:
        """Parse the source and return a ContractImport."""
        # Your parsing logic here
        columns = [("id", "Int64"), ("value", "Float64")]
        constraints = [
            MappedConstraint(
                method="col_vals_not_null",
                kwargs={"columns": "id"},
                source_description="id is required",
            ),
        ]
        return ContractImport(
            source_format="my_format",
            columns=columns,
            constraints=constraints,
        )

After registration, it’s immediately usable:

import pointblank as pb

# Now this works
result = pb.import_contract({"my_format_version": "1.0", "fields": []}, format="my_format")
print(result)
ContractImport(format='my_format', columns=2, constraints=1, coverage=100%)

The adapter is now part of the Pointblank ecosystem. Any call to import_contract() with format="my_format" will route through this adapter, and the auto-detection system will call detect() when no format is specified.

# And it shows up in the adapter list
pb.list_adapters()
{'frictionless': {'class': 'FrictionlessAdapter',
  'file_extensions': ['.resource.json', '.datapackage.json'],
  'supports_import': True,
  'supports_export': True},
 'json_schema': {'class': 'JSONSchemaAdapter',
  'file_extensions': ['.schema.json'],
  'supports_import': True,
  'supports_export': True},
 'my_format': {'class': 'MyFormatAdapter',
  'file_extensions': ['.myschema'],
  'supports_import': True,
  'supports_export': False}}

The list_adapters() output confirms your adapter is registered alongside the built-in ones, showing its supported file extensions and whether it handles import, export, or both.
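
To make the registration step less opaque, here is a minimal sketch of how a decorator-based registry can work in general. It is not Pointblank's actual implementation: the names `_REGISTRY`, `register_adapter_sketch`, and `list_adapters_sketch` are hypothetical stand-ins, and the summary layout simply mirrors the output shown above.

```python
from typing import Callable, Dict

# Hypothetical stand-in registry; Pointblank's real internals may differ.
_REGISTRY: Dict[str, type] = {}


def register_adapter_sketch(name: str) -> Callable[[type], type]:
    """Return a class decorator that stores the decorated class under `name`."""

    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls  # hand the class back unchanged so it stays usable

    return decorator


def list_adapters_sketch() -> Dict[str, dict]:
    """Summarize every registered adapter, sorted by format name."""
    return {
        name: {
            "class": cls.__name__,
            "file_extensions": list(getattr(cls, "file_extensions", [])),
            "supports_import": getattr(cls, "supports_import", False),
            "supports_export": getattr(cls, "supports_export", False),
        }
        for name, cls in sorted(_REGISTRY.items())
    }


@register_adapter_sketch("my_format")
class MyFormatAdapter:
    file_extensions = [".myschema"]
    supports_import = True
    supports_export = False


print(list_adapters_sketch())
```

Because the decorator runs when the class body is evaluated, an adapter of this kind is registered as soon as its module is imported, which is why no separate setup call is needed.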

The ContractAdapter Base Class

Here are the class attributes and methods you can define:

Attribute        Type       Purpose
format_name      str        Short identifier (e.g., "json_schema", "my_format")
file_extensions  list[str]  File extensions for auto-detection (e.g., [".schema.json"])
supports_import  bool       Whether import_contract() is implemented
supports_export  bool       Whether export_contract() is implemented

Method                                       Required?           Purpose
detect(source)                               Recommended         Returns True if this adapter handles the given source
import_contract(source, **kwargs)            If supports_import  Parses source, returns ContractImport
export_contract(obj, destination, **kwargs)  If supports_export  Exports to the format

Building an Import Adapter

Let’s build a more realistic adapter, one that reads a simple YAML-based schema format used internally at a hypothetical company:

# company_schema.yaml
version: "2.0"
table: user_events
columns:
  - name: event_id
    type: string
    required: true
    unique: true
  - name: user_id
    type: integer
    required: true
  - name: event_type
    type: string
    values: [click, view, purchase, signup]
  - name: amount
    type: float
    min: 0

Here’s the adapter that handles this format:

import yaml
from pointblank.adapters import ContractAdapter, ContractImport, MappedConstraint, register_adapter


@register_adapter("company_schema")
class CompanySchemaAdapter(ContractAdapter):
    """Adapter for our company's internal YAML schema format."""

    format_name = "company_schema"
    file_extensions = [".company.yaml", ".company.yml"]
    supports_import = True
    supports_export = False

    # Type mapping from our format to Pointblank dtypes
    TYPE_MAP = {
        "string": "String",
        "integer": "Int64",
        "float": "Float64",
        "boolean": "Boolean",
        "date": "Date",
        "datetime": "Datetime",
    }

    @staticmethod
    def detect(source) -> bool:
        """Detect our format by its 'version', 'table', and 'columns' keys."""
        if isinstance(source, dict):
            return "version" in source and "columns" in source and "table" in source
        return False

    def import_contract(self, source, **kwargs) -> ContractImport:
        """Import from our company schema format."""
        # Load from file or use dict directly
        if isinstance(source, str):
            from pathlib import Path

            with open(Path(source)) as f:
                doc = yaml.safe_load(f)
        elif isinstance(source, dict):
            doc = source
        else:
            raise TypeError(f"Expected str or dict, got {type(source).__name__}")

        columns = []
        constraints = []
        warnings = []
        total = 0

        for col_def in doc.get("columns", []):
            col_name = col_def["name"]
            col_type = col_def.get("type", "string")
            dtype = self.TYPE_MAP.get(col_type)
            columns.append((col_name, dtype))

            if col_def.get("required", False):
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_not_null",
                        kwargs={"columns": col_name},
                        source_description=f"{col_name} is required",
                    )
                )

            if col_def.get("unique", False):
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="rows_distinct",
                        kwargs={"columns_subset": col_name},
                        source_description=f"{col_name} must be unique",
                    )
                )

            if "values" in col_def:
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_in_set",
                        kwargs={"columns": col_name, "set": col_def["values"]},
                        source_description=f"{col_name} allowed values: {col_def['values']}",
                    )
                )

            if "min" in col_def:
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_ge",
                        kwargs={"columns": col_name, "value": col_def["min"]},
                        source_description=f"{col_name} >= {col_def['min']}",
                    )
                )

            if "max" in col_def:
                total += 1
                constraints.append(
                    MappedConstraint(
                        method="col_vals_le",
                        kwargs={"columns": col_name, "value": col_def["max"]},
                        source_description=f"{col_name} <= {col_def['max']}",
                    )
                )

        coverage = 1.0 if total == 0 else (total - len(warnings)) / total

        return ContractImport(
            source_format="company_schema",
            source_path=source if isinstance(source, str) else None,
            source_version=doc.get("version"),
            columns=columns,
            constraints=constraints,
            metadata={"table": doc.get("table")},
            warnings=warnings,
            coverage=coverage,
        )

Now let’s use it:

import pointblank as pb
import polars as pl

# Simulate a company schema document
company_schema = {
    "version": "2.0",
    "table": "user_events",
    "columns": [
        {"name": "event_id", "type": "string", "required": True, "unique": True},
        {"name": "user_id", "type": "integer", "required": True},
        {"name": "event_type", "type": "string", "values": ["click", "view", "purchase", "signup"]},
        {"name": "amount", "type": "float", "min": 0},
    ],
}

# Import using our custom adapter
result = pb.import_contract(company_schema, format="company_schema")
print(result.summary())
Contract Import Summary
  Format: company_schema
  Format version: 2.0
  Columns detected: 4
  Constraints mapped: 5
  Coverage: 100%

The summary shows four columns detected and five constraints mapped (two required fields, one unique field, one values check, and one min bound). All constraints were successfully translated, giving 100% coverage.
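
The count of five can be checked by hand. The sketch below tallies the constraint-producing keys in the same schema document using plain Python, without calling Pointblank at all. The `CONSTRAINT_KEYS` tuple and the counting rules are assumptions that mirror the adapter code above: `required` and `unique` count only when true, while `values`, `min`, and `max` count whenever the key is present.

```python
from collections import Counter

company_schema = {
    "version": "2.0",
    "table": "user_events",
    "columns": [
        {"name": "event_id", "type": "string", "required": True, "unique": True},
        {"name": "user_id", "type": "integer", "required": True},
        {"name": "event_type", "type": "string", "values": ["click", "view", "purchase", "signup"]},
        {"name": "amount", "type": "float", "min": 0},
    ],
}

# Keys that the adapter turns into one constraint each (assumed from the code above).
FLAG_KEYS = ("required", "unique")        # counted only when truthy
PRESENCE_KEYS = ("values", "min", "max")  # counted whenever present

tally = Counter()
for col in company_schema["columns"]:
    for key in FLAG_KEYS:
        if col.get(key, False):
            tally[key] += 1
    for key in PRESENCE_KEYS:
        if key in col:
            tally[key] += 1

print(dict(tally))
print(sum(tally.values()))  # 5
```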

# Validate some data
events = pl.DataFrame(
    {
        "event_id": ["E001", "E002", "E003", "E004", "E005"],
        "user_id": [101, 102, 101, 103, 104],
        "event_type": ["click", "view", "purchase", "signup", "click"],
        "amount": [0.0, 0.0, 49.99, 0.0, 0.0],
    }
)

result.to_validate(data=events).interrogate()
Pointblank Validation
2026-06-13|17:47:01
Polars

STEP  METHOD               COLUMNS     VALUES                         UNITS  PASS      FAIL
1     col_schema_match()   -           SCHEMA                         1      1 (1.00)  0 (0.00)
2     col_vals_not_null()  event_id    -                              5      5 (1.00)  0 (0.00)
3     rows_distinct()      event_id    -                              5      5 (1.00)  0 (0.00)
4     col_vals_not_null()  user_id     -                              5      5 (1.00)  0 (0.00)
5     col_vals_in_set()    event_type  click, view, purchase, signup  5      5 (1.00)  0 (0.00)
6     col_vals_ge()        amount      0                              5      5 (1.00)  0 (0.00)

Notes

Step 1 (schema_check): Schema validation passed.

Schema Comparison

   TARGET                     EXPECTED
   COLUMN      DATA TYPE      COLUMN      DATA TYPE
1  event_id    String      1  event_id    String
2  user_id     Int64       2  user_id     Int64
3  event_type  String      3  event_type  String
4  amount      Float64     4  amount      Float64

Supplied Column Schema:
[('event_id', 'String'), ('user_id', 'Int64'), ('event_type', 'String'), ('amount', 'Float64')]

Schema Match Settings: COMPLETE, IN ORDER, COLUMN ≠ column, DTYPE ≠ dtype, float ≠ float64
The validation report shows each imported constraint as a separate step, just as if you had written the validation by hand. From the user’s perspective, there is no difference between validation steps that came from a custom adapter and those written directly in Python.

The MappedConstraint Class

Each constraint from the external format gets mapped to a MappedConstraint, which is a simple data container holding:

  • method: the Pointblank Validate method name (e.g., "col_vals_gt")
  • kwargs: the keyword arguments to pass to that method
  • source_description: optional human-readable note about what this was in the source format

# Creating constraints manually
c1 = MappedConstraint(
    method="col_vals_between",
    kwargs={"columns": "temperature", "left": -40, "right": 60},
    source_description="Temperature must be in physical range",
)
print(c1)
MappedConstraint('col_vals_between', columns='temperature', left=-40, right=60)

The source_description is stored for debugging and documentation but doesn’t affect validation. When users call .summary() or inspect the ContractImport object, these descriptions help them understand the provenance of each validation step. This is especially useful when debugging why a particular check was generated or when comparing the import output against the original schema.
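
One way to picture how a method name plus kwargs becomes a validation step is dynamic dispatch with `getattr`. The sketch below is an illustration under stated assumptions, not Pointblank's actual code: `ConstraintSketch` stands in for MappedConstraint, `RecordingValidator` stands in for a Validate-like object, and `apply_constraints` is a hypothetical helper.

```python
from dataclasses import dataclass, field


@dataclass
class ConstraintSketch:
    """Hypothetical stand-in for MappedConstraint."""

    method: str
    kwargs: dict = field(default_factory=dict)
    source_description: str = ""


class RecordingValidator:
    """Stand-in for a Validate-like object: records each step it receives."""

    def __init__(self):
        self.steps = []

    def col_vals_not_null(self, columns):
        self.steps.append(("col_vals_not_null", columns))
        return self  # returning self allows method chaining

    def col_vals_between(self, columns, left, right):
        self.steps.append(("col_vals_between", columns, left, right))
        return self


def apply_constraints(validator, constraints):
    """Look up each constraint's method by name and call it with its kwargs."""
    for c in constraints:
        step = getattr(validator, c.method, None)
        if step is None:
            raise AttributeError(f"No validation method named {c.method!r}")
        validator = step(**c.kwargs)
    return validator


validator = apply_constraints(
    RecordingValidator(),
    [
        ConstraintSketch("col_vals_not_null", {"columns": "id"}, "id is required"),
        ConstraintSketch(
            "col_vals_between",
            {"columns": "temperature", "left": -40, "right": 60},
            "Temperature must be in physical range",
        ),
    ],
)
print(validator.steps)
```

Note that `source_description` is never passed to the method in this sketch, which matches the statement above that it does not affect validation.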

Handling Unmappable Constraints

Not every constraint in every format has a clean Pointblank equivalent. When you encounter something that can’t be translated, add it to the warnings list rather than silently dropping it:

# In your import_contract() method:
if "custom_check" in col_def:
    total += 1
    warnings.append(
        f"Column '{col_name}': 'custom_check' has no Pointblank equivalent, skipped."
    )

This follows Pointblank’s design principle of best-effort translation: generate everything you can, be transparent about what was skipped, and never silently lose information. Users can then review the warnings list and decide whether to add manual validation steps for the missing constraints or whether the gap is acceptable for their use case.
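
As a worked example of how a skipped constraint affects coverage, the sketch below applies the same formula used in the import adapter earlier. The `coverage` helper is a hypothetical name introduced here for illustration. It takes the number of skipped constraints as its own argument, an assumption that keeps the result correct even if the warnings list later holds messages that are not about skipped constraints.

```python
def coverage(total: int, skipped: int) -> float:
    """Fraction of source constraints that were mapped to Pointblank steps."""
    if total == 0:
        return 1.0  # nothing to map, so nothing was lost
    return (total - skipped) / total


# Five constraints were mapped and one 'custom_check' could not be translated.
warnings = ["Column 'amount': 'custom_check' has no Pointblank equivalent, skipped."]
total = 6

print(f"{coverage(total, len(warnings)):.0%}")  # 83%
```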

Auto-Detection Tips

The detect() method enables format auto-detection. Good detection should be:

  • Fast: don’t load the entire file just to check if it’s your format
  • Specific: avoid false positives that could conflict with other adapters
  • Graceful: return False (never raise) if the source isn’t your format

The detection system iterates through all registered adapters and calls detect() on each one. Because of this, your detection logic should be as lightweight as possible. Checking for the presence of a few distinctive keys in a dict is ideal. Avoid expensive operations like parsing large files or making network requests inside detect().

@staticmethod
def detect(source) -> bool:
    if isinstance(source, dict):
        # Check for a distinctive key combination
        return "my_format_version" in source and "tables" in source

    if isinstance(source, str):
        # Check file extension first (cheapest check)
        return source.lower().endswith(".myformat.yaml")

    return False
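
The following sketch shows why these properties matter. It models a detection loop that tries each adapter in turn and returns the first match. This is an assumption about the general shape of such a loop, not Pointblank's actual code; `detect_format`, `FormatA`, and `Fragile` are hypothetical names. The loop here tolerates a detect() that raises, but whether Pointblank does the same is not stated, so your own detect() should still return False rather than raise.

```python
from typing import Dict, Optional


class FormatA:
    @staticmethod
    def detect(source) -> bool:
        # Cheap and specific: a type check plus one distinctive key
        return isinstance(source, dict) and "my_format_version" in source


class Fragile:
    @staticmethod
    def detect(source) -> bool:
        # Badly behaved: raises when "schema" is missing or source is not a dict
        return "tables" in source["schema"]


def detect_format(source, adapters: Dict[str, type]) -> Optional[str]:
    """Return the name of the first adapter whose detect() accepts the source."""
    for name, adapter in adapters.items():
        try:
            if adapter.detect(source):
                return name
        except Exception:
            # A raising detect() is treated as "not my format" in this sketch
            continue
    return None


adapters = {"fragile": Fragile, "format_a": FormatA}
print(detect_format({"my_format_version": "1.0"}, adapters))
print(detect_format(42, adapters))
```

Because the first match wins in this model, an adapter with an overly broad detect() could shadow every adapter after it, which is the false-positive risk described above.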

Best Practices

  1. Map as much as possible: users expect high coverage. If a constraint is close to something Pointblank supports, map it to the nearest equivalent (accepting some loss of precision) rather than skipping it, and note the approximation in source_description.

  2. Write a descriptive source_description for every constraint: it helps users trace each generated validation step back to their original schema.

  3. Set coverage accurately: track the total number of source constraints and how many were successfully mapped. This gives users confidence in the import quality.

  4. Handle both file paths and dicts: users should be able to pass either a path string or pre-loaded data. Most adapters check isinstance(source, str) for file paths and isinstance(source, dict) for pre-parsed content.

  5. Fail clearly on bad input: raise TypeError for wrong source types, FileNotFoundError for missing files, and ValueError for malformed content. Don’t return partial results silently.

  6. Keep dependencies optional: if your adapter needs a third-party library, check for it at import time and give a clear installation hint if it’s missing.
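
Practices 4 to 6 can be combined in a small loading helper. The sketch below is one possible arrangement, not part of Pointblank: `load_source` and `require_optional` are hypothetical helpers, and the file is parsed as JSON so that the example runs with the standard library alone. A YAML-based adapter could instead call `require_optional("yaml", "pip install pyyaml")` at the point where it parses the file.

```python
import importlib
import json
from pathlib import Path


def require_optional(module_name: str, install_hint: str):
    """Import an optional dependency, or raise with an installation hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"This adapter needs '{module_name}'. Install it with: {install_hint}"
        ) from exc


def load_source(source) -> dict:
    """Return a schema dict from a path string or a pre-parsed dict."""
    if isinstance(source, dict):
        return source  # pre-parsed content is used as-is
    if not isinstance(source, str):
        raise TypeError(f"Expected str or dict, got {type(source).__name__}")

    path = Path(source)
    if not path.is_file():
        raise FileNotFoundError(f"Schema file not found: {source}")

    try:
        doc = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ValueError(f"Malformed schema file {source}: {exc}") from exc

    if not isinstance(doc, dict):
        raise ValueError(f"Schema file {source} must contain a mapping at the top level")
    return doc


print(load_source({"version": "2.0", "table": "user_events", "columns": []}))
```

Checking for the dependency inside the helper, rather than at the top of the module, means the adapter can still be registered and listed on machines where the library is not installed.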

Conclusion

Custom adapters let you extend Pointblank’s import/export system to handle any schema format your organization uses. The plugin architecture is intentionally simple: subclass ContractAdapter, implement one or two methods, and register it with a decorator. From that point forward, your format participates in the same import_contract() and export_contract() workflow that the built-in adapters use.

This extensibility means that Pointblank can serve as a universal validation layer regardless of where your data contracts originate. Whether your schemas live in a proprietary YAML format, an internal data catalog API, or a custom metadata store, a short adapter class is all you need to bring them into the Pointblank ecosystem and benefit from its validation reporting, threshold system, and pipeline integration.