Skip to content

Orbital Internals

orbitalml.translate

Translate a pipeline into an Ibis expression.

orbitalml.translate.ResultsProjection

Projection of the results of the pipeline.

This class is used to select the columns to be returned from the pipeline. It can be used to select specific columns to include in the final result set.

It can also be used to skip the select step of columns from the pipeline.

You can use the omit method to skip the projection step entirely.

Source code in orbitalml/translate.py
class ResultsProjection:
    """Describes which columns the pipeline should emit.

    A projection picks the columns that end up in the final result
    set of a translated pipeline. Extra columns can be requested at
    construction time, and the projection phase can be disabled
    entirely through the :meth:`omit` alternate constructor.
    """

    def __init__(self, select: typing.Optional[list[str]] = None) -> None:
        """
        :param select: A list of additional columns to be selected from the pipeline.
        """
        self._omit = False
        self._select = select if select else []

    @classmethod
    def omit(cls) -> "ResultsProjection":
        """Create a projection that skips projection phase entirely."""
        skipped = cls()
        skipped._omit = True
        return skipped

    def _expand(self, results: typing.Iterable[str]) -> typing.Optional[list[str]]:
        # None signals to the caller that no projection step should run.
        if self._omit:
            return None

        expanded = list(results)
        expanded.extend(self._select)
        return expanded

__init__

__init__(select: Optional[list[str]] = None) -> None

Parameters:

Name Type Description Default
select Optional[list[str]]

A list of additional columns to be selected from the pipeline.

None
Source code in orbitalml/translate.py
def __init__(self, select: typing.Optional[list[str]] = None) -> None:
    """Initialize the projection.

    :param select: A list of additional columns to be selected from the pipeline.
    """
    self._omit = False
    self._select = select if select else []

omit classmethod

Create a projection that skips projection phase entirely.

Source code in orbitalml/translate.py
@classmethod
def omit(cls) -> "ResultsProjection":
    """Build a projection that disables the projection phase altogether."""
    skipped = cls()
    skipped._omit = True
    return skipped

orbitalml.translate.translate

translate(
    table: Table,
    pipeline: ParsedPipeline,
    projection: ResultsProjection = ResultsProjection(),
) -> Table

Translate a pipeline into an Ibis expression.

This function takes a pipeline and a table and translates the pipeline into an Ibis expression applied to the table.

It is possible to further chain operations on the result to allow post processing of the prediction.

Source code in orbitalml/translate.py
def translate(
    table: ibis.Table,
    pipeline: ParsedPipeline,
    projection: typing.Optional[ResultsProjection] = None,
) -> ibis.Table:
    """Translate a pipeline into an Ibis expression.

    This function takes a pipeline and a table and translates the pipeline
    into an Ibis expression applied to the table.

    It is possible to further chain operations on the result
    to allow post processing of the prediction.

    :param table: The table the pipeline should be applied to.
    :param pipeline: The parsed pipeline to translate.
    :param projection: Selection of the columns to emit in the result.
        Defaults to a plain :class:`ResultsProjection` (no extra columns).
    """
    # Avoid a mutable default argument: ``projection=ResultsProjection()``
    # would be evaluated once at definition time and the same instance
    # shared across every call to translate().
    if projection is None:
        projection = ResultsProjection()

    optimizer = Optimizer(enabled=True)
    features = {colname: table[colname] for colname in table.columns}
    variables = GraphVariables(features, pipeline._model.graph)
    for node in pipeline._model.graph.node:
        op_type = node.op_type
        if op_type not in TRANSLATORS:
            raise NotImplementedError(f"Translation for {op_type} not implemented")
        translator = TRANSLATORS[op_type](table, node, variables, optimizer)  # type: ignore[abstract]
        _log_debug_start(translator, variables)
        translator.process()
        table = translator.mutated_table  # Translator might return a new table.
        _log_debug_end(translator, variables)
    return _projection_results(table, variables, projection)

orbitalml.translation.variables

Define the variables and group of variables used in the translation process.

orbitalml.translation.variables.VariablesGroup

Bases: dict[str, VariablesGroupVarT], Generic[VariablesGroupVarT]

A group of variables that can be used to represent a set of expressions.

This is used to represent a group of columns in a table, the group will act as a single entity on which expressions will be applied.

If an expression is applied to the group, it will be applied to all columns in the group.

Source code in orbitalml/translation/variables.py
class VariablesGroup(dict[str, VariablesGroupVarT], typing.Generic[VariablesGroupVarT]):
    """A named set of expressions treated as a single entity.

    A group models a bundle of columns in a table: an expression
    applied to the group is meant to be applied to every column the
    group contains.
    """

    # Type every member of the group must satisfy; subclasses narrow it.
    VAR_TYPE = ibis.Expr

    def __init__(self, vargroup: dict | None = None) -> None:
        """
        :param vargroup: A dictionary of names and expressions that are part of the group.
        """
        if vargroup is None:
            super().__init__()
            return

        # Validate every member before populating the dict.
        for expr in vargroup.values():
            if not isinstance(expr, self.VAR_TYPE):
                raise TypeError(f"Expected {self.VAR_TYPE} value, got {type(expr)}")
        super().__init__(vargroup)

    def __setitem__(self, key: str, value: VariablesGroupVarT, /) -> None:
        # Reject values of the wrong type so the group stays homogeneous.
        if not isinstance(value, self.VAR_TYPE):
            raise TypeError(f"Expected {self.VAR_TYPE} value, got {type(value)}")
        return super().__setitem__(key, value)

    def as_value(self, name: str) -> ibis.Value:
        """Return the subvariable *name* as an ibis Value.

        Values are expressions that support comparisons, mathematical
        operations and other expression-level operations.
        """
        subvar = self[name]
        if not isinstance(subvar, ibis.Value):
            raise TypeError(f"Expected value, got {type(subvar)}")
        return subvar

    def values_value(self) -> list[ibis.Value]:
        """Return every subvariable as an ibis Value."""
        subvars = list(self.values())
        for subvar in subvars:
            if not isinstance(subvar, ibis.Value):
                raise TypeError(f"Expected value, got {type(subvar)}")
        return typing.cast(list[ibis.Value], subvars)

__init__

__init__(vargroup: dict | None = None) -> None

Parameters:

Name Type Description Default
vargroup dict | None

A dictionary of names and expressions that are part of the group.

None
Source code in orbitalml/translation/variables.py
def __init__(self, vargroup: dict | None = None) -> None:
    """Create the group, validating every member expression.

    :param vargroup: A dictionary of names and expressions that are part of the group.
    """
    if vargroup is None:
        super().__init__()
        return

    # Validate every member before populating the dict.
    for expr in vargroup.values():
        if not isinstance(expr, self.VAR_TYPE):
            raise TypeError(f"Expected {self.VAR_TYPE} value, got {type(expr)}")
    super().__init__(vargroup)

as_value

as_value(name: str) -> Value

Return a subvariable as a Value.

Values are expressions on top of which operations like comparisons, mathematical operations, etc. can be applied.

Source code in orbitalml/translation/variables.py
def as_value(self, name: str) -> ibis.Value:
    """Return the subvariable *name* as an ibis Value.

    Values are expressions that support comparisons, mathematical
    operations and other expression-level operations.
    """
    subvar = self[name]
    if not isinstance(subvar, ibis.Value):
        raise TypeError(f"Expected value, got {type(subvar)}")
    return subvar

values_value

values_value() -> list[Value]

Return all subvariables as a list of Values.

Source code in orbitalml/translation/variables.py
def values_value(self) -> list[ibis.Value]:
    """Return every subvariable as an ibis Value."""
    subvars = list(self.values())
    for subvar in subvars:
        if not isinstance(subvar, ibis.Value):
            raise TypeError(f"Expected value, got {type(subvar)}")
    return typing.cast(list[ibis.Value], subvars)

orbitalml.translation.variables.ValueVariablesGroup

Bases: VariablesGroup[Value]

A group of value variables that can be used to represent a set of values.

This is used to represent a group of columns in a table, the group will act as a single entity on which expressions will be applied.

If an expression is applied to the group, it will be applied to all columns in the group.

Source code in orbitalml/translation/variables.py
class ValueVariablesGroup(VariablesGroup[ibis.expr.types.Value]):
    """A variables group whose members are ibis Values.

    It models a bundle of table columns treated as a single entity:
    an expression applied to the group is applied to every column
    that belongs to it.
    """

    VAR_TYPE = ibis.expr.types.Value

orbitalml.translation.variables.NumericVariablesGroup

Bases: VariablesGroup[NumericValue]

A group of numeric variables that can be used to represent a set of numeric values.

This is used to represent a group of numeric columns in a table, steps that expect to be able to perform mathematical operations over a variables group will create a NumericVariablesGroup from it, so that it is guaranteed that all subvariables are numeric.

Source code in orbitalml/translation/variables.py
class NumericVariablesGroup(VariablesGroup[ibis.expr.types.NumericValue]):
    """A variables group whose members are all numeric values.

    Steps that need to perform mathematical operations over a group
    of columns build a NumericVariablesGroup from it, which guarantees
    that every subvariable is numeric.
    """

    VAR_TYPE = ibis.expr.types.NumericValue

orbitalml.translation.variables.GraphVariables

A class to manage the variables used in the translation process.

This class is responsible for managing the variables and constants used in the translation process. It keeps track of the variables that have been consumed and the variables that are still available.

When a variable is consumed it will be hidden from the list of available variables. This makes sure that the remaining variables that were not consumed are only the variables that should appear in the output (as they were set with no one consuming them afterward).

This class also manages constants (initializers) that are used in the translation process. When consuming a variable, it could be both a constant or a variable. But if it's a constant it won't actually be consumed, as constants never appear in the output, and thus it will remain available for other nodes that need it.

Source code in orbitalml/translation/variables.py
class GraphVariables:
    """Track the variables and constants used during translation.

    Keeps account of which variables have been consumed and which are
    still available. A consumed variable is hidden from the list of
    remaining variables, so whatever was never consumed is exactly the
    set of variables that must appear in the output (they were set and
    nobody used them afterward).

    Constants (initializers) are managed as well: consuming a name that
    refers to a constant does not actually remove it, because constants
    never appear in the output and other nodes might still need them.
    """

    def __init__(self, table: ibis.Table, graph: onnx.GraphProto) -> None:
        """
        :param table: The table the variables came from.
        :param graph: The pipeline graph requiring the variables and providing the constants.
        """
        self._initializers: dict[str, onnx.TensorProto] = {}
        self._initializers_values: dict[str, VariableTypes] = {}
        for init in graph.initializer:
            self._initializers[init.name] = init
            self._initializers_values[init.name] = onnx_utils.get_initializer_data(init)

        self._variables: dict[str, ibis.Expr | VariablesGroup] = {}
        for inp in graph.input:
            self._variables[inp.name] = table[inp.name]

        self._consumed: set[str] = set()
        self._uniqueid: int = 0

    def consume(self, name: str) -> ibis.Expr | VariableTypes | VariablesGroup:
        """Consume a variable or a constant.

        Constants come back as plain python values; variables come
        back as an Expression or a VariablesGroup.

        Consuming a variable hides it from the list of remaining
        variables; constants are never hidden.
        """
        if (constant := self._initializers_values.get(name)) is not None:
            return constant

        self._consumed.add(name)
        return self._variables[name]

    def peek_variable(
        self, name: str, default: None | ibis.Expr = None
    ) -> ibis.Expr | VariablesGroup | None:
        """Look up a variable by name without marking it as consumed."""
        return self._variables.get(name, default)

    def get_initializer(
        self, name: str, default: None | onnx.TensorProto = None
    ) -> onnx.TensorProto | None:
        """Return the raw initializer tensor registered under *name*."""
        return self._initializers.get(name, default)

    def get_initializer_value(
        self, name: str, default: None | VariableTypes = None
    ) -> VariableTypes | None:
        """Return the python value of the constant registered under *name*."""
        return self._initializers_values.get(name, default)

    def keys(self) -> list[str]:
        """Names of all the variables that were not consumed."""
        return [name for name in self._variables if name not in self._consumed]

    def __setitem__(self, key: str, value: ibis.Expr | VariablesGroup, /) -> None:
        # Assigning a name makes it available again even if it was consumed.
        self._variables[key] = value
        self._consumed.discard(key)

    def __contains__(self, key: str) -> bool:
        return key in self._variables and key not in self._consumed

    def __len__(self) -> int:
        return len(self.keys())

    def nested_len(self) -> int:
        """Total amount of variables and subvariables."""
        total = 0
        for name, var in self._variables.items():
            if name in self._consumed:
                continue
            # A group counts once per column it contains.
            total += len(var) if isinstance(var, VariablesGroup) else 1
        return total

    def remaining(self) -> dict[str, ibis.Expr | VariablesGroup]:
        """Return the variables that were not consumed."""
        return {
            name: var
            for name, var in self._variables.items()
            if name not in self._consumed
        }

    def generate_unique_shortname(self) -> str:
        """Generate a unique short name for a variable."""
        self._uniqueid += 1
        return f"v{self._uniqueid}"

__init__

__init__(table: Table, graph: GraphProto) -> None

Parameters:

Name Type Description Default
table Table

The table the variables came from.

required
graph GraphProto

The pipeline graph requiring the variables and providing the constants.

required
Source code in orbitalml/translation/variables.py
def __init__(self, table: ibis.Table, graph: onnx.GraphProto) -> None:
    """Collect the graph's constants and bind its inputs to table columns.

    :param table: The table the variables came from.
    :param graph: The pipeline graph requiring the variables and providing the constants.
    """
    self._initializers: dict[str, onnx.TensorProto] = {}
    self._initializers_values: dict[str, VariableTypes] = {}
    for init in graph.initializer:
        self._initializers[init.name] = init
        self._initializers_values[init.name] = onnx_utils.get_initializer_data(init)

    self._variables: dict[str, ibis.Expr | VariablesGroup] = {}
    for inp in graph.input:
        self._variables[inp.name] = table[inp.name]

    self._consumed: set[str] = set()
    self._uniqueid: int = 0

consume

consume(name: str) -> Expr | VariableTypes | VariablesGroup

Consume a variable or a constant.

Return a python value for constants and an Expression or VariablesGroup for variables.

When a variable is consumed it will be hidden from the list of remaining variables.

Source code in orbitalml/translation/variables.py
def consume(self, name: str) -> ibis.Expr | VariableTypes | VariablesGroup:
    """Consume a variable or a constant.

    Constants come back as plain python values; variables come back
    as an Expression or a VariablesGroup.

    Consuming a variable hides it from the list of remaining
    variables; constants are never hidden.
    """
    constant = self._initializers_values.get(name)
    if constant is not None:
        # Constants are never consumed: other nodes may still need them.
        return constant

    self._consumed.add(name)
    return self._variables[name]

peek_variable

peek_variable(
    name: str, default: None | Expr = None
) -> Expr | VariablesGroup | None

Peek a variable without consuming it.

Source code in orbitalml/translation/variables.py
def peek_variable(
    self, name: str, default: None | ibis.Expr = None
) -> ibis.Expr | VariablesGroup | None:
    """Look up a variable by name without marking it as consumed."""
    return self._variables.get(name, default)

get_initializer

get_initializer(
    name: str, default: None | TensorProto = None
) -> TensorProto | None

Get an initializer by name.

Source code in orbitalml/translation/variables.py
def get_initializer(
    self, name: str, default: None | onnx.TensorProto = None
) -> onnx.TensorProto | None:
    """Return the raw initializer tensor registered under *name*."""
    return self._initializers.get(name, default)

get_initializer_value

get_initializer_value(
    name: str, default: None | VariableTypes = None
) -> VariableTypes | None

Get a constant value.

Source code in orbitalml/translation/variables.py
def get_initializer_value(
    self, name: str, default: None | VariableTypes = None
) -> VariableTypes | None:
    """Return the python value of the constant registered under *name*."""
    return self._initializers_values.get(name, default)

keys

keys() -> list[str]

Name of all the variables that were not consumed.

Source code in orbitalml/translation/variables.py
def keys(self) -> list[str]:
    """Names of all the variables that were not consumed."""
    return [name for name in self._variables if name not in self._consumed]

nested_len

nested_len() -> int

Get total amount of variables and subvariables

Source code in orbitalml/translation/variables.py
def nested_len(self) -> int:
    """Total amount of variables and subvariables."""
    total = 0
    for name, var in self._variables.items():
        if name in self._consumed:
            continue
        # A group counts once per column it contains.
        total += len(var) if isinstance(var, VariablesGroup) else 1
    return total

remaining

remaining() -> dict[str, Expr | VariablesGroup]

Return the variables that were not consumed.

Source code in orbitalml/translation/variables.py
def remaining(self) -> dict[str, ibis.Expr | VariablesGroup]:
    """Return the variables that were not consumed."""
    return {
        name: var
        for name, var in self._variables.items()
        if name not in self._consumed
    }

generate_unique_shortname

generate_unique_shortname() -> str

Generate a unique short name for a variable.

Source code in orbitalml/translation/variables.py
def generate_unique_shortname(self) -> str:
    """Generate a unique short name for a variable."""
    next_id = self._uniqueid + 1
    self._uniqueid = next_id
    return f"v{next_id}"

orbitalml.translation.translator

Base class for the translators of each pipeline step.

orbitalml.translation.translator.Translator

Bases: ABC

Base class for all translators.

This class is responsible for translating pipeline steps into Ibis expressions.

Source code in orbitalml/translation/translator.py
class Translator(abc.ABC):
    """Base class for all translators.

    Concrete subclasses are responsible for translating a single
    pipeline step into Ibis expressions.
    """

    def __init__(
        self,
        table: ibis.Table,
        node: onnx.NodeProto,
        variables: GraphVariables,
        optimizer: Optimizer,
    ) -> None:
        """
        :param table: The table the generated query should target.
        :param node: The pipeline node to be translated.
        :param variables: The variables used during the translation process.
        :param optimizer: The optimizer used for the translation.
        """
        self._table = table
        self._node = node
        self._variables = variables
        self._optimizer = optimizer
        self._inputs = node.input
        self._outputs = node.output
        # Decode node attributes once so translators can look them up by name.
        self._attributes = {
            attr.name: onnx_utils.get_attr_value(attr) for attr in node.attribute
        }

    @abc.abstractmethod
    def process(self) -> None:
        """Perform the translation and set the output variable(s)."""

    @property
    def operation(self) -> str:
        """The operation being translated."""
        return self._node.op_type

    @property
    def inputs(self) -> list[str]:
        """The input variables for this node."""
        return list(map(str, self._inputs))

    @property
    def outputs(self) -> list[str]:
        """The expected output variables the node should emit."""
        return list(map(str, self._outputs))

    @property
    def mutated_table(self) -> ibis.Table:
        """The table as it is being mutated by the translator.

        Translators need this to be able to set temporary variables
        that are not part of the final output.

        For example when an expression is used many times, the
        translator can create a temporary column in the SQL query to
        avoid recomputing the same expression, which leads to new
        columns being added to the table.
        """
        return self._table

    def set_output(
        self,
        value: ibis.Deferred | ibis.Expr | VariablesGroup | onnx_utils.VariableTypes,
        index: int = 0,
    ) -> None:
        """Set the output variable for the translator.

        Only valid when the translator has a single output; nodes with
        multiple outputs must explicitly set every variable.
        """
        if isinstance(value, (ibis.Expr, VariablesGroup)):
            out_value = value
        else:
            # Plain python values are wrapped into literal expressions.
            out_value = ibis.literal(value)
        self._variables[self.outputs[index]] = out_value

    def preserve(self, *variables) -> list[ibis.Expr]:
        """Preserve the given variables in the table.

        Projects the variables into the table so that future
        expressions can reference the resulting columns instead of
        repeating the expression.
        """
        mutate_args = {}
        for v in variables:
            colname = v.get_name()
            if colname in self._table.columns:
                raise ValueError(
                    f"Preserve variable already exists in the table: {colname}"
                )
            mutate_args[colname] = v

        self._table = self._table.mutate(**mutate_args)

        # TODO: Should probably update self._variables too
        # in case the same variable is used in multiple places
        # but this is not a common case, and it's complex because
        # we don't know the variable name (!= column_name)
        # so we'll leave it for now.
        return [self._table[cname] for cname in mutate_args]

    def variable_unique_short_alias(self, prefix: str | None = None) -> str:
        """Generate a unique short name for a variable.

        Used for temporary variables created during translation; names
        are kept as short as possible to minimize the SQL query length.
        """
        shortname = self._variables.generate_unique_shortname()
        return f"{prefix}_{shortname}" if prefix else shortname

operation property

operation: str

What is the operation being translated

inputs property

inputs: list[str]

The input variables for this node

outputs property

outputs: list[str]

The expected output variables the node should emit

mutated_table property

mutated_table: Table

The table as it is being mutated by the translator.

This is required for the translator to be able to set temporary variables that are not part of the final output.

For example when an expression is used many times, the translator can create a temporary column in the SQL query to avoid recomputing the same expression. That leads to new columns being added to the table.

__init__

__init__(
    table: Table,
    node: NodeProto,
    variables: GraphVariables,
    optimizer: Optimizer,
) -> None

Parameters:

Name Type Description Default
table Table

The table the generated query should target.

required
node NodeProto

The pipeline node to be translated.

required
variables GraphVariables

The variables used during the translation process.

required
optimizer Optimizer

The optimizer used for the translation.

required
Source code in orbitalml/translation/translator.py
def __init__(
    self,
    table: ibis.Table,
    node: onnx.NodeProto,
    variables: GraphVariables,
    optimizer: Optimizer,
) -> None:
    """Bind the translator to a node, its variables and the target table.

    :param table: The table the generated query should target.
    :param node: The pipeline node to be translated.
    :param variables: The variables used during the translation process.
    :param optimizer: The optimizer used for the translation.
    """
    self._table = table
    self._node = node
    self._variables = variables
    self._optimizer = optimizer
    self._inputs = node.input
    self._outputs = node.output
    # Decode node attributes once so translators can look them up by name.
    self._attributes = {
        attr.name: onnx_utils.get_attr_value(attr) for attr in node.attribute
    }

process abstractmethod

process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/translator.py
@abc.abstractmethod
def process(self) -> None:
    """Perform the translation and set the output variable(s)."""

set_output

set_output(
    value: Deferred | Expr | VariablesGroup | VariableTypes,
    index: int = 0,
) -> None

Set the output variable for the translator.

This is only allowed if the translator has a single output. Otherwise the node is expected to explicitly set every variable.

Source code in orbitalml/translation/translator.py
def set_output(
    self,
    value: ibis.Deferred | ibis.Expr | VariablesGroup | onnx_utils.VariableTypes,
    index: int = 0,
) -> None:
    """Set the output variable for the translator.

    Only valid when the translator has a single output; nodes with
    multiple outputs must explicitly set every variable.
    """
    if isinstance(value, (ibis.Expr, VariablesGroup)):
        out_value = value
    else:
        # Plain python values are wrapped into literal expressions.
        out_value = ibis.literal(value)
    self._variables[self.outputs[index]] = out_value

preserve

preserve(*variables) -> list[Expr]

Preserve the given variables in the table.

This causes the variables to be projected in the table, so that future expressions can use them instead of repeating the expression.

Source code in orbitalml/translation/translator.py
def preserve(self, *variables) -> list[ibis.Expr]:
    """Preserve the given variables in the table.

    Projects the variables into the table so that future expressions
    can reference the resulting columns instead of repeating the
    expression.
    """
    mutate_args = {}
    for v in variables:
        colname = v.get_name()
        if colname in self._table.columns:
            raise ValueError(
                f"Preserve variable already exists in the table: {colname}"
            )
        mutate_args[colname] = v

    self._table = self._table.mutate(**mutate_args)

    # TODO: Should probably update self._variables too
    # in case the same variable is used in multiple places
    # but this is not a common case, and it's complex because
    # we don't know the variable name (!= column_name)
    # so we'll leave it for now.
    return [self._table[cname] for cname in mutate_args]

variable_unique_short_alias

variable_unique_short_alias(
    prefix: str | None = None,
) -> str

Generate a unique short name for a variable.

This is generally used to generate names for temporary variables that are used in the translation process.

The names are as short as possible to minimize the SQL query length.

Source code in orbitalml/translation/translator.py
def variable_unique_short_alias(self, prefix: str | None = None) -> str:
    """Generate a unique short name for a variable.

    This is generally used to generate names for temporary variables
    that are used in the translation process.

    The names are as short as possible to minimize the
    SQL query length.
    """
    shortname = self._variables.generate_unique_shortname()
    if prefix:
        shortname = f"{prefix}_{shortname}"
    return shortname

orbitalml.translation.optimizer

Implement optimizations to the Ibis expression tree.

Primarily it takes care of folding constant expressions and removing unnecessary casts.

orbitalml.translation.optimizer.Optimizer

Optimizer for Ibis expressions.

This class is responsible for applying a set of optimization processes to Ibis expressions to remove unnecessary operations and reduce query complexity.

Source code in orbitalml/translation/optimizer.py
class Optimizer:
    """Optimizer for Ibis expressions.

    This class is responsible for applying a set of optimization
    processes to Ibis expressions to remove unnecessary operations and
    reduce query complexity.
    """

    # Binary Ibis operation types mapped to the equivalent Python
    # operator, used to precompute operations between two literals.
    BINARY_OPS: dict[type[Binary], typing.Callable] = {
        # Mathematical Operators
        Add: operator.add,
        Subtract: operator.sub,
        Multiply: operator.mul,
        Divide: operator.truediv,
        FloorDivide: operator.floordiv,
        Modulus: operator.mod,
        # Logical Operators
        Equals: operator.eq,
        NotEquals: operator.ne,
        Greater: operator.gt,
        GreaterEqual: operator.ge,
        Less: operator.lt,
        LessEqual: operator.le,
        IdenticalTo: operator.eq,
        # Binary Operators
        And: operator.and_,
        Or: operator.or_,
        Xor: operator.xor,
    }

    # Unary Ibis operation types mapped to the equivalent Python callable,
    # used to precompute operations on a single literal value.
    UNARY_OPS: dict[type[Unary], typing.Callable] = {
        Negate: operator.neg,
        Abs: operator.abs,
        # Plain ints/floats have no ``.ceil()``/``.floor()`` methods
        # (the previous ``operator.methodcaller`` approach raised
        # AttributeError), so compute them via floor division, which
        # is correct for any real number, including negatives.
        Ceil: lambda x: float(-(-x // 1)),
        Floor: lambda x: float(x // 1),
        Not: operator.not_,
    }

    def __init__(self, enabled: bool = True) -> None:
        """
        :param enabled: Whether to enable the optimizer.
                        When disabled, the optimizer will
                        return the expression unchanged.
        """
        self.ENABLED = enabled

    def _ensure_expr(self, value: ibis.Expr) -> ibis.Expr:
        """Ensure that the value is an Ibis expression.

        Literal objects need to be converted back to an
        Ibis expression to be used in the query.
        """
        if isinstance(value, Literal):
            return ibis.literal(value.value)
        return value

    def _fold_associative_op_contiguous(
        self, lst: list[ibis.expr.types.NumericValue], pyop: typing.Callable
    ) -> list[ibis.expr.types.NumericValue]:
        """Precompute an operation applied on multiple elements.

        Given a list of expressions and a binary operation,
        this function will precompute the operation on all
        contiguous runs of constant expressions in the list,
        returning a new list of expressions with the folded constants.
        """
        if self.ENABLED is False:
            return list(lst)

        result = []
        for is_number, group in itertools.groupby(
            lst, key=lambda x: isinstance(x, NumericScalar)
        ):
            if is_number:
                values = [scalar.execute() for scalar in group]
                # No initializer: groupby never yields an empty group, and
                # a fixed initializer of 0 would be wrong for non-additive
                # operations (e.g. it folded every product to 0).
                folded_value = ibis.literal(functools.reduce(pyop, values))
                result.append(folded_value)
            else:
                result.extend(group)
        return result

    def fold_contiguous_sum(
        self, lst: list[ibis.expr.types.NumericValue]
    ) -> list[ibis.expr.types.NumericValue]:
        """Precompute constants in a list of sums"""
        return self._fold_associative_op_contiguous(lst, operator.add)

    def fold_contiguous_product(
        self, lst: list[ibis.expr.types.NumericValue]
    ) -> list[ibis.expr.types.NumericValue]:
        """Precompute constants in a list of multiplications"""
        return self._fold_associative_op_contiguous(lst, operator.mul)

    def fold_case(self, expr: ibis.Value | ibis.Deferred) -> ibis.Value:
        """Apply different folding strategies to CASE WHEN expressions.

        - If the CASE is a constant, it will evaluate it immediately.
        - If the CASE is a IF ELSE statement returning 1 or 0,
          it will be converted to a boolean expression.
        - When the results and the default are the same, just return
          the default value.
        """
        if not isinstance(expr, ibis.Value):
            raise NotImplementedError("Deferred case expressions are not supported")

        if self.ENABLED is False:
            return expr

        op = expr.op()

        results_are_literals = all(
            isinstance(c, Literal) for c in itertools.chain([op.default], op.results)
        )
        possible_values = (
            set(itertools.chain([op.default.value], [c.value for c in op.results]))
            if results_are_literals
            else set()
        )

        if results_are_literals and len(possible_values) == 1:
            # All results and the default are literals with the same value.
            # It doesn't make any sense to have the case as it will always
            # lead to the same result.
            return self._ensure_expr(possible_values.pop())
        elif len(op.cases) == 1 and isinstance(op.cases[0], Literal):
            # It's only a IF ELSE statement, we can check the case
            # and eventually drop it if it's a constant.
            if op.cases[0].value:
                return op.results[0].to_expr()
            else:
                return op.default.to_expr()
        elif len(op.cases) == 1 and results_are_literals and possible_values == {1, 0}:
            # results are 1 or 0, we can fold it to a boolean expression.
            # FIXME: This doesn't work on postgresql so we need to disable it for the moment.
            #        The early return below intentionally makes the rest of
            #        this branch unreachable until the issue is solved.
            return expr
            if op.results[0].value == 1:
                return (op.cases[0].to_expr()).cast("float64")
            else:
                return (~(op.cases[0].to_expr())).cast("float64")

        return expr

    def fold_cast(self, expr: ibis.Value) -> ibis.Value:
        """Given a cast expression, precompute it if possible."""
        if self.ENABLED is False:
            return expr

        op_instance = expr.op()
        if not isinstance(op_instance, ibis.expr.operations.Cast):
            # Not a cast, ignore
            # This can happen when a Field (a column) is casted to a type
            # and the Column is already of the same type.
            # Ibis seems to optimize this case and remove the cast.
            return expr

        target_type = op_instance.to
        arg = op_instance.arg

        if isinstance(arg, Literal):
            # Casting a constant: apply the conversion in Python and
            # emit a new literal of the target type.
            value = arg.value
            if target_type == dt.int64:
                return ibis.literal(int(value))
            elif target_type == dt.float64:
                return ibis.literal(float(value))
            elif target_type == dt.string:
                return ibis.literal(str(value))
            elif target_type == dt.boolean:
                return ibis.literal(bool(value))
            else:
                raise NotImplementedError(
                    f"Literal Cast to {target_type} not supported"
                )
        elif arg.dtype() == target_type:
            # The expression is already of the target type
            # No need to cast it again.
            # NOTE(review): on recent ibis versions ``dtype`` is a property,
            # not a method — confirm this call against the pinned version.
            return expr

        return expr

    def fold_zeros(self, expr: ibis.Expr) -> ibis.Expr:
        """Given a binary expression, precompute the result if it contains zeros.

        Operations like x + 0, x * 0, x - 0 etc can be folded in just x or 0
        without the need to compute the operation.
        """
        if self.ENABLED is False:
            return expr

        op = expr.op()
        inputs = op.args
        op_class = type(op)

        if op_class == Multiply:
            left_val = inputs[0].value if isinstance(inputs[0], Literal) else None
            right_val = inputs[1].value if isinstance(inputs[1], Literal) else None
            if left_val == 0 or right_val == 0:
                # x * 0 == 0 * x == 0
                return ibis.literal(0)
        elif op_class == Add:
            left_val = inputs[0].value if isinstance(inputs[0], Literal) else None
            right_val = inputs[1].value if isinstance(inputs[1], Literal) else None
            if left_val == 0:
                return inputs[1].to_expr()
            elif right_val == 0:
                return inputs[0].to_expr()
        elif op_class == Subtract:
            left_val = inputs[0].value if isinstance(inputs[0], Literal) else None
            right_val = inputs[1].value if isinstance(inputs[1], Literal) else None
            if left_val == 0:
                # 0 - x folds to the negation of x, not to x itself
                # (the previous code returned x, flipping the sign).
                return -(inputs[1].to_expr())
            elif right_val == 0:
                return inputs[0].to_expr()

        return expr

    def fold_operation(self, expr: ibis.Expr) -> ibis.Expr:
        """Given a node (an Ibis expression) fold constant expressions.

        If all node immediate children are constant (i.e. NumericScalar),
        compute the operation in Python and return a literal with the result.

        Otherwise, simply return the expression unchanged.

        This function assumes that constant folding has already been applied
        to the children.
        """
        if self.ENABLED is False:
            return expr

        if isinstance(expr, (int, float, str, bool)):
            # In some cases the operation has been computed in python.
            # For example when we try to compute * between a ONNX literal
            # and a previously folded expression.
            # In those case return a literal so we guarantee we always
            # return an Ibis expression
            return ibis.literal(expr)

        op = expr.op()
        inputs = op.args

        if not all(isinstance(child, Literal) for child in inputs):
            # We can only fold operations where all children are literals.
            # At least we can remove zeros if they exist.
            return self.fold_zeros(expr)

        op_class = type(op)
        if op_class in self.BINARY_OPS:
            left_val = inputs[0].value
            right_val = inputs[1].value
            result = self.BINARY_OPS[typing.cast(type[Binary], op_class)](
                left_val, right_val
            )
            return self._ensure_expr(result)
        elif op_class in self.UNARY_OPS and len(inputs) == 1:
            result = self.UNARY_OPS[typing.cast(type[Unary], op_class)](inputs[0].value)
            return self._ensure_expr(result)
        else:
            # No possible folding
            return expr

    def _debug(self, expr: ibis.Expr, show_args: bool = True) -> str:
        """Given an expression, return a string representation for debugging."""
        if isinstance(expr, Literal):
            return repr(expr.value)
        elif show_args is False:
            return type(expr).__name__
        elif not hasattr(expr, "args"):
            return f"{type(expr).__name__}(<unknown>)"
        else:
            return f"{type(expr).__name__}({', '.join([self._debug(a, show_args=False) for a in expr.args])})"

__init__

__init__(enabled: bool = True) -> None

Parameters:

Name Type Description Default
enabled bool

Whether to enable the optimizer. When disabled, the optimizer will return the expression unchanged.

True
Source code in orbitalml/translation/optimizer.py
def __init__(self, enabled: bool = True) -> None:
    """
    :param enabled: Whether to enable the optimizer.
                    When disabled, the optimizer will
                    return the expression unchanged.
    """
    # Stored uppercase; the fold_* methods check this flag to decide
    # whether to short-circuit and return their input unchanged.
    self.ENABLED = enabled

fold_contiguous_sum

fold_contiguous_sum(
    lst: list[NumericValue],
) -> list[NumericValue]

Precompute constants in a list of sums

Source code in orbitalml/translation/optimizer.py
def fold_contiguous_sum(
    self, lst: list[ibis.expr.types.NumericValue]
) -> list[ibis.expr.types.NumericValue]:
    """Precompute constants in a list of sums"""
    # Delegate to the generic associative folding helper with addition.
    return self._fold_associative_op_contiguous(lst, operator.add)

fold_contiguous_product

fold_contiguous_product(
    lst: list[NumericValue],
) -> list[NumericValue]

Precompute constants in a list of multiplications

Source code in orbitalml/translation/optimizer.py
def fold_contiguous_product(
    self, lst: list[ibis.expr.types.NumericValue]
) -> list[ibis.expr.types.NumericValue]:
    """Precompute constants in a list of multiplications"""
    # Delegate to the generic associative folding helper with multiplication.
    return self._fold_associative_op_contiguous(lst, operator.mul)

fold_case

fold_case(expr: Value | Deferred) -> Value

Apply different folding strategies to CASE WHEN expressions.

  • If the CASE is a constant, it will be evaluated immediately.
  • If the CASE is an IF ELSE statement returning 1 or 0, it will be converted to a boolean expression.
  • When the results and the default are the same, just return the default value.
Source code in orbitalml/translation/optimizer.py
def fold_case(self, expr: ibis.Value | ibis.Deferred) -> ibis.Value:
    """Apply different folding strategies to CASE WHEN expressions.

    - If the CASE is a constant, it will evaluate it immediately.
    - If the CASE is a IF ELSE statement returning 1 or 0,
      it will be converted to a boolean expression.
    - When the results and the default are the same, just return
      the default value.
    """
    if not isinstance(expr, ibis.Value):
        raise NotImplementedError("Deferred case expressions are not supported")

    if self.ENABLED is False:
        return expr

    op = expr.op()

    # A fold on the results is only possible when every branch result
    # (including the default) is a plain literal value.
    results_are_literals = all(
        isinstance(c, Literal) for c in itertools.chain([op.default], op.results)
    )
    possible_values = (
        set(itertools.chain([op.default.value], [c.value for c in op.results]))
        if results_are_literals
        else set()
    )

    if results_are_literals and len(possible_values) == 1:
        # All results and the default are literals with the same value.
        # It doesn't make any sense to have the case as it will always
        # lead to the same result.
        return self._ensure_expr(possible_values.pop())
    elif len(op.cases) == 1 and isinstance(op.cases[0], Literal):
        # It's only a IF ELSE statement, we can check the case
        # and eventually drop it if it's a constant.
        if op.cases[0].value:
            return op.results[0].to_expr()
        else:
            return op.default.to_expr()
    elif len(op.cases) == 1 and results_are_literals and possible_values == {1, 0}:
        # results are 1 or 0, we can fold it to a boolean expression.
        # FIXME: This doesn't work on postgresql so we need to disable it for the moment.
        # The early return below intentionally makes the remainder of
        # this branch unreachable until the issue is resolved.
        return expr
        if op.results[0].value == 1:
            return (op.cases[0].to_expr()).cast("float64")
        else:
            return (~(op.cases[0].to_expr())).cast("float64")

    return expr

fold_cast

fold_cast(expr: Value) -> Value

Given a cast expression, precompute it if possible.

Source code in orbitalml/translation/optimizer.py
def fold_cast(self, expr: ibis.Value) -> ibis.Value:
    """Given a cast expression, precompute it if possible."""
    if self.ENABLED is False:
        return expr

    op_instance = expr.op()
    if not isinstance(op_instance, ibis.expr.operations.Cast):
        # Not a cast, ignore
        # This can happen when a Field (a column) is casted to a type
        # and the Column is already of the same type.
        # Ibis seems to optimize this case and remove the cast.
        return expr

    target_type = op_instance.to
    arg = op_instance.arg

    if isinstance(arg, Literal):
        # Casting a constant: apply the conversion in Python and
        # emit a new literal of the target type.
        value = arg.value
        if target_type == dt.int64:
            return ibis.literal(int(value))
        elif target_type == dt.float64:
            return ibis.literal(float(value))
        elif target_type == dt.string:
            return ibis.literal(str(value))
        elif target_type == dt.boolean:
            return ibis.literal(bool(value))
        else:
            raise NotImplementedError(
                f"Literal Cast to {target_type} not supported"
            )
    elif arg.dtype() == target_type:
        # The expression is already of the target type
        # No need to cast it again.
        # NOTE(review): on recent ibis versions ``dtype`` is a property,
        # not a method — confirm this call against the pinned version.
        return expr

    return expr

fold_zeros

fold_zeros(expr: Expr) -> Expr

Given a binary expression, precompute the result if it contains zeros.

Operations like x + 0, x * 0, x - 0 etc can be folded in just x or 0 without the need to compute the operation.

Source code in orbitalml/translation/optimizer.py
def fold_zeros(self, expr: ibis.Expr) -> ibis.Expr:
    """Given a binary expression, precompute the result if it contains zeros.

    Operations like x + 0, x * 0, x - 0 etc can be folded in just x or 0
    without the need to compute the operation.
    """
    if self.ENABLED is False:
        return expr

    op = expr.op()
    inputs = op.args
    op_class = type(op)

    if op_class == Multiply:
        left_val = inputs[0].value if isinstance(inputs[0], Literal) else None
        right_val = inputs[1].value if isinstance(inputs[1], Literal) else None
        if left_val == 0 or right_val == 0:
            # x * 0 == 0 * x == 0
            return ibis.literal(0)
    elif op_class == Add:
        left_val = inputs[0].value if isinstance(inputs[0], Literal) else None
        right_val = inputs[1].value if isinstance(inputs[1], Literal) else None
        if left_val == 0:
            return inputs[1].to_expr()
        elif right_val == 0:
            return inputs[0].to_expr()
    elif op_class == Subtract:
        left_val = inputs[0].value if isinstance(inputs[0], Literal) else None
        right_val = inputs[1].value if isinstance(inputs[1], Literal) else None
        if left_val == 0:
            # 0 - x folds to the negation of x, not to x itself
            # (the previous code returned x, flipping the sign).
            return -(inputs[1].to_expr())
        elif right_val == 0:
            return inputs[0].to_expr()

    return expr

fold_operation

fold_operation(expr: Expr) -> Expr

Given a node (an Ibis expression) fold constant expressions.

If all node immediate children are constant (i.e. NumericScalar), compute the operation in Python and return a literal with the result.

Otherwise, simply return the expression unchanged.

This function assumes that constant folding has already been applied to the children.

Source code in orbitalml/translation/optimizer.py
def fold_operation(self, expr: ibis.Expr) -> ibis.Expr:
    """Given a node (an Ibis expression) fold constant expressions.

    If all node immediate children are constant (i.e. NumericScalar),
    compute the operation in Python and return a literal with the result.

    Otherwise, simply return the expression unchanged.

    This function assumes that constant folding has already been applied
    to the children.
    """
    if self.ENABLED is False:
        return expr

    if isinstance(expr, (int, float, str, bool)):
        # In some cases the operation has been computed in python.
        # For example when we try to compute * between a ONNX literal
        # and a previously folded expression.
        # In those case return a literal so we guarantee we always
        # return an Ibis expression
        return ibis.literal(expr)

    op = expr.op()
    inputs = op.args

    if not all(isinstance(child, Literal) for child in inputs):
        # We can only fold operations where all children are literals.
        # At least we can remove zeros if they exist.
        return self.fold_zeros(expr)

    # All children are literals: dispatch on the operation type and
    # compute the result directly in Python.
    op_class = type(op)
    if op_class in self.BINARY_OPS:
        # Binary operations always have exactly two arguments.
        left_val = inputs[0].value
        right_val = inputs[1].value
        result = self.BINARY_OPS[typing.cast(type[Binary], op_class)](
            left_val, right_val
        )
        return self._ensure_expr(result)
    elif op_class in self.UNARY_OPS and len(inputs) == 1:
        result = self.UNARY_OPS[typing.cast(type[Unary], op_class)](inputs[0].value)
        return self._ensure_expr(result)
    else:
        # No possible folding
        return expr

orbitalml.translation.steps

Translators for each ParsedPipeline step

orbitalml.translation.steps.add

Translate an Add operation to the equivalent query expression.

AddTranslator

Bases: Translator

Processes an Add node and updates the variables with the output expression.

Given the node to translate, the variables and constants available for the translation context, generates a query expression that processes the input variables and produces a new output variable that computes based on the Add operation.

Source code in orbitalml/translation/steps/add.py
class AddTranslator(Translator):
    """Processes an Add node and updates the variables with the output expression.

    Given the node to translate, the variables and constants available for
    the translation context, generates a query expression that processes
    the input variables and produces a new output variable that computes
    based on the Add operation.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Add.html

        first_operand = self._variables.consume(self._inputs[0])
        second_operand = self._variables.get_initializer_value(self._inputs[1])
        if second_operand is None or not isinstance(second_operand, (list, tuple)):
            # The message used to say "(divisor)", copy-pasted from the
            # Div translator; for Add the constant is an addend.
            raise NotImplementedError(
                "Add: Second input (addend) must be a constant list."
            )

        # Validate that the input is numeric; when it's a group of
        # columns, peek at one member as a representative.
        type_check_var = first_operand
        if isinstance(type_check_var, VariablesGroup):
            type_check_var = next(iter(type_check_var.values()), None)
        if not isinstance(type_check_var, ibis.expr.types.NumericValue):
            raise ValueError("Add: The first operand must be a numeric value.")

        add_values = list(second_operand)
        if isinstance(first_operand, VariablesGroup):
            first_operand = NumericVariablesGroup(first_operand)
            struct_fields = list(first_operand.keys())
            if len(add_values) != len(struct_fields):
                # TODO: Implement adding a single value to every column,
                #       see the Div implementation for reference.
                raise ValueError(
                    "When the first operand is a group of columns, the second operand must contain the same number of values"
                )
            # Add each constant to its positionally-matching column.
            self.set_output(
                ValueVariablesGroup(
                    {
                        field: (
                            self._optimizer.fold_operation(
                                first_operand[field] + add_values[i]
                            )
                        )
                        for i, field in enumerate(struct_fields)
                    }
                )
            )
        else:
            if len(add_values) != 1:
                raise ValueError(
                    "When the first operand is a single column, the second operand must contain exactly 1 value"
                )
            first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
            self.set_output(
                self._optimizer.fold_operation(first_operand + add_values[0])
            )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/add.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Add.html

    first_operand = self._variables.consume(self._inputs[0])
    second_operand = self._variables.get_initializer_value(self._inputs[1])
    if second_operand is None or not isinstance(second_operand, (list, tuple)):
        # The message used to say "(divisor)", copy-pasted from the
        # Div translator; for Add the constant is an addend.
        raise NotImplementedError(
            "Add: Second input (addend) must be a constant list."
        )

    # Validate that the input is numeric; when it's a group of
    # columns, peek at one member as a representative.
    type_check_var = first_operand
    if isinstance(type_check_var, VariablesGroup):
        type_check_var = next(iter(type_check_var.values()), None)
    if not isinstance(type_check_var, ibis.expr.types.NumericValue):
        raise ValueError("Add: The first operand must be a numeric value.")

    add_values = list(second_operand)
    if isinstance(first_operand, VariablesGroup):
        first_operand = NumericVariablesGroup(first_operand)
        struct_fields = list(first_operand.keys())
        if len(add_values) != len(struct_fields):
            # TODO: Implement adding a single value to every column,
            #       see the Div implementation for reference.
            raise ValueError(
                "When the first operand is a group of columns, the second operand must contain the same number of values"
            )
        # Add each constant to its positionally-matching column.
        self.set_output(
            ValueVariablesGroup(
                {
                    field: (
                        self._optimizer.fold_operation(
                            first_operand[field] + add_values[i]
                        )
                    )
                    for i, field in enumerate(struct_fields)
                }
            )
        )
    else:
        if len(add_values) != 1:
            raise ValueError(
                "When the first operand is a single column, the second operand must contain exactly 1 value"
            )
        first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
        self.set_output(
            self._optimizer.fold_operation(first_operand + add_values[0])
        )

orbitalml.translation.steps.argmax

Defines the translation step for the ArgMax operation.

ArgMaxTranslator

Bases: Translator

Processes an ArgMax node and updates the variables with the output expression.

Given the node to translate, the variables and constants available for the translation context, generates a query expression that processes the input variables and produces a new output variable that computes based on the ArgMax operation.

The ArgMax implementation is currently limited to emitting a variable that represents the index of the column with the maximum value in a group of columns. It is not possible to compute the max of a set of rows thus axis must be 1 and keepdims must be 1.

As it computes the maximum out of a set of columns, argmax expects a columns group as its input.

The limitation is due to the fact that we can't mix variables with a different amount of rows and MAX(col) would end up producing a single row. This is usually ok, as ArgMax is primarily used to pick the values with the maximum value out of the features analyzed by the model, and thus it is only required to produce a new value for each entry on which to perform a prediction/classification (row).

Source code in orbitalml/translation/steps/argmax.py
class ArgMaxTranslator(Translator):
    """Processes an ArgMax node and updates the variables with the output expression.

    Given the node to translate, the variables and constants available for
    the translation context, generates a query expression that processes
    the input variables and produces a new output variable that computes
    based on the ArgMax operation.

    The ArgMax implementation is currently limited to emitting a variable
    that represents the index of the column with the maximum value in a group
    of columns. It is not possible to compute the max of a set of rows
    thus axis must be 1 and keepdims must be 1.

    As it computes the maximum out of a set of columns, argmax expects
    a columns group as its input.

    The limitation is due to the fact that we can't mix variables with
    a different amount of rows and MAX(col) would end up producing a single row.
    This is usually ok, as ArgMax is primarily used to pick the values
    with the maximum value out of the features analyzed by the model,
    and thus it is only required to produce a new value for each entry
    on which to perform a prediction/classification (row).
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__ArgMax.html
        data = self._variables.consume(self.inputs[0])
        axis = self._attributes.get("axis", 1)
        keepdims = self._attributes.get("keepdims", 1)
        select_last_index = self._attributes.get("select_last_index", 0)

        if not isinstance(data, dict):
            # if it's a single column, we can't really do much with it
            # as there aren't other columns to compare with.
            raise NotImplementedError(
                "ArgMaxTranslator can only be applied to a group of columns"
            )

        if axis != 1:
            # For axis=0 we would want to return the index of the row
            # with the maximum value, but we don't have a row identifier
            raise NotImplementedError("ArgMaxTranslator only supports axis=1")
        if keepdims != 1:
            raise NotImplementedError(
                "ArgMaxTranslator only supports retaining original dimensions"
            )

        keys = list(data.keys())

        if len(keys) == 1:
            # With a single column there is nothing to compare against:
            # the argmax is always the first (and only) column. Without
            # this guard the loop below would build a CASE with a None
            # condition, as the inner comparison loop never runs.
            self.set_output(ibis.literal(0))
            return

        # Generate a CASE THEN ELSE expression to find
        # which out of all the columns has the maximum value.
        case_expr = ibis.case()
        for idx, key in enumerate(keys):
            cond = None
            # Compare the current column with all other columns
            for j, other in enumerate(keys):
                if j == idx:
                    # Do not compare to yourself.
                    continue
                # When select_last_index is True
                # We use '>', otherwise '>=' so that we can pick the first occurrence.
                cmp_expr = (
                    data[key] > data[other]
                    if select_last_index
                    else data[key] >= data[other]
                )
                cond = cmp_expr if cond is None else cond & cmp_expr
            case_expr = case_expr.when(cond, idx)
        argmax_expr = case_expr.else_(0).end()

        self.set_output(argmax_expr)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/argmax.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__ArgMax.html
    data = self._variables.consume(self.inputs[0])
    axis = self._attributes.get("axis", 1)
    keepdims = self._attributes.get("keepdims", 1)
    select_last_index = self._attributes.get("select_last_index", 0)

    if not isinstance(data, dict):
        # if it's a single column, we can't really do much with it
        # as there aren't other columns to compare with.
        raise NotImplementedError(
            "ArgMaxTranslator can only be applied to a group of columns"
        )

    if axis != 1:
        # For axis=0 we would want to return the index of the row
        # with the maximum value, but we don't have a row identifier
        raise NotImplementedError("ArgMaxTranslator only supports axis=1")
    if keepdims != 1:
        raise NotImplementedError(
            "ArgMaxTranslator only supports retaining original dimensions"
        )

    keys = list(data.keys())

    # NOTE(review): if `data` contains a single column, the inner loop
    # below never assigns `cond`, so `case_expr.when(None, idx)` is built
    # with a None condition — verify this case cannot reach here, or guard
    # for it.
    # Generate a CASE THEN ELSE expression to find
    # which out of all the columns has the maximum value.
    case_expr = ibis.case()
    for idx, key in enumerate(keys):
        cond = None
        # Compare the current column with all other columns
        for j, other in enumerate(keys):
            if j == idx:
                # Do not compare to yourself.
                continue
            # When select_last_index is True
            # We use '>', otherwise '>=' so that we can pick the first occurrence.
            cmp_expr = (
                data[key] > data[other]
                if select_last_index
                else data[key] >= data[other]
            )
            cond = cmp_expr if cond is None else cond & cmp_expr
        case_expr = case_expr.when(cond, idx)
    argmax_expr = case_expr.else_(0).end()

    self.set_output(argmax_expr)

orbitalml.translation.steps.arrayfeatureextractor

ArrayFeatureExtractorTranslator

Bases: Translator

Processes an ArrayFeatureExtractor node and updates the variables with the output expression.

ArrayFeatureExtractor can be considered the opposite of :class:ConcatTranslator, as in most cases it will be used to pick one or more features out of a group of column previously concatenated, or to pick a specific feature out of the result of an ArgMax operation.

The provided indices always refer to the last axis of the input tensor. If the input is a 2D tensor, the last axis is the column axis. So an index of 0 would mean the first column. If the input is a 1D tensor instead the last axis is the row axis. So an index of 0 would mean the first row.

This could be confusing because axis are inverted between tensors and orbitalml column groups. In the case of Tensors, index=0 means row=0, while instead in orbitalml column groups (by virtue of being a group of columns), index=0 means the first column.

We have to consider that the indices we receive, in case of column groups, are actually column indices, not row indices as in case of a tensor, the last index would be the column index. In case of single columns, instead the index is the index of a row like it would be with a 1D tensor.

Source code in orbitalml/translation/steps/arrayfeatureextractor.py
class ArrayFeatureExtractorTranslator(Translator):
    """Processes an ArrayFeatureExtractor node and updates the variables with the output expression.

    ArrayFeatureExtractor can be considered the opposite of :class:`ConcatTranslator`, as
    in most cases it will be used to pick one or more features out of a group of columns
    previously concatenated, or to pick a specific feature out of the result of an ArgMax operation.

    The provided indices always refer to the **last** axis of the input tensor.
    If the input is a 2D tensor, the last axis is the column axis. So an index
    of ``0`` would mean the first column. If the input is a 1D tensor instead the
    last axis is the row axis. So an index of ``0`` would mean the first row.

    This could be confusing because axes are inverted between tensors and orbitalml column groups.
    In the case of Tensors, index=0 means row=0, while instead in orbitalml
    column groups (by virtue of being a group of columns), index=0 means
    the first column.

    We have to consider that the indices we receive, in case of column groups,
    are actually column indices, not row indices as in case of a tensor,
    the last index would be the column index. In case of single columns,
    instead the index is the index of a row like it would be with a 1D tensor.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_ArrayFeatureExtractor.html

        data = self._variables.consume(self.inputs[0])
        indices = self._variables.consume(self.inputs[1])

        if isinstance(data, VariablesGroup):
            # We are selecting a set of columns out of a column group

            # This expects that dictionaries are sorted by insertion order
            # AND that all values of the dictionary are columns.
            data_keys: list[str] = list(data.keys())
            data_values: list[ibis.Expr] = list(data.values())

            if not isinstance(indices, (list, tuple)):
                raise ValueError(
                    "ArrayFeatureExtractor expects a list of indices as input."
                )

            indices = typing.cast(list[int], indices)
            if len(indices) > len(data_keys):
                raise ValueError(
                    "Indices requested are more than the available number of columns."
                )

            # Pick only the columns that are in the list of indices.
            result = ValueVariablesGroup(
                {data_keys[i]: data_values[i] for i in indices}
            )
        elif isinstance(data, (tuple, list)):
            # We are selecting values out of a list of values
            # This is usually used to select "classes" out of a list of
            # possible values based on the variables that represents those classes.
            if not isinstance(indices, ibis.expr.types.Column):
                raise ValueError(
                    "ArrayFeatureExtractor expects a column as indices when picking from a group of values."
                )

            case_expr = ibis.case()
            for i, col in enumerate(data):
                case_expr = case_expr.when(indices == i, col)
            result = case_expr.else_(ibis.null()).end()
        else:
            raise NotImplementedError(
                "ArrayFeatureExtractor only supports column groups or lists of constants as input."
            )

        self.set_output(result)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/arrayfeatureextractor.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_ArrayFeatureExtractor.html

    data = self._variables.consume(self.inputs[0])
    indices = self._variables.consume(self.inputs[1])

    if isinstance(data, VariablesGroup):
        # We are selecting a set of columns out of a column group

        # This expects that dictionaries are sorted by insertion order
        # AND that all values of the dictionary are columns.
        data_keys: list[str] = list(data.keys())
        data_values: list[ibis.Expr] = list(data.values())

        if not isinstance(indices, (list, tuple)):
            raise ValueError(
                "ArrayFeatureExtractor expects a list of indices as input."
            )

        indices = typing.cast(list[int], indices)
        if len(indices) > len(data_keys):
            raise ValueError(
                "Indices requested are more than the available number of columns."
            )

        # Pick only the columns that are in the list of indices.
        result = ValueVariablesGroup(
            {data_keys[i]: data_values[i] for i in indices}
        )
    elif isinstance(data, (tuple, list)):
        # We are selecting values out of a list of values
        # This is usually used to select "classes" out of a list of
        # possible values based on the variables that represents those classes.
        if not isinstance(indices, ibis.expr.types.Column):
            raise ValueError(
                "ArrayFeatureExtractor expects a column as indices when picking from a group of values."
            )

        case_expr = ibis.case()
        for i, col in enumerate(data):
            case_expr = case_expr.when(indices == i, col)
        result = case_expr.else_(ibis.null()).end()
    else:
        raise NotImplementedError(
            "ArrayFeatureExtractor only supports column groups or lists of constants as input."
        )

    self.set_output(result)

orbitalml.translation.steps.cast

Translators for Cast and CastLike operations

CastTranslator

Bases: Translator

Processes a Cast node and updates the variables with the output expression.

Cast operation is used to convert a variable from one type to another one provided by the attribute to.

Source code in orbitalml/translation/steps/cast.py
class CastTranslator(Translator):
    """Processes a Cast node and updates the variables with the output expression.

    Cast converts a variable (or every column of a column group) to the
    target type identified by the node's ``to`` attribute.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Cast.html
        expr = self._variables.consume(self.inputs[0])
        to_type: int = typing.cast(int, self._attributes["to"])
        if to_type not in ONNX_TYPES_TO_IBIS:
            raise NotImplementedError(f"Cast: type {to_type} not supported")

        target_type = ONNX_TYPES_TO_IBIS[to_type]
        if isinstance(expr, VariablesGroup):
            # Cast each column of the group individually.
            converted = {}
            for name in expr:
                converted[name] = self._optimizer.fold_cast(
                    expr.as_value(name).cast(target_type)
                )
            self.set_output(ValueVariablesGroup(converted))
        elif isinstance(expr, ibis.Value):
            folded = self._optimizer.fold_cast(expr.cast(target_type))
            self.set_output(folded)
        else:
            raise ValueError(
                f"Cast: expected a column group or a single column. Got {type(expr)}"
            )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/cast.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Cast.html
    expr = self._variables.consume(self.inputs[0])
    to_type: int = typing.cast(int, self._attributes["to"])
    if to_type not in ONNX_TYPES_TO_IBIS:
        raise NotImplementedError(f"Cast: type {to_type} not supported")

    target_type = ONNX_TYPES_TO_IBIS[to_type]
    if isinstance(expr, VariablesGroup):
        # Cast each column of the group individually.
        converted = {}
        for name in expr:
            converted[name] = self._optimizer.fold_cast(
                expr.as_value(name).cast(target_type)
            )
        self.set_output(ValueVariablesGroup(converted))
    elif isinstance(expr, ibis.Value):
        folded = self._optimizer.fold_cast(expr.cast(target_type))
        self.set_output(folded)
    else:
        raise ValueError(
            f"Cast: expected a column group or a single column. Got {type(expr)}"
        )

CastLikeTranslator

Bases: Translator

Processes a CastLike node and updates the variables with the output expression.

CastLike operation is used to convert a variable from one type to the same type as another variable, thus making the two types uniform.

Source code in orbitalml/translation/steps/cast.py
class CastLikeTranslator(Translator):
    """Processes a CastLike node and updates the variables with the output expression.

    CastLike converts a variable to the same type as a second, reference
    variable, so the two end up with a uniform type.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__CastLike.html

        # Minimal implementation: the typical usage is casting a group of
        # concatenated features to the type of another feature column.
        expr = self._variables.consume(self.inputs[0])
        like_expr = self._variables.consume(self.inputs[1])

        # The first input must be a column group (multiple concatenated columns).
        if not isinstance(expr, VariablesGroup):
            # TODO: Support single variables as well.
            #       This should be fairly straightforward to implement,
            #       but there hasn't been the need for it yet.
            raise NotImplementedError(
                "CastLike currently only supports casting a group of columns."
            )

        # The second input must be a single column, not a group.
        if isinstance(like_expr, VariablesGroup):
            raise NotImplementedError(
                "CastLike currently only supports casting to a single column type, not a group."
            )

        if not isinstance(like_expr, ibis.Value):
            raise ValueError(
                f"CastLike: expected a single column. Got {type(like_expr)}"
            )

        # The type of the reference column becomes the target type.
        target_type: ibis.DataType = like_expr.type()

        # Cast every column of the group to the reference type.
        converted = {}
        for name in expr:
            converted[name] = self._optimizer.fold_cast(
                expr.as_value(name).cast(target_type)
            )
        self.set_output(ValueVariablesGroup(converted))
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/cast.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__CastLike.html

    # Minimal implementation: the typical usage is casting a group of
    # concatenated features to the type of another feature column.
    expr = self._variables.consume(self.inputs[0])
    like_expr = self._variables.consume(self.inputs[1])

    # The first input must be a column group (multiple concatenated columns).
    if not isinstance(expr, VariablesGroup):
        # TODO: Support single variables as well.
        #       This should be fairly straightforward to implement,
        #       but there hasn't been the need for it yet.
        raise NotImplementedError(
            "CastLike currently only supports casting a group of columns."
        )

    # The second input must be a single column, not a group.
    if isinstance(like_expr, VariablesGroup):
        raise NotImplementedError(
            "CastLike currently only supports casting to a single column type, not a group."
        )

    if not isinstance(like_expr, ibis.Value):
        raise ValueError(
            f"CastLike: expected a single column. Got {type(like_expr)}"
        )

    # The type of the reference column becomes the target type.
    target_type: ibis.DataType = like_expr.type()

    # Cast every column of the group to the reference type.
    converted = {}
    for name in expr:
        converted[name] = self._optimizer.fold_cast(
            expr.as_value(name).cast(target_type)
        )
    self.set_output(ValueVariablesGroup(converted))

orbitalml.translation.steps.concat

Translator for Concat and FeatureVectorizer operations.

ConcatTranslator

Bases: Translator

Concatenate multiple columns into a single group of columns.

In tensor terms, this is meant to create a new tensor by concatenating the inputs along a given axis. In most cases, this is used to concatenate multiple features into a single one, thus its purpose is usually to create a column group from separate columns.

This means that the most common use case is axis=1, which means concatenating over the columns (by virtue of column/rows in tensors being flipped over column groups), and thus only axis=1 case is supported.

Source code in orbitalml/translation/steps/concat.py
class ConcatTranslator(Translator):
    """Concatenate multiple columns into a single group of columns.

    In tensor terms this creates a new tensor by joining the inputs along a
    given axis. For column groups the only meaningful case is joining
    features side by side, so only axis=1 (or -1, the last axis, which is
    equivalent for 2D data) is accepted; row-wise concatenation is not
    supported.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Concat.html

        # Only column-wise concatenation is implemented;
        # rows cannot be concatenated.
        axis = self._attributes["axis"]
        if axis not in (1, -1):
            # -1 means last axis, which for 2D entities equals axis=1.
            raise NotImplementedError(
                "Concat currently only supports concatenating over columns (axis=1 or -1)."
            )
        self.set_output(self._concatenate_columns(self))

    @classmethod
    def _concatenate_columns(cls, translator: Translator) -> VariablesGroup:
        """Implement actual operation of concatenating columns.

        Shared by the Concat and FeatureVectorizer translators, which both
        need to merge columns into a single group.
        """
        merged = ValueVariablesGroup()

        for input_name in translator.inputs:
            feature = translator._variables.consume(input_name)
            if isinstance(feature, dict):
                # A dict means the input was itself the result of a previous
                # concatenation (a tensor in ONNX terms): merge all of its
                # values, namespacing each key under the input's name.
                for subkey, value in feature.items():
                    merged[input_name + "." + subkey] = value
            elif isinstance(feature, ibis.Expr):
                merged[input_name] = feature
            else:
                raise ValueError(
                    f"Concat: expected a column group or a single column. Got {type(feature)}"
                )

        return merged
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/concat.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Concat.html

    # Only column-wise concatenation is implemented;
    # rows cannot be concatenated.
    axis = self._attributes["axis"]
    if axis not in (1, -1):
        # -1 means last axis, which for 2D entities equals axis=1.
        raise NotImplementedError(
            "Concat currently only supports concatenating over columns (axis=1 or -1)."
        )
    self.set_output(self._concatenate_columns(self))

FeatureVectorizerTranslator

Bases: Translator

Concatenate multiple columns into a single group of columns.

This is similar to Concat, but it is a simplified version that always only acts on columns, and does not support concatenating over rows. Concat can in theory support row concatenation, but orbitalml doesn't implement it.

Source code in orbitalml/translation/steps/concat.py
class FeatureVectorizerTranslator(Translator):
    """Concatenate multiple columns into a single group of columns.

    A simplified variant of Concat that always operates on columns and
    never on rows. (Concat could in principle support row concatenation,
    although orbitalml doesn't implement it.)
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_FeatureVectorizer.html

        # Behaves like Concat for our purposes, so we reuse its
        # implementation after validating the declared dimensions.
        ninputdimensions = typing.cast(list[int], self._attributes["inputdimensions"])

        if len(self._inputs) != len(ninputdimensions):
            raise ValueError(
                "Number of input dimensions should be equal to number of inputs."
            )

        # inputdimensions declares how many columns each input contributes;
        # make sure it matches what the variables actually contain.
        for colname, dimensions in zip(self.inputs, ninputdimensions):
            feature = self._variables.peek_variable(colname)
            if isinstance(feature, dict):
                if len(feature) != dimensions:
                    raise ValueError(
                        f"Number of columns in input {colname} should be equal to the number of dimensions, got {len(feature)} != {dimensions}"
                    )
            elif dimensions != 1:
                raise ValueError(
                    f"When merging over individual columns, the dimension should be 1, got {dimensions} for {colname}"
                )

        self.set_output(ConcatTranslator._concatenate_columns(self))
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/concat.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_FeatureVectorizer.html

    # Behaves like Concat for our purposes, so we reuse its
    # implementation after validating the declared dimensions.
    ninputdimensions = typing.cast(list[int], self._attributes["inputdimensions"])

    if len(self._inputs) != len(ninputdimensions):
        raise ValueError(
            "Number of input dimensions should be equal to number of inputs."
        )

    # inputdimensions declares how many columns each input contributes;
    # make sure it matches what the variables actually contain.
    for colname, dimensions in zip(self.inputs, ninputdimensions):
        feature = self._variables.peek_variable(colname)
        if isinstance(feature, dict):
            if len(feature) != dimensions:
                raise ValueError(
                    f"Number of columns in input {colname} should be equal to the number of dimensions, got {len(feature)} != {dimensions}"
                )
        elif dimensions != 1:
            raise ValueError(
                f"When merging over individual columns, the dimension should be 1, got {dimensions} for {colname}"
            )

    self.set_output(ConcatTranslator._concatenate_columns(self))

orbitalml.translation.steps.div

Defines the translation step for the Div operation.

DivTranslator

Bases: Translator

Processes a Div node and updates the variables with the output expression.

This class is responsible for handling the division operation in the translation process. It takes two inputs: the first operand and the second operand (divisor).

The first operand can be a column group or a single column, while the second operand must be a constant value.

When the second operand is a single value, all columns of the column group are divided by that value. If the second operand is instead a list, each column of the column group is divided by the corresponding value in the list.

Source code in orbitalml/translation/steps/div.py
class DivTranslator(Translator):
    """Processes a Div node and updates the variables with the output expression.

    Handles the division operation: the first input is the dividend
    (a column group or a single column) and the second input must be a
    constant list of divisors.

    When the divisor list contains a single value, every column of the
    column group is divided by that value. When it contains multiple
    values, each column of the group is divided by the corresponding
    value in the list.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Div.html

        first_operand = self._variables.consume(self.inputs[0])
        second_arg = self._variables.get_initializer_value(self.inputs[1])
        if second_arg is None or not isinstance(second_arg, (list, tuple)):
            raise NotImplementedError(
                "Div: Second input (divisor) must be a constant list."
            )

        if isinstance(first_operand, VariablesGroup):
            first_operand = NumericVariablesGroup(first_operand)
            struct_fields = list(first_operand.keys())
            for value in first_operand.values():
                if not isinstance(value, ibis.expr.types.NumericValue):
                    raise ValueError("Div: The first operand must be a numeric value.")

            first_operand = typing.cast(
                dict[str, ibis.expr.types.NumericValue], first_operand
            )
            if len(second_arg) == 1:
                # A single divisor: divide every column by it.
                second_arg = second_arg[0]
                if not isinstance(second_arg, (int, float)):
                    raise ValueError("Div: The second operand must be a numeric value.")
                self.set_output(
                    ValueVariablesGroup(
                        {
                            field: (
                                self._optimizer.fold_operation(
                                    first_operand[field] / ibis.literal(second_arg)
                                )
                            )
                            for field in struct_fields
                        }
                    )
                )
            else:
                # One divisor per column: pair them positionally.
                if len(second_arg) != len(first_operand):
                    raise ValueError(
                        "The number of elements in the second operand must match the number of columns in the first operand."
                    )
                self.set_output(
                    ValueVariablesGroup(
                        {
                            field: (
                                self._optimizer.fold_operation(
                                    first_operand[field] / second_arg[i]
                                )
                            )
                            for i, field in enumerate(struct_fields)
                        }
                    )
                )
        else:
            if not isinstance(first_operand, ibis.expr.types.NumericValue):
                raise ValueError("Div: The first operand must be a numeric value.")
            if len(second_arg) != 1:
                raise ValueError(
                    "when first operand is a single column, second operand must contain only one value."
                )

            first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
            # BUGFIX: divide by the scalar contained in the list, not the
            # list itself (mirrors the column-group branch above).
            self.set_output(
                self._optimizer.fold_operation(first_operand / second_arg[0])
            )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/div.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Div.html

    first_operand = self._variables.consume(self.inputs[0])
    second_arg = self._variables.get_initializer_value(self.inputs[1])
    if second_arg is None or not isinstance(second_arg, (list, tuple)):
        raise NotImplementedError(
            "Div: Second input (divisor) must be a constant list."
        )

    if isinstance(first_operand, VariablesGroup):
        first_operand = NumericVariablesGroup(first_operand)
        struct_fields = list(first_operand.keys())
        for value in first_operand.values():
            if not isinstance(value, ibis.expr.types.NumericValue):
                raise ValueError("Div: The first operand must be a numeric value.")

        first_operand = typing.cast(
            dict[str, ibis.expr.types.NumericValue], first_operand
        )
        if len(second_arg) == 1:
            # A single divisor: divide every column by it.
            second_arg = second_arg[0]
            if not isinstance(second_arg, (int, float)):
                raise ValueError("Div: The second operand must be a numeric value.")
            self.set_output(
                ValueVariablesGroup(
                    {
                        field: (
                            self._optimizer.fold_operation(
                                first_operand[field] / ibis.literal(second_arg)
                            )
                        )
                        for field in struct_fields
                    }
                )
            )
        else:
            # One divisor per column: pair them positionally.
            if len(second_arg) != len(first_operand):
                raise ValueError(
                    "The number of elements in the second operand must match the number of columns in the first operand."
                )
            self.set_output(
                ValueVariablesGroup(
                    {
                        field: (
                            self._optimizer.fold_operation(
                                first_operand[field] / second_arg[i]
                            )
                        )
                        for i, field in enumerate(struct_fields)
                    }
                )
            )
    else:
        if not isinstance(first_operand, ibis.expr.types.NumericValue):
            raise ValueError("Div: The first operand must be a numeric value.")
        if len(second_arg) != 1:
            raise ValueError(
                "when first operand is a single column, second operand must contain only one value."
            )

        first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
        # BUGFIX: divide by the scalar contained in the list, not the
        # list itself (mirrors the column-group branch above).
        self.set_output(
            self._optimizer.fold_operation(first_operand / second_arg[0])
        )

orbitalml.translation.steps.gather

Defines the translation step for the Gather operation.

GatherTranslator

Bases: Translator

Processes a Gather node and updates the variables with the output expression.

The gather operations is meant to pick a specific value out of a column or column group.

The first operand can be a column group or a single column, while the second operand must be a constant value.

When the first operand is a column, the second operand must be 0 as there is only one column.

The operation could in theory be used to pick a specific row of columns by setting axis=0, but this is not supported in the current implementation.

Source code in orbitalml/translation/steps/gather.py
class GatherTranslator(Translator):
    """Processes a Gather node and updates the variables with the output expression.

    The gather operations is meant to pick a specific value out of a column or
    column group.

    The first operand can be a column group or a single column,
    while the second operand must be a constant value.

    When the first operand is a column, the second operand must be 0 as
    there is only one column.

    The operation could in theory be used to pick a specific row of columns
    by setting axis=0, but this is not supported in the current implementation.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Gather.html

        axis = self._attributes.get("axis", 0)
        if axis != 1:
            raise NotImplementedError(
                f"Gather: axis {axis} not supported, only selecting columns (axis=1) is supported"
            )

        expr = self._variables.consume(self.inputs[0])
        idx = self._variables.get_initializer_value(self.inputs[1])
        if not isinstance(idx, (tuple, list)) or len(idx) != 1:
            raise NotImplementedError(
                "Gather second operand must be a list of one element"
            )

        idx = idx[0]  # TODO: Support gathering multiple columns
        if not isinstance(idx, int):
            raise ValueError("Gather: index must be an integer constant")

        if isinstance(expr, VariablesGroup):
            keys = list(expr.keys())
            if idx < 0 or idx >= len(keys):
                raise IndexError("Gather: index out of bounds")
            self.set_output(expr[keys[idx]])
        else:
            # Assume that if it's a single column by virtue of the fact that we only
            # support axis=1, then the index must be 0.
            if idx != 0:
                raise NotImplementedError(
                    f"Gather: index {idx} not supported for single columns"
                )
            self.set_output(expr)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/gather.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Gather.html

    axis = self._attributes.get("axis", 0)
    if axis != 1:
        raise NotImplementedError(
            f"Gather: axis {axis} not supported, only selecting columns (axis=1) is supported"
        )

    expr = self._variables.consume(self.inputs[0])
    idx = self._variables.get_initializer_value(self.inputs[1])
    if not isinstance(idx, (tuple, list)) or len(idx) != 1:
        raise NotImplementedError(
            "Gather second operand must be a list of one element"
        )

    idx = idx[0]  # TODO: Support gathering multiple columns
    if not isinstance(idx, int):
        raise ValueError("Gather: index must be an integer constant")

    if isinstance(expr, VariablesGroup):
        keys = list(expr.keys())
        if idx < 0 or idx >= len(keys):
            raise IndexError("Gather: index out of bounds")
        self.set_output(expr[keys[idx]])
    else:
        # Assume that if it's a single column by virtue of the fact that we only
        # support axis=1, then the index must be 0.
        if idx != 0:
            raise NotImplementedError(
                f"Gather: index {idx} not supported for single columns"
            )
        self.set_output(expr)

orbitalml.translation.steps.identity

Implementation of the Identity operator.

IdentityTranslator

Bases: Translator

Processes an Identity node and updates the variables with the output expression.

The identity node is a no-op, it simply passes the input to the output, it is meant to copy the input into the output, but as there could be multiple references to the same expression, it doesn't actually need to perform a copy.

Source code in orbitalml/translation/steps/identity.py
class IdentityTranslator(Translator):
    """Translates an Identity node by forwarding its input expression.

    Identity is a pure pass-through operator: the input becomes the
    output unchanged. Since expressions may be referenced from several
    places, no actual copy of the expression is required.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Identity.html

        # Forward the consumed input directly as this node's output.
        passthrough = self._variables.consume(self._inputs[0])
        self.set_output(passthrough)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/identity.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Identity.html

    # Forward the consumed input directly as this node's output.
    passthrough = self._variables.consume(self._inputs[0])
    self.set_output(passthrough)

orbitalml.translation.steps.imputer

Implementation of the Imputer operator.

ImputerTranslator

Bases: Translator

Processes an Imputer node and updates the variables with the output expression.

The imputer node replaces missing values in the input expression with another value. Currently the only supported value is a float, which is used to replace all missing values in the input expression.

Source code in orbitalml/translation/steps/imputer.py
class ImputerTranslator(Translator):
    """Translates an Imputer node into coalesce expressions.

    Missing values in the input expression are replaced with the
    replacement values declared on the node. Only float replacements
    (``imputed_value_floats``) are currently supported; when the input
    is a column group, one replacement value is expected per column.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_Imputer.html

        imputed_values = self._attributes["imputed_value_floats"]
        if not isinstance(imputed_values, (tuple, list)):
            raise ValueError("Imputer: imputed_value must be a list or tuple of floats")

        expr = self._variables.consume(self.inputs[0])
        if not isinstance(expr, VariablesGroup):
            # Single column: only the first replacement value applies.
            self.set_output(ibis.coalesce(expr, imputed_values[0]))
            return

        column_names = list(expr.keys())
        if len(column_names) != len(imputed_values):
            raise ValueError(
                "Imputer: number of imputed values does not match number of columns"
            )
        self.set_output(
            ValueVariablesGroup(
                {
                    name: ibis.coalesce(expr[name], replacement)
                    for name, replacement in zip(column_names, imputed_values)
                }
            )
        )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/imputer.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_Imputer.html

    imputed_values = self._attributes["imputed_value_floats"]
    if not isinstance(imputed_values, (tuple, list)):
        raise ValueError("Imputer: imputed_value must be a list or tuple of floats")

    expr = self._variables.consume(self.inputs[0])
    if not isinstance(expr, VariablesGroup):
        # Single column: only the first replacement value applies.
        self.set_output(ibis.coalesce(expr, imputed_values[0]))
        return

    column_names = list(expr.keys())
    if len(column_names) != len(imputed_values):
        raise ValueError(
            "Imputer: number of imputed values does not match number of columns"
        )
    self.set_output(
        ValueVariablesGroup(
            {
                name: ibis.coalesce(expr[name], replacement)
                for name, replacement in zip(column_names, imputed_values)
            }
        )
    )

orbitalml.translation.steps.labelencoder

Implementation of the LabelEncoder operator.

LabelEncoderTranslator

Bases: Translator

Processes a LabelEncoder node and updates the variables with the output expression.

LabelEncoder is used to map values from one variable to values of another one. It is usually meant to map numeric values to categories

Source code in orbitalml/translation/steps/labelencoder.py
class LabelEncoderTranslator(Translator):
    """Processes a LabelEncoder node and updates the variables with the output expression.

    LabelEncoder is used to map values from one variable to values of another one.
    It is usually meant to map numeric values to categories
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_LabelEncoder.html

        input_values = self._variables.consume(self.inputs[0])

        # Automatically find attributes that start with "keys_", "values_", and "default_"
        # The ONNX attribute names are type-suffixed (e.g. "keys_int64s",
        # "keys_strings"), so we match on the common prefix and take the
        # first match of each kind.
        mapping_keys = next(
            (
                attr_value
                for attr_name, attr_value in self._attributes.items()
                if attr_name.startswith("keys_")
            ),
            None,
        )
        mapping_values = next(
            (
                attr_value
                for attr_name, attr_value in self._attributes.items()
                if attr_name.startswith("values_")
            ),
            None,
        )
        default = next(
            (
                attr_value
                for attr_name, attr_value in self._attributes.items()
                if attr_name.startswith("default_")
            ),
            None,
        )
        if mapping_keys is None or mapping_values is None:
            raise ValueError("LabelEncoder: required mapping attributes not found.")
        if not isinstance(mapping_values, (tuple, list)) or not isinstance(
            mapping_keys, (tuple, list)
        ):
            raise ValueError("LabelEncoder: mapping must be a list of keys and values")

        if default is None:
            # No explicit default attribute: pick a fallback based on the
            # type of the mapped values (these values mirror the defaults
            # described in the ONNX LabelEncoder specification linked above).
            value_sample = mapping_values[0]
            if isinstance(value_sample, int):
                default = -1
            elif isinstance(value_sample, str):
                default = "_Unused"
            elif isinstance(value_sample, float):
                default = -0.0
            else:
                raise ValueError(
                    f"LabelEncoder: unsupported values attribute type: {mapping_values}"
                )

        # Translate the mapping into a CASE WHEN chain, falling back to the
        # default for inputs that match no key.
        case_expr = ibis.case()
        for k, v in zip(mapping_keys, mapping_values):
            case_expr = case_expr.when(input_values == k, v)
        case_expr = case_expr.else_(default).end()

        if not isinstance(case_expr, ibis.Value):
            raise NotImplementedError("Deferred case expression not supported")
        # Let the optimizer collapse the CASE chain when possible.
        case_expr = self._optimizer.fold_case(case_expr)

        self.set_output(case_expr)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/labelencoder.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_LabelEncoder.html

    input_values = self._variables.consume(self.inputs[0])

    # Automatically find attributes that start with "keys_", "values_", and "default_"
    # The ONNX attribute names are type-suffixed (e.g. "keys_int64s",
    # "keys_strings"), so we match on the common prefix and take the
    # first match of each kind.
    mapping_keys = next(
        (
            attr_value
            for attr_name, attr_value in self._attributes.items()
            if attr_name.startswith("keys_")
        ),
        None,
    )
    mapping_values = next(
        (
            attr_value
            for attr_name, attr_value in self._attributes.items()
            if attr_name.startswith("values_")
        ),
        None,
    )
    default = next(
        (
            attr_value
            for attr_name, attr_value in self._attributes.items()
            if attr_name.startswith("default_")
        ),
        None,
    )
    if mapping_keys is None or mapping_values is None:
        raise ValueError("LabelEncoder: required mapping attributes not found.")
    if not isinstance(mapping_values, (tuple, list)) or not isinstance(
        mapping_keys, (tuple, list)
    ):
        raise ValueError("LabelEncoder: mapping must be a list of keys and values")

    if default is None:
        # No explicit default attribute: pick a fallback based on the
        # type of the mapped values (these values mirror the defaults
        # described in the ONNX LabelEncoder specification linked above).
        value_sample = mapping_values[0]
        if isinstance(value_sample, int):
            default = -1
        elif isinstance(value_sample, str):
            default = "_Unused"
        elif isinstance(value_sample, float):
            default = -0.0
        else:
            raise ValueError(
                f"LabelEncoder: unsupported values attribute type: {mapping_values}"
            )

    # Translate the mapping into a CASE WHEN chain, falling back to the
    # default for inputs that match no key.
    case_expr = ibis.case()
    for k, v in zip(mapping_keys, mapping_values):
        case_expr = case_expr.when(input_values == k, v)
    case_expr = case_expr.else_(default).end()

    if not isinstance(case_expr, ibis.Value):
        raise NotImplementedError("Deferred case expression not supported")
    # Let the optimizer collapse the CASE chain when possible.
    case_expr = self._optimizer.fold_case(case_expr)

    self.set_output(case_expr)

orbitalml.translation.steps.linearclass

Implementation of the LinearClassifier operator.

LinearClassifierTranslator

Bases: Translator

Processes a LinearClassifier node and updates variables with the classification results.

The LinearClassifier operator computes classification outputs as: Scores = X * coefficients + intercepts

For more complex pipelines the LinearClassifier operator is not always used, usually a combination of Mul and Add operations is used.

Source code in orbitalml/translation/steps/linearclass.py
class LinearClassifierTranslator(Translator):
    """Processes a LinearClassifier node and updates variables with the classification results.

    The LinearClassifier operator computes classification outputs as:
    Scores = X * coefficients + intercepts

    For more complex pipelines the LinearClassifier operator is not always used,
    usually a combination of Mul and Add operations is used.
    """

    def process(self) -> None:
        """Performs the translation and sets the output variables Y (predictions) and Z (scores)."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_LinearClassifier.html
        coefficients = typing.cast(list[float], self._attributes["coefficients"])
        intercepts = typing.cast(list[float], self._attributes.get("intercepts", []))
        multi_class = typing.cast(int, self._attributes.get("multi_class", 0))
        post_transform = typing.cast(
            str, self._attributes.get("post_transform", "NONE")
        )

        if multi_class != 0:
            raise NotImplementedError("Multi-class classification is not implemented.")

        # Class labels come either as ints or as strings; exactly one of
        # the two attributes is expected to be present.
        classlabels: list[str] | list[int] | None = typing.cast(
            list[int] | None, self._attributes.get("classlabels_ints")
        ) or typing.cast(list[str] | None, self._attributes.get("classlabels_strings"))

        if classlabels is None:
            raise ValueError(
                "LinearClassifier: classlabels_ints or classlabels_strings must be defined."
            )

        if len(self._inputs) != 1:
            raise ValueError("LinearClassifier node must have exactly 1 input.")

        input_operand = self._variables.consume(self._inputs[0])

        # Standardize input_operand to a columns group,
        # so that we can reuse a single implementation.
        if not isinstance(input_operand, VariablesGroup):
            input_operand = ValueVariablesGroup({"feature": input_operand})

        num_features = len(input_operand)
        num_classes = len(classlabels)

        # `coefficients` is a flat row-major matrix: one contiguous run of
        # num_features values per class (see the slicing below).
        if len(coefficients) != num_classes * num_features:
            raise ValueError(
                "Coefficients length must equal number of classes × number of input fields."
            )

        fieldsgroup = NumericVariablesGroup(input_operand)
        fields = list(fieldsgroup.values())
        scores = []

        # Build one score expression per class: intercept + dot(features, row).
        for class_idx in range(num_classes):
            start = class_idx * num_features
            end = start + num_features
            coef_slice = coefficients[start:end]
            # Missing intercepts attribute means no bias term.
            intercept = intercepts[class_idx] if intercepts else 0.0

            score = ibis.literal(intercept)
            for val, coef in zip(fields, coef_slice):
                score += val * coef

            score = self._apply_post_transform(score, post_transform)
            scores.append(self._optimizer.fold_operation(score))

        scores_struct = ValueVariablesGroup(
            {str(label): score for label, score in zip(classlabels, scores)}
        )

        # Argmax expressed as a CASE chain: the first label whose score
        # equals the maximum wins (ties resolve to the earliest label).
        max_score = ibis.greatest(*scores_struct.values())
        predictions = ibis.case()
        for label, score in scores_struct.items():
            predictions = predictions.when(score == max_score, label)
        predictions = predictions.end()

        # Output 0 is the predicted label, output 1 the per-class scores.
        self.set_output(predictions, index=0)
        self.set_output(scores_struct, index=1)

    @classmethod
    def _apply_post_transform(
        cls, score: ibis.expr.types.NumericValue, transform: str
    ) -> ibis.expr.types.NumericValue:
        """Apply the ONNX ``post_transform`` to a raw score.

        Only ``LOGISTIC`` (sigmoid) and ``NONE`` are supported.
        """
        # TODO: Move to a dedicated set of post-transform
        #       functions together with SOFTMAX
        if transform == "LOGISTIC":
            return 1 / (1 + (-score).exp())
        elif transform == "NONE":
            return score
        else:
            # TODO: apply more post_transform here if needed
            raise NotImplementedError(
                f"Post transform '{transform}' is not implemented."
            )
process
process() -> None

Performs the translation and sets the output variables Y (predictions) and Z (scores).

Source code in orbitalml/translation/steps/linearclass.py
def process(self) -> None:
    """Performs the translation and sets the output variables Y (predictions) and Z (scores)."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_LinearClassifier.html
    coefficients = typing.cast(list[float], self._attributes["coefficients"])
    intercepts = typing.cast(list[float], self._attributes.get("intercepts", []))
    multi_class = typing.cast(int, self._attributes.get("multi_class", 0))
    post_transform = typing.cast(
        str, self._attributes.get("post_transform", "NONE")
    )

    if multi_class != 0:
        raise NotImplementedError("Multi-class classification is not implemented.")

    # Class labels come either as ints or as strings; exactly one of
    # the two attributes is expected to be present.
    classlabels: list[str] | list[int] | None = typing.cast(
        list[int] | None, self._attributes.get("classlabels_ints")
    ) or typing.cast(list[str] | None, self._attributes.get("classlabels_strings"))

    if classlabels is None:
        raise ValueError(
            "LinearClassifier: classlabels_ints or classlabels_strings must be defined."
        )

    if len(self._inputs) != 1:
        raise ValueError("LinearClassifier node must have exactly 1 input.")

    input_operand = self._variables.consume(self._inputs[0])

    # Standardize input_operand to a columns group,
    # so that we can reuse a single implementation.
    if not isinstance(input_operand, VariablesGroup):
        input_operand = ValueVariablesGroup({"feature": input_operand})

    num_features = len(input_operand)
    num_classes = len(classlabels)

    # `coefficients` is a flat row-major matrix: one contiguous run of
    # num_features values per class (see the slicing below).
    if len(coefficients) != num_classes * num_features:
        raise ValueError(
            "Coefficients length must equal number of classes × number of input fields."
        )

    fieldsgroup = NumericVariablesGroup(input_operand)
    fields = list(fieldsgroup.values())
    scores = []

    # Build one score expression per class: intercept + dot(features, row).
    for class_idx in range(num_classes):
        start = class_idx * num_features
        end = start + num_features
        coef_slice = coefficients[start:end]
        # Missing intercepts attribute means no bias term.
        intercept = intercepts[class_idx] if intercepts else 0.0

        score = ibis.literal(intercept)
        for val, coef in zip(fields, coef_slice):
            score += val * coef

        score = self._apply_post_transform(score, post_transform)
        scores.append(self._optimizer.fold_operation(score))

    scores_struct = ValueVariablesGroup(
        {str(label): score for label, score in zip(classlabels, scores)}
    )

    # Argmax expressed as a CASE chain: the first label whose score
    # equals the maximum wins (ties resolve to the earliest label).
    max_score = ibis.greatest(*scores_struct.values())
    predictions = ibis.case()
    for label, score in scores_struct.items():
        predictions = predictions.when(score == max_score, label)
    predictions = predictions.end()

    # Output 0 is the predicted label, output 1 the per-class scores.
    self.set_output(predictions, index=0)
    self.set_output(scores_struct, index=1)

orbitalml.translation.steps.linearreg

Implementation of the LinearRegression operator.

LinearRegressorTranslator

Bases: Translator

Processes a LinearRegression node and updates variables with the predicted expression.

The LinearRegression operator computes predictions as: Y = X * coefficients + intercept

For more complex pipelines the LinearRegression operator is not always used, usually a combination of Mul and Add operations is used.

Source code in orbitalml/translation/steps/linearreg.py
class LinearRegressorTranslator(Translator):
    """Processes a LinearRegression node and updates variables with the predicted expression.

    The LinearRegression operator computes predictions as:
    Y = X * coefficients + intercept

    For more complex pipelines the LinearRegression operator is not always used,
    usually a combination of Mul and Add operations is used.
    """

    def process(self) -> None:
        """Performs the translation and sets the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_LinearRegressor.html
        coefficients = typing.cast(list[float], self._attributes["coefficients"])
        intercepts = typing.cast(list[float], self._attributes.get("intercepts", [0.0]))
        targets = typing.cast(int, self._attributes.get("targets", 1))

        post_transform = self._attributes.get("post_transform", "NONE")
        if post_transform != "NONE":
            raise NotImplementedError("Post transform is not implemented.")

        if len(intercepts) not in [0, targets]:
            raise ValueError(
                "LinearRegressor: intercepts length must match targets or be empty."
            )

        if len(self._inputs) != 1:
            raise ValueError("LinearRegressor node must have exactly 1 input.")

        input_operand = self._variables.consume(self._inputs[0])

        if isinstance(input_operand, VariablesGroup):
            input_operand = NumericVariablesGroup(input_operand)
            num_features = len(input_operand)

            # `coefficients` is a flat row-major matrix: one contiguous run
            # of num_features values per target (see the slicing below).
            if len(coefficients) != targets * num_features:
                raise ValueError(
                    "Coefficients length must equal targets number of input fields."
                )

            results = {}
            fields = list(input_operand.values())

            # Build one prediction expression per target:
            # intercept + dot(features, row of coefficients).
            for target_idx in range(targets):
                start = target_idx * num_features
                end = start + num_features
                coef_slice = coefficients[start:end]

                # An empty intercepts attribute means no bias term.
                intercept = intercepts[target_idx] if intercepts else 0.0

                prediction = ibis.literal(intercept)
                for val, coef in zip(fields, coef_slice):
                    prediction += val * coef

                # TODO: apply post_transform here if needed

                results[f"target_{target_idx}"] = self._optimizer.fold_operation(
                    prediction
                )

            self.set_output(ValueVariablesGroup(results))

        else:
            # Single-column input: only the degenerate 1-feature,
            # 1-target regression is expressible.
            input_operand = typing.cast(ibis.expr.types.NumericValue, input_operand)

            if targets != 1 or len(coefficients) != 1:
                raise ValueError(
                    "Single column input expects exactly one target and one coefficient."
                )

            intercept = intercepts[0] if intercepts else 0.0
            prediction = (input_operand * coefficients[0]) + intercept

            # TODO: apply post_transform here if needed

            self.set_output(self._optimizer.fold_operation(prediction))
process
process() -> None

Performs the translation and sets the output variable.

Source code in orbitalml/translation/steps/linearreg.py
def process(self) -> None:
    """Performs the translation and sets the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_LinearRegressor.html
    coefficients = typing.cast(list[float], self._attributes["coefficients"])
    intercepts = typing.cast(list[float], self._attributes.get("intercepts", [0.0]))
    targets = typing.cast(int, self._attributes.get("targets", 1))

    post_transform = self._attributes.get("post_transform", "NONE")
    if post_transform != "NONE":
        raise NotImplementedError("Post transform is not implemented.")

    if len(intercepts) not in [0, targets]:
        raise ValueError(
            "LinearRegressor: intercepts length must match targets or be empty."
        )

    if len(self._inputs) != 1:
        raise ValueError("LinearRegressor node must have exactly 1 input.")

    input_operand = self._variables.consume(self._inputs[0])

    if isinstance(input_operand, VariablesGroup):
        input_operand = NumericVariablesGroup(input_operand)
        num_features = len(input_operand)

        # `coefficients` is a flat row-major matrix: one contiguous run
        # of num_features values per target (see the slicing below).
        if len(coefficients) != targets * num_features:
            raise ValueError(
                "Coefficients length must equal targets number of input fields."
            )

        results = {}
        fields = list(input_operand.values())

        # Build one prediction expression per target:
        # intercept + dot(features, row of coefficients).
        for target_idx in range(targets):
            start = target_idx * num_features
            end = start + num_features
            coef_slice = coefficients[start:end]

            # An empty intercepts attribute means no bias term.
            intercept = intercepts[target_idx] if intercepts else 0.0

            prediction = ibis.literal(intercept)
            for val, coef in zip(fields, coef_slice):
                prediction += val * coef

            # TODO: apply post_transform here if needed

            results[f"target_{target_idx}"] = self._optimizer.fold_operation(
                prediction
            )

        self.set_output(ValueVariablesGroup(results))

    else:
        # Single-column input: only the degenerate 1-feature,
        # 1-target regression is expressible.
        input_operand = typing.cast(ibis.expr.types.NumericValue, input_operand)

        if targets != 1 or len(coefficients) != 1:
            raise ValueError(
                "Single column input expects exactly one target and one coefficient."
            )

        intercept = intercepts[0] if intercepts else 0.0
        prediction = (input_operand * coefficients[0]) + intercept

        # TODO: apply post_transform here if needed

        self.set_output(self._optimizer.fold_operation(prediction))

orbitalml.translation.steps.matmul

Implementation of the MatMul operator.

MatMulTranslator

Bases: Translator

Processes a MatMul node and updates the variables with the output expression.

This class is responsible for handling the matrix multiplication operation in the translation process. It takes two inputs: the first operand and the second operand (coefficient tensor). The first operand can be a column group or a single column, while the second operand must be a constant value.

When the second operand is a single value, all columns of the column group are multiplied by that value. If the second operand is instead a list, each column of the column group is multiplied by the corresponding value in the list.

Source code in orbitalml/translation/steps/matmul.py
class MatMulTranslator(Translator):
    """Processes a MatMul node and updates the variables with the output expression.

    This class is responsible for handling the matrix multiplication operation
    in the translation process. It takes two inputs: the first operand and the
    second operand (coefficient tensor).
    The first operand can be a column group or a single column,
    while the second operand must be a constant value.

    When the second operand is a single value, all columns of the column
    group are multiplied by that value. If the second operand is instead
    a list, each column of the column group is multiplied by the
    corresponding value in the list.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__MatMul.html

        coef_tensor = self._variables.get_initializer(self.inputs[1])
        if coef_tensor is None:
            raise ValueError(
                "Coefficient tensor (second input) not found in initializers."
            )
        coef_shape = list(coef_tensor.dims)
        if len(coef_shape) not in (1, 2):
            raise ValueError(
                "MatMul with coefficient tensor rank > 2 is not supported."
            )

        coef = self._variables.get_initializer_value(self.inputs[1])
        if coef is None or not isinstance(coef, (list, tuple)):
            # FIX: message previously said "(divisor)", a copy-paste from a
            # division operator; MatMul's second input is the coefficient tensor.
            raise NotImplementedError(
                "MatMul: Second input (coefficient tensor) must be a constant list."
            )
        coef_type_check = coef[0]
        if not isinstance(coef_type_check, (int, float)):
            raise ValueError("MatMul: The second operand must be a numeric value.")

        first_operand = self._variables.consume(self.inputs[0])
        # Type-check a representative value: for a column group, inspect
        # its first column; otherwise inspect the operand itself.
        operand_type_check = first_operand
        if isinstance(operand_type_check, dict):
            operand_type_check = list(operand_type_check.values())[0]
        if not isinstance(operand_type_check, ibis.expr.types.NumericValue):
            raise ValueError(
                "MatMul: The first operand must be a numeric column or a column group of numerics."
            )

        # Case 1: left operand is a dict (multiple columns)
        if isinstance(first_operand, dict):
            left_exprs: list[ibis.expr.types.NumericValue] = list(
                first_operand.values()
            )
            num_features = len(left_exprs)
            if len(coef_shape) == 1:
                # Coefficient vector: expected shape (num_features,)
                # Dot product: sum of column * coefficient terms.
                if num_features != coef_shape[0]:
                    raise ValueError(
                        "Mismatch: number of features and coefficient vector length"
                    )
                result = sum(
                    self._optimizer.fold_contiguous_sum(
                        [
                            self._optimizer.fold_operation(left_exprs[i] * coef[i])
                            for i in range(num_features)
                        ]
                    )
                )
                self.set_output(result)
            elif len(coef_shape) == 2:
                # Coefficient matrix: expected shape (num_features, output_dim)
                # stored flat in row-major order (coef[i * output_dim + j]).
                if num_features != coef_shape[0]:
                    raise ValueError(
                        "Mismatch: number of features and coefficient matrix rows"
                    )
                output_dim = coef_shape[1]
                result_list: list[ibis.expr.types.NumericValue] = [
                    sum(
                        self._optimizer.fold_contiguous_sum(
                            [
                                self._optimizer.fold_operation(
                                    left_exprs[i] * coef[i * output_dim + j]
                                )
                                for i in range(num_features)
                            ]
                        )
                    )
                    for j in range(output_dim)
                ]
                if output_dim == 1:
                    result = result_list[0]
                else:
                    # Return a dict of output expressions if there are multiple output columns.
                    result = ValueVariablesGroup(
                        {f"out_{j}": result_list[j] for j in range(output_dim)}
                    )
                self.set_output(result)
            else:
                # Defensive: unreachable, rank was validated above.
                raise NotImplementedError(
                    "MatMul with coefficient tensor rank > 2 is not supported"
                )
        else:
            first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
            # Case 2: left operand is a single expression.
            if len(coef_shape) == 1:
                # Expect a single coefficient.
                if coef_shape[0] != 1:
                    raise ValueError(
                        "Expected coefficient vector of length 1 for single operand"
                    )
                self.set_output(self._optimizer.fold_operation(first_operand * coef[0]))
            elif len(coef_shape) == 2:
                # Two possible shapes: [1, N] or [N, 1]
                if coef_shape[0] == 1:
                    output_dim = coef_shape[1]
                    result_list = [
                        self._optimizer.fold_operation(first_operand * coef[j])
                        for j in range(output_dim)
                    ]
                    if output_dim == 1:
                        result = result_list[0]
                    else:
                        result = ValueVariablesGroup(
                            {f"out_{j}": result_list[j] for j in range(output_dim)}
                        )
                    # FIX: previously set_output was invoked twice when
                    # output_dim == 1 (once with result_list[0] and once with
                    # result); a single call with the same value suffices.
                    self.set_output(result)
                elif coef_shape[1] == 1:
                    # This case implies the left operand is a vector of length matching coef_shape[0],
                    # but a single expression cannot be indexed. We mark this as not supported.
                    raise NotImplementedError(
                        "MatMul with left operand as single column and coefficient matrix shape [N,1] is not supported"
                    )
                else:
                    raise NotImplementedError(
                        "Unexpected coefficient shape for single operand"
                    )
            else:
                # Defensive: unreachable, rank was validated above.
                raise NotImplementedError(
                    "MatMul with coefficient tensor rank > 2 is not supported"
                )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/matmul.py
def process(self) -> None:
    """Performs the translation and set the output variable.

    Multiplies the first input (a column or a column group) by a constant
    coefficient tensor taken from the model initializers, producing either
    a single output column or a column group with one column per output
    dimension.

    Raises:
        ValueError: if the coefficient tensor is missing, has an
            unsupported rank, is non-numeric, or its shape does not match
            the number of input columns.
        NotImplementedError: for coefficient layouts that cannot be
            expressed over SQL columns.
    """
    # https://onnx.ai/onnx/operators/onnx__MatMul.html

    coef_tensor = self._variables.get_initializer(self.inputs[1])
    if coef_tensor is None:
        raise ValueError(
            "Coefficient tensor (second input) not found in initializers."
        )
    coef_shape = list(coef_tensor.dims)
    if len(coef_shape) not in (1, 2):
        # Only vector (rank 1) and matrix (rank 2) coefficients can be
        # mapped onto column expressions.
        raise ValueError(
            "MatMul: coefficient tensor rank must be 1 or 2."
        )

    coef = self._variables.get_initializer_value(self.inputs[1])
    if coef is None or not isinstance(coef, (list, tuple)):
        raise NotImplementedError(
            "MatMul: Second input must be a constant list."
        )
    # Checking the first entry is enough: initializer values are homogeneous.
    if not isinstance(coef[0], (int, float)):
        raise ValueError("MatMul: The second operand must be a numeric value.")

    first_operand = self._variables.consume(self.inputs[0])
    operand_type_check = first_operand
    if isinstance(operand_type_check, dict):
        # For a column group, probe the first member column.
        operand_type_check = next(iter(operand_type_check.values()))
    if not isinstance(operand_type_check, ibis.expr.types.NumericValue):
        raise ValueError(
            "MatMul: The first operand must be a numeric column or a column group of numerics."
        )

    # Case 1: left operand is a dict (multiple columns)
    if isinstance(first_operand, dict):
        left_exprs: list[ibis.expr.types.NumericValue] = list(
            first_operand.values()
        )
        num_features = len(left_exprs)
        if len(coef_shape) == 1:
            # Coefficient vector: expected shape (num_features,)
            if num_features != coef_shape[0]:
                raise ValueError(
                    "Mismatch: number of features and coefficient vector length"
                )
            result = sum(
                self._optimizer.fold_contiguous_sum(
                    [
                        self._optimizer.fold_operation(left_exprs[i] * coef[i])
                        for i in range(num_features)
                    ]
                )
            )
            self.set_output(result)
        else:
            # Coefficient matrix: expected shape (num_features, output_dim),
            # laid out row-major in the flat initializer list.
            if num_features != coef_shape[0]:
                raise ValueError(
                    "Mismatch: number of features and coefficient matrix rows"
                )
            output_dim = coef_shape[1]
            result_list: list[ibis.expr.types.NumericValue] = [
                sum(
                    self._optimizer.fold_contiguous_sum(
                        [
                            self._optimizer.fold_operation(
                                left_exprs[i] * coef[i * output_dim + j]
                            )
                            for i in range(num_features)
                        ]
                    )
                )
                for j in range(output_dim)
            ]
            if output_dim == 1:
                result = result_list[0]
            else:
                # Return a dict of output expressions if there are multiple output columns.
                result = ValueVariablesGroup(
                    {f"out_{j}": result_list[j] for j in range(output_dim)}
                )
            self.set_output(result)
    else:
        first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
        # Case 2: left operand is a single expression.
        if len(coef_shape) == 1:
            # Expect a single coefficient.
            if coef_shape[0] != 1:
                raise ValueError(
                    "Expected coefficient vector of length 1 for single operand"
                )
            self.set_output(self._optimizer.fold_operation(first_operand * coef[0]))
        else:
            # Two possible shapes: [1, N] or [N, 1]
            if coef_shape[0] == 1:
                output_dim = coef_shape[1]
                result_list = [
                    self._optimizer.fold_operation(first_operand * coef[j])
                    for j in range(output_dim)
                ]
                if output_dim == 1:
                    result = result_list[0]
                else:
                    result = ValueVariablesGroup(
                        {f"out_{j}": result_list[j] for j in range(output_dim)}
                    )
                # BUGFIX: previously set_output() was called twice when
                # output_dim == 1; set the output exactly once.
                self.set_output(result)
            elif coef_shape[1] == 1:
                # This case implies the left operand is a vector of length matching coef_shape[0],
                # but a single expression cannot be indexed. We mark this as not supported.
                raise NotImplementedError(
                    "MatMul with left operand as single column and coefficient matrix shape [N,1] is not supported"
                )
            else:
                raise NotImplementedError(
                    "Unexpected coefficient shape for single operand"
                )

orbitalml.translation.steps.mul

Translate a Mul operation to the equivalent query expression.

MulTranslator

Bases: Translator

Processes a Mul node and updates the variables with the output expression.

Given the node to translate, the variables and constants available for the translation context, generates a query expression that processes the input variables and produces a new output variable that computes based on the Mul operation.

Source code in orbitalml/translation/steps/mul.py
class MulTranslator(Translator):
    """Processes a Mul node and updates the variables with the output expression.

    Given the node to translate, the variables and constants available for
    the translation context, generates a query expression that processes
    the input variables and produces a new output variable that computes
    based on the Mul operation.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable.

        Multiplies the first input (a column or a column group) by a
        constant list of values taken from the model initializers.

        Raises:
            NotImplementedError: if the second input is not a constant list.
            ValueError: if the first input is non-numeric or the number of
                constants does not match the number of input columns.
        """
        # https://onnx.ai/onnx/operators/onnx__Mul.html

        first_operand = self._variables.consume(self._inputs[0])
        second_operand = self._variables.get_initializer_value(self._inputs[1])
        if second_operand is None or not isinstance(second_operand, (list, tuple)):
            # The multiplier must be a constant known at translation time.
            raise NotImplementedError(
                "Mul: Second input (multiplier) must be a constant list."
            )

        type_check_var = first_operand
        if isinstance(type_check_var, VariablesGroup):
            # Probe one member column of the group for the type check.
            type_check_var = next(iter(type_check_var.values()), None)
        if not isinstance(type_check_var, ibis.expr.types.NumericValue):
            raise ValueError("Mul: The first operand must be a numeric value.")

        mul_values = list(second_operand)
        if isinstance(first_operand, VariablesGroup):
            first_operand = NumericVariablesGroup(first_operand)
            struct_fields = list(first_operand.keys())
            if len(mul_values) != len(struct_fields):
                # TODO: Implement multiplying by a single value,
                #       see Div implementation.
                raise ValueError(
                    "When the first operand is a group of columns, the second operand must contain the same number of values"
                )
            self.set_output(
                ValueVariablesGroup(
                    {
                        field: (
                            self._optimizer.fold_operation(
                                first_operand[field] * mul_values[i]
                            )
                        )
                        for i, field in enumerate(struct_fields)
                    }
                )
            )
        else:
            if len(mul_values) != 1:
                raise ValueError(
                    "When the first operand is a single column, the second operand must contain exactly 1 value"
                )
            first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
            self.set_output(
                self._optimizer.fold_operation(first_operand * mul_values[0])
            )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/mul.py
def process(self) -> None:
    """Performs the translation and set the output variable.

    Multiplies the first input (a column or a column group) by a constant
    list of values taken from the model initializers.

    Raises:
        NotImplementedError: if the second input is not a constant list.
        ValueError: if the first input is non-numeric or the number of
            constants does not match the number of input columns.
    """
    # https://onnx.ai/onnx/operators/onnx__Mul.html

    first_operand = self._variables.consume(self._inputs[0])
    second_operand = self._variables.get_initializer_value(self._inputs[1])
    if second_operand is None or not isinstance(second_operand, (list, tuple)):
        # The multiplier must be a constant known at translation time.
        raise NotImplementedError(
            "Mul: Second input (multiplier) must be a constant list."
        )

    type_check_var = first_operand
    if isinstance(type_check_var, VariablesGroup):
        # Probe one member column of the group for the type check.
        type_check_var = next(iter(type_check_var.values()), None)
    if not isinstance(type_check_var, ibis.expr.types.NumericValue):
        raise ValueError("Mul: The first operand must be a numeric value.")

    mul_values = list(second_operand)
    if isinstance(first_operand, VariablesGroup):
        first_operand = NumericVariablesGroup(first_operand)
        struct_fields = list(first_operand.keys())
        if len(mul_values) != len(struct_fields):
            # TODO: Implement multiplying by a single value,
            #       see Div implementation.
            raise ValueError(
                "When the first operand is a group of columns, the second operand must contain the same number of values"
            )
        self.set_output(
            ValueVariablesGroup(
                {
                    field: (
                        self._optimizer.fold_operation(
                            first_operand[field] * mul_values[i]
                        )
                    )
                    for i, field in enumerate(struct_fields)
                }
            )
        )
    else:
        if len(mul_values) != 1:
            raise ValueError(
                "When the first operand is a single column, the second operand must contain exactly 1 value"
            )
        first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
        self.set_output(
            self._optimizer.fold_operation(first_operand * mul_values[0])
        )

orbitalml.translation.steps.onehotencoder

Implementation of the OneHotEncoder operator.

OneHotEncoderTranslator

Bases: Translator

Processes a OneHotEncoder node and updates the variables with the output expression.

Given a categorical variable, this class creates a new group of columns, with one column for each category. The values of the column are 1.0 if the original column value is equal to the category, and 0.0 otherwise.

It supports only strings for categories and emits floats as column values.

Source code in orbitalml/translation/steps/onehotencoder.py
class OneHotEncoderTranslator(Translator):
    """Processes a OneHotEncoder node and updates the variables with the output expression.

    Given a categorical variable, this class creates a new group of columns,
    with one column for each category. The values of the column are 1.0
    if the original column value is equal to the category, and 0.0 otherwise.

    It supports only strings for categories and emits floats as column
    values.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable.

        Raises:
            ValueError: if the ``cats_strings`` attribute is missing or the
                input expression cannot be found.
        """
        # https://onnx.ai/onnx/operators/onnx_aionnxml_OneHotEncoder.html
        cats = typing.cast(list[str], self._attributes.get("cats_strings"))
        if not isinstance(cats, list):
            # We currently only support string values for categories
            raise ValueError("OneHotEncoder: attribute cats_strings not found")

        input_expr = self._variables.consume(self.inputs[0])
        if not isinstance(input_expr, ibis.Value):
            raise ValueError("OneHotEncoder: input expression not found")

        # One 0.0/1.0 indicator column per category, cast to float.
        casted_variables = [
            ibis.ifelse(input_expr == cat, 1, 0)
            .cast("float64")
            .name(self.variable_unique_short_alias("oh"))
            for cat in cats
        ]

        # OneHot encoded features are usually consumed multiple times
        # by subsequent operations, so preserving them makes sense.
        casted_variables = self.preserve(*casted_variables)
        self.set_output(
            ValueVariablesGroup(
                {cat: casted_variables[i] for i, cat in enumerate(cats)}
            )
        )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/onehotencoder.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_OneHotEncoder.html
    categories = typing.cast(list[str], self._attributes.get("cats_strings"))
    if not isinstance(categories, list):
        # We currently only support string values for categories
        raise ValueError("OneHotEncoder: attribute cats_strings not found")

    source_expr = self._variables.consume(self.inputs[0])
    if not isinstance(source_expr, ibis.Value):
        raise ValueError("OneHotEncoder: input expression not found")

    # Build one 0.0/1.0 indicator column per category.
    indicator_columns = [
        ibis.ifelse(source_expr == category, 1, 0)
        .cast("float64")
        .name(self.variable_unique_short_alias("oh"))
        for category in categories
    ]

    # Preserve the indicator columns: one-hot encoded features tend to
    # be consumed several times by downstream operations.
    indicator_columns = self.preserve(*indicator_columns)
    self.set_output(
        ValueVariablesGroup(dict(zip(categories, indicator_columns)))
    )

orbitalml.translation.steps.reshape

Implementation of the Reshape operator.

ReshapeTranslator

Bases: Translator

Processes a Reshape node and updates the variables with the output expression.

Reshape is currently a noop operation; it only supports cases where it doesn't have to change the data shape. It is generally not possible to support columns of different lengths in the same expression/table, so we can't really change the shape of a column as that implies changing its length.

Source code in orbitalml/translation/steps/reshape.py
class ReshapeTranslator(Translator):
    """Processes a Reshape node and updates the variables with the output expression.

    Reshape is currently a noop operation, it only supports cases where
    it doesn't have to change the data shape.
    That is generally not possible to support columns of different length in
    the same expressions/table so we can't really change the shape of a column
    as it implies changing its length.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable.

        Raises:
            NotImplementedError: if the shape is not a list of integers or
                would change the number of rows.
            ValueError: if the requested shape differs from the input shape.
        """

        # https://onnx.ai/onnx/operators/onnx__Reshape.html
        first_operand = self._variables.consume(self.inputs[0])
        if isinstance(first_operand, dict):
            first_operand_len = len(first_operand)
        else:
            first_operand_len = 1

        shape = self._variables.get_initializer_value(self.inputs[1])
        # Also guard against an empty shape list, which would otherwise
        # raise an IndexError on shape[0] below.
        if not isinstance(shape, list) or not shape or not isinstance(shape[0], int):
            # Reshape explicitly requires ints.
            raise NotImplementedError("Reshape: requires integer values for the shape.")

        if shape[0] != -1:
            # We don't support changing the number of rows
            raise NotImplementedError("Reshape can't change the number of rows")

        if len(shape) == 1 and first_operand_len == 1:
            # We can reshape a single column to a single column
            # nothing has changed.
            pass
        elif len(shape) == 2 and shape[1] == first_operand_len:
            # We can reshape a group of columns into the same
            # number of columns, nothing has changed.
            pass
        else:
            raise ValueError(f"Reshape shape={shape} not supported")

        # At this point we should have a single column containing the
        # result of the whole expression, so there should really be nothing to reshape.
        self.set_output(first_operand)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/reshape.py
def process(self) -> None:
    """Performs the translation and set the output variable.

    Raises:
        NotImplementedError: if the shape is not a list of integers or
            would change the number of rows.
        ValueError: if the requested shape differs from the input shape.
    """

    # https://onnx.ai/onnx/operators/onnx__Reshape.html
    first_operand = self._variables.consume(self.inputs[0])
    if isinstance(first_operand, dict):
        first_operand_len = len(first_operand)
    else:
        first_operand_len = 1

    shape = self._variables.get_initializer_value(self.inputs[1])
    # Also guard against an empty shape list, which would otherwise
    # raise an IndexError on shape[0] below.
    if not isinstance(shape, list) or not shape or not isinstance(shape[0], int):
        # Reshape explicitly requires ints.
        raise NotImplementedError("Reshape: requires integer values for the shape.")

    if shape[0] != -1:
        # We don't support changing the number of rows
        raise NotImplementedError("Reshape can't change the number of rows")

    if len(shape) == 1 and first_operand_len == 1:
        # We can reshape a single column to a single column
        # nothing has changed.
        pass
    elif len(shape) == 2 and shape[1] == first_operand_len:
        # We can reshape a group of columns into the same
        # number of columns, nothing has changed.
        pass
    else:
        raise ValueError(f"Reshape shape={shape} not supported")

    # At this point we should have a single column containing the
    # result of the whole expression, so there should really be nothing to reshape.
    self.set_output(first_operand)

orbitalml.translation.steps.scaler

Implementation of the Scaler operator.

ScalerTranslator

Bases: Translator

Processes a Scaler node and updates variables with the scaled expression.

The Scaler operator applies a scaling and offset to the input: Y = (X - offset) * scale

The scaler operation is not always used, for more complex pipelines usually a combination of Sub and Mul operations is used.

Source code in orbitalml/translation/steps/scaler.py
class ScalerTranslator(Translator):
    """Translates a Scaler node into a scale-and-offset expression.

    Applies ``Y = (X - offset) * scale`` to the input column or to every
    column of an input column group. More complex pipelines typically
    express the same computation through separate Sub and Mul nodes
    instead of a single Scaler.
    """

    def process(self) -> None:
        """Performs the translation and sets the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_Scaler.html
        scale_values = typing.cast(list[float], self._attributes["scale"])
        offset_values = typing.cast(list[float], self._attributes["offset"])

        if len(self._inputs) != 1:
            raise ValueError("Scaler node must have exactly 1 input.")

        operand = self._variables.consume(self._inputs[0])

        # Peek at one member to verify we are dealing with numeric data.
        probe = operand
        if isinstance(probe, dict):
            probe = next(iter(probe.values()), None)
        if not isinstance(probe, ibis.expr.types.NumericValue):
            raise ValueError("Scaler: The input operand must be numeric.")

        if not isinstance(operand, VariablesGroup):
            # Single column: apply the first offset/scale pair directly.
            operand = typing.cast(ibis.expr.types.NumericValue, operand)
            self.set_output(
                self._optimizer.fold_operation(
                    (operand - offset_values[0]) * scale_values[0]
                )
            )
            return

        operand = NumericVariablesGroup(operand)
        field_count = len(operand)

        # Length-1 attributes are broadcast to every column of the group.
        if len(offset_values) == 1:
            offset_values = offset_values * field_count
        if len(scale_values) == 1:
            scale_values = scale_values * field_count

        if len(offset_values) != field_count or len(scale_values) != field_count:
            raise ValueError(
                "Scaler: offset and scale lists must match the number of input fields."
            )

        scaled_columns = {
            name: self._optimizer.fold_operation((column - off) * mul)
            for (name, column), off, mul in zip(
                operand.items(), offset_values, scale_values
            )
        }
        self.set_output(ValueVariablesGroup(scaled_columns))
process
process() -> None

Performs the translation and sets the output variable.

Source code in orbitalml/translation/steps/scaler.py
def process(self) -> None:
    """Performs the translation and sets the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_Scaler.html
    scale_values = typing.cast(list[float], self._attributes["scale"])
    offset_values = typing.cast(list[float], self._attributes["offset"])

    if len(self._inputs) != 1:
        raise ValueError("Scaler node must have exactly 1 input.")

    operand = self._variables.consume(self._inputs[0])

    # Peek at one member to verify we are dealing with numeric data.
    probe = operand
    if isinstance(probe, dict):
        probe = next(iter(probe.values()), None)
    if not isinstance(probe, ibis.expr.types.NumericValue):
        raise ValueError("Scaler: The input operand must be numeric.")

    if not isinstance(operand, VariablesGroup):
        # Single column: apply the first offset/scale pair directly.
        operand = typing.cast(ibis.expr.types.NumericValue, operand)
        self.set_output(
            self._optimizer.fold_operation(
                (operand - offset_values[0]) * scale_values[0]
            )
        )
        return

    operand = NumericVariablesGroup(operand)
    field_count = len(operand)

    # Length-1 attributes are broadcast to every column of the group.
    if len(offset_values) == 1:
        offset_values = offset_values * field_count
    if len(scale_values) == 1:
        scale_values = scale_values * field_count

    if len(offset_values) != field_count or len(scale_values) != field_count:
        raise ValueError(
            "Scaler: offset and scale lists must match the number of input fields."
        )

    scaled_columns = {
        name: self._optimizer.fold_operation((column - off) * mul)
        for (name, column), off, mul in zip(
            operand.items(), offset_values, scale_values
        )
    }
    self.set_output(ValueVariablesGroup(scaled_columns))

orbitalml.translation.steps.softmax

Implementation of the Softmax operator.

SoftmaxTranslator

Bases: Translator

Processes a Softmax node and updates the variables with the output expression.

The operation computes the normalized exponential of the input::

Softmax = Exp(input) / Sum(Exp(input))

Currently the Softmax operation is supported only for axis=-1 or axis=1, which for a column group means that the softmax is computed across the columns of the group for each row.

Source code in orbitalml/translation/steps/softmax.py
class SoftmaxTranslator(Translator):
    """Processes a Softmax node and updates the variables with the output expression.

    The operation computes the normalized exponential of the input::

        Softmax = Exp(input) / Sum(Exp(input))

    Currently the Softmax operation is supported only for axis=-1 or axis=1,
    which for a column group means that the softmax is computed across
    the columns of the group for each row.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable.

        Raises:
            ValueError: if the input is non-numeric or the axis attribute
                is not -1 or 1.
        """
        # https://onnx.ai/onnx/operators/onnx__Softmax.html
        data = self._variables.consume(self.inputs[0])
        if not isinstance(data, (ibis.expr.types.NumericValue, dict)):
            raise ValueError(
                "Softmax: The first operand must be a numeric column or a column group of numerics."
            )

        axis = self._attributes.get("axis", -1)
        if axis not in (-1, 1):
            raise ValueError(
                "SoftmaxTranslator supports only axis=-1 or axis=1 for group of columns"
            )

        if isinstance(data, VariablesGroup):
            data = NumericVariablesGroup(data)
        else:
            # BUGFIX: the previous code cast the NumericValue *class*
            # instead of the consumed value, discarding the input data.
            data = typing.cast(ibis.expr.types.NumericValue, data)
        self.set_output(self.compute_softmax(self, data))

    @classmethod
    def compute_softmax(
        cls,
        translator: Translator,
        data: ibis.expr.types.NumericValue | VariablesGroup,
    ) -> ibis.Expr | VariablesGroup:
        """Computes the actual softmax operation over a column or column group."""
        if isinstance(data, VariablesGroup):
            data = NumericVariablesGroup(data)
            # Subtract the row-wise maximum before exponentiating, the
            # standard trick for numerical stability. The max is preserved
            # because every column expression references it.
            max_value = ibis.greatest(*data.values()).name(
                translator.variable_unique_short_alias("sfmx")
            )
            translator.preserve(max_value)

            # Compute, for each column, the exponent
            exp_dict = {k: (v - max_value).exp() for k, v in data.items()}

            # Sum all column exponents
            sum_exp = sum(exp_dict.values())

            # Multi columns case: softmax = exp(column_exp) / (exponents_sum)
            return ValueVariablesGroup({k: exp_dict[k] / sum_exp for k in data.keys()})
        elif isinstance(data, ibis.Expr):
            # Single column case: softmax(x) = exp(x) / exp(x) = 1
            return ibis.literal(1.0)
        else:
            raise TypeError(
                f"Softmax: expected a column group or a single column. Got {type(data)}"
            )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/softmax.py
def process(self) -> None:
    """Performs the translation and set the output variable.

    Raises:
        ValueError: if the input is non-numeric or the axis attribute
            is not -1 or 1.
    """
    # https://onnx.ai/onnx/operators/onnx__Softmax.html
    data = self._variables.consume(self.inputs[0])
    if not isinstance(data, (ibis.expr.types.NumericValue, dict)):
        raise ValueError(
            "Softmax: The first operand must be a numeric column or a column group of numerics."
        )

    axis = self._attributes.get("axis", -1)
    if axis not in (-1, 1):
        raise ValueError(
            "SoftmaxTranslator supports only axis=-1 or axis=1 for group of columns"
        )

    if isinstance(data, VariablesGroup):
        data = NumericVariablesGroup(data)
    else:
        # BUGFIX: the previous code cast the NumericValue *class*
        # instead of the consumed value, discarding the input data.
        data = typing.cast(ibis.expr.types.NumericValue, data)
    self.set_output(self.compute_softmax(self, data))
compute_softmax classmethod
compute_softmax(
    translator: Translator,
    data: NumericValue | VariablesGroup,
) -> Expr | VariablesGroup

Computes the actual softmax operation over a column or column group.

Source code in orbitalml/translation/steps/softmax.py
@classmethod
def compute_softmax(
    cls,
    translator: Translator,
    data: ibis.expr.types.NumericValue | VariablesGroup,
) -> ibis.Expr | VariablesGroup:
    """Computes the actual softmax operation over a column or column group."""
    if isinstance(data, VariablesGroup):
        numeric_group = NumericVariablesGroup(data)
        # Subtract the row-wise maximum before exponentiating; preserve
        # it since every column expression below references it.
        row_max = ibis.greatest(*numeric_group.values()).name(
            translator.variable_unique_short_alias("sfmx")
        )
        translator.preserve(row_max)

        # Per-column exponent of the shifted value.
        exponents = {
            name: (column - row_max).exp()
            for name, column in numeric_group.items()
        }

        # Normalizer shared by every output column.
        total = sum(exponents.values())

        # softmax(column) = exp(column) / sum(exp(columns))
        return ValueVariablesGroup(
            {name: value / total for name, value in exponents.items()}
        )
    if isinstance(data, ibis.Expr):
        # Single column case: softmax(x) = exp(x) / exp(x) = 1
        return ibis.literal(1.0)
    raise TypeError(
        f"Softmax: expected a column group or a single column. Got {type(data)}"
    )

orbitalml.translation.steps.sub

Implementation of the Sub operator.

SubTranslator

Bases: Translator

Processes a Sub node and updates the variables with the output expression.

Given the node to translate, the variables and constants available for the translation context, generates a query expression that processes the input variables and produces a new output variable that computes based on the Sub operation.

Source code in orbitalml/translation/steps/sub.py
class SubTranslator(Translator):
    """Processes a Sub node and updates the variables with the output expression.

    Given the node to translate, the variables and constants available for
    the translation context, generates a query expression that processes
    the input variables and produces a new output variable that computes
    based on the Sub operation.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable.

        Subtracts a constant list of values (from the model initializers)
        from the first input, a column or a column group.

        Raises:
            NotImplementedError: if the second input is not a constant list.
            ValueError: on invalid input count, non-numeric first operand,
                or mismatched value count.
        """
        # https://onnx.ai/onnx/operators/onnx__Sub.html
        # Validate with explicit raises instead of asserts, which are
        # stripped when Python runs with -O.
        if len(self._inputs) != 2:
            raise ValueError("The Sub node must have exactly 2 inputs.")

        first_operand = self._variables.consume(self._inputs[0])
        second_operand = self._variables.get_initializer_value(self._inputs[1])
        if second_operand is None or not isinstance(second_operand, (list, tuple)):
            # The subtrahend must be a constant known at translation time.
            raise NotImplementedError(
                "Sub: Second input (subtrahend) must be a constant list."
            )

        type_check_var = first_operand
        if isinstance(type_check_var, dict):
            # Probe one member column of the group for the type check.
            type_check_var = next(iter(type_check_var.values()), None)
        if not isinstance(type_check_var, ibis.expr.types.NumericValue):
            raise ValueError("Sub: The first operand must be a numeric value.")

        sub_values = list(second_operand)
        if isinstance(first_operand, VariablesGroup):
            first_operand = NumericVariablesGroup(first_operand)
            struct_fields = list(first_operand.keys())
            if len(sub_values) != len(struct_fields):
                raise ValueError(
                    f"The number of values in the initializer ({len(sub_values)}) must match the number of fields ({len(struct_fields)})"
                )
            self.set_output(
                ValueVariablesGroup(
                    {
                        field: (
                            self._optimizer.fold_operation(
                                first_operand[field] - sub_values[i]
                            )
                        )
                        for i, field in enumerate(struct_fields)
                    }
                )
            )
        else:
            if len(sub_values) != 1:
                raise ValueError(
                    "When the first operand is a single column, the second operand must contain exactly 1 value"
                )
            first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
            self.set_output(
                self._optimizer.fold_operation(first_operand - sub_values[0])
            )
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/sub.py
def process(self) -> None:
    """Performs the translation and sets the output variable.

    Subtracts a constant initializer (one value per field) from the first
    operand, which may be a single numeric column or a column group.
    """
    # https://onnx.ai/onnx/operators/onnx__Sub.html
    assert len(self._inputs) == 2, "The Sub node must have exactly 2 inputs."

    first_operand = self._variables.consume(self._inputs[0])
    second_operand = self._variables.get_initializer_value(self._inputs[1])
    if second_operand is None or not isinstance(second_operand, (list, tuple)):
        # The message previously read "(divisor)", a copy/paste leftover from
        # the Div translator; for Sub the constant operand is the subtrahend.
        raise NotImplementedError(
            "Sub: Second input (subtrahend) must be a constant list."
        )

    # Validate that the first operand is numeric. For a column group we
    # check one representative sub-column.
    type_check_var = first_operand
    if isinstance(type_check_var, dict):
        type_check_var = next(iter(type_check_var.values()), None)
    if not isinstance(type_check_var, ibis.expr.types.NumericValue):
        raise ValueError("Sub: The first operand must be a numeric value.")

    sub_values = list(second_operand)
    if isinstance(first_operand, VariablesGroup):
        # Column group: subtract element-wise, one constant per field.
        first_operand = NumericVariablesGroup(first_operand)
        struct_fields = list(first_operand.keys())
        assert len(sub_values) == len(struct_fields), (
            f"The number of values in the initializer ({len(sub_values)}) must match the number of fields ({len(struct_fields)})"
        )
        self.set_output(
            ValueVariablesGroup(
                {
                    field: (
                        self._optimizer.fold_operation(
                            first_operand[field] - sub_values[i]
                        )
                    )
                    for i, field in enumerate(struct_fields)
                }
            )
        )
    else:
        # Single column: the initializer must hold exactly one constant.
        if len(sub_values) != 1:
            raise ValueError(
                "When the first operand is a single column, the second operand must contain exactly 1 value"
            )
        first_operand = typing.cast(ibis.expr.types.NumericValue, first_operand)
        self.set_output(
            self._optimizer.fold_operation(first_operand - sub_values[0])
        )

orbitalml.translation.steps.trees

Translators for trees based models.

TreeEnsembleClassifierTranslator

Bases: Translator

Processes a TreeEnsembleClassifier node and updates the variables with the output expression.

This node is foundational for most tree based models: - Random Forest - Gradient Boosted Trees - Decision Trees

The parsing of the tree is done by the :func:build_tree function, which results in a dictionary of trees.

The class parses the trees to generate a set of CASE WHEN THEN ELSE expressions that are used to compute the votes for each class.

The class also computes the probability of each class by dividing the votes by the sum of all votes.

Source code in orbitalml/translation/steps/trees/classifier.py
class TreeEnsembleClassifierTranslator(Translator):
    """Processes a TreeEnsembleClassifier node and updates the variables with the output expression.

    This node is foundational for most tree based models:
    - Random Forest
    - Gradient Boosted Trees
    - Decision Trees

    The parsing of the tree is done by the :func:`build_tree` function,
    which results in a dictionary of trees.

    The class parses the trees to generate a set of `CASE WHEN THEN ELSE`
    expressions that are used to compute the votes for each class.

    The class also computes the probability of each class by dividing
    the votes by the sum of all votes.
    """

    def process(self) -> None:
        """Performs the translation and sets the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleClassifier.html
        # This is deprecated in ONNX but it's what skl2onnx uses.

        input_exr = self._variables.consume(self.inputs[0])
        if not isinstance(input_exr, (ibis.Expr, VariablesGroup)):
            raise ValueError(
                "TreeEnsembleClassifier: The first operand must be a column or a column group."
            )

        label_expr, prob_colgroup = self.build_classifier(input_exr)
        post_transform = typing.cast(
            str, self._attributes.get("post_transform", "NONE")
        )

        if post_transform != "NONE":
            if post_transform == "SOFTMAX":
                prob_colgroup = SoftmaxTranslator.compute_softmax(self, prob_colgroup)
            elif post_transform == "LOGISTIC":
                # Apply the logistic sigmoid to each per-class score column.
                prob_colgroup = ValueVariablesGroup(
                    {
                        lbl: LinearClassifierTranslator._apply_post_transform(
                            prob_col, post_transform
                        )
                        for lbl, prob_col in prob_colgroup.items()
                    }
                )
            else:
                raise NotImplementedError(
                    f"Post transform {post_transform} not implemented."
                )

        # Output 0 is the predicted label, output 1 the per-class probabilities.
        self._variables[self.outputs[0]] = label_expr
        self._variables[self.outputs[1]] = prob_colgroup

    def build_classifier(
        self, input_expr: ibis.Expr | VariablesGroup
    ) -> tuple[ibis.Expr, VariablesGroup]:
        """Build the classification expression and the probabilities expressions

        Return the classification expression as the first argument and a group of
        variables (one for each category) for the probability expressions.
        """
        optimizer = self._optimizer
        ensemble_trees = build_tree(self)

        classlabels = self._attributes.get(
            "classlabels_strings"
        ) or self._attributes.get("classlabels_int64s")
        if classlabels is None:
            raise ValueError("Unable to detect classlabels for classification")
        output_classlabels = classlabels = typing.cast(
            list[str] | list[int], classlabels
        )

        # ONNX treats binary classification as a special case:
        # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_common.h#L854C1-L871C4
        # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_aggregator.h#L469-L494
        # In this case there is only one weight and it's the probability of the positive class.
        # So we need to check if we are in a binary classification case.
        weights_classid = typing.cast(list[int], self._attributes["class_ids"])
        is_binary = len(classlabels) == 2 and len(set(weights_classid)) == 1
        if is_binary:
            # In this case there is only one label, the first one
            # which actually acts as the score of the prediction.
            # When > 0.5 then class 1, when < 0.5 then class 0
            classlabels = typing.cast(list[str] | list[int], [classlabels[0]])

        # Flatten the input into an ordered list of feature columns and give
        # each a short unique alias so generated expressions stay readable.
        if isinstance(input_expr, VariablesGroup):
            ordered_features = input_expr.values_value()
        else:
            ordered_features = typing.cast(list[ibis.Value], [input_expr])
        ordered_features = [
            feature.name(self.variable_unique_short_alias("tcl"))
            for feature in ordered_features
        ]
        ordered_features = self.preserve(*ordered_features)

        def build_tree_case(node: dict) -> dict[str | int, ibis.Expr]:
            # Leaf node, return the votes
            if node["mode"] == "LEAF":
                # We can assume missing class = weight 0
                # The optimizer will remove this if both true and false have 0.
                return {
                    clslabel: ibis.literal(node["weight"].get(clslabel, 0.0))
                    for clslabel in classlabels
                }

            # Branch node, build a CASE statement
            feature_expr = ordered_features[node["feature_id"]]
            condition = mode_to_condition(node, feature_expr)

            true_votes = build_tree_case(node["true"])
            false_votes = build_tree_case(node["false"])

            votes = {}
            for clslabel in classlabels:
                t_val = true_votes[clslabel]
                f_val = false_votes[clslabel]
                votes[clslabel] = optimizer.fold_case(
                    ibis.case().when(condition, t_val).else_(f_val).end()
                )
            return votes

        # Generate the votes for each tree.
        tree_votes = []
        for tree in ensemble_trees.values():
            tree_votes.append(build_tree_case(tree))

        # Aggregate votes from all trees.
        total_votes = {}
        for clslabel in classlabels:
            total_votes[clslabel] = ibis.literal(0.0)
            for votes in tree_votes:
                total_votes[clslabel] = optimizer.fold_operation(
                    total_votes[clslabel] + votes.get(clslabel, ibis.literal(0.0))
                )

        # Compute prediction of class itself.
        if is_binary:
            total_score = total_votes[classlabels[0]]
            label_expr = optimizer.fold_case(
                ibis.case()
                .when(total_score > 0.5, output_classlabels[1])
                .else_(output_classlabels[0])
                .end()
            )
            # The order matters, for ONNX the VariableGroup is a list of subvariables
            # the names are not important.
            prob_dict = ValueVariablesGroup(
                {
                    str(output_classlabels[0]): 1.0 - total_score,
                    str(output_classlabels[1]): total_score,
                }
            )
        else:
            # Running argmax over the per-class vote totals: track both the
            # currently winning class and its vote count.
            candidate_cls = classlabels[0]
            candidate_vote = total_votes[candidate_cls]
            for clslabel in classlabels[1:]:
                candidate_cls = optimizer.fold_case(
                    ibis.case()
                    .when(total_votes[clslabel] > candidate_vote, clslabel)
                    .else_(candidate_cls)
                    .end()
                )
                candidate_vote = optimizer.fold_case(
                    ibis.case()
                    .when(total_votes[clslabel] > candidate_vote, total_votes[clslabel])
                    .else_(candidate_vote)
                    .end()
                )

            # Map the winning candidate back to a class label column.
            label_expr = ibis.case()
            for clslabel in classlabels:
                label_expr = label_expr.when(candidate_cls == clslabel, clslabel)
            label_expr = label_expr.else_(ibis.null()).end()
            label_expr = optimizer.fold_case(label_expr)

            post_transform = typing.cast(
                str, self._attributes.get("post_transform", "NONE")
            )
            if post_transform == "SOFTMAX":
                # Use softmax as a hint that we are doing a gradient boosted tree,
                # thus the probability is the same as the score and should not be normalized
                prob_dict = ValueVariablesGroup(
                    {str(clslabel): total_votes[clslabel] for clslabel in classlabels}
                )
            else:
                # Compute probability to return it too.
                sum_votes = sum(total_votes[clslabel] for clslabel in classlabels)
                prob_dict = ValueVariablesGroup(
                    {
                        str(clslabel): total_votes[clslabel] / sum_votes
                        for clslabel in classlabels
                    }
                )

        return label_expr, prob_dict
process
process() -> None

Performs the translation and sets the output variable.

Source code in orbitalml/translation/steps/trees/classifier.py
def process(self) -> None:
    """Translate the node, storing the label and probability outputs."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleClassifier.html
    # Deprecated in ONNX, but still what skl2onnx emits.
    operand = self._variables.consume(self.inputs[0])
    if not isinstance(operand, (ibis.Expr, VariablesGroup)):
        raise ValueError(
            "TreeEnsembleClassifier: The first operand must be a column or a column group."
        )

    labels, probabilities = self.build_classifier(operand)
    transform = typing.cast(
        str, self._attributes.get("post_transform", "NONE")
    )

    if transform == "SOFTMAX":
        probabilities = SoftmaxTranslator.compute_softmax(self, probabilities)
    elif transform == "LOGISTIC":
        # Apply the logistic sigmoid to every per-class score column.
        probabilities = ValueVariablesGroup(
            {
                name: LinearClassifierTranslator._apply_post_transform(
                    column, transform
                )
                for name, column in probabilities.items()
            }
        )
    elif transform != "NONE":
        raise NotImplementedError(
            f"Post transform {transform} not implemented."
        )

    # Output 0 is the predicted label, output 1 the probabilities group.
    self._variables[self.outputs[0]] = labels
    self._variables[self.outputs[1]] = probabilities
build_classifier
build_classifier(
    input_expr: Expr | VariablesGroup,
) -> tuple[Expr, VariablesGroup]

Build the classification expression and the probabilities expressions

Return the classification expression as the first argument and a group of variables (one for each category) for the probability expressions.

Source code in orbitalml/translation/steps/trees/classifier.py
def build_classifier(
    self, input_expr: ibis.Expr | VariablesGroup
) -> tuple[ibis.Expr, VariablesGroup]:
    """Build the classification expression and the probabilities expressions

    Return the classification expression as the first argument and a group of
    variables (one for each category) for the probability expressions.
    """
    optimizer = self._optimizer
    ensemble_trees = build_tree(self)

    classlabels = self._attributes.get(
        "classlabels_strings"
    ) or self._attributes.get("classlabels_int64s")
    if classlabels is None:
        raise ValueError("Unable to detect classlabels for classification")
    output_classlabels = classlabels = typing.cast(
        list[str] | list[int], classlabels
    )

    # ONNX treats binary classification as a special case:
    # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_common.h#L854C1-L871C4
    # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_aggregator.h#L469-L494
    # In this case there is only one weight and it's the probability of the positive class.
    # So we need to check if we are in a binary classification case.
    weights_classid = typing.cast(list[int], self._attributes["class_ids"])
    is_binary = len(classlabels) == 2 and len(set(weights_classid)) == 1
    if is_binary:
        # In this case there is only one label, the first one
        # which actually acts as the score of the prediction.
        # When > 0.5 then class 1, when < 0.5 then class 0
        classlabels = typing.cast(list[str] | list[int], [classlabels[0]])

    # Flatten the input into an ordered list of feature columns and give
    # each a short unique alias so generated expressions stay readable.
    if isinstance(input_expr, VariablesGroup):
        ordered_features = input_expr.values_value()
    else:
        ordered_features = typing.cast(list[ibis.Value], [input_expr])
    ordered_features = [
        feature.name(self.variable_unique_short_alias("tcl"))
        for feature in ordered_features
    ]
    ordered_features = self.preserve(*ordered_features)

    def build_tree_case(node: dict) -> dict[str | int, ibis.Expr]:
        # Leaf node, return the votes
        if node["mode"] == "LEAF":
            # We can assume missing class = weight 0
            # The optimizer will remove this if both true and false have 0.
            return {
                clslabel: ibis.literal(node["weight"].get(clslabel, 0.0))
                for clslabel in classlabels
            }

        # Branch node, build a CASE statement
        feature_expr = ordered_features[node["feature_id"]]
        condition = mode_to_condition(node, feature_expr)

        true_votes = build_tree_case(node["true"])
        false_votes = build_tree_case(node["false"])

        votes = {}
        for clslabel in classlabels:
            t_val = true_votes[clslabel]
            f_val = false_votes[clslabel]
            votes[clslabel] = optimizer.fold_case(
                ibis.case().when(condition, t_val).else_(f_val).end()
            )
        return votes

    # Generate the votes for each tree.
    tree_votes = []
    for tree in ensemble_trees.values():
        tree_votes.append(build_tree_case(tree))

    # Aggregate votes from all trees.
    total_votes = {}
    for clslabel in classlabels:
        total_votes[clslabel] = ibis.literal(0.0)
        for votes in tree_votes:
            total_votes[clslabel] = optimizer.fold_operation(
                total_votes[clslabel] + votes.get(clslabel, ibis.literal(0.0))
            )

    # Compute prediction of class itself.
    if is_binary:
        total_score = total_votes[classlabels[0]]
        label_expr = optimizer.fold_case(
            ibis.case()
            .when(total_score > 0.5, output_classlabels[1])
            .else_(output_classlabels[0])
            .end()
        )
        # The order matters, for ONNX the VariableGroup is a list of subvariables
        # the names are not important.
        prob_dict = ValueVariablesGroup(
            {
                str(output_classlabels[0]): 1.0 - total_score,
                str(output_classlabels[1]): total_score,
            }
        )
    else:
        # Running argmax over the per-class vote totals: track both the
        # currently winning class and its vote count.
        candidate_cls = classlabels[0]
        candidate_vote = total_votes[candidate_cls]
        for clslabel in classlabels[1:]:
            candidate_cls = optimizer.fold_case(
                ibis.case()
                .when(total_votes[clslabel] > candidate_vote, clslabel)
                .else_(candidate_cls)
                .end()
            )
            candidate_vote = optimizer.fold_case(
                ibis.case()
                .when(total_votes[clslabel] > candidate_vote, total_votes[clslabel])
                .else_(candidate_vote)
                .end()
            )

        # Map the winning candidate back to a class label column.
        label_expr = ibis.case()
        for clslabel in classlabels:
            label_expr = label_expr.when(candidate_cls == clslabel, clslabel)
        label_expr = label_expr.else_(ibis.null()).end()
        label_expr = optimizer.fold_case(label_expr)

        post_transform = typing.cast(
            str, self._attributes.get("post_transform", "NONE")
        )
        if post_transform == "SOFTMAX":
            # Use softmax as a hint that we are doing a gradient boosted tree,
            # thus the probability is the same as the score and should not be normalized
            prob_dict = ValueVariablesGroup(
                {str(clslabel): total_votes[clslabel] for clslabel in classlabels}
            )
        else:
            # Compute probability to return it too.
            sum_votes = sum(total_votes[clslabel] for clslabel in classlabels)
            prob_dict = ValueVariablesGroup(
                {
                    str(clslabel): total_votes[clslabel] / sum_votes
                    for clslabel in classlabels
                }
            )

    return label_expr, prob_dict

TreeEnsembleRegressorTranslator

Bases: Translator

Processes a TreeEnsembleClassifier node and updates the variables with the output expression.

This node is foundational for most tree based models: - Gradient Boosted Trees - Decision Trees

The parsing of the tree is done by the :func:build_tree function, which results in a dictionary of trees.

The class parses the trees to generate a set of CASE WHEN THEN ELSE expressions that are used to compute the prediction for each tree.

Source code in orbitalml/translation/steps/trees/regressor.py
class TreeEnsembleRegressorTranslator(Translator):
    """Translate a TreeEnsembleRegressor node into the output expression.

    Tree based regression models (gradient boosted trees, decision trees)
    are serialized by skl2onnx through this node.

    :func:`build_tree` parses the ONNX attributes into a dictionary of
    trees; each tree is then lowered to nested `CASE WHEN THEN ELSE`
    expressions whose leaves contribute the tree's prediction weight.
    The final prediction is the sum over all trees plus the base value.
    """

    def process(self) -> None:
        """Performs the translation and sets the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleRegressor.html
        # This is deprecated in ONNX but it's what skl2onnx uses.
        operand = self._variables.consume(self.inputs[0])
        if not isinstance(operand, (ibis.Expr, VariablesGroup)):
            raise ValueError(
                "TreeEnsembleRegressor: The first operand must be a column or a column group."
            )

        self.set_output(self.build_regressor(operand))

    def build_regressor(self, input_expr: VariablesGroup | ibis.Expr) -> ibis.Expr:
        """Build the regression expression"""
        optimizer = self._optimizer
        trees = build_tree(self)

        # Flatten the input into an ordered list of feature columns,
        # each renamed with a short unique alias.
        if isinstance(input_expr, VariablesGroup):
            features = input_expr.values_value()
        else:
            features = typing.cast(list[ibis.Value], [input_expr])
        features = [
            column.name(self.variable_unique_short_alias("tcl"))
            for column in features
        ]
        features = self.preserve(*features)

        def lower_node(node: dict) -> ibis.Expr:
            # A leaf contributes its prediction weight directly.
            if node["mode"] == "LEAF":
                return ibis.literal(node["weight"])

            # A branch becomes CASE WHEN <cond> THEN <true> ELSE <false>.
            column = features[node["feature_id"]]
            condition = mode_to_condition(node, column)

            if node["missing_tracks_true"]:
                raise NotImplementedError("Missing value tracks true not supported")

            true_branch = lower_node(node["true"])
            false_branch = lower_node(node["false"])
            return optimizer.fold_case(
                ibis.case().when(condition, true_branch).else_(false_branch).end()
            )

        # Sum the contribution of every tree in the ensemble.
        total: ibis.NumericValue = ibis.literal(0.0)
        for tree in trees.values():
            total = optimizer.fold_operation(total + lower_node(tree))

        # According to the ONNX doc base_values can be left unassigned
        # (assumed 0); only a single base value is supported here.
        offsets = typing.cast(
            list[float], self._attributes.get("base_values", [0.0])
        )
        if len(offsets) != 1:
            raise NotImplementedError("Base values with length != 1 not supported")
        return optimizer.fold_operation(total + ibis.literal(offsets[0]))
process
process() -> None

Performs the translation and sets the output variable.

Source code in orbitalml/translation/steps/trees/regressor.py
def process(self) -> None:
    """Translate the regressor node and store the prediction output."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleRegressor.html
    # Deprecated in ONNX, but still what skl2onnx emits.
    operand = self._variables.consume(self.inputs[0])
    if isinstance(operand, (ibis.Expr, VariablesGroup)):
        self.set_output(self.build_regressor(operand))
    else:
        raise ValueError(
            "TreeEnsembleRegressor: The first operand must be a column or a column group."
        )
build_regressor
build_regressor(input_expr: VariablesGroup | Expr) -> Expr

Build the regression expression

Source code in orbitalml/translation/steps/trees/regressor.py
def build_regressor(self, input_expr: VariablesGroup | ibis.Expr) -> ibis.Expr:
    """Build the regression expression"""
    optimizer = self._optimizer
    trees = build_tree(self)

    # Flatten the input into an ordered list of feature columns,
    # each renamed with a short unique alias.
    if isinstance(input_expr, VariablesGroup):
        features = input_expr.values_value()
    else:
        features = typing.cast(list[ibis.Value], [input_expr])
    features = [
        column.name(self.variable_unique_short_alias("tcl"))
        for column in features
    ]
    features = self.preserve(*features)

    def lower_node(node: dict) -> ibis.Expr:
        # A leaf contributes its prediction weight directly.
        if node["mode"] == "LEAF":
            return ibis.literal(node["weight"])

        # A branch becomes CASE WHEN <cond> THEN <true> ELSE <false>.
        column = features[node["feature_id"]]
        condition = mode_to_condition(node, column)

        if node["missing_tracks_true"]:
            raise NotImplementedError("Missing value tracks true not supported")

        true_branch = lower_node(node["true"])
        false_branch = lower_node(node["false"])
        return optimizer.fold_case(
            ibis.case().when(condition, true_branch).else_(false_branch).end()
        )

    # Sum the contribution of every tree in the ensemble.
    total: ibis.NumericValue = ibis.literal(0.0)
    for tree in trees.values():
        total = optimizer.fold_operation(total + lower_node(tree))

    # According to the ONNX doc base_values can be left unassigned
    # (assumed 0); only a single base value is supported here.
    offsets = typing.cast(
        list[float], self._attributes.get("base_values", [0.0])
    )
    if len(offsets) != 1:
        raise NotImplementedError("Base values with length != 1 not supported")
    return optimizer.fold_operation(total + ibis.literal(offsets[0]))

classifier

Implement classification based on trees

TreeEnsembleClassifierTranslator

Bases: Translator

Processes a TreeEnsembleClassifier node and updates the variables with the output expression.

This node is foundational for most tree based models: - Random Forest - Gradient Boosted Trees - Decision Trees

The parsing of the tree is done by the :func:build_tree function, which results in a dictionary of trees.

The class parses the trees to generate a set of CASE WHEN THEN ELSE expressions that are used to compute the votes for each class.

The class also computes the probability of each class by dividing the votes by the sum of all votes.

Source code in orbitalml/translation/steps/trees/classifier.py
class TreeEnsembleClassifierTranslator(Translator):
    """Processes a TreeEnsembleClassifier node and updates the variables with the output expression.

    This node is foundational for most tree based models:
    - Random Forest
    - Gradient Boosted Trees
    - Decision Trees

    The parsing of the tree is done by the :func:`build_tree` function,
    which results in a dictionary of trees.

    The class parses the trees to generate a set of `CASE WHEN THEN ELSE`
    expressions that are used to compute the votes for each class.

    The class also computes the probability of each class by dividing
    the votes by the sum of all votes.
    """

    def process(self) -> None:
        """Performs the translation and sets the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleClassifier.html
        # This is deprecated in ONNX but it's what skl2onnx uses.

        input_exr = self._variables.consume(self.inputs[0])
        if not isinstance(input_exr, (ibis.Expr, VariablesGroup)):
            raise ValueError(
                "TreeEnsembleClassifier: The first operand must be a column or a column group."
            )

        label_expr, prob_colgroup = self.build_classifier(input_exr)
        post_transform = typing.cast(
            str, self._attributes.get("post_transform", "NONE")
        )

        if post_transform != "NONE":
            if post_transform == "SOFTMAX":
                prob_colgroup = SoftmaxTranslator.compute_softmax(self, prob_colgroup)
            elif post_transform == "LOGISTIC":
                # Apply the logistic sigmoid to each per-class score column.
                prob_colgroup = ValueVariablesGroup(
                    {
                        lbl: LinearClassifierTranslator._apply_post_transform(
                            prob_col, post_transform
                        )
                        for lbl, prob_col in prob_colgroup.items()
                    }
                )
            else:
                raise NotImplementedError(
                    f"Post transform {post_transform} not implemented."
                )

        # Output 0 is the predicted label, output 1 the per-class probabilities.
        self._variables[self.outputs[0]] = label_expr
        self._variables[self.outputs[1]] = prob_colgroup

    def build_classifier(
        self, input_expr: ibis.Expr | VariablesGroup
    ) -> tuple[ibis.Expr, VariablesGroup]:
        """Build the classification expression and the probabilities expressions

        Return the classification expression as the first argument and a group of
        variables (one for each category) for the probability expressions.
        """
        optimizer = self._optimizer
        ensemble_trees = build_tree(self)

        classlabels = self._attributes.get(
            "classlabels_strings"
        ) or self._attributes.get("classlabels_int64s")
        if classlabels is None:
            raise ValueError("Unable to detect classlabels for classification")
        output_classlabels = classlabels = typing.cast(
            list[str] | list[int], classlabels
        )

        # ONNX treats binary classification as a special case:
        # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_common.h#L854C1-L871C4
        # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_aggregator.h#L469-L494
        # In this case there is only one weight and it's the probability of the positive class.
        # So we need to check if we are in a binary classification case.
        weights_classid = typing.cast(list[int], self._attributes["class_ids"])
        is_binary = len(classlabels) == 2 and len(set(weights_classid)) == 1
        if is_binary:
            # In this case there is only one label, the first one
            # which actually acts as the score of the prediction.
            # When > 0.5 then class 1, when < 0.5 then class 0
            classlabels = typing.cast(list[str] | list[int], [classlabels[0]])

        # Flatten the input into an ordered list of feature columns and give
        # each a short unique alias so generated expressions stay readable.
        if isinstance(input_expr, VariablesGroup):
            ordered_features = input_expr.values_value()
        else:
            ordered_features = typing.cast(list[ibis.Value], [input_expr])
        ordered_features = [
            feature.name(self.variable_unique_short_alias("tcl"))
            for feature in ordered_features
        ]
        ordered_features = self.preserve(*ordered_features)

        def build_tree_case(node: dict) -> dict[str | int, ibis.Expr]:
            # Leaf node, return the votes
            if node["mode"] == "LEAF":
                # We can assume missing class = weight 0
                # The optimizer will remove this if both true and false have 0.
                return {
                    clslabel: ibis.literal(node["weight"].get(clslabel, 0.0))
                    for clslabel in classlabels
                }

            # Branch node, build a CASE statement
            feature_expr = ordered_features[node["feature_id"]]
            condition = mode_to_condition(node, feature_expr)

            true_votes = build_tree_case(node["true"])
            false_votes = build_tree_case(node["false"])

            votes = {}
            for clslabel in classlabels:
                t_val = true_votes[clslabel]
                f_val = false_votes[clslabel]
                votes[clslabel] = optimizer.fold_case(
                    ibis.case().when(condition, t_val).else_(f_val).end()
                )
            return votes

        # Generate the votes for each tree.
        tree_votes = []
        for tree in ensemble_trees.values():
            tree_votes.append(build_tree_case(tree))

        # Aggregate votes from all trees.
        total_votes = {}
        for clslabel in classlabels:
            total_votes[clslabel] = ibis.literal(0.0)
            for votes in tree_votes:
                total_votes[clslabel] = optimizer.fold_operation(
                    total_votes[clslabel] + votes.get(clslabel, ibis.literal(0.0))
                )

        # Compute prediction of class itself.
        if is_binary:
            total_score = total_votes[classlabels[0]]
            label_expr = optimizer.fold_case(
                ibis.case()
                .when(total_score > 0.5, output_classlabels[1])
                .else_(output_classlabels[0])
                .end()
            )
            # The order matters, for ONNX the VariableGroup is a list of subvariables
            # the names are not important.
            prob_dict = ValueVariablesGroup(
                {
                    str(output_classlabels[0]): 1.0 - total_score,
                    str(output_classlabels[1]): total_score,
                }
            )
        else:
            # Running argmax over the per-class vote totals: track both the
            # currently winning class and its vote count.
            candidate_cls = classlabels[0]
            candidate_vote = total_votes[candidate_cls]
            for clslabel in classlabels[1:]:
                candidate_cls = optimizer.fold_case(
                    ibis.case()
                    .when(total_votes[clslabel] > candidate_vote, clslabel)
                    .else_(candidate_cls)
                    .end()
                )
                candidate_vote = optimizer.fold_case(
                    ibis.case()
                    .when(total_votes[clslabel] > candidate_vote, total_votes[clslabel])
                    .else_(candidate_vote)
                    .end()
                )

            # Map the winning candidate back to a class label column.
            label_expr = ibis.case()
            for clslabel in classlabels:
                label_expr = label_expr.when(candidate_cls == clslabel, clslabel)
            label_expr = label_expr.else_(ibis.null()).end()
            label_expr = optimizer.fold_case(label_expr)

            post_transform = typing.cast(
                str, self._attributes.get("post_transform", "NONE")
            )
            if post_transform == "SOFTMAX":
                # Use softmax as a hint that we are doing a gradient boosted tree,
                # thus the probability is the same as the score and should not be normalized
                prob_dict = ValueVariablesGroup(
                    {str(clslabel): total_votes[clslabel] for clslabel in classlabels}
                )
            else:
                # Compute probability to return it too.
                sum_votes = sum(total_votes[clslabel] for clslabel in classlabels)
                prob_dict = ValueVariablesGroup(
                    {
                        str(clslabel): total_votes[clslabel] / sum_votes
                        for clslabel in classlabels
                    }
                )

        return label_expr, prob_dict
process
process() -> None

Performs the translation and sets the output variable.

Source code in orbitalml/translation/steps/trees/classifier.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleClassifier.html
    # This is deprecated in ONNX but it's what skl2onnx uses.
    operand = self._variables.consume(self.inputs[0])
    if not isinstance(operand, (ibis.Expr, VariablesGroup)):
        raise ValueError(
            "TreeEnsembleClassifier: The first operand must be a column or a column group."
        )

    label_expr, prob_colgroup = self.build_classifier(operand)
    transform = typing.cast(str, self._attributes.get("post_transform", "NONE"))

    # Apply the requested post-transformation to the raw class scores.
    if transform == "SOFTMAX":
        prob_colgroup = SoftmaxTranslator.compute_softmax(self, prob_colgroup)
    elif transform == "LOGISTIC":
        prob_colgroup = ValueVariablesGroup(
            {
                lbl: LinearClassifierTranslator._apply_post_transform(col, transform)
                for lbl, col in prob_colgroup.items()
            }
        )
    elif transform != "NONE":
        raise NotImplementedError(f"Post transform {transform} not implemented.")

    self._variables[self.outputs[0]] = label_expr
    self._variables[self.outputs[1]] = prob_colgroup
build_classifier
build_classifier(
    input_expr: Expr | VariablesGroup,
) -> tuple[Expr, VariablesGroup]

Build the classification expression and the probabilities expressions

Return the classification expression as the first argument and a group of variables (one for each category) for the probability expressions.

Source code in orbitalml/translation/steps/trees/classifier.py
def build_classifier(
    self, input_expr: ibis.Expr | VariablesGroup
) -> tuple[ibis.Expr, VariablesGroup]:
    """Build the classification expression and the probabilities expressions

    Return the classification expression as the first argument and a group of
    variables (one for each category) for the probability expressions.
    """
    optimizer = self._optimizer
    ensemble_trees = build_tree(self)

    # Class labels may be exported as strings or as int64s; exactly one of
    # the two attributes is expected to be set.
    classlabels = self._attributes.get(
        "classlabels_strings"
    ) or self._attributes.get("classlabels_int64s")
    if classlabels is None:
        raise ValueError("Unable to detect classlabels for classification")
    # Keep the full label list in output_classlabels: classlabels itself may
    # be shrunk to a single entry below for the binary special case, while
    # output_classlabels always holds the labels visible to the caller.
    output_classlabels = classlabels = typing.cast(
        list[str] | list[int], classlabels
    )

    # ONNX treats binary classification as a special case:
    # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_common.h#L854C1-L871C4
    # https://github.com/microsoft/onnxruntime/blob/5982430af66f52a288cb8b2181e0b5b2e09118c8/onnxruntime/core/providers/cpu/ml/tree_ensemble_aggregator.h#L469-L494
    # In this case there is only one weight and it's the probability of the positive class.
    # So we need to check if we are in a binary classification case.
    weights_classid = typing.cast(list[int], self._attributes["class_ids"])
    is_binary = len(classlabels) == 2 and len(set(weights_classid)) == 1
    if is_binary:
        # In this case there is only one label, the first one
        # which actually acts as the score of the prediction.
        # When > 0.5 then class 1, when < 0.5 then class 0
        classlabels = typing.cast(list[str] | list[int], [classlabels[0]])

    # Normalize the input to an ordered list of feature columns.
    if isinstance(input_expr, VariablesGroup):
        ordered_features = input_expr.values_value()
    else:
        ordered_features = typing.cast(list[ibis.Value], [input_expr])
    # Give each feature a short unique alias and preserve it, so the
    # (potentially deep) CASE expressions below reference stable columns.
    ordered_features = [
        feature.name(self.variable_unique_short_alias("tcl"))
        for feature in ordered_features
    ]
    ordered_features = self.preserve(*ordered_features)

    def build_tree_case(node: dict) -> dict[str | int, ibis.Expr]:
        # Recursively turn one tree into a per-class dict of vote expressions.
        # Leaf node, return the votes
        if node["mode"] == "LEAF":
            # We can assume missing class = weight 0
            # The optimizer will remove this if both true and false have 0.
            return {
                clslabel: ibis.literal(node["weight"].get(clslabel, 0.0))
                for clslabel in classlabels
            }

        # Branch node, build a CASE statement
        # NOTE(review): unlike the regressor path, node["missing_tracks_true"]
        # is not checked here, so missing-value semantics appear to be
        # silently ignored for classification — confirm this is intentional.
        feature_expr = ordered_features[node["feature_id"]]
        condition = mode_to_condition(node, feature_expr)

        true_votes = build_tree_case(node["true"])
        false_votes = build_tree_case(node["false"])

        votes = {}
        for clslabel in classlabels:
            t_val = true_votes[clslabel]
            f_val = false_votes[clslabel]
            votes[clslabel] = optimizer.fold_case(
                ibis.case().when(condition, t_val).else_(f_val).end()
            )
        return votes

    # Generate the votes for each tree.
    tree_votes = []
    for tree in ensemble_trees.values():
        tree_votes.append(build_tree_case(tree))

    # Aggregate votes from all trees.
    total_votes = {}
    for clslabel in classlabels:
        total_votes[clslabel] = ibis.literal(0.0)
        for votes in tree_votes:
            total_votes[clslabel] = optimizer.fold_operation(
                total_votes[clslabel] + votes.get(clslabel, ibis.literal(0.0))
            )

    # Compute prediction of class itself.
    if is_binary:
        # Single score: above 0.5 means the positive (second) class wins.
        total_score = total_votes[classlabels[0]]
        label_expr = optimizer.fold_case(
            ibis.case()
            .when(total_score > 0.5, output_classlabels[1])
            .else_(output_classlabels[0])
            .end()
        )
        # The order matters, for ONNX the VariableGroup is a list of subvariables
        # the names are not important.
        prob_dict = ValueVariablesGroup(
            {
                str(output_classlabels[0]): 1.0 - total_score,
                str(output_classlabels[1]): total_score,
            }
        )
    else:
        # Multiclass: running argmax over the per-class vote totals.
        # Both CASE expressions in each iteration read the *previous*
        # candidate_vote (the name is rebound only after the second one).
        candidate_cls = classlabels[0]
        candidate_vote = total_votes[candidate_cls]
        for clslabel in classlabels[1:]:
            candidate_cls = optimizer.fold_case(
                ibis.case()
                .when(total_votes[clslabel] > candidate_vote, clslabel)
                .else_(candidate_cls)
                .end()
            )
            candidate_vote = optimizer.fold_case(
                ibis.case()
                .when(total_votes[clslabel] > candidate_vote, total_votes[clslabel])
                .else_(candidate_vote)
                .end()
            )

        # Map the winning-class expression back onto a concrete label value.
        label_expr = ibis.case()
        for clslabel in classlabels:
            label_expr = label_expr.when(candidate_cls == clslabel, clslabel)
        label_expr = label_expr.else_(ibis.null()).end()
        label_expr = optimizer.fold_case(label_expr)

        post_transform = typing.cast(
            str, self._attributes.get("post_transform", "NONE")
        )
        if post_transform == "SOFTMAX":
            # Use softmax as an hint that we are doing a gradient boosted tree,
            # thus the probability is the same as the score and should not be normalized
            prob_dict = ValueVariablesGroup(
                {str(clslabel): total_votes[clslabel] for clslabel in classlabels}
            )
        else:
            # Compute probability to return it too.
            sum_votes = sum(total_votes[clslabel] for clslabel in classlabels)
            prob_dict = ValueVariablesGroup(
                {
                    str(clslabel): total_votes[clslabel] / sum_votes
                    for clslabel in classlabels
                }
            )

    return label_expr, prob_dict

regressor

Implement regression based on trees

TreeEnsembleRegressorTranslator

Bases: Translator

Processes a TreeEnsembleRegressor node and updates the variables with the output expression.

This node is foundational for most tree based models: - Gradient Boosted Trees - Decision Trees

The parsing of the tree is done by the :func:build_tree function, which results in a dictionary of trees.

The class parses the trees to generate a set of CASE WHEN THEN ELSE expressions that are used to compute the prediction for each tree.

Source code in orbitalml/translation/steps/trees/regressor.py
class TreeEnsembleRegressorTranslator(Translator):
    """Processes a TreeEnsembleRegressor node and updates the variables with the output expression.

    This node is foundational for most tree based models:
    - Gradient Boosted Trees
    - Decision Trees

    The ensemble is parsed by the :func:`build_tree` function into nested
    dictionaries of nodes; this class then converts every tree into a
    `CASE WHEN THEN ELSE` expression and sums the per-tree contributions
    together with the model base value.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleRegressor.html
        # This is deprecated in ONNX but it's what skl2onnx uses.
        operand = self._variables.consume(self.inputs[0])
        if not isinstance(operand, (ibis.Expr, VariablesGroup)):
            raise ValueError(
                "TreeEnsembleRegressor: The first operand must be a column or a column group."
            )
        self.set_output(self.build_regressor(operand))

    def build_regressor(self, input_expr: VariablesGroup | ibis.Expr) -> ibis.Expr:
        """Build the regression expression"""
        optimizer = self._optimizer
        trees = build_tree(self)

        # Normalize the input to an ordered list of feature columns, each
        # aliased with a short unique name and preserved so that the deep
        # CASE expressions below reference stable columns.
        if isinstance(input_expr, VariablesGroup):
            features = input_expr.values_value()
        else:
            features = typing.cast(list[ibis.Value], [input_expr])
        features = self.preserve(
            *[feat.name(self.variable_unique_short_alias("tcl")) for feat in features]
        )

        def tree_to_expr(node: dict) -> ibis.Expr:
            # A leaf contributes its weight directly.
            if node["mode"] == "LEAF":
                return ibis.literal(node["weight"])

            # A branch becomes CASE WHEN cond THEN <true> ELSE <false> END.
            condition = mode_to_condition(node, features[node["feature_id"]])
            if node["missing_tracks_true"]:
                raise NotImplementedError("Missing value tracks true not supported")
            return optimizer.fold_case(
                ibis.case()
                .when(condition, tree_to_expr(node["true"]))
                .else_(tree_to_expr(node["false"]))
                .end()
            )

        # Build the expression of each tree, then sum the contributions.
        per_tree = [tree_to_expr(root) for root in trees.values()]
        prediction: ibis.NumericValue = ibis.literal(0.0)
        for tree_expr in per_tree:
            prediction = optimizer.fold_operation(prediction + tree_expr)

        # According to ONNX doc the base value can be left unassigned (assumed 0).
        base_values = typing.cast(
            list[float], self._attributes.get("base_values", [0.0])
        )
        if len(base_values) != 1:
            raise NotImplementedError("Base values with length != 1 not supported")
        return optimizer.fold_operation(prediction + ibis.literal(base_values[0]))
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/trees/regressor.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_TreeEnsembleRegressor.html
    # This is deprecated in ONNX but it's what skl2onnx uses.
    operand = self._variables.consume(self.inputs[0])
    if not isinstance(operand, (ibis.Expr, VariablesGroup)):
        raise ValueError(
            "TreeEnsembleRegressor: The first operand must be a column or a column group."
        )
    self.set_output(self.build_regressor(operand))
build_regressor
build_regressor(input_expr: VariablesGroup | Expr) -> Expr

Build the regression expression

Source code in orbitalml/translation/steps/trees/regressor.py
def build_regressor(self, input_expr: VariablesGroup | ibis.Expr) -> ibis.Expr:
    """Build the regression expression"""
    optimizer = self._optimizer
    trees = build_tree(self)

    # Normalize the input to an ordered list of feature columns, each
    # aliased with a short unique name and preserved so that the deep
    # CASE expressions below reference stable columns.
    if isinstance(input_expr, VariablesGroup):
        features = input_expr.values_value()
    else:
        features = typing.cast(list[ibis.Value], [input_expr])
    features = self.preserve(
        *[feat.name(self.variable_unique_short_alias("tcl")) for feat in features]
    )

    def tree_to_expr(node: dict) -> ibis.Expr:
        # A leaf contributes its weight directly.
        if node["mode"] == "LEAF":
            return ibis.literal(node["weight"])

        # A branch becomes CASE WHEN cond THEN <true> ELSE <false> END.
        condition = mode_to_condition(node, features[node["feature_id"]])
        if node["missing_tracks_true"]:
            raise NotImplementedError("Missing value tracks true not supported")
        return optimizer.fold_case(
            ibis.case()
            .when(condition, tree_to_expr(node["true"]))
            .else_(tree_to_expr(node["false"]))
            .end()
        )

    # Build the expression of each tree, then sum the contributions.
    per_tree = [tree_to_expr(root) for root in trees.values()]
    prediction: ibis.NumericValue = ibis.literal(0.0)
    for tree_expr in per_tree:
        prediction = optimizer.fold_operation(prediction + tree_expr)

    # According to ONNX doc the base value can be left unassigned (assumed 0).
    base_values = typing.cast(
        list[float], self._attributes.get("base_values", [0.0])
    )
    if len(base_values) != 1:
        raise NotImplementedError("Base values with length != 1 not supported")
    return optimizer.fold_operation(prediction + ibis.literal(base_values[0]))

tree

Parse tree definitions and return a graph of nodes.

build_tree
build_tree(
    translator: Translator,
) -> dict[int, dict[int, dict]]

Build a tree based on nested dictionaries of nodes.

The tree is built based on the node and attributes of the translator.

Source code in orbitalml/translation/steps/trees/tree.py
def build_tree(translator: "Translator") -> dict[int, dict[int, dict]]:
    """Build a tree based on nested dictionaries of nodes.

    The tree is built based on the node and attributes of the translator.

    :param translator: translator of a TreeEnsembleClassifier or
        TreeEnsembleRegressor node, whose attributes describe the ensemble.
    :return: a mapping of tree_id to the root node dictionary of that tree.
        Branch nodes link to their children through the ``"true"`` and
        ``"false"`` keys; leaf nodes carry a ``"weight"`` entry.
    """
    nodes_treeids = typing.cast(list[int], translator._attributes["nodes_treeids"])
    nodes_nodeids = typing.cast(list[int], translator._attributes["nodes_nodeids"])
    nodes_modes = typing.cast(list[str], translator._attributes["nodes_modes"])
    nodes_truenodeids = typing.cast(
        list[int], translator._attributes["nodes_truenodeids"]
    )
    nodes_falsenodeids = typing.cast(
        list[int], translator._attributes["nodes_falsenodeids"]
    )
    nodes_thresholds = typing.cast(list[float], translator._attributes["nodes_values"])
    nodes_featureids = typing.cast(
        list[int], translator._attributes["nodes_featureids"]
    )
    nodes_missing_value_tracks_true = typing.cast(
        list[int], translator._attributes["nodes_missing_value_tracks_true"]
    )
    node = translator._node

    # Assert a few things to ensure we don't end up generating a tree with
    # wrong data. All entries related to branches should match in length.
    assert (
        len(nodes_treeids)
        == len(nodes_nodeids)
        == len(nodes_modes)
        == len(nodes_truenodeids)
        == len(nodes_falsenodeids)
        == len(nodes_thresholds)
        == len(nodes_featureids)
    )

    # Weight could be a float or a dictionary of class labels weights
    weights: dict = {}
    if node.op_type == "TreeEnsembleClassifier":
        weights = typing.cast(dict[tuple[int, int], dict[str | int, float]], weights)
        # Weights for classifier, in this case the weights are per-class
        class_nodeids = typing.cast(list[int], translator._attributes["class_nodeids"])
        class_treeids = typing.cast(list[int], translator._attributes["class_treeids"])
        class_weights = typing.cast(
            list[float], translator._attributes["class_weights"]
        )
        weights_classid = typing.cast(list[int], translator._attributes["class_ids"])
        assert (
            len(class_treeids)
            == len(class_nodeids)
            == len(class_weights)
            == len(weights_classid)
        )
        classlabels = typing.cast(
            None | list[str | int],
            translator._attributes.get("classlabels_strings")
            or translator._attributes.get("classlabels_int64s"),
        )
        if not classlabels:
            raise ValueError("Missing class labels when building tree")

        for tree_id, node_id, weight, weight_classid in zip(
            class_treeids, class_nodeids, class_weights, weights_classid
        ):
            node_weights = typing.cast(
                dict[str | int, float], weights.setdefault((tree_id, node_id), {})
            )
            node_weights[classlabels[weight_classid]] = weight

    elif node.op_type == "TreeEnsembleRegressor":
        # Weights for the regressor, in this case leaf nodes have only 1 weight
        weights = typing.cast(dict[tuple[int, int], float], weights)
        target_weights = typing.cast(
            list[float], translator._attributes["target_weights"]
        )
        target_nodeids = typing.cast(
            list[int], translator._attributes["target_nodeids"]
        )
        target_treeids = typing.cast(
            list[int], translator._attributes["target_treeids"]
        )
        assert len(target_treeids) == len(target_nodeids) == len(target_weights)
        for tree_id, node_id, weight in zip(
            target_treeids, target_nodeids, target_weights
        ):
            weights[(tree_id, node_id)] = weight
    else:
        raise NotImplementedError(f"Unsupported tree node type: {node.op_type}")

    # Create all nodes for the trees
    trees: dict[int, dict[int, dict]] = {}
    for position, (tree_id, node_id, mode, threshold, feature_id) in enumerate(
        zip(
            nodes_treeids,
            nodes_nodeids,
            nodes_modes,
            nodes_thresholds,
            nodes_featureids,
        )
    ):
        if tree_id not in trees:
            trees[tree_id] = {}

        node_dict = {
            "id": (tree_id, node_id),
            "mode": mode,
            "feature_id": feature_id,
            # BUGFIX: nodes_missing_value_tracks_true is parallel to the other
            # nodes_* attribute lists, so it must be indexed by the position in
            # those lists. The previous node_id-based lookup read the wrong
            # entry for any ensemble with more than one tree, because node ids
            # restart from 0 in every tree.
            "missing_tracks_true": bool(
                nodes_missing_value_tracks_true[position]
                if nodes_missing_value_tracks_true
                else 0
            ),
        }
        if mode == "LEAF":
            node_dict["weight"] = weights[(tree_id, node_id)]
        else:
            # NOTE: the misspelled "treshold" key is intentional, it is the
            # key that mode_to_condition() reads.
            node_dict["treshold"] = threshold

        trees[tree_id][node_id] = node_dict

    # Link nodes creating a tree structure
    for tree_id, node_id, true_id, false_id in zip(
        nodes_treeids,
        nodes_nodeids,
        nodes_truenodeids,
        nodes_falsenodeids,
    ):
        if node_id in trees[tree_id]:
            node_dict = trees[tree_id][node_id]
            if node_dict["mode"] == "LEAF":
                # Leaf nodes have no true or false branches
                # In the end they are leaves so they don't have branches
                continue
            if true_id in trees[tree_id]:
                node_dict["true"] = trees[tree_id][true_id]
            if false_id in trees[tree_id]:
                node_dict["false"] = trees[tree_id][false_id]

    # Node 0 of each tree is returned as that tree's root.
    return {tree_id: trees[tree_id][0] for tree_id in trees}
mode_to_condition
mode_to_condition(node: dict, feature_expr: Expr) -> Expr

Build a comparison expression for a branch node.

The comparison is based on the mode of the node and the threshold for that node. The feature will be compared to the threshold using the operator defined by the mode.

Source code in orbitalml/translation/steps/trees/tree.py
def mode_to_condition(node: dict, feature_expr: ibis.Expr) -> ibis.Expr:
    """Build a comparison expression for a branch node.

    The comparison is based on the mode of the node and the threshold
    for that node. The feature will be compared to the threshold
    using the operator defined by the mode.
    """
    # The "treshold" key spelling matches the one written by build_tree().
    threshold = node["treshold"]
    mode = node["mode"]
    if mode == "BRANCH_LEQ":
        return feature_expr <= threshold
    if mode == "BRANCH_LT":
        return feature_expr < threshold
    if mode == "BRANCH_GTE":
        return feature_expr >= threshold
    if mode == "BRANCH_GT":
        return feature_expr > threshold
    if mode == "BRANCH_EQ":
        return feature_expr == threshold
    if mode == "BRANCH_NEQ":
        return feature_expr != threshold
    raise NotImplementedError(f"Unsupported node mode: {node['mode']}")

orbitalml.translation.steps.where

Implementation of the Where operator.

WhereTranslator

Bases: Translator

Processes a Where node and updates the variables with the output expression.

The where operation is expected to return either its first or second input depending on a condition variable. When the variable is true, the first input is returned, otherwise the second input is returned.

The condition variable will usually be a column computed through an expression that represents a boolean predicate.

The first and second inputs can be either a single column or a group of columns. If any of the two is a group of columns, a new group of column is produced as the result. If both are single columns, the result is a single column.

Source code in orbitalml/translation/steps/where.py
class WhereTranslator(Translator):
    """Processes a Where node and updates the variables with the output expression.

    The where operation returns either its first or second input depending on
    a condition variable: when the condition is true the first input is
    selected, otherwise the second one.

    The condition is expected to be a single boolean column, never a group.

    Either branch input can be a single column or a group of columns. If at
    least one of them is a group, the output is a group of columns (scalars
    are broadcast against every member of the group); if both are single
    columns, the output is a single column.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx__Where.html
        condition_expr = self._variables.consume(self.inputs[0])
        true_expr = self._variables.consume(self.inputs[1])
        false_expr = self._variables.consume(self.inputs[2])

        if isinstance(condition_expr, VariablesGroup):
            raise NotImplementedError(
                "Where: The condition expression can't be a group of columns. Must be a single column."
            )

        def _pick(when_true, when_false):
            # CASE WHEN condition THEN when_true ELSE when_false END
            return self._optimizer.fold_case(
                ibis.case().when(condition_expr, when_true).else_(when_false).end()
            )

        true_is_group = isinstance(true_expr, VariablesGroup)
        false_is_group = isinstance(false_expr, VariablesGroup)

        if true_is_group and false_is_group:
            true_values = list(true_expr.values())
            false_values = list(false_expr.values())
            if len(true_values) != len(false_values):
                raise ValueError(
                    "Where: The number of values in the true and false expressions must match."
                )
            result = ValueVariablesGroup()
            for idx, (t_val, f_val) in enumerate(zip(true_values, false_values)):
                result[f"c{idx}"] = _pick(t_val, f_val)
        elif true_is_group:
            # Broadcast the scalar false branch against the true group.
            result = ValueVariablesGroup()
            for idx, t_val in enumerate(true_expr.values()):
                result[f"c{idx}"] = _pick(t_val, false_expr)
        elif false_is_group:
            # Broadcast the scalar true branch against the false group.
            result = ValueVariablesGroup()
            for idx, f_val in enumerate(false_expr.values()):
                result[f"c{idx}"] = _pick(true_expr, f_val)
        else:
            result = _pick(true_expr, false_expr)

        self.set_output(result)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/where.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx__Where.html
    condition_expr = self._variables.consume(self.inputs[0])
    true_expr = self._variables.consume(self.inputs[1])
    false_expr = self._variables.consume(self.inputs[2])

    if isinstance(condition_expr, VariablesGroup):
        raise NotImplementedError(
            "Where: The condition expression can't be a group of columns. Must be a single column."
        )

    def _pick(when_true, when_false):
        # CASE WHEN condition THEN when_true ELSE when_false END
        return self._optimizer.fold_case(
            ibis.case().when(condition_expr, when_true).else_(when_false).end()
        )

    true_is_group = isinstance(true_expr, VariablesGroup)
    false_is_group = isinstance(false_expr, VariablesGroup)

    if true_is_group and false_is_group:
        true_values = list(true_expr.values())
        false_values = list(false_expr.values())
        if len(true_values) != len(false_values):
            raise ValueError(
                "Where: The number of values in the true and false expressions must match."
            )
        result = ValueVariablesGroup()
        for idx, (t_val, f_val) in enumerate(zip(true_values, false_values)):
            result[f"c{idx}"] = _pick(t_val, f_val)
    elif true_is_group:
        # Broadcast the scalar false branch against the true group.
        result = ValueVariablesGroup()
        for idx, t_val in enumerate(true_expr.values()):
            result[f"c{idx}"] = _pick(t_val, false_expr)
    elif false_is_group:
        # Broadcast the scalar true branch against the false group.
        result = ValueVariablesGroup()
        for idx, f_val in enumerate(false_expr.values()):
            result[f"c{idx}"] = _pick(true_expr, f_val)
    else:
        result = _pick(true_expr, false_expr)

    self.set_output(result)

orbitalml.translation.steps.zipmap

Implementation of the ZipMap operator.

ZipMapTranslator

Bases: Translator

Processes a ZipMap node and updates the variables with the output expression.

The ZipMap operator is used to map values from one variable to another set of values. It is usually meant to map numeric values to categories.

If the input is a group of columns, all columns in the group will be remapped according to the class labels.

Source code in orbitalml/translation/steps/zipmap.py
class ZipMapTranslator(Translator):
    """Processes a ZipMap node and updates the variables with the output expression.

    The ZipMap operator associates values with class labels, typically to
    turn numeric outputs into named categories.

    If the input is a group of columns, every column in the group is paired
    with the corresponding class label; a single input column requires
    exactly one label.
    """

    def process(self) -> None:
        """Performs the translation and set the output variable."""
        # https://onnx.ai/onnx/operators/onnx_aionnxml_ZipMap.html
        data = self._variables.consume(self.inputs[0])

        # Labels can come as strings or int64s; strings take precedence.
        string_labels = typing.cast(
            list[str] | None, self._attributes.get("classlabels_strings")
        )
        int_labels = typing.cast(
            list[int] | None, self._attributes.get("classlabels_int64s")
        )
        if string_labels is not None:
            labels = string_labels
        elif int_labels is not None:
            labels = [str(i) for i in int_labels]
        else:
            raise ValueError("ZipMap: required mapping attributes not found.")

        if isinstance(data, VariablesGroup):
            if len(labels) != len(data):
                raise ValueError("ZipMap: The number of labels and columns must match.")
            result = ValueVariablesGroup(dict(zip(labels, data.values())))
        elif isinstance(data, ibis.Expr):
            if len(labels) != 1:
                raise ValueError("ZipMap: The number of labels and columns must match.")
            result = ValueVariablesGroup({labels[0]: data})
        else:
            raise ValueError(
                f"ZipMap: expected a column group or a single column. Got {type(data)}"
            )

        self.set_output(result)
process
process() -> None

Performs the translation and set the output variable.

Source code in orbitalml/translation/steps/zipmap.py
def process(self) -> None:
    """Performs the translation and set the output variable."""
    # https://onnx.ai/onnx/operators/onnx_aionnxml_ZipMap.html
    data = self._variables.consume(self.inputs[0])

    # Labels can come as strings or int64s; strings take precedence.
    string_labels = typing.cast(
        list[str] | None, self._attributes.get("classlabels_strings")
    )
    int_labels = typing.cast(
        list[int] | None, self._attributes.get("classlabels_int64s")
    )
    if string_labels is not None:
        labels = string_labels
    elif int_labels is not None:
        labels = [str(i) for i in int_labels]
    else:
        raise ValueError("ZipMap: required mapping attributes not found.")

    if isinstance(data, VariablesGroup):
        if len(labels) != len(data):
            raise ValueError("ZipMap: The number of labels and columns must match.")
        result = ValueVariablesGroup(dict(zip(labels, data.values())))
    elif isinstance(data, ibis.Expr):
        if len(labels) != 1:
            raise ValueError("ZipMap: The number of labels and columns must match.")
        result = ValueVariablesGroup({labels[0]: data})
    else:
        raise ValueError(
            f"ZipMap: expected a column group or a single column. Got {type(data)}"
        )

    self.set_output(result)