pipeline.pipeline_function
pipeline_function - class decorator
pipeline.objects.decorators.pipeline_function(
    function: Any = None,
    *,
    run_once: str = None,
    on_startup: bool = False,
)
Description
The pipeline_function decorator is used on runnable functions to allow them to be used by the Pipeline context manager.
It also prevents the wrapped function's execution when it is called inside of the context manager. Instead of calling the wrapped function, a GraphNode object is created and entered into the Graph. The functions can still be used as normal outside of the context manager. The two instances where you can run the function when wrapped are described below:
- Inside of the Pipeline context manager
  - Only Variable object types can be passed in. kwargs are currently not treated separately from args; variables are passed in by position (kwarg support coming). The * marker for explicit kwarg inputs is not supported.
- Normal execution outside of the Pipeline context manager
  - Any args or kwargs can be passed in as normal.
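The two modes above can be illustrated with a toy decorator. This is only a sketch of the general idea and not the library's implementation: toy_pipeline, toy_pipeline_function and the dictionary used as a node are hypothetical stand-ins for Pipeline, pipeline_function and GraphNode.

```python
import functools
from contextlib import contextmanager

# Hypothetical stand-ins; these are NOT the library's internals.
_active_graph = None  # holds the list of recorded nodes while a context is open


@contextmanager
def toy_pipeline():
    """Open a recording context; calls made inside are captured, not run."""
    global _active_graph
    previous, _active_graph = _active_graph, []
    try:
        yield _active_graph
    finally:
        _active_graph = previous


def toy_pipeline_function(function):
    """Run normally outside toy_pipeline(); record a node inside it."""

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        if _active_graph is None:
            # Normal execution: any args or kwargs are accepted.
            return function(*args, **kwargs)
        # Inside the context: store the call by position instead of running it.
        # This toy ignores kwargs entirely.
        node = {"function": function, "inputs": args}
        _active_graph.append(node)
        return node

    return wrapper


@toy_pipeline_function
def add_numbers(a: int, b: int) -> int:
    return a + b


outside = add_numbers(3, 4)  # executes immediately

with toy_pipeline() as graph:
    inside = add_numbers("a", "b")  # recorded as a node, not executed
```

Outside the context the call returns the computed value; inside it, the call returns the recorded node and the wrapped function body never runs.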
It is essential to use proper type annotations on a function. Typing is used by the Graph engine to determine if an input or output of the function is of a compatible type when running.
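As a rough illustration of why annotations matter, the sketch below reads a function's type hints with the standard typing module. How the Graph engine actually performs its checks is not shown in this page; describe_signature and is_compatible are hypothetical helpers, and the subclass test is an assumed compatibility rule.

```python
from typing import get_type_hints


def add_numbers(a: int, b: int) -> int:
    return a + b


def describe_signature(function):
    """Split a function's annotations into input types and a return type."""
    hints = get_type_hints(function)
    return_type = hints.pop("return", None)
    return hints, return_type


def is_compatible(value_type, expected_type):
    """Assumed rule: a value type fits if it subclasses the expected type."""
    return issubclass(value_type, expected_type)


inputs, output = describe_signature(add_numbers)
```

Without annotations, get_type_hints returns an empty mapping, so a check of this kind would have nothing to compare against.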
Parameters
function (Any, optional) - The function to be wrapped. This is passed in implicitly by the decorator; you do not pass it manually.
run_once (str, optional) - If true, the pipeline_function will only be called once when the pipeline is run sequentially.
on_startup (bool, optional) - If true, the function is called at the start of the pipeline regardless of where it is declared in the Pipeline context manager.
  - Note: currently the only Variable object that can be passed in if on_startup=True is the PipelineFile object. More will be supported in the future.
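The effect of on_startup on ordering can be pictured with a small sketch. The library's real scheduling is not described here, so ToyNode and execution_order are hypothetical names, and the stable sort is only one assumed way to move startup functions to the front.

```python
from dataclasses import dataclass


@dataclass
class ToyNode:
    """Hypothetical stand-in for a recorded function call."""
    name: str
    on_startup: bool = False


def execution_order(nodes):
    """Place startup nodes first; sorted() is stable, so the rest keep
    their declaration order."""
    return sorted(nodes, key=lambda node: not node.on_startup)


declared = [
    ToyNode("preprocess"),
    ToyNode("load_model", on_startup=True),  # declared second, runs first
    ToyNode("predict"),
]
ordered = [node.name for node in execution_order(declared)]
```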
Examples
from pipeline import Pipeline, Variable, pipeline_function

@pipeline_function
def add_numbers(a: int, b: int) -> int:
    return a + b

c = add_numbers(3, 4)
# c = 7

with Pipeline("add-numbers") as pipeline:
    a = Variable(type_class=int, is_input=True)
    b = Variable(type_class=int, is_input=True)
    pipeline.add_variables(a, b)
    c = add_numbers(a, b)
    # c is equal to a Variable object returned from add_numbers
    pipeline.output(c)
Multi variable outputs
It is possible to output multiple variables from a pipeline_function inside of the context manager by using a Tuple:
from typing import Tuple

from pipeline import Pipeline, pipeline_function

@pipeline_function
def test_function() -> Tuple[str, int]:
    return ("test", 1)

with Pipeline("test") as builder:
    var1, var2 = test_function()  # Must split variables, cannot output raw Tuple
    builder.output(var2, var1)

test_pl = Pipeline.get_pipeline("test")
outputs = test_pl.run()
# Outputs follow the order given to builder.output
assert outputs == [1, "test"]
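One plausible reason the Tuple return annotation is required is that it tells the engine how many outputs to create and what type each one has. The sketch below shows how such an annotation can be unpacked with the standard typing module. It is an assumption about the mechanism rather than the library's code; output_types and name_and_count are hypothetical names.

```python
from typing import Tuple, get_args, get_origin, get_type_hints


def name_and_count() -> Tuple[str, int]:
    return ("test", 1)


def single_value() -> int:
    return 1


def output_types(function):
    """Return one type per output, unpacking a Tuple return annotation."""
    return_type = get_type_hints(function).get("return")
    if get_origin(return_type) is tuple:
        return list(get_args(return_type))
    return [return_type]


pair_types = output_types(name_and_count)
single_types = output_types(single_value)
```

Here pair_types holds two entries, one per element of the Tuple, which matches the need to split the result into var1 and var2 in the example above.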