Deep Learning with Azure: PyTorch distributed training done right in Kedro

At GetInData we use the Kedro framework as the core building block of our MLOps solutions, as it structures ML projects well, providing great abstractions for nodes, pipelines, datasets and configuration. This results in easy-to-maintain, composable code for data processing, training and evaluating models, without limiting the machine learning frameworks Data Scientists can use. We also provide workshops for Data Scientists to get them up to speed with making Kedro work for them, so that they can solve real problems and provide value for our clients.

One of the recent use cases we encountered that was not supported out of the box by Kedro was the distributed training scenario. Data Scientists love neural networks, but training them at scale usually requires more compute power than is available on a single machine (even one with a GPU). Larger networks such as BERT, GPT-2, ViT or even the top variants of EfficientNet and ResNet, combined with large datasets, take a long time to train. This results in both unhappy Data Scientists and a longer experiment feedback loop, which is discouraging for the business.

Combining the worlds of neural networks, Kedro and distributed training can be quite a challenge and requires a lot of manual configuration. We knew this, so we decided to make yet another contribution to the Kedro community. In this blog post we will demonstrate how to use PyTorch with Kedro to train neural networks and then easily scale out the training using distributed computing, thanks to a recently released feature of our Kedro AzureML plugin.

Kedro + PyTorch + Azure ML setup

We will use the same scenario as the one in Kedro AzureML’s quickstart (kedro-azureml >= 0.2.2 is required here), modified for neural network training with PyTorch. For that purpose, we added PyTorch Lightning as an additional dependency to the project. In this blog post we skip the initial setup covered by the docs and move straight on to the important parts, so we encourage you to go through the plugin’s quickstart first.

Please note that model saving and future productionisation of the trained model are not included in the scope of this blog post or related code.

Add the PyTorch regression model

We remove the “candidate modeling pipeline” from the data science pipeline of the Spaceflights starter and modify the pipeline itself by replacing the original train_model node with our train_model_pytorch node. Besides the dataset, our node also accepts a number of neural-network-specific parameters: the number of epochs, the batch size, the learning rate and, most importantly, the number of nodes to use for distributed training. We use a simple multi-layer neural network made out of linear layers with Leaky ReLU activations, plus batch normalization applied to the input. As the Spaceflights project has a regression target, we use Smooth L1 loss to train our model. We pass all of the required parameters to PyTorch Lightning’s Trainer class and call fit to train the model.

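import logging
from typing import Any

import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from pytorch_lightning.callbacks import TQDMProgressBar
from sklearn.metrics import mean_absolute_error, r2_score
from torch import nn
from torch.optim import Adagrad
from torch.utils.data import DataLoader, TensorDataset


def train_model_pytorch(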
    X_train: pd.DataFrame,
    y_train: pd.Series,
    num_nodes: int,
    max_epochs: int,
    learning_rate: float,
    batch_size: int,
):
    class SimpleNetwork(pl.LightningModule):
        def __init__(self, n_features: int, lr: float) -> None:
            super().__init__()
            self.lr = lr
            self.normalize = nn.BatchNorm1d(n_features)
            internal_features = 1024
            hidden_layer_size = 128
            depth = 10
            self.layers = nn.Sequential(
                nn.Linear(n_features, internal_features),
                nn.Sequential(
                    nn.Linear(internal_features, hidden_layer_size),
                    nn.LeakyReLU(),
                    *sum(
                        [
                            [
                                nn.Linear(hidden_layer_size, hidden_layer_size),
                                nn.LeakyReLU(),
                            ]
                            for _ in range(depth)
                        ],
                        [],
                    ),
                ),
                nn.Linear(hidden_layer_size, 1, bias=False),
            )

        def forward(self, x):
            normalized = self.normalize(x)
            outputs = self.layers(normalized)
            return outputs.squeeze()

        def training_step(self, batch, batch_idx):
            x, y = batch
            outputs = self.forward(x)
            loss = F.smooth_l1_loss(outputs, y)
            return loss

        def predict_step(
            self, batch: Any, batch_idx: int, dataloader_idx: int = 0
        ) -> Any:
            return self.forward(batch[0])

        def configure_optimizers(self):
            return Adagrad(self.parameters(), lr=self.lr)

    epochs = max_epochs
    data = create_dataloader(X_train.astype("float"), y_train, batch_size=batch_size)
    model = SimpleNetwork(X_train.shape[1], learning_rate)

    trainer = pl.Trainer(
        max_epochs=epochs,
        logger=True,
        callbacks=[TQDMProgressBar(refresh_rate=20)],
        num_nodes=num_nodes,
    )

    trainer.fit(model, train_dataloaders=data)
    return model
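To make the choice of loss function more tangible, here is a minimal, dependency-free sketch of what Smooth L1 computes, assuming the mean reduction and the threshold beta = 1.0 that F.smooth_l1_loss uses by default. The helper name smooth_l1 is ours and is not part of the project code; it is only meant to illustrate the formula.

```python
from typing import Sequence


def smooth_l1(
    predictions: Sequence[float], targets: Sequence[float], beta: float = 1.0
) -> float:
    """Mean Smooth L1 loss: quadratic for small errors, linear for large ones."""
    if len(predictions) != len(targets) or len(predictions) == 0:
        raise ValueError("predictions and targets must be non-empty and equally long")
    total = 0.0
    for prediction, target in zip(predictions, targets):
        diff = abs(prediction - target)
        if diff < beta:
            # Small errors are penalised quadratically, like a scaled MSE.
            total += 0.5 * diff * diff / beta
        else:
            # Large errors grow only linearly, which limits the impact of outliers.
            total += diff - 0.5 * beta
    return total / len(predictions)


# One small error (0.5) and one large error (3.0).
example_loss = smooth_l1([0.5, 3.0], [0.0, 0.0])
```

The linear branch is the reason this loss is a reasonable fit for a price-like regression target: a few badly mispredicted rows do not dominate the gradient the way they would with a plain squared error.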

The Spaceflights starter uses a pandas-based dataset, so we need to transform it into a PyTorch Dataset (and later a DataLoader) like this:

def create_dataloader(x: pd.DataFrame, y: pd.Series=None, predict=False, batch_size=256):
    data = [torch.from_numpy(x.values).float()]
    if y is not None:
        data.append(torch.from_numpy(y.values).float())
    return DataLoader(TensorDataset(*data), shuffle=not predict, batch_size=batch_size)
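The shuffle=not predict argument is worth a second look: during training the rows are shuffled, while during prediction the original order has to be kept so that predictions can later be matched with the test index by position. The following sketch mimics that batching behaviour in plain Python; iter_batches is a hypothetical helper written for illustration, not the DataLoader implementation.

```python
import random
from typing import Iterator, List, Optional, Sequence


def iter_batches(
    rows: Sequence, batch_size: int, shuffle: bool, seed: Optional[int] = None
) -> Iterator[List]:
    """Yield consecutive batches of rows, optionally in shuffled order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    indices = list(range(len(rows)))
    if shuffle:
        # Training: visit the rows in a random order.
        random.Random(seed).shuffle(indices)
    for start in range(0, len(indices), batch_size):
        # The last batch may be smaller than batch_size.
        yield [rows[i] for i in indices[start:start + batch_size]]


# Prediction mode: order is preserved, so outputs line up with the inputs.
ordered = list(iter_batches([10, 11, 12, 13, 14], batch_size=2, shuffle=False))
```

With shuffle=False the batches concatenate back to the original sequence, which is the property evaluate_model relies on when it builds a Series from the predictions.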

Once the model is trained, we then need to modify the evaluation code to use a PyTorch-based model instead of a Scikit-Learn one.

def evaluate_model(model: pl.LightningModule, X_test: pd.DataFrame, y_test: pd.Series):
    """Calculates and logs the coefficient of determination.

    Args:
        model: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """

    with torch.no_grad():
        trainer = pl.Trainer()
        dataloader = create_dataloader(X_test.astype("float"), predict=True)
        y_pred = trainer.predict(model, dataloaders=dataloader)
        y_pred = pd.Series(
            index=y_test.index, data=torch.cat(y_pred).reshape(-1).numpy()
        )

    r2 = r2_score(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", r2)
    logger.info("Model has MAE of %.3f on test data.", mae)
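For readers who want to see what the two logged metrics measure, here is a small sketch of the coefficient of determination and the mean absolute error written out by hand. The function r2_and_mae is our own illustrative helper; the project itself uses the Scikit-Learn implementations, which also handle edge cases (such as constant targets) that this sketch simply rejects.

```python
from typing import Sequence, Tuple


def r2_and_mae(y_true: Sequence[float], y_pred: Sequence[float]) -> Tuple[float, float]:
    """Return (R^2, MAE) for equally long sequences of targets and predictions."""
    n = len(y_true)
    if n == 0 or n != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and equally long")
    mean_true = sum(y_true) / n
    # Residual sum of squares: how far the predictions are from the targets.
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    # Total sum of squares: how far the targets are from their own mean.
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    if ss_tot == 0:
        raise ValueError("R^2 is undefined when all targets are identical")
    mae = sum(abs(t - p) for t, p in zip(y_true, y_pred)) / n
    return 1.0 - ss_res / ss_tot, mae


# Predicting the mean of the targets everywhere gives R^2 = 0 by construction.
baseline_r2, baseline_mae = r2_and_mae([1.0, 2.0, 3.0, 4.0], [2.5, 2.5, 2.5, 2.5])
```

An R^2 of 0 therefore means "no better than always predicting the average price", which gives some context for the value reported by the local run.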

Once these changes are made, the last thing to do is to modify the node’s usage in the pipeline definition - we need to pass all of the neural network params we want to use. Remember to add them in the conf/base/parameters/data_science.yml file too.

node(
    func=train_model_pytorch,
    inputs=[
        "X_train",
        "y_train",
        "params:model_options.num_nodes",
        "params:model_options.epochs",
        "params:model_options.learning_rate",
        "params:model_options.batch_size",
    ],
    outputs="regressor",
    name="train_model_node",
),

Important note: when distributed training is used with Kedro, dataset synchronization problems might occur when the datasets generated by the distributed node are not explicitly defined in the Data Catalog. Our plugin [...]

Contents of data_science.yml in parameters:

data_science:
  active_modelling_pipeline:
    model_options:
      epochs: 10
      learning_rate: 1.0e-3
      batch_size: 256
      num_nodes: 1
      test_size: 0.2
      random_state: 666
      features:
        - engines
        - passenger_capacity
        - crew
        - d_check_complete
        - moon_clearance_complete
        - iata_approved
        - company_rating
        - review_scores_rating
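Note that the node refers to params:model_options.num_nodes, while the YAML file nests the same value under data_science and active_modelling_pipeline. The sketch below illustrates how such a short name maps onto the nested file, assuming (as in the Spaceflights starter) that the pipeline is wrapped in those two namespaces. The lookup helper and the PARAMETERS dictionary are ours, written for illustration only, and are not Kedro's internal code.

```python
from typing import Any, Mapping

# The relevant part of data_science.yml, loaded into a nested dictionary.
PARAMETERS = {
    "data_science": {
        "active_modelling_pipeline": {
            "model_options": {
                "epochs": 10,
                "learning_rate": 1.0e-3,
                "batch_size": 256,
                "num_nodes": 1,
            }
        }
    }
}


def lookup(parameters: Mapping[str, Any], dotted_key: str) -> Any:
    """Walk a nested mapping following a dot-separated key."""
    node: Any = parameters
    for part in dotted_key.split("."):
        if not isinstance(node, Mapping) or part not in node:
            raise KeyError(f"parameter not found: {dotted_key}")
        node = node[part]
    return node


# Assumed namespace prefix added in front of the short parameter name.
NAMESPACE = "data_science.active_modelling_pipeline"
num_nodes = lookup(PARAMETERS, f"{NAMESPACE}.model_options.num_nodes")
```

If a parameter is referenced in the node but missing from the file, a lookup like this fails early, which is why the new entries have to be added to data_science.yml.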

Testing the code

Having the pipeline prepared, we can now run the code locally to test out the overall pipeline correctness. For the local run, we set both the number of epochs and the number of nodes to 1.

kedro run --params "data_science.active_modelling_pipeline.model_options.epochs:1,data_science.active_modelling_pipeline.model_options.num_nodes:1"

The local kedro run should finish successfully, with log messages similar to these:

[10/21/22 16:22:53] INFO     Model has a coefficient R^2 of 0.465 on test data.
                    INFO     Model has MAE of 768.543 on test data.
                    INFO     Completed 6 out of 6 tasks 
                    INFO     Pipeline execution completed successfully.
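The dotted key:value pairs passed to --params in the local run address entries deep inside the nested parameters. As a rough illustration of that idea, the sketch below turns a comma-separated list of such pairs into a nested dictionary. The function parse_overrides and its type coercion rules are our own assumptions made for this example; Kedro's actual parser may differ in details.

```python
from typing import Any, Dict


def _coerce(raw: str) -> Any:
    """Interpret a raw value as int, then float, falling back to a string."""
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            continue
    return raw


def parse_overrides(spec: str) -> Dict[str, Any]:
    """Convert 'a.b:1,a.c:2' into {'a': {'b': 1, 'c': 2}}."""
    result: Dict[str, Any] = {}
    for item in spec.split(","):
        key, separator, raw = item.partition(":")
        if not separator or not key.strip():
            raise ValueError(f"expected key:value, got {item!r}")
        *parents, leaf = key.strip().split(".")
        node = result
        for part in parents:
            # Create intermediate dictionaries on demand.
            node = node.setdefault(part, {})
        node[leaf] = _coerce(raw.strip())
    return result


overrides = parse_overrides(
    "data_science.active_modelling_pipeline.model_options.epochs:1,"
    "data_science.active_modelling_pipeline.model_options.num_nodes:1"
)
```

The resulting dictionary has the same shape as the YAML file, so it can be laid over the values loaded from conf/base/parameters/data_science.yml.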

Running Kedro Pipeline on Azure Machine Learning Pipelines

We can now test out our pipeline in the cloud and launch it on Azure ML Pipelines with the help of our open source kedro-azureml plugin. Before running the pipeline, we need to build a Docker image for it. This process is exactly the same as the one described in the plugin’s quickstart: just use kedro-docker and modify .dockerignore to include the data/01_raw folder within the image. Build and push the image to Azure Container Registry (or another container registry of your choice, as long as it is accessible from Azure ML Studio).

With the number of nodes initially set to 1 (no distributed training yet), we launch the job on Azure ML Pipelines like this:

kedro azureml run -s <your Azure subscription id>

If you encounter any issues with the setup, you can always refer to the step-by-step video guide we’ve prepared here.

Adding GPU support and launching distributed training

First, we need CUDA support within the Docker image, so we will use one of PyTorch Lightning’s official images as the base image. Remember that Docker images with both CUDA and PyTorch pre-installed tend to be large: the one we used was 5.9GB compressed and 12.0GB uncompressed. After installing Kedro and the project requirements, the uncompressed size goes up to approximately 12.3GB.

Before running the training in a distributed manner, a compute cluster with GPUs needs to be created in the Azure ML Studio. For a good price-to-performance ratio, we use STANDARD_NC4AS_T4_V3 machines with T4 GPUs (1 per machine). We also recommend using the scale to 0 capabilities of Azure ML compute clusters, so that the machines will only run when there is a job (Kedro pipeline in this case) scheduled on them.

Once we have prepared all of the required components, it’s time to add the actual support for distributed training into our Kedro PyTorch training node. This is where our plugin comes into play - with a simple decorator, we “tell” the Kedro-AzureML plugin to use distributed training for this particular node and that’s all you need!

from kedro_azureml.distributed import distributed_job
from kedro_azureml.distributed.config import Framework

@distributed_job(Framework.PyTorch, num_nodes="params:model_options.num_nodes")
def train_model_pytorch(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    num_nodes: int,
    max_epochs: int,
    learning_rate: float,
    batch_size: int,
):
    ...  # (...) rest of the code

The @distributed_job decorator takes two parameters. The first one is the name of the underlying framework to be used (we support all of the options provided by Azure: PyTorch, TensorFlow and MPI) and the second one is the number of distributed nodes we want to utilize. Note that this parameter is dynamic and can be changed by a simple parameter override using native Kedro capabilities, so you don’t have to hard-code anything!
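If you wonder how a decorator can carry this kind of configuration without changing the training function itself, here is a generic sketch of the pattern. To be clear, mark_distributed, DistributedSpec and resolve_num_nodes are hypothetical names invented for this example; this is not the plugin's implementation, only an illustration of attaching metadata to a function and resolving a "params:" reference later.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Union


@dataclass(frozen=True)
class DistributedSpec:
    framework: str
    num_nodes: Union[int, str]  # a literal count or a "params:..." reference


def mark_distributed(framework: str, num_nodes: Union[int, str]) -> Callable:
    """Attach a DistributedSpec to the function; its behaviour stays unchanged."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)
        wrapper.distributed_spec = DistributedSpec(framework, num_nodes)
        return wrapper
    return decorator


def resolve_num_nodes(spec: DistributedSpec, parameters: Mapping[str, Any]) -> int:
    """Turn the stored value into a concrete number of nodes."""
    if isinstance(spec.num_nodes, int):
        return spec.num_nodes
    prefix = "params:"
    if not spec.num_nodes.startswith(prefix):
        raise ValueError(f"unsupported num_nodes value: {spec.num_nodes!r}")
    node: Any = parameters
    for part in spec.num_nodes[len(prefix):].split("."):
        node = node[part]
    return int(node)


@mark_distributed("pytorch", num_nodes="params:model_options.num_nodes")
def train(x: int) -> int:
    return x * 2
```

Because the node count is stored as a reference rather than a number, a tool that later inspects the function can resolve it against whatever parameters are in effect for a given run.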

Now, we build the image (with the CUDA-enabled image as the base), push it and launch the distributed job.

kedro azureml run -s <your Azure subscription id> --params '{"data_science": {"active_modelling_pipeline": {"model_options": {"num_nodes": 2}}}}'

To increase or decrease the number of distributed nodes, just override the parameter with a different value. Which parameter holds the node count is up to you: the way you reference parameters in the @distributed_job decorator is exactly the same as when you define the Kedro pipeline.
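The JSON passed to --params above only mentions num_nodes, yet the remaining model options are expected to keep their values. The sketch below shows one way such an override can be applied, assuming it is merged key by key into the nested parameters rather than replacing the whole data_science block. The deep_merge helper is ours and only illustrates that assumption; it is not taken from the plugin.

```python
import json
from copy import deepcopy
from typing import Any, Dict, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a copy of base with override applied recursively."""
    merged = deepcopy(dict(base))
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are dictionaries: descend instead of replacing.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


base_params = {
    "data_science": {
        "active_modelling_pipeline": {
            "model_options": {"epochs": 10, "batch_size": 256, "num_nodes": 1}
        }
    }
}
cli_override = json.loads(
    '{"data_science": {"active_modelling_pipeline": {"model_options": {"num_nodes": 2}}}}'
)
effective = deep_merge(base_params, cli_override)
```

In this sketch only num_nodes changes in the effective parameters, while epochs and batch_size are carried over from the file.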

Running the distributed training job in Azure ML Pipelines should look like the screenshot below.

PyTorch distributed training with Kedro in Azure ML Pipelines

Note that within the train_model_node there will be 2 separate user logs, one for each of the nodes you use for training. The master node will be the one that saves all of the data and synchronizes the training process.
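To give some intuition for what the nodes do, the sketch below shows the general idea of data-parallel training in plain Python: every process gets its own slice of the sample indices, and only the process with rank 0 acts as the master. The helper names are ours, the sharding follows the idea behind PyTorch's distributed sampler without shuffling, and reading the rank from a RANK environment variable is an assumption about how the job is launched.

```python
import math
import os
from typing import List, Mapping


def shard_indices(num_samples: int, rank: int, world_size: int) -> List[int]:
    """Return the sample indices handled by one process."""
    if num_samples <= 0:
        raise ValueError("num_samples must be positive")
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    per_rank = math.ceil(num_samples / world_size)
    # Pad by wrapping around so that every process gets the same number of samples.
    padded = [i % num_samples for i in range(per_rank * world_size)]
    return padded[rank::world_size]


def steps_per_epoch(num_samples: int, batch_size: int, world_size: int) -> int:
    """Optimizer steps per epoch when each process uses batch_size samples per step."""
    return math.ceil(math.ceil(num_samples / world_size) / batch_size)


def is_master(env: Mapping[str, str] = os.environ) -> bool:
    """Assumption: the launcher exposes the process rank as RANK."""
    return int(env.get("RANK", "0")) == 0


shards = [shard_indices(5, rank, world_size=2) for rank in range(2)]
```

With two nodes, each process sees roughly half of the rows per epoch, so the number of steps per epoch drops accordingly while the effective batch size doubles. This is worth keeping in mind when comparing metrics between single-node and distributed runs.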

Summary

With our Kedro-AzureML plugin, running distributed training jobs with PyTorch or TensorFlow should be a breeze. It also leverages one of the selling points of the public cloud: the ability to quickly scale up when needed and to pay nothing when computational resources are not in use. This will keep the Data Scientists happy with relatively short pipeline execution times, and the finance department relieved by keeping the monthly bills down. We highly encourage you to try it out for yourself!

The whole project used in this blog post is available as a reference on GitHub:

https://github.com/getindata/example-kedro-azureml-pytorch-distributed

If you encounter any issues with our plugin or have a feature request, feel free to create an issue on the Kedro-AzureML plugin’s GitHub: https://github.com/getindata/kedro-azureml or reach out on Kedro’s official Slack.

23 November 2022
