Langchain with Pipeline Catalyst

How to create a Langchain conversation chain with an LLM deployed to Catalyst.

Background

Langchain is a library that provides utility modules and interfaces to other libraries for building fully-fledged applications based on Large Language Models (LLMs). As a standalone model, an LLM is only responsible for predicting the text that follows an input string. No state is stored on the LLM and each inference request is independent. If you want to build a chatbot, or anything that uses search or external data, the LLM alone is often not enough and you have to implement a lot of application logic yourself. This is certainly doable, but it gets complicated quickly. Langchain abstracts many of these tedious tasks away by offering a set of utility modules (e.g. for building prompt templates and managing conversational memory) and other modules that integrate with a whole spectrum of third-party tools (e.g. LLM providers and vector store providers). The open source community has also picked up on the project: it has been moving at a fast pace and is quickly becoming a standard for building apps around LLMs.

Pipeline Catalyst integrates directly with Langchain through an LLM integration module. This means that you can use your own LLMs that you have deployed to Catalyst, as you would any other LLM in Langchain. In this walkthrough, we'll show you how to deploy a custom LLM on Pipeline Catalyst and then use that LLM within Langchain to start building your own chat application.

Deploying an LLM to Catalyst

In this section we'll show you a simple way to deploy Huggingface-hosted LLMs to Pipeline Catalyst. This guide assumes you are already somewhat familiar with how to deploy a Huggingface model, so we won't be going into as much detail here. Once we have deployed the LLM, we'll then be ready to start making inference calls to it from Langchain.

As our LLM, we'll deploy a Flan-t5 model developed by Google. This model may not be the most appropriate for conversation-based applications but we are more concerned with showcasing the overall procedure followed here, which should work for most of the LLMs hosted on HuggingFace. So, as better LLMs become accessible on HuggingFace you should be able to swap those out for the Flan-t5 model pretty seamlessly.

Creating the core pipeline_model

In order to deploy the LLM to Catalyst, we need to create a wrapper class around the Huggingface model and decorate the class with the pipeline_model decorator, as follows:

from pipeline import Pipeline, pipeline_model, pipeline_function, Variable
import torch

PIPELINE_NAME = "google/flan-t5"
HF_MODEL_NAME = f"{PIPELINE_NAME}-xl"


@pipeline_model
class FlanModel:
    model = None
    tokenizer = None
    device = torch.device("cuda")

    @pipeline_function(on_startup=True, run_once=True)
    def load(self) -> None:
        """Load the pretrained model and tokenizer into memory.
        Decorator parameters ensure that loading doesn't occur when the 
        pipeline is already cached.
        """
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        self.model = AutoModelForSeq2SeqLM.from_pretrained(HF_MODEL_NAME).to(
            self.device
        )
        self.tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_NAME)


    @pipeline_function
    def predict(self, prompt: str, model_kwargs: dict) -> list:
        """Generates a text prediction given an input prompt and model kwargs."""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        outputs = self.model.generate(**inputs, **model_kwargs)
        return self.tokenizer.batch_decode(outputs, skip_special_tokens=True)

Here we have defined the class attributes model, tokenizer and device, and two instance methods: load and predict. Feel free to store the class attributes as instance attributes instead if you prefer. Both methods are decorated with pipeline_function because they are called explicitly within the pipeline builder (computational graph) defined below. The load method downloads the model and tokenizer from Huggingface and loads them into memory. Passing on_startup=True to the decorator ensures that load is always called at the start of the pipeline, and run_once=True ensures that it only runs the first time after the pipeline has been loaded into memory. In practice, this means that it will not be called if the pipeline is already cached. The predict method is what we pass our prompt to in order to generate text predictions. Notice that we call the PyTorch .to method on both the model in load and the tokenized inputs in predict, passing a cuda device so that the tensors are sent to the GPU.

Creating the pipeline

Now that we have defined our pipeline_model which implements the logic for loading the HuggingFace model into memory and generating predictions, we need to create a template for the computational flow which should occur at runtime:

# Configure the pipeline, i.e. computational graph
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=12000) as builder:
    # Bind inputs to the pipeline
    prompt = Variable(str, is_input=True)
    model_kwargs = Variable(dict, is_input=True)
    builder.add_variables(prompt, model_kwargs)
    
    # Instantiate and load the model
    model = FlanModel()
    model.load()
    
    # Generate a prediction
    output = model.predict(prompt, model_kwargs)
    builder.output(output)

After roughly estimating the required GPU memory for the model, we set the min_gpu_vram_mb. This ensures that the routing system will not route a run to a worker that does not have sufficient memory to compute the run. If you have your own GPU, you can get an estimate of how much VRAM you need by running torch.cuda.memory_allocated(self.device) before and after loading the models and computing the difference.

Within the context manager, we define the prompt and model_kwargs input variables and bind them to the pipeline. This means that when we run the pipeline, the pipeline will expect the prompt string as the first input and then a dictionary of parameters as the second input. As we'll see later, the Langchain PipelineAI LLM class expects this kind of signature, so it's important we set up the pipeline inputs in this way.

After binding the inputs, we then instantiate the FlanModel, call the load method and pass the inputs to the predict method to generate a text prediction. Finally, we set the output of the pipeline to that result.

Uploading the pipeline

Now that we have constructed the blueprint for our pipeline using the Pipeline context manager, we are ready to upload the pipeline to Catalyst. To do so, we can make use of the PipelineCloud client which will handle all the heavy lifting for us. Simply create a new client instance, passing your Pipeline API token, get the computation graph and upload it using the upload_pipeline method on the client, as follows:

from pipeline import PipelineCloud

client = PipelineCloud(token="YOUR_PIPELINE_API_KEY")

flan_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
uploaded_pipeline = client.upload_pipeline(flan_pipeline, environment="environment_4b7c7117bf8848dc97872c74c8414de1")
print(uploaded_pipeline.id)

Notice that we have set the ID of the Python runtime environment, environment_4b7c7117bf8848dc97872c74c8414de1, that should be used when executing the pipeline. This corresponds to the public environment public/mystic-default-20230406, which is more up to date than the default environment. You can check out all the available public environments on the environments page of the Catalyst dashboard.

After your pipeline has been uploaded successfully, take note of the uploaded_pipeline.id. You can always find this in the most recent pipeline in the "Deployed Pipelines" table on the home page of the dashboard.

Full snippet

from pipeline import Pipeline, pipeline_model, pipeline_function, Variable
import torch

PIPELINE_NAME = "google/flan-t5"
HF_MODEL_NAME = f"{PIPELINE_NAME}-xl"


@pipeline_model
class FlanModel:
    model = None
    tokenizer = None
    device = torch.device("cuda")

    @pipeline_function(on_startup=True, run_once=True)
    def load(self) -> None:
        """Load the pretrained model and tokenizer into memory.
        Decorator parameters ensure that loading doesn't occur when the 
        pipeline is already cached.
        """
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        self.model = AutoModelForSeq2SeqLM.from_pretrained(HF_MODEL_NAME).to(
            self.device
        )
        self.tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_NAME)


    @pipeline_function
    def predict(self, prompt: str, model_kwargs: dict) -> list:
        """Generates a text prediction given an input prompt and model kwargs."""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        outputs = self.model.generate(**inputs, **model_kwargs)
        return self.tokenizer.batch_decode(outputs, skip_special_tokens=True)

# Configure the pipeline, i.e. computational graph
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=12000) as builder:
    # Bind inputs to the pipeline
    prompt = Variable(str, is_input=True)
    model_kwargs = Variable(dict, is_input=True)
    builder.add_variables(prompt, model_kwargs)
    
    # Instantiate and load the model
    model = FlanModel()
    model.load()
    
    # Generate a prediction
    output = model.predict(prompt, model_kwargs)
    builder.output(output)
    
from pipeline import PipelineCloud

client = PipelineCloud(token="YOUR_PIPELINE_API_KEY")

flan_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
uploaded_pipeline = client.upload_pipeline(flan_pipeline, environment="environment_4b7c7117bf8848dc97872c74c8414de1")
print(uploaded_pipeline.id)

Creating a pipeline tag (optional)

If you want to deploy different versions of the Flan-t5 model, e.g. the small version, it is useful to create a tag for each Flan-t5 pipeline. That way all your pipelines can have the same name, but you can distinguish them by their tags, similar to how images are tagged in Docker. For instance, 3 different Flan-t5 pipelines, all named google/flan-t5, could be tagged as follows:

  • google/flan-t5:xl
  • google/flan-t5:small
  • google/flan-t5:base

🚧

Tag prefix must match pipeline name

The prefix of the tag, which is everything that comes before the colon symbol :, must match the actual name of the pipeline itself, which here is google/flan-t5.
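The prefix rule can be checked locally before calling the CLI. The helpers below are a hypothetical sketch (split_tag and tag_matches_pipeline are not part of the Pipeline library) and assume the tag is split at its last colon.

```python
from typing import Tuple


def split_tag(tag: str) -> Tuple[str, str]:
    """Split 'name:label' at the last colon into (name, label)."""
    name, sep, label = tag.rpartition(":")
    if not sep or not name or not label:
        raise ValueError(f"expected '<pipeline name>:<label>', got {tag!r}")
    return name, label


def tag_matches_pipeline(tag: str, pipeline_name: str) -> bool:
    """Return True when the tag prefix (before the colon) equals the pipeline name."""
    name, _ = split_tag(tag)
    return name == pipeline_name


print(tag_matches_pipeline("google/flan-t5:xl", "google/flan-t5"))
print(tag_matches_pipeline("flan-t5:xl", "google/flan-t5"))
```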

In Langchain, you can reference a Catalyst pipeline by its tag, which is a lot more memorable than an ID. Let's create a tag for the Flan pipeline we just uploaded. Since we uploaded the xl version, a good tag would be google/flan-t5:xl. The easiest way to create a pipeline tag is by using the pipeline CLI.

📘

First login with our CLI

We will be interacting with the Pipeline API using the CLI and assume you have already authenticated. For more information about how to authenticate using the CLI, see our authentication guide.

Once you have logged in using the CLI, find the ID of the Flan pipeline you just uploaded and create a tag, by running the following command:

pipeline tags create FLAN_PIPELINE_ID google/flan-t5:xl

replacing FLAN_PIPELINE_ID accordingly. You can then check that your tag has been successfully created by trying to fetch it:

pipeline tags get google/flan-t5:xl

Calling the LLM in Langchain

Now that we have uploaded our pipeline to Catalyst, we are ready to integrate it in Langchain. As mentioned in the introduction, Pipeline Catalyst integrates directly with Langchain through an LLM integration module. This means that you can use your own LLMs that you have deployed to Catalyst, as you would any other LLM in Langchain. To illustrate this, we'll show you how to run 2 basic chains using the Flan-t5 LLM we just deployed.

The PipelineAI LLM wrapper

The connection with Pipeline Catalyst is achieved through the PipelineAI class.

from langchain.llms import PipelineAI

You can configure your Pipeline API key either by setting it as an environment variable

import os

os.environ["PIPELINE_API_KEY"] = "YOUR_PIPELINE_API_TOKEN"

or by passing it directly when you create the LLM: PipelineAI(pipeline_api_key="YOUR_PIPELINE_API_TOKEN", ...). Creating an instance of PipelineAI gets you a Langchain LLM:

flan_llm = PipelineAI(
    pipeline_key="google/flan-t5:xl",
    pipeline_kwargs=dict(temperature=1.0),
)

which you can later use by running it directly or injecting it into a chain. Note that creating an instance doesn't make any API calls to Catalyst, but simply configures the LLM, such as setting your pipeline API key, the identifier of the LLM and other keyword arguments. API calls to Catalyst are only made when running the LLM or a chain that uses the LLM. In the above code snippet, pipeline_key can be the ID of the pipeline deployed on Catalyst, or a valid tag which points to the pipeline. Here we have used the tag google/flan-t5:xl that we created previously. The pipeline_kwargs represent any additional parameters that you would like to pass to the LLM when it is run.

Under the hood, the following line of code is executed by Langchain when your LLM is run: PipelineCloud().run_pipeline(self.pipeline_key, [prompt, pipeline_kwargs]). So when constructing your pipeline on Catalyst, you just need to ensure that the pipeline input variables match this expected interface if you want to pass LLM parameters, such as temperature, from Langchain to Catalyst. This is why we previously configured pipeline variables through:

    # Bind inputs to the pipeline
    prompt = Variable(str, is_input=True)
    model_kwargs = Variable(dict, is_input=True)
    builder.add_variables(prompt, model_kwargs)
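To make the positional mapping concrete, here is a plain-Python stand-in that makes no call to Catalyst. Both fake_predict and run_pipeline_stub are hypothetical names used only for illustration. The point is that the two-element input list is unpacked in order, one element per pipeline input variable.

```python
from typing import Any, Callable, List


def fake_predict(prompt: str, model_kwargs: dict) -> List[str]:
    """Hypothetical stand-in with the same (prompt, model_kwargs) inputs as the pipeline."""
    return [f"prompt={prompt!r} kwargs={sorted(model_kwargs)}"]


def run_pipeline_stub(pipeline: Callable[..., Any], inputs: list) -> Any:
    """Pass the input list positionally, one element per input variable."""
    return pipeline(*inputs)


result = run_pipeline_stub(
    fake_predict, ["Name a shoe company", {"temperature": 1.0}]
)
print(result)
```

If the pipeline declared its inputs in a different order, or declared only the prompt, the same two-element list would no longer line up with them.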

Running an LLM chain

Let's now run the Flan-t5 LLM using Langchain. To begin with, we'll run the LLM within an LLMChain, with a formatted prompt template and then see how to create a simple chatbot in the next section.

We'll create a prompt template for generating company names:

from langchain import PromptTemplate

template = """
I want you to act as a naming consultant for new companies.
What is a good name for a company that makes {product}?
"""

prompt = PromptTemplate(template=template, input_variables=["product"])
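To preview the text that the template produces for a given product, the placeholder can be filled in by hand. The sketch below uses Python's built-in str.format as a stand-in for the prompt template, which is an assumption made so the example runs without Langchain installed; for a simple placeholder like {product} the result should look the same.

```python
template = """
I want you to act as a naming consultant for new companies.
What is a good name for a company that makes {product}?
"""

# Fill the {product} placeholder with a concrete value.
filled = template.format(product="colorful shoes")
print(filled)
```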

We then construct an LLMChain, passing the prompt and the flan_llm to the chain:

from langchain import LLMChain

llm_chain = LLMChain(prompt=prompt, llm=flan_llm)

We then run the chain, by calling the run method on the chain:

output = llm_chain.run("colorful shoes")
print(output)

In our case, this generated "sassy shoes". Note that if the model isn't cached on our servers, you'll probably get a timeout on the first inference call, since the model is quite large and will probably take over a minute to cache. Subsequent calls should be pretty speedy though, on the order of 500 ms.

Full Snippet

import os
from langchain.llms import PipelineAI
from langchain import PromptTemplate, LLMChain

os.environ["PIPELINE_API_KEY"] = "YOUR_PIPELINE_API_TOKEN"

template = """
I want you to act as a naming consultant for new companies.
What is a good name for a company that makes {product}?
"""

prompt = PromptTemplate(template=template, input_variables=["product"])

flan_llm = PipelineAI(
    pipeline_key="google/flan-t5:xl",
    pipeline_kwargs=dict(temperature=1.0),
)

llm_chain = LLMChain(prompt=prompt, llm=flan_llm)

output = llm_chain.run("colorful shoes")
print(output)

Running a conversation chain

Langchain makes it very easy to set up a simple chatbot using an LLM, with various utilities for managing chat history and memory in order to build and format future prompts. For instance, within your conversation chain you could have a summarisation chain which passes the chat history to an LLM to summarise it, and then passes that summary along with the next prompt to your conversation LLM.

Here we'll only show the most basic form of conversation chain and leave it to you to take this further. In order to generate conversations, we need some way to manage and update the chat history. To do so, we'll use the ConversationBufferMemory:

from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory()

Then we create a conversation chain, passing the LLM and memory as parameters:

from langchain.chains import ConversationChain

conversation = ConversationChain(llm=flan_llm, verbose=True, memory=memory)

where we have set verbose=True so that we get more insight into the full prompt that gets passed to the LLM. Start a Python shell, import conversation, and you can start prompting the model by running conversation.predict(input="your prompt string") in succession. For instance, below is an excerpt of a conversation using the deployed Flan-t5 LLM:

The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.

Current conversation:
Human: Hey there, I'm Plutopulp.
AI: I'm a robot.
Human: Oh OK. What brings you here today?
AI: I'm here to find out about the world.
Human: Ah nice. What have you found out so far?
AI: I've found out that the world is a place where people can learn about themselves.
Human: What have you learned about yourself?
AI: I've learned that I'm a robot.
Human: How did you learn that?
AI: I've learned that by talking to people.
Human: What have you learned about me so far?
AI: I've learned that you are a human.
Human: Did you learn about my name?
AI: Yes, I've learned that your name is Plutopulp.
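Since the LLM itself is stateless, the whole history has to be resent with every prompt. The sketch below illustrates that idea in plain Python. It is not Langchain's implementation: BufferMemorySketch and build_prompt are hypothetical names, and the prompt layout is an assumption modelled on the excerpt above.

```python
from typing import List, Tuple

PREAMBLE = (
    "The following is a friendly conversation between a human and an AI. "
    "The AI is talkative and provides lots of specific details from its context. "
    "If the AI does not know the answer to a question, it truthfully says it "
    "does not know."
)


class BufferMemorySketch:
    """Keeps every turn verbatim, the idea behind a conversation buffer."""

    def __init__(self) -> None:
        self.turns: List[Tuple[str, str]] = []

    def save(self, human: str, ai: str) -> None:
        self.turns.append((human, ai))

    def history(self) -> str:
        return "\n".join(f"Human: {h}\nAI: {a}" for h, a in self.turns)


def build_prompt(memory: BufferMemorySketch, user_input: str) -> str:
    """Render the preamble, stored history and new input into one prompt."""
    lines = [PREAMBLE, "", "Current conversation:"]
    if memory.turns:
        lines.append(memory.history())
    lines.append(f"Human: {user_input}")
    lines.append("AI:")
    return "\n".join(lines)


sketch_memory = BufferMemorySketch()
sketch_memory.save("Hey there, I'm Plutopulp.", "I'm a robot.")
full_prompt = build_prompt(sketch_memory, "Oh OK. What brings you here today?")
print(full_prompt)
```

Because every turn is kept verbatim, the prompt grows with each exchange, which is what makes the later question about the user's name answerable and also why longer chats may call for something like the summarisation approach mentioned earlier.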

Full Snippet

import os
from langchain.memory import ConversationBufferMemory
from langchain.llms import PipelineAI
from langchain.chains import ConversationChain

os.environ["PIPELINE_API_KEY"] = "YOUR_PIPELINE_API_TOKEN"

memory = ConversationBufferMemory()


flan_llm = PipelineAI(
    pipeline_key="google/flan-t5:xl",
    pipeline_kwargs=dict(temperature=0.9),
)

conversation = ConversationChain(llm=flan_llm, verbose=True, memory=memory)