
External data stores

Deploying a pre-trained HuggingFace stable-diffusion model and saving inference images to an external data store.

If you're already familiar with PipelineCloud, you'll know how powerful a platform it can be for running complex ML tasks without expensive hardware or the hassle of managing ML-ops infrastructure. But you can also execute arbitrary Python on the PipelineCloud servers, which gives you significant flexibility and control over your ML projects. For instance, you could connect to a third-party data store and save your inference results there.

In this guide, we'll show you how to save inference images to an external data store each time a prediction is made. As our inference model, we'll be using a HuggingFace pretrained stable-diffusion pipeline and building on deploy a stable diffusion pipeline. We'll be updating some of the code laid out in that guide, so make sure you've gone through that example first.

📘

Assumes prior knowledge

This guide assumes you are already familiar with deploying HuggingFace diffusers models on PipelineCloud and will build on deploy a stable diffusion pipeline, so make sure you've gone through that first.

As our data store, we'll be connecting to a public MinIO server. MinIO is an open-source object storage server that allows you to store and access large amounts of unstructured data. It is compatible with the Amazon S3 API, which means you can use the same tools and applications you would use with S3 to interact with MinIO. In practice, all you'd have to do to connect to an Amazon S3 bucket instead is change a few environment variables!

NOTE: This is a walkthrough, so many of the code snippets below are chunks of a larger script. If you're skimming or just want to see code, skip to the conclusion, where you'll find the complete script.

Background

In deploy a stable diffusion pipeline, we saw how to package a pretrained stable-diffusion HuggingFace pipeline into a deployable unit of code and upload that code to PipelineCloud. This enabled us to perform remote inferences in the cloud by making an HTTP POST request to an endpoint on the pipeline-ai API gateway. The deployable unit of code is a pipeline-ai pipeline. This is where you define the set of instructions that should be followed when an inference call is made to the endpoint. For instance, in the stable diffusion pipeline, the instructions involved creating a model instance, loading the HF model into memory, parsing the input to the pipeline and then passing it to the model to generate a list of images.

Now say you want the generated images to be uploaded to an external storage bucket. There are a number of ways you could achieve this. You could implement it client-side, where your client application makes a POST request to api.pipeline.ai/v2/runs, waits for the response, and then handles the logic for uploading the result. Alternatively, you could set up a cron job that periodically queries your pipeline's runs and uploads any missing run results. In this guide, however, we'll show you how to handle the uploads server-side, within the execution of the pipeline itself. To achieve this, we'll just need to add an additional step at the end of the pipeline which handles that logic.

So that's where we're heading. First, though, we'll implement a simple storage client class to interface with MinIO and handle the uploads. Seeing as we'll be using the Python minio SDK for this, we'll also need to set up a custom Python environment on PipelineCloud, as it isn't available in any of the public environments.

Uploading to MinIO

The MinIO server playground

The MinIO Server Playground is a web-based user interface that allows you to quickly and easily test the functionality of the MinIO server without having to set up your own instance. It provides a sandbox environment where you can experiment with MinIO features, such as bucket creation, object uploading, and metadata management. Navigate to https://play.min.io/ and log in using the access key Q3AM3UQ867SPQQA43P2F and secret key zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG. You should see a bunch of random buckets there.

We'll be interacting with this public server instead of creating our own instance or setting up our own bucket on Amazon S3. In a production setting, though, you'll obviously want to set something like that up. But once you've created your own bucket, on Amazon S3 say, everything should work the same: you'll just need to update your environment variables to point to the right bucket.

In your project directory, create a .env file with the following environment variables:

S3_ENDPOINT=play.min.io
S3_ACCESS_KEY=Q3AM3UQ867SPQQA43P2F
S3_SECRET_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
S3_BUCKET=stable-diffusion-pipeline-files

We'll be saving our images to a bucket called stable-diffusion-pipeline-files. You can see what images other folks following this guide have been generating by checking whether the bucket exists on https://play.min.io/, although buckets are purged pretty regularly so there may not be anything there.
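
If you later want to point at your own Amazon S3 bucket instead, only these environment variables should need to change. A rough sketch of what that might look like, where s3.amazonaws.com is the standard S3 endpoint and the key values and bucket name are placeholders for your own credentials:

S3_ENDPOINT=s3.amazonaws.com
S3_ACCESS_KEY=YOUR_AWS_ACCESS_KEY_ID
S3_SECRET_KEY=YOUR_AWS_SECRET_ACCESS_KEY
S3_BUCKET=your-own-bucket-name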

Implementing a Client wrapper

Let's build out a simple StorageClient class which wraps around the Minio Client. We'll implement two methods for uploading data to a data store: one for creating a new bucket and another for uploading objects to the bucket. Since we'll be interacting with the MinIO API using the minio Python SDK, make sure you have added it to your local environment, using pip, poetry or whichever package manager you prefer, e.g. poetry add minio. We'll also be loading the .env environment variables automatically using the dotenv package (installed as python-dotenv), so you'll want to install that too. This saves us from having to export variables each time we open a new shell.

import io
import os

from dotenv import load_dotenv
from minio import Minio

load_dotenv()


class StorageClient:
    """
    A wrapper class for uploading files to an S3 bucket.
    Default storage client is the Minio Client. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(
        self,
        Client=Minio,
        endpoint=os.getenv("S3_ENDPOINT"),
        access_key=os.getenv("S3_ACCESS_KEY"),
        secret_key=os.getenv("S3_SECRET_KEY"),
    ):
        self.client = Client(endpoint, access_key, secret_key)

    def _create_bucket(self, bucket: str):
        """Create a new bucket for storing images, if it does not already exist"""
        if not self.client.bucket_exists(bucket):
            self.client.make_bucket(bucket)

    def upload(
        self,
        object_name: str,
        data: bytes,
        bucket=os.getenv("S3_BUCKET"),
    ):
        """Upload a file to an S3 bucket"""
        length = len(data)
        data_stream = io.BytesIO(data)
        self._create_bucket(bucket)
        self.client.put_object(bucket, object_name, data_stream, length)


The code here should hopefully be pretty straightforward. When we create a new StorageClient instance, we inject the Client and associated credentials into the constructor. That way the StorageClient is less tightly coupled to the specific Minio client, so we could swap it out for another client that implements the put_object, make_bucket and bucket_exists methods. We then have a "private" method _create_bucket, which creates a new storage bucket if it doesn't already exist. Finally, we have our core upload method. The object_name parameter distinguishes the uploaded file from other files in the bucket. We also need to provide the file data, which we pass in as bytes and convert into a buffered stream inside the method.

You can try out the above script, for instance by adding the following lines of code:

if __name__ == "__main__":
    from datetime import datetime
    now = datetime.utcnow()
    data = b"Some binary data: \x00\x01"
    client = StorageClient()
    client.upload(f"pipeline-test_{now}", data, bucket="pipeline-test")

If all went well you should now see a new file saved to the "pipeline-test" bucket on the MinIO playground server.
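
If you'd rather verify the upload from Python than hunt for the bucket in the web console, the Minio client also exposes a list_objects method. A minimal sketch, assuming it's appended to the __main__ block above so that client is still in scope:

    # List the bucket's contents with the underlying Minio client to confirm
    # the test object landed there
    for obj in client.client.list_objects("pipeline-test", recursive=True):
        print(obj.object_name, obj.size, obj.last_modified)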

This is pretty much the bulk of what we'll need to save the inference images of our stable-diffusion pipeline to the data store. The only extra thing we'll need to do is embed a call to the StorageClient.upload method in the workflow of the stable diffusion pipeline. But we'll get to that later, once we've set up the custom environment.
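
As an aside, injecting the client through the constructor also makes the wrapper easy to exercise without touching a real server. Here's a small sketch with a hypothetical in-memory stand-in that implements the same three methods:

class InMemoryClient:
    """A hypothetical stand-in for Minio that keeps objects in a dict."""

    def __init__(self, endpoint, access_key, secret_key):
        self.buckets = {}  # bucket name -> {object name: bytes}

    def bucket_exists(self, bucket):
        return bucket in self.buckets

    def make_bucket(self, bucket):
        self.buckets[bucket] = {}

    def put_object(self, bucket, object_name, data_stream, length):
        self.buckets[bucket][object_name] = data_stream.read(length)


# StorageClient works unchanged, since it only relies on those three methods
fake = StorageClient(Client=InMemoryClient)
fake.upload("test.jpeg", b"\xff\xd8\xff", bucket="local-test")
print(list(fake.client.buckets["local-test"]))  # ['test.jpeg']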

Creating the remote Python environment

For the pipeline that we'll be developing, we'll need some Python packages that aren't included in the default environment, e.g. the minio package. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is with the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.

📘

First log in with our CLI

We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see our authentication guide.

To create a new environment named sd-minio, say, simply run

pipeline environments create sd-minio

in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n sd-minio

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment.

Create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
minio==7.1.14

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n sd-minio add

You should now see these packages in the environment python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.

Integrating MinIO uploads

Now that we've set up our MinIO client and custom environment, we're ready to integrate uploads to MinIO into the stable diffusion pipeline:

import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Upload images to MinIO
    save_to_store(formatted_images)

Only one change has been made to the original pipeline: we have added a call to save_to_store at the end of the pipeline, which is responsible for uploading the pipeline output, formatted_images, to MinIO.

Since we are calling save_to_store within a Pipeline context manager and want the runtime values to be passed to the function, we need to decorate it with the pipeline_function decorator:

@pipeline_function
def save_to_store(images: t.List[Image]) -> None:
    import base64
    from datetime import datetime

    client = StorageClient()
    for image in images:
        now = datetime.utcnow()
        image_bytes = base64.b64decode(image)
        client.upload(f"dreambooth-{now}.jpeg", image_bytes)

Here we simply instantiate the client and upload each of the generated images. Notice that we base64-decode each image before uploading, so that the stored objects are raw JPEGs that can be previewed on the MinIO playground server.
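
If you want to sanity-check that decode step locally, you can confirm the decoded bytes really are a viewable JPEG before they ever reach the bucket. A small sketch, assuming image is one of the base64 strings produced by format_images:

import base64
import io

from PIL import Image as PILImage

# Decode the base64 string back into raw JPEG bytes, exactly as save_to_store
# does, then open it with PIL to confirm it's a valid image
image_bytes = base64.b64decode(image)
PILImage.open(io.BytesIO(image_bytes)).show()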

Running the pipeline on PipelineCloud

Before we can run the pipeline on PipelineCloud, we need to upload it to the servers. Assuming you have authenticated using the CLI, we 'get' the pipeline, then instantiate a connection to PipelineCloud and upload our pipeline:

from pipeline import PipelineCloud

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

Just be sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:

pipeline environments get -n sd-minio

During this stage, the pipeline-ai library will serialize all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway.

And now we run the pipeline, supplying an input dictionary of type InputKwargs:

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100
    },
)

After running this you should be able to preview your inference images on the server! The first time you run the pipeline, it will take about a minute because the pipeline won't yet be cached on our servers. Subsequent runs won't be subject to this cold start and should be pretty speedy! Just make sure you move the run_pipeline call into another script and don't execute the whole script again, because you'll be uploading a new pipeline each time.
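
You can also pull one of the uploaded images back down from the playground bucket to inspect it locally. A minimal sketch using the Minio client's list_objects and fget_object methods, reusing the StorageClient wrapper from earlier and assuming at least one image has already been uploaded (the local file name is arbitrary):

store = StorageClient()

# Find the most recently uploaded object in the bucket and download it to disk
objects = store.client.list_objects("stable-diffusion-pipeline-files", recursive=True)
latest = max(objects, key=lambda obj: obj.last_modified)
store.client.fget_object(
    "stable-diffusion-pipeline-files", latest.object_name, "latest_inference.jpeg"
)
print(f"Downloaded {latest.object_name} to latest_inference.jpeg")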

Conclusion

In this guide, we saw how to connect our Python application to MinIO using the minio package and start uploading data to a data store. We created a storage client wrapper class around the MinIO Client which handles uploading files to a bucket on the server. We then updated the stable diffusion pipeline by adding a new pipeline_function, save_to_store, which uploads the inference images after each prediction.

Complete Script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from minio import Minio
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


class StorageClient:
    """
    A wrapper class for uploading files to an S3 bucket.
    Default storage client is the Minio Client. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(
        self,
        Client=Minio,
        endpoint=os.getenv("S3_ENDPOINT"),
        access_key=os.getenv("S3_ACCESS_KEY"),
        secret_key=os.getenv("S3_SECRET_KEY"),
    ):
        self.client = Client(endpoint, access_key, secret_key)

    def _create_bucket(self, bucket: str):
        """Create a new bucket for storing images, if it does not already exist"""
        if not self.client.bucket_exists(bucket):
            self.client.make_bucket(bucket)

    def upload(
        self,
        object_name: str,
        data: bytes,
        bucket=os.getenv("S3_BUCKET"),
    ):
        """Upload a file to an S3 bucket"""
        length = len(data)
        data_stream = io.BytesIO(data)
        self._create_bucket(bucket)
        self.client.put_object(bucket, object_name, data_stream, length)


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


@pipeline_function
def save_to_store(images: t.List[Image]) -> None:
    import base64
    from datetime import datetime

    client = StorageClient()
    for image in images:
        now = datetime.utcnow()
        image_bytes = base64.b64decode(image)
        client.upload(f"dreambooth-{now}.jpeg", image_bytes)


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)
    
    # Upload images to MinIO
    save_to_store(formatted_images)


sd_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    sd_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100,
    },
)