Source code for merlin.batch.config

# Copyright 2020 The Merlin Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from typing import Optional, Dict

from merlin.batch.sink import Sink
from merlin.batch.source import Source


class ResultType(Enum):
    """
    Data types that can be declared for a prediction result.

    Every member's value is the upper-case string equal to its own name.
    """

    # Scalar numeric types
    DOUBLE = "DOUBLE"
    FLOAT = "FLOAT"
    INTEGER = "INTEGER"
    LONG = "LONG"

    # Text type
    STRING = "STRING"

    # Collection type
    ARRAY = "ARRAY"
class PredictionJobResourceRequest:
    """
    Resource request configuration for starting prediction job.

    Holds the cpu / memory requests for the driver and the executors together
    with the executor count, and can render them as a plain dictionary.
    """

    def __init__(self,
                 driver_cpu_request: str,
                 driver_memory_request: str,
                 executor_cpu_request: str,
                 executor_memory_request: str,
                 executor_replica: int):
        """
        Create resource request object

        :param driver_cpu_request: cpu request of the driver, in kubernetes request format (e.g.: 500m, 1, 2, etc)
        :param driver_memory_request: memory request of the driver, in kubernetes format (e.g.: 512Mi, 1Gi, 2Gi, etc)
        :param executor_cpu_request: cpu request of each executor, in kubernetes request format (e.g.: 500m, 1, 2, etc)
        :param executor_memory_request: memory request of each executor, in kubernetes format (e.g.: 512Mi, 1Gi, 2Gi, etc)
        :param executor_replica: number of executors to be used
        """
        # Driver side
        self._driver_cpu_request = driver_cpu_request
        self._driver_memory_request = driver_memory_request
        # Executor side
        self._executor_cpu_request = executor_cpu_request
        self._executor_memory_request = executor_memory_request
        self._executor_replica = executor_replica

    def to_dict(self):
        """
        Render the resource request as a dictionary.

        :return: dictionary keyed by the constructor parameter names, holding the values
            that were given at construction time
        """
        as_mapping = dict(
            driver_cpu_request=self._driver_cpu_request,
            driver_memory_request=self._driver_memory_request,
            executor_cpu_request=self._executor_cpu_request,
            executor_memory_request=self._executor_memory_request,
            executor_replica=self._executor_replica,
        )
        return as_mapping
class PredictionJobConfig:
    """
    Configuration for starting a prediction job.

    The values given at construction time are stored as-is and exposed
    through read-only properties.
    """

    def __init__(self,
                 source: Source,
                 sink: Sink,
                 service_account_name: str,
                 result_type: ResultType = ResultType.DOUBLE,
                 item_type: ResultType = ResultType.DOUBLE,
                 resource_request: Optional[PredictionJobResourceRequest] = None,
                 env_vars: Optional[Dict[str, str]] = None):
        """
        Create configuration for starting a prediction job

        :param source: source configuration. See merlin.batch.source package.
        :param sink: sink configuration. See merlin.batch.sink package
        :param service_account_name: secret name containing the service account for executing the prediction job.
        :param result_type: type of the prediction result (default to ResultType.DOUBLE).
        :param item_type: item type of the prediction result if the result_type is ResultType.ARRAY. Otherwise will be ignored.
        :param resource_request: optional resource request for starting the prediction job. If not given the system default will be used.
        :param env_vars: optional environment variables as a dictionary mapping each variable name to its value.
        """
        self._source = source
        self._sink = sink
        self._service_account_name = service_account_name
        self._resource_request = resource_request
        self._result_type = result_type
        self._item_type = item_type
        # Stored by reference (not copied); None when no variables were given.
        self._env_vars = env_vars

    @property
    def source(self) -> Source:
        """Source configuration given at construction time."""
        return self._source

    @property
    def sink(self) -> Sink:
        """Sink configuration given at construction time."""
        return self._sink

    @property
    def service_account_name(self) -> str:
        """Secret name containing the service account for executing the job."""
        return self._service_account_name

    @property
    def resource_request(self) -> Optional[PredictionJobResourceRequest]:
        """Resource request for the job, or None when none was given."""
        return self._resource_request

    @property
    def result_type(self) -> ResultType:
        """Type of the prediction result."""
        return self._result_type

    @property
    def item_type(self) -> ResultType:
        """Item type of the prediction result; documented as relevant only for ResultType.ARRAY results."""
        return self._item_type

    @property
    def env_vars(self) -> Optional[Dict[str, str]]:
        """Environment variables for the job, or None when none were given."""
        return self._env_vars