Slack notifications

Deploying a pre-trained HuggingFace stable-diffusion model and posting images to a Slack channel.

If you're already familiar with PipelineCloud, you know that it provides a platform for running complex ML tasks without expensive hardware or tedious ML-ops infrastructure of your own. You can also execute arbitrary Python, which gives you a lot of flexibility and control over your ML projects. For instance, you could set up a notification scheme where a message is sent to a third-party service at some point during the execution of one of your pipelines.

In this guide, we'll show you how to post your inference images to a Slack channel each time a prediction is made. As our inference model, we'll use a HuggingFace pretrained stable-diffusion pipeline, building on the "deploy a stable diffusion pipeline" guide. We'll be updating some of the code laid out in that guide, so make sure you've gone through that example first.

📘

Assumes prior knowledge

This guide assumes you are already familiar with deploying HuggingFace diffusers models on PipelineCloud and will build on deploy a stable diffusion pipeline, so make sure you've gone through that first.

NOTE: This is a walkthrough, so many of the code snippets below are only chunks of a larger script. If you're skimming or just want to see code, skip to the conclusion, where you'll find the complete script.

Background

In "deploy a stable diffusion pipeline", we saw how to package a pretrained stable-diffusion HuggingFace pipeline into a deployable unit of code and upload that code to PipelineCloud. This enabled us to perform remote inferences in the cloud by making an HTTP POST request to an endpoint on the pipeline-ai API gateway.

The deployable unit of code is a pipeline-ai pipeline. This is where you define the set of instructions that should be followed when an inference call is made to the endpoint. For instance, in the stable diffusion pipeline, the instructions involved creating a model instance, loading the HF model into memory, parsing the input to the pipeline and then passing it to the model to generate a list of images.

If we then want those images to be posted to Slack, we just need to add an additional step at the end of the pipeline which calls some other pipeline_function which will handle that logic. So that's where we're heading, but we'll get to that later. First, we'll see how to set things up in Slack to allow our Python application to post to a channel and then implement a simple notification client class to interface with Slack. Seeing as we'll be using slack_sdk for this, we'll also need to set up a custom Python environment on PipelineCloud, as it isn't available in any of the public environments.

Posting to a Slack channel

In order to post messages or files to a Slack channel from our own Python application, we'll need to interact with the Slack API in some form or other. There are a few ways to do this, but the simplest is to leverage the Python Slack SDK, which contains a web package with a bunch of methods to interact with various API resources, such as conversations (i.e. channels), chats and files, to name a few.

Setting up the Slack App

The Python application needs to interact with a Slack App, so you'll need to set one up. This guide will walk you through how to set up a basic app in your workspace. The bot token you generate while following this guide will associate your Python application (the bot) with a Slack app installed in a workspace. You'll need to make sure that you have:

  • A bot token, which we'll save as an environment variable.
  • Added the following to your bot token scopes: chat:write, files:read and files:write. Don't worry about getting these exactly right: you can always update them later, and the Slack API response names any missing scopes when you try to perform an action on a given resource.
  • Added your new Slack App to the Slack channel you want your Python application to post to.
  • The ID of the Slack channel, which we'll also save as an environment variable.

📘

To get the ID of the Slack channel, you can query the conversations resource:

curl https://slack.com/api/conversations.list -H "Authorization: Bearer xoxb-1234..."

There should be a number of channels in the response so just look for the ID of the channel with the right name.
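
If you'd rather do this lookup from Python, here is a minimal sketch. It assumes slack_sdk is installed and SLACK_BOT_TOKEN is exported in your shell; the helper names find_channel_id and lookup_channel_id are made up for this example. It only inspects the first page of results, and the bot token will most likely need a read scope such as channels:read in addition to the scopes listed above.

```python
import os
import typing as t


def find_channel_id(
    channels: t.Iterable[t.Mapping[str, t.Any]], name: str
) -> t.Optional[str]:
    """Return the ID of the first channel called `name`, or None if absent."""
    for channel in channels:
        if channel.get("name") == name:
            return channel.get("id")
    return None


def lookup_channel_id(name: str) -> t.Optional[str]:
    """Query the first page of conversations.list and search it for `name`."""
    # Imported lazily so the pure helper above works without slack_sdk installed.
    from slack_sdk import WebClient

    client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
    response = client.conversations_list()
    return find_channel_id(response["channels"], name)
```

Calling lookup_channel_id with your channel's name (without the leading #) should return the same ID you would pick out of the curl response by eye.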

Once you're sure you have all that, save your bot token and channel ID to a .env file:

SLACK_BOT_TOKEN=xoxb-__fill__me__in
SLACK_CHANNEL=C__fill__me__in
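
Before moving on, it can help to confirm that both variables are actually visible to Python. The following is a small sketch, not part of the original script; check_slack_env is a hypothetical helper that only checks for presence and for the xoxb- prefix used by bot tokens.

```python
import typing as t

REQUIRED_VARS = ("SLACK_BOT_TOKEN", "SLACK_CHANNEL")


def check_slack_env(env: t.Mapping[str, str]) -> t.List[str]:
    """Return a list of problems found; an empty list means nothing obvious is wrong."""
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    token = env.get("SLACK_BOT_TOKEN", "")
    if token and not token.startswith("xoxb-"):
        problems.append(
            "SLACK_BOT_TOKEN does not look like a bot token (expected an xoxb- prefix)"
        )
    return problems
```

Once the .env file has been loaded, you could call check_slack_env(os.environ) and print whatever it returns.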

Now that we have everything set up on Slack, let's connect to the App with a Python client.

Implementing a Client wrapper

In this section, we'll build out a simple NotificationClient class which wraps around the Slack WebClient. We'll implement 2 methods for posting content to the channel: one for text messages and another for uploading files. Since we'll be interacting with the Slack API using slack_sdk, make sure you have added it to your local environment using pip, poetry or whichever package manager you prefer. We'll also be loading the .env environment variables automatically using the dotenv package (published on PyPI as python-dotenv), so you'll want to install that too. This saves us from having to export the variables each time we open a new shell.

import os
import typing as t

from dotenv import load_dotenv
from slack_sdk import WebClient

load_dotenv()


class NotificationClient:
    """
    A wrapper class for posting messages and uploading files to a channel.
    Default client is the Slack WebClient. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(self, Client=WebClient, token=os.getenv("SLACK_BOT_TOKEN")):
        self.client = Client(token=token)

    def post_message(self, text: str, channel=os.getenv("SLACK_CHANNEL")):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(
        self,
        file: t.Union[str, bytes],
        title="New Dreambooth image generated",
        channel: t.Optional[str] = os.getenv("SLACK_CHANNEL"),
    ):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e

To check that the client works and that you set up the Slack App correctly in the previous section, try posting a text message to your Slack channel. For instance, you could add the following lines to the above code:

if __name__ == "__main__":
    client = NotificationClient()
    client.post_message(text="Hello Slack channel")

and then run the script. If everything went well you should see a new chat message in your Slack channel!
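
The class docstring mentions that another client can be injected. As a rough illustration of why that is handy, the sketch below swaps in a hypothetical FakeClient that records calls instead of contacting Slack. To keep the example runnable without slack_sdk, it uses a condensed copy of the wrapper with no default client and no error handling; both classes here are assumptions for illustration only.

```python
import typing as t


class FakeClient:
    """Hypothetical stand-in that records calls instead of contacting Slack."""

    def __init__(self, token: t.Optional[str] = None):
        self.token = token
        self.calls: t.List[t.Tuple[str, dict]] = []

    def chat_postMessage(self, **kwargs):
        self.calls.append(("chat_postMessage", kwargs))

    def files_upload_v2(self, **kwargs):
        self.calls.append(("files_upload_v2", kwargs))


class NotificationClient:
    """Condensed copy of the wrapper above, without the WebClient default."""

    def __init__(self, Client, token=None):
        self.client = Client(token=token)

    def post_message(self, text: str, channel: str):
        self.client.chat_postMessage(text=text, channel=channel)

    def upload_file(self, file, title: str, channel: str):
        self.client.files_upload_v2(
            channel=channel, file=file, title=title, filename=title
        )


# Exercise both methods and keep the recorded calls for inspection.
client = NotificationClient(Client=FakeClient, token="xoxb-test")
client.post_message(text="Hello Slack channel", channel="C123")
client.upload_file(file=b"fake image bytes", title="Test image", channel="C123")
recorded = client.client.calls
```

Inspecting recorded shows exactly which keyword arguments the wrapper would have sent to Slack, which is useful for unit tests that should not post to a real channel.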

From a Slack perspective, this is pretty much the bulk of what we'll need to post inference images of our stable-diffusion pipeline to the channel. The only additional thing we'll need is to embed logic for calling the upload_file method in the workflow of the stable diffusion pipeline. But we'll get to that later, once we've set up the custom environment.

Creating the remote Python environment

For the pipeline that we'll be developing, we'll need some Python packages that aren't included in the default environment, e.g. slack_sdk. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.

📘

First login with our CLI

We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see our authentication guide.

To create a new environment named, say, sd-slack, simply run

pipeline environments create sd-slack

in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n sd-slack

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment.

Create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
slack-sdk==3.20.2

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n sd-slack add

You should now see these packages in the environment python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.
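
To make it clearer what the xargs one-liner does, here is a short sketch that builds the same command line in Python. The helper names are hypothetical, and the only CLI arguments used are the ones shown in the command above. Unlike xargs, this version also skips blank lines and comment lines.

```python
import typing as t


def read_requirements(text: str) -> t.List[str]:
    """Return the non-empty, non-comment lines of a requirements file."""
    lines = (line.strip() for line in text.splitlines())
    return [line for line in lines if line and not line.startswith("#")]


def build_update_command(environment: str, packages: t.Sequence[str]) -> t.List[str]:
    """Mirror the xargs invocation: every package is appended after `add`."""
    return ["pipeline", "environments", "update", "-n", environment, "add", *packages]


requirements = """\
transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
slack-sdk==3.20.2
"""
command = build_update_command("sd-slack", read_requirements(requirements))
```

The resulting list could be passed to subprocess.run(command, check=True) in a shell where the pipeline-ai CLI is installed and authenticated.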

Adding Slack notification

Now that we've set up our Slack client and custom environment, we're ready to integrate Slack notifications into the stable diffusion pipeline:

import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Send images to slack
    post_to_channel(formatted_images, context)

Only 2 things have changed from the original pipeline:

  • When we set the kwargs on the model with set_kwargs, we now also return the context of the run so that we can use it elsewhere in the pipeline.
  • We have added a function, post_to_channel, at the end of the pipeline which will be responsible for posting the output of the pipeline, formatted_images, to the Slack channel. We also pass the context of the run to give additional context to the posted images, e.g. the input prompt.

Since we are calling the post_to_channel within a Pipeline context manager and want the runtime values to be passed to the function, we need to decorate it with a pipeline_function decorator:


@pipeline_function
def post_to_channel(images: t.List[str], context: InputKwargs) -> None:
    """
    Send generated images to a (Slack) channel.
    """
    client = NotificationClient()
    prompt = context.get("prompt", "No prompt text")
    title = f"New generation from {PIPELINE_NAME}: {prompt}"
    for image in images:
        client.upload_file(file=base64.b64decode(image), title=title)


Here we simply instantiate the client, grab the prompt from the context and upload each generated image to the Slack channel. You could also post additional context data besides the prompt if you want.
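
As one way of including more context than the prompt, the sketch below composes a title from every non-empty setting in the context. It also shows the decoding step on its own: the pipeline outputs base64 strings, so they have to be turned back into raw bytes before upload. build_title and decode_image are hypothetical helpers, not part of the original script.

```python
import base64
import typing as t


def build_title(pipeline_name: str, context: t.Mapping[str, t.Any]) -> str:
    """Compose a title from the prompt plus any other settings in the context."""
    prompt = context.get("prompt", "No prompt text")
    extras = ", ".join(
        f"{key}={value}"
        for key, value in sorted(context.items())
        if key != "prompt" and value is not None
    )
    title = f"New generation from {pipeline_name}: {prompt}"
    return f"{title} ({extras})" if extras else title


def decode_image(image: str) -> bytes:
    """Turn a base64 string produced by the pipeline back into raw image bytes."""
    return base64.b64decode(image)


context = {"prompt": "Moonlight seas", "num_inference_steps": 25, "seed": None}
title = build_title("sd-dreambooth", context)
encoded = base64.b64encode(b"raw-bytes").decode()
decoded = decode_image(encoded)
```

Inside post_to_channel, the title passed to upload_file could then come from build_title(PIPELINE_NAME, context). Keep in mind that the wrapper also reuses the title as the filename, so very long titles may be worth shortening.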

Running the pipeline on Pipeline Cloud

Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Assuming you have authenticated using the CLI, we 'get' the pipeline, then instantiate a connection to Pipeline Cloud and upload our pipeline:

from pipeline import PipelineCloud

dreambooth_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    dreambooth_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

Just be sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:

pipeline environments get -n sd-slack

During this stage, the pipeline-ai library will serialize all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway.

And now we run the pipeline, supplying an input dictionary of type InputKwargs:

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100
    },
)

After running this you should see your inference images posted to your Slack channel! The first time you run the pipeline, it will take about a minute because the pipeline won't be cached on our servers. Subsequent runs won't be subject to this cold start and should be pretty speedy! Just make sure you move the run_pipeline call into another script rather than executing the whole script again, because otherwise you'll upload a new pipeline each time.
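
A separate run script could look something like the sketch below. It assumes you have noted the pipeline ID printed at upload time and that you are authenticated with the CLI. The function names are hypothetical, and only the PipelineCloud and run_pipeline calls already used above are relied on.

```python
import typing as t


def build_run_inputs(prompt: str, **overrides: t.Any) -> t.Dict[str, t.Any]:
    """Assemble the input dictionary, dropping overrides left as None."""
    inputs: t.Dict[str, t.Any] = {"prompt": prompt}
    inputs.update({key: value for key, value in overrides.items() if value is not None})
    return inputs


def run_uploaded_pipeline(pipeline_id: str, inputs: t.Dict[str, t.Any]):
    """Run an already-uploaded pipeline by ID without uploading it again."""
    # Imported lazily so build_run_inputs can be used without pipeline-ai installed.
    from pipeline import PipelineCloud

    api = PipelineCloud()
    return api.run_pipeline(pipeline_id, inputs)
```

Calling run_uploaded_pipeline with your own pipeline ID and build_run_inputs("your prompt", num_inference_steps=25) should then trigger a run, and with it a Slack post, without creating a new pipeline.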

Conclusion

In this guide, we saw how to connect our Python application to Slack using slack_sdk and start posting messages to a Slack channel. We created a notification client wrapper class around the Slack WebClient which can also handle uploading files to the channel. We then updated the stable diffusion pipeline by adding a new pipeline_function call which posts the inference images to the channel after each prediction.

Complete script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)
from slack_sdk import WebClient

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


class NotificationClient:
    """
    A wrapper class for posting messages and uploading files to a channel.
    Default client is the Slack WebClient. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(self, Client=WebClient, token=os.getenv("SLACK_BOT_TOKEN")):
        self.client = Client(token=token)

    def post_message(self, text: str, channel=os.getenv("SLACK_CHANNEL")):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(
        self,
        file: t.Union[str, bytes],
        title="New Dreambooth image generated",
        channel: t.Optional[str] = os.getenv("SLACK_CHANNEL"),
    ):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


@pipeline_function
def post_to_channel(images: t.List[str], context: InputKwargs) -> None:
    """
    Send generated images to a (Slack) channel.
    """
    client = NotificationClient()
    prompt = context.get("prompt", "No prompt text")
    title = f"New generation from {PIPELINE_NAME}: {prompt}"
    for image in images:
        client.upload_file(file=base64.b64decode(image), title=title)


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Send images to slack
    post_to_channel(formatted_images, context)


dreambooth_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)


api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    dreambooth_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 25,
    },
)