Distributed batch inference with Hugging Face on Amazon SageMaker

Use SageMaker Processing jobs to easily run inference on large datasets with Hugging Face Transformer models

Ratul Ghosh
Nov 15, 2022
Photo by Alex Kulikov on Unsplash

This blog post gives a complete walkthrough of running distributed batch inference on large datasets in production. We’ll be using Amazon SageMaker, a fully managed machine learning service. With Amazon SageMaker, data scientists and developers can quickly build and train machine learning models and then deploy them into a production-ready hosted environment.

Let’s say we have petabytes of data and we want to use a pretrained model for batch inference. A SageMaker Processing job is one of the easiest ways to do this, since it abstracts away the required infrastructure and needs only minimal changes to our existing code. In this tutorial, we will use a pretrained model from Hugging Face to calculate the semantic similarity between pairs of sentences.

This blog post will cover the following topics:

  • Modifying the code for the SageMaker Processing job
  • Building a Docker image with the code and dependencies and pushing it to an Amazon Elastic Container Registry (Amazon ECR) repository
  • Starting the processing job with the custom Docker image

Modifying the code

Creating a processing job requires specifying an Amazon Simple Storage Service (Amazon S3) URI to download data from and a path in your Docker container to download the data to. The path in the processing container must begin with /opt/ml/processing/. More on this is discussed later.

Note: /opt/ml and all its subdirectories are reserved by SageMaker. When building your Processing Docker image, don't place any data required by your container in these directories.

We need to modify our script to read data from a path under /opt/ml/processing/.

In this example, the data comes from an upstream data pipeline that splits it into multiple Parquet files and stores them in an S3 bucket. If the selected instance has multiple GPUs, the files are divided among the GPUs and each GPU runs inference on its share in parallel. Here we are using a pretrained sentence transformer from Hugging Face, but we can use any model by simply modifying the script below.
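Before looking at the full script, here is a minimal sketch of the file-to-GPU assignment idea on its own. The helper name assign_files and the sample file names are made up for illustration; the logic mirrors the round-robin rule used in the script (file i goes to GPU i % num_gpu).

```python
def assign_files(filenames, num_gpu, device_idx):
    """Return the Parquet files that the GPU with index device_idx should process."""
    # Sort first so that every process sees the files in the same order.
    parquet_files = sorted(f for f in filenames if f.endswith(".parquet"))
    # Round-robin: file i is handled by GPU i % num_gpu.
    return [f for idx, f in enumerate(parquet_files) if idx % num_gpu == device_idx]


# Hypothetical directory listing, including a non-Parquet marker file.
files = [
    "part-0.parquet",
    "part-1.parquet",
    "part-2.parquet",
    "part-3.parquet",
    "part-4.parquet",
    "_SUCCESS",
]
shards = [assign_files(files, num_gpu=2, device_idx=i) for i in range(2)]
for i, shard in enumerate(shards):
    print(f"GPU {i}: {shard}")
```

Because each file lands on exactly one GPU, the processes never write to the same output file and need no coordination with each other.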

The example script reads files from the input directory, adds a new column with the cosine similarity of the two sentence embeddings, and saves the result in the output directory. The default arguments can be overridden from the processing job, as shown later.

import os
import argparse
import logging
import numpy as np
import pandas as pd
from numpy.linalg import norm
import torch.multiprocessing as mp
from sentence_transformers import SentenceTransformer

# set up logger
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s")
logger = logging.getLogger("HuggingFace")
logger.propagate = True
logger.setLevel(logging.INFO)


def get_model_output(cfg, model, df, device_idx):
    # Extract the two sentence columns to compare.
    text1 = df["text1"].values
    text2 = df["text2"].values

    # Compute the embeddings for both lists of sentences.
    logger.info("Running model inference")
    embeddings1 = model.encode(
        text1,
        batch_size=cfg.batch_size,
        convert_to_numpy=True,
        device=device_idx,
    )
    embeddings2 = model.encode(
        text2,
        batch_size=cfg.batch_size,
        convert_to_numpy=True,
        device=device_idx,
    )
    # Cosine similarity between each pair of embeddings.
    df["semanticScore"] = np.hstack(
        [
            (emb1 @ emb2.T) / (norm(emb1) * norm(emb2))
            for emb1, emb2 in zip(embeddings1, embeddings2)
        ]
    )
    return df


def do_infer(cfg, device_idx):
    # Load the model onto the GPU owned by this process.
    logger.info("Loading model")
    model = SentenceTransformer(cfg.model, device=device_idx)

    # Select the files that this process needs to handle.
    if os.path.isdir(cfg.input_dir):
        filepath_list = [
            filepath
            for filepath in os.listdir(cfg.input_dir)
            if filepath.endswith(".parquet")
        ]
        filepath_list = sorted(filepath_list)
        # Round-robin assignment: file i goes to GPU i % num_gpu.
        filepath_list = [
            filepath_list[idx]
            for idx in range(len(filepath_list))
            if idx % cfg.num_gpu == device_idx
        ]
        input_list = [os.path.join(cfg.input_dir, filepath) for filepath in filepath_list]
    else:
        # input_dir points to a single file: only the first GPU processes it.
        input_list = [cfg.input_dir] if device_idx == 0 else []

    # Write each result under output_dir, keeping the input file name.
    output_list = [
        os.path.join(cfg.output_dir, os.path.basename(filepath))
        for filepath in input_list
    ]
    logger.info(f"Device {device_idx} input: {input_list}")
    logger.info(f"Device {device_idx} output: {output_list}")

    # Start processing.
    for in_filename, out_filename in zip(input_list, output_list):
        # Read the dataset.
        logger.info(f"Reading file {in_filename}")
        prod_df = pd.read_parquet(in_filename)

        # Run the inference.
        output_df = get_model_output(cfg, model, prod_df, device_idx)

        # Save the results.
        logger.info(f"Writing the result to {out_filename}")
        output_df.to_parquet(out_filename)
        logger.info(f"Done writing the result to {out_filename}")


def do_multiprocessing_infer(cfg):
    # Start one worker process per GPU and wait for all of them to finish.
    processes_list = [
        mp.Process(target=do_infer, args=(cfg, device_idx))
        for device_idx in range(cfg.num_gpu)
    ]

    for process in processes_list:
        process.start()

    for process in processes_list:
        process.join()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_gpu", type=int, default=4)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument(
        "--input_dir", type=str, default="/opt/ml/processing/data/input"
    )
    parser.add_argument(
        "--output_dir", type=str, default="/opt/ml/processing/data/output"
    )
    parser.add_argument(
        "--model",
        type=str,
        default="sentence-transformers/distiluse-base-multilingual-cased-v2",
    )
    args = parser.parse_args()
    do_multiprocessing_infer(args)
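
The script computes the cosine similarity one pair at a time in a Python loop. As an optional alternative (not part of the script above), the same scores can be computed for all rows at once with NumPy. The sketch below assumes both embedding matrices have shape (n_rows, dim); the function name pairwise_cosine and the toy vectors are illustrative only.

```python
import numpy as np


def pairwise_cosine(embeddings1, embeddings2):
    """Cosine similarity between row i of embeddings1 and row i of embeddings2."""
    # Row-wise dot products, without building the full n x n similarity matrix.
    dots = np.einsum("ij,ij->i", embeddings1, embeddings2)
    norms = np.linalg.norm(embeddings1, axis=1) * np.linalg.norm(embeddings2, axis=1)
    return dots / norms


# Toy 2-dimensional "embeddings": parallel, orthogonal and opposite pairs.
a = np.array([[1.0, 0.0], [1.0, 1.0], [0.0, 2.0]])
b = np.array([[2.0, 0.0], [1.0, -1.0], [0.0, -3.0]])
scores = pairwise_cosine(a, b)
print(scores)
```

For large files this avoids the per-row Python overhead, although the model's encode calls will usually dominate the total runtime.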

Building a Docker image and pushing it to Amazon ECR

Once we are done with the code, we need to build a Docker image. Below is a sample Dockerfile. In this example, all our code is inside the src directory.


FROM python:3.8

RUN apt-get -y update && apt-get install -y --no-install-recommends \
        wget \
        python3 \
    && rm -rf /var/lib/apt/lists/*

RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py && \
    rm -rf /root/.cache

COPY requirements.txt /opt/program/
COPY src/ /opt/program/src/
WORKDIR /opt/program

RUN pip install -r requirements.txt


# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard
# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE
# keeps Python from writing the .pyc files, which are unnecessary in this case.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE

Once we have the Dockerfile, we need to build an image from it and tag that image before we push it to Amazon ECR. ECR is a container registry, like Docker Hub, where we can host container images.

The sample bash script below takes care of the AWS authentication, creates a repository named sm-semantic-similarity if it doesn't already exist, builds and tags the image, and finally pushes it to the Amazon ECR repository. We’ll get a unique image URI like this: <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sm-semantic-similarity:v1

# Name of algo -> ECR
algorithm_name=sm-semantic-similarity

account=$(aws sts get-caller-identity --query Account --output text)

# Region, defaults to us-east-1
region=$(aws configure get region)
region=${region:-us-east-1}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:v1"

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" --region ${region} > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" --region ${region} > /dev/null
fi

# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin ${fullname}

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}
docker push ${fullname}
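
The image URI assembled in the bash script is also needed later on the Python side. As a small sketch, the same string can be built with a helper like the one below; the function name ecr_image_uri and the placeholder account ID are assumptions for illustration, while the URI layout follows the fullname variable in the script.

```python
def ecr_image_uri(account_id, region, repository, tag="v1"):
    """Build the ECR image URI in the same layout as the bash script's fullname."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# "123456789012" is a placeholder account ID, not a real account.
uri = ecr_image_uri("123456789012", "us-east-1", "sm-semantic-similarity")
print(uri)
```

In a notebook, the account ID could be looked up with boto3.client("sts").get_caller_identity()["Account"] instead of being hard-coded.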

Running the SageMaker Processing job using the custom container

Amazon SageMaker copies the data from S3 and then pulls the container. Cluster resources are provisioned for the duration of the job, and cleaned up when it completes. The output of the Processing job is stored in the S3 bucket specified in the arguments.

The Processor class requires the following parameters:

  • role: An AWS IAM role name or ARN
  • image_uri: The unique link of the Docker image
  • instance_count: The number of instances to run a processing job with.
  • instance_type: The type of EC2 instance to use for processing
  • entrypoint: List of strings that make up the entrypoint command. Here we can pass the num_gpu argument based on the instance_type. For example, ml.p3.16xlarge has 8 GPUs
  • volume_size_in_gb: Size in GB to use for storing data during the processing job
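
Since the num_gpu argument has to match the chosen instance type, one option is to derive the entrypoint from a small lookup table. The sketch below is only an illustration: GPU_COUNTS and build_entrypoint are hypothetical names, and the table covers just a few instance types, so extend it for the ones you actually use.

```python
# GPUs per instance type (a small, hand-maintained subset).
GPU_COUNTS = {
    "ml.p3.2xlarge": 1,
    "ml.p3.8xlarge": 4,
    "ml.p3.16xlarge": 8,
}


def build_entrypoint(instance_type, script="src/infer.py"):
    """Return the entrypoint command with --num_gpu set for the instance type."""
    try:
        num_gpu = GPU_COUNTS[instance_type]
    except KeyError:
        raise ValueError(f"Unknown GPU count for instance type {instance_type!r}") from None
    return ["python", "-u", script, f"--num_gpu={num_gpu}"]


entrypoint = build_entrypoint("ml.p3.16xlarge")
print(entrypoint)
```

Failing loudly on an unknown instance type is deliberate here: silently falling back to a default could leave GPUs idle or start worker processes for GPUs that don't exist.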

When running the processing job, we need to provide the inputs as a list of ProcessingInput objects and the outputs as a list of ProcessingOutput objects. Notice the parameter s3_data_distribution_type, which can be either "FullyReplicated" or "ShardedByS3Key": FullyReplicated makes a copy of the entire dataset available on every instance, while ShardedByS3Key copies roughly [# of data files]/[# of instances] of the files to each instance.
Refer to the Amazon SageMaker documentation for more information.
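
To get a feel for the ShardedByS3Key arithmetic, the sketch below estimates how many files each instance receives when the files are split as evenly as possible. This only illustrates the [# of data files]/[# of instances] ratio; which keys end up on which instance is decided by SageMaker, and the helper name files_per_instance and the file count of 1000 are made up for the example.

```python
def files_per_instance(num_files, num_instances):
    """Estimate the per-instance file counts for an even split of num_files."""
    base, extra = divmod(num_files, num_instances)
    # The first `extra` instances get one additional file.
    return [base + 1 if i < extra else base for i in range(num_instances)]


# Hypothetical job: 1000 Parquet files spread over 50 instances.
counts = files_per_instance(num_files=1000, num_instances=50)
print(counts[0], sum(counts))
```

With 8 GPUs per instance, each GPU in this hypothetical job would then handle two or three of its instance's 20 files, so it helps to have many more files than instances times GPUs.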

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import Processor
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = boto3.session.Session().region_name
role = get_execution_role()

ecr_image = '<ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sm-semantic-similarity:v1'

huggingface_processor = Processor(
    role=role,
    # --num_gpu must match the GPU count of instance_type (8 for ml.p3.16xlarge).
    entrypoint=['python', '-u', 'src/infer.py', '--num_gpu=8'],
    image_uri=ecr_image,
    instance_type='ml.p3.16xlarge',
    instance_count=50,
    volume_size_in_gb=600,
    base_job_name='preprocess-semantic',
)

huggingface_processor.run(
    inputs=[
        ProcessingInput(
            source='<s3_uri or local path>',
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/data/input',
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/data/output/',
            destination='<s3_uri>',
            s3_upload_mode='Continuous',
        )
    ],
)

Conclusion

In this blog post, we have covered an end-to-end distributed processing job using a pretrained transformer model from Hugging Face. We leveraged Amazon SageMaker to abstract away the provisioning of resources. We learned how to package our code, build a Docker image, and upload it to Amazon ECR. See the official Amazon SageMaker Processing class documentation to learn more.

I hope you like this tutorial and find it useful. If you have any thoughts, comments, or questions, please leave a comment below or contact me on LinkedIn. Happy reading 🙂

Ratul Ghosh

Applied Scientist. Working on search, recommendation, advertisement and MLOps. I don’t represent my employer.