merlin.batch package

Submodules

merlin.batch.big_query_util module

merlin.batch.big_query_util.valid_column(column_name: str) → bool[source]

Validate BigQuery column name

Parameters:column_name – BigQuery column name
Returns:boolean

Rules based on this page https://cloud.google.com/bigquery/docs/schemas#column_names
  • A column name must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_)
  • It must start with a letter or underscore
  • Maximum length 128

merlin.batch.big_query_util.valid_columns(columns) → bool[source]

Validate multiple BigQuery columns

Parameters:columns – List of columns
Returns:boolean
merlin.batch.big_query_util.valid_dataset(dataset: str) → bool[source]

Validate BigQuery dataset name

Parameters:dataset – BigQuery dataset name
Returns:boolean

Rules based on this page https://cloud.google.com/bigquery/docs/datasets#dataset-naming
  • May contain up to 1,024 characters
  • Can contain letters (upper or lower case), numbers, and underscores

merlin.batch.big_query_util.valid_table_id(table_id: str) → bool[source]

Validate a BigQuery source table id, which must have the format project_id.dataset.table

Parameters:table_id – Source table
Returns:boolean
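
The three-part format check can be sketched as follows. This is a minimal illustration and not the library's implementation: the name looks_like_table_id is hypothetical, the dataset and table parts are assumed to follow the letters/digits/underscore rule with a 1,024-character limit described in this module, and the project part is only checked for being non-empty because its rules are not stated here.

```python
import re

# Assumption: dataset and table names use letters, digits, and underscores,
# with at most 1024 characters, as described elsewhere in this module.
_NAME = re.compile(r"[A-Za-z0-9_]{1,1024}")


def looks_like_table_id(table_id: str) -> bool:
    """Return True if table_id has the shape project_id.dataset.table."""
    parts = table_id.split(".")
    if len(parts) != 3:
        return False
    project, dataset, table = parts
    # The project part is only required to be non-empty (assumption).
    return (
        bool(project)
        and _NAME.fullmatch(dataset) is not None
        and _NAME.fullmatch(table) is not None
    )


print(looks_like_table_id("my-project.my_dataset.my_table"))
print(looks_like_table_id("my_dataset.my_table"))
```

Splitting on "." and requiring exactly three parts rejects both shortened ids (dataset.table) and ids with extra segments.
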
merlin.batch.big_query_util.valid_table_name(table_name: str) → bool[source]

Validate BigQuery table name

Parameters:table_name – BigQuery table name
Returns:boolean

Rules based on this page https://cloud.google.com/bigquery/docs/tables#table_naming
  • A table name must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_)
  • Maximum length 1024

merlin.batch.big_query_util.validate_text(text: str, pattern: str, max_length: int) → bool[source]

Validate text based on regex pattern and maximum length allowed

Parameters:
  • text – Text to validate
  • pattern – Regular expression pattern to validate text
  • max_length – Maximum length allowed
Returns:boolean
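
A helper with this signature could look roughly like the sketch below. It is written from the description only (a length check plus a regular-expression match) and may differ from the actual implementation; the constant COLUMN_PATTERN is a hypothetical name that encodes the column-name rules listed for valid_column.

```python
import re


def validate_text(text: str, pattern: str, max_length: int) -> bool:
    """Return True if text fits within max_length and fully matches pattern."""
    if len(text) > max_length:
        return False
    # fullmatch requires the whole string to match, not just a prefix.
    return re.fullmatch(pattern, text) is not None


# Hypothetical pattern for the column-name rules: letters, digits, and
# underscores, starting with a letter or underscore.
COLUMN_PATTERN = r"[A-Za-z_][A-Za-z0-9_]*"

print(validate_text("user_id", COLUMN_PATTERN, 128))
print(validate_text("1st_col", COLUMN_PATTERN, 128))
print(validate_text("a" * 129, COLUMN_PATTERN, 128))
```

Using re.fullmatch instead of re.match with a trailing $ avoids accepting a name that ends with a newline character.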

merlin.batch.config module

class merlin.batch.config.PredictionJobConfig(source: merlin.batch.source.Source, sink: merlin.batch.sink.Sink, service_account_name: str, result_type: merlin.batch.config.ResultType = <ResultType.DOUBLE: 'DOUBLE'>, item_type: merlin.batch.config.ResultType = <ResultType.DOUBLE: 'DOUBLE'>, resource_request: merlin.batch.config.PredictionJobResourceRequest = None, env_vars: Dict[str, str] = None)[source]

Bases: object

__init__(source: merlin.batch.source.Source, sink: merlin.batch.sink.Sink, service_account_name: str, result_type: merlin.batch.config.ResultType = <ResultType.DOUBLE: 'DOUBLE'>, item_type: merlin.batch.config.ResultType = <ResultType.DOUBLE: 'DOUBLE'>, resource_request: merlin.batch.config.PredictionJobResourceRequest = None, env_vars: Dict[str, str] = None)[source]

Create configuration for starting a prediction job

Parameters:
  • source – source configuration. See merlin.batch.source package.
  • sink – sink configuration. See merlin.batch.sink package
  • service_account_name – secret name containing the service account for executing the prediction job.
  • result_type – type of the prediction result (defaults to ResultType.DOUBLE).
  • item_type – item type of the prediction result if result_type is ResultType.ARRAY; otherwise it is ignored.
  • resource_request – optional resource request for starting the prediction job. If not given, the system default will be used.
  • env_vars – optional environment variables as a dictionary of key-value pairs.
env_vars
item_type
resource_request
result_type
service_account_name
sink
source
class merlin.batch.config.PredictionJobResourceRequest(driver_cpu_request: str, driver_memory_request: str, executor_cpu_request: str, executor_memory_request: str, executor_replica: int)[source]

Bases: object

Resource request configuration for starting a prediction job

__init__(driver_cpu_request: str, driver_memory_request: str, executor_cpu_request: str, executor_memory_request: str, executor_replica: int)[source]

Create resource request object

Parameters:
  • driver_cpu_request – driver’s CPU request in Kubernetes request format (e.g. 500m, 1, 2)
  • driver_memory_request – driver’s memory request in Kubernetes format (e.g. 512Mi, 1Gi, 2Gi)
  • executor_cpu_request – executor’s CPU request in Kubernetes request format (e.g. 500m, 1, 2)
  • executor_memory_request – executor’s memory request in Kubernetes format (e.g. 512Mi, 1Gi, 2Gi)
  • executor_replica – number of executors to be used
to_dict()[source]
class merlin.batch.config.ResultType[source]

Bases: enum.Enum

An enumeration.

ARRAY = 'ARRAY'
DOUBLE = 'DOUBLE'
FLOAT = 'FLOAT'
INTEGER = 'INTEGER'
LONG = 'LONG'
STRING = 'STRING'

merlin.batch.job module

class merlin.batch.job.JobStatus[source]

Bases: enum.Enum

An enumeration.

COMPLETED = 'completed'
FAILED = 'failed'
FAILED_SUBMISSION = 'failed_submission'
PENDING = 'pending'
RUNNING = 'running'
TERMINATED = 'terminated'
TERMINATING = 'terminating'
class merlin.batch.job.PredictionJob(job: client.models.prediction_job.PredictionJob, api_client: client.api_client.ApiClient)[source]

Bases: object

error

Error message describing why the job failed

Returns:str
id

ID of prediction job

Returns:int
name

Prediction job name

Returns:str
refresh()[source]

Updates status of a prediction job

Returns:
status

Prediction job status

Returns:JobStatus
stop()[source]

Stops a prediction job from running

Returns:
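
Because refresh() updates the status of a job in place, a caller can poll it until the job reaches a final state. The sketch below illustrates that pattern without importing the SDK: the JobStatus enum is a local mirror of the values listed above, the choice of final states is an assumption, and wait_for_job and FakeJob are hypothetical names used only for this example.

```python
import time
from enum import Enum


class JobStatus(Enum):
    # Local mirror of the values listed for merlin.batch.job.JobStatus.
    PENDING = "pending"
    RUNNING = "running"
    TERMINATING = "terminating"
    TERMINATED = "terminated"
    COMPLETED = "completed"
    FAILED = "failed"
    FAILED_SUBMISSION = "failed_submission"


# Assumption: these are the states after which the status no longer changes.
FINAL_STATES = {
    JobStatus.COMPLETED,
    JobStatus.FAILED,
    JobStatus.FAILED_SUBMISSION,
    JobStatus.TERMINATED,
}


def wait_for_job(job, poll_seconds: float = 10.0, timeout_seconds: float = 3600.0):
    """Call job.refresh() until job.status is final or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while job.status not in FINAL_STATES:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job.id} is still {job.status.value}")
        time.sleep(poll_seconds)
        job.refresh()
    return job.status


class FakeJob:
    """Hypothetical stand-in that completes after two refreshes."""

    def __init__(self):
        self.id = 1
        self.status = JobStatus.PENDING
        self.refresh_count = 0

    def refresh(self):
        self.refresh_count += 1
        if self.refresh_count < 2:
            self.status = JobStatus.RUNNING
        else:
            self.status = JobStatus.COMPLETED


print(wait_for_job(FakeJob(), poll_seconds=0.01))
```

When adapting this to a real PredictionJob, the final states would need to be built from the JobStatus imported from merlin.batch.job rather than the local mirror, since members of different enum classes never compare equal.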

merlin.batch.sink module

class merlin.batch.sink.BigQuerySink(table: str, staging_bucket: str, result_column: str, save_mode: merlin.batch.sink.SaveMode = <SaveMode.ERRORIFEXISTS: 0>, options: MutableMapping[str, str] = None)[source]

Bases: merlin.batch.sink.Sink

Sink contract for BigQuery to create prediction job

__init__(table: str, staging_bucket: str, result_column: str, save_mode: merlin.batch.sink.SaveMode = <SaveMode.ERRORIFEXISTS: 0>, options: MutableMapping[str, str] = None)[source]
Parameters:
  • table – table id of destination BQ table in format gcp-project.dataset.table_name
  • staging_bucket – temporary GCS bucket for staging write into BQ table
  • result_column – column name that will be used to store prediction result.
  • save_mode – save mode. Defaults to SaveMode.ERRORIFEXISTS, which fails if the destination table already exists.
  • options – additional sink options to configure the prediction job.
options
result_column
save_mode
staging_bucket
table
to_dict() → Mapping[str, Any][source]
class merlin.batch.sink.SaveMode[source]

Bases: enum.Enum

An enumeration.

APPEND = 2
ERROR = 4
ERRORIFEXISTS = 0
IGNORE = 3
OVERWRITE = 1
class merlin.batch.sink.Sink[source]

Bases: abc.ABC

to_dict() → Mapping[str, Any][source]

merlin.batch.source module

class merlin.batch.source.BigQuerySource(table: str, features: Iterable[str], options: MutableMapping[str, str] = None)[source]

Bases: merlin.batch.source.Source

Source contract for BigQuery to create prediction job

__init__(table: str, features: Iterable[str], options: MutableMapping[str, str] = None)[source]
Parameters:
  • table – table id of the source in the format gcp-project.dataset.table_name
  • features – list of features to be used for prediction. Each entry has to match a column name in the source table.
  • options – additional options to configure the source.
features
options
table
to_dict() → Mapping[str, Any][source]
class merlin.batch.source.Source[source]

Bases: abc.ABC

to_dict() → Mapping[str, Any][source]