
Deploy a stable diffusion pipeline

How to deploy a pre-trained HuggingFace stable diffusion pipeline on Pipeline Cloud.

HuggingFace (HF) provides a really simple way to use some of the best models from the open-source ML sphere. In this guide, we'll build out a pipeline around a HF diffusers model. The logic followed here can be replicated for almost any of the ~100,000 models available on HF. We selected sd-dreambooth-library/herge-style out of the many available pre-trained sd-dreambooth HF pipelines.

NOTE: This is a walkthrough, so many of the below code snippets are mere chunks of a larger script. If you're skimming or just want to see code, then skip to the conclusion where you'll find the complete script.

Getting started with HuggingFace diffusers

Once you've installed diffusers, it's really simple to initialise a model and start running inference on it. We'll use a stable diffusion model trained using Dreambooth, sd-dreambooth-library/herge-style. It is a text-to-image model, which means it takes an input sentence/prompt like 'Mountain winds and babbling springs and moonlight seas' and outputs an image related to that prompt. It was fine-tuned with Tintin images using Dreambooth, a technique developed by Google to fine-tune diffusion models by injecting a custom subject into the model. Dreambooth uses a rare word for the custom subject (in our case herge_style) which doesn't have much meaning in the original model. For instance, the prompt "Mountain winds and babbling springs and moonlight seas, herge_style" generates an image of that scene in the style of the Tintin comics.

Using a pipeline from diffusers

HuggingFace makes it very easy to load any pretrained diffusion pipeline and to use it in inference, by interfacing with the DiffusionPipeline module.

🚧

A HuggingFace pipeline is not the same as a pipeline-ai pipeline

Both HuggingFace and pipeline.ai use the same word 'pipeline' to mean 'a set of processing steps which convert an input to an output'. Later in this guide, we're going to embed this model within a pipeline-ai 'pipeline'.

Getting started with sd-dreambooth-library/herge-style for inference is as simple as:

from diffusers import DiffusionPipeline
from PIL.Image import Image

# Load the HF pipeline
model = DiffusionPipeline.from_pretrained("sd-dreambooth-library/herge-style")

# The input prompt
prompt = "Mountain winds and babbling springs and moonlight seas, herge_style."

# Generate an image from the prompt
output_image: Image = model(prompt).images[0]

# Save the image to a local file (binary mode, since JPEG data is bytes)
with open("image.jpeg", "wb") as f:
    output_image.save(f, format="JPEG")

Running the Python script will take a while but you should eventually see an image saved to a local file.
What just happened here? We instantiated a pre-trained HF pipeline, and just by passing a prompt string we made a prediction, which returned a list of PIL images. We then saved the first image to a local file.

Internally, the HF pipeline assembles the model on CPU, downloads the sd-dreambooth-library/herge-style weights, and then loads them into the model. If you have a GPU attached, you can ensure the prediction takes place on your GPU instead, by creating a torch.device and moving the model (tensor) to that device:

...
import torch

# Create a GPU device if it is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the HF pipeline
sd_pipeline = DiffusionPipeline.from_pretrained("sd-dreambooth-library/herge-style").to(
    device
)
...

Here we're taking advantage of the neat .to() method provided by PyTorch to send models, inputs, and other data to specific devices.

Input keyword arguments

In addition to the required prompt input, there are a number of other optional keyword arguments we might want to supply to our model at inference. For instance, the number of images per prompt, the dimensions of the images, the random seed and so on. It is useful to define the shape of this input and also provide default values for these in case they are only partially provided at runtime.
For this guide, we will define the following input dict to the model:

import typing as t

# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    # seed should not be passed to the model
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas, herge_style.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}

We've included a TypedDict characterising the shape of the expected input to the pipeline. As with all typing in Python, this isn't enforced at runtime but makes things clearer for other developers and also gives you handy type hints if your IDE supports it.
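As a rough illustration of how partially provided inputs could be combined with defaults, here is a minimal sketch that uses only the standard library. The trimmed-down `InputKwargs`, the `DEFAULTS` dict, and the `unknown_keys` and `merge_with_defaults` helpers are hypothetical names chosen for this example; they are not part of pipeline-ai or diffusers.

```python
import typing as t


# Trimmed-down version of the input shape, for illustration only
class InputKwargs(t.TypedDict, total=False):
    prompt: str
    num_inference_steps: t.Optional[int]
    seed: t.Optional[int]


DEFAULTS: InputKwargs = {"prompt": "a castle", "num_inference_steps": 50, "seed": None}


def unknown_keys(payload: t.Mapping[str, t.Any]) -> t.List[str]:
    """Return the keys in the payload that InputKwargs does not declare."""
    allowed = set(t.get_type_hints(InputKwargs))
    return sorted(key for key in payload if key not in allowed)


def merge_with_defaults(payload: t.Mapping[str, t.Any]) -> t.Dict[str, t.Any]:
    """Overlay the caller's values on the defaults; the caller's values win."""
    return {**DEFAULTS, **payload}


merged = merge_with_defaults({"prompt": "a ship, herge_style", "num_inference_steps": 20})
typos = unknown_keys({"prompt": "a ship", "steps": 20})
```

Because a TypedDict is not checked at runtime, a helper along the lines of `unknown_keys` is one way to catch misspelled parameters before they reach the model.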

Building a pipeline around the diffusers model

Now that we have a HF model working for local inference, it's time to start laying the ground to offload that work to PipelineCloud. In order for our Python code to run in the cloud, it needs to be serialised, sent to the server and executed in the cloud, where input keyword arguments should be fed into our code dynamically at runtime. Luckily, pipeline-ai provides a number of useful tools for achieving just that. All these steps will be handled under the hood by building a pipeline-ai pipeline around our Python code. We'll also package any loading, pre-processing, inference and post-processing steps into a single deployable object.
There are 2 main steps to this process:

  • Create and configure a pipeline blueprint using a context manager.
  • Create a pipeline_model which wraps around the HF stable-diffusion model and implements all the methods called within the pipeline blueprint.

We'll begin by configuring the pipeline and then secondly build out the pipeline_model.
This will make it easier to understand why the model objects are decorated as they are.

Creating the pipeline

First, make sure you have added pipeline-ai to your virtual environment, using whichever package manager you prefer. Then we'll start by making a pipeline blueprint. This blueprint is essentially a computational graph representing a set of instructions for what should happen when a request is made to your inference endpoint. In the body of your request, you will typically be passing some payload, which includes e.g. prompt, num_inference_steps as discussed in the previous section. This payload should then be passed as an input variable to the pipeline at runtime and fed into operations, whose outputs are in turn fed into further operations until we arrive at the output of the pipeline itself.

To build the pipeline, you need to create a Pipeline object and use a context manager, as below:

import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model (We'll define it later)
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    model.set_kwargs(input_kwargs)
    # We expect a list of `PIL` images as output
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

NOTE: We'll define the SDDreambooth model in the next section.

The pipeline has been broken down into 4 parts. We'll define the core SDDreambooth model and its methods later, but on a high level we can see that:

  • At the start, we define the input Variable to the pipeline and add it to the pipeline. This tells the pipeline that it should expect a variable of dict type at runtime. Ideally we would write input_kwargs = Variable(InputKwargs, is_input=True) instead, but this leads to a type error at runtime as TypedDict is not currently supported as a Variable type. We'll pass what is essentially an InputKwargs dictionary to the model itself, with the exception of the seed key, which we'll need to pop off before feeding the input to the model.
  • We then create a SDDreambooth model instance (defined later) and load the HF model into memory. We'll be able to tell the load method to only run once at start up so that the model isn't loaded unnecessarily for every inference call, but we'll get to that shortly.
  • The InputKwargs to the pipeline are then parsed and fed to the model, which runs a forward pass through the network and returns a list of PIL images.
  • Finally, a post-processing stage formats the output images into a JSON-serialisable format and outputs these formatted images from the pipeline.

📘

Why is the syntax so strict?

If you're unfamiliar with building computational graphs, this syntax can be a bit alien and tricky to parse. The point is to create a deterministic flow from inputs to outputs so that Pipeline Cloud servers can find optimisations and handle scaling correctly. In the end you'll achieve better performance.

You can add as many inputs and as many outputs to the pipeline as you like, so as your model grows, you can introduce a host of different arguments, data points, and return values. One thing you can't do within a pipeline, however, is use a 'raw' runtime value such as 42 or True. All runtime values should either be within a Variable or further down within the model class.
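To make the idea of a blueprint more concrete, here is a toy sketch in plain Python of a graph that is recorded once and then evaluated for each request. It is only meant to illustrate the concept; the `Node` class and `evaluate` function are hypothetical and are not how pipeline-ai is implemented.

```python
import typing as t


class Node:
    """A recorded step: a function plus the nodes whose outputs feed into it."""

    def __init__(
        self,
        fn: t.Optional[t.Callable[..., t.Any]] = None,
        parents: t.Sequence["Node"] = (),
    ) -> None:
        self.fn = fn
        self.parents = tuple(parents)


def evaluate(node: Node, inputs: t.Dict[Node, t.Any]) -> t.Any:
    """Resolve a node by first resolving its parents, starting from the inputs."""
    if node in inputs:
        return inputs[node]
    args = [evaluate(parent, inputs) for parent in node.parents]
    return node.fn(*args)


# Record the blueprint once: no real values are involved at this point
prompt_in = Node()  # plays the role of an input variable
upper = Node(str.upper, [prompt_in])
tagged = Node(lambda s: s + ", HERGE_STYLE", [upper])

# Execute the same blueprint with a concrete runtime value
result = evaluate(tagged, {prompt_in: "black rock"})
```

In this toy version the graph is fixed before any request arrives, which mirrors why runtime values have to enter through a designated input rather than being written directly into the blueprint.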

Creating the core pipeline_model

Next, we want to implement the SDDreambooth model that we instantiated in the pipeline. This will be a wrapper class around the HF model where we will define for instance, how the HF model should be loaded, how the inputs and outputs to the model should be transformed and of course, the inference method itself. The wrapper class needs to be decorated by pipeline_model. This allows the Pipeline context manager defined in the last section, to treat the wrapper class as a model object. The model can contain pipeline_function decorated functions and allows for persistent logic to be present inside of the wrapper class (for caching etc).

As in the previous section, we create the model around the HF diffusers package:

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from PIL.Image import Image
from pipeline import (
    pipeline_function,
    pipeline_model,
)


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. when it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        Generates a list of images given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]

OK, that may seem like a bit of a mouthful, but most of the code here is actually pretty straightforward. First notice that the methods of the pipeline_model have pipeline_function decorators. These ensure that the actual runtime values of the Variable objects will be passed to these methods when we call them from within the pipeline, rather than the bare Variable objects themselves.
We've implemented the following methods:

  • load handles the instantiation of the model and sending it to a GPU (more on this below).

  • set_kwargs combines optional input parameters with default ones.
    If you wanted to perform some form of validation on the input, here would be a good place to do it.
    We saved the inputs as an instance attribute to share them across methods.

  • seed_everything sets the seed for pseudo-random generators, depending on the seed property provided in the input.
    We pop it off the input as we don't want it passed as a model parameter, but instead as a torch.Generator.manual_seed parameter.

  • predict passes our input to the stable diffusion model and generates a list of PIL images.

  • format_images converts the generated PIL images into base64-encoded strings, so that they're in a suitable form to be sent across networks (more on this below).
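One detail of seed_everything worth noting: the expression `pop("seed") or random.randint(...)` treats a seed of 0 the same as a missing seed, because 0 is falsy in Python. If that distinction matters to you, an explicit None check is one alternative. The sketch below uses a hypothetical `resolve_seed` helper and only Python's `random` module, so that it runs without torch or numpy.

```python
import random
import typing as t


def resolve_seed(kwargs: t.Dict[str, t.Any]) -> int:
    """Pop the seed from kwargs, falling back to a random one only if it is None."""
    seed = kwargs.pop("seed", None)
    if seed is None:
        seed = random.randint(1, 1_000_000)
    return seed


kwargs = {"prompt": "a ship, herge_style", "seed": 0}
seed = resolve_seed(kwargs)

# Seeding twice with the same value reproduces the same pseudo-random draw
random.seed(seed)
first_draw = random.random()
random.seed(seed)
second_draw = random.random()
```

After the call, `kwargs` no longer contains the seed key, so it can be unpacked into the model call in the same way as in predict.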

Set the load function to run only on startup

Remember how every request to your pipeline's endpoint will follow the blueprint from top to bottom? If that were to happen now, the model.load() function would be called on every single request. One of the great features of a platform like Pipeline Cloud is that it can cache your models on GPU so that you don't have to experience cold starts on every request. If we repeatedly called load, we would waste time reloading a model that is already in memory.

📘

Your model stays cached until another replaces it

Pipeline Cloud automatically stores your model in GPU cache after it has been loaded, so that future inference requests can skip the cold start. It will remain cached until another pipeline 'kicks it off' so that the platform can serve all users fairly. However, if your traffic is regular and sufficiently high-volume then your pipeline will remain ~permanently cached while you only pay for inference time.

Thus we need to tell the blueprint to only call the load method once when the pipeline loads, and not again for the duration of the pipeline's time within GPU cache. Fortunately, there's a really easy way to do exactly that, and unlock all the performance benefits that it entails. Just tag the pipeline_function decorator on the load method with the following two arguments:

...
@pipeline_function(run_once=True, on_startup=True)
def load(self) -> None:
  ...

Now, even though we call model.load() within the pipeline, we can be sure it will only run when the pipeline caches, and not again. Inference should be even faster as a result!
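If the run-once behaviour feels abstract, the following toy decorator shows the general idea in plain Python. It is a sketch for intuition only: the `run_once` decorator here is hypothetical and is not pipeline-ai's implementation of the `run_once=True` argument.

```python
import functools
import typing as t


def run_once(fn: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
    """Call fn on first use only; later calls return the stored result."""
    not_run = object()  # sentinel marking "fn has not been called yet"
    result: t.Any = not_run

    @functools.wraps(fn)
    def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any:
        nonlocal result
        if result is not_run:
            result = fn(*args, **kwargs)
        return result

    return wrapper


calls: t.List[str] = []


@run_once
def load() -> str:
    calls.append("loaded")  # stands in for an expensive model load
    return "model"


# Simulate three requests that each follow the blueprint from the top
models = [load() for _ in range(3)]
```

Only the first call does the expensive work; the remaining calls reuse the stored result.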

Post-processing

You may have noticed that the images generated by the HF model are native Python PIL objects. However, when running inferences in the cloud, we need to return JSON-serializable objects.

🚧

PIL images cannot be JSON-serialised

PIL images aren't natively JSON-serialisable, which means we haven't included them raw in our inputs or output to the pipeline.

So instead of directly outputting the PIL images from the pipeline, we instead transform them into base64 encoded strings. This is handled by the format_images and to_string methods.
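The round trip can be sketched with the standard library alone. The bytes below are a placeholder rather than a real JPEG, and the `image` key is just an example name; the point is that raw bytes fail to serialise while a base64 string survives JSON encoding and can be decoded back on the receiving side.

```python
import base64
import json

# Placeholder payload standing in for JPEG bytes (not a real image)
raw = b"\xff\xd8\xff\xe0 placeholder bytes"

# Raw bytes are rejected by the JSON encoder
try:
    json.dumps({"image": raw})
    bytes_serialisable = True
except TypeError:
    bytes_serialisable = False

# A base64 string is plain text, so it serialises without trouble
encoded = base64.b64encode(raw).decode()
payload = json.dumps({"image": encoded})

# On the receiving side, decode the string back into the original bytes
decoded = base64.b64decode(json.loads(payload)["image"])
```

With a real run result, the decoded bytes could be written to a file opened in binary mode to recover the image.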

Running the pipeline locally

As we've seen, pipeline-ai is a library for building a computational flow. It can also be used locally to handle execution of the pipeline, called a 'run'. So, a great way of debugging your pipeline before uploading it to Pipeline Cloud is to run it locally! Of course, if you don't have a GPU attached then in some cases local runs will be too slow to be practical.

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

example_input: InputKwargs = dict(
    prompt="Black rock, ship-wreck, volcano, herge_style.",
    num_inference_steps=20
)
result = pipeline.run(example_input)

First we 'get' the pipeline by using the name which we set when defining the pipeline blueprint. Then, very simply, we call the .run() method on the pipeline object, passing in our input.

Running the pipeline on Pipeline Cloud

📘

First login with our CLI

We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see our authentication guide.

Creating the remote Python environment

In order to execute runs in the cloud, we'll need some Python packages that aren't included in the default environment, e.g. a more up-to-date diffusers package. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.

To create a new environment named huggingface, simply run

pipeline environments create huggingface

in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n huggingface

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment.

Create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n huggingface add

You should now see these packages in the environment python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.

Uploading the pipeline

Before we can run the model on Pipeline Cloud, we need to upload it to the servers. Again we 'get' the pipeline, before instantiating a connection to Pipeline Cloud and uploading our pipeline.

from pipeline import PipelineCloud

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(pipeline, environment="YOUR_ENVIRONMENT_ID")

print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

Just be sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:

pipeline environments get -n huggingface

During this stage, the pipeline-ai library will serialise all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway.

🚧

You can't modify uploaded pipelines

Once a pipeline has been uploaded to Pipeline Cloud, it's considered immutable, which is to say it can't be updated or modified in any way, even if it's a buggy pipeline. This means you have to upload a new pipeline every time you make a change.

Running the pipeline

And now we run the pipeline, supplying an input dictionary of type InputKwargs:

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 50
    },
)

Internally this performs a POST request to the /v2/runs endpoint in our main API, so if you're building an app in a different language you don't need to worry about dropping the pipeline-ai library.

The first time you run the pipeline, it may take up to a couple of minutes because the custom environment and pipeline won't be cached on our servers. Subsequent runs won't be subject to this cold start though and should be pretty speedy! Just make sure you move the run_pipeline call into another script and don't execute the whole script again, because you'd be uploading a new pipeline each time.
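One simple way to split uploading and running into separate scripts is to write the uploaded pipeline's ID to a small file and read it back later. The sketch below is an assumption about how you might organise this: the helper names, the `pipeline_id.json` file name and the example ID are all made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def save_pipeline_id(pipeline_id: str, path: Path) -> None:
    """Store the ID so that a separate script can pick it up later."""
    path.write_text(json.dumps({"pipeline_id": pipeline_id}))


def load_pipeline_id(path: Path) -> str:
    """Read back the ID written by save_pipeline_id."""
    return json.loads(path.read_text())["pipeline_id"]


# Demonstration using a temporary directory and a made-up ID
with tempfile.TemporaryDirectory() as tmp:
    id_file = Path(tmp) / "pipeline_id.json"
    save_pipeline_id("pipeline_example123", id_file)
    restored = load_pipeline_id(id_file)
```

In practice the upload script would call `save_pipeline_id(uploaded_pipeline.id, ...)` once, and the run script would pass the result of `load_pipeline_id(...)` to `api.run_pipeline`.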

Conclusion

In this guide, we saw how to interface with the HuggingFace DiffusionPipeline to very easily start generating local predictions on a pretrained stable-diffusion pipeline. We then packaged this HuggingFace pipeline into a single deployable pipeline-ai pipeline, getting our Python code in a form ready to be serialised, sent and executed on the Pipeline Cloud servers. After uploading the pipeline to the cloud, we were quickly able to start running the pipeline remotely.

Complete script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. when it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)


pipeline = Pipeline.get_pipeline(PIPELINE_NAME)


api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(pipeline, environment="YOUR_ENVIRONMENT_ID")
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 50,
    },
)