Distributed batch inference with Hugging Face on Amazon SageMaker
Use SageMaker Processing jobs to easily run inference on your large dataset with Hugging Face's Transformer models
This blog gives you a complete walkthrough of running distributed batch inference on large data in production. We'll be using Amazon SageMaker, a fully managed machine learning service. With Amazon SageMaker, data scientists and developers can quickly build and train machine learning models and then deploy them into a production-ready hosted environment.
Let's say we have petabytes of data and we want to use a pretrained model for batch inference. A SageMaker Processing job is one of the easiest ways to do this, since it abstracts away the infrastructure needed and requires only minimal changes to our existing code. In this tutorial, we will be using a pretrained model from Hugging Face to calculate the semantic similarity between a pair of sentences.
This blog post will cover the following topics:
- Modifying the code for the SageMaker Processing job
- Building a Docker image with the code and dependencies and pushing it to an Amazon Elastic Container Registry (Amazon ECR) repository
- Starting the processing job with the custom Docker image
Modifying the code
Creating a processing job requires specifying an Amazon Simple Storage Service (Amazon S3) URI to download data from and a path in your Docker container to download the data to. The path in the processing container must begin with /opt/ml/processing/. More on this is discussed later.
Note: /opt/ml and all its subdirectories are reserved by SageMaker. When building your Processing Docker image, don't place any data required by your container in these directories.
We need to modify our script to read data from a path under /opt/ml/processing/.
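As a quick guard, the container paths can be checked before a job is submitted. The sketch below is illustrative only: the helper name is_valid_processing_path is hypothetical and not part of the SageMaker SDK, and it simply tests whether a path lies below /opt/ml/processing/.

```python
import posixpath

# Root directory that processing-container paths must live under.
PROCESSING_ROOT = "/opt/ml/processing"


def is_valid_processing_path(path: str) -> bool:
    """Return True if `path` resolves to a location below /opt/ml/processing/."""
    # normpath collapses "..", "." and duplicate slashes before the check.
    normalized = posixpath.normpath(path)
    return normalized.startswith(PROCESSING_ROOT + "/")


print(is_valid_processing_path("/opt/ml/processing/data/input"))  # True
print(is_valid_processing_path("/opt/ml/model"))                  # False
```

The default input and output directories used by the script in this post both pass this check.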
In this example, we are using data coming from an upstream data pipeline, which breaks the data into multiple parquet files and stores them in an S3 bucket. If there are multiple GPUs on the selected instance, each GPU runs inference on its own share of the files in parallel. Here we are using a pretrained sentence transformer from Hugging Face (more info here), but we can use any model by simply modifying the script below.
The example script reads files from the input directory, adds a new column with the cosine similarity of the sentence embeddings, and saves the result in the output directory. The arguments can be overridden from the processing job, as shown later.
import os
import argparse
import logging
import numpy as np
import pandas as pd
from numpy.linalg import norm
import torch.multiprocessing as mp
from sentence_transformers import SentenceTransformer
# set up logger
logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s")
logger = logging.getLogger("HuggingFace")
logger.propagate = True
logger.setLevel(logging.INFO)
def get_model_output(cfg, model, df, device_idx):
    # Extract the two text columns to compare.
    text1 = df["text1"].values
    text2 = df["text2"].values
    # Run on the GPU assigned to this process.
    device = f"cuda:{device_idx}"
    logger.info("Computing embeddings")
    # Compute embeddings for both lists of sentences.
    embeddings1 = model.encode(
        text1,
        batch_size=cfg.batch_size,
        convert_to_numpy=True,
        device=device,
    )
    embeddings2 = model.encode(
        text2,
        batch_size=cfg.batch_size,
        convert_to_numpy=True,
        device=device,
    )
    # Cosine similarity between each pair of embeddings.
    df["semanticScore"] = np.hstack(
        [
            (emb1 @ emb2.T) / (norm(emb1) * norm(emb2))
            for emb1, emb2 in zip(embeddings1, embeddings2)
        ]
    )
    return df
def do_infer(cfg, device_idx):
    # Load the model onto the GPU assigned to this process.
    logger.info("Loading model")
    model = SentenceTransformer(cfg.model, device=f"cuda:{device_idx}")
    # Work out which files this process needs to handle.
    if os.path.isdir(cfg.input_dir):
        input_dir = cfg.input_dir
        filepath_list = [
            filepath
            for filepath in os.listdir(cfg.input_dir)
            if filepath.endswith(".parquet")
        ]
        filepath_list = sorted(filepath_list)
        # Round-robin assignment: file idx goes to GPU (idx % num_gpu).
        filepath_list = [
            filepath_list[idx]
            for idx in range(len(filepath_list))
            if idx % cfg.num_gpu == device_idx
        ]
    else:
        # A single file was given: only the first GPU processes it.
        input_dir = os.path.dirname(cfg.input_dir)
        filepath_list = (
            [os.path.basename(cfg.input_dir)] if device_idx == 0 else []
        )
    # Get the input and output file paths.
    input_list = [os.path.join(input_dir, filepath) for filepath in filepath_list]
    output_list = [os.path.join(cfg.output_dir, filepath) for filepath in filepath_list]
    logger.info(f"Device {device_idx} input: {input_list}")
    logger.info(f"Device {device_idx} output: {output_list}")
    # Start processing.
    for in_filename, out_filename in zip(input_list, output_list):
        # Read the dataset.
        logger.info(f"Reading file {in_filename}")
        prod_df = pd.read_parquet(in_filename)
        # Run the inference.
        output_df = get_model_output(cfg, model, prod_df, device_idx)
        # Save the results.
        logger.info(f"Writing the result to {out_filename}")
        output_df.to_parquet(out_filename)
        logger.info(f"Done writing the result to {out_filename}")
def do_multiprocessing_infer(cfg):
    # Start one worker process per GPU and wait for all of them to finish.
    processes_list = [
        mp.Process(target=do_infer, args=(cfg, device_idx))
        for device_idx in range(cfg.num_gpu)
    ]
    for process in processes_list:
        process.start()
    for process in processes_list:
        process.join()
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_gpu", type=int, default=4)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument(
        "--input_dir", type=str, default="/opt/ml/processing/data/input"
    )
    parser.add_argument(
        "--output_dir", type=str, default="/opt/ml/processing/data/output"
    )
    parser.add_argument(
        "--model",
        type=str,
        default="sentence-transformers/distiluse-base-multilingual-cased-v2",
    )
    args = parser.parse_args()
    do_multiprocessing_infer(args)
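To see how the script splits files across GPUs, the round-robin rule from do_infer can be tried on its own, without a model or a GPU. This is a minimal sketch: the helper name files_for_device and the file names are made up for illustration.

```python
def files_for_device(filenames, num_gpu, device_idx):
    """Return the parquet files that GPU `device_idx` handles, round-robin."""
    # Same rule as do_infer: keep parquet files, sort them, then give
    # file number idx to GPU (idx % num_gpu).
    parquet_files = sorted(f for f in filenames if f.endswith(".parquet"))
    return [f for idx, f in enumerate(parquet_files) if idx % num_gpu == device_idx]


# Hypothetical directory listing: five parquet files plus a marker file.
files = [f"part-{i:02d}.parquet" for i in range(5)] + ["_SUCCESS"]
for device_idx in range(2):
    print(device_idx, files_for_device(files, num_gpu=2, device_idx=device_idx))
```

With two GPUs, GPU 0 gets parts 00, 02 and 04, and GPU 1 gets parts 01 and 03. Because each file belongs to exactly one GPU, the worker processes never write the same output file.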
Building a docker image and pushing it to Amazon ECR
Once we are done with the code, we need to build a Docker image. Below is a sample Dockerfile. In this example, all our code is inside the src directory.
FROM python:3.8
RUN apt-get -y update && apt-get install -y --no-install-recommends \
wget \
python3 \
&& rm -rf /var/lib/apt/lists/*
RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py && \
rm -rf /root/.cache
COPY requirements.txt /opt/program/
COPY src/ /opt/program/src/
WORKDIR /opt/program
RUN pip install -r requirements.txt
# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard
# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE
# keeps Python from writing the .pyc files, which are unnecessary in this case.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
Once we have the Dockerfile, we need to build it to create an image and tag that image before we push it to Amazon ECR. ECR is a container registry like Docker Hub where we can host container images.
The sample bash script below takes care of all the AWS-related authentication, creates a repository named sm-semantic-similarity if it does not exist yet, then builds the image, tags it, and finally pushes it to the Amazon ECR repository. We'll get a unique link like this: "<ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sm-semantic-similarity:v1"
# Name of algo -> ECR
algorithm_name=sm-semantic-similarity
account=$(aws sts get-caller-identity --query Account --output text)
# Region, defaults to us-east-1
region=$(aws configure get region)
region=${region:-us-east-1}
fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:v1"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" --region ${region}> /dev/null 2>&1
if [ $? -ne 0 ]
then
aws ecr create-repository --repository-name "${algorithm_name}" --region ${region}> /dev/null
fi
# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin ${fullname}
# Build the docker image locally with the image name and then push it to ECR
# with the full name.
docker build -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}
docker push ${fullname}
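The image link that the script assembles in its fullname variable follows a fixed pattern, which is handy to reproduce on the Python side when launching the job. The sketch below assumes a standard commercial AWS region (other partitions, such as the China regions, use a different domain suffix); the helper name ecr_image_uri and the account ID are placeholders.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str) -> str:
    """Build an ECR image URI in the same shape as the script's `fullname`."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# 123456789012 is a placeholder account ID, not a real account.
print(ecr_image_uri("123456789012", "us-east-1", "sm-semantic-similarity", "v1"))
```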
Running the SageMaker Processing job using the custom container
Amazon SageMaker copies the data from S3 and then pulls the container. Cluster resources are provisioned for the duration of the job, and cleaned up when it completes. The output of the Processing job is stored in the S3 bucket specified in the arguments.
In this example, the Processor class takes the following parameters:
- role: An AWS IAM role name or ARN
- image_uri: The unique link of the Docker image
- instance_count: The number of instances to run a processing job with
- instance_type: The type of EC2 instance to use for processing
- entrypoint: List of strings that make a command for the entrypoint. Here we can pass the num_gpu parameter based on the instance_type. For example, ml.p3.16xlarge has 8 GPUs
- volume_size_in_gb: Size in GB to use for storing data during the processing job
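One way to keep the script's --num_gpu flag in step with the chosen instance_type is a small lookup table. This is a sketch under stated assumptions: the GPUS_PER_INSTANCE table and the build_entrypoint helper are hypothetical and only cover the P3 sizes listed; extend the table for other instance families.

```python
# GPU counts for the P3 instance sizes; add other families as needed.
GPUS_PER_INSTANCE = {
    "ml.p3.2xlarge": 1,
    "ml.p3.8xlarge": 4,
    "ml.p3.16xlarge": 8,
}


def build_entrypoint(instance_type, script="src/infer.py"):
    """Return the entrypoint command with --num_gpu matching the instance type."""
    # A KeyError here means the instance type is missing from the table.
    num_gpu = GPUS_PER_INSTANCE[instance_type]
    return ["python", "-u", script, f"--num_gpu={num_gpu}"]


print(build_entrypoint("ml.p3.16xlarge"))
```

The returned list can be passed as the entrypoint argument of Processor.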
When running the processing job, we need to provide the inputs as a list of ProcessingInput objects and the outputs as a list of ProcessingOutput objects. Notice the parameter s3_data_distribution_type, which can be either "FullyReplicated" or "ShardedByS3Key". FullyReplicated makes a copy of the full dataset available on every instance, while ShardedByS3Key copies [# of data files]/[# of instances] pieces of the data to each instance.
Refer here for more information.
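A little arithmetic helps when sizing the job. The sketch below estimates how many files each instance receives under ShardedByS3Key, assuming an even split; how SageMaker actually assigns individual S3 keys to instances is not shown here and may differ. The helper name shard_sizes is made up for illustration.

```python
def shard_sizes(num_files, num_instances):
    """Estimate files per instance for an even split of num_files."""
    # divmod gives the base share and how many instances get one extra file.
    base, extra = divmod(num_files, num_instances)
    return [base + 1 if i < extra else base for i in range(num_instances)]


print(shard_sizes(10, 4))        # [3, 3, 2, 2]
print(shard_sizes(1000, 50)[0])  # 20
```

For example, 1,000 parquet files on 50 instances come to about 20 files per instance, which the script then spreads over that instance's GPUs. With FullyReplicated, every instance would see all the files, so this script would repeat the same work on each instance.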
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import Processor
from sagemaker.processing import ProcessingInput, ProcessingOutput
region = boto3.session.Session().region_name
role = get_execution_role()
ecr_image = '<ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sm-semantic-similarity:v1'
huggingface_processor = Processor(
    role=role,
    # The flag name must match the argument defined in the script (--num_gpu).
    entrypoint=['python', '-u', 'src/infer.py', '--num_gpu=8'],
    image_uri=ecr_image,
    instance_type='ml.p3.16xlarge',
    instance_count=50,
    volume_size_in_gb=600,
    base_job_name='preprocess-semantic',
)
huggingface_processor.run(
    inputs=[
        ProcessingInput(
            source='<s3_uri or local path>',
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/data/input',
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/data/output/',
            destination='<s3_uri>',
            s3_upload_mode='Continuous',
        )
    ],
)
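The flags in the entrypoint list reach the script as ordinary command-line arguments, so their names have to match what the script's parser defines. The sketch below rebuilds only two of the script's arguments to show the override behavior locally; build_parser is a helper name introduced for this illustration.

```python
import argparse


def build_parser():
    """Recreate two of the inference script's arguments for a local check."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_gpu", type=int, default=4)
    parser.add_argument("--batch_size", type=int, default=128)
    return parser


# Flags given on the command line replace the defaults; the rest keep theirs.
args = build_parser().parse_args(["--num_gpu=8"])
print(args.num_gpu, args.batch_size)  # 8 128
```

A flag the parser does not define, such as --num_gpus, makes argparse exit with an error, which would fail the processing job at startup.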
Conclusion
In this blog, we have covered an end-to-end distributed processing job using a pretrained transformer model from Hugging Face. We leveraged Amazon SageMaker to abstract away the provisioning of resources. We learned how to package our code, build a Docker image, and upload it to Amazon ECR. Here is the official Amazon SageMaker Processing Class Documentation to learn more.
I hope you like this tutorial and find it useful. If you have any thoughts, comments, or questions, please leave a comment below or contact me on LinkedIn. Happy reading 🙂