BQ to BQ Batch Prediction Example: Iris Classifier

Requirements

  • Authenticated to gcloud (gcloud auth application-default login)

This notebook demonstrates a basic example of creating a BQ to BQ batch prediction job in merlin.

The example is based on the Iris classification problem, where we want to classify different species of the Iris flower based on 4 features (sepal_length, sepal_width, petal_length, petal_width).

1. Train Model

First, let’s train an XGBoost classifier. We’ll use sklearn.datasets to load the Iris dataset.

[ ]:
import xgboost as xgb
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
[ ]:
iris = load_iris()

Split the dataset into train and test sets, holding out 20% of the data for testing

[ ]:
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2)

Train the model using the training dataset

[ ]:
model = xgb.XGBClassifier()
model.fit(X_train, y_train)

We’ll use the F1 score to evaluate the model.

[ ]:
pred_train = model.predict(X_train)
print(f"F1 score training: {f1_score(y_train, pred_train, average='micro')}")
[ ]:
pred_test = model.predict(X_test)
print(f"F1 score test: {f1_score(y_test, pred_test, average='micro')}")

The model performs well enough, so let’s use it for our prediction job. We will run predictions over the dataset located in the BQ table your-gcp-project.dataset.table and store the prediction results in the your-gcp-project.dataset.result_table table.

2. Wrap Model

To be able to run a batch prediction job, we’ll have to wrap the model inside a class implementing the PyFuncV2Model abstract class. The class has 2 abstract methods: initialize and infer:

  1. initialize is the entry point for initializing the model. Within this method you can do initialization steps such as loading the model from an artifact. initialize will be called once during model initialization. The argument to initialize is a dictionary containing key-value pairs of artifact names and their URLs. The artifact keys are the same values as the ones passed to log_pyfunc_model.
  2. infer is the prediction method of your model. infer accepts a pandas.DataFrame as input and should return either a np.ndarray, pd.Series, or pd.DataFrame of the same length.

IMPORTANT

During batch prediction job execution, the infer method will be called multiple times, each time with a different partition of the source data as input. It is important that infer avoids aggregation operations (e.g. mean, min, max), since the operation would only apply to the given partition and the result would therefore be incorrect. If aggregation is required, it is recommended to do it outside of the prediction job and store the result as a column in the source table.
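
To illustrate, here is a minimal sketch of an infer method that relies on a precomputed, table-wide aggregate instead of computing one per partition. The PartitionSafeModel class and the sepal_length_global_mean column are hypothetical and not part of this example’s schema.

[ ]:
import pandas as pd
from merlin.model import PyFuncV2Model

class PartitionSafeModel(PyFuncV2Model):
    def initialize(self, artifacts: dict):
        # Nothing to load for this sketch
        pass

    def infer(self, model_input: pd.DataFrame):
        # Incorrect: model_input["sepal_length"].mean() would only cover the
        # rows of the current partition, not the whole source table.
        # Correct: read a table-wide aggregate that was precomputed outside the
        # job and stored as a column in the source table (here, the
        # hypothetical "sepal_length_global_mean" column).
        return model_input["sepal_length"] - model_input["sepal_length_global_mean"]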

First, we will serialize the previously trained model using joblib, so that we can upload it as an artifact to merlin.

[ ]:
import joblib
import os

MODEL_DIR = "model"
MODEL_FILE = "model.joblib"
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILE)
MODEL_PATH_ARTIFACT_KEY = "model_path" # we will use it when calling log_pyfunc_model

# Make sure the model directory exists before serializing the model into it
os.makedirs(MODEL_DIR, exist_ok=True)
joblib.dump(model, MODEL_PATH)

Next, we create IrisClassifierModel class extending PyFuncV2Model and implement the necessary methods: initialize and infer.

In the initialize method, we load the serialized model from the MODEL_PATH_ARTIFACT_KEY artifact key using joblib. In the infer method, we directly call the model’s predict method.

[ ]:
from merlin.model import PyFuncV2Model

class IrisClassifierModel(PyFuncV2Model):
    def initialize(self, artifacts: dict):
        self._model = joblib.load(artifacts[MODEL_PATH_ARTIFACT_KEY])

    def infer(self, model_input):
        return self._model.predict(model_input, validate_features=False)

Let’s test the model

[ ]:
model = IrisClassifierModel()
model.initialize({MODEL_PATH_ARTIFACT_KEY: MODEL_PATH})
[ ]:
pred_test = model.infer(X_test)
print(f"F1 score test: {f1_score(y_test, pred_test, average='micro')}")

3. Upload To Merlin

3.1 Initialization

[ ]:
import merlin
from merlin.model import ModelType

merlin.set_url("http://localhost:3000/api/merlin")

3.2 Set Active Project

project represents a project in real life. You may have multiple models within a project.

merlin.set_project(<project_name>) will set the active project to the project matching the given name. You can only set it to an existing project. If you would like to create a new project, please do so from the MLP console at http://localhost:3000/projects/create.

[ ]:
merlin.set_project("sample")

3.3 Set Active Model

model represents an abstract ML model. Conceptually, model in MLP is similar to a class in a programming language. To instantiate a model you’ll have to create a model_version.

Each model has a type. Currently, the model types supported by MLP are: sklearn, xgboost, tensorflow, pytorch, and user-defined model (i.e. pyfunc model).

model_version represents a snapshot of a particular model iteration. You’ll be able to attach information such as metrics and tags to a given model_version, as well as deploy it as a model service.

merlin.set_model(<model_name>, <model_type>) will set the active model to the one with the given name; if no model with that name is found, a new model will be created.

Currently, batch prediction jobs are only supported by the PYFUNC_V2 model type.

[ ]:
merlin.set_model("iris-batch", ModelType.PYFUNC_V2)

3.4 Create New Model Version And Upload

To deploy the model, we will have to create an iteration of the model (by creating a model_version), upload the serialized model to MLP, and then deploy it.

To upload a PyFunc model you have to provide the following arguments:

  1. model_instance is the instance of the PyFunc model; the model has to extend merlin.PyFuncModel or merlin.PyFuncV2Model
  2. conda_env is the path to a conda environment yaml file. The environment yaml file must contain all dependencies required by the PyFunc model (a sketch of such a file follows below)
  3. (Optional) artifacts are additional artifacts that you want to include in the model
  4. (Optional) code_path is a list of directories containing Python code that will be loaded during model initialization; this is required when model_instance depends on a local Python package
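
The env.yaml file passed to log_pyfunc_model below is not included in this notebook. As a rough sketch, it should list every package the PyFunc model imports; the Python version and the unpinned packages below are assumptions, so adjust them to match your training environment.

[ ]:
%%writefile env.yaml
# Example conda environment for the PyFunc model (versions are placeholders)
name: iris-batch-env
channels:
  - defaults
dependencies:
  - python=3.8
  - pip
  - pip:
    - merlin-sdk
    - xgboost
    - scikit-learn
    - joblib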

[ ]:
# Create new version of the model
with merlin.new_model_version() as v:
    # Upload the serialized model to MLP
    merlin.log_pyfunc_model(model_instance=model,
                            conda_env="env.yaml",
                            artifacts={MODEL_PATH_ARTIFACT_KEY: MODEL_PATH})

You can check whether the model has been uploaded successfully by opening the model version’s mlflow URL.

[ ]:
v.mlflow_url

4. Create Batch Prediction Job

We will need to configure the data source, the destination, and the job itself in order to create a prediction job.

4.1 Configuring BQ Source

We can use the merlin.batch.source.BigQuerySource class to configure the data source of the prediction job.

[ ]:
from merlin.batch.source import BigQuerySource

There are 2 mandatory fields that must be specified in the source config: table and features.

  1. table is the BQ table ID in the <gcp_project.dataset_name.table_name> format
  2. features are the column names that will be used as features during prediction
[ ]:
bq_source = BigQuerySource(table="your-gcp-project.dataset.table",
                           features=["sepal_length",
                                     "sepal_width",
                                     "petal_length",
                                     "petal_width"])

4.2 Configuring BQ Sink

Next, we configure the destination of the prediction job results using the merlin.batch.sink.BigQuerySink class.

[ ]:
from merlin.batch.sink import BigQuerySink

In the BigQuerySink class, we can specify several parameters:

  1. table (mandatory) is the destination table ID in the <gcp_project.dataset_name.table_name> format
  2. staging_bucket (mandatory) is the bucket name that will be used to store the prediction job result temporarily before loading it into the destination table
  3. result_column (mandatory) is the name of the column that will be populated with the prediction result
  4. save_mode (optional) is the write behavior. By default the value is SaveMode.ERRORIFEXISTS, which will make the prediction job fail if the destination table already exists. Other possible values are: SaveMode.OVERWRITE, SaveMode.APPEND, and SaveMode.IGNORE

In our case, we will use SaveMode.OVERWRITE so that the destination table will be overwritten with the new value.

[ ]:
from merlin.batch.sink import SaveMode

bq_sink = BigQuerySink(table="your-gcp-project.dataset.result_table",
                       staging_bucket="your-bucket",
                       result_column="species",
                       save_mode=SaveMode.OVERWRITE)

4.3 Configuring Job

A batch prediction job can be configured using the merlin.batch.config.PredictionJobConfig class. The following parameters can be configured:

  1. source (mandatory) is an instance of the source configuration. Currently, only BigQuerySource is supported
  2. sink (mandatory) is an instance of the sink configuration. Currently, only BigQuerySink is supported
  3. service_account_name (mandatory) is the name of the secret containing the service account key for running the prediction job. The service account must have the following privileges:
      - BigQuery user role (roles/bigquery.user)
      - BigQuery data editor role in the destination dataset (roles/bigquery.dataEditor)
      - Bucket writer role in the staging_bucket (roles/storage.legacyBucketWriter)
      - Object viewer role in the staging_bucket (roles/storage.objectViewer)
  4. result_type (optional) is the type of the prediction result; it will affect the column type of the result_column in the destination table. By default the type is ResultType.DOUBLE
  5. item_type (optional) is the item type of the prediction result, used when result_type is ResultType.ARRAY
  6. resource_request (optional) is the resource request for running the batch prediction job. We can pass an instance of merlin.batch.config.PredictionJobResourceRequest to configure it (see the sketch after the basic configuration below). By default, the prediction job will use the environment’s default configuration
  7. env_vars (optional) is the environment variables associated with the batch prediction job. We can pass a dictionary of environment variables, e.g. env_vars={"ALPHA":"0.2"}

We are going to use the previously configured bq_source and bq_sink to define the source and destination tables of the prediction job. Additionally, we’ll use the "batch-service-account@your-gcp-project.iam.gserviceaccount.com" service account to run the job. The service account has been granted all the privileges needed to run the prediction job.

[ ]:
from merlin.batch.config import PredictionJobConfig

job_config = PredictionJobConfig(source=bq_source,
                                 sink=bq_sink,
                                 service_account_name="batch-service-account@your-gcp-project.iam.gserviceaccount.com")
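
The optional parameters can be attached to the same config. The cell below is a hypothetical sketch: the PredictionJobResourceRequest argument names shown here are assumptions, so verify them against the Merlin SDK before relying on them.

[ ]:
from merlin.batch.config import PredictionJobConfig, PredictionJobResourceRequest

# Hypothetical, fully-specified config; the PredictionJobResourceRequest
# argument names are assumptions to be checked against the SDK.
custom_job_config = PredictionJobConfig(
    source=bq_source,
    sink=bq_sink,
    service_account_name="batch-service-account@your-gcp-project.iam.gserviceaccount.com",
    resource_request=PredictionJobResourceRequest(
        driver_cpu_request="1",
        driver_memory_request="1Gi",
        executor_cpu_request="2",
        executor_memory_request="2Gi",
        executor_replica=3,
    ),
    env_vars={"ALPHA": "0.2"},  # passed to the job as environment variables
)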

4.4 Start Batch Prediction Job

A prediction job can be started by invoking the create_prediction_job method of a model version and passing in the PredictionJobConfig instance. By default, the job will run synchronously and, once it finishes, a job object will be returned. To run the job asynchronously, you can pass in the optional argument sync=False. It will return a prediction job object that runs in the background.

[ ]:
job = v.create_prediction_job(job_config=job_config, sync=False)

If you want to stop a running job, you can invoke the stop method of the job. Note that you can only stop a prediction job from the SDK if sync is set to False. You can update the status of the job by calling the refresh method, which returns an updated version of the prediction job.

[ ]:
job = job.refresh()
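
If you need to stop the job while it is still running (only possible when it was started with sync=False), you can call its stop method. The call is left commented out here so the tutorial job keeps running.

[ ]:
# Stop the running prediction job (uncomment to actually stop it)
# job.stop()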

Once the prediction job has completed, we can check the result in the destination table.

[ ]:
from google.cloud import bigquery
[ ]:
client = bigquery.Client()
query_job = client.query("""
    SELECT
      *
    FROM
      `your-gcp-project.dataset.result_table`
    LIMIT
      100""")

results = query_job.result()
results.to_dataframe().head()