Class Pipeline

Inheritance Relationships

Base Type

  • public ABC

Derived Types

  • public stage.FSPipeline
  • public stage.RMQPipeline

Class Documentation

stage.Pipeline : public ABC
An interface class representing a sequence of transformations/actions to be performed
to store data in the AMS kosh-store. The actions can be performed either sequentially,
or in parallel using different policies/vehicles (threads or processes).

Attributes:
    ams_config: The AMS configuration required when publishing to the AMS store.
    dest_dir: The final path to store data to.
    stage_dir: An intermediate location to store files. Useful if the configuration requires
        storing the data in some scratch directory (SSD) before making it available on the parallel filesystem.
    actions: A list of actions to be performed before storing the data in the filesystem.
    db_type: The file format of the data to be stored.
    writer: The class to be used to write data to the filesystem.

Subclassed by stage.FSPipeline, stage.RMQPipeline

Public Functions

__init__(self, application_name, dest_dir, db_url, db_type='dhdf5')
Initializes the Pipeline to write the final data to 'dest_dir' using a file writer of type 'db_type',
optionally caching the data in 'stage_dir' before making it available in the cache store.
signal_wrapper(self, name, pid)
init_signals(self)
release_signals(self)
add_user_action(self, obj)
Adds an action to be performed on the data before storing it in the filesystem.

Args:
    obj: A callback to be called on every (input, output) pair.
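
As an illustration of what a user action might look like, here is a minimal sketch. The (inputs, outputs) -> (inputs, outputs) callback signature is an assumption, as this page does not state it, and ToyPipeline is a hypothetical stand-in that only mimics add_user_action.

```python
# Hypothetical user action. The (inputs, outputs) -> (inputs, outputs)
# signature is an assumption; this page does not specify it.
def drop_negative_outputs(inputs, outputs):
    """Keep only the samples whose output value is non-negative."""
    kept = [(i, o) for i, o in zip(inputs, outputs) if o >= 0]
    new_inputs = [i for i, _ in kept]
    new_outputs = [o for _, o in kept]
    return new_inputs, new_outputs


class ToyPipeline:
    """Stand-in for a concrete Pipeline subclass (illustration only)."""

    def __init__(self):
        self.user_action = None

    def add_user_action(self, obj):
        # Remember the callback so it can be applied before data is stored.
        self.user_action = obj


pipeline = ToyPipeline()
pipeline.add_user_action(drop_negative_outputs)
```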
execute(self, policy)
Execute the pipeline of tasks using the specified policy (blocking).

Args:
    policy: The policy to be used to execute the pipeline; presumably one of supported_policies.
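
One plausible way to turn a policy string into an execution vehicle is sketched below. The helper name select_vehicle and the mapping to threading.Thread / multiprocessing.Process are assumptions made for illustration; only the three policy names come from supported_policies.

```python
import multiprocessing
import threading

# Policy names taken from the supported_policies attribute on this page.
SUPPORTED_POLICIES = {"sequential", "thread", "process"}


def select_vehicle(policy):
    """Return the executor class for a parallel policy, or None for sequential.

    Hypothetical helper: the real class may dispatch differently.
    """
    if policy not in SUPPORTED_POLICIES:
        raise ValueError(f"unsupported policy {policy!r}")
    vehicles = {
        "sequential": None,  # no vehicle needed, tasks run in the caller
        "thread": threading.Thread,
        "process": multiprocessing.Process,
    }
    return vehicles[policy]
```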
requires_model_update(self)
Returns whether the pipeline provides a model-update message parsing mechanism
get_model_update_task(self, o_queue, policy)
get_load_task(self, o_queue, policy)
Callback to the child class to return the task that loads data from some unspecified entry-point.
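
The child-class callback pattern described above could look roughly like the following sketch. PipelineSketch and ListPipeline are hypothetical names, the use of abstractmethod is an assumption, and a task is modeled here as a plain callable.

```python
from abc import ABC, abstractmethod
import queue


class PipelineSketch(ABC):
    """Hypothetical stand-in for the base class; not the real stage.Pipeline."""

    @abstractmethod
    def get_load_task(self, o_queue, policy):
        """Return a task that pushes loaded data into o_queue."""

    def requires_model_update(self):
        # Assumed default: no model-update mechanism unless a child overrides.
        return False


class ListPipeline(PipelineSketch):
    """Toy child class whose entry point is an in-memory list."""

    def __init__(self, items):
        self.items = items

    def get_load_task(self, o_queue, policy):
        def load():
            # Forward every item to the queue read by the next stage.
            for item in self.items:
                o_queue.put(item)

        return load


out = queue.Queue()
ListPipeline(["a", "b"]).get_load_task(out, "sequential")()
```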
from_cli(cls)
shutdown(self)

Public Members

application_name
dest_dir
user_action
db_type
db_url
released
signals
original_handlers

Public Static Functions

add_cli_args(parser)
Adds the options of the root Pipeline class to the CLI argument parser.
get_q_type(policy)
Returns the type of the queue to be used to create Queues for the specified policy.
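
A minimal sketch of such a policy-to-queue mapping follows. The specific choice of queue.Queue for the "sequential" and "thread" policies and multiprocessing.Queue for "process" is an assumption based on what the standard library offers, not something this page confirms.

```python
import multiprocessing
import queue


def get_q_type(policy):
    """Return a queue factory suited to the given policy (assumed mapping)."""
    policy_to_queue = {
        "sequential": queue.Queue,       # single thread of control
        "thread": queue.Queue,           # thread-safe, in-process
        "process": multiprocessing.Queue,  # crosses process boundaries
    }
    if policy not in policy_to_queue:
        raise ValueError(f"unsupported policy {policy!r}")
    return policy_to_queue[policy]


# The returned factory is called to create each queue of the pipeline.
q = get_q_type("thread")()
q.put("sample")
```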

Public Static Attributes

supported_policies = {"sequential", "thread", "process"}
supported_writers = {"shdf5", "dhdf5"}

Protected Functions

_seq_execute(self)
Executes all tasks sequentially. Every task starts after all incoming messages
are processed by the previous task.
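
The sequential behaviour described above can be sketched as follows. This is an illustration under assumptions: tasks are modeled as plain one-argument functions and seq_execute is a hypothetical free function, not the method itself.

```python
import queue


def seq_execute(tasks, items):
    """Run tasks one after another; each drains its whole input queue first."""
    i_queue = queue.Queue()
    for item in items:
        i_queue.put(item)

    for task in tasks:
        o_queue = queue.Queue()
        # The next task only starts once this loop has consumed every message.
        while not i_queue.empty():
            o_queue.put(task(i_queue.get()))
        i_queue = o_queue  # output of this task feeds the next one

    results = []
    while not i_queue.empty():
        results.append(i_queue.get())
    return results


results = seq_execute([lambda x: x + 1, lambda x: x * 2], [1, 2, 3])
```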
_parallel_execute(self, exec_vehicle_cls)
Executes all tasks in parallel using the specified vehicle type.

Args:
    exec_vehicle_cls: The class used to create the executors that run the
    actions, reading data from and writing data to the i/o queue(s).
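
A rough sketch of the parallel variant, with one executor per task linked by queues, is given below. The sentinel-based shutdown, the _worker helper, and the free function parallel_execute are all assumptions for illustration; the actual termination protocol is not described on this page.

```python
import queue
import threading

_STOP = object()  # hypothetical end-of-stream marker (thread-only, see note)


def _worker(func, i_queue, o_queue):
    """Apply func to every message until the stop marker arrives."""
    while True:
        item = i_queue.get()
        if item is _STOP:
            o_queue.put(_STOP)  # propagate shutdown to the next stage
            return
        o_queue.put(func(item))


def parallel_execute(funcs, items, exec_vehicle_cls=threading.Thread):
    """Run one executor per function, chained through queues."""
    queues = [queue.Queue() for _ in range(len(funcs) + 1)]
    executors = [
        exec_vehicle_cls(target=_worker, args=(f, queues[k], queues[k + 1]))
        for k, f in enumerate(funcs)
    ]
    for executor in executors:
        executor.start()
    for item in items:
        queues[0].put(item)
    queues[0].put(_STOP)
    for executor in executors:
        executor.join()

    results = []
    while True:
        item = queues[-1].get()
        if item is _STOP:
            break
        results.append(item)
    return results


results = parallel_execute([lambda x: x + 1, lambda x: x * 2], [1, 2, 3])
```

This sketch only works with threads: an object() sentinel does not keep its identity across process boundaries, so a process-based variant would need a picklable marker and multiprocessing queues.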
_execute_tasks(self, policy)
Executes all tasks using the specified policy

Args:
    policy: The policy to be used to execute the pipeline
Links all actions/stages of the pipeline with input/output queues.

Args:
    policy: The policy to be used to execute the pipeline

Protected Attributes

_writer
_executors
_tasks
_queues