Workflow¶
- class ikomia.dataprocess.workflow.Workflow(name: str = 'Untitled', registry: IkomiaRegistry = IkomiaRegistry())¶
Workflow management for Computer Vision tasks. Implements features to create, modify and run graph-based pipelines of CWorkflowTask objects or derived. Workflows can be created from scratch by using IkomiaRegistry to instantiate and connect task objects. Workflows can also be loaded from a JSON file created with the interactive designer of Ikomia Studio. Derived from CWorkflow.
Import
from ikomia.dataprocess.workflow import Workflow
Methods
__init__([name, registry]) – Construct Workflow object with the given name and an IkomiaRegistry object.
add_task([task, name, params, auto_connect, ...]) – Add task identified by its unique name in the workflow.
connect_tasks(src, target[, edges]) – Connect two tasks of the workflow.
find_task(name[, index]) – Get identifiers and instance of tasks with the given name in the workflow.
get_task_id(task) – Get task unique identifier from the task instance.
get_task_output([task_obj, task_name, ...]) – Get specific output(s) defined by their types (IODataType) for the given task.
get_tasks() – Get all tasks composing the workflow.
get_time_metrics() – Get metrics around workflow execution time.
get_workflow_parameters() – Get parameters exposed at workflow level.
load(path) – Load the workflow file at the given path.
remove_task([task, name, index]) – Remove task from workflow specified by task instance or name (with corresponding index).
root() – Get workflow root node.
run() – Start workflow execution on global input.
run_on([array, path, url, folder]) – Convenience function to run the workflow on common inputs.
save(path[, exposed_params, exposed_outputs]) – Save workflow into a JSON definition file.
set_directory_input([folder, index]) – Set folder as global input of the workflow.
set_image_input([array, path, url, index, ...]) – Set image as global input of the workflow.
set_video_input([path, url, index, datatype]) – Set video as global input of the workflow.
set_parameters(params[, task_obj, ...]) – Set task parameters as a simple key-value dict.
set_task_enabled([task, name, index, enabled]) – Enable/Disable task for running.
set_workflow_parameters(params) – Set workflow parameters.
Inherited methods
add_input(self, input) – Add global input to the workflow.
clear(self) – Remove all tasks and connections from the workflow.
clear_inputs(self) – Remove all inputs of the workflow.
clear_output_data(self) – Clear output data for all the tasks of the workflow.
delete_edge(self, id) – Remove the given connection/edge from the workflow.
delete_task(self, id) – Remove the given task from the workflow.
export_graphviz(self, path) – Export the workflow structure as Graphviz .dot file.
get_children(self, id) – Get child task identifiers of the task specified by the given identifier.
get_edge_info(self, id) – Get connection information (edge) from its unique identifier.
get_edge_source(self, id) – Get source/parent task identifier of a given edge.
get_edge_target(self, id) – Get target/child task identifier of a given edge.
get_elapsed_time_to(self, task_id) – Get the workflow running time in milliseconds from the start to the given task.
get_final_tasks(self) – Get all final or leaf tasks of the workflow.
get_in_edges(self, id) – Get input connections (in-edges) of the task specified by the given identifier.
get_out_edges(self, id) – Get output connections (out-edges) of the task specified by the given identifier.
get_parents(self, id) – Get parent task identifiers of the task specified by the given identifier.
get_required_tasks(self, path) – Get task names required to load and execute the given workflow file.
get_root_id(self) – Get unique identifier of the root node.
get_root_target_types(self) – Get all input data types of the tasks connected to the root node.
get_task(self, id) – Get the task object from the given ID.
get_task_count(self) – Get the number of tasks in the workflow.
get_task_ids(self) – Get the list of all task identifiers.
get_total_elapsed_time(self) – Get the total workflow running time in milliseconds.
remove_input(self, index) – Remove global workflow input at the given index.
set_auto_save(self, enable) – Activate/deactivate auto-save mode.
set_input(self, input, index, new_sequence) – Set workflow input at position index.
set_output_folder(self, path) – Set workflow output folder.
stop(self) – Stop workflow execution. Each CWorkflowTask object or derived must reimplement the stop() function that will be called by the workflow.
update_start_time(self) – Reset the starting point for the computation of the running time.
Details
- __init__(name: str = 'Untitled', registry: IkomiaRegistry = IkomiaRegistry())¶
Construct Workflow object with the given name and an IkomiaRegistry object. The latter is used to instantiate algorithms from their unique name when they are added to the workflow. Thus, you can use any Ikomia algorithm (built-in or from Ikomia HUB) in your workflow.
- Parameters:
name (str) – workflow name
registry (IkomiaRegistry) – Ikomia algorithm registry, default: global Ikomia registry
- add_input((CWorkflow)self, (CWorkflowTaskIO)input) None : ¶
Add global input to the workflow.
- Parameters:
input (CWorkflowTaskIO based object) – input object
- add_task(task: CWorkflowTask = None, name: str = '', params: dict = None, auto_connect: bool = False, public_hub: bool = True, private_hub: bool = False) CWorkflowTask ¶
Add task identified by its unique name in the workflow. If the given task is not yet in the registry, it is first downloaded and installed from Ikomia HUB. The task unique identifier can then be retrieved with get_task_id().
- Parameters:
task (CWorkflowTask) – algorithm instance
name (str) – algorithm unique name
params (CWorkflowTaskParam based object) – algorithm parameters
auto_connect (bool) – True to connect with parent tasks automatically
public_hub (bool) – True if algorithm can be installed from Ikomia HUB
private_hub (bool) – True if algorithm can be installed from private HUB
- Returns:
task instance
- Return type:
CWorkflowTask based object
- clear((CWorkflow)self) None : ¶
Remove all tasks and connections from the workflow. The workflow is empty afterwards.
- clear_inputs((CWorkflow)self) None : ¶
Remove all inputs of the workflow.
- clear_output_data((CWorkflow)self) None : ¶
Clear output data for all the tasks of the workflow.
- connect_tasks(src: CWorkflowTask, target: CWorkflowTask, edges: list = None)¶
Connect two tasks of the workflow. Depending on the inputs/outputs configuration, multiple connections between the two tasks can be set. A connection is a pair (i.e. a tuple) composed of the output index of the source task and the input index of the target task.
- Parameters:
src (CWorkflowTask based object) – source task or None (connect to root)
target (CWorkflowTask based object) – target task
edges (list of pair) – connections. If an empty list is passed, auto-connection is enabled: the system tries to find the best connections automatically with respect to input and output data types.
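The edge pairs described above can be sketched in plain Python. The helper below is hypothetical (it is not part of Ikomia): it only builds the edges list from (source output index, target input index) pairs and rejects malformed entries, so it runs without Ikomia installed.

```python
def make_edges(*pairs):
    """Build an edges list: each pair is (source output index, target input index)."""
    edges = []
    for pair in pairs:
        if len(pair) != 2:
            raise ValueError(f"expected (output_index, input_index), got {pair!r}")
        out_idx, in_idx = pair
        if out_idx < 0 or in_idx < 0:
            raise ValueError(f"indices must be non-negative, got {pair!r}")
        edges.append((int(out_idx), int(in_idx)))
    return edges


# Output 0 of the source feeds input 0 of the target,
# and output 1 of the source feeds input 2 of the target.
edges = make_edges((0, 0), (1, 2))
# With a real workflow, the list would be passed as:
#   wf.connect_tasks(src_task, target_task, edges)
```

Which indices are valid depends on the inputs and outputs of the two tasks involved, so the index values shown here are placeholders.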
- delete_edge((CWorkflow)self, (int)id) None : ¶
Remove the given connection/edge from the workflow. The identifier becomes invalid after this operation.
- Parameters:
id (int) – edge identifier
- delete_task((CWorkflow)self, (int)id) None : ¶
Remove the given task from the workflow. The identifier becomes invalid after this operation.
- Parameters:
id (int) – task identifier
- export_graphviz((CWorkflow)self, (str)path) None : ¶
Export the workflow structure as a Graphviz .dot file. You can then visualize it with the dot command or with the Graphviz Python package.
- Parameters:
path (str) – path where the .dot file is saved
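As an illustration of the dot command route, the sketch below builds the command line that renders an exported .dot file to an image. The helper name and the file name workflow.dot are assumptions made for this example; running the command requires Graphviz to be installed.

```python
import shlex
from pathlib import Path


def dot_render_command(dot_path, fmt="png"):
    """Return the Graphviz `dot` command that renders dot_path to an image file."""
    src = Path(dot_path)
    dst = src.with_suffix(f".{fmt}")  # same name, image extension
    return ["dot", f"-T{fmt}", str(src), "-o", str(dst)]


cmd = dot_render_command("workflow.dot")
print(shlex.join(cmd))  # prints: dot -Tpng workflow.dot -o workflow.png
```

The returned list can be handed to subprocess.run(cmd, check=True) once the .dot file exists.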
- find_task(name: str, index: int = -1) CWorkflowTask | list ¶
Get identifiers and instance of tasks with the given name in the workflow.
- Parameters:
name (str) – algorithm name. Multiple candidates may exist, so use index parameter to specify one.
index (int) – zero-based index of the wanted task. If -1, the function returns all candidates.
- Returns:
list of CWorkflowTask based objects if index == -1; CWorkflowTask based object if index != -1
- Return type:
CWorkflowTask based object or list
- get_children((CWorkflow)self, (int)id) object : ¶
Get child task identifiers of the task specified by the given identifier. A task connected to the outputs of a given task is called a child or target.
- Parameters:
id (int) – identifier of the task whose children are requested
- Returns:
child identifiers
- Return type:
int list
- get_edge_info((CWorkflow)self, (int)id) tuple : ¶
Get connection information (edge) from its unique identifier.
- Parameters:
id (int) – edge identifier
- Returns:
pair composed of the output index of the source/parent task and the input index of the target/child task
- Return type:
tuple (int)
- get_edge_source((CWorkflow)self, (int)id) int : ¶
Get source/parent task identifier of a given edge.
- Parameters:
id (int) – edge identifier
- Returns:
task identifier
- Return type:
id (int)
- get_edge_target((CWorkflow)self, (int)id) int : ¶
Get target/child task identifier of a given edge.
- Parameters:
id (int) – edge identifier
- Returns:
task identifier
- Return type:
id (int)
- get_elapsed_time_to((CWorkflow)self, (int)task_id) float : ¶
Get the workflow running time in milliseconds from the start to the given task.
- Parameters:
task_id (int) – task identifier
- Returns:
elapsed time
- Return type:
float
- get_final_tasks((CWorkflow)self) object : ¶
Get all final or leaf tasks of the workflow.
- Returns:
leaf task identifiers
- Return type:
int list
- get_in_edges((CWorkflow)self, (int)id) object : ¶
Get input connections (in-edges) of the task specified by the given identifier. Edge information can then be retrieved with the function get_edge_info().
- Parameters:
id (int) – task identifier
- Returns:
edge identifiers
- Return type:
int list
- get_out_edges((CWorkflow)self, (int)id) object : ¶
Get output connections (out-edges) of the task specified by the given identifier. Edge information can then be retrieved with the function get_edge_info().
- Parameters:
id (int) – task identifier
- Returns:
edge identifiers
- Return type:
int list
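The edge accessors documented above can be combined to inspect how a task is wired. The following is a minimal sketch, assuming wf is a Workflow (or CWorkflow) instance; describe_edges is a hypothetical helper name, and only methods listed on this page are called.

```python
def describe_edges(wf, task_id):
    """List the in- and out-edges of a task as readable dictionaries."""

    def describe(edge_id):
        # get_edge_info() returns (source output index, target input index)
        out_idx, in_idx = wf.get_edge_info(edge_id)
        return {
            "edge": edge_id,
            "source_task": wf.get_edge_source(edge_id),
            "source_output": out_idx,
            "target_task": wf.get_edge_target(edge_id),
            "target_input": in_idx,
        }

    return {
        "in": [describe(e) for e in wf.get_in_edges(task_id)],
        "out": [describe(e) for e in wf.get_out_edges(task_id)],
    }
```

A call such as describe_edges(wf, wf.get_task_id(task)) would then summarize the connections of one task.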
- get_parents((CWorkflow)self, (int)id) object : ¶
Get parent task identifiers of the task specified by the given identifier. A task connected to the inputs of a given task is called a parent or source.
- Parameters:
id (int) – task identifier on which to get parents
- Returns:
parent identifiers
- Return type:
int list
- get_required_tasks((CWorkflow)self, (str)path) object : ¶
Get task names required to load and execute the given workflow file.
- Parameters:
path (str) – path to the workflow file (JSON)
- Returns:
task names
- Return type:
list of str
- get_root_id((CWorkflow)self) int : ¶
Get unique identifier of the root node.
- Returns:
root node ID
- Return type:
int
- get_root_target_types((CWorkflow)self) object : ¶
Get all input data types of the tasks connected to the root node.
- Returns:
data types
- Return type:
list of IODataType
- get_task((CWorkflow)self, (int)id) CWorkflowTask : ¶
Get the task object from the given ID. Unique task identifiers can be retrieved with the functions get_task_ids() or find_task().
- Returns:
CWorkflowTask object or derived
- get_task_id(task) int ¶
Get task unique identifier from the task instance.
- Parameters:
task (CWorkflowTask based object) – task instance
- Returns:
task unique identifier
- Return type:
int
- get_task_count((CWorkflow)self) int : ¶
Get the number of tasks in the workflow.
- Returns:
task count
- Return type:
int
- get_task_ids((CWorkflow)self) object : ¶
Get the list of all task identifiers. You can then retrieve the task object from its ID with the function get_task().
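The identifier-to-object lookup described above can be sketched as a small helper. It assumes wf is a Workflow instance; tasks_by_id is a hypothetical name, and whether the root node appears among the returned identifiers is not stated on this page.

```python
def tasks_by_id(wf):
    """Map every task identifier of wf to its task object."""
    return {task_id: wf.get_task(task_id) for task_id in wf.get_task_ids()}
```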
- get_task_output(task_obj=None, task_name: str = '', task_index: int = 0, types: list = [ikomia.core.pycore.IODataType.IMAGE], output_index: int = -1) CWorkflowTaskIO ¶
Get specific output(s) defined by their types (IODataType) for the given task.
- Parameters:
task_obj (CWorkflowTask based object) – task instance. See also add_task() and find_task().
task_name (str) – algorithm name to be found. Multiple candidates may exist, so use the task_index parameter to specify one. Method find_task() is used to retrieve corresponding task(s).
task_index (int) – zero-based index of the wanted task. If -1, the function returns the outputs of all candidates.
types (list of IODataType) – output data types.
output_index (int) – zero-based index of the wanted output.
- Returns:
task output of the given data types (can be a list).
- Return type:
CWorkflowTaskIO based object
- get_tasks() list ¶
Get all tasks composing the workflow.
- Returns:
list of CWorkflowTask based objects
- get_time_metrics() dict ¶
Get metrics around workflow execution time. This includes the total execution time of the workflow, and for each task, the execution time and the execution time from the start.
- Returns:
metrics
- Return type:
dict
- get_total_elapsed_time((CWorkflow)self) float : ¶
Get the total workflow running time in milliseconds.
- Returns:
elapsed time
- Return type:
float
- get_workflow_parameters()¶
Get parameters exposed at workflow level.
- Returns:
list of key-values
- Return type:
dict
- load(path: str)¶
Load the workflow file at the given path. The function will try to install algorithms if they are not available.
- Parameters:
path (str) – full path to the workflow definition file to load.
- remove_input((CWorkflow)self, (int)index) None : ¶
Remove global workflow input at the given index.
- Parameters:
index (int) – zero-based index of the input to remove
- remove_task(task: CWorkflowTask = None, name: str = '', index: int = 0)¶
Remove task from workflow specified by task instance or name (with corresponding index).
- Parameters:
task (CWorkflowTask) – task object instance
name (str) – algorithm name to be found. Multiple candidates may exist, so use the index parameter to specify one. Method find_task() is used to retrieve corresponding task(s).
index (int) – zero-based index of the wanted task.
- root()¶
Get workflow root node.
- Returns:
root task instance
- Return type:
- run()¶
Start workflow execution on global input. Each CWorkflowTask object or derived must reimplement the run() function that will be called in the right order by the workflow. Please note that global inputs should be set before calling this function (see set_input(), set_image_input(), set_directory_input()).
- run_on(array: ndarray = None, path: str = '', url: str = '', folder: str = '')¶
Convenience function to run the workflow on common inputs. For more advanced use, please consult Workflow. See also set_image_input() and set_directory_input().
- Parameters:
array (Numpy array) – image or generic array as numpy array
path (str) – path to image (valid formats are those managed by OpenCV)
url (str) – URL to image file (valid formats are those managed by OpenCV)
folder (str) – image folder
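Since run_on() takes the source through one of several keyword arguments, a caller holding a plain string has to decide which keyword applies. The sketch below shows one possible way to do that; run_on_kwargs is a hypothetical helper, and the classification rule (http/https scheme means URL, existing directory means folder, anything else is treated as a file path) is an assumption of this example, not Ikomia behavior.

```python
import os
from urllib.parse import urlparse


def run_on_kwargs(source):
    """Choose the run_on() keyword argument matching a string source."""
    if urlparse(source).scheme in ("http", "https"):
        return {"url": source}
    if os.path.isdir(source):
        return {"folder": source}
    return {"path": source}
```

With a real workflow this would be used as wf.run_on(**run_on_kwargs(source)).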
- set_auto_save((CWorkflow)self, (bool)enable) None : ¶
Activate/deactivate auto-save mode. If activated, the outputs of each task of the workflow are saved in the workflow output folder (see set_output_folder() to set your custom folder). By default, outputs are saved in user_folder/Ikomia/Workflows.
- Parameters:
enable (bool) – True or False
- set_directory_input(folder: str = '', index: int = -1)¶
Set folder as global input of the workflow. For image-based workflows, all images inside the directory (recursively) will be processed.
- Parameters:
folder (str) – images folder
index (int) – zero-based input index, if -1 a new input is added
- set_image_input(array: ndarray = None, path: str = '', url: str = '', index: int = -1, datatype: IODataType = ikomia.core.pycore.IODataType.IMAGE)¶
Set image as global input of the workflow. The image can be specified as a NumPy array, a path or a URL via keyword arguments; choose exactly one of them.
- Parameters:
array (ndarray) – image input as Numpy array
path (str) – image input as file path (valid formats are those managed by OpenCV)
url (str) – valid URL to image file (valid formats are those managed by OpenCV)
index (int) – zero-based input index, if -1 a new input is added
datatype (IODataType) – image type
- set_video_input(path: str = '', url: str = '', index: int = -1, datatype: IODataType = ikomia.core.pycore.IODataType.VIDEO)¶
Set video as global input of the workflow. The video can be specified by a path or a URL via keyword arguments; choose exactly one of them.
- Parameters:
path (str) – video input as file path (valid formats are those managed by OpenCV)
url (str) – valid URL to video file (valid formats are those managed by OpenCV)
index (int) – zero-based input index, if -1 a new input is added
datatype (IODataType) – video type
- set_input((CWorkflow)self, (CWorkflowTaskIO)input, (int)index, (bool)new_sequence) None : ¶
Set workflow input at position index. If index is greater than the input count, the function adds the right number of inputs automatically. Derived classes that handle common data types already exist: CImageIO, CVideoIO, CNumericIO, CGraphicsInput, CDatasetIO, CPathIO, CArrayIO, CBlobMeasureIO.
- Parameters:
input (CWorkflowTaskIO object or derived) – global input of the workflow
index (int) – zero-based index
new_sequence (bool) – True if it is a new input sequence, False if it is just a new frame of a video or camera stream
- set_output_folder((CWorkflow)self, (str)path) None : ¶
Set workflow output folder. If auto-save mode is activated (see set_auto_save()), the outputs of each task of the workflow are saved automatically in this folder. Behind the scenes, each task implements a save() function that sequentially calls the save() function of all its outputs.
- Parameters:
path (str) – path to the desired directory
- set_parameters(params: dict, task_obj: CWorkflowTask = None, task_name: str = '', index: int = -1)¶
Set task parameters as a simple key-value dict. You can get the parameter keys of each task by calling:
print(task_obj.get_param_object())
- Parameters:
params (dict) – key-value pairs of parameters to modify.
task_obj (CWorkflowTask based object) – task instance. See also add_task() and find_task().
task_name (str) – algorithm name to be found. Multiple candidates may exist, so use the index parameter to specify one. Method find_task() is used to retrieve corresponding task(s).
index (int) – zero-based index of the wanted task. If -1, the function modifies the parameters of all candidates.
- set_task_enabled(task: CWorkflowTask = None, name: str = '', index: int = -1, enabled: bool = True)¶
Enable/Disable task for running.
- Parameters:
task (CWorkflowTask based object) – algorithm instance
name (str) – algorithm unique name
index (int) – zero-based index of the wanted task. If -1, the function applies to all candidates.
enabled (bool) – True if the algorithm has to be run, False otherwise
- set_workflow_parameters(params: dict)¶
Set workflow parameters. Available parameters are those exposed when the workflow is saved. An exposed parameter is bound to a task parameter within the workflow. The aim is to be able to select meaningful parameters with respect to the workflow objective.
- Parameters:
params (dict) – key-value pairs
- save(path: str, exposed_params: dict | None = None, exposed_outputs: dict | None = None)¶
Save workflow into a JSON definition file.
- Parameters:
path (str) – full path to the workflow definition file to save.
exposed_params (dict) –
set the list of task parameters that have to be exposed at workflow level. The aim is to be able to select meaningful parameters with respect to the workflow objective.
Example of the dict structure:
exposed_params = {
    "task_name": {
        "task_parameter_name_1": {
            "name": "my_workflow_parameter_name_1",
            "description": "my_description_1",
        },
        "task_parameter_name_2": {
            "name": "my_workflow_parameter_name_2",
            "description": "my_description_2",
        },
    }
}
To specify a task, you can give a task name, a task instance or a task id. Be careful with task names: if the workflow contains the same task several times, the function exposes the parameters of the first task only. Prefer a task instance or a task id in this case.
If the ‘name’ field is empty, the original task parameter name is used.
The ‘description’ field is also optional.
If you want to expose all parameters of a task, just pass an empty dict for the value of the task name key.
exposed_outputs (dict) –
set the list of outputs that have to be exposed at workflow level. The aim is to be able to select meaningful outputs with respect to the workflow objective.
Example of the dict structure:
exposed_outputs = {
    "task_name": [
        {
            "description": "my_description_1",
            "index": 0,
        },
        {
            "description": "my_description_2",
            "index": 1,
        },
    ]
}
To specify a task, you can give a task name, a task instance or a task id. Be careful with task names: if the workflow contains the same task several times, the function exposes the outputs of the first task only. Prefer a task instance or a task id in this case.
The ‘description’ field is optional.
If you want to expose all outputs of a task, just pass an empty list for the value of the task name key.
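The exposed_params structure described above can be assembled programmatically. The following is a minimal sketch in plain Python; build_exposed_params, the task key "my_task" and the parameter names are all hypothetical and chosen only for illustration.

```python
def build_exposed_params(task_key, params=None):
    """Build the exposed_params dict for one task.

    params maps a task parameter name to a (workflow_name, description) pair.
    None or an empty mapping yields an empty dict for the task, which the
    documentation above describes as exposing all parameters of that task.
    """
    entry = {}
    for param_name, (wf_name, description) in (params or {}).items():
        entry[param_name] = {"name": wf_name, "description": description}
    return {task_key: entry}


exposed = build_exposed_params(
    "my_task",  # hypothetical task name; a task instance or task id is also accepted
    {"threshold": ("min_score", "Minimum score to keep a result")},
)
# With a real workflow:
#   wf.save("workflow.json", exposed_params=exposed)
```

Dictionaries for several tasks can be merged with dict.update() before being passed to save().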
- stop((CWorkflow)self) None : ¶
Stop workflow execution. Each CWorkflowTask object or derived must reimplement the stop() function that will be called by the workflow. Depending on the process implementation, stop may not be instantaneous.
- update_start_time((CWorkflow)self) None : ¶
Reset the starting point for the computation of the running time. This is useful if you want to process a list of images and monitor the time per image.
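The per-image monitoring mentioned above might look like the sketch below. It assumes wf is a Workflow instance and that get_total_elapsed_time() measures from the last call to update_start_time(); time_per_image is a hypothetical helper name.

```python
def time_per_image(wf, image_paths):
    """Run wf on each image and return {path: elapsed time in milliseconds}."""
    timings = {}
    for path in image_paths:
        wf.update_start_time()  # reset the reference point for this image
        wf.run_on(path=path)
        timings[path] = wf.get_total_elapsed_time()
    return timings
```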