pipeline_function - class decorator
pipeline.objects.decorators.pipeline_function(function: Any = None, *, run_once: str = None, on_startup: bool = False)
The pipeline_function decorator is applied to runnable functions so that they can be used inside the Pipeline context manager.
It also prevents the wrapped function from executing when it is called inside the context manager. Instead of calling the wrapped function, a GraphNode object is created and added to the Graph. The function can still be used as normal outside the context manager. The two situations in which a wrapped function can be called are described below:
- Inside of the Pipeline context manager
  - Variable object types can be passed in.
  - kwargs are currently not treated separately from args; variables are passed in by position (kwarg support coming).
  - kwarg inputs are not supported.
- Normal execution outside of the Pipeline context manager
  - kwargs can be passed in as normal.
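The two call modes above can be illustrated with a small, self-contained sketch. It is not the library's implementation: ToyPipeline, ToyGraphNode and toy_pipeline_function are hypothetical stand-ins, and the sketch returns the recorded node itself, whereas the real decorator returns a Variable inside the context manager.

```python
import functools
from typing import Any, Callable, List, Optional


class ToyGraphNode:
    """Hypothetical stand-in for a graph node: a function plus its inputs."""

    def __init__(self, function: Callable, inputs: tuple):
        self.function = function
        self.inputs = inputs


class ToyPipeline:
    """Hypothetical stand-in for the context manager that collects nodes."""

    _current: Optional["ToyPipeline"] = None  # pipeline being built, if any

    def __init__(self, name: str):
        self.name = name
        self.nodes: List[ToyGraphNode] = []

    def __enter__(self) -> "ToyPipeline":
        ToyPipeline._current = self
        return self

    def __exit__(self, *exc_info: Any) -> None:
        ToyPipeline._current = None


def toy_pipeline_function(function: Callable) -> Callable:
    @functools.wraps(function)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        active = ToyPipeline._current
        if active is None:
            # Outside the context manager: run the function as normal.
            return function(*args, **kwargs)
        # Inside the context manager: record a node instead of executing.
        # Only positional inputs are recorded, mirroring the note on kwargs.
        node = ToyGraphNode(function, args)
        active.nodes.append(node)
        return node

    return wrapper


@toy_pipeline_function
def add_numbers(a: int, b: int) -> int:
    return a + b


eager_result = add_numbers(3, 4)  # executes immediately

with ToyPipeline("demo") as toy:
    deferred = add_numbers(1, 2)  # not executed; a node is recorded
```

Tracking the active pipeline in a class attribute keeps the sketch short; it is an assumption made for illustration and says nothing about how the library tracks its own context.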
It is essential to use proper type annotations on a function. The Graph engine uses them to determine whether an input or output of the function is of a compatible type when running.
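As a rough illustration of why the annotations matter, the sketch below compares one function's return annotation with another function's parameter annotation using the standard typing.get_type_hints. The helper output_matches_input is hypothetical, it only handles plain classes, and the Graph engine's actual check may differ.

```python
from typing import Callable, get_type_hints


def output_matches_input(producer: Callable, consumer: Callable, param: str) -> bool:
    """Return True if producer's return annotation fits consumer's parameter."""
    produced = get_type_hints(producer).get("return")
    expected = get_type_hints(consumer).get(param)
    if produced is None or expected is None:
        # A missing annotation leaves nothing to check against.
        return False
    # Plain classes only; generic annotations would need extra handling.
    return issubclass(produced, expected)


def make_number() -> int:
    return 3


def double(value: int) -> int:
    return value * 2


def shout(text: str) -> str:
    return text.upper()


ok = output_matches_input(make_number, double, "value")  # int feeds int
bad = output_matches_input(make_number, shout, "text")  # int does not feed str
```

An unannotated function gives a check like this nothing to work with, which is the practical reason to annotate every input and output.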
- function (Any, optional) - The function to be wrapped. This is passed in implicitly; you do not pass it manually.
- run_once (str, optional) - If true, the pipeline_function will only be called once if the pipeline is run sequentially.
- on_startup (bool, optional) - If true, the function is called at the start of the pipeline regardless of where it is declared in the pipeline.
  - Note: the only Variable object that can currently be passed in if on_startup is true is the PipelineFile object. More will be supported in the future.
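The signature (function=None, *, keyword options) matches a common Python pattern that lets a decorator be used both bare and with keyword arguments, which is why the function argument is never passed by hand. The sketch below shows that general pattern only; toy_decorator is a hypothetical name, and storing on_startup as an attribute is an assumption made for illustration, not the library's behaviour.

```python
import functools
from typing import Any, Callable, Optional


def toy_decorator(function: Optional[Callable] = None, *, on_startup: bool = False) -> Any:
    def decorate(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)

        wrapper.on_startup = on_startup  # stored for illustration only
        return wrapper

    if function is None:
        # Used as @toy_decorator(on_startup=True): return the real decorator.
        return decorate
    # Used as bare @toy_decorator: Python passed the function in directly.
    return decorate(function)


@toy_decorator
def plain() -> str:
    return "plain"


@toy_decorator(on_startup=True)
def startup() -> str:
    return "startup"
```

Because the options are keyword-only, an option cannot be mistaken for the wrapped function when the decorator is called with arguments.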
```python
from pipeline import Pipeline, Variable, pipeline_function


@pipeline_function
def add_numbers(a: int, b: int) -> int:
    return a + b


c = add_numbers(3, 4)  # c = 7

with Pipeline("add-numbers") as pipeline:
    a = Variable(type_class=int, is_input=True)
    b = Variable(type_class=int, is_input=True)
    pipeline.add_variables(a, b)

    c = add_numbers(a, b)  # c is equal to a Variable object returned from add_numbers
    pipeline.output(c)
```
Multi variable outputs
It is possible to output multiple variables from a pipeline_function inside of the context manager by using a Tuple as the return type.
```python
from typing import Tuple

from pipeline import Pipeline, pipeline_function


@pipeline_function
def test_function() -> Tuple[str, int]:
    return ("test", 1)


with Pipeline("test") as builder:
    var1, var2 = test_function()  # Must split variables, cannot output raw Tuple
    builder.output(var2, var1)

test_pl = Pipeline.get_pipeline("test")
outputs = test_pl.run()
assert outputs == [1, "test"]
```
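One plausible way for a graph builder to know how many variables a call produces is to inspect the Tuple return annotation. The sketch below uses only the standard typing helpers; output_types is a hypothetical helper, and whether the library works this way is an assumption.

```python
from typing import Callable, Tuple, get_args, get_origin, get_type_hints


def output_types(function: Callable) -> list:
    """List the element types of a function's return annotation."""
    returned = get_type_hints(function).get("return")
    if get_origin(returned) is tuple:
        # Tuple[str, int] yields one output slot per element type.
        return list(get_args(returned))
    # Any other annotation is treated as a single output.
    return [returned]


def make_pair() -> Tuple[str, int]:
    return ("test", 1)


def make_one() -> int:
    return 1


pair_types = output_types(make_pair)
single_types = output_types(make_one)
```

Knowing the element types ahead of time is what would allow one placeholder per output to be created without running the function.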