API Reference

FACT_workflow_manager.get_task_df - Method
get_task_df(task_name::String, dependencies_info::Dict, results_connection::MySQL.Connection) -> DataFrame

Retrieves a DataFrame for a given task by executing a database query through a results connection. This function looks up the task's details in dependencies_info using task_name as the key. If the task is found, it extracts the function name, arguments, and parameters dictionary from the task's definition and then queries the database to fetch the corresponding data.

Arguments

  • task_name::String: The name of the task for which data is to be fetched.
  • dependencies_info::Dict: A dictionary containing task definitions, including function names, arguments, and parameters.
  • results_connection::MySQL.Connection: An active database connection through which the query will be executed.

Returns

DataFrame: A DataFrame containing the results of the query. Returns an empty DataFrame if the task is not found in dependencies_info.

Example

Assuming dependencies_info is a dictionary with task definitions and results_connection is an established MySQL connection:

get_task_df("myTask", dependencies_info, results_connection)

This will return a DataFrame with the data for myTask, based on the details specified in dependencies_info.
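
The lookup-then-query flow described above can be sketched without a database. This is a minimal illustration under stated assumptions, not the package's implementation: `fetch_rows` and `lookup_task_rows` are hypothetical names, `fetch_rows` stands in for the real database query, the keys `function`, `arguments`, and `parameters_dict` follow the task dictionaries shown elsewhere in this reference, and a vector of named tuples stands in for the DataFrame.

```julia
# Hypothetical stand-in for the database query; a real implementation would
# run a query through `results_connection` and return a DataFrame.
fetch_rows(func, args, params) =
    [(task_function = func, arguments = args, n_params = length(params))]

# Sketch of the lookup: return an empty result when the task is unknown,
# otherwise extract the task definition and run the (stubbed) query.
function lookup_task_rows(task_name::String, dependencies_info::Dict)
    haskey(dependencies_info, task_name) || return NamedTuple[]
    task = dependencies_info[task_name]
    return fetch_rows(task["function"], task["arguments"], task["parameters_dict"])
end

dependencies_info = Dict(
    "myTask" => Dict(
        "function" => "run_task1",
        "arguments" => "arg1",
        "parameters_dict" => Dict("key1" => "value1"),
    ),
)

found = lookup_task_rows("myTask", dependencies_info)
missing_task = lookup_task_rows("unknownTask", dependencies_info)
println(length(found), " row(s) for myTask, ", length(missing_task), " for unknownTask")
```

Returning an empty collection for an unknown task mirrors the documented behavior of returning an empty DataFrame, so callers can check `isempty` instead of handling an exception.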

FACT_workflow_manager.manage_workflow - Method

manage_workflow(tasks_list::Vector{Dict}, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)

Executes a workflow by managing the execution of tasks based on their dependencies and the provided service registry.

This function handles a workflow where tasks are represented as a list of dictionaries. Each task is executed in parallel using multithreading, respecting dependencies between tasks. Tasks communicate their completion status via channels, and their execution may update the results container.

Arguments

  • tasks_list::Vector{Dict}: A list of task dictionaries. Each dictionary represents a task and includes the following keys:
    • id (String): The unique identifier for the task.
    • dependencies (Vector{String}): A list of task IDs that must complete before this task can run.
    • Other relevant task-specific information (e.g., service, function, arguments).
  • results_container::Dict: A dictionary representing the results container, used by tasks to store or retrieve results.
  • system::OrchestratorRegistry.ServiceRegistry: The service registry used to manage services required by tasks during execution.

Returns

  • None

Behavior

  1. Converts the tasks_list into a dictionary (Dict{String, Dict}) where the task id is the key for efficient access.
  2. Initializes channels for each task to signal dependency completion.
  3. Spawns a thread for each task:
    • Waits for dependencies to complete before execution.
    • Executes the task, updating the results_container and other relevant data structures.
  4. Ensures all tasks are completed before the function exits.
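
The four steps above can be sketched with buffered channels and `Threads.@spawn`. This is an illustration of the general technique, not the package's code: `run_in_dependency_order` and the `work` callback are hypothetical, and the sketch does no error handling or cycle detection.

```julia
using Base.Threads: @spawn

# Sketch: run each task in its own Julia task, gated on its dependencies.
# `work` is a hypothetical callback standing in for the real task execution.
function run_in_dependency_order(tasks_list::Vector{<:Dict}, work::Function)
    # Step 1: index tasks by id for direct lookup.
    tasks = Dict(t["id"] => t for t in tasks_list)

    # Step 2: one buffered channel per task; a value is put when the task finishes.
    done = Dict(id => Channel{Bool}(1) for id in keys(tasks))

    # Step 3: spawn every task; each one waits for its dependencies first.
    handles = map(collect(keys(tasks))) do id
        @spawn begin
            for dep in tasks[id]["dependencies"]
                fetch(done[dep])   # blocks until the dependency signals; the value stays in the channel
            end
            work(tasks[id])
            put!(done[id], true)   # signal completion to every dependent task
        end
    end

    # Step 4: wait for all spawned tasks before returning.
    foreach(wait, handles)
    return nothing
end

tasks_list = [
    Dict("id" => "task1", "dependencies" => String[]),
    Dict("id" => "task2", "dependencies" => ["task1"]),
]

order = String[]
order_lock = ReentrantLock()
run_in_dependency_order(tasks_list, t -> lock(() -> push!(order, t["id"]), order_lock))
println(order)
```

`fetch` on a buffered channel reads the completion signal without removing it, so any number of dependent tasks can wait on the same dependency.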

Example

tasks_list = Dict[  # typed literal so the vector is a Vector{Dict}, as the signature requires
    Dict(
        "id" => "task1",
        "dependencies" => String[],
        "service" => "example_service",
        "function" => "run_task1",
        "arguments" => "arg1",
        "parameters_dict" => Dict()
    ),
    Dict(
        "id" => "task2",
        "dependencies" => ["task1"],
        "service" => "example_service",
        "function" => "run_task2",
        "arguments" => "arg2",
        "parameters_dict" => Dict()
    )
]

results_container = Dict(
    "user" => "root",
    "password" => "password",
    "host" => "localhost",
    "port" => 5432
)

system = ServiceRegistry()  # Assuming ServiceRegistry is preconfigured

manage_workflow(tasks_list, results_container, system)

FACT_workflow_manager.parseWorkflow - Method

parseWorkflow(filePath::String)

Parses a workflow configuration file in TOML format into a flat list of dictionaries. Each task is represented as a dictionary, including an id key for its name.

Arguments

  • filePath::String: The path to the TOML file containing the workflow configuration.

Returns

  • Vector{Dict}: A list of task dictionaries. Each dictionary represents a task and includes:
    • id (String): The unique identifier for the task (extracted from the task name).
    • Other key-value pairs corresponding to task-specific details from the TOML file.

Behavior

  1. Reads and parses the specified TOML file.
  2. Ensures the file contains a "tasks" section.
  3. Converts each task entry into a dictionary, adding an id field corresponding to the task name.
  4. Returns a list of these dictionaries.
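
The steps above map closely onto Julia's `TOML` standard library. The following is a minimal sketch, not the package's implementation: `flatten_tasks` is a hypothetical helper, and the task names and values in the TOML text are invented for illustration.

```julia
using TOML

# Sketch: flatten the "tasks" table of a parsed TOML document into a list of
# dictionaries, copying each task name into an "id" key.
function flatten_tasks(config::Dict)
    haskey(config, "tasks") || error("workflow file has no \"tasks\" section")
    return [merge(Dict{String,Any}("id" => name), details)
            for (name, details) in config["tasks"]]
end

toml_text = """
[tasks.first_task]
service = "example_service"
function = "run_task1"
arguments = "arg1"
parameters_dict = {}
dependencies = []

[tasks.second_task]
service = "example_service"
function = "run_task2"
arguments = "arg2"
parameters_dict = {key1 = "value1"}
dependencies = ["first_task"]
"""

tasks = flatten_tasks(TOML.parse(toml_text))

# Dict iteration order is unspecified, so sort by id before printing.
for task in sort(tasks; by = t -> t["id"])
    println(task["id"], " depends on ", task["dependencies"])
end
```

When reading from disk, `TOML.parsefile(path)` would take the place of `TOML.parse(toml_text)`.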

Example

Example TOML file structure:

```toml
[tasks.nocounties]
service = "FACTgeo"
function = "getgeoobjects"
arguments = "nocounties"
parameters_dict = {}
dependencies = []

[tasks.anothertask]
service = "Anotherservice"
function = "anotherfunction"
arguments = "argvalue"
parameters_dict = {key1 = "value1"}
dependencies = ["nocounties"]
```

Usage:

```julia
tasks = parseWorkflow("path/to/workflow.toml")
for task in tasks
    println("Task ID: ", task["id"])
    println("Service: ", task["service"])
end
```

FACT_workflow_manager.process_task_parameters - Method
process_task_parameters(original_dict::Dict, dependencies_info::Dict, results_container::Dict) -> Dict

Processes a dictionary of task parameters to resolve any references to data produced by other tasks. References are identified by a specific format (REFtaskName[!, :columnName]) and are replaced with the actual data values from the task outputs. This function supports dynamic parameter passing between tasks in a workflow, where the output of one task can be used as an input parameter for another.

Arguments

  • original_dict::Dict: The original dictionary containing the task parameters, potentially including references to other tasks' outputs.
  • dependencies_info::Dict: A dictionary containing information about the tasks and their dependencies.
  • results_container::Dict: The connection settings (for example user, password, host, and port) used to open the database connection from which referenced task outputs are fetched.

Returns

Dict: A new dictionary with all references resolved to actual data values. If a reference cannot be resolved due to missing data or incorrect reference format, an error is raised.

Example

Given a task parameter dictionary with references to outputs of other tasks, this function will resolve those references:

process_task_parameters(original_parameters, dependencies_info, results_container)

This will return a new dictionary where all REF references have been replaced with the actual values from the respective tasks' outputs.
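
Reference resolution of this kind can be sketched with a regular expression and an in-memory lookup. The sketch below rests on several assumptions: the reference syntax is taken literally from the description above (`REF`, then the task name, then `[!, :columnName]`), `resolve_references` is a hypothetical name, and `outputs` is a plain dictionary standing in for the results database that the real function queries.

```julia
# Assumed reference syntax, taken literally from the description above:
# "REF" + task name + "[!, :" + column name + "]".
const REF_PATTERN = r"^REF(\w+)\[!,\s*:(\w+)\]$"

# Sketch: replace every reference string with the named column of the named
# task's output. `outputs` is a hypothetical in-memory stand-in for the
# results database (task name => column name => values).
function resolve_references(params::Dict, outputs::Dict)
    resolved = Dict{String,Any}()
    for (key, value) in params
        m = value isa AbstractString ? match(REF_PATTERN, value) : nothing
        if m === nothing
            resolved[key] = value   # plain value, copied as-is
        else
            task, column = String(m[1]), String(m[2])
            haskey(outputs, task) || error("unknown task in reference: $value")
            haskey(outputs[task], column) || error("unknown column in reference: $value")
            resolved[key] = outputs[task][column]
        end
    end
    return resolved
end

outputs = Dict("taskA" => Dict("geo_id" => [101, 102, 103]))
params = Dict("ids" => "REFtaskA[!, :geo_id]", "year" => 2020)

resolved = resolve_references(params, outputs)
println(resolved["ids"])
```

The function builds a new dictionary rather than mutating its input, matching the documented behavior of returning a new dictionary and raising an error for unresolvable references.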

FACT_workflow_manager.run_file - Method

run_file(workflowFilePath::String, system::OrchestratorRegistry.ServiceRegistry)

Executes a workflow defined in a file, leveraging a provided ServiceRegistry to manage the services involved.

This function automates the setup, execution, and teardown of a containerized workflow, simplifying the process of running complex workflows with multiple dependencies. It performs the following steps:

  • Parses the workflow file specified by workflowFilePath to extract tasks.
  • Retrieves the FACTResultsIO service from the provided ServiceRegistry to configure the results container.
  • Manages workflow execution, including handling tasks, updating global variables, and interacting with services.
  • Optionally updates the results container with the final workflow results.

Arguments

  • workflowFilePath::String: The path to the file containing the workflow definition.
  • system::OrchestratorRegistry.ServiceRegistry: A registry of services required for workflow execution.

Behavior

  • Parses the workflow file to determine the tasks to execute.
  • Uses the FACTResultsIO service from the registry to set up a results container.
  • Executes tasks defined in the workflow, interacting with the necessary services as defined in the registry.
  • Manages cleanup and updates for the results container after workflow execution.

Side Effects

  • Starts and stops Docker containers as required for workflow execution.
  • Modifies the results container based on workflow outcomes.

Example

To run a workflow defined in a TOML file located at /path/to/workflow.toml, using a preconfigured service registry system:

run_file("/path/to/workflow.toml", system)

This will execute the workflow, managing all service interactions and container operations as needed.

FACT_workflow_manager.run_task - Method

run_task(
    task_name::String,
    task_info::Dict,
    dependencies_info::Dict,
    system::OrchestratorRegistry.ServiceRegistry,
    results_container::Dict,
    update_condition::Atomic{Bool},
    overwrite::Bool
)

Executes a single task within a workflow, processing its dependencies and updating the results container as needed.

This function performs the following steps:

  1. Processes task parameters, resolving dependencies from the provided dependencies_info.
  2. Executes the core work of the task using the task_work function.
  3. Updates the update_condition flag if the task modifies the results container.
  4. Logs task execution details, including start and end times and the task duration.

Arguments

  • task_name::String: The name of the task to execute.
  • task_info::Dict: A dictionary containing details about the task, including:
    • "function": The name of the function to call for task execution.
    • "arguments": A list of arguments to pass to the task function.
    • "parameters_dict": A dictionary of parameters that may depend on task dependencies.
  • dependencies_info::Dict: A dictionary containing resolved information about the task's dependencies.
  • system::OrchestratorRegistry.ServiceRegistry: A service registry used to access services required by the task.
  • results_container::Dict: A dictionary representing the results container, where the task can store or retrieve results.
  • update_condition::Atomic{Bool}: An atomic flag to indicate if the results container was updated.
  • overwrite::Bool: A flag indicating whether existing results should be overwritten.

Behavior

  • Resolves dependencies using process_task_parameters.
  • Executes the specified task function with the provided arguments and parameters.
  • Uses an atomic compare-and-swap (atomic_cas!) operation to update the update_condition flag if the task modifies the results container.
  • Logs start time, end time, and execution duration for the task.
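
The flag update and timing described above can be sketched with `Base.Threads.atomic_cas!` and the `Dates` standard library. This is an illustrative sketch only: `timed_run!` and the `work` callback are hypothetical, and the log format is invented.

```julia
using Base.Threads: Atomic, atomic_cas!
using Dates

# Sketch: time a unit of work and flip a shared flag from false to true when
# the work reports that it changed the results container.
# `work` is a hypothetical callback returning `true` if it modified results.
function timed_run!(work::Function, task_name::String, update_condition::Atomic{Bool})
    started = now()
    println("[$task_name] started at $started")

    modified = work()
    if modified
        # Compare-and-swap: set the flag only if it is still false, so
        # concurrent tasks can all attempt this safely.
        atomic_cas!(update_condition, false, true)
    end

    finished = now()
    println("[$task_name] finished at $finished, duration $(finished - started)")
    return modified
end

update_condition = Atomic{Bool}(false)
timed_run!(() -> true, "example-task", update_condition)
println("results container updated: ", update_condition[])
```

Because the flag only ever moves from `false` to `true`, it does not matter which of several concurrent tasks wins the compare-and-swap.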

Side Effects

  • May modify the results_container if the task produces output.
  • Updates the update_condition flag if the task modifies the results container.

Example

Define a task and its dependencies, and run it:

task_name = "example-task"
task_info = Dict(
    "function" => "example_function",
    "arguments" => ["arg1", "arg2"],
    "parameters_dict" => Dict("param1" => "value1")
)

dependencies_info = Dict(
    "dependency1" => Dict("result" => "dependency-result")
)

results_container = Dict(
    "user" => "root",
    "password" => "devops",
    "host" => "localhost",
    "port" => 8080
)

system = ServiceRegistry()
add_service!(system, Service("123", "example-service", "http://localhost", 9000))

update_condition = Atomic{Bool}(false)
overwrite = true

run_task(task_name, task_info, dependencies_info, system, results_container, update_condition, overwrite)

This example demonstrates how to define and execute a task within a workflow, updating the results container as necessary.

FACT_workflow_manager.FACT_workflow_manager - Module
Module FACT_workflow_manager

The FACT_workflow_manager module is designed to facilitate the execution of tasks defined in workflow files, particularly in the context of data analysis and processing. It leverages Julia's capabilities for multithreading, data manipulation, and database interaction, as well as Docker container management for isolated execution environments. The module integrates functionalities for parsing workflow definitions, managing Docker containers, executing tasks based on their dependencies, and handling the input/output of results.

Dependencies

  • Base.Threads: For utilizing multithreading capabilities to improve performance and manage concurrent executions.
  • TOML: To parse workflow definitions from TOML files.
  • MySQL: To interact with MySQL databases for storing and retrieving task execution results.
  • DataFrames: For handling and manipulating data within tasks.
  • Random, Dates: Utilized for generating random data and managing date/time operations respectively.
  • FACTResultsIO, FACTDataReader, JuliaDockerClient: Custom modules or packages for managing results I/O, reading data specific to the FACT framework, and interacting with Docker containers.

Key Components

  • TOMLParser.jl: Contains functions for parsing workflow definitions from TOML formatted files.
  • Containers.jl: Includes utilities for managing Docker containers, such as starting, stopping, and interacting with containers.
  • Task.jl: Defines the structure and execution logic for tasks within the workflow, including parameter processing and task execution.
  • Runner.jl: Implements the logic for running the workflow, managing the execution order of tasks based on dependencies, and orchestrating the overall workflow execution.

Exported Functions

  • run_file: The primary interface for executing a workflow defined in a TOML file. It takes the path to a workflow file and a service registry as input and manages the execution of the defined tasks.

Usage Example

To execute a workflow defined in a TOML file located at /path/to/workflow.toml, load the FACT_workflow_manager module and call run_file with a preconfigured service registry (system below):

using FACT_workflow_manager

run_file("/path/to/workflow.toml", system)

This will initiate the workflow execution process, handling task dependencies, Docker container management, and results I/O as defined in the workflow file.
