API Reference
FACT_workflow_manager.get_task_df
— Method
get_task_df(task_name::String, dependencies_info::Dict, results_connection::MySQL.Connection) -> DataFrame
Retrieves a DataFrame for a given task by executing a database query through a results connection. The function looks up the task's details in dependencies_info using task_name as the key. If the task is found, it extracts the function name, arguments, and parameters dictionary from the task's definition and then queries the database to fetch the corresponding data.
Arguments
- task_name::String: The name of the task for which data is to be fetched.
- dependencies_info::Dict: A dictionary containing task definitions, including function names, arguments, and parameters.
- results_connection::MySQL.Connection: An active database connection through which the query will be executed.
Returns
- DataFrame: A DataFrame containing the results of the query, or an empty DataFrame if the task is not found in dependencies_info.
Example
Assuming dependencies_info is a dictionary with task definitions and results_connection is an established MySQL connection:
get_task_df("myTask", dependencies_info, results_connection)
This returns a DataFrame with the data for myTask, based on the details specified in dependencies_info.
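The lookup step described above can be sketched as follows. This is a minimal, hypothetical illustration: the helper name lookup_task_query is not part of the module. It assumes that each entry in dependencies_info uses the "function", "arguments", and "parameters_dict" keys listed under run_task. The database query itself is omitted because its exact form is not documented here.

```julia
# Hypothetical sketch of the lookup step only; not the module's source.
# Assumes task entries use the "function", "arguments", and "parameters_dict" keys.
function lookup_task_query(task_name::String, dependencies_info::Dict)
    task = get(dependencies_info, task_name, nothing)
    # Unknown task: the documented behavior is to return an empty DataFrame.
    task === nothing && return nothing
    return (
        func = task["function"],
        args = task["arguments"],
        params = get(task, "parameters_dict", Dict()),
    )
end
```

A caller would turn the returned fields into a query, and would map a `nothing` result to an empty DataFrame.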
FACT_workflow_manager.manage_workflow
— Method
manage_workflow(tasks_list::Vector{Dict}, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)
Executes a workflow by managing the execution of tasks based on their dependencies and the provided service registry.
This function handles a workflow where tasks are represented as a list of dictionaries. Each task is executed in parallel using multithreading, respecting dependencies between tasks. Tasks communicate their completion status via channels, and their execution may update the results container.
Arguments
- tasks_list::Vector{Dict}: A list of task dictionaries. Each dictionary represents a task and includes the following keys:
  - id (String): The unique identifier for the task.
  - dependencies (Vector{String}): A list of task IDs that must complete before this task can run.
  - Other task-specific information (e.g., service, function, arguments).
- results_container::Dict: A dictionary representing the results container, used by tasks to store or retrieve results.
- system::OrchestratorRegistry.ServiceRegistry: The service registry used to manage services required by tasks during execution.
Returns
- nothing
Behavior
- Converts the tasks_list into a dictionary (Dict{String, Dict}) where the task id is the key for efficient access.
- Initializes channels for each task to signal dependency completion.
- Spawns a thread for each task, which:
  - Waits for its dependencies to complete before execution.
  - Executes the task, updating the results_container and other relevant data structures.
- Ensures all tasks are completed before the function exits.
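The dependency-waiting pattern in the Behavior list can be sketched with one buffered channel per task. This is a hypothetical sketch, not the module's implementation: run_tasks_in_dependency_order and the work callback are illustrative names, and the callback stands in for the real task execution.

Each task calls fetch on every dependency's channel. On a buffered channel, fetch reads the value without removing it, so several dependents can wait on the same task. The task then signals its own completion with put!. A dependency cycle would still deadlock, because no task in the cycle can ever start.

```julia
using Base.Threads

# Hypothetical sketch: `work(id, task)` stands in for the real task execution.
function run_tasks_in_dependency_order(tasks_list::Vector{<:Dict}, work::Function)
    # Index tasks by id, mirroring the Dict{String, Dict} conversion described above.
    tasks = Dict{String, Dict}(t["id"] => t for t in tasks_list)
    # One buffered channel per task; it receives a value once the task completes.
    done = Dict(id => Channel{Bool}(1) for id in keys(tasks))
    @sync for (id, task) in tasks
        Threads.@spawn begin
            try
                for dep in task["dependencies"]
                    # Block until the dependency finishes; fetch leaves the value in place.
                    fetch(done[dep])
                end
                work(id, task)
                put!(done[id], true)
            catch
                # Close without a value so tasks waiting on this one fail instead of hanging.
                close(done[id])
                rethrow()
            end
        end
    end
    return nothing
end
```

Called with the tasks_list from the example, task2 would start only after task1 has put its completion signal.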
Example
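using FACT_workflow_manager
using OrchestratorRegistry  # assumed to provide ServiceRegistry

# Dict[...] builds a Vector{Dict}, matching the tasks_list::Vector{Dict} signature
tasks_list = Dict[
    Dict(
        "id" => "task1",
        "dependencies" => [],
        "service" => "example_service",
        "function" => "run_task1",
        "arguments" => "arg1",
        "parameters_dict" => Dict()
    ),
    Dict(
        "id" => "task2",
        "dependencies" => ["task1"],
        "service" => "example_service",
        "function" => "run_task2",
        "arguments" => "arg2",
        "parameters_dict" => Dict()
    )
]
# Connection settings for the results database (example values)
results_container = Dict(
    "user" => "root",
    "password" => "password",
    "host" => "localhost",
    "port" => 5432
)
system = ServiceRegistry() # Assuming ServiceRegistry is preconfigured
# manage_workflow is not exported, so it is called with the module prefix
FACT_workflow_manager.manage_workflow(tasks_list, results_container, system)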
FACT_workflow_manager.parseWorkflow
— Method
parseWorkflow(filePath::String)
Parses a workflow configuration file in TOML format into a flat list of dictionaries. Each task is represented as a dictionary that includes an id key holding the task's name.
Arguments
- filePath::String: The path to the TOML file containing the workflow configuration.
Returns
- Vector{Dict}: A list of task dictionaries. Each dictionary represents a task and includes:
  - id (String): The unique identifier for the task (extracted from the task name).
  - Other key-value pairs corresponding to task-specific details from the TOML file.
Behavior
- Reads and parses the specified TOML file.
- Ensures the file contains a "tasks" section.
- Converts each task entry into a dictionary, adding an id field corresponding to the task name.
- Returns a list of these dictionaries.
Example
```julia
# Example TOML file structure:
#
# [tasks.nocounties]
# service = "FACTgeo"
# function = "getgeoobjects"
# arguments = "nocounties"
# parameters_dict = {}
# dependencies = []
#
# [tasks.anothertask]
# service = "Anotherservice"
# function = "anotherfunction"
# arguments = "argvalue"
# parameters_dict = {key1 = "value1"}
# dependencies = ["nocounties"]

# Usage (parseWorkflow is not exported, so it is called with the module prefix):
using FACT_workflow_manager
tasks = FACT_workflow_manager.parseWorkflow("path/to/workflow.toml")
for task in tasks
    println("Task ID: ", task["id"])
    println("Service: ", task["service"])
end
```
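The following is a minimal sketch of the flattening described under Behavior, using Julia's TOML standard library. parse_workflow_sketch is a hypothetical name, and the code illustrates the documented steps rather than reproducing the module's source. TOML tables are parsed into a Dict, so the order of the returned tasks is not guaranteed to match the order in the file.

```julia
using TOML

# Hypothetical sketch of parseWorkflow's documented steps; not the module's source.
function parse_workflow_sketch(filePath::String)
    config = TOML.parsefile(filePath)
    # The file must contain a [tasks] section.
    haskey(config, "tasks") || error("Workflow file has no [tasks] section: $filePath")
    tasks = Dict{String, Any}[]
    for (name, details) in config["tasks"]
        task = Dict{String, Any}(details)  # copy the task's key-value pairs
        task["id"] = name                  # the table name becomes the task id
        push!(tasks, task)
    end
    return tasks
end
```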
FACT_workflow_manager.process_task_parameters
— Method
process_task_parameters(original_dict::Dict, dependencies_info::Dict, results_container::Dict) -> Dict
Processes a dictionary of task parameters to resolve any references to data produced by other tasks. References are identified by a specific format (REFtaskName[!, :columnName]) and are replaced with the actual data values from the task outputs. This function supports dynamic parameter passing between tasks in a workflow, where the output of one task can be used as an input parameter for another.
Arguments
- original_dict::Dict: The original dictionary containing the task parameters, potentially including references to other tasks' outputs.
- dependencies_info::Dict: A dictionary containing information about the tasks and their dependencies.
- results_container::Dict: The connection settings used to open the database connection through which referenced task outputs are fetched.
Returns
- Dict: A new dictionary with all references resolved to actual data values. If a reference cannot be resolved because of missing data or an incorrect reference format, an error is raised.
Example
Given a task parameter dictionary with references to outputs of other tasks, this function will resolve those references:
process_task_parameters(original_parameters, dependencies_info, results_container)
This will return a new dictionary where all REF references have been replaced with the actual values from the respective tasks' outputs.
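One way to detect and resolve such references is sketched below. The sketch is hypothetical in several respects:

- The regular expression assumes the literal prefix REF, optionally followed by an underscore, directly precedes the task name.
- The fetch_column callback stands in for the database lookup that the real function performs through results_container, for instance via get_task_df.

Non-string values, and strings that do not match the pattern, are copied unchanged. Any error raised by fetch_column propagates to the caller.

```julia
# Hypothetical reference syntax: REF<taskName>[!, :<columnName>], optional "_" after REF.
const REF_PATTERN = r"^REF_?([\w-]+)\[!,\s*:(\w+)\]$"

# Hypothetical sketch; `fetch_column(task_name, column::Symbol)` stands in for the
# database lookup performed through results_container.
function resolve_task_refs(original_dict::Dict, fetch_column::Function)
    resolved = Dict{keytype(original_dict), Any}()
    for (key, value) in original_dict
        m = value isa AbstractString ? match(REF_PATTERN, value) : nothing
        if m === nothing
            resolved[key] = value  # not a reference: copy unchanged
        else
            task_name, column = m.captures
            resolved[key] = fetch_column(task_name, Symbol(column))
        end
    end
    return resolved
end
```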
FACT_workflow_manager.run_file
— Method
run_file(workflowFilePath::String, system::OrchestratorRegistry.ServiceRegistry)
Executes a workflow defined in a file, using the provided ServiceRegistry to manage the services involved.
This function automates the setup, execution, and teardown of a containerized workflow, simplifying the process of running complex workflows with multiple dependencies. It performs the following steps:
- Parses the workflow file specified by workflowFilePath to extract tasks.
- Retrieves the FACTResultsIO service from the provided ServiceRegistry to configure the results container.
- Manages workflow execution, including handling tasks, updating global variables, and interacting with services.
- Optionally updates the results container with the final workflow results.
Arguments
- workflowFilePath::String: The path to the file containing the workflow definition.
- system::OrchestratorRegistry.ServiceRegistry: A registry of services required for workflow execution.
Behavior
- Parses the workflow file to determine the tasks to execute.
- Uses the FACTResultsIO service from the registry to set up a results container.
- Executes tasks defined in the workflow, interacting with the necessary services as defined in the registry.
- Manages cleanup and updates for the results container after workflow execution.
Side Effects
- Starts and stops Docker containers as required for workflow execution.
- Modifies the results container based on workflow outcomes.
Example
To run a workflow defined in a TOML file located at /path/to/workflow.toml, using a preconfigured service registry system:
run_file("/path/to/workflow.toml", system)
This will execute the workflow, managing all service interactions and container operations as needed.
FACT_workflow_manager.run_task
— Method
run_task(task_name::String, task_info::Dict, dependencies_info::Dict, system::OrchestratorRegistry.ServiceRegistry, results_container::Dict, update_condition::Atomic{Bool}, overwrite::Bool)
Executes a single task within a workflow, processing its dependencies and updating the results container as needed.
This function performs the following steps:
- Processes task parameters, resolving dependencies from the provided dependencies_info.
- Executes the core work of the task using the task_work function.
- Updates the update_condition flag if the task modifies the results container.
- Logs task execution details, including start and end times and the task duration.
Arguments
- task_name::String: The name of the task to execute.
- task_info::Dict: A dictionary containing details about the task, including:
  - "function": The name of the function to call for task execution.
  - "arguments": A list of arguments to pass to the task function.
  - "parameters_dict": A dictionary of parameters that may depend on task dependencies.
- dependencies_info::Dict: A dictionary containing resolved information about the task's dependencies.
- system::OrchestratorRegistry.ServiceRegistry: A service registry used to access services required by the task.
- results_container::Dict: A dictionary representing the results container, where the task can store or retrieve results.
- update_condition::Atomic{Bool}: An atomic flag indicating whether the results container was updated.
- overwrite::Bool: A flag indicating whether existing results should be overwritten.
Behavior
- Resolves dependencies using process_task_parameters.
- Executes the specified task function with the provided arguments and parameters.
- Uses an atomic compare-and-swap (atomic_cas!) operation to set the update_condition flag if the task modifies the results container.
- Logs the start time, end time, and execution duration of the task.
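The flag update and timing steps can be sketched with Base.Threads.atomic_cas! and the Dates standard library. This is a hypothetical wrapper: run_task_sketch and its work callback are illustrative names. The callback stands in for task_work and returns true when it modified the results container. Because atomic_cas! only swaps false for true, concurrent tasks can all attempt the update safely, and once any task sets the flag it stays true.

```julia
using Base.Threads
using Dates

# Hypothetical sketch; `work()` stands in for task_work and returns true
# if it modified the results container.
function run_task_sketch(task_name::String, work::Function, update_condition::Atomic{Bool})
    start_time = now()
    @info "Task started" task_name start_time
    modified = work()
    if modified
        # Atomically flip false -> true; a no-op if another task already set it.
        atomic_cas!(update_condition, false, true)
    end
    end_time = now()
    @info "Task finished" task_name end_time duration = end_time - start_time
    return modified
end
```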
Side Effects
- May modify the results_container if the task produces output.
- Updates the update_condition flag if the task modifies the results container.
Example
Define a task and its dependencies, and run it:
using FACT_workflow_manager
using OrchestratorRegistry  # assumed to provide ServiceRegistry, Service, and add_service!
using Base.Threads: Atomic

task_name = "example-task"
task_info = Dict(
    "function" => "example_function",
    "arguments" => ["arg1", "arg2"],
    "parameters_dict" => Dict("param1" => "value1")
)
dependencies_info = Dict(
    "dependency1" => Dict("result" => "dependency-result")
)
results_container = Dict(
    "user" => "root",
    "password" => "devops",
    "host" => "localhost",
    "port" => 8080
)
system = ServiceRegistry()
add_service!(system, Service("123", "example-service", "http://localhost", 9000))
update_condition = Atomic{Bool}(false)
overwrite = true
# run_task is not exported, so it is called with the module prefix
FACT_workflow_manager.run_task(task_name, task_info, dependencies_info, system, results_container, update_condition, overwrite)
This example demonstrates how to define and execute a task within a workflow, updating the results container as necessary.
FACT_workflow_manager.FACT_workflow_manager
— Module
Module FACT_workflow_manager
The FACT_workflow_manager module is designed to facilitate the execution of tasks defined in workflow files, particularly in the context of data analysis and processing. It uses Julia's support for multithreading, data manipulation, and database interaction, as well as Docker container management for isolated execution environments. The module integrates functionality for parsing workflow definitions, managing Docker containers, executing tasks based on their dependencies, and handling the input/output of results.
Dependencies
- Base.Threads: For multithreading, to improve performance and manage concurrent executions.
- TOML: To parse workflow definitions from TOML files.
- MySQL: To interact with MySQL databases for storing and retrieving task execution results.
- DataFrames: For handling and manipulating data within tasks.
- Random, Dates: For generating random data and managing date/time operations, respectively.
- FACTResultsIO, FACTDataReader, JuliaDockerClient: Custom modules or packages for managing results I/O, reading data specific to the FACT framework, and interacting with Docker containers.
Key Components
- TOMLParser.jl: Contains functions for parsing workflow definitions from TOML-formatted files.
- Containers.jl: Includes utilities for managing Docker containers, such as starting, stopping, and interacting with containers.
- Task.jl: Defines the structure and execution logic for tasks within the workflow, including parameter processing and task execution.
- Runner.jl: Implements the logic for running the workflow, managing the execution order of tasks based on dependencies, and orchestrating the overall workflow execution.
Exported Functions
- run_file: The primary interface for executing a workflow defined in a TOML file. It takes the path to a workflow file and a ServiceRegistry as input and manages the execution of the defined tasks.
Usage Example
To execute a workflow defined in a TOML file located at /path/to/workflow.toml, ensure that the FACT_workflow_manager package is available in your project environment, and then call:
using FACT_workflow_manager
using OrchestratorRegistry  # assumed to provide ServiceRegistry
system = ServiceRegistry()  # register the services the workflow requires before running
run_file("/path/to/workflow.toml", system)
This will initiate the workflow execution process, handling task dependencies, Docker container management, and results I/O as defined in the workflow file.