OpenTelemetry Integration

OpenTelemetry (OTel) is the industry-standard framework for vendor-neutral observability. Pointblank can export validation results (pass/fail counts, durations, threshold breaches) as OTel signals so they appear alongside your application metrics in Grafana, Datadog, New Relic, or any OTel-compatible backend.

Instead of building separate integrations for each monitoring tool, one OTel export covers all of them. And when Pointblank runs inside an Airflow, Prefect, or Dagster task that already uses OTel instrumentation, validation spans appear inside the pipeline’s distributed trace automatically.

Installation

The OTel integration is an optional dependency. Install it with:

pip install pointblank[otel]

This adds opentelemetry-api and opentelemetry-sdk. For sending data to a collector you’ll also need an exporter package, most commonly:

pip install opentelemetry-exporter-otlp-proto-grpc
# or
pip install opentelemetry-exporter-otlp-proto-http

With these packages installed, Pointblank can push signals to any OTel-compatible backend. The following section describes exactly what gets sent.

What Gets Exported

Pointblank maps its validation outputs onto three OTel signal types:

| Signal  | What's Exported | Use Case |
|---------|-----------------|----------|
| Metrics | Pass/fail counters, pass rate gauge, step duration histogram, threshold breach counts | Dashboards, alerting, SLA tracking |
| Traces  | A root span per interrogate() call with child spans per validation step | Pipeline traceability, performance analysis |
| Logs    | Structured log records for threshold breaches | Incident investigation, audit trails |

Each signal type captures a different dimension of the validation run. Metrics provide aggregate counts and rates suitable for dashboards, traces capture timing and parent-child relationships for debugging, and logs produce structured records for incident review.

Metric Instruments

The metrics signal is the most detailed. Twelve instruments cover step-level and aggregate statistics, all sharing a configurable name prefix (default: pb.validation).

| Metric Name | Kind | Description |
|-------------|------|-------------|
| {prefix}.steps.total | Counter | Total validation steps executed |
| {prefix}.steps.passed | Counter | Steps where all test units passed |
| {prefix}.steps.failed | Counter | Steps with at least one failure |
| {prefix}.test_units.total | Counter | Total test units evaluated |
| {prefix}.test_units.passed | Counter | Test units that passed |
| {prefix}.test_units.failed | Counter | Test units that failed |
| {prefix}.pass_rate | Gauge | Overall pass fraction (0–1) |
| {prefix}.step.duration | Histogram | Per-step processing duration (seconds) |
| {prefix}.duration | Gauge | Total interrogation wall-clock time (seconds) |
| {prefix}.threshold.warning | Counter | Steps exceeding warning threshold |
| {prefix}.threshold.error | Counter | Steps exceeding error threshold |
| {prefix}.threshold.critical | Counter | Steps exceeding critical threshold |

Every metric carries attributes derived from the Validate object (pb.tbl_name, pb.label, pb.owner, pb.version) plus any extra_attributes you provide. Per-step metrics also include pb.step.i, pb.step.assertion_type, and pb.step.column.

Together these instruments give you enough information to build dashboards, set alerts, and track data quality SLAs across every table Pointblank validates.
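The relationships among these instruments can be modeled in a few lines of plain Python. This is an illustrative sketch with made-up step records, not Pointblank's internal code: each hypothetical record just carries counts of passing and failing test units.

```python
# Hypothetical per-step results: counts of passing/failing test units.
steps = [
    {"n_passed": 5, "n_failed": 0},
    {"n_passed": 4, "n_failed": 1},
    {"n_passed": 3, "n_failed": 2},
]

# How the aggregate instruments relate to the step records (sketch only).
metrics = {
    "pb.validation.steps.total": len(steps),
    "pb.validation.steps.passed": sum(1 for s in steps if s["n_failed"] == 0),
    "pb.validation.steps.failed": sum(1 for s in steps if s["n_failed"] > 0),
    "pb.validation.test_units.total": sum(s["n_passed"] + s["n_failed"] for s in steps),
    "pb.validation.test_units.passed": sum(s["n_passed"] for s in steps),
    "pb.validation.test_units.failed": sum(s["n_failed"] for s in steps),
}
# The pass_rate gauge is the fraction of test units that passed.
metrics["pb.validation.pass_rate"] = (
    metrics["pb.validation.test_units.passed"]
    / metrics["pb.validation.test_units.total"]
)
```

Note the distinction the model makes explicit: step counters track whole steps (a step "fails" if any of its test units fail), while test-unit counters track individual row-level checks.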

Two Usage Patterns

Pointblank offers two ways to export OTel signals, matching how different teams prefer to work.

Pattern A: Explicit Export

Import OTelExporter, run your validation, then call export(). This gives you full control over when and how signals are emitted.

import pointblank as pb
from pointblank.integrations.otel import OTelExporter

validation = (
    pb.Validate(data=df, tbl_name="orders")
    .col_vals_not_null(columns="order_id")
    .col_vals_gt(columns="amount", value=0)
    .interrogate()
)

otel = OTelExporter(enable_metrics=True, enable_tracing=True)
otel.export(validation)

Because the export happens after interrogate(), you can inspect the validation results first and decide whether to export at all. This pattern works well for ad-hoc analysis or when you want to conditionally emit signals based on the outcome.
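One way to express that conditional export is a small gating helper. This helper is hypothetical (not part of Pointblank's API); it assumes only the `all_passed()` method that `Validate` objects provide:

```python
# Hypothetical gating helper: emit OTel signals only when the finished
# validation has at least one failing step. Returns True if signals were sent.
def export_if_failures(validation, exporter) -> bool:
    if validation.all_passed():  # Validate.all_passed(): no failing test units
        return False
    exporter.export(validation)
    return True
```

You might call it as `export_if_failures(validation, otel)` after `interrogate()`, keeping clean runs out of your metrics backend entirely.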

Pattern B: FinalActions (Automatic)

Use emit_otel() inside FinalActions so metrics are emitted automatically at the end of every interrogate() call (no extra code needed after the validation).

import pointblank as pb

validation = (
    pb.Validate(
        data=df,
        tbl_name="orders",
        final_actions=pb.FinalActions(
            pb.emit_otel(enable_metrics=True),
        ),
    )
    .col_vals_not_null(columns="order_id")
    .col_vals_gt(columns="amount", value=0)
    .interrogate()  # metrics emitted here automatically
)

You can combine emit_otel() with other final actions like send_slack_notification():

import os

validation = (
    pb.Validate(
        data=df,
        tbl_name="orders",
        final_actions=pb.FinalActions(
            pb.emit_otel(enable_metrics=True),
            pb.send_slack_notification(webhook_url=os.environ["SLACK_WEBHOOK"]),
        ),
    )
    .col_vals_not_null(columns="order_id")
    .interrogate()
)

Both patterns produce identical OTel output. Choose explicit export when you want manual control, and FinalActions when you want every validation run to emit signals automatically.

Working Example with Console Output

Let’s walk through a complete example using in-memory providers so you can see exactly what gets emitted (no external collector needed).

Setup

Before exporting any signals we need OTel providers to receive them. The SDK’s in-memory providers are perfect for exploration: they capture every metric, span, and log record in Python lists so we can print them without running a collector.

import polars as pl
import pointblank as pb
from pointblank.integrations.otel import OTelExporter

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import InMemoryLogExporter, SimpleLogRecordProcessor

# In-memory providers capture signals without a collector
metric_reader = InMemoryMetricReader()
meter_provider = MeterProvider(metric_readers=[metric_reader])

span_exporter = InMemorySpanExporter()
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))

log_exporter = InMemoryLogExporter()
logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(SimpleLogRecordProcessor(log_exporter))

Each provider is paired with an in-memory reader or exporter that we’ll query later to see what was captured.

Run Validation and Export

With the providers ready, we’ll create a small DataFrame with some intentional quality issues (a negative amount, a zero amount, and a missing customer name), validate it, and then export the results.

df = pl.DataFrame(
    {
        "order_id": [101, 102, 103, 104, 105],
        "amount": [29.99, 150.00, -5.00, 75.50, 0.00],
        "customer": ["Alice", "Bob", None, "Dana", "Eve"],
    }
)

validation = (
    pb.Validate(
        data=df,
        tbl_name="orders",
        label="Daily orders check",
        owner="data-platform-team",
        thresholds=(0.05, 0.20, 0.40),
    )
    .col_vals_not_null(columns="customer")
    .col_vals_gt(columns="amount", value=0)
    .interrogate()
)

validation

The report shows two steps with failures. Now we create an OTelExporter with all three signal types enabled and export the validation results.

otel = OTelExporter(
    meter_provider=meter_provider,
    tracer_provider=tracer_provider,
    logger_provider=logger_provider,
    enable_metrics=True,
    enable_tracing=True,
    enable_logging=True,
    extra_attributes={"env": "production", "pipeline": "daily-orders"},
)
otel.export(validation)

The export() call returns almost instantly because the in-memory providers simply store the data in Python lists. In production, this is where the SDK would batch and send signals to your collector.

Inspect the Emitted Metrics

The metric reader holds all counters, gauges, and histograms that were recorded. We can iterate through them to see every instrument and its current value.

metrics_data = metric_reader.get_metrics_data()

for resource_metrics in metrics_data.resource_metrics:
    for scope_metrics in resource_metrics.scope_metrics:
        for metric in sorted(scope_metrics.metrics, key=lambda m: m.name):
            for dp in metric.data.data_points:
                # Gauges/counters expose .value; histogram points expose .count.
                # (Checking hasattr avoids misreading a legitimate 0 value.)
                val = dp.value if hasattr(dp, "value") else getattr(dp, "count", "N/A")
                print(f"{metric.name}: {val}")

You’ll see counters like pb.validation.steps.total and pb.validation.test_units.failed, the pb.validation.pass_rate gauge, and a histogram of step durations. These are the same instruments that would appear in Prometheus, Datadog, or any OTel-compatible metrics backend.

Inspect the Trace Spans

Each interrogate() call produces a root span with one child per validation step:

spans = span_exporter.get_finished_spans()

for span in spans:
    parent = "(root)" if span.parent is None else "(child)"
    print(f"Span: {span.name} {parent}")
    for k, v in sorted(span.attributes.items()):
        print(f"  {k} = {v}")
    for event in span.events:
        print(f"  EVENT: {event.name} -> {dict(event.attributes)}")
    print()

The root span (pb.validate) carries table-level attributes and timing, while each child span (pb.validate.step) carries per-step details. Steps that breach a threshold also include an event recording the severity level.

Inspect the Log Records

Log records are emitted for each validation step that exceeds a threshold. The severity of the log record corresponds to the highest threshold level breached by that step.

logs = log_exporter.get_finished_logs()

for log in logs:
    rec = log.log_record
    print(f"[{rec.severity_text}] {rec.body}")
    print(f"  attributes: {dict(rec.attributes)}")
    print()

Each log record includes the step index, assertion type, column name, and the threshold level that was breached. In a production setup these records would flow to your log aggregator (e.g., Loki, Elasticsearch, or CloudWatch) where you can search and alert on them.

That completes the end-to-end walkthrough. The same three signal types (metrics, traces, logs) work identically whether you use in-memory providers for development or OTLP exporters in production.

Configuration Reference

Both OTelExporter and the emit_otel() factory accept the same set of parameters. This section documents every option and how the log-level filter works.

OTelExporter Parameters

The table below lists all constructor parameters for OTelExporter.

| Parameter | Default | Description |
|-----------|---------|-------------|
| meter_name | "pointblank" | Name for the OTel Meter |
| meter_version | Package version | Version string for the Meter |
| enable_metrics | True | Emit metric counters, gauges, and histograms |
| enable_tracing | False | Emit trace spans |
| enable_logging | False | Emit log records for threshold breaches |
| meter_provider | Global default | Custom MeterProvider instance |
| tracer_provider | Global default | Custom TracerProvider instance |
| logger_provider | None | Custom LoggerProvider (required for logging) |
| metric_prefix | "pb.validation" | Prefix for all metric instrument names |
| log_level | "warning" | Minimum threshold severity for log emission |
| extra_attributes | None | Additional key-value pairs on all signals |

The emit_otel() factory function accepts the same parameters and returns a callable suitable for use in FinalActions.

Log Level Filtering

Not every threshold breach warrants a log record. The log_level= parameter controls which breaches produce log records, letting you filter out lower-severity noise.

| log_level value | warning breaches | error breaches | critical breaches |
|-----------------|------------------|----------------|-------------------|
| "warning" | ✓ | ✓ | ✓ |
| "error" | ✗ | ✓ | ✓ |
| "critical" | ✗ | ✗ | ✓ |

For most production deployments, "warning" (the default) is a good starting point. Switch to "error" or "critical" if your log volume is too high or you only want to be notified about serious issues.
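The filter reduces to a simple severity comparison. Here is a minimal sketch of that rule (illustrative, not Pointblank's actual implementation):

```python
# Ordered severity ranks: a breach is logged only when its severity
# meets or exceeds the configured minimum log_level.
_SEVERITY = {"warning": 1, "error": 2, "critical": 3}

def emits_log(breach_level: str, log_level: str = "warning") -> bool:
    return _SEVERITY[breach_level] >= _SEVERITY[log_level]
```

So with `log_level="error"`, a warning-level breach is silently skipped while error and critical breaches still produce log records.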

Bringing Your Own Provider

The in-memory providers used in the walkthrough above are great during development, but in production you’ll point the SDK at an actual collector. Here’s how to configure a MeterProvider with the OTLP gRPC exporter.

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://otel-collector:4317")
)
provider = MeterProvider(metric_readers=[reader])

otel = OTelExporter(meter_provider=provider)
otel.export(validation)

If you set the standard OTEL_EXPORTER_OTLP_ENDPOINT environment variable and use the global provider (the default when meter_provider=None), no provider configuration is needed at all.
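As a sketch of that environment-based setup (the endpoint value is a placeholder; in practice you would set the variable in your deployment environment rather than in code):

```python
import os

# OTEL_EXPORTER_OTLP_ENDPOINT is the standard variable the OTel SDK reads
# when no exporter endpoint is configured explicitly in code.
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://otel-collector:4317"
```
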

The same approach works for TracerProvider and LoggerProvider. Pass your custom providers to OTelExporter and they’ll be used instead of the global defaults.

Pipeline Integration

Pointblank’s OTel integration becomes especially powerful inside orchestrated data pipelines. When Pointblank runs inside an Airflow, Prefect, or Dagster task that has OTel auto-instrumentation, trace context propagation happens automatically. The pb.validate span attaches as a child of the current task span, giving you a unified view of pipeline and validation performance.

Span: airflow.task (root)
└── Span: pb.validate
    ├── Span: pb.validate.step  (`col_vals_not_null` on 'order_id')
    ├── Span: pb.validate.step  (`col_vals_gt` on 'amount')
    └── Span: pb.validate.step  (`col_vals_in_set` on 'status')

This means validation timing and failures appear in your existing pipeline traces without any additional configuration. If a validation step is slow or failing, you can trace it back to the exact pipeline run and task that triggered it.

Example PromQL Queries

Once metrics are flowing into a Prometheus-compatible backend, you can query them with PromQL. Below are a few starter queries that cover common data-quality monitoring scenarios.

# Pass rate for a specific table
pb_validation_pass_rate{pb_tbl_name="orders"}

# Alert on any critical threshold breach in the last hour
increase(pb_validation_threshold_critical_total[1h]) > 0

# Failed test units by table (24h window)
sum by (pb_tbl_name) (increase(pb_validation_test_units_failed_total[24h]))

# p95 validation step duration
histogram_quantile(0.95, rate(pb_validation_step_duration_bucket[5m]))

These queries use Prometheus naming conventions (dots replaced with underscores, _total suffix on counters). Adjust the metric names if you’ve changed the metric_prefix= parameter.
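A minimal sketch of that naming transformation, following the standard Prometheus exporter conventions (illustrative helper, not part of Pointblank):

```python
# OTLP -> Prometheus metric-name convention: dots become underscores,
# and monotonic counters gain a _total suffix.
def prom_name(otel_name: str, is_counter: bool = False) -> str:
    name = otel_name.replace(".", "_")
    return f"{name}_total" if is_counter else name
```

For example, `prom_name("pb.validation.threshold.critical", is_counter=True)` yields the `pb_validation_threshold_critical_total` name used in the alert query above, while the gauge `pb.validation.pass_rate` maps to `pb_validation_pass_rate` with no suffix.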

With metrics, traces, and logs all flowing through OpenTelemetry, you have a complete observability picture for your data validation workflows. Whether you’re running ad-hoc checks in a notebook or validating tables in a production pipeline, the same instrumentation gives you dashboards, alerts, and audit trails with no additional tooling.