BQ to BQ Batch Prediction Example: Predicting New York Taxi Trip Fare¶
Requirements¶
- Authenticated to gcloud (gcloud auth application-default login)
This notebook demonstrates a more complex example of using a batch prediction job in Merlin. It also demonstrates the scalability of Merlin prediction jobs in processing a large amount of data (~150 million rows). For a basic introduction to batch prediction jobs in Merlin, see the Batch Prediction Tutorial 1 - Iris Classifier notebook.
Problem Statement¶
The problem that we are trying to solve in this notebook is to predict the total fare of a taxi trip in New York City given the following data:
1. pickup_datetime
2. pickup_longitude
3. pickup_latitude
4. dropoff_longitude
5. dropoff_latitude
6. passenger_count
The data is available in the BQ public dataset bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2015. The table has 146,112,989 rows. We will train a model using a subset of the data (50,000 rows) and then use the model to predict over the whole table using Merlin's batch prediction.
1. Train Model¶
Download a subset of the table for training the model.
[ ]:
from google.cloud import bigquery
import numpy as np
import pandas as pd
client = bigquery.Client()
query_job = client.query("""
SELECT
pickup_datetime,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
passenger_count,
total_amount
FROM
`bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2015`
LIMIT
50000""")
results = query_job.result()
df = results.to_dataframe()
df.head()
[ ]:
df.describe()
Clean the data to remove trips with:
- 0 passenger_count
- 0 latitude/longitude
- negative total_amount
- pickup/dropoff locations outside New York
[ ]:
# Drop rows containing zero values, then keep only positive fares
df = df.replace(0, np.nan).dropna()
df = df[df["total_amount"] > 0.0]
df.describe()
[ ]:
def select_within_boundingbox(df, BB):
    """
    Keep only trips whose pickup and dropoff locations fall within the bounding box BB.
    Adapted from https://www.kaggle.com/breemen/nyc-taxi-fare-data-exploration
    """
    return df[(df.pickup_longitude >= BB[0]) & (df.pickup_longitude <= BB[1]) & \
              (df.pickup_latitude >= BB[2]) & (df.pickup_latitude <= BB[3]) & \
              (df.dropoff_longitude >= BB[0]) & (df.dropoff_longitude <= BB[1]) & \
              (df.dropoff_latitude >= BB[2]) & (df.dropoff_latitude <= BB[3])]

# bounding box covering New York City: (lon_min, lon_max, lat_min, lat_max)
BB = (-74.5, -72.8, 40.5, 41.8)
df = select_within_boundingbox(df, BB)
df.describe()
Prepare dataset for training and testing.
[ ]:
features = [
"pickup_datetime",
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude",
"passenger_count"
]
label = "total_amount"
X = df[features]
y = df[label]
We will add transformations to:
1. Process pickup_datetime into 4 additional features: month, day_of_month, day_of_week, and hour
2. Process the location features into distance features: distance_haversine and distance_manhattan
[ ]:
def process_pickup_datetime(df):
    # Expand pickup_datetime into month, day_of_month, hour, and day_of_week features
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['month'] = df['pickup_datetime'].dt.month
    df['day_of_month'] = df['pickup_datetime'].dt.day
    df['hour'] = df['pickup_datetime'].dt.hour
    df['day_of_week'] = df['pickup_datetime'].dt.dayofweek

def haversine_distance(lat1, lng1, lat2, lng2):
    # Great-circle distance between two points, in km
    lat1, lng1, lat2, lng2 = map(np.radians, (lat1, lng1, lat2, lng2))
    AVG_EARTH_RADIUS = 6371  # in km
    lat = lat2 - lat1
    lng = lng2 - lng1
    d = np.sin(lat * 0.5) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(lng * 0.5) ** 2
    h = 2 * AVG_EARTH_RADIUS * np.arcsin(np.sqrt(d))
    return h

def manhattan_distance(lat1, lng1, lat2, lng2):
    # Approximate "taxicab" distance: sum of the east-west and north-south haversine distances
    a = haversine_distance(lat1, lng1, lat1, lng2)
    b = haversine_distance(lat1, lng1, lat2, lng1)
    return a + b

def transform(df):
    # Add datetime and distance features, then drop the raw pickup_datetime column
    process_pickup_datetime(df)
    df["distance_haversine"] = haversine_distance(
        df['pickup_latitude'].values,
        df['pickup_longitude'].values,
        df['dropoff_latitude'].values,
        df['dropoff_longitude'].values)
    df["distance_manhattan"] = manhattan_distance(
        df['pickup_latitude'].values,
        df['pickup_longitude'].values,
        df['dropoff_latitude'].values,
        df['dropoff_longitude'].values)
    return df.drop(columns=['pickup_datetime'])
[ ]:
# transform mutates its input, so pass a copy to keep X intact
X_trans = transform(X.copy())
[ ]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X_trans, y, test_size=0.2)
Train an XGBoost regressor on the training dataset and use RMSE and MAE to measure its performance.
[ ]:
import xgboost as xgb
import math
from sklearn.metrics import mean_squared_error, mean_absolute_error
model = xgb.XGBRegressor(max_depth=10)
model.fit(X_train, y_train)
pred_train = model.predict(X_train)
print(f"Training RMSE: {math.sqrt(mean_squared_error(y_train, pred_train))}")
print(f"Training MAE: {mean_absolute_error(y_train, pred_train)}")
pred_test = model.predict(X_test)
print(f"Test RMSE: {math.sqrt(mean_squared_error(y_test, pred_test))}")
print(f"Test MAE: {mean_absolute_error(y_test, pred_test)}")
The model performs well enough, so let's use it to predict the whole table (bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2015) and store the result in the your-gcp-project.dataset.ny_taxi_prediction table.
2. Wrap Model¶
To be able to run a batch prediction job, we'll have to wrap the model inside a class implementing the PyFuncV2Model abstract class. The class has two abstract methods: initialize and infer:
- initialize is the entry point for initializing the model. Within this method you can perform initialization steps such as loading the model from an artifact. initialize will be called once during model initialization. The argument to initialize is a dictionary containing key-value pairs of artifact names and their URLs. The artifact keys are the same values as received by log_pyfunc_model.
- infer is the prediction method of your model. infer accepts a pandas.DataFrame as input and should return either a np.ndarray, pd.Series, or pd.DataFrame of the same length.
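To make the contract concrete, here is a minimal, hypothetical sketch of a PyFuncV2Model implementation (the "my_model" artifact key and the SketchModel name are illustrative only; the real class used in this notebook is defined in the next steps):
[ ]:
# A minimal, hypothetical sketch of the PyFuncV2Model contract.
# The "my_model" artifact key is illustrative only.
from merlin.model import PyFuncV2Model
import joblib

class SketchModel(PyFuncV2Model):
    def initialize(self, artifacts):
        # artifacts maps artifact names (as passed to log_pyfunc_model) to their URLs
        self._model = joblib.load(artifacts["my_model"])

    def infer(self, df):
        # df is one partition of the source table as a pandas.DataFrame;
        # return a np.ndarray, pd.Series, or pd.DataFrame of the same length
        return self._model.predict(df)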
IMPORTANT¶
During batch prediction job execution, the infer method will be called multiple times, each time with a different partition of the source data as its input. It is important that infer avoids aggregation operations (e.g. mean, min, max), since the operation would only be applied to the given partition and the result would therefore be incorrect. If aggregation is required, it is recommended to do it outside of the prediction job and store the result as a column in the source table, as the sketch below illustrates.
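To illustrate the pitfall with a hypothetical sketch (not part of this notebook's model): normalizing a column by its mean inside infer would use each partition's mean instead of the mean of the whole table, so the same row could get a different value depending on how the data happens to be partitioned.
[ ]:
import numpy as np
import pandas as pd

toy = pd.DataFrame({"passenger_count": [1, 1, 2, 6]})

# Wrong: aggregating inside infer -- each partition sees a different mean
part_a, part_b = toy.iloc[:2], toy.iloc[2:]
bad_a = part_a["passenger_count"] - part_a["passenger_count"].mean()  # uses mean = 1.0
bad_b = part_b["passenger_count"] - part_b["passenger_count"].mean()  # uses mean = 4.0

# Correct: compute the aggregate once over the whole table (e.g. as a
# precomputed column in the source table) and only reference it inside infer
global_mean = toy["passenger_count"].mean()  # 2.5
good = toy["passenger_count"] - global_mean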
First, we will serialize the previously trained model using joblib, so that we can upload it as an artifact to Merlin.
[ ]:
import joblib
import os

MODEL_DIR = "model"
MODEL_FILE = "nyc-model.joblib"
MODEL_PATH = os.path.join(MODEL_DIR, MODEL_FILE)
MODEL_PATH_ARTIFACT_KEY = "model_path"  # we will use it when calling log_pyfunc_model

# Make sure the target directory exists before dumping the model
os.makedirs(MODEL_DIR, exist_ok=True)
joblib.dump(model, MODEL_PATH)
Next, we create the NYTaxiFareModel class extending PyFuncV2Model and implement the necessary methods: initialize and infer.
In the initialize method, we load the serialized model from the artifacts entry keyed by MODEL_PATH_ARTIFACT_KEY using joblib.
In the infer method, we apply the same transformation to the table as when we trained the model (see the transform function above).
[ ]:
from merlin.model import PyFuncV2Model
import joblib
import os

class NYTaxiFareModel(PyFuncV2Model):
    def initialize(self, artifacts):
        # Load the serialized xgboost model from the uploaded artifact
        self.model = joblib.load(artifacts[MODEL_PATH_ARTIFACT_KEY])

    def infer(self, df_predict):
        # Apply the same feature transformation used during training
        df = transform(df_predict)
        return self.model.predict(df)
[ ]:
m = NYTaxiFareModel()
[ ]:
m.initialize({MODEL_PATH_ARTIFACT_KEY: MODEL_PATH})
[ ]:
pred = m.infer(X)
print(f"RMSE: {math.sqrt(mean_squared_error(y, pred))}")
print(f"MAE: {mean_absolute_error(y, pred)}")
3. Upload To Merlin¶
3.1 Initialization¶
[ ]:
import merlin
from merlin.model import ModelType
merlin.set_url("http://localhost:3000/api/merlin")
3.2 Set Active Project¶
project represents a project in real life. You may have multiple models within a project.
merlin.set_project(<project_name>) will set the active project to the one matching the given name. You can only set it to an existing project. If you would like to create a new project, please do so from the MLP console at http://localhost:3000/projects/create.
[ ]:
merlin.set_project("sample")
3.3 Set Active Model¶
model represents an abstract ML model. Conceptually, a model in MLP is similar to a class in a programming language. To instantiate a model you'll have to create a model_version.
Each model has a type; the model types currently supported by MLP are: sklearn, xgboost, tensorflow, pytorch, and user-defined models (i.e. pyfunc models).
model_version represents a snapshot of a particular model iteration. You'll be able to attach information such as metrics and tags to a given model_version, as well as deploy it as a model service.
merlin.set_model(<model_name>, <model_type>) will set the active model to the given name; if a model with that name is not found, a new model will be created.
Currently, batch prediction jobs are only supported by the PYFUNC_V2 model type.
[ ]:
merlin.set_model("nyc-batch", ModelType.PYFUNC_V2)
3.4 Create New Model Version And Upload¶
To deploy the model, we will have to create an iteration of the model (by creating a model_version), upload the serialized model to MLP, and then deploy it.
To upload a PyFunc model you have to provide the following arguments:
1. model_instance is the instance of the PyFunc model; the model has to extend merlin.PyFuncModel or merlin.PyFuncV2Model
2. conda_env is the path to a conda environment yaml file. The environment yaml file must contain all dependencies required by the PyFunc model (a rough example is sketched below).
3. (Optional) artifacts is a dictionary of additional artifacts that you want to include with the model
4. (Optional) code_path is a list of directories containing Python code that will be loaded during model initialization; this is required when model_instance depends on a local Python package
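The exact contents of env.yaml depend on your model's dependencies. As a rough, assumed example (the packages and versions below are illustrative only; pin them to whatever your model actually needs), the file could be generated like this:
[ ]:
# Write an illustrative conda environment file for the PyFunc model.
# The package list below is an assumption -- adjust it to match the
# dependencies your model actually uses.
env_yaml = """\
dependencies:
  - python=3.7
  - pip
  - pip:
    - joblib
    - numpy
    - pandas
    - scikit-learn
    - xgboost
"""

with open("env.yaml", "w") as f:
    f.write(env_yaml)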
[ ]:
# Create new version of the model
with merlin.new_model_version() as v:
# Upload the serialized model to MLP
merlin.log_pyfunc_model(model_instance=m,
conda_env="env.yaml",
artifacts={MODEL_PATH_ARTIFACT_KEY: MODEL_PATH})
You can check whether the model has been uploaded successfully by opening the model version's MLflow URL.
[ ]:
v.mlflow_url
4. Create Batch Prediction Job¶
The batch prediction job will use bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2015 as the data source and "pickup_datetime", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", and "passenger_count" as features. The prediction result will be stored in the your-gcp-project.dataset.ny_taxi_prediction table under the total_fare column.
Since the data size is quite large, we will not use the default resource request and will instead specify it using a PredictionJobResourceRequest instance.
[ ]:
from merlin.batch.source import BigQuerySource
from merlin.batch.sink import BigQuerySink
from merlin.batch.sink import SaveMode
from merlin.batch.config import PredictionJobConfig, PredictionJobResourceRequest
bq_source = BigQuerySource("bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2015",
features=[ "pickup_datetime",
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude",
"passenger_count"])
bq_sink = BigQuerySink("your-gcp-project.dataset.ny_taxi_prediction",
staging_bucket="your-bucket",
result_column="total_fare",
save_mode=SaveMode.OVERWRITE)
job_config = PredictionJobConfig(source=bq_source,
sink=bq_sink,
service_account_name="[email protected]",
resource_request=PredictionJobResourceRequest(driver_cpu_request="1",
driver_memory_request="1Gi",
executor_cpu_request="2",
executor_memory_request="2Gi",
executor_replica=6))
job = v.create_prediction_job(job_config=job_config)
Once the prediction job has completed, we can check the result in the destination table.
[ ]:
query_job = client.query("""
SELECT
*
FROM
`your-gcp-project.dataset.ny_taxi_prediction`
LIMIT
100""")
results = query_job.result()
df = results.to_dataframe()
df.head()