Class Pipeline
Defined in File stage.py
Inheritance Relationships
Base Type
public ABC
Derived Types
public stage.FSPipeline (Class FSPipeline)
public stage.RMQPipeline (Class RMQPipeline)
Class Documentation
- stage.Pipeline : public ABC
An interface class representing a sequence of transformations/actions to be performed to store data in the AMS kosh-store. The actions can be performed either sequentially or in parallel, using different policies/vehicles (threads or processes).
Attributes:
  ams_config: The AMS configuration required when publishing to the AMS store.
  dest_dir: The final directory in which the data are stored.
  stage_dir: An intermediate location to store files. Useful if the configuration requires storing the data in some scratch directory (SSD) before making them public to the parallel filesystem.
  actions: A list of actions to be performed before storing the data in the filesystem.
  db_type: The file format of the data to be stored.
  writer: The class used to write data to the filesystem.
Subclassed by stage.FSPipeline, stage.RMQPipeline
Public Functions
- __init__(self, application_name, dest_dir, db_url, db_type='dhdf5')
Initializes the Pipeline to write the final data to 'dest_dir' using a file writer of type 'db_type', optionally caching the data in 'stage_dir' before making them available in the cache store.
- signal_wrapper(self, name, pid)
- init_signals(self)
- release_signals(self)
- add_user_action(self, obj)
Adds an action to be performed on the data before they are stored in the filesystem.
Args:
  obj: The action, a callback to be called on every (input, output) pair.
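A minimal sketch of how registered actions could be chained, assuming each action is a plain callable that receives one (inputs, outputs) pair and returns the pair handed to the next action. MiniPipeline, apply_actions and drop_negative_outputs are hypothetical names; storing the actions in a list follows the actions attribute described for the class, not a confirmed implementation.

```python
from typing import Callable, List, Tuple

# Hypothetical action signature: takes one (inputs, outputs) pair and returns
# the (possibly transformed) pair handed to the next action.
Action = Callable[[list, list], Tuple[list, list]]


class MiniPipeline:
    """Toy stand-in for Pipeline that models only the user-action chain."""

    def __init__(self) -> None:
        self.actions: List[Action] = []

    def add_user_action(self, obj: Action) -> None:
        # Reject non-callables early instead of failing mid-pipeline.
        if not callable(obj):
            raise TypeError(f"{obj!r} is not callable")
        self.actions.append(obj)

    def apply_actions(self, inputs: list, outputs: list) -> Tuple[list, list]:
        # Apply every registered action, in registration order.
        for action in self.actions:
            inputs, outputs = action(inputs, outputs)
        return inputs, outputs


def drop_negative_outputs(inputs: list, outputs: list) -> Tuple[list, list]:
    # Example action: keep only samples whose output is non-negative.
    kept = [(i, o) for i, o in zip(inputs, outputs) if o >= 0]
    return [i for i, _ in kept], [o for _, o in kept]


p = MiniPipeline()
p.add_user_action(drop_negative_outputs)
print(p.apply_actions([1, 2, 3], [0.5, -1.0, 2.0]))  # ([1, 3], [0.5, 2.0])
```

With no actions registered, the pair passes through unchanged.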
- execute(self, policy)
Executes the pipeline of tasks using the specified policy (blocking).
Args:
  policy: The policy used to execute the pipeline (see supported_policies).
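A sketch of what a blocking execute could look like, assuming each task is a zero-argument callable. vehicle_for and execute here are hypothetical stand-ins; the policy names mirror supported_policies, and mapping "thread" and "process" to threading.Thread and multiprocessing.Process is an assumption drawn from the class description.

```python
import multiprocessing
import threading

# Mirrors the documented Pipeline.supported_policies set.
SUPPORTED_POLICIES = {"sequential", "thread", "process"}


def vehicle_for(policy):
    """Map a policy name to the class that runs each task (None = run inline)."""
    if policy not in SUPPORTED_POLICIES:
        raise ValueError(
            f"unsupported policy {policy!r}; choose from {sorted(SUPPORTED_POLICIES)}"
        )
    return {
        "sequential": None,
        "thread": threading.Thread,
        "process": multiprocessing.Process,
    }[policy]


def execute(tasks, policy):
    """Run all tasks under `policy` and return only once every task is done."""
    vehicle_cls = vehicle_for(policy)
    if vehicle_cls is None:
        for task in tasks:  # sequential: one task after the other
            task()
        return
    vehicles = [vehicle_cls(target=task) for task in tasks]
    for v in vehicles:
        v.start()
    for v in vehicles:
        v.join()  # blocking: wait for every task to finish


results = []
execute([lambda: results.append("a"), lambda: results.append("b")], "thread")
print(sorted(results))  # ['a', 'b']
```

With the "process" policy and the spawn start method (the default on Windows and macOS), targets must be picklable, e.g. module-level functions, so the lambdas above are only suitable for "sequential" and "thread".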
- requires_model_update(self)
Returns whether the pipeline provides a mechanism for parsing model-update messages.
- get_model_update_task(self, o_queue, policy)
- get_load_task(self, o_queue, policy)
Callback into the child class that returns the task loading data from a child-specific entry point; the base class does not define where the data come from.
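A sketch of the child-class hook, assuming it is declared as an abstract method and returns a zero-argument callable that fills o_queue. ListPipeline and its in-memory source are hypothetical; the real subclasses (FSPipeline, RMQPipeline) presumably read from the filesystem and RabbitMQ, as their names suggest. The None end-of-stream marker is also an assumption.

```python
import queue
from abc import ABC, abstractmethod
from typing import Callable, Iterable


class MiniPipeline(ABC):
    """Toy base class: the load task is left to subclasses."""

    @abstractmethod
    def get_load_task(self, o_queue, policy) -> Callable[[], None]:
        """Return a callable that pushes loaded items into o_queue."""


class ListPipeline(MiniPipeline):
    """Hypothetical subclass whose 'entry point' is an in-memory iterable."""

    def __init__(self, items: Iterable) -> None:
        self.items = list(items)

    def get_load_task(self, o_queue, policy) -> Callable[[], None]:
        # `policy` is accepted only to match the documented signature;
        # this toy ignores it.
        def load() -> None:
            for item in self.items:
                o_queue.put(item)
            o_queue.put(None)  # sentinel: tells downstream there is no more data

        return load


q = queue.Queue()
task = ListPipeline([10, 20]).get_load_task(q, "sequential")
task()
print([q.get() for _ in range(3)])  # [10, 20, None]
```

Because get_load_task is abstract in this sketch, instantiating MiniPipeline directly raises TypeError, which keeps the base class a pure interface.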
- from_cli(cls)
- shutdown(self)
Public Members
- application_name¶
- dest_dir¶
- user_action¶
- db_type¶
- db_url¶
- released¶
- signals¶
- original_handlers¶
Public Static Functions
- add_cli_args(parser)
Initializes the command-line parser with the options of the root Pipeline class.
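A sketch of a static add_cli_args built on the standard argparse module. The flag names are hypothetical and simply mirror the __init__ parameters; the allowed --db-type values mirror supported_writers, and the dhdf5 default mirrors the __init__ default.

```python
import argparse

# Mirrors the documented Pipeline.supported_writers set.
SUPPORTED_WRITERS = {"shdf5", "dhdf5"}


def add_cli_args(parser: argparse.ArgumentParser) -> None:
    """Register root-pipeline options; every flag name here is hypothetical."""
    group = parser.add_argument_group("pipeline")
    group.add_argument("--application-name", required=True,
                       help="name of the application producing the data")
    group.add_argument("--dest-dir", required=True,
                       help="final directory for the stored data")
    group.add_argument("--db-url", required=True, help="database URL")
    group.add_argument("--db-type", choices=sorted(SUPPORTED_WRITERS),
                       default="dhdf5", help="file format of the stored data")


parser = argparse.ArgumentParser(description="AMS staging pipeline (sketch)")
add_cli_args(parser)
args = parser.parse_args(
    ["--application-name", "demo", "--dest-dir", "/tmp/out", "--db-url", "/tmp/db"]
)
print(args.db_type)  # dhdf5
```

Taking an existing parser lets subclasses register their own options on the same parser after the root options.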
- get_q_type(policy)
Returns the queue type used to create queues for the specified policy.
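A sketch of a policy-to-queue mapping. Which queue the real class returns for each policy is not stated here; pairing "sequential" and "thread" with queue.Queue and "process" with multiprocessing.Queue is an assumption based on where the consumers run.

```python
import multiprocessing
import queue


def get_q_type(policy: str):
    """Return a queue factory for `policy` (hypothetical mapping)."""
    if policy in ("sequential", "thread"):
        # All consumers live in one process: a thread-safe in-memory queue suffices.
        return queue.Queue
    if policy == "process":
        # Items must cross process boundaries, so they are pickled through a pipe.
        return multiprocessing.Queue
    raise ValueError(f"unknown policy {policy!r}")


q = get_q_type("thread")()
q.put("msg")
print(q.get())  # msg
```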
Public Static Attributes
- supported_policies = {"sequential", "thread", "process"}
- supported_writers = {"shdf5", "dhdf5"}
Protected Functions
- _seq_execute(self)
Executes all tasks sequentially. Every task starts after all incoming messages are processed by the previous task.
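A sketch of the sequential semantics described above: each stage drains its entire input queue, up to an end-of-stream marker, before the next stage starts. make_stage, seq_execute and the None sentinel are hypothetical; the real tasks and messages are not shown in this section.

```python
import queue
from typing import Callable, Iterable, List

SENTINEL = None  # hypothetical end-of-stream marker (so None cannot be a data item)


def make_stage(fn: Callable):
    """Wrap a per-item function as a stage that drains i_q into o_q."""
    def stage(i_q: queue.Queue, o_q: queue.Queue) -> None:
        while True:
            item = i_q.get()
            if item is SENTINEL:
                o_q.put(SENTINEL)  # propagate end-of-stream downstream
                return
            o_q.put(fn(item))
    return stage


def seq_execute(stages: List[Callable], source: Iterable) -> list:
    """Run stages one at a time; each finishes its whole input before the next starts."""
    q_in = queue.Queue()
    for item in source:
        q_in.put(item)
    q_in.put(SENTINEL)
    for stage in stages:
        q_out = queue.Queue()
        stage(q_in, q_out)  # returns only after SENTINEL was consumed
        q_in = q_out
    results = []
    while (item := q_in.get()) is not SENTINEL:
        results.append(item)
    return results


print(seq_execute([make_stage(lambda x: x + 1), make_stage(lambda x: x * 10)], [1, 2, 3]))
# [20, 30, 40]
```

Since nothing consumes a queue while its producer is still running, the queues must be unbounded in this mode; a bounded queue would block the producing stage forever.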
- _parallel_execute(self, exec_vehicle_cls)
Executes all tasks in parallel using the specified vehicle type.
Args:
  exec_vehicle_cls: The class used to create the entities (e.g. threads or processes) that execute the actions by reading data from the input/output queue(s).
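A sketch of parallel execution in which every stage runs in its own vehicle and consumes its input queue concurrently with the others. Only exec_vehicle_cls comes from the documented signature; worker, q_type and the None sentinel are hypothetical, and the demo uses threads.

```python
import queue
import threading
from typing import Callable, List

SENTINEL = None  # hypothetical end-of-stream marker


def worker(fn: Callable, i_q, o_q) -> None:
    """Consume i_q until SENTINEL, pushing fn(item) to o_q, then forward SENTINEL."""
    while (item := i_q.get()) is not SENTINEL:
        o_q.put(fn(item))
    o_q.put(SENTINEL)


def parallel_execute(fns: List[Callable], source, exec_vehicle_cls=threading.Thread,
                     q_type=queue.Queue) -> list:
    """Start one vehicle per stage so all stages run concurrently, then join them."""
    queues = [q_type() for _ in range(len(fns) + 1)]
    vehicles = [exec_vehicle_cls(target=worker, args=(fn, queues[k], queues[k + 1]))
                for k, fn in enumerate(fns)]
    for v in vehicles:
        v.start()
    for item in source:
        queues[0].put(item)
    queues[0].put(SENTINEL)
    results = []
    # Drain the last queue before joining the vehicles.
    while (item := queues[-1].get()) is not SENTINEL:
        results.append(item)
    for v in vehicles:
        v.join()
    return results


print(parallel_execute([lambda x: x + 1, lambda x: x * 10], [1, 2, 3]))  # [20, 30, 40]
```

Draining the final queue before join matters if processes are used: the multiprocessing documentation warns that joining a process which still has items buffered in a queue can deadlock.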
- _execute_tasks(self, policy)
Executes all tasks using the specified policy.
Args:
  policy: The policy used to execute the pipeline.
- _link_pipeline(self, policy)
Links all actions/stages of the pipeline with input/output queues.
Args:
  policy: The policy used to execute the pipeline.
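A sketch of the linking step, assuming stages are wired in a chain so that the output queue of stage k is the input queue of stage k+1. link_pipeline and q_factory are hypothetical; q_factory stands in for the queue type chosen per policy. For uniformity this sketch also gives the first stage an input queue, whereas the documented get_load_task takes only an o_queue, so the real queue count may differ.

```python
import queue
from typing import Any, Callable, List, Tuple


def link_pipeline(stages: List[Any], q_factory: Callable[[], Any] = queue.Queue
                  ) -> List[Tuple[Any, Any, Any]]:
    """Pair every stage with (input, output) queues; stage k's output feeds stage k+1."""
    # n stages get n + 1 queues: one before the first stage, n - 1 links, one after the last.
    queues = [q_factory() for _ in range(len(stages) + 1)]
    return [(stage, queues[k], queues[k + 1]) for k, stage in enumerate(stages)]


stages = ["load", "transform", "store"]  # stand-ins for real task objects
links = link_pipeline(stages)
print(len(links))                  # 3
print(links[0][2] is links[1][1])  # True: load's output is transform's input
```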