Implementing Many Models at Scale with Spark 3.x (Databricks/Synapse) and Azure ML

James Nguyen
7 min read · Jun 23, 2021

--

There are many scenarios where we need to build and run a large number of machine learning models. For example: in retail, a separate revenue forecast model is needed for each store and brand; in supply chain, inventory optimization is done for each warehouse and product; in the restaurant business, demand forecasting models are needed across thousands of restaurants; and so on. This pattern is commonly referred to as Many Models. While the Azure ML platform team has published a popular accelerator using the Azure Parallel Run Step (PRS) and AutoML, I'd like to expand on it with additional options that simplify the implementation and address more business and technology scenarios, such as using Spark in Databricks and Synapse, or using AML PRS with tabular datasets instead of file datasets.

This post comes with examples in my GitHub repo: james-tn/Many-Models: Implement many models for ML in Azure (github.com)

Option 1: Implementing Many Models using Spark 3.x in Azure Synapse Spark or Azure Databricks

Spark is very powerful for complex big data transformation, and many customers who need Many Models probably already have Spark applications in place. The ability to split or group a large dataset into multiple partitions for parallel processing is a valuable feature. In Spark versions earlier than 3.0, however, this capability was limited to Spark's own APIs for generic transformation. Since 3.0, Spark has added much better support for Python and Pandas, and with the ability in the Azure cloud to install Python and ML packages, Spark can be a very powerful option for implementing Many Models. I detailed this in another post: Leveraging Spark for Large Scale Deep Learning Data Preparation and Inference | by James-Giang Nguyen | Analytics Vidhya | Medium

Spark’s Pandas UDFs: Parallelizing Python Computation

The advantage of using Spark for Many Models is that Spark provides flexible splitting and grouping logic to group data by entities (such as combinations of product-store, location-product, etc.) for Many Models training and inferencing, and then easily collects the results in the form of a Spark DataFrame for the next steps.

To illustrate, let's look at an example of using the Pandas Function API from the GitHub example:

df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand, Revenue, Store from OJ_Sales_Data")
df = df.repartition(200)  # to increase parallelism
result = df.groupby(["Brand", "Store"]).applyInPandas(many_model_train, schema="Store string, Brand string, mse float, mape float, rmse float, model_name string")

The above code snippet selects transaction data from a table, groups it by Brand and Store, and then applies a custom ML training function (many_model_train). Inside the function, you can assume you are given a pandas DataFrame holding the data for one unique combination of Brand and Store, and you can apply any ML training algorithm you like to train your model. Spark then parallelizes your logic so that many Brand-Store instances run in parallel.
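As a rough illustration, a minimal many_model_train could look like the sketch below. It is not the repo's implementation: it assumes scikit-learn (0.24 or later, for mean_absolute_percentage_error) is installed on the cluster and uses a hypothetical single feature (Revenue) to predict Quantity. Treat it as a shape reference only: the input is a pandas DataFrame for one Brand-Store group, and the returned row must match the schema passed to applyInPandas.

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error

def many_model_train(data: pd.DataFrame) -> pd.DataFrame:
    store, brand = data['Store'].iloc[0], data['Brand'].iloc[0]
    # Hypothetical feature/target choice for illustration only
    X = data[['Revenue']].fillna(0)
    y = data['Quantity']
    forecaster = LinearRegression().fit(X, y)
    preds = forecaster.predict(X)
    mse = mean_squared_error(y, preds)
    mape = mean_absolute_percentage_error(y, preds)
    rmse = float(np.sqrt(mse))
    model_name = store + "_" + brand
    # ... save and register the model here (see the snippet below) ...
    # Return one row matching the applyInPandas schema
    return pd.DataFrame({'Store': [store], 'Brand': [brand], 'mse': [mse],
                         'mape': [mape], 'rmse': [rmse], 'model_name': [model_name]})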

Inside your training function, you can easily register your ML model to the AML workspace or MLflow:

import cloudpickle
from azureml.core import Model

# 8.0 Save the pipeline and register the model to AML
with open(model_name, mode='wb') as file:
    cloudpickle.dump(forecaster, file)
model = Model.register(workspace=ws, model_name=model_name, model_path=model_name, tags={'mse': str(mse), 'mape': str(mape), 'rmse': str(rmse)})
Many models registered in AML workspace

We can view the details of the models produced by Spark by displaying the result DataFrame.

Result table for models trained by Spark Pandas Function API

Likewise, for inferencing, the first step is to group/split the data needed by the scoring logic and then apply a custom function via the Pandas Function API (many_model_forecast):

df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand, Revenue, Store from OJ_Sales_Data")
df = df.repartition(200)  # to increase parallelism
prediction_result = df.groupby(["Brand", "Store"]).applyInPandas(many_model_forecast, schema="WeekStarting date, Store string, Brand string, Prediction float")

Inside many_model_forecast, the corresponding model is loaded from the Azure ML or MLflow model repository like this:

model_name = data['Store'][0] + "_" + data['Brand'][0]
model = Model(ws, model_name)
model.download(exist_ok=True)
with open(model_name, 'rb') as f:
    forecaster = cloudpickle.load(f)
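Putting it together, a minimal many_model_forecast could look like the following sketch. It is illustrative only: the Revenue feature mirrors the hypothetical training sketch above, and the returned DataFrame must match the schema passed to applyInPandas.

def many_model_forecast(data: pd.DataFrame) -> pd.DataFrame:
    # Load the model registered for this Store-Brand combination (as shown above)
    model_name = data['Store'].iloc[0] + "_" + data['Brand'].iloc[0]
    model = Model(ws, model_name)
    model.download(exist_ok=True)
    with open(model_name, 'rb') as f:
        forecaster = cloudpickle.load(f)
    # Hypothetical feature; use the same features the model was trained on
    predictions = forecaster.predict(data[['Revenue']].fillna(0))
    # Return one row per input row, matching the applyInPandas schema
    return pd.DataFrame({'WeekStarting': data['WeekStarting'].dt.date,
                         'Store': data['Store'],
                         'Brand': data['Brand'],
                         'Prediction': predictions.astype('float32')})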

We can preview the result of batch scoring with many models produced by Spark

Prediction Result by Spark

Option 2: Implementing Many Models using Azure ML Parallel Run Step

Azure ML Parallel Run Step (PRS) is a powerful option for running any sort of ML training and inferencing with the Many Models pattern. Given an input dataset (file or tabular), PRS can split the data by number of files (for a file dataset) or partition it by size or by column values (for a tabular dataset), and then apply your ML script, which can use any ML algorithm, to thousands of portions/partitions in parallel. Like other forms of AML training, you can specify a custom training environment with library dependencies, or go further with a custom Docker environment for configurations beyond standard PyPI libraries, and choose from a wide range of CPU and GPU machines.

The Many Models accelerator provides a detailed guideline for file datasets. This requires the data to be in the form of multiple files, with each file holding the data for one model (e.g. store-brand transaction data). Data in your environment may come from a centralized source (e.g. one big dataset for all products and stores), so you would need to prepare it into that file-dataset format, which may not be convenient.

In this post and in my GitHub repo, I provide an example with a tabular dataset that uses a newer feature to partition data by column. This is probably a more common scenario than the file dataset. The feature is currently in preview but will be GA soon.

To start, we need to build a partitioned tabular dataset:

partitioned_dataset = dataset.partition_by(partition_keys=['Store', 'Brand'], target=(datastore, "partition_by_key_res"), name="partitioned_oj_data")
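For context, the dataset and datastore objects in the snippet above come from the AML workspace. A minimal sketch, assuming the tabular dataset has already been registered under a hypothetical name:

from azureml.core import Workspace, Dataset

ws = Workspace.from_config()            # assumes a config.json for your workspace
datastore = ws.get_default_datastore()  # or a specific registered datastore
# "oj_sales_tabular" is a placeholder name for your registered tabular dataset
dataset = Dataset.get_by_name(ws, name="oj_sales_tabular")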

The partition keys are used by PRS to distribute the data across multiple instances of the PRS run. We specify them in the PRS run config:

parallel_run_train_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=train_script_file,  # the user script to run against each input
    partition_keys=['Store', 'Brand'],
    error_threshold=-1,
    output_action='append_row',
    append_row_file_name="training_output.txt",
    environment=batch_env,
    compute_target=compute_target,
    node_count=2,
    run_invocation_timeout=600
)
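The config is then wired into a ParallelRunStep and submitted as a pipeline. A rough sketch, where the step and experiment names are placeholders and output_dir matches the output referenced later when collecting results; the exact wiring in the repo may differ:

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import ParallelRunStep

output_dir = PipelineData(name="training_output", datastore=datastore)

parallel_run_train_step = ParallelRunStep(
    name="many-models-train",  # placeholder step name
    parallel_run_config=parallel_run_train_config,
    inputs=[partitioned_dataset.as_named_input("train_data")],
    output=output_dir,
    allow_reuse=False
)

pipeline = Pipeline(workspace=ws, steps=[parallel_run_train_step])
pipeline_run = Experiment(ws, "many-models-train").submit(pipeline)
pipeline_run.wait_for_completion()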

In the train script, you are given a mini batch, which is one of the data partitions. You then implement your custom training logic just as you would for any standard Azure ML training. The PRS entry script has two methods: init(), which initializes resources shared across multiple invocations of the second method, run(mini_batch).
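A minimal skeleton of such an entry script, assuming the partitioned tabular input so that each mini batch arrives as a pandas DataFrame for one Store-Brand partition (the full training logic lives in the repo's train script):

import pandas as pd
from azureml.core import Run

def init():
    # Called once per worker process: acquire shared resources such as the workspace handle
    global ws
    ws = Run.get_context().experiment.workspace

def run(mini_batch: pd.DataFrame) -> pd.DataFrame:
    # mini_batch holds the rows for one Store-Brand partition (partition_keys=['Store', 'Brand'])
    store, brand = mini_batch['Store'].iloc[0], mini_batch['Brand'].iloc[0]
    # ... train, evaluate, and register the model here (see the snippet below) ...
    # With output_action='append_row', each returned row is appended to training_output.txt;
    # the repo's script also returns the metrics and model name columns.
    return pd.DataFrame({'Store': [store], 'Brand': [brand]})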

At the end of the run method, the trained model is registered just as it was in the Spark approach:

joblib.dump(forecaster, model_name)
model = Model.register(workspace=ws, model_name=model_name, model_path=model_name, tags={'mse': str(mse), 'mape': str(mape), 'rmse': str(rmse)})
result = pd.DataFrame({'Store': [store], 'Brand': [brand], 'mse': [mse], 'mape': [mape], 'rmse': [rmse], 'model_name': [model_name]})
Many models registered in AML by PRS

PRS helps you consolidate the outputs from individual runs and stores the result in a datastore location. Then, with a few lines of code, you can view the result:

batch_run = pipeline_run.find_step_run(parallel_run_train_step.name)[0]
batch_output = batch_run.get_output_data(output_dir.name)
target_dir = tempfile.mkdtemp()
batch_output.download(local_path=target_dir)
result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_train_config.append_row_file_name)
df = pd.read_csv(result_file, delimiter=" ", header=None)
df.columns = ["Store", "Brand", "mse", "mape", "rmse", "model_name"]
print("Train result has ", df.shape[0], " rows")
df.head(10)
Result from AML PRS Many Model training

Just as easily, we can follow the same method for batch scoring with the many models, as sketched below. At the end, the results can be consolidated and displayed.
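For reference, the scoring step reuses the same pattern with a scoring entry script. A hedged sketch, where the entry script and step names are placeholders and the imports from the earlier pipeline sketch are assumed:

parallel_run_score_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script="forecast.py",  # placeholder name for the scoring script
    partition_keys=['Store', 'Brand'],
    error_threshold=-1,
    output_action='append_row',
    append_row_file_name="prediction_output.txt",
    environment=batch_env,
    compute_target=compute_target,
    node_count=2,
    run_invocation_timeout=600
)

parallel_run_score_step = ParallelRunStep(
    name="many-models-forecast",  # placeholder step name
    parallel_run_config=parallel_run_score_config,
    inputs=[partitioned_dataset.as_named_input("score_data")],
    output=PipelineData(name="prediction_output", datastore=datastore),
    allow_reuse=False
)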

Prediction result for Many Models using AML PRS

When to use what?

Reasons to use Synapse Spark/Databricks for Many Models

  • When your Many Models training pipeline has complex data transformation and grouping requirements before the data can be used for ML training, Spark itself is a natural fit. It is then a natural extension to write the Many Models step using the Spark Pandas Function API.
  • When your ML training and scoring algorithms use straightforward, common libraries such as scikit-learn on tabular data. This does not require you to deal with complex library installation, data dependencies, or custom OS-level configuration, which Synapse or even Databricks users may have a hard time setting up.

Reasons to use Azure ML PRS and pipeline

  • When your data is already separated by the entities your models are supposed to be trained for. For example, the data for each model is already in a separate file, or, for tabular data, the partition column(s) already exist in the dataset. This means you don't need Spark for complex data transformation.
  • When you are doing types of ML that require complex environment setup, for example a custom Docker image or downloading data files and pretrained models. Azure ML environments and compute provide great flexibility for all of this. Computer vision and NLP deep learning are examples of such scenarios.
