Create and deploy a general pipeline
How to use the Pipeline Library to create pipelines from any runnable function/model.
Building a Pipeline with the Pipeline library
Basic imports
from pipeline import (
    Pipeline,
    PipelineFile,
    Variable,
    pipeline_function,
    pipeline_model,
)
Pipeline
- The main class that defines our compute pipeline.
PipelineFile
- Pipeline's file-storage accessor, used for uploading files (e.g. model weights, ONNX files, etc.)
Variable
- Used to define input variables for pipelines.
pipeline_function
- A decorator used to mark pipeline functions and verify that the wrapped function correctly handles input/output data. It inspects the type hints on each of the function's inputs and on its return value.
pipeline_model
- Wrapper for a class representing a machine learning model.
Pipelines of functions
Here's an example of creating a Pipeline that only uses basic math functions. We will also cover running a pipeline locally and in the cloud at the end of this section.
from pipeline import Pipeline, pipeline_function, Variable
PIPELINE_NAME = "maths-is-fun"
@pipeline_function
def square(a: float) -> float:
    return a ** 2

@pipeline_function
def minus(a: float, b: float) -> float:
    return a - b

@pipeline_function
def multiply(a: float, b: float) -> float:
    return a * b

with Pipeline(PIPELINE_NAME) as pipeline:
    flt_1 = Variable(type_class=float, is_input=True)
    flt_2 = Variable(type_class=float, is_input=True)
    pipeline.add_variables(flt_1, flt_2)

    sq_1 = square(flt_1)
    res_1 = multiply(flt_2, sq_1)
    res_2 = minus(res_1, sq_1)
    sq_2 = square(res_2)
    res_3 = multiply(flt_2, sq_2)
    res_4 = minus(res_3, sq_1)

    pipeline.output(res_2, res_4)

complete_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
print(complete_pipeline.run(5.0, 6.0))
We have created three maths functions, square, minus, and multiply, that we wish to use inside our pipeline. Each must be wrapped in the pipeline_function decorator in order to do so, and each requires type hints for its inputs and output.
@pipeline_function
def square(a: float) -> float:
The Pipeline itself is assembled inside the context manager. Here we create the context manager and, in this example, give the pipeline the name "maths-is-fun".
with Pipeline(PIPELINE_NAME) as pipeline:
Next we define the inputs the Pipeline expects, using Variable for each input. A type must be provided for each. The add_variables method is then called to formally add the input variables we've created to the Pipeline.
flt_1 = Variable(type_class=float, is_input=True)
flt_2 = Variable(type_class=float, is_input=True)
pipeline.add_variables(flt_1, flt_2)
Now we can set out the operations of the Pipeline, which use our wrapped pipeline functions, and declare the pipeline outputs using the output method.
sq_1 = square(flt_1)
res_1 = multiply(flt_2, sq_1)
res_2 = minus(res_1, sq_1)
sq_2 = square(res_2)
res_3 = multiply(flt_2, sq_2)
res_4 = minus(res_3, sq_1)
pipeline.output(res_2, res_4)
Finally, we can retrieve the constructed pipeline by calling Pipeline's get_pipeline method with the name we gave our example pipeline.
complete_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
We now have our complete pipeline ready to run locally or in the cloud. Local runs are performed like this:
complete_pipeline.run(5.0, 6.0)
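As a quick sanity check, here is the arithmetic this graph performs for those inputs, worked through by hand from the functions above:

# flt_1 = 5.0, flt_2 = 6.0
# sq_1  = square(5.0)             -> 25.0
# res_1 = multiply(6.0, 25.0)     -> 150.0
# res_2 = minus(150.0, 25.0)      -> 125.0
# sq_2  = square(125.0)           -> 15625.0
# res_3 = multiply(6.0, 15625.0)  -> 93750.0
# res_4 = minus(93750.0, 25.0)    -> 93725.0

So the run above should print [125.0, 93725.0], the declared outputs res_2 and res_4.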
To run on Pipeline Cloud, we have a few more steps. We must:
- import PipelineCloud
- instantiate the API with your API token
- upload your pipeline to Pipeline Cloud
- run it on Pipeline Cloud

NOTE: uploading to Pipeline Cloud will fail if you perform a local run beforehand in the same script, so make sure not to do local runs in the same script as your pipeline upload.
from pipeline import PipelineCloud
api = PipelineCloud(token="pipeline_token_value")
uploaded_pipeline = api.upload_pipeline(complete_pipeline)
api.run_pipeline(
    uploaded_pipeline,
    [5.0, 6.0],
)
NOTE: the inputs to a cloud run (api.run_pipeline) are the inputs to a local run (Pipeline.run) wrapped in a list.
The returned outputs from pipelines also come in a list, so you may need to unpack the data to get your expected output, for example:
[parsed_output] = complete_pipeline.run(5.0, 6.0)
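The maths pipeline above declares two outputs (res_2 and res_4), so, assuming outputs come back in declaration order, you can unpack both values the same way:

out_1, out_2 = complete_pipeline.run(5.0, 6.0)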
The result object
The result object returned by PipelineCloud.run_pipeline follows the RunGet schema in the pipeline package.

The notable metadata fields are id, the id of the pipeline run, and run_state, an enum describing the state of the run: RunState.COMPLETE if the run completed successfully (no errors on the resource), or RunState.FAILED if it failed.

The result itself can be found in the result_preview field.
result = api.run_pipeline(...)
print(result.result_preview)
However, result_preview has a 2 MB limit. If your output exceeds that, you will need to make a further API call to retrieve your result:
api.download_result(result)
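Putting this together, here is a minimal sketch of handling a cloud run result, using only the fields described above. The RunState import path and the None check on result_preview are assumptions, not confirmed API behaviour:

from pipeline.schemas.run import RunState  # assumed import path

result = api.run_pipeline(uploaded_pipeline, [5.0, 6.0])
print(result.id)  # id of the pipeline run

if result.run_state == RunState.COMPLETE:
    output = result.result_preview
    if output is None:  # assumption: preview omitted when over the 2 MB limit
        output = api.download_result(result)
    print(output)
elif result.run_state == RunState.FAILED:
    print("run failed")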
Pipelines of models
Here's an example of creating a Pipeline that uses a pipeline_model; this is the general form of creating a Pipeline for machine learning models.
from pipeline import (
    Pipeline,
    Variable,
    pipeline_function,
    pipeline_model,
)
PIPELINE_NAME = "hugging-face-classifier"
@pipeline_model
class model:
    def __init__(self):
        self.session = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> bool:
        from transformers import pipeline

        self.session = pipeline(model="roberta-large-mnli")
        return True

    @pipeline_function
    def predict(self, input: list) -> list:
        return self.session(input)

with Pipeline(PIPELINE_NAME) as pipeline:
    input = Variable(list, is_input=True)
    pipeline.add_variables(
        input,
    )

    model = model()
    model.load()
    output = model.predict(
        input,
    )

    pipeline.output(output)

hf_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

# run locally
[output] = hf_pipeline.run(["I love Halloween"])
print(output)
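For this model, the transformers text-classification pipeline returns one {"label": ..., "score": ...} dict per input sequence, so output here should be a list containing a single such dict (the exact labels depend on the model).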
The general structure of this Pipeline creation is similar to the previous example, with a few changes which we'll go through below.
We are importing the pipeline_model decorator, which we use to wrap a class.

from pipeline import pipeline_model
@pipeline_model
class model:
This class is used to hold the machine learning model that we wish to use in a pipeline.

Inside the class we have an __init__ method and pipeline_functions.

Code within the __init__ method is run on upload, whereas the code inside the pipeline_functions is serialised and only run in the cloud.
We have defined a load pipeline_function with the arguments run_once=True and on_startup=True; it returns a bool, True, when finished.
@pipeline_function(run_once=True, on_startup=True)
def load(self) -> bool:
run_once=True specifies that the load function should run only once while the pipeline exists in the cache of a GPU. This means the model will not need to be reloaded after it is initially cached on the GPU.

on_startup=True ensures the pipeline_function is executed at the start of a pipeline run, regardless of where it is placed in the pipeline definition.
Inside the load function we load the ML model and attach the inference session to the class's self.session attribute, so we can later call it in the predict function.
@pipeline_function(run_once=True, on_startup=True)
def load(self) -> bool:
    from transformers import pipeline

    self.session = pipeline(model="roberta-large-mnli")
    return True
Note: make sure to import any packages you need inside the function itself, so that those imports are executed on the worker resource when you run in the cloud.
The predict function runs the loaded session with the input args provided to it.
@pipeline_function
def predict(self, input: list) -> list:
    return self.session(input)
The Pipeline construction context is almost the same as before. Here we instantiate the model class, call the model.load() function, and set the output to model.predict(...) of the input Variable.
with Pipeline(PIPELINE_NAME) as pipeline:
    input = Variable(list, is_input=True)
    pipeline.add_variables(
        input,
    )

    model = model()
    model.load()
    output = model.predict(
        input,
    )

    pipeline.output(output)
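Uploading and running this classifier on Pipeline Cloud follows the same steps as the maths example. As before, the cloud-run inputs are the local-run inputs wrapped in a list:

from pipeline import PipelineCloud

api = PipelineCloud(token="pipeline_token_value")
uploaded_pipeline = api.upload_pipeline(hf_pipeline)
api.run_pipeline(
    uploaded_pipeline,
    [["I love Halloween"]],
)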
Pipelines using files
Here's an example of using PipelineFile:
from pipeline import (
    Pipeline,
    PipelineFile,
    Variable,
    pipeline_function,
    pipeline_model,
)
@pipeline_model
class model:
    def __init__(self):
        self.session = None
        import numpy as np

    @pipeline_function
    def predict(self, onnx_output: list, onnx_input: dict = {}) -> list:
        res = self.session.run(onnx_output, onnx_input)
        return res[0].tolist()

    @pipeline_function(run_once=True, on_startup=True)
    def load(self, onnx_file: PipelineFile) -> bool:
        import onnxruntime

        self.session = onnxruntime.InferenceSession(
            onnx_file.path,
            providers=[
                "CUDAExecutionProvider",
            ],
        )
        return True

with Pipeline("onnx") as pipeline:
    onnx_file = PipelineFile(path="local_path_to_onnxfile")
    onnx_output = Variable(list, is_input=True)
    onnx_input = Variable(dict, is_input=True)
    pipeline.add_variables(
        onnx_file,
        onnx_output,
        onnx_input,
    )

    model = model()
    model.load(onnx_file)
    output = model.predict(
        onnx_output,
        onnx_input,
    )

    pipeline.output(output)

onnx_pipeline = Pipeline.get_pipeline("onnx")
Here we use PipelineFile to upload an ONNX file containing the ONNX model we want to run inference on.

To do so, we import PipelineFile:

from pipeline import PipelineFile

and use it similarly to how Variable is used.
Inside the Pipeline constructor context manager:
with Pipeline("onnx") as pipeline:
onnx_file = PipelineFile(path='local_path_to_onnxfile')
pipeline.add_variables(
onnx_file,
)
model = model()
model.load(onnx_file)
The onnx_file can then be used as an input argument to a pipeline_function, the load function in this case.

Once uploaded, the .path attribute of the PipelineFile object is automatically updated to the file's path inside the cloud file-storage system.
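To try the assembled pipeline locally, here is a hedged sketch. "output_name" and "input_name" are placeholders for your model's actual ONNX output and input names, and we assume, as in the earlier examples, that run takes only the is_input=True variables:

import numpy as np

outputs = onnx_pipeline.run(
    ["output_name"],  # onnx_output: names of the ONNX outputs to fetch
    {"input_name": np.zeros((1, 3), dtype=np.float32)},  # onnx_input: feed dict
)
print(outputs)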