
Create and deploy a general pipeline

How to use the Pipeline Library to create pipelines from any runnable function/model.

Building a Pipeline with the Pipeline library

Basic imports

from pipeline import (
    Pipeline,
    PipelineFile,
    Variable,
    pipeline_function,
    pipeline_model,
)

Pipeline - The main class that defines our compute pipeline.

PipelineFile - Pipeline's file storage accessor for uploading files (e.g. model weights, ONNX files etc.)

Variable - Used to define input variables for pipelines.

pipeline_function - A decorator used to mark pipeline functions and verify that the wrapped function correctly parses input/output data. It relies on the type hints given for each of the function's inputs and for its output.

pipeline_model - Wrapper for a class representing a machine learning model.

Pipelines of functions

Here's an example of creating a Pipeline that only uses basic math functions. We will also cover running a pipeline locally and in the cloud at the end of this section.

from pipeline import Pipeline, pipeline_function, Variable

PIPELINE_NAME = "maths-is-fun"

@pipeline_function
def square(a: float) -> float:
    return a ** 2


@pipeline_function
def minus(a: float, b: float) -> float:
    return a - b


@pipeline_function
def multiply(a: float, b: float) -> float:
    return a * b


with Pipeline(PIPELINE_NAME) as pipeline:
    flt_1 = Variable(type_class=float, is_input=True)
    flt_2 = Variable(type_class=float, is_input=True)
    pipeline.add_variables(flt_1, flt_2)

    sq_1 = square(flt_1)
    res_1 = multiply(flt_2, sq_1)
    res_2 = minus(res_1, sq_1)
    sq_2 = square(res_2)
    res_3 = multiply(flt_2, sq_2)
    res_4 = minus(res_3, sq_1)
    pipeline.output(res_2, res_4)

complete_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

print(complete_pipeline.run(5.0, 6.0))

We have created three math functions, square, minus and multiply, that we wish to use inside our pipeline. Each must be wrapped with the pipeline_function decorator in order to do so, and each requires type hints for its inputs and outputs.

@pipeline_function
def square(a: float) -> float:

The Pipeline itself is assembled inside a context manager. Here we open the context manager and give the pipeline in this example the name "maths-is-fun".

with Pipeline(PIPELINE_NAME) as pipeline:

Next we define the inputs the Pipeline expects, using Variable for each input. A type must be provided for each. The add_variables method is then called to formally add the input variables we've created to the Pipeline.

    flt_1 = Variable(type_class=float, is_input=True)
    flt_2 = Variable(type_class=float, is_input=True)
    pipeline.add_variables(flt_1, flt_2)

Now we can set the operations of the Pipeline, which use our wrapped pipeline functions, and declare the pipeline outputs using the output method.

    sq_1 = square(flt_1)
    res_1 = multiply(flt_2, sq_1)
    res_2 = minus(res_1, sq_1)
    sq_2 = square(res_2)
    res_3 = multiply(flt_2, sq_2)
    res_4 = minus(res_3, sq_1)
    pipeline.output(res_2, res_4)

Finally we retrieve the constructed pipeline by calling Pipeline's get_pipeline method with the name we gave our example pipeline.

complete_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

We now have our complete pipeline ready to run locally or in the cloud. Local runs are performed like this:

complete_pipeline.run(5.0, 6.0)
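
To make the data flow concrete, here is a hand-traced run with the inputs 5.0 and 6.0, assuming the outputs come back in the order they were declared to pipeline.output:

# flt_1 = 5.0, flt_2 = 6.0
# sq_1  = square(5.0)            -> 25.0
# res_1 = multiply(6.0, 25.0)    -> 150.0
# res_2 = minus(150.0, 25.0)     -> 125.0
# sq_2  = square(125.0)          -> 15625.0
# res_3 = multiply(6.0, 15625.0) -> 93750.0
# res_4 = minus(93750.0, 25.0)   -> 93725.0
complete_pipeline.run(5.0, 6.0)  # -> [125.0, 93725.0]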

To run in Pipeline Cloud, we have a few more steps. We must:

  1. import PipelineCloud
  2. instantiate the API with your API token
  3. upload your pipeline to Pipeline Cloud
  4. run the uploaded pipeline in Pipeline Cloud

NOTE: the Pipeline Cloud upload will fail if you perform a local run beforehand in the same script, so make sure not to do local runs in the same script as your pipeline upload.

from pipeline import PipelineCloud
api = PipelineCloud(token="pipeline_token_value")
uploaded_pipeline = api.upload_pipeline(complete_pipeline)
api.run_pipeline(
    uploaded_pipeline,
    [5.0, 6.0],
)

NOTE: the inputs to a cloud run via api.run_pipeline are the same inputs you would pass to a local Pipeline.run, collected into a list.
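
In other words, for the maths pipeline the two calls line up like this:

# local run: positional arguments
complete_pipeline.run(5.0, 6.0)
# cloud run: the same arguments, collected into a list
api.run_pipeline(uploaded_pipeline, [5.0, 6.0])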

The returned outputs from pipelines also come in a list, so you may need to parse the data to get your expected output, for example:

[parsed_output] = complete_pipeline.run(5.0, 6.0)
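
Since the maths pipeline above declares two outputs (res_2 and res_4), a local run returns a two-element list. Assuming the outputs keep the order in which they were declared, it can be unpacked directly:

out_1, out_2 = complete_pipeline.run(5.0, 6.0)
print(out_1)  # value of res_2
print(out_2)  # value of res_4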

The result object

The result object returned by PipelineCloud.run_pipeline follows the RunGet schema in the pipeline package.

The notable metadata are id, which is the ID of the pipeline run, and run_state, an enum that tells us the state of the run. A run that completed successfully (no errors on the resource) will have run_state RunState.COMPLETE; a failed run will have RunState.FAILED.

The result itself can be found in the result_preview field.

result = api.run_pipeline(...)
print(result.result_preview)

However, result_preview has a 2 MB limit. If your output exceeds that, you will need to make a further API call to retrieve your result:

api.download_result(result)
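
Putting these pieces together, here is a hedged sketch of handling a cloud run result. The RunState import path below is an assumption; adjust it to match your installed version of the pipeline package.

from pipeline.schemas.run import RunState  # assumed import path

result = api.run_pipeline(uploaded_pipeline, [5.0, 6.0])

if result.run_state == RunState.COMPLETE:
    # small outputs are available inline in the preview
    print(result.result_preview)
    # outputs over the 2 MB preview limit need a second call
    # full_result = api.download_result(result)
elif result.run_state == RunState.FAILED:
    print(f"Run {result.id} failed")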

Pipelines of models

Here's an example of creating a Pipeline that uses a pipeline_model. This is the general form of creating a Pipeline for machine learning models.

from pipeline import (
    Pipeline,
    Variable,
    pipeline_function,
    pipeline_model,
)

PIPELINE_NAME = "hugging-face-classifier"

@pipeline_model
class model:
    def __init__(self):
        self.session = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> bool:
        from transformers import pipeline
        self.session = pipeline(model="roberta-large-mnli")
        return True

    @pipeline_function
    def predict(self, input: list) -> list:
        return self.session(input)
        
with Pipeline(PIPELINE_NAME) as pipeline:
    input = Variable(list, is_input=True)

    pipeline.add_variables(
        input,
    )

    model = model()
    model.load()

    output = model.predict(
        input,
    )

    pipeline.output(output)

hf_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

# run locally
[output] = hf_pipeline.run(["I love Halloween"])
print(output)

The general structure of this Pipeline creation is similar to the previous example, with a few changes that we'll go through.

We are importing the pipeline_model decorator which we use to wrap a class.

from pipeline import (
    pipeline_model,
)

@pipeline_model
class model:

This class is used to hold the machine learning model that we wish to use in a pipeline.

Inside the class we have an __init__ method and pipeline_functions.

Code within the __init__ method is run on upload, whereas the code inside the pipeline_functions is serialised and only run in the cloud.

We have defined a load pipeline_function with the arguments run_once=True and on_startup=True; it returns a bool, True when finished.

@pipeline_function(run_once=True, on_startup=True)
def load(self) -> bool:

run_once=True specifies that the load function should run only once while the pipeline exists in the cache of a GPU. This means the model will not need to be reloaded after it is initially cached on the GPU.

on_startup=True ensures the pipeline_function is executed at the start of a pipeline run, regardless of where it is placed when defining the pipeline.

Inside the load function we load the ML model and attach the inference session to the class's self.session attribute so we can later call it in the predict function.

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> bool:
        from transformers import pipeline
        self.session = pipeline(model="roberta-large-mnli")
        return True

Note: Make sure to import any packages you need inside the function itself so that those imports will be executed on the worker resource when you run in the cloud.

The predict function runs the loaded session with the input args provided to it.

@pipeline_function
def predict(self, input: list) -> list:
    return self.session(input)

The Pipeline construction context is almost the same as before. Here we instantiate the model class, call the model.load() function, and set the pipeline output to model.predict(...) applied to the input Variable.

with Pipeline(PIPELINE_NAME) as pipeline:
    input = Variable(list, is_input=True)

    pipeline.add_variables(
        input,
    )

    model = model()
    model.load()

    output = model.predict(
        input,
    )

    pipeline.output(output)
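
Deploying this model pipeline to Pipeline Cloud follows the same steps as the maths example (and, per the earlier note, should be done in a script that performs no local runs). A minimal sketch, with the token value as a placeholder:

from pipeline import PipelineCloud

api = PipelineCloud(token="pipeline_token_value")
uploaded_pipeline = api.upload_pipeline(hf_pipeline)
result = api.run_pipeline(
    uploaded_pipeline,
    [["I love Halloween"]],  # the pipeline's single list input, wrapped in the run's input list
)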


Pipelines using files

Here's an example of using PipelineFile:

from pipeline import (
    Pipeline,
    PipelineFile,
    Variable,
    pipeline_function,
    pipeline_model,
)

@pipeline_model
class model:
    def __init__(self):
        self.session = None

    @pipeline_function
    def predict(self, onnx_output: list, onnx_input: dict = {}) -> list:
        res = self.session.run(onnx_output, onnx_input)
        return res[0].tolist()

    @pipeline_function(run_once=True, on_startup=True)
    def load(self, onnx_file: PipelineFile) -> bool:
        import onnxruntime

        self.session = onnxruntime.InferenceSession(
            onnx_file.path,
            providers=[
                "CUDAExecutionProvider",
            ],
        )
        return True

with Pipeline("onnx") as pipeline:
    onnx_file = PipelineFile(path='local_path_to_onnxfile')
    onnx_output = Variable(list, is_input=True)
    onnx_input = Variable(dict, is_input=True)

    pipeline.add_variables(
        onnx_file,
        onnx_output,
        onnx_input,
    )

    model = model()
    model.load(onnx_file)

    output = model.predict(
        onnx_output,
        onnx_input,
    )

    pipeline.output(output)

onnx_pipeline = Pipeline.get_pipeline("onnx")

Here we use PipelineFile to upload an ONNX file containing the ONNX model we want to run inference with.

To do so we are importing PipelineFile

from pipeline import PipelineFile

and using it similarly to how Variable is used.

Inside the Pipeline constructor context manager:

with Pipeline("onnx") as pipeline:  
    onnx_file = PipelineFile(path='local_path_to_onnxfile')

    pipeline.add_variables(
        onnx_file,
    )

    model = model()
    model.load(onnx_file)

The onnx_file can then be used as an input argument to a pipeline_function, in this case the load function.

Once uploaded, the .path attribute of the PipelineFile object is automatically updated to the path of the file inside the cloud file storage system.
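
To deploy and run this pipeline in the cloud, the same upload and run pattern applies. A hedged sketch; the output name and input feed below are placeholders that depend on your ONNX model's graph:

from pipeline import PipelineCloud

api = PipelineCloud(token="pipeline_token_value")
uploaded_pipeline = api.upload_pipeline(onnx_pipeline)  # the PipelineFile is uploaded along with the pipeline

result = api.run_pipeline(
    uploaded_pipeline,
    [
        ["output_name"],               # onnx_output: names of the graph outputs to fetch
        {"input_name": [[1.0, 2.0]]},  # onnx_input: feed dict mapping graph input names to data
    ],
)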