API Reference

orbital

orbital: translate scikit-learn pipelines into SQL queries

orbital is a library for translating scikit-learn pipelines into SQL queries and Ibis expressions.

It provides a way to execute machine learning models on databases without the need for a Python runtime environment.
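
A minimal end-to-end sketch (the dataset, column names, and table name below are illustrative assumptions):

import orbital
from orbital import types
from sklearn.datasets import load_iris
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Train a small scikit-learn pipeline on example data.
X, y = load_iris(return_X_y=True, as_frame=True)
X.columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
pipeline = Pipeline([("scale", StandardScaler()), ("regression", LinearRegression())])
pipeline.fit(X, y)

# Translate the fitted pipeline and emit a DuckDB query.
parsed = orbital.parse_pipeline(
    pipeline, features={name: types.DoubleColumnType() for name in X.columns}
)
print(orbital.export_sql("iris_table", parsed, dialect="duckdb"))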

orbital.ResultsProjection

Projection of the results of the pipeline.

This class is used to select the columns to be returned from the pipeline. It can pick specific columns to include in the final result set, or add extra columns to it.

You can use the omit classmethod to skip the projection step entirely.
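
For example (a sketch; "customer_id" is a hypothetical extra column):

from orbital import ResultsProjection

# Keep the pipeline outputs and additionally select an extra column.
projection = ResultsProjection(select=["customer_id"])

# Or skip the final projection step entirely.
no_projection = ResultsProjection.omit()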

Source code in orbital/translate.py
class ResultsProjection:
    """Projection of the results of the pipeline.

    This class is used to select the columns to be returned
    from the pipeline. It can be used to select specific
    columns to include in the final result set.

    It can also be used to skip the select step of columns
    from the pipeline.

    You can use the `omit` method to skip the projection
    step entirely.
    """

    def __init__(self, select: typing.Optional[list[str]] = None) -> None:
        """
        :param select: A list of additional columns to be selected from the pipeline.
        """
        self._select = select or []
        self._omit = False

    @classmethod
    def omit(cls) -> "ResultsProjection":
        """Create a projection that skips projection phase entirely."""
        projection = cls()
        projection._omit = True
        return projection

    def _expand(self, results: typing.Iterable[str]) -> typing.Optional[list[str]]:
        if self._omit:
            return None

        def _emit_projection() -> typing.Generator[str, None, None]:
            yield from results
            for item in self._select:
                yield item

        return list(_emit_projection())

__init__

__init__(select: Optional[list[str]] = None) -> None

Parameters:

    select (Optional[list[str]], default: None)
        A list of additional columns to be selected from the pipeline.
Source code in orbital/translate.py
def __init__(self, select: typing.Optional[list[str]] = None) -> None:
    """
    :param select: A list of additional columns to be selected from the pipeline.
    """
    self._select = select or []
    self._omit = False

omit classmethod

Create a projection that skips the projection phase entirely.

Source code in orbital/translate.py
@classmethod
def omit(cls) -> "ResultsProjection":
    """Create a projection that skips projection phase entirely."""
    projection = cls()
    projection._omit = True
    return projection

orbital.parse_pipeline

parse_pipeline(
    pipeline: Pipeline, features: FeaturesTypes
) -> ParsedPipeline

Parse a scikit-learn pipeline into an intermediate representation.

Returns an orbital.ast.ParsedPipeline object that can be converted to SQL queries.

Parameters:

    pipeline (Pipeline, required)
        The fitted scikit-learn pipeline to parse.

    features (FeaturesTypes, required)
        Mapping of column names to their orbital.types.ColumnType objects
        from the orbital.types module. features should map the column names
        that are the inputs of the pipeline to their types:

            {
                "column_name": types.DoubleColumnType(),
                "another_column": types.Int64ColumnType()
            }
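A sketch of a typical call (fitted_pipeline is assumed to be a fitted sklearn Pipeline; column names are placeholders). Note that at least one feature must not be passthrough, otherwise a ValueError is raised:

from orbital import parse_pipeline, types

parsed = parse_pipeline(
    fitted_pipeline,  # a fitted sklearn.pipeline.Pipeline (assumed to exist)
    features={
        "column_name": types.DoubleColumnType(),
        "another_column": types.Int64ColumnType(),
    },
)
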
Source code in orbital/ast.py
def parse_pipeline(
    pipeline: sklearn.pipeline.Pipeline, features: FeaturesTypes
) -> ParsedPipeline:
    """Parse a scikit-learn pipeline into an intermediate representation.

    Returns an [orbital.ast.ParsedPipeline][] object that can be converted to SQL queries.

    :param pipeline: The fitted scikit-learn pipeline to parse
    :param features: Mapping of column names to their [orbital.types.ColumnType][] objects from the [orbital.types][] module

    ``features`` should be a mapping of column names that are the inputs of the
    pipeline to their types from the [orbital.types][] module:

    ```
        {
            "column_name": types.DoubleColumnType(),
            "another_column": types.Int64ColumnType()
        }
    ```
    """
    non_passthrough_features = {
        fname: ftype for fname, ftype in features.items() if not ftype.is_passthrough
    }

    if not non_passthrough_features:
        raise ValueError(
            "All provided features are passthrough. "
            "The pipeline would not do anything useful."
        )

    # Check if pipeline starts with a model (which expects concatenated input)
    concatenated_inputs = EnsureConcatenatedInputs(non_passthrough_features)
    pipeline_requires_input_vector = concatenated_inputs.pipeline_requires_input_vector(
        pipeline
    )

    if pipeline_requires_input_vector:
        # Models expect a single feature vector "input", so we need to adapt the user
        # features to a single concatenated input tensor.
        # Later, we'll inject a concat operation to ensure the SQL query does work
        # with individual columns.
        initial_types = concatenated_inputs.concatenate_inputs()
    else:
        initial_types = [
            (fname, ftype._to_onnxtype())
            for fname, ftype in non_passthrough_features.items()
        ]

    onnx_model = cast(
        _onnx.ModelProto,
        _skl2o.to_onnx(pipeline, initial_types=initial_types),  # type: ignore[arg-type]
    )

    if pipeline_requires_input_vector:
        # Inject concat operation to create the "input" tensor when necessary.
        onnx_model = concatenated_inputs.inject_concat_step(onnx_model)

    return ParsedPipeline._from_onnx_model(onnx_model, features)

orbital.export_sql

export_sql(
    table_name: str,
    pipeline: ParsedPipeline,
    dialect: str = "duckdb",
    projection: ResultsProjection = ResultsProjection(),
    optimize: bool = True,
) -> str

Export SQL for a given pipeline.

Given an orbital pipeline, this function generates a SQL query that can be used to execute the pipeline on a database. The generated SQL is compatible with the specified SQL dialect.

dialect can be any of the SQL dialects supported by sqlglot; see sqlglot.dialects.DIALECTS for the complete list.

If optimize is set to True, the SQL query will be optimized using sqlglot's optimizer. This can improve performance, but may fail if the query is complex.
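
For instance (a sketch; parsed is a ParsedPipeline from parse_pipeline, and the table and column names are placeholders):

from orbital import ResultsProjection, export_sql

sql = export_sql(
    "events",                # name of the table the query will read from
    parsed,                  # a ParsedPipeline (assumed to exist)
    dialect="postgres",      # any dialect supported by sqlglot
    projection=ResultsProjection(select=["event_id"]),  # carry an extra column
    optimize=False,          # disable sqlglot's optimizer for complex queries
)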

Source code in orbital/sql.py
def export_sql(
    table_name: str,
    pipeline: ParsedPipeline,
    dialect: str = "duckdb",
    projection: ResultsProjection = ResultsProjection(),
    optimize: bool = True,
) -> str:
    """Export SQL for a given pipeline.

    Given an orbital pipeline, this function generates a SQL query that can be
    used to execute the pipeline on a database. The generated SQL is compatible
    with the specified SQL dialect.

    `dialect` can be any of the SQL dialects supported by sqlglot;
    see `sqlglot.dialects.DIALECTS` for the complete list.

    If `optimize` is set to True, the SQL query will be optimized using
    sqlglot's optimizer. This can improve performance, but may fail if
    the query is complex.
    """
    unbound_table = ibis.table(
        schema={
            fname: ftype._to_ibistype() for fname, ftype in pipeline.features.items()
        },
        name=table_name,
    )

    if projection._omit:
        raise ValueError(
            "Projection is empty. Please provide a projection to export SQL."
        )

    ibis_expr = translate(unbound_table, pipeline, projection=projection)
    sqlglot_expr = getattr(sc, dialect).compiler.to_sqlglot(ibis_expr)

    if optimize:
        c = Catalog()
        catalog = sqlglot.schema.MappingSchema(
            {unbound_table.get_name(): c.to_sqlglot_schema(unbound_table.schema())},
            normalize=False,
        )
        sqlglot_expr = sqlglot.optimizer.optimize(
            sqlglot_expr, schema=catalog, rules=OPTIMIZER_RULES
        )

    return sqlglot_expr.sql(dialect=dialect)

orbital.ast

Translate scikit-learn models to an intermediate representation.

The IR is what will be processed to generate the SQL queries.

orbital.ast.ParsedPipeline

An intermediate representation of a scikit-learn pipeline.

This object can be converted to a SQL query and run on a database. It can also be saved and loaded back in binary format for model distribution, although distributing the SQL query is usually more convenient.
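
A sketch of the save/load round trip (the file name is a placeholder and parsed is assumed to come from parse_pipeline):

from orbital.ast import ParsedPipeline

parsed.dump("pipeline.orbital")                     # header + ONNX model, binary
restored = ParsedPipeline.load("pipeline.orbital")  # back to a ParsedPipeline
print(restored)                                     # readable summary via __str__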

Source code in orbital/ast.py
class ParsedPipeline:
    """An intermediate representation of a scikit-learn pipeline.

    This object can be converted to a SQL query and run on a database.
    It can also be saved and loaded back in binary format for model
    distribution, although distributing the SQL query is usually
    more convenient.
    """

    _model: _onnx.ModelProto  # type: ignore[assignment]
    features: FeaturesTypes  # type: ignore[assignment]

    def __init__(self) -> None:
        """[orbital.ast.ParsedPipeline][] objects can only be created by the [orbital.ast.parse_pipeline][] function."""

        raise NotImplementedError(
            "parse_pipeline must be used to create a ParsedPipeline object."
        )

    @classmethod
    def _from_onnx_model(
        cls, model: _onnx.ModelProto, features: FeaturesTypes
    ) -> "ParsedPipeline":
        """Create a [orbital.ast.ParsedPipeline][] from an ONNX model.

        This is considered an internal implementation detail
        as ONNX should never be exposed to the user.

        Returns a new [orbital.ast.ParsedPipeline][] instance.

        :param model: The ONNX model proto to wrap
        :param features: Dictionary mapping feature names to their [orbital.types.ColumnType][] objects
        """
        self = super().__new__(cls)
        self._model = model
        self.features = self._validate_features(features)
        return self

    @classmethod
    def _validate_features(cls, features: FeaturesTypes) -> FeaturesTypes:
        """Validate the features of the pipeline.

        This checks that the features provided are compatible
        with what a SQL query can handle.

        Returns the validated features dictionary.

        :param features: Dictionary mapping feature names to their [orbital.types.ColumnType][] objects
        """
        for name in features:
            if "." in name:
                raise ValueError(
                    f"Feature names cannot contain '.' characters: {name}, replace with '_'"
                )

        for ftype in features.values():
            if not isinstance(ftype, ColumnType):
                raise TypeError(f"Feature types must be ColumnType objects: {ftype}")

        return features

    def dump(self, filename: str) -> None:
        """Dump the parsed pipeline to a file.

        :param filename: Path to the file where the pipeline will be saved
        """
        # While the ONNX model is in protobuf format, and it would thus
        # make sense to use protobuf to serialize the headers too,
        # using pickle avoids the need to define a new protobuf schema
        # for the headers and compile .proto files.
        header = {"version": 1, "features": self.features}
        header_data = pickle.dumps(header)
        header_len = len(header_data).to_bytes(4, "big")
        with open(filename, "wb") as f:
            f.write(header_len)
            f.write(header_data)
            f.write(self._model.SerializeToString())

    @classmethod
    def load(cls, filename: str) -> "ParsedPipeline":
        """Load a parsed pipeline from a file.

        Returns an [orbital.ast.ParsedPipeline][] object loaded from the specified file.

        :param filename: Path to the file containing the saved pipeline
        """
        with open(filename, "rb") as f:
            header_len = int.from_bytes(f.read(4), "big")
            header_data = f.read(header_len)
            header = pickle.loads(header_data)
            if header["version"] != 1:
                # Currently there is only version 1
                raise UnsupportedFormatVersion("Unsupported format version.")
            model = _onnx.load_model(f)
        return cls._from_onnx_model(model, header["features"])

    def __str__(self) -> str:
        """Generate a string representation of the pipeline."""
        return str(repr_pipeline.ParsedPipelineStr(self))

__init__

__init__() -> None

orbital.ast.ParsedPipeline objects can only be created by the orbital.ast.parse_pipeline function.

Source code in orbital/ast.py
def __init__(self) -> None:
    """[orbital.ast.ParsedPipeline][] objects can only be created by the [orbital.ast.parse_pipeline][] function."""

    raise NotImplementedError(
        "parse_pipeline must be used to create a ParsedPipeline object."
    )

dump

dump(filename: str) -> None

Dump the parsed pipeline to a file.

Parameters:

    filename (str, required)
        Path to the file where the pipeline will be saved.
Source code in orbital/ast.py
def dump(self, filename: str) -> None:
    """Dump the parsed pipeline to a file.

    :param filename: Path to the file where the pipeline will be saved
    """
    # While the ONNX model is in protobuf format, and it would thus
    # make sense to use protobuf to serialize the headers too,
    # using pickle avoids the need to define a new protobuf schema
    # for the headers and compile .proto files.
    header = {"version": 1, "features": self.features}
    header_data = pickle.dumps(header)
    header_len = len(header_data).to_bytes(4, "big")
    with open(filename, "wb") as f:
        f.write(header_len)
        f.write(header_data)
        f.write(self._model.SerializeToString())

load classmethod

load(filename: str) -> ParsedPipeline

Load a parsed pipeline from a file.

Returns an orbital.ast.ParsedPipeline object loaded from the specified file.

Parameters:

    filename (str, required)
        Path to the file containing the saved pipeline.
Source code in orbital/ast.py
@classmethod
def load(cls, filename: str) -> "ParsedPipeline":
    """Load a parsed pipeline from a file.

    Returns an [orbital.ast.ParsedPipeline][] object loaded from the specified file.

    :param filename: Path to the file containing the saved pipeline
    """
    with open(filename, "rb") as f:
        header_len = int.from_bytes(f.read(4), "big")
        header_data = f.read(header_len)
        header = pickle.loads(header_data)
        if header["version"] != 1:
            # Currently there is only version 1
            raise UnsupportedFormatVersion("Unsupported format version.")
        model = _onnx.load_model(f)
    return cls._from_onnx_model(model, header["features"])

__str__

__str__() -> str

Generate a string representation of the pipeline.

Source code in orbital/ast.py
def __str__(self) -> str:
    """Generate a string representation of the pipeline."""
    return str(repr_pipeline.ParsedPipelineStr(self))

orbital.ast.UnsupportedFormatVersion

Bases: Exception

Format of loaded pipeline is not supported.

This usually happens when trying to load a newer format version with an older version of the framework.
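
A sketch of guarding a load against a format mismatch (the file name is a placeholder):

from orbital.ast import ParsedPipeline, UnsupportedFormatVersion

try:
    parsed = ParsedPipeline.load("pipeline.orbital")
except UnsupportedFormatVersion:
    # The file was saved in a newer format; upgrade orbital to load it.
    raise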

Source code in orbital/ast.py
class UnsupportedFormatVersion(Exception):
    """Format of loaded pipeline is not supported.

    This usually happens when trying to load a newer
    format version with an older version of the framework.
    """

    pass

orbital.types

Data types of the features processed by models.

orbital.types.FeaturesTypes module-attribute

FeaturesTypes = Dict[str, ColumnType]

Mapping of feature names to their types.
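
For example (column names are placeholders):

from orbital import types

features: types.FeaturesTypes = {
    "price": types.DoubleColumnType(),
    "quantity": types.Int64ColumnType(),
}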

orbital.types.ColumnType

Bases: ABC

A base class representing the type of a column of data.

Source code in orbital/types.py
class ColumnType(abc.ABC):
    """A base class representing the type of a column of data."""

    def __init__(self, passthrough: bool = False) -> None:
        """
        :param passthrough: If True, the column is ignored by the pipeline and is only available to the SQL generator.
                            You will still need to project those columns for them to be included in the SQL query.
        """
        self.is_passthrough = passthrough

    @abc.abstractmethod
    def _to_onnxtype(self) -> _sl2o_types.DataType:  # pragma: no cover
        """Convert the ColumnType to an onnx type.

        This should be implemented by all specific types.
        """
        pass

    @abc.abstractmethod
    def _to_ibistype(self) -> ibis_types.DataType:
        """Convert the ColumnType to an ibis type.

        This should be implemented by all specific types.
        """
        pass

    @staticmethod
    def _from_onnxtype(onnxtype: _sl2o_types.DataType) -> "ColumnType":
        """Given an onnx type, guess the right ColumnType."""
        if onnxtype.shape != [None, 1]:
            raise ValueError("Only columnar data is supported.")

        for scls in ColumnType.__subclasses__():
            supported_type = inspect.signature(scls._to_onnxtype).return_annotation
            if supported_type == onnxtype.__class__:
                return scls()  # type: ignore[abstract]
        else:
            raise TypeError(f"Unsupported data type {onnxtype.__class__.__name__}")

    def __eq__(self, other: object) -> bool:
        return self.__class__ == other.__class__

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}()"

__init__

__init__(passthrough: bool = False) -> None

Parameters:

    passthrough (bool, default: False)
        If True, the column is ignored by the pipeline and is only available
        to the SQL generator. You will still need to project those columns
        for them to be included in the SQL query.
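A sketch of a passthrough column ("row_id" is a hypothetical identifier) combined with a projection so that it appears in the query output:

from orbital import ResultsProjection, types

features = {
    "sepal_length": types.DoubleColumnType(),
    "row_id": types.Int64ColumnType(passthrough=True),  # ignored by the model
}
projection = ResultsProjection(select=["row_id"])        # keep it in the SELECT
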
Source code in orbital/types.py
def __init__(self, passthrough: bool = False) -> None:
    """
    :param passthrough: If True, the column is ignored by the pipeline and is only available to the SQL generator.
                        You will still need to project those columns for them to be included in the SQL query.
    """
    self.is_passthrough = passthrough

orbital.types.FloatColumnType

Bases: ColumnType

Mark a column as containing float values

Source code in orbital/types.py
class FloatColumnType(ColumnType):
    """Mark a column as containing float values"""

    def _to_onnxtype(self) -> _sl2o_types.FloatTensorType:
        return _sl2o_types.FloatTensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Float32:
        return ibis_types.Float32()

orbital.types.Float16ColumnType

Bases: ColumnType

Mark a column as containing 16bit float values

Source code in orbital/types.py
class Float16ColumnType(ColumnType):
    """Mark a column as containing 16bit float values"""

    def _to_onnxtype(self) -> _sl2o_types.Float16TensorType:
        return _sl2o_types.Float16TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Float16:
        return ibis_types.Float16()

orbital.types.DoubleColumnType

Bases: ColumnType

Mark a column as containing double values

Source code in orbital/types.py
class DoubleColumnType(ColumnType):
    """Mark a column as containing double values"""

    def _to_onnxtype(self) -> _sl2o_types.DoubleTensorType:
        return _sl2o_types.DoubleTensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Float64:
        return ibis_types.Float64()

orbital.types.StringColumnType

Bases: ColumnType

Mark a column as containing string values

Source code in orbital/types.py
class StringColumnType(ColumnType):
    """Mark a column as containing string values"""

    def _to_onnxtype(self) -> _sl2o_types.StringTensorType:
        return _sl2o_types.StringTensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.String:
        return ibis_types.String()

orbital.types.Int64ColumnType

Bases: ColumnType

Mark a column as containing signed 64bit integer values

Source code in orbital/types.py
class Int64ColumnType(ColumnType):
    """Mark a column as containing signed 64bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.Int64TensorType:
        return _sl2o_types.Int64TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Int64:
        return ibis_types.Int64()

orbital.types.UInt64ColumnType

Bases: ColumnType

Mark a column as containing unsigned 64bit integer values

Source code in orbital/types.py
class UInt64ColumnType(ColumnType):
    """Mark a column as containing unsigned 64bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.UInt64TensorType:
        return _sl2o_types.UInt64TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.UInt64:
        return ibis_types.UInt64()

orbital.types.Int32ColumnType

Bases: ColumnType

Mark a column as containing signed 32bit integer values

Source code in orbital/types.py
class Int32ColumnType(ColumnType):
    """Mark a column as containing signed 32bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.Int32TensorType:
        return _sl2o_types.Int32TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Int32:
        return ibis_types.Int32()

orbital.types.UInt32ColumnType

Bases: ColumnType

Mark a column as containing unsigned 32bit integer values

Source code in orbital/types.py
class UInt32ColumnType(ColumnType):
    """Mark a column as containing unsigned 32bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.UInt32TensorType:
        return _sl2o_types.UInt32TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.UInt32:
        return ibis_types.UInt32()

orbital.types.Int16ColumnType

Bases: ColumnType

Mark a column as containing signed 16bit integer values

Source code in orbital/types.py
class Int16ColumnType(ColumnType):
    """Mark a column as containing signed 16bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.Int16TensorType:
        return _sl2o_types.Int16TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Int16:
        return ibis_types.Int16()

orbital.types.UInt16ColumnType

Bases: ColumnType

Mark a column as containing unsigned 16bit integer values

Source code in orbital/types.py
class UInt16ColumnType(ColumnType):
    """Mark a column as containing unsigned 16bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.UInt16TensorType:
        return _sl2o_types.UInt16TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.UInt16:
        return ibis_types.UInt16()

orbital.types.Int8ColumnType

Bases: ColumnType

Mark a column as containing signed 8bit integer values

Source code in orbital/types.py
class Int8ColumnType(ColumnType):
    """Mark a column as containing signed 8bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.Int8TensorType:
        return _sl2o_types.Int8TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Int8:
        return ibis_types.Int8()

orbital.types.UInt8ColumnType

Bases: ColumnType

Mark a column as containing unsigned 8bit integer values

Source code in orbital/types.py
class UInt8ColumnType(ColumnType):
    """Mark a column as containing unsigned 8bit integer values"""

    def _to_onnxtype(self) -> _sl2o_types.UInt8TensorType:
        return _sl2o_types.UInt8TensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.UInt8:
        return ibis_types.UInt8()

orbital.types.BooleanColumnType

Bases: ColumnType

Mark a column as containing boolean values

Source code in orbital/types.py
class BooleanColumnType(ColumnType):
    """Mark a column as containing boolean values"""

    def _to_onnxtype(self) -> _sl2o_types.BooleanTensorType:
        return _sl2o_types.BooleanTensorType(shape=[None, 1])

    def _to_ibistype(self) -> ibis_types.Boolean:
        return ibis_types.Boolean()

orbital.types.guess_datatypes

guess_datatypes(dataframe: Any) -> FeaturesTypes

Given a DataFrame, try to guess the types of each feature in it.

This produces an orbital.types.FeaturesTypes dictionary that can be used by parse_pipeline to generate SQL queries from the sklearn pipeline.

In most cases this shouldn't be necessary, as the user should know what data the pipeline was trained on, but it can be convenient when experimenting or writing tests.
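
A sketch with a small pandas DataFrame:

import pandas as pd
from orbital.types import guess_datatypes

df = pd.DataFrame({"price": [1.5, 2.0], "quantity": [3, 4]})
print(guess_datatypes(df))
# expected: {'price': DoubleColumnType(), 'quantity': Int64ColumnType()}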

Source code in orbital/types.py
def guess_datatypes(dataframe: typing.Any) -> FeaturesTypes:
    """Given a DataFrame, try to guess the types of each feature in it.

    This produces an [orbital.types.FeaturesTypes][] dictionary that can be used by
    parse_pipeline to generate the SQL queries from the sklearn pipeline.

    In most cases this shouldn't be necessary, as the user should know
    what data the pipeline was trained on, but it can be convenient
    when experimenting or writing tests.
    """
    if hasattr(dataframe, "to_pandas"):
        # Easiest way to ensure compatibility with Polars, Pandas and PyArrow.
        dataframe = dataframe.to_pandas()

    try:
        dtypes = _sl2o_types.guess_data_type(dataframe)
    except (TypeError, NotImplementedError) as exc:
        log.debug(f"Unable to guess types from {repr(dataframe)}, exception: {exc}")
        raise ValueError("Unable to guess types of dataframe") from None

    typesmap: FeaturesTypes = {}
    for name, dtype in dtypes:
        try:
            typesmap[name] = ColumnType._from_onnxtype(dtype)
        except (ValueError, TypeError, AttributeError) as exc:
            log.debug(
                f"Unable to convert to column type from {name}:{repr(dtype)}, exception: {exc}"
            )
            raise ValueError(f"Unsupported datatype for column {name}") from None
    return typesmap