Deploy a PyTorch model
Create a scalable serverless endpoint for running inference on your PyTorch model
PyTorch is the de facto ML framework, and although Pipeline Cloud supports a range of frameworks, in practice most deployed pipelines are built from PyTorch models.
In this guide we'll assemble a basic neural network, deploy it to Pipeline Cloud for inference, and submit some runs (essentially inference requests). We'll also see how to deploy a trained model by using the PipelineFile interface to upload the weights. This tutorial uses Python and our Python library, pipeline-ai. Let's go.
Install PyTorch by running:
pip install torch
torch is the package name for PyTorch, and you'll see it imported below.
NOTE: This is a walkthrough, so many of the below code snippets are mere chunks of a larger script. If you're skimming or just want to see code, then skip to the conclusion where you'll find the complete script.
Creating a PyTorch model
First, let's create a simple NN using the example from the PyTorch docs. This simple model flattens the input tensor and then passes it through a stack of linear layers interleaved with ReLU activations.
import torch
from torch import nn

class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(4, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
The __init__ method describes the model and its layers, and the forward method describes how an input becomes an output by passing through the network.
Running the model on CPU
To 'run' this model locally, we need to instantiate the NN, create an input tensor, pass it to the NN's forward method, and then observe the output.
import torch
model = NeuralNetwork()
input = torch.randn(2, 4)
output = model.forward(input)
print(output.size())
We'll print the shape (.size()) of the output tensor rather than the very long tensor itself. Running this script, we see this in the terminal:
torch.Size([2, 10])
Great! Everything is working as expected. This network is small and speedy, so it can run on CPU. Larger models or larger inputs will require more computing power, and it's common practice to use a GPU for these models, since GPUs handle matrix multiplication far more efficiently.
Running the model on GPU
If you have an NVIDIA GPU attached to your computer, you can use this modified version of the above script to send your model and inputs into GPU memory (known as VRAM) so that the computations are performed there. If you don't have a GPU, this script will fail, but it's worth seeing the syntax anyway. Notice how .to() is called on both the model and the input.
import torch
gpu = torch.device("cuda")
model = NeuralNetwork().to(gpu)
input = torch.randn(2, 4).to(gpu)
output = model.forward(input)
print(output.size())
torch.Size([2, 10])
Set up your GPU before trying the above script
If you don't have an NVIDIA GPU, skip to the next section. If you do, make sure you've installed the necessary drivers, a CUDA-compiled version of PyTorch, and the CUDA toolkit. In our experience, installing these is easiest with conda.
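If you're unsure whether your setup is ready, a quick sanity check with PyTorch's built-in CUDA utilities (plain torch calls, nothing Pipeline-specific) looks like this:

import torch

if torch.cuda.is_available():
    # PyTorch can see at least one CUDA device; print its name
    print(torch.cuda.get_device_name(0))
else:
    print("No CUDA device detected - check your drivers and PyTorch build")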
Converting the model into a pipeline
Perhaps you're building an app where users need to run this model on demand. One option is to turn your own computer into an API server and execute jobs on your own CPU or GPU as they come in.
Or you can outsource this compute demand to a remote cloud provider such as Pipeline Cloud. Pipeline Cloud automatically constructs a serverless API which can accept your inference requests, execute them on powerful servers, and return the result, so that you don't have to worry about managing servers, handling task queues, auto-scaling, caching models intelligently, or any other MLOps hassle like that.
From your app you can simply POST to a private API endpoint for your model to create a 'run'. We handle execution and everything else, and once the 'run' is complete, the result is returned to you. You can find more about the benefits of our serverless compute offering here.
A 'run' is a forward pass
In the above script, we called model.forward(input) to perform inference on the model. A 'run', in Pipeline Cloud language, is the same thing – a single forward pass through the model.
Now let's think about deploying this to Pipeline Cloud to generate an inference endpoint which you can call from your app.
Building the pipeline
To add the model to Pipeline Cloud's servers, we need to convert it into a 'pipeline'. There is no functional difference between a model and a 'pipeline'; a pipeline is just a wrapper which defines the API shape and computational graph so that we can parse and serve your model as efficiently as possible. Don't worry if that sounds mysterious for now.
First, let's install the pipeline-ai library so we can begin assembling the pipeline:
pip install pipeline-ai
Let's recap what we want to achieve: we want to create an endpoint which takes the body of a POST request as an input, passes it through the forward function of our model, and then returns the output in the response. Before deploying to Pipeline Cloud, we'll build a local version of the pipeline.
Creating the pipeline
from pipeline import Pipeline, Variable
import torch

with Pipeline("pytorch-demo") as pipeline_blueprint:
    input_tensor = Variable(torch.Tensor, is_input=True)
    pipeline_blueprint.add_variable(input_tensor)

    model = NeuralNetwork()
    output_tensor = model.forward(input_tensor)

    pipeline_blueprint.output(output_tensor)
There are a lot of new concepts in the above code snippet, so let's go through it step by step. First we import the Pipeline and Variable interfaces from pipeline (aka pipeline-ai).
Then we instantiate a new Pipeline, giving it the name 'pytorch-demo'. You can choose any name you want; it can be helpful to label your pipelines so you can find them more easily on the dashboard. You need to use a context manager as shown for the variables to be added to the pipeline correctly.
This blueprint is basically a diagram of what should happen when a 'run' is executed. Internally, a graph of the flow from start to end is built, and we use this graph on our servers to maximise the speed and efficiency of your inference. The Variable declaration tells the blueprint to expect an input of type torch.Tensor. Recall how we created random inputs earlier by calling torch.randn(2, 4), which returns a torch.Tensor.
PyTorch tensors cannot be JSON-serialised
Annoyingly, PyTorch tensors aren't JSON-serialisable, which means we won't be able to include them raw in your input or output when we deploy the API endpoint to Pipeline Cloud. But we'll see a workaround for this later.
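To see the problem concretely, here's a quick illustration using the standard library's json module (plain Python, unrelated to the pipeline itself):

import json
import torch

tensor = torch.randn(2, 4)
# json.dumps(tensor) raises a TypeError: the tensor is not JSON serialisable
print(json.dumps(tensor.tolist()))  # works: .tolist() gives a plain nested list of floats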
Next, add_variable adds the input Variable to the blueprint. It's important to note that arbitrary 'pre-defined' inputs such as input_boolean = False cannot be added to the blueprint, since the blueprint cannot hold any fixed values. Instead, we must create a Variable with type bool or fix this boolean within the model itself.
We then instantiate the model, as before, and perform the forward pass on the input_tensor. Finally, pipeline_blueprint.output() tells the blueprint to emit this output_tensor as a result of the pipeline. Super! We've now built the pipeline blueprint, and there are a couple of things left to do to prepare our model for deployment.
Running the pipeline locally
All pipelines that you deploy to Pipeline Cloud can also be run locally (i.e. on your own computer), provided you have the necessary compute requirements.
So, earlier we ran the PyTorch model on CPU. Adding the pipeline blueprint to the bottom of that script, we get this:
import torch
from torch import nn

class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(4, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

from pipeline import Pipeline, Variable

with Pipeline("pytorch-demo") as pipeline_blueprint:
    input_tensor = Variable(torch.Tensor, is_input=True)
    pipeline_blueprint.add_variable(input_tensor)

    model = NeuralNetwork()
    output_tensor = model.forward(input_tensor)

    pipeline_blueprint.output(output_tensor)
If we run this script, no output is printed to the terminal. That's because the pipeline has only been assembled, but not run. To perform a local run we need to 'get' the pipeline and run it. Add the following lines to the bottom of the script to do exactly that:
pytorch_pipeline = Pipeline.get_pipeline("pytorch-demo")
example_input = torch.randn(2, 4)
result = pytorch_pipeline.run(example_input)
print(result)
Notice how we used the pipeline's name (defined earlier) to access the computational graph before calling the run method. We're almost there! However, when you try to run this script, you'll notice an error:
AttributeError: 'Variable' object has no attribute 'flatten'
This is occurring in the forward method of the NeuralNetwork class, and it looks like the input_tensor Variable from the blueprint is being passed directly to the function, rather than its actual runtime value, which we know is a tensor (example_input). So, we need to tell the forward method of the model that it needs to 'fetch' the true runtime value of input_tensor.
We do this by decorating the forward method with the @pipeline_function decorator:
from pipeline import pipeline_function

...

class NeuralNetwork(nn.Module):
    ...

    @pipeline_function
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
Any function which directly receives the value of a blueprint Variable should be wrapped in the @pipeline_function decorator, which instructs the function to access the runtime values of its input blueprint Variables.
Similarly, any object (such as the NeuralNetwork class) which is instantiated directly inside the Pipeline blueprint (as model = NeuralNetwork() is, within the context manager above) needs to be decorated with @pipeline_model. Accordingly:
from pipeline import pipeline_function, pipeline_model

...

@pipeline_model
class NeuralNetwork(nn.Module):
    ...
The blueprint and decorators can be a bit confusing if you're new to pipeline-ai, but don't be scared by them. They exist to help convert blueprint Variables into their runtime values automatically, so that your functions run as expected. The library will usually raise a helpful Exception in the event of a missing decorator, or if you accidentally include a runtime value in the blueprint.
Everything's set up for a local run now, so let's test it:
[tensor([[-0.0138, 0.0716, -0.2074, -0.0073, 0.0806, -0.1348, -0.0979, 0.1820,
-0.0417, 0.0059],
[ 0.0215, 0.1512, 0.0052, -0.0503, 0.0423, -0.1144, 0.0655, 0.0270,
-0.0877, -0.0703]], grad_fn=<AddmmBackward0>)]
Eyyyy, super, it worked! We converted our PyTorch model into a pipeline and ran it locally! Notice how the result is a list – that's because a pipeline outputs a list by default, and calling pipeline_blueprint.output() simply appends a new output to this list. This way, you can easily compose multi-input and multi-output pipelines.
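As a minimal sketch of that idea (not part of the tutorial script, and assuming the same Pipeline, Variable and NeuralNetwork definitions as above), a blueprint with two inputs and two outputs could look like this:

with Pipeline("multi-output-demo") as blueprint:
    input_a = Variable(torch.Tensor, is_input=True)
    input_b = Variable(torch.Tensor, is_input=True)
    blueprint.add_variable(input_a)
    blueprint.add_variable(input_b)

    model = NeuralNetwork()
    output_a = model.forward(input_a)
    output_b = model.forward(input_b)

    blueprint.output(output_a)  # becomes result[0]
    blueprint.output(output_b)  # becomes result[1]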
Uploading the pipeline to Pipeline Cloud
Now that we know the pipeline works locally, we can upload it to Pipeline Cloud's servers for remote computation. It's very simple to modify our existing code to do this. Make sure to grab an API token from the dashboard here.
...
from pipeline import PipelineCloud
pytorch_pipeline = Pipeline.get_pipeline("pytorch-demo")
api = PipelineCloud(token="PIPELINE_API_TOKEN")
uploaded_pipeline = api.upload_pipeline(pytorch_pipeline)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
We authenticate with Pipeline Cloud by instantiating PipelineCloud with our API token, upload the pipeline with upload_pipeline, and print out the remote pipeline id. To refer to uploaded pipelines we'll need to use this id, so record it somewhere. You can always find it on the dashboard too, if necessary.
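The id can simply be stashed somewhere your app can read it later, since run_pipeline (used below) accepts the id directly and doesn't need the local pipeline object. A minimal sketch:

# Save the remote pipeline id after uploading
with open("pipeline_id.txt", "w") as f:
    f.write(uploaded_pipeline.id)

# Later, e.g. from your app, read it back to submit runs against it
with open("pipeline_id.txt") as f:
    pipeline_id = f.read().strip()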
There's one more thing we need to do before we can submit remote runs on our deployed pipeline. Unfortunately, tensors are not JSON-serialisable, so we need to modify the pipeline to return the tensor data as a nested list. This can be a bit complicated, so pipeline-ai comes with a helper, tensor_to_list, to do the hard work. Let's add it to the blueprint.
from pipeline.util.torch_utils import tensor_to_list

...

with Pipeline("pytorch-demo") as pipeline_blueprint:
    ...

    output_tensor = model.forward(input_tensor)
    output_list = tensor_to_list(output_tensor)

    pipeline_blueprint.output(output_list)
Notice how we converted the result of the forward method into a list with tensor_to_list, so that it can be sent as JSON through the API. Now we just need to upload this updated pipeline, and we'll be ready to submit runs! So, trigger upload_pipeline() once again, save the pipeline id, and let's do a remote run!
example_input = tensor_to_list(torch.randn(2, 4))
run = api.run_pipeline(
    uploaded_pipeline.id,
    [example_input],
)
print(f"Run id: {run.id}")
print(run.result_preview)
And then we see printed out in the terminal:
[[[-0.06113505735993385, -0.04900337755680084, -0.028971631079912186, 0.0019726287573575974, -0.00612134113907814, 0.026191186159849167, -0.10189536213874817, -0.10396647453308105, 0.09754741191864014, -0.16766135394573212], [-0.1953875571489334, 0.010309990495443344, -0.04846293479204178, -0.061653949320316315, 0.10781243443489075, 0.08252111077308655, -0.23458144068717957, -0.14181871712207794, 0.065968818962574, -0.1120561957359314]]]
We just performed inference on our pipeline in the cloud! For smallish results you can simply print the result_preview property of the returned RunGet class. When you're returning a lot of data, you may see that result_preview is None. In this case (as long as the run's state is 'COMPLETE') you can download the result by running this:
result = api.download_result(run.result)
print(result)
If you don't see a result
You can print run.error_info to see whether your run failed and, if so, what caused it to fail. During debugging this is a helpful feature, as the raw traceback from the servers is forwarded so you can read it as if it were a traceback from your own computer.
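Putting these pieces together, a post-run check might look like the sketch below, which uses only the run attributes mentioned above and assumes error_info is empty for successful runs:

if run.result_preview is not None:
    print(run.result_preview)
elif run.error_info is not None:
    # The run failed; error_info carries the forwarded server traceback
    print(run.error_info)
else:
    # The run succeeded but the result was too large to preview, so download it
    print(api.download_result(run.result))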
Look how much ground we've covered: from a basic PyTorch model, we've made a deployed instance of the model which is ready to be added to apps where it will scale automatically, handle multiple parallel requests and spikes with ease, report usage metrics to the dashboard, and cost absolutely nothing while it's not in use. Pipeline Cloud is a truly convenient and modern way of serving ML models in production.
Using GPU on Pipeline Cloud
You might have noticed that we uploaded the model without specifying whether it should run on CPU or GPU. Since our code didn't reference any GPU, nor did we call .to() to transfer the data to any NVIDIA device, the pipeline will by default execute exactly as it did locally – on CPU.
Pipeline Cloud supports arbitrary compute of any nature, and it won't meddle with your code to change the device used. So we can run CPU tasks, GPU tasks, or a combination of both! Here, though, we're running a CPU task when (as with most PyTorch models) we really want to take advantage of the remote GPU. Let's create a new pipeline_model to wrap around the NeuralNetwork class, which will send both the network and the inputs to Pipeline Cloud's cuda device at inference time.
class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(4, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

@pipeline_model
class PyTorchDemoModel:
    model = None
    gpu = torch.device("cuda")

    @pipeline_function
    def load(self) -> bool:
        self.model = NeuralNetwork().to(self.gpu)
        return True

    @pipeline_function
    def infer(self, input: list[list[float]]) -> torch.Tensor:
        input = torch.tensor(input)  # input needs to be converted from list to tensor
        input = input.to(self.gpu)
        return self.model.forward(input)

with Pipeline("pytorch-demo") as pipeline_blueprint:
    input_tensor = Variable(list, is_input=True)
    pipeline_blueprint.add_variable(input_tensor)

    model = PyTorchDemoModel()
    model.load()

    output_tensor = model.infer(input_tensor)
    output_list = tensor_to_list(output_tensor)

    pipeline_blueprint.output(output_list)
This looks like a lot, but it's pretty simple to follow! We're taking the first definition of the NeuralNetwork module and building a pipeline_model named PyTorchDemoModel from it. There's a load function and an infer function, which are high-level adapters over the original __init__ and forward methods of the NeuralNetwork module. The load and infer functions handle transferring the model and its inputs to the Pipeline Cloud CUDA device.
Then, in the blueprint, instead of instantiating NeuralNetwork directly, we instantiate PyTorchDemoModel and call its load function in order to instruct Pipeline Cloud's servers to load the model onto GPU, and later run inference on GPU. By the way, all of this code is also executable locally if you have an attached CUDA device.
If we upload and run this model, the inference time (which is reported in run.compute_time_ms) might not be very different, because the model is so tiny. But for larger models, running on GPU can deliver substantial inference speed-ups, so it's definitely worth learning the knack.
Deploying a trained model
In the examples above we deployed a freshly-initialised instance of the neural network. But most of the time, we want to deploy the neural network with trained weights.
Although in theory it's possible to train a model using Pipeline Cloud, only inference is documented and supported at the moment. So for now let's assume we already have the trained weights in a file called weights.pt, and we want to load them into the network so that when we call the cloud endpoint, we get useful results.
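For reference, a weights file like this is normally produced at the end of training with standard PyTorch; this part has nothing to do with Pipeline Cloud:

trained_model = NeuralNetwork()
# ... training loop goes here ...
torch.save(trained_model.state_dict(), "weights.pt")  # produces the file that load() will read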
In fact, we only need to change a few lines to add in our trained model. We'll upload it using PipelineFile, an interface from the SDK which not only cleverly chunks the upload but also enables the Pipeline Cloud servers to share the weights between themselves so that remote loading and caching is rapid.
from pipeline import PipelineFile

with Pipeline("pytorch-demo") as pipeline_blueprint:
    input_tensor = Variable(list, is_input=True)
    weights_file = PipelineFile(path="weights.pt")
    pipeline_blueprint.add_variables(input_tensor, weights_file)

    model = PyTorchDemoModel()
    model.load(weights_file)

    output_tensor = model.infer(input_tensor)
    output_list = tensor_to_list(output_tensor)

    pipeline_blueprint.output(output_list)
The above blueprint follows the same pattern as before, but with three changes. Firstly, we defined a file (PipelineFile inherits from Variable) at path weights.pt. Then we added both the input_tensor and the weights_file to the blueprint, using the add_variables method. Finally, we updated the model's load function to receive this PipelineFile so that it can insert the stored weights into the model.
Now all that's left to do is modify the model's load function so that it correctly loads in the weights:
@pipeline_model
class PyTorchDemoModel:
    ...

    @pipeline_function
    def load(self, weights_file: PipelineFile) -> bool:
        weights = torch.load(weights_file.path, map_location=self.gpu)
        self.model = NeuralNetwork().to(self.gpu)
        self.model.load_state_dict(weights)
        return True

    ...
Now we can upload the pipeline, again using upload_pipeline, and in the terminal we'll see the weights file upload. When we submit a run, we'll now get the output of a trained model instead of the output of an untrained model.
Don't run your pipeline locally before uploading it
When you run a pipeline locally, its internal state changes according to the inputs at runtime and your current system. Quite often, serialisation issues occur when an already-run pipeline is uploaded. Besides, you want the deployed version of your pipeline to be in its 'vanilla', un-run form before every request, otherwise it may not perform as expected.
Minimising cold starts
There are a couple of neat features inside the pipeline-ai library which you can use when uploading a pipeline to make inference even faster. To figure out how they work, we first need to understand a detail about pipeline blueprints.
What happens when we call run_pipeline()? Well, the entire pipeline blueprint gets executed from top to bottom. This is a 'run': your entire API logic is within the blueprint, and every time a request comes in, it will perform the inference steps – the pipeline_functions inside the blueprint.
To be clear, even though PyTorchDemoModel is instantiated inside the pipeline blueprint, it isn't instantiated during every single request, precisely because we wrapped it in pipeline_model, which prevents that repeated behaviour. Similarly, the input and output variables aren't redefined and re-added to the pipeline every time, as the graph knows not to do that once it has been uploaded.
However, the PyTorchDemoModel method load, as it is a pipeline_function, will be re-run every time a request is made. If the pipeline has already been loaded by a server on Pipeline Cloud, we don't actually want load to re-run; put differently, we only want load to run once, when the pipeline is cached on the server for the first time. Luckily, there are some flags to set this behaviour:
@pipeline_model
class PyTorchDemoModel:
    ...

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> bool:
        ...
Check out the arguments to the @pipeline_function decorator on load. By setting these two parameters to True, we establish that the load function only needs to run once, during caching, and not again. This way, when new runs are submitted, they will skip the load as long as it has already been executed on the server.
Here's another trick to help Pipeline Cloud load the pipeline onto the right type of GPU. If we know how much GPU VRAM the pipeline consumes then we can specify the minimum VRAM requirement when we instantiate the pipeline.
with Pipeline("pytorch-demo", min_gpu_vram_mb=205) as pipeline_blueprint:
    ...
Using the min_gpu_vram_mb argument, we're saying explicitly that this pipeline requires a GPU with at least 205 MB of available VRAM. Pipeline Cloud servers can infer this automatically if necessary, but specifying it helps make pipeline inference even more efficient, and ultimately that means better performance for your endpoint.
Record VRAM at inference time, not load time
If you're using the min_gpu_vram_mb argument, you'll need to identify how much VRAM your pipeline needs. Typically this can be as easy as watching nvidia-smi. But make sure to record the highest figure (which will likely be the VRAM usage at inference time) rather than just how much memory the model takes up when loaded.
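If you'd rather measure from Python, PyTorch's CUDA memory utilities give a rough figure for the framework's own allocations (assuming the NeuralNetwork class from earlier); note this excludes CUDA context overhead, so treat it as a lower bound and add some headroom:

import torch

gpu = torch.device("cuda")
torch.cuda.reset_peak_memory_stats(gpu)

model = NeuralNetwork().to(gpu)
output = model.forward(torch.randn(2, 4).to(gpu))  # run a representative forward pass

peak_mb = torch.cuda.max_memory_allocated(gpu) / 1024 ** 2
print(f"Peak VRAM allocated by PyTorch: {peak_mb:.1f} MB")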
Conclusion
We've seen how to convert a PyTorch neural network into an inference-ready cloud endpoint by using pipeline-ai, and how to deploy the model to the serverless GPUs at Pipeline Cloud. We looked at PipelineFile and how to upload the weights of a trained model. We also discussed some easy performance pitfalls, and some techniques to help optimise the performance of a deployed pipeline.
It's easier with pipeline-ai
Everything we did above can also be done using our REST endpoints, documented here. Internally, the pipeline-ai SDK simply formats the data and sends it to the same public endpoints that you can use directly. It's far easier to build and upload pipelines using the SDK; for runs, however, most of our users submit requests directly to the REST endpoint from their environment of choice.
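For illustration, submitting a run over REST with the requests library might look roughly like the sketch below; the endpoint URL and payload shape are assumptions here, so check the REST documentation linked above for the exact schema:

import requests

response = requests.post(
    "https://api.pipeline.ai/v2/runs",  # assumed URL; see the REST docs for the real endpoint
    headers={"Authorization": "Bearer PIPELINE_API_TOKEN"},
    json={
        "pipeline_id": "YOUR_UPLOADED_PIPELINE_ID",
        "data": [example_input],  # the same nested-list input used with the SDK above
    },
)
print(response.json())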
Complete script
import torch
from torch import nn

from pipeline import (
    Pipeline,
    Variable,
    pipeline_function,
    pipeline_model,
    PipelineCloud,
    PipelineFile,
)
from pipeline.util.torch_utils import tensor_to_list

class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(4, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

@pipeline_model
class PyTorchDemoModel:
    model = None
    gpu = torch.device("cuda")

    @pipeline_function
    def load(self, weights_file: PipelineFile) -> bool:
        weights = torch.load(weights_file.path, map_location=self.gpu)
        self.model = NeuralNetwork().to(self.gpu)
        self.model.load_state_dict(weights)
        return True

    @pipeline_function
    def infer(self, input: list[list[float]]) -> torch.Tensor:
        input = torch.tensor(input)  # input needs to be converted from list to tensor
        input = input.to(self.gpu)
        return self.model.forward(input)

with Pipeline("pytorch-demo") as pipeline_blueprint:
    input_tensor = Variable(list, is_input=True)
    weights_file = PipelineFile(path="weights.pt")
    pipeline_blueprint.add_variables(input_tensor, weights_file)

    model = PyTorchDemoModel()
    model.load(weights_file)

    output_tensor = model.infer(input_tensor)
    output_list = tensor_to_list(output_tensor)

    pipeline_blueprint.output(output_list)

pytorch_pipeline = Pipeline.get_pipeline("pytorch-demo")

api = PipelineCloud(token="PIPELINE_API_TOKEN")
uploaded_pipeline = api.upload_pipeline(pytorch_pipeline)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

example_input = tensor_to_list(torch.randn(2, 4))

run = api.run_pipeline(
    uploaded_pipeline.id,
    [example_input],
)
print(f"Run id: {run.id}")
print(run.result_preview)