Workflow

class ikomia.dataprocess.workflow.Workflow(name: str = 'Untitled', registry: IkomiaRegistry = IkomiaRegistry())

Workflow management of Computer Vision tasks. Implements features to create, modify and run graph-based pipelines of CWorkflowTask objects or derived. Workflows can be created from scratch by using IkomiaRegistry to instantiate and connect task objects. Workflows can also be loaded from a JSON file created with the interactive designer of Ikomia Studio. Derived from CWorkflow.

Import

from ikomia.dataprocess.workflow import Workflow

Methods

__init__([name, registry])

Construct Workflow object with the given name and an IkomiaRegistry object.

add_task([task, name, params, auto_connect, ...])

Add task identified by its unique name in the workflow.

connect_tasks(src, target[, edges])

Connect two tasks of the workflow.

find_task(name[, index])

Get identifiers and instance of tasks with the given name in the workflow.

get_task_id(task)

Get task unique identifier from the task instance.

get_task_output([task_obj, task_name, ...])

Get specific output(s) defined by their types (IODataType) for the given task.

get_tasks()

Get all tasks composing the workflow.

get_time_metrics()

Get metrics around workflow execution time.

get_workflow_parameters()

Get parameters exposed at workflow level.

load(path)

Load the workflow file at the given path.

remove_task([task, name, index])

Remove task from workflow specified by task instance or name (with corresponding index).

root()

Get workflow root node.

run()

Start workflow execution on global input.

run_on([array, path, url, folder])

Convenience function to run the workflow on common inputs.

save(path[, exposed_params, exposed_outputs])

Save workflow into a JSON definition file.

set_directory_input([folder, index])

Set folder as global input of the workflow.

set_image_input([array, path, url, index, ...])

Set image as global input of the workflow.

set_video_input([path, url, index, datatype])

Set video as global input of the workflow.

set_parameters(params[, task_obj, ...])

Set task parameters as a simple key-value dict.

set_task_enabled([task, name, index, enabled])

Enable/Disable task for running.

set_workflow_parameters(params)

Set workflow parameters.

Inherited methods

add_input(self, input)

Add global input to the workflow.

clear(self)

Remove all tasks and connections from the workflow.

clear_inputs(self)

Remove all inputs of the workflow.

clear_output_data(self)

Clear output data for all the tasks of the workflow.

delete_edge(self, id)

Remove the given connection/edge from the workflow.

delete_task(self, id)

Remove the given task from the workflow.

export_graphviz(self, path)

Export the workflow structure as Graphviz .dot file.

get_children(self, id)

Get child task identifiers of the task specified by the given identifier.

get_edge_info(self, id)

Get connection information (edge) from its unique identifier.

get_edge_source(self, id)

Get source/parent task identifier of a given edge.

get_edge_target(self, id)

Get target/child task identifier of a given edge.

get_elapsed_time_to(self, task_id)

Get the workflow running time in milliseconds from the start to the given task.

get_final_tasks(self)

Get all final or leaf tasks of the workflow.

get_in_edges(self, id)

Get input connections (in-edges) of the task specified by the given identifier.

get_out_edges(self, id)

Get output connections (out-edges) of the task specified by the given identifier.

get_parents(self, id)

Get parent task identifiers of the task specified by the given identifier.

get_required_tasks(self, path)

Get task names required to load and execute the given workflow file.

get_root_id(self)

Get unique identifier of the root node.

get_root_target_types(self)

Get all input data types of the tasks connected to the root node.

get_task(self, id)

Get the task object from the given ID.

get_task_count(self)

Get the number of tasks in the workflow.

get_task_ids(self)

Get the list of all task identifiers.

get_total_elapsed_time(self)

Get the total workflow running time in milliseconds.

remove_input(self, index)

Remove global workflow input at the given index.

set_auto_save(self, enable)

Activate/deactivate auto-save mode.

set_input(self, input, index, new_sequence)

Set workflow input at position index.

set_output_folder(self, path)

Set workflow output folder.

stop(self)

Stop workflow execution. Each CWorkflowTask object or derived must reimplement the stop() function that will be called by the workflow.

update_start_time(self)

Reset the starting point for the computation of the running time.

Details

__init__(name: str = 'Untitled', registry: IkomiaRegistry = IkomiaRegistry())

Construct Workflow object with the given name and an IkomiaRegistry object. The latter is used to instantiate algorithms from their unique name when they are added to the workflow. Thus, you can use any Ikomia algorithm (built-in or from Ikomia HUB) in your workflow.

Parameters:
  • name (str) – workflow name

  • registry (IkomiaRegistry) – Ikomia algorithm registry, default: global Ikomia registry

add_input((CWorkflow)self, (CWorkflowTaskIO)input) None :

Add global input to the workflow.

Parameters:

input (CWorkflowTaskIO based object) – input object

add_task(task: CWorkflowTask = None, name: str = '', params: dict = None, auto_connect: bool = False, public_hub: bool = True, private_hub: bool = False) CWorkflowTask

Add task identified by its unique name in the workflow. If the given task is not yet in the registry, it is first downloaded and installed from Ikomia HUB. The unique task identifier can then be retrieved with get_task_id().

Parameters:
  • task (CWorkflowTask) – algorithm instance

  • name (str) – algorithm unique name

  • params (CWorkflowTaskParam based object) – algorithm parameters

  • auto_connect (bool) – True to connect with parent tasks automatically

  • public_hub (bool) – True if algorithm can be installed from Ikomia HUB

  • private_hub (bool) – True if algorithm can be installed from private HUB

Returns:

task instance

Return type:

CWorkflowTask based object
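As a minimal sketch of the usage described above, the helper below adds algorithms by name and lets the workflow connect each one to its parent. It relies only on add_task() and set_parameters() as documented on this page. The helper name build_chain and its params_by_name argument are hypothetical, and wf is assumed to be a Workflow instance.

```python
def build_chain(wf, algo_names, params_by_name=None):
    """Add algorithms to `wf` in order, auto-connecting each to its parent.

    `build_chain` and `params_by_name` are illustrative names,
    not part of the Ikomia API.
    """
    params_by_name = params_by_name or {}
    tasks = []
    for name in algo_names:
        # auto_connect=True asks the workflow to connect the new task
        # with its parent task(s) automatically.
        task = wf.add_task(name=name, auto_connect=True)
        if name in params_by_name:
            # Parameters are given as a simple key-value dict.
            wf.set_parameters(params_by_name[name], task_obj=task)
        tasks.append(task)
    return tasks
```

With the real library, wf would be created with Workflow() after the import shown in the Import section, and algo_names would hold algorithm names known to the registry or available on Ikomia HUB.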

clear((CWorkflow)self) None :

Remove all tasks and connections from the workflow. The workflow is thus empty after that.

clear_inputs((CWorkflow)self) None :

Remove all inputs of the workflow.

clear_output_data((CWorkflow)self) None :

Clear output data for all the tasks of the workflow.

connect_tasks(src: CWorkflowTask, target: CWorkflowTask, edges: list = None)

Connect two tasks of the workflow. Depending on the inputs/outputs configuration, multiple connections can be set between the two tasks. A connection is a pair (i.e. a tuple) composed of the output index of the source task and the input index of the target task.

Parameters:
  • src (CWorkflowTask based object) – source task or None (connect to root)

  • target (CWorkflowTask based object) – target task

  • edges (list of pair) – connections. If an empty list is passed, auto-connection is enabled: the system tries to find the best connections automatically with respect to input and output data types.
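The edge format can be illustrated with a small sketch. The helper below, whose name connect_explicit is hypothetical, turns (output_index, input_index) pairs into the edges list and forwards it to connect_tasks(). The check for non-negative indices is an assumption of this sketch, not documented library behavior.

```python
def connect_explicit(wf, src, target, pairs):
    """Connect `src` to `target` with explicit (output_index, input_index) pairs.

    Pass src=None to connect `target` to the workflow root.
    `connect_explicit` is an illustrative helper, not part of the Ikomia API.
    """
    edges = []
    for out_idx, in_idx in pairs:
        # Assumption of this sketch: port indices are zero-based.
        if out_idx < 0 or in_idx < 0:
            raise ValueError("port indices must be non-negative")
        edges.append((int(out_idx), int(in_idx)))
    wf.connect_tasks(src, target, edges)
    return edges
```

For instance, connect_explicit(wf, task_a, task_b, [(0, 0), (1, 2)]) would request two connections: output 0 of task_a to input 0 of task_b, and output 1 of task_a to input 2 of task_b.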

delete_edge((CWorkflow)self, (int)id) None :

Remove the given connection/edge from the workflow. The identifier becomes invalid after this operation.

Parameters:

id (int) – edge identifier

delete_task((CWorkflow)self, (int)id) None :

Remove the given task from the workflow. The identifier becomes invalid after this operation.

Parameters:

id (int) – task identifier

export_graphviz((CWorkflow)self, (str)path) None :

Export the workflow structure as Graphviz .dot file. You can then visualize it with the dot command or with Graphviz Python package.

Parameters:

path (str) – path where the .dot file is saved

find_task(name: str, index: int = -1) CWorkflowTask | list

Get identifiers and instance of tasks with the given name in the workflow.

Parameters:
  • name (str) – algorithm name. Multiple candidates may exist, so use index parameter to specify one.

  • index (int) – zero-based index of the wanted task. If -1, the function returns all candidates.

Returns:

list of all matching tasks if index == -1, the single matching task if index != -1

Return type:

list of CWorkflowTask based objects, or CWorkflowTask based object
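Because the return value is either a list or a single task depending on index, calling code may want to normalize it. The sketch below always returns a list. The helper name tasks_named is hypothetical, and treating a None result as "no match" is a defensive assumption, since this page does not state what find_task() returns when nothing is found.

```python
def tasks_named(wf, name, index=-1):
    """Return the result of find_task() as a list, whatever `index` is.

    `tasks_named` is an illustrative helper, not part of the Ikomia API.
    """
    found = wf.find_task(name, index)
    if found is None:
        # Defensive assumption: treat None as "no matching task".
        return []
    return list(found) if isinstance(found, list) else [found]
```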

get_children((CWorkflow)self, (int)id) object :

Get child task identifiers of the task specified by the given identifier. A task connected to the outputs of a given task is called a child or target.

Parameters:

id (int) – task identifier on which to get children

Returns:

child identifiers

Return type:

int list
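Combined with get_root_id(), get_children() is enough to walk the workflow graph. The following sketch lists the task identifiers reachable from the root in breadth-first order. The function name is hypothetical, and the code assumes identifiers are hashable integers, as the signatures on this page suggest.

```python
from collections import deque


def task_ids_breadth_first(wf):
    """List task identifiers reachable from the root, breadth-first.

    The root identifier itself is not included in the result.
    """
    root_id = wf.get_root_id()
    order = []
    seen = {root_id}
    queue = deque([root_id])
    while queue:
        current = queue.popleft()
        for child in wf.get_children(current):
            # A task with several parents is visited only once.
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order
```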

get_edge_info((CWorkflow)self, (int)id) tuple :

Get connection information (edge) from its unique identifier.

Parameters:

id (int) – edge identifier

Returns:

pair composed of the output index of the source/parent task and the input index of the target/child task

Return type:

tuple (int)

get_edge_source((CWorkflow)self, (int)id) int :

Get source/parent task identifier of a given edge.

Parameters:

id (int) – edge identifier

Returns:

task identifier

Return type:

id (int)

get_edge_target((CWorkflow)self, (int)id) int :

Get target/child task identifier of a given edge.

Parameters:

id (int) – edge identifier

Returns:

task identifier

Return type:

id (int)

get_elapsed_time_to((CWorkflow)self, (int)task_id) float :

Get the workflow running time in milliseconds from the start to the given task.

Parameters:

task_id (int) – task identifier

Returns:

elapsed time

Return type:

float

get_final_tasks((CWorkflow)self) object :

Get all final or leaf tasks of the workflow.

Returns:

leaf task identifiers

Return type:

int list

get_in_edges((CWorkflow)self, (int)id) object :

Get input connections (in-edges) of the task specified by the given identifier. Edge information can then be retrieved by the function get_edge_info().

Parameters:

id (int) – task identifier

Returns:

edge identifiers

Return type:

int list
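As a sketch of how the edge functions fit together, the helper below describes each incoming connection of a task by combining get_in_edges(), get_edge_source() and get_edge_info(). The helper name incoming_connections is hypothetical.

```python
def incoming_connections(wf, task_id):
    """Describe each in-edge of `task_id`.

    Returns a list of (source_task_id, output_index, input_index) tuples.
    `incoming_connections` is an illustrative helper, not part of the Ikomia API.
    """
    connections = []
    for edge_id in wf.get_in_edges(task_id):
        source_id = wf.get_edge_source(edge_id)
        # get_edge_info() returns the pair (output index, input index).
        out_idx, in_idx = wf.get_edge_info(edge_id)
        connections.append((source_id, out_idx, in_idx))
    return connections
```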

get_out_edges((CWorkflow)self, (int)id) object :

Get output connections (out-edges) of the task specified by the given identifier. Edge information can then be retrieved by the function get_edge_info().

Parameters:

id (int) – task identifier

Returns:

edge identifiers

Return type:

int list

get_parents((CWorkflow)self, (int)id) object :

Get parent task identifiers of the task specified by the given identifier. A task connected to the inputs of a given task is called a parent or source.

Parameters:

id (int) – task identifier on which to get parents

Returns:

parent identifiers

Return type:

int list

get_required_tasks((CWorkflow)self, (str)path) object :

Get task names required to load and execute the given workflow file.

Parameters:

path (str) – path to the workflow file (JSON)

Returns:

task names

Return type:

list of str

get_root_id((CWorkflow)self) int :

Get unique identifier of the root node.

Returns:

root node ID

Return type:

int

get_root_target_types((CWorkflow)self) object :

Get all input data types of the tasks connected to the root node.

Returns:

data types

Return type:

list of IODataType

get_task((CWorkflow)self, (int)id) CWorkflowTask :

Get the task object from the given ID. Unique task identifiers can be retrieved with the functions get_task_ids() or find_task().

Returns:

CWorkflowTask object or derived

get_task_id(task) int

Get task unique identifier from the task instance.

Parameters:

task (CWorkflowTask based object) – task instance

Returns:

task unique identifier

Return type:

int

get_task_count((CWorkflow)self) int :

Get the number of tasks in the workflow.

Returns:

task count

Return type:

int

get_task_ids((CWorkflow)self) object :

Get the list of all task identifiers. You can then retrieve task object from ID with the function get_task().

get_task_output(task_obj=None, task_name: str = '', task_index: int = 0, types: list = [ikomia.core.pycore.IODataType.IMAGE], output_index: int = -1) CWorkflowTaskIO

Get specific output(s) defined by their types (IODataType) for the given task.

Parameters:
  • task_obj (CWorkflowTask based object) – task instance. See also add_task() and find_task().

  • task_name (str) – algorithm name to be found. Multiple candidates may exist, so use task_index parameter to specify one. Method find_task() is used to retrieve corresponding task(s).

  • task_index (int) – zero-based index of the wanted task. If -1, the function returns the outputs of all candidates.

  • types (list of IODataType) – output data types.

  • output_index (int) – zero-based index of the wanted output.

Returns:

task output of the given data types (can be a list).

Return type:

CWorkflowTaskIO based object

get_tasks() list

Get all tasks composing the workflow.

Returns:

list of CWorkflowTask based object

get_time_metrics() dict

Get metrics around workflow execution time. This includes the total execution time of the workflow, and for each task, the execution time and the execution time from the start.

Returns:

metrics

Return type:

dict

get_total_elapsed_time((CWorkflow)self) float :

Get the total workflow running time in milliseconds.

Returns:

elapsed time

Return type:

float

get_workflow_parameters()

Get parameters exposed at workflow level.

Returns:

list of key-values

Return type:

dict

load(path: str)

Load the workflow file at the given path. The function will try to install algorithms if they are not available.

Parameters:

path (str) – full path to the workflow definition file to load.

remove_input((CWorkflow)self, (int)index) None :

Remove global workflow input at the given index.

Parameters:

index (int) – zero-based index of the input to remove

remove_task(task: CWorkflowTask = None, name: str = '', index: int = 0)

Remove task from workflow specified by task instance or name (with corresponding index).

Parameters:
  • task (CWorkflowTask) – task object instance

  • name (str) – algorithm name to be found. Multiple candidates may exist, so use the index parameter to specify one. Method find_task() is used to retrieve corresponding task(s).

  • index (int) – zero-based index of the wanted task.

root()

Get workflow root node.

Returns:

root task instance

Return type:

CWorkflowTask

run()

Start workflow execution on global input. Each CWorkflowTask object or derived must reimplement the run() function that will be called in the right order by the workflow. Please note that global inputs should be set before calling this function (see set_input(), set_image_input(), set_directory_input()).
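The order of calls described above (set the global input, then run) can be sketched as follows. The helper name run_on_image_file is hypothetical. Clearing existing inputs first is a choice made in this sketch so that the image becomes the only global input; it is not a documented requirement.

```python
def run_on_image_file(wf, image_path):
    """Run `wf` on one image file and return the total running time in ms.

    `run_on_image_file` is an illustrative helper, not part of the Ikomia API.
    """
    # Choice made in this sketch: start from an empty list of global inputs.
    wf.clear_inputs()
    # index defaults to -1, so a new global input is added.
    wf.set_image_input(path=image_path)
    wf.run()
    return wf.get_total_elapsed_time()
```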

run_on(array: ndarray = None, path: str = '', url: str = '', folder: str = '')

Convenience function to run the workflow on common inputs. For more advanced use, see the other methods of Workflow. See also set_image_input() and set_directory_input().

Parameters:
  • array (Numpy array) – image or generic array as numpy array

  • path (str) – path to image (valid formats are those managed by OpenCV)

  • url (str) – URL to image file (valid formats are those managed by OpenCV)

  • folder (str) – image folder

set_auto_save((CWorkflow)self, (bool)enable) None :

Activate/deactivate auto-save mode. If activated, the outputs of each task of the workflow are saved in the workflow output folder (see set_output_folder() to set a custom folder). By default, outputs are saved in user_folder/Ikomia/Workflows.

Parameters:

enable (bool) – True or False

set_directory_input(folder: str = '', index: int = -1)

Set folder as global input of the workflow. For image-based workflows, all images inside the directory (recursively) will be processed.

Parameters:
  • folder (str) – images folder

  • index (int) – zero-based input index, if -1 a new input is added

set_image_input(array: ndarray = None, path: str = '', url: str = '', index: int = -1, datatype: IODataType = ikomia.core.pycore.IODataType.IMAGE)

Set image as global input of the workflow. The image can be specified as a Numpy array, a path or a URL through keyword arguments; choose one of them.

Parameters:
  • array (ndarray) – image input as Numpy array

  • path (str) – image input as file path (valid formats are those managed by OpenCV)

  • url (str) – valid URL to image file (valid formats are those managed by OpenCV)

  • index (int) – zero-based input index, if -1 a new input is added

  • datatype (IODataType) – image type

set_video_input(path: str = '', url: str = '', index: int = -1, datatype: IODataType = ikomia.core.pycore.IODataType.VIDEO)

Set video as global input of the workflow. The video can be specified as a path or a URL through keyword arguments; choose one of them.

Parameters:
  • path (str) – video input as file path (valid formats are those managed by OpenCV)

  • url (str) – valid URL to video file (valid formats are those managed by OpenCV)

  • index (int) – zero-based input index, if -1 a new input is added

  • datatype (IODataType) – video type

set_input((CWorkflow)self, (CWorkflowTaskIO)input, (int)index, (bool)new_sequence) None :

Set workflow input at position index. If index is greater than the input count, the function adds the right number of inputs automatically. Derived classes that handle common data types already exist: CImageIO, CVideoIO, CNumericIO, CGraphicsInput, CDatasetIO, CPathIO, CArrayIO, CBlobMeasureIO.

Parameters:
  • input (CWorkflowTaskIO object or derived) – global input of the workflow

  • index (int) – zero-based index

  • new_sequence (bool) – True if it is a new input sequence, False if it is just a new frame of a video or camera stream

set_output_folder((CWorkflow)self, (str)path) None :

Set workflow output folder. If auto-save mode is activated (see set_auto_save()), the outputs of each task of the workflow are saved automatically in this folder. Behind the scenes, each task implements a save() function that sequentially calls the save() function of all its outputs.

Parameters:

path (str) – path to the desired directory

set_parameters(params: dict, task_obj: CWorkflowTask = None, task_name: str = '', index: int = -1)

Set task parameters as a simple key-value dict. You can list the parameter keys of a task by calling:

print(task_obj.get_param_object())

Parameters:
  • params (dict) – key-value pairs of parameters to modify.

  • task_obj (CWorkflowTask based object) – task instance. See also add_task() and find_task().

  • task_name (str) – algorithm name to be found. Multiple candidates may exist, so use the index parameter to specify one. Method find_task() is used to retrieve corresponding task(s).

  • index (int) – zero-based index of the wanted task. If -1, the function modifies all candidates parameters.

set_task_enabled(task: CWorkflowTask = None, name: str = '', index: int = -1, enabled: bool = True)

Enable/Disable task for running.

Parameters:
  • task (CWorkflowTask based object) – algorithm instance

  • name (str) – algorithm unique name

  • index (int) – zero-based index of the wanted task. If -1, the function applies to all candidates.

  • enabled (bool) – True if algorithm has to be run, False otherwise

set_workflow_parameters(params: dict)

Set workflow parameters. Available parameters are those exposed when the workflow was saved. An exposed parameter is bound to a task parameter within the workflow. The aim is to select parameters that are meaningful with respect to the workflow objective.

Parameters:

params (dict) – key-value pairs

save(path: str, exposed_params: dict | None = None, exposed_outputs: dict | None = None)

Save workflow into a JSON definition file.

Parameters:
  • path (str) – full path to the workflow definition file to save.

  • exposed_params (dict) –

    List of task parameters to expose at workflow level. The aim is to select parameters that are meaningful with respect to the workflow objective.

    Example of the dict structure:

    exposed_params = {
        "task_name": {
            "task_parameter_name_1": {
                "name": "my_workflow_parameter_name_1",
                "description": "my_description_1",
            },
            "task_parameter_name_2": {
                "name": "my_workflow_parameter_name_2",
                "description": "my_description_2",
            },
        }
    }
    

    To specify a task, you can give a task name, a task instance or a task id. Be careful with task names: if the workflow contains the same task several times, the function will expose parameters of the first task only. Prefer using a task instance or task id in this case.

    If the ‘name’ field is empty, the original task parameter name is used.

    The ‘description’ field is also optional.

    If you want to expose all parameters of a task, just pass an empty dict for the value of the task name key.

  • exposed_outputs (dict) –

    List of outputs to expose at workflow level. The aim is to select outputs that are meaningful with respect to the workflow objective.

    Example of the dict structure:

    exposed_outputs = {
        "task_name": [
            {
                "description": "my_description_1",
                "index": 0,
            },
            {
                "description": "my_description_2",
                "index": 1,
            },
        ]
    }
    

    To specify a task, you can give a task name, a task instance or a task id. Be careful with task names: if the workflow contains the same task several times, the function will expose outputs of the first task only. Prefer using a task instance or task id in this case.

    The ‘description’ field is optional.

    If you want to expose all outputs of a task, just pass an empty list for the value of the task name key.
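The two dict structures above can also be built programmatically. The sketch below follows the documented layout. The helper names expose_params and expose_outputs, and the shape of their arguments, are hypothetical conveniences and not part of the Ikomia API.

```python
def expose_params(task_key, renames):
    """Build an `exposed_params` dict for one task.

    `renames` maps task parameter names to (workflow_name, description).
    An empty `renames` yields an empty dict for the task, which the
    documentation above describes as exposing all its parameters.
    """
    return {
        task_key: {
            param: {"name": wf_name, "description": desc}
            for param, (wf_name, desc) in renames.items()
        }
    }


def expose_outputs(task_key, descriptions):
    """Build an `exposed_outputs` dict for one task.

    `descriptions` maps output indices to descriptions.
    An empty `descriptions` yields an empty list for the task, which the
    documentation above describes as exposing all its outputs.
    """
    return {
        task_key: [
            {"description": desc, "index": idx}
            for idx, desc in sorted(descriptions.items())
        ]
    }
```

The results can then be passed to save() through its exposed_params and exposed_outputs keyword arguments, with task_key being a task name, a task instance or a task id.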

stop((CWorkflow)self) None :

Stop workflow execution. Each CWorkflowTask object or derived must reimplement the stop() function that will be called by the workflow. Depending on the process implementation, stop may not be instantaneous.

update_start_time((CWorkflow)self) None :

Reset the starting point for the computation of the running time. This feature could be interesting if you want to process a list of images and monitor the time per image.
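The per-image monitoring mentioned above might look like the following sketch. It assumes that get_total_elapsed_time() measures time from the last call to update_start_time(), which this page implies but does not state explicitly. The helper name time_per_image is hypothetical.

```python
def time_per_image(wf, image_paths):
    """Return {path: elapsed_ms} for each image processed by `wf`.

    `time_per_image` is an illustrative helper, not part of the Ikomia API.
    """
    timings = {}
    for path in image_paths:
        # Reset the reference point before processing this image.
        wf.update_start_time()
        wf.run_on(path=path)
        timings[path] = wf.get_total_elapsed_time()
    return timings
```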