Skip to content

API Reference

orbitalml

OrbitalML, translate scikit-learn pipelines into SQL queries

OrbitalML is a library for translating scikit-learn pipelines into SQL queries and Ibis expressions.

It provides a way to execute machine learning models on databases without the need for a python runtime environment.

orbitalml.ResultsProjection

Projection of the results of the pipeline.

This class is used to select the columns to be returned from the pipeline. It can be used to select specific columns to include in the final result set.

It can also be used to skip the select step of columns from the pipeline.

You can use the omit method to skip the projection step entirely.

Source code in orbitalml/translate.py
class ResultsProjection:
    """Describe which columns the translated pipeline should return.

    The columns produced by the pipeline come first in the projection;
    the names passed as ``select`` are appended after them.

    A projection created through :meth:`omit` disables the projection
    step entirely.
    """

    def __init__(self, select: typing.Optional[list[str]] = None) -> None:
        """
        :param select: A list of additional columns to be selected from the pipeline.
        """
        self._omit = False
        self._select = select if select else []

    @classmethod
    def omit(cls) -> "ResultsProjection":
        """Build a projection that disables the projection phase."""
        skipped = cls()
        skipped._omit = True
        return skipped

    def _expand(self, results: typing.Iterable[str]) -> typing.Optional[list[str]]:
        """Return the full list of columns to project.

        :param results: Names of the columns produced by the pipeline.
        :return: ``results`` followed by the additional selected columns,
            or ``None`` when the projection was created with :meth:`omit`.
        """
        if self._omit:
            # ``results`` is deliberately left unconsumed in this case.
            return None

        columns = list(results)
        columns.extend(self._select)
        return columns

__init__

__init__(select: Optional[list[str]] = None) -> None

Parameters:

Name Type Description Default
select Optional[list[str]]

A list of additional columns to be selected from the pipeline.

None
Source code in orbitalml/translate.py
def __init__(self, select: typing.Optional[list[str]] = None) -> None:
    """
    :param select: A list of additional columns to be selected from the pipeline.
    """
    self._select = select or []
    self._omit = False

omit classmethod

Create a projection that skips projection phase entirely.

Source code in orbitalml/translate.py
@classmethod
def omit(cls) -> "ResultsProjection":
    """Create a projection that skips projection phase entirely."""
    projection = cls()
    projection._omit = True
    return projection

orbitalml.parse_pipeline

parse_pipeline(
    pipeline: Pipeline, features: FeaturesTypes
) -> ParsedPipeline

Parse a scikit-learn pipeline into an intermediate representation.

features should be a mapping of column names that are the inputs of the pipeline to their types from the orbitalml.types module:

{
    "column_name": types.DoubleColumnType(),
    "another_column": types.Int64ColumnType()
}
Source code in orbitalml/ast.py
def parse_pipeline(
    pipeline: sklearn.pipeline.Pipeline, features: FeaturesTypes
) -> ParsedPipeline:
    """Convert a scikit-learn pipeline into its intermediate representation.

    ``features`` maps the name of every input column of the pipeline
    to its type, using the classes of the :module:`.types` module::

        {
            "column_name": types.DoubleColumnType(),
            "another_column": types.Int64ColumnType()
        }

    :param pipeline: The scikit-learn pipeline to parse.
    :param features: Mapping of input column names to their column types.
    :return: The parsed pipeline.
    """
    # One (name, onnx type) pair per declared input column.
    onnx_inputs = [
        (column, column_type._to_onnxtype())
        for column, column_type in features.items()
    ]
    converted = _skl2o.to_onnx(pipeline, initial_types=onnx_inputs)
    return ParsedPipeline._from_onnx_model(converted, features)

orbitalml.export_sql

export_sql(
    table_name: str,
    pipeline: ParsedPipeline,
    dialect: str = "duckdb",
    projection: ResultsProjection = ResultsProjection(),
    optimize: bool = True,
) -> str

Export SQL for a given pipeline.

Given an orbitalml pipeline, this function generates a SQL query that can be used to execute the pipeline on a database. The generated SQL is compatible with the specified SQL dialect.

dialect can be any of the SQL dialects supported by sqlglot; see sqlglot.dialects.DIALECTS for a complete list of supported dialects.

If optimize is set to True, the SQL query will be optimized using sqlglot's optimizer. This can improve performance, but may fail if the query is complex.

Source code in orbitalml/sql.py
def export_sql(
    table_name: str,
    pipeline: ParsedPipeline,
    dialect: str = "duckdb",
    projection: ResultsProjection = ResultsProjection(),
    optimize: bool = True,
) -> str:
    """Generate the SQL query that runs a parsed pipeline.

    The query reads from a table named ``table_name`` whose columns
    are the features of ``pipeline``, and is rendered for the
    requested SQL ``dialect``.

    `dialect` can be any of the SQL dialects supported by sqlglot,
    see :class:`sqlglot.dialects.DIALECTS` for a complete list of supported dialects.

    When `optimize` is True the query goes through sqlglot's
    optimizer. This can improve performance, but may fail if
    the query is complex.

    :param table_name: Name of the table the query selects from.
    :param pipeline: The parsed pipeline to export.
    :param dialect: Name of the target SQL dialect.
    :param projection: Columns to return; an omitted projection is rejected.
    :param optimize: Whether to run sqlglot's optimizer on the query.
    :return: The SQL query as a string.
    :raises ValueError: If ``projection`` was created with ``omit``.
    """
    input_schema = {
        column: column_type._to_ibistype()
        for column, column_type in pipeline.features.items()
    }
    source_table = ibis.table(schema=input_schema, name=table_name)

    if projection._omit:
        raise ValueError(
            "Projection is empty. Please provide a projection to export SQL."
        )

    expression = translate(source_table, pipeline, projection=projection)
    dialect_compiler = getattr(sc, dialect).compiler
    query = dialect_compiler.to_sqlglot(expression)

    if optimize:
        # The optimizer is given the schema of the source table, keyed by its name.
        table_schema = Catalog().to_sqlglot_schema(source_table.schema())
        query = sqlglot.optimizer.optimize(
            query,
            schema={source_table.get_name(): table_schema},
            rules=OPTIMIZER_RULES,
        )

    return query.sql(dialect=dialect)

orbitalml.ast

Translate scikit-learn models to an intermediate representation.

The IR is what will be processed to generate the SQL queries.

orbitalml.ast.ParsedPipeline

An intermediate representation of a scikit-learn pipeline.

This object can be converted to a SQL query and run on a database. It can also be saved and loaded back in binary format for the sake of model distribution, even though distributing the SQL query is usually more convenient.

Source code in orbitalml/ast.py
class ParsedPipeline:
    """Intermediate representation of a scikit-learn pipeline.

    This object can be converted to a SQL query and run on a database.
    It can also be written to a file with :meth:`dump` and read back
    with :meth:`load` for the sake of model distribution, even though
    distributing the SQL query is usually more convenient.
    """

    _model: _onnx.ModelProto  # type: ignore[assignment]
    features: FeaturesTypes  # type: ignore[assignment]

    def __init__(self) -> None:
        """ParsedPipeline objects can only be created by the parse_pipeline function."""
        message = "parse_pipeline must be used to create a ParsedPipeline object."
        raise NotImplementedError(message)

    @classmethod
    def _from_onnx_model(
        cls, model: _onnx.ModelProto, features: FeaturesTypes
    ) -> "ParsedPipeline":
        """Build a ParsedPipeline around an ONNX model.

        This is an internal constructor: ONNX should never be
        exposed to the user.
        """
        # __init__ always raises, so the instance is allocated directly.
        instance = super().__new__(cls)
        instance._model = model
        instance.features = cls._validate_features(features)
        return instance

    @classmethod
    def _validate_features(cls, features: FeaturesTypes) -> FeaturesTypes:
        """Check that the features can be handled by a SQL query.

        :raises ValueError: If a feature name contains a ``.`` character.
        :raises TypeError: If a feature type is not a ``ColumnType``.
        :return: The ``features`` mapping, unchanged.
        """
        # Names are checked first, for every feature, then the types.
        for column in features:
            if "." in column:
                raise ValueError(
                    f"Feature names cannot contain '.' characters: {column}, replace with '_'"
                )

        for column_type in features.values():
            if isinstance(column_type, ColumnType):
                continue
            raise TypeError(f"Feature types must be ColumnType objects: {column_type}")

        return features

    def dump(self, filename: str) -> None:
        """Write the parsed pipeline to ``filename``.

        The file holds a 4-byte big-endian length, a pickled header of
        that length (format version and features), then the serialized
        ONNX model.
        """
        # The ONNX model is protobuf, so protobuf would be a natural
        # choice for the header as well. Pickle is used instead to avoid
        # defining a dedicated protobuf schema and compiling .proto files.
        payload = pickle.dumps({"version": 1, "features": self.features})
        size_prefix = len(payload).to_bytes(4, "big")
        with open(filename, "wb") as stream:
            stream.write(size_prefix)
            stream.write(payload)
            stream.write(self._model.SerializeToString())

    @classmethod
    def load(cls, filename: str) -> "ParsedPipeline":
        """Read back a parsed pipeline written by :meth:`dump`.

        :raises UnsupportedFormatVersion: If the header version is not 1.
        """
        with open(filename, "rb") as stream:
            size_prefix = stream.read(4)
            payload = stream.read(int.from_bytes(size_prefix, "big"))
            # NOTE(review): unpickling can execute arbitrary code, so only
            # files coming from a trusted source should be loaded.
            header = pickle.loads(payload)
            if header["version"] != 1:
                # Version 1 is the only format that exists so far.
                raise UnsupportedFormatVersion("Unsupported format version.")
            model = _onnx.load_model(stream)
        return cls._from_onnx_model(model, header["features"])

    def __str__(self) -> str:
        """Return a string representation of the pipeline."""
        rendering = repr_pipeline.ParsedPipelineStr(self)
        return str(rendering)
__init__
__init__() -> None

ParsedPipeline objects can only be created by the parse_pipeline function.

Source code in orbitalml/ast.py
def __init__(self) -> None:
    """ParsedPipeline objects can only be created by the parse_pipeline function."""
    # Direct construction is always rejected.
    message = "parse_pipeline must be used to create a ParsedPipeline object."
    raise NotImplementedError(message)
dump
dump(filename: str) -> None

Dump the parsed pipeline to a file.

Source code in orbitalml/ast.py
def dump(self, filename: str) -> None:
    """Write the parsed pipeline to ``filename``.

    The file holds a 4-byte big-endian length, a pickled header of
    that length (format version and features), then the serialized
    ONNX model.
    """
    # The ONNX model is protobuf, so protobuf would be a natural
    # choice for the header as well. Pickle is used instead to avoid
    # defining a dedicated protobuf schema and compiling .proto files.
    payload = pickle.dumps({"version": 1, "features": self.features})
    size_prefix = len(payload).to_bytes(4, "big")
    with open(filename, "wb") as stream:
        stream.write(size_prefix)
        stream.write(payload)
        stream.write(self._model.SerializeToString())
load classmethod
load(filename: str) -> ParsedPipeline

Load a parsed pipeline from a file.

Source code in orbitalml/ast.py
@classmethod
def load(cls, filename: str) -> "ParsedPipeline":
    """Read back a parsed pipeline from ``filename``.

    The file is expected to start with a 4-byte big-endian length,
    followed by a pickled header of that length and the ONNX model.

    :raises UnsupportedFormatVersion: If the header version is not 1.
    """
    with open(filename, "rb") as stream:
        size_prefix = stream.read(4)
        payload = stream.read(int.from_bytes(size_prefix, "big"))
        # NOTE(review): unpickling can execute arbitrary code, so only
        # files coming from a trusted source should be loaded.
        header = pickle.loads(payload)
        if header["version"] != 1:
            # Version 1 is the only format that exists so far.
            raise UnsupportedFormatVersion("Unsupported format version.")
        model = _onnx.load_model(stream)
    return cls._from_onnx_model(model, header["features"])
__str__
__str__() -> str

Generate a string representation of the pipeline.

Source code in orbitalml/ast.py
def __str__(self) -> str:
    """Return a string representation of the pipeline."""
    rendering = repr_pipeline.ParsedPipelineStr(self)
    return str(rendering)

orbitalml.ast.UnsupportedFormatVersion

Bases: Exception

Format of loaded pipeline is not supported.

This usually happens when trying to load a newer format version with an older version of the framework.

Source code in orbitalml/ast.py
class UnsupportedFormatVersion(Exception):
    """Raised when a loaded pipeline uses an unsupported format.

    The usual cause is loading a file written in a newer format
    version with an older version of the framework.
    """

orbitalml.types

Data types of the features processed by models.

orbitalml.types.ColumnType

Bases: ABC

A base class representing the type of a column of data.

Source code in orbitalml/types.py
class ColumnType(abc.ABC):
    """A base class representing the type of a column of data.

    Two column types compare equal when they are instances of the
    same class, and they hash consistently with that equality so they
    can be used in sets and as dictionary keys.
    """

    @abc.abstractmethod
    def _to_onnxtype(self) -> _sl2o_types.DataType:  # pragma: no cover
        """Convert the ColumnType to an onnx type.

        This should be implemented by all specific types. The return
        annotation of the implementation is what ``_from_onnxtype``
        inspects to map an onnx type back to its ColumnType.
        """
        pass

    @abc.abstractmethod
    def _to_ibistype(self) -> ibis_types.DataType:
        """Convert the ColumnType to an ibis type.

        This should be implemented by all specific types.
        """
        pass

    @staticmethod
    def _from_onnxtype(onnxtype: _sl2o_types.DataType) -> "ColumnType":
        """Given an onnx type, guess the right ColumnType.

        :param onnxtype: The onnx type to convert.
        :return: An instance of the matching direct subclass of ColumnType.
        :raises ValueError: If the shape of ``onnxtype`` is not ``[None, 1]``.
        :raises TypeError: If no direct subclass declares this onnx type.
        """
        if onnxtype.shape != [None, 1]:
            raise ValueError("Only columnar data is supported.")

        # Only direct subclasses are considered; each one advertises the
        # onnx type it maps to through its _to_onnxtype return annotation.
        for scls in ColumnType.__subclasses__():
            supported_type = inspect.signature(scls._to_onnxtype).return_annotation
            if supported_type == onnxtype.__class__:
                return scls()  # type: ignore[abstract]

        raise TypeError(f"Unsupported data type {onnxtype.__class__.__name__}")

    def __eq__(self, other: object) -> bool:
        """Column types are equal when they are of the same class."""
        return self.__class__ == other.__class__

    def __hash__(self) -> int:
        """Hash on the class, consistently with ``__eq__``.

        Defining ``__eq__`` alone would set ``__hash__`` to ``None`` and
        make every column type unhashable.
        """
        return hash(self.__class__)

    def __repr__(self) -> str:
        """Return the class name followed by ``()``."""
        return f"{self.__class__.__name__}()"

orbitalml.types.FloatColumnType

Bases: ColumnType

Mark a column as containing float values

Source code in orbitalml/types.py
class FloatColumnType(ColumnType):
    """Column type for float values."""

    def _to_ibistype(self) -> ibis_types.Float32:
        """Return the ibis ``Float32`` type."""
        return ibis_types.Float32()

    def _to_onnxtype(self) -> _sl2o_types.FloatTensorType:
        """Return a ``FloatTensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.FloatTensorType(shape=column_shape)

orbitalml.types.Float16ColumnType

Bases: ColumnType

Mark a column as containing 16bit float values

Source code in orbitalml/types.py
class Float16ColumnType(ColumnType):
    """Column type for 16bit float values."""

    def _to_ibistype(self) -> ibis_types.Float16:
        """Return the ibis ``Float16`` type."""
        return ibis_types.Float16()

    def _to_onnxtype(self) -> _sl2o_types.Float16TensorType:
        """Return a ``Float16TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.Float16TensorType(shape=column_shape)

orbitalml.types.DoubleColumnType

Bases: ColumnType

Mark a column as containing double values

Source code in orbitalml/types.py
class DoubleColumnType(ColumnType):
    """Column type for double values."""

    def _to_ibistype(self) -> ibis_types.Float64:
        """Return the ibis ``Float64`` type."""
        return ibis_types.Float64()

    def _to_onnxtype(self) -> _sl2o_types.DoubleTensorType:
        """Return a ``DoubleTensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.DoubleTensorType(shape=column_shape)

orbitalml.types.StringColumnType

Bases: ColumnType

Mark a column as containing string values

Source code in orbitalml/types.py
class StringColumnType(ColumnType):
    """Column type for string values."""

    def _to_ibistype(self) -> ibis_types.String:
        """Return the ibis ``String`` type."""
        return ibis_types.String()

    def _to_onnxtype(self) -> _sl2o_types.StringTensorType:
        """Return a ``StringTensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.StringTensorType(shape=column_shape)

orbitalml.types.Int64ColumnType

Bases: ColumnType

Mark a column as containing signed 64bit integer values

Source code in orbitalml/types.py
class Int64ColumnType(ColumnType):
    """Column type for signed 64bit integer values."""

    def _to_ibistype(self) -> ibis_types.Int64:
        """Return the ibis ``Int64`` type."""
        return ibis_types.Int64()

    def _to_onnxtype(self) -> _sl2o_types.Int64TensorType:
        """Return an ``Int64TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.Int64TensorType(shape=column_shape)

orbitalml.types.UInt64ColumnType

Bases: ColumnType

Mark a column as containing unsigned 64bit integer values

Source code in orbitalml/types.py
class UInt64ColumnType(ColumnType):
    """Column type for unsigned 64bit integer values."""

    def _to_ibistype(self) -> ibis_types.UInt64:
        """Return the ibis ``UInt64`` type."""
        return ibis_types.UInt64()

    def _to_onnxtype(self) -> _sl2o_types.UInt64TensorType:
        """Return a ``UInt64TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.UInt64TensorType(shape=column_shape)

orbitalml.types.Int32ColumnType

Bases: ColumnType

Mark a column as containing signed 32bit integer values

Source code in orbitalml/types.py
class Int32ColumnType(ColumnType):
    """Column type for signed 32bit integer values."""

    def _to_ibistype(self) -> ibis_types.Int32:
        """Return the ibis ``Int32`` type."""
        return ibis_types.Int32()

    def _to_onnxtype(self) -> _sl2o_types.Int32TensorType:
        """Return an ``Int32TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.Int32TensorType(shape=column_shape)

orbitalml.types.UInt32ColumnType

Bases: ColumnType

Mark a column as containing unsigned 32bit integer values

Source code in orbitalml/types.py
class UInt32ColumnType(ColumnType):
    """Column type for unsigned 32bit integer values."""

    def _to_ibistype(self) -> ibis_types.UInt32:
        """Return the ibis ``UInt32`` type."""
        return ibis_types.UInt32()

    def _to_onnxtype(self) -> _sl2o_types.UInt32TensorType:
        """Return a ``UInt32TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.UInt32TensorType(shape=column_shape)

orbitalml.types.Int16ColumnType

Bases: ColumnType

Mark a column as containing signed 16bit integer values

Source code in orbitalml/types.py
class Int16ColumnType(ColumnType):
    """Column type for signed 16bit integer values."""

    def _to_ibistype(self) -> ibis_types.Int16:
        """Return the ibis ``Int16`` type."""
        return ibis_types.Int16()

    def _to_onnxtype(self) -> _sl2o_types.Int16TensorType:
        """Return an ``Int16TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.Int16TensorType(shape=column_shape)

orbitalml.types.UInt16ColumnType

Bases: ColumnType

Mark a column as containing unsigned 16bit integer values

Source code in orbitalml/types.py
class UInt16ColumnType(ColumnType):
    """Column type for unsigned 16bit integer values."""

    def _to_ibistype(self) -> ibis_types.UInt16:
        """Return the ibis ``UInt16`` type."""
        return ibis_types.UInt16()

    def _to_onnxtype(self) -> _sl2o_types.UInt16TensorType:
        """Return a ``UInt16TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.UInt16TensorType(shape=column_shape)

orbitalml.types.Int8ColumnType

Bases: ColumnType

Mark a column as containing signed 8bit integer values

Source code in orbitalml/types.py
class Int8ColumnType(ColumnType):
    """Column type for signed 8bit integer values."""

    def _to_ibistype(self) -> ibis_types.Int8:
        """Return the ibis ``Int8`` type."""
        return ibis_types.Int8()

    def _to_onnxtype(self) -> _sl2o_types.Int8TensorType:
        """Return an ``Int8TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.Int8TensorType(shape=column_shape)

orbitalml.types.UInt8ColumnType

Bases: ColumnType

Mark a column as containing unsigned 8bit integer values

Source code in orbitalml/types.py
class UInt8ColumnType(ColumnType):
    """Column type for unsigned 8bit integer values."""

    def _to_ibistype(self) -> ibis_types.UInt8:
        """Return the ibis ``UInt8`` type."""
        return ibis_types.UInt8()

    def _to_onnxtype(self) -> _sl2o_types.UInt8TensorType:
        """Return a ``UInt8TensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.UInt8TensorType(shape=column_shape)

orbitalml.types.BooleanColumnType

Bases: ColumnType

Mark a column as containing boolean values

Source code in orbitalml/types.py
class BooleanColumnType(ColumnType):
    """Column type for boolean values."""

    def _to_ibistype(self) -> ibis_types.Boolean:
        """Return the ibis ``Boolean`` type."""
        return ibis_types.Boolean()

    def _to_onnxtype(self) -> _sl2o_types.BooleanTensorType:
        """Return a ``BooleanTensorType`` of shape ``[None, 1]``."""
        column_shape = [None, 1]
        return _sl2o_types.BooleanTensorType(shape=column_shape)

orbitalml.types.guess_datatypes

guess_datatypes(dataframe: Any) -> FeaturesTypes

Given a DataFrame, try to guess the types of each feature in it.

This produces a FeaturesTypes dictionary that can be used by parse_pipeline to generate the SQL queries from the sklearn pipeline.

In most cases this shouldn't be necessary as the user should know on what data the pipeline was trained on, but it can be convenient when experimenting or writing tests.

Source code in orbitalml/types.py
def guess_datatypes(dataframe: typing.Any) -> FeaturesTypes:
    """Given a DataFrame, try to guess the types of each feature in it.

    This produces a :class:`.FeaturesTypes` dictionary that can be used by
    parse_pipeline to generate the SQL queries from the sklearn pipeline.

    In most cases this shouldn't be necessary as the user should know
    on what data the pipeline was trained on, but it can be convenient
    when experimenting or writing tests.

    :param dataframe: The data to inspect. Objects exposing a ``to_pandas``
        method are converted through it before guessing.
    :return: A mapping of column names to their guessed column type.
    :raises ValueError: If the types cannot be guessed, or if a column
        has a type that no ColumnType supports.
    """
    if hasattr(dataframe, "to_pandas"):
        # Easiest way to ensure compatibility with Polars, Pandas and PyArrow.
        dataframe = dataframe.to_pandas()

    try:
        dtypes = _sl2o_types.guess_data_type(dataframe)
    except (TypeError, NotImplementedError) as exc:
        # Lazy %-style arguments: the repr of a possibly large dataframe
        # is only computed when debug logging is actually enabled.
        log.debug("Unable to guess types from %r, exception: %s", dataframe, exc)
        # The underlying error is only logged; callers get a plain ValueError.
        raise ValueError("Unable to guess types of dataframe") from None

    typesmap: FeaturesTypes = {}
    for name, dtype in dtypes:
        try:
            typesmap[name] = ColumnType._from_onnxtype(dtype)
        except (ValueError, TypeError, AttributeError) as exc:
            log.debug(
                "Unable to convert to column type from %s:%r, exception: %s",
                name,
                dtype,
                exc,
            )
            raise ValueError(f"Unsupported datatype for column {name}") from None
    return typesmap