Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ class AttributeType(Enum):
}


# String literals (after strip + lower) that the BOOL converter treats as True.
_TRUTHY_STRINGS = ("true", "1", "yes")


def _parse_bool(v):
    """Return True iff the stripped, lower-cased text of ``v`` is a truthy literal."""
    return str(v).strip().lower() in _TRUTHY_STRINGS


def _parse_binary(v):
    """Pass ``bytes`` through unchanged; encode the text form of anything else."""
    if isinstance(v, bytes):
        return v
    return str(v).encode()


def _parse_timestamp(v):
    """Build a ``datetime`` from an ISO formatted value."""
    return datetime.datetime.fromisoformat(v)


# Converters from a raw (typically string) value to the Python object used for
# each AttributeType. A type without an entry here has no converter.
FROM_STRING_PARSER_MAPPING = {
    AttributeType.STRING: str,
    AttributeType.INT: int,
    AttributeType.LONG: int,
    AttributeType.DOUBLE: float,
    AttributeType.BOOL: _parse_bool,
    AttributeType.BINARY: _parse_binary,
    AttributeType.TIMESTAMP: _parse_timestamp,
    AttributeType.LARGE_BINARY: largebinary,
}

# Only single-directional mapping.
TO_PYOBJECT_MAPPING = {
AttributeType.STRING: str,
Expand Down
2 changes: 2 additions & 0 deletions amber/src/main/python/pyamber/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SourceOperator,
TupleOperatorV2,
State,
AttributeType,
)

__all__ = [
Expand All @@ -41,4 +42,5 @@
"TupleOperatorV2",
"SourceOperator",
"State",
"AttributeType",
]
2 changes: 2 additions & 0 deletions amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
UDFSourceOperator,
)
from core.models.type.large_binary import largebinary
from core.models.schema.attribute_type import *

__all__ = [
"State",
Expand All @@ -53,4 +54,5 @@
"Iterator",
"Optional",
"Union",
"AttributeType",
]
154 changes: 154 additions & 0 deletions amber/src/main/python/pytexera/udf/test_udf_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
from typing import Iterator, Optional

import pytest

from core.models.type.large_binary import largebinary
from pytexera import AttributeType, Tuple, TupleLike, UDFOperatorV2
from pytexera.udf.udf_operator import _UiParameterSupport


class InjectedParametersOperator(UDFOperatorV2):
    """Fixture operator whose injected values cover INT, BOOL and TIMESTAMP."""

    def _texera_injected_ui_parameters(self):
        # Raw values are plain strings; UiParameter converts them on access.
        injected = dict(
            count="7",
            enabled="yes",
            created_at="2024-01-01T00:00:00",
        )
        return injected

    def open(self):
        # The three calls deliberately use different argument shapes:
        # positional type, keyword name + type, and positional name + keyword type.
        count = self.UiParameter("count", AttributeType.INT)
        enabled = self.UiParameter(name="enabled", type=AttributeType.BOOL)
        created_at = self.UiParameter("created_at", type=AttributeType.TIMESTAMP)

        self.count_parameter = count
        self.enabled_parameter = enabled
        self.created_at_parameter = created_at

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through: the tuple logic is irrelevant to these tests.
        yield tuple_


class ConflictingParameterOperator(UDFOperatorV2):
    """Fixture operator that declares one parameter name with two different types."""

    def _texera_injected_ui_parameters(self):
        return dict(duplicate="1")

    def open(self):
        # Same name, INT first and STRING second.
        for declared_type in (AttributeType.INT, AttributeType.STRING):
            self.UiParameter("duplicate", declared_type)

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through: the tuple logic is irrelevant to these tests.
        yield tuple_


class FirstIndependentParameterOperator(UDFOperatorV2):
    """Fixture operator whose injected ``count`` is "1"."""

    def _texera_injected_ui_parameters(self):
        return dict(count="1")

    def open(self):
        count = self.UiParameter("count", AttributeType.INT)
        self.count_parameter = count

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through: the tuple logic is irrelevant to these tests.
        yield tuple_


class SecondIndependentParameterOperator(UDFOperatorV2):
    """Fixture operator whose injected ``count`` is "2"."""

    def _texera_injected_ui_parameters(self):
        return dict(count="2")

    def open(self):
        count = self.UiParameter("count", AttributeType.INT)
        self.count_parameter = count

    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        # Pass-through: the tuple logic is irrelevant to these tests.
        yield tuple_


class TestUiParameterSupport:
    """Checks for UiParameter injection, value parsing and the open() wrapper."""

    def test_injected_values_are_applied_before_open(self):
        op = InjectedParametersOperator()
        op.open()

        expected_timestamp = datetime.datetime(2024, 1, 1, 0, 0)
        assert op.count_parameter.value == 7
        assert op.enabled_parameter.value is True
        assert op.created_at_parameter.value == expected_timestamp

    def test_duplicate_parameter_names_with_conflicting_types_raise(self):
        op = ConflictingParameterOperator()

        with pytest.raises(ValueError) as raised:
            op.open()

        message = str(raised.value)
        assert "Duplicate UiParameter name 'duplicate'" in message

    @pytest.mark.parametrize(
        ("raw_value", "attr_type", "expected"),
        [
            ("hello", AttributeType.STRING, "hello"),
            ("7", AttributeType.INT, 7),
            ("99", AttributeType.LONG, 99),
            ("3.14", AttributeType.DOUBLE, 3.14),
            ("yes", AttributeType.BOOL, True),
            ("payload", AttributeType.BINARY, b"payload"),
            (
                "2024-01-01T00:00:00",
                AttributeType.TIMESTAMP,
                datetime.datetime(2024, 1, 1, 0, 0),
            ),
            (
                "s3://bucket/path/to/object",
                AttributeType.LARGE_BINARY,
                largebinary("s3://bucket/path/to/object"),
            ),
        ],
    )
    def test_parse_supported_types(self, raw_value, attr_type, expected):
        parsed = _UiParameterSupport._parse(raw_value, attr_type)
        assert parsed == expected

    def test_parse_unsupported_type_raises_helpful_error(self):
        # A bare object() has no entry in the parser mapping.
        not_an_attribute_type = object()
        with pytest.raises(TypeError, match="UiParameter.type .* is not supported"):
            _UiParameterSupport._parse("value", not_an_attribute_type)

    def test_wrapped_open_uses_instance_local_state(self):
        # The class-level open() must carry the wrapper marker.
        wrapped_marker = getattr(
            FirstIndependentParameterOperator.open,
            "__texera_ui_params_wrapped__",
            False,
        )
        assert wrapped_marker is True

        first = FirstIndependentParameterOperator()
        second = SecondIndependentParameterOperator()
        for op in (first, second):
            op.open()

        assert first.count_parameter.value == 1
        assert second.count_parameter.value == 2

        # Each instance must hold its own injected-value dict.
        first_injected = first._ui_parameter_injected_values
        second_injected = second._ui_parameter_injected_values
        assert first_injected == {"count": "1"}
        assert second_injected == {"count": "2"}
        assert first_injected is not second_injected
113 changes: 108 additions & 5 deletions amber/src/main/python/pytexera/udf/udf_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,115 @@
# under the License.

from abc import abstractmethod
from typing import Iterator, Optional, Union
from dataclasses import dataclass
import functools
from typing import Any, Dict, Iterator, Optional, Union

from pyamber import *
from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING


class UDFOperatorV2(TupleOperatorV2):
@dataclass(frozen=True)
class _UiParameterValue:
    """
    Immutable result of a ``UiParameter(...)`` call: the parameter's name, its
    declared type and the value parsed for it.
    """

    # Name the parameter was declared with.
    name: str
    # AttributeType the raw value was parsed as.
    type: AttributeType
    # Parsed value; None when no raw value was available for this name.
    value: Any


class _UiParameterSupport:
    """
    Mixin that lets an operator read UI-provided parameters through
    ``self.UiParameter(name, type)``.

    Every subclass has its effective ``open()`` wrapped so that the values
    returned by ``_texera_injected_ui_parameters()`` are stored on the instance
    before the user's ``open()`` body runs. All state is kept per instance.
    """

    _ui_parameter_injected_values: Dict[str, Any]
    _ui_parameter_name_types: Dict[str, AttributeType]
    # Number of wrapped open() calls currently running on the instance. The class
    # level default of 0 is shadowed by an instance attribute while open() runs.
    _ui_parameter_open_depth: int = 0

    # Reserved hook name. Backend injector will generate this in the user's class.
    def _texera_injected_ui_parameters(self) -> Dict[str, Any]:
        """Return the raw injected values keyed by parameter name (default: none)."""
        return {}

    def _ensure_ui_parameter_state(self) -> None:
        """Create the per-instance dictionaries if they do not exist yet."""
        # Check the instance __dict__ so the state is never shared through a class.
        if "_ui_parameter_injected_values" not in self.__dict__:
            self._ui_parameter_injected_values = {}
        if "_ui_parameter_name_types" not in self.__dict__:
            self._ui_parameter_name_types = {}

    def _texera_apply_injected_ui_parameters(self) -> None:
        """Load a fresh copy of the injected values and forget declared types."""
        values = self._texera_injected_ui_parameters()
        # Copy so later mutation of the hook's return value cannot leak in.
        self._ui_parameter_injected_values = dict(values or {})
        self._ui_parameter_name_types = {}

    def __init_subclass__(cls, **kwargs):
        """Wrap the subclass's effective ``open()`` so injection happens first."""
        super().__init_subclass__(**kwargs)

        # Wrap the effective open() method once per subclass.
        original_open = getattr(cls, "open", None)
        if original_open is None:
            return

        # Avoid double wrapping (e.g. an inherited, already wrapped open()).
        if getattr(original_open, "__texera_ui_params_wrapped__", False):
            return

        @functools.wraps(original_open)
        def wrapped_open(self, *open_args, **open_kwargs):
            # A user open() that calls super().open() re-enters a parent's wrapper.
            # Only the outermost call may (re)apply the injected values; otherwise
            # the nested call would wipe the name/type registry mid-open and lose
            # conflict detection for parameters that were already declared.
            depth = self._ui_parameter_open_depth
            if depth == 0:
                self._texera_apply_injected_ui_parameters()
            self._ui_parameter_open_depth = depth + 1
            try:
                return original_open(self, *open_args, **open_kwargs)
            finally:
                self._ui_parameter_open_depth = depth

        setattr(wrapped_open, "__texera_ui_params_wrapped__", True)
        cls.open = wrapped_open

    def UiParameter(
        self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any
    ) -> _UiParameterValue:
        """
        Declare a UI parameter and return its parsed value.

        The type may be given positionally (``attr_type``) or as the keyword
        ``type``, but not both.

        :param name: parameter name used to look up the injected raw value.
        :param attr_type: AttributeType the raw value should be parsed as.
        :param kwargs: only ``type=...`` is accepted.
        :return: a ``_UiParameterValue``; its ``value`` is None when nothing was
            injected for ``name``.
        :raises TypeError: if the type is missing, given twice, not an
            AttributeType, has no parser, an unexpected keyword is passed, or
            the conversion itself raises TypeError.
        :raises ValueError: if ``name`` was already declared with a different
            type, or the raw value cannot be converted to the declared type.
        """
        if "type" in kwargs:
            if attr_type is not None:
                raise TypeError("UiParameter.type was provided multiple times.")
            attr_type = kwargs.pop("type")

        if kwargs:
            unexpected_arguments = ", ".join(sorted(kwargs))
            raise TypeError(
                f"UiParameter got unexpected keyword argument(s): "
                f"{unexpected_arguments}."
            )

        if attr_type is None:
            raise TypeError("UiParameter.type is required.")

        if not isinstance(attr_type, AttributeType):
            raise TypeError(
                f"UiParameter.type must be an AttributeType, got {attr_type!r}."
            )

        # UiParameter may be called without the wrapped open() having run.
        self._ensure_ui_parameter_state()
        existing_type = self._ui_parameter_name_types.get(name)
        if existing_type is not None and existing_type != attr_type:
            raise ValueError(
                f"Duplicate UiParameter name '{name}' with conflicting types: "
                f"{existing_type.name} vs {attr_type.name}."
            )

        self._ui_parameter_name_types[name] = attr_type
        raw_value = self._ui_parameter_injected_values.get(name)
        try:
            parsed_value = self._parse(raw_value, attr_type)
        except (TypeError, ValueError) as err:
            # Keep the exception category callers may rely on, but say which
            # parameter failed; a bare int()/float() message does not.
            error_cls = ValueError if isinstance(err, ValueError) else TypeError
            raise error_cls(
                f"UiParameter '{name}' could not parse value {raw_value!r} "
                f"as {attr_type.name}: {err}"
            ) from err

        return _UiParameterValue(name=name, type=attr_type, value=parsed_value)

    @staticmethod
    def _parse(value: Any, attr_type: AttributeType) -> Any:
        """
        Convert ``value`` with the parser registered for ``attr_type``.

        :return: None when ``value`` is None, otherwise the parser's result.
        :raises TypeError: if no parser is registered for ``attr_type``.
        """
        if value is None:
            return None

        py_type = FROM_STRING_PARSER_MAPPING.get(attr_type)
        if py_type is None:
            raise TypeError(
                f"UiParameter.type {attr_type!r} is not supported for parsing."
            )

        return py_type(value)


class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2):
"""
Base class for tuple-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -65,7 +168,7 @@ def close(self) -> None:
pass


class UDFSourceOperator(SourceOperator):
class UDFSourceOperator(_UiParameterSupport, SourceOperator):
def open(self) -> None:
"""
Open a context of the operator. Usually can be used for loading/initiating some
Expand All @@ -90,7 +193,7 @@ def close(self) -> None:
pass


class UDFTableOperator(TableOperator):
class UDFTableOperator(_UiParameterSupport, TableOperator):
"""
Base class for table-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -123,7 +226,7 @@ def close(self) -> None:
pass


class UDFBatchOperator(BatchOperator):
class UDFBatchOperator(_UiParameterSupport, BatchOperator):
"""
Base class for batch-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.texera.amber.pybuilder.EncodableStringAnnotation;
import org.apache.texera.amber.pybuilder.PyStringTypes;
import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableStringFactory$;


import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -49,6 +53,7 @@ public Attribute(

/**
 * Returns the name of this attribute.
 *
 * Exposed as the required JSON property "attributeName" and validated as non-blank.
 * NOTE(review): presumably {@code @EncodableStringAnnotation} marks this value for
 * encoding by the pybuilder string utilities — confirm against that package.
 *
 * @return the attribute name held in {@code attributeName}
 */
@JsonProperty(value = "attributeName", required = true)
@NotBlank(message = "Attribute name is required")
@EncodableStringAnnotation
public String getName() {
return attributeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
)
var outputColumns: List[Attribute] = List()

// UI parameters of this operator (empty by default). getPhysicalOp passes them,
// together with the script in `code`, to PythonUdfUiParameterInjector.inject.
@JsonProperty
@JsonSchemaTitle("Parameters")
@JsonPropertyDescription("Parameters inferred from self.UiParameter(...) in Python script")
var uiParameters: List[UiUDFParameter] = List()

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
Expand All @@ -88,7 +93,7 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python")
)
.withParallelizable(true)
.withSuggestedWorkerNum(workers)
Expand All @@ -98,7 +103,7 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
workflowId,
executionId,
operatorIdentifier,
OpExecWithCode(code, "python")
OpExecWithCode(PythonUdfUiParameterInjector.inject(code, uiParameters), "python")
)
.withParallelizable(false)
}
Expand Down
Loading
Loading