processing_layer
Processing Layer's Protocol classes.
This module contains base Protocol classes for OM's Processing Layer.
OmProcessingProtocol
Bases: Protocol
See documentation for the __init__ function.
__init__(*, monitor_parameters)
Protocol for OM's Processing classes.
Processing classes implement scientific data processing pipelines in OM. A Processing class defines how each individual retrieved data event is analyzed on the processing nodes and how multiple events are aggregated on the collecting node. A Processing class also determines which actions OM performs at the beginning and at the end of the data processing.
This Protocol class describes the interface that every Processing class in OM must implement.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
monitor_parameters | MonitorParameters | An object storing OM's configuration parameters. | required |
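As an illustration of the interface described above, the following is a minimal sketch of a class that satisfies it structurally. It does not import OM: `ProcessingLike` is a hypothetical stand-in that only mirrors the method names and signatures listed on this page, `EventCounter` is an invented example class, and a plain dictionary takes the place of the `MonitorParameters` object.

```python
from typing import Any, Dict, Protocol, Tuple, Union, runtime_checkable


@runtime_checkable
class ProcessingLike(Protocol):
    """Hypothetical stand-in listing the methods documented on this page."""

    def initialize_processing_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> None: ...

    def initialize_collecting_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> None: ...

    def process_data(
        self, *, node_rank: int, node_pool_size: int, data: Dict[str, Any]
    ) -> Tuple[Dict[str, Any], int]: ...

    def wait_for_data(self, *, node_rank: int, node_pool_size: int) -> None: ...

    def collect_data(
        self,
        *,
        node_rank: int,
        node_pool_size: int,
        processed_data: Tuple[Dict[str, Any], int],
    ) -> Union[Dict[int, Dict[str, Any]], None]: ...

    def end_processing_on_processing_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> Union[Dict[str, Any], None]: ...

    def end_processing_on_collecting_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> None: ...


class EventCounter:
    """Invented example: counts events on each node and on the collector."""

    def __init__(self, *, monitor_parameters: Any) -> None:
        # A real implementation would receive OM's MonitorParameters object.
        self._monitor_parameters = monitor_parameters
        self._events_seen = 0
        self._events_collected = 0

    def initialize_processing_node(self, *, node_rank, node_pool_size):
        self._events_seen = 0

    def initialize_collecting_node(self, *, node_rank, node_pool_size):
        self._events_collected = 0

    def process_data(self, *, node_rank, node_pool_size, data):
        self._events_seen += 1
        # The second tuple entry is the rank of the node that did the work.
        return ({"events_seen": self._events_seen}, node_rank)

    def wait_for_data(self, *, node_rank, node_pool_size):
        pass  # Nothing to do while idle in this sketch.

    def collect_data(self, *, node_rank, node_pool_size, processed_data):
        self._events_collected += 1
        return None  # No feedback data is sent back.

    def end_processing_on_processing_node(self, *, node_rank, node_pool_size):
        return None

    def end_processing_on_collecting_node(self, *, node_rank, node_pool_size):
        pass


monitor = EventCounter(monitor_parameters={})
```

Because the check is structural, `isinstance(monitor, ProcessingLike)` only confirms that the methods exist; it does not validate their signatures or return types.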
initialize_processing_node(*, node_rank, node_pool_size)
Initializes an OM processing node.
This function is invoked on each processing node when OM starts. It performs all the operations needed to prepare the node to retrieve and process data events (recovering additional needed external data, initializing the algorithms with all required parameters, etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node in the OM node pool. The rank is an integer that unambiguously identifies the node in the pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |
initialize_collecting_node(*, node_rank, node_pool_size)
Initializes an OM collecting node.
This function is invoked on the collecting node when OM starts. It performs all the operations needed to prepare the collecting node to aggregate events received from the processing nodes (creating memory buffers, initializing the collecting algorithm, etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node in the OM node pool. The rank is an integer that unambiguously identifies the node in the pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |
process_data(*, node_rank, node_pool_size, data)
Processes a single data event.
This function is invoked on each processing node for every retrieved data event. It receives the data event as input and returns processed data. The output of this function is transferred by OM to the collecting node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node in the OM node pool. The rank is an integer that unambiguously identifies the node in the pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |
data | Dict[str, Any] | A dictionary containing the data retrieved by OM for the data event being processed. | required |
Returns:
Type | Description |
---|---|
Tuple[Dict[str, Any], int] | A tuple with two entries, with the first entry being a dictionary storing the processed data that should be sent to the collecting node, and the second being the OM rank number of the node that processed the information. |
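The return convention can be sketched as follows. This is only an illustration: the class name `SumIntensityProcessing` and the dictionary keys `detector_data` and `total_intensity` are assumptions made for the example, not names defined by OM.

```python
from typing import Any, Dict, Tuple


class SumIntensityProcessing:
    """Hypothetical Processing class; only process_data is shown."""

    def process_data(
        self, *, node_rank: int, node_pool_size: int, data: Dict[str, Any]
    ) -> Tuple[Dict[str, Any], int]:
        # "detector_data" is an assumed key, used here for illustration only.
        frame = data["detector_data"]
        processed: Dict[str, Any] = {"total_intensity": sum(frame)}
        # First entry: data for the collecting node.
        # Second entry: rank of the node that processed the event.
        return (processed, node_rank)


result = SumIntensityProcessing().process_data(
    node_rank=1, node_pool_size=3, data={"detector_data": [1.0, 2.0, 3.5]}
)
```

Here `result` is the two-entry tuple that would be handed to the collecting node.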
wait_for_data(*, node_rank, node_pool_size)
Performs operations on the collecting node when no data is received.
This function is called continuously on the collecting node when the node is not receiving data from any processing node (when data is received, the collect_data function is invoked instead). This function can be used to perform operations that need to be carried out even when the data stream is not active (reacting to external commands and requests, updating graphical interfaces, etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |
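One possible use of this hook is to drain a queue of external commands without blocking, so that the collecting node stays responsive while idle. The sketch below assumes commands arrive as strings on a standard-library `queue.Queue`; the class name `CommandAwareCollector` and the attributes `commands` and `handled` are hypothetical, and how commands actually reach the collecting node is outside the scope of this example.

```python
import queue
from typing import List


class CommandAwareCollector:
    """Hypothetical Processing class; only wait_for_data is shown."""

    def __init__(self) -> None:
        # Assumed transport: some other component puts commands here.
        self.commands: "queue.Queue[str]" = queue.Queue()
        self.handled: List[str] = []

    def wait_for_data(self, *, node_rank: int, node_pool_size: int) -> None:
        # The function is called repeatedly, so it must return quickly:
        # take whatever is pending and never block waiting for more.
        while True:
            try:
                command = self.commands.get_nowait()
            except queue.Empty:
                break
            self.handled.append(command)


collector = CommandAwareCollector()
collector.commands.put("reset_statistics")
collector.commands.put("save_snapshot")
collector.wait_for_data(node_rank=0, node_pool_size=3)
```

Calling `wait_for_data` again with an empty queue returns immediately and leaves `handled` unchanged.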
collect_data(*, node_rank, node_pool_size, processed_data)
Collects processed data from a processing node.
This function is invoked on the collecting node every time data is received from a processing node (when data is not being received, the collecting node continuously calls the wait_for_data function instead). The function accepts as input the data received from the processing node (the tuple returned by the process_data method of this class), and performs calculations that must be carried out on aggregated data (computing cumulative statistics, preparing data for external programs or visualization, etc.).
The function usually does not return any value, but can optionally return a nested dictionary (a dictionary whose values are other dictionaries). When this happens, the data in the dictionary is provided as feedback data to the processing nodes. The nested dictionary must have the following format:
- The keys of the outer dictionary must match the OM rank numbers of the processing nodes which receive the feedback data. A key value of 0 can be used to send feedback data to all the processing nodes at the same time.
- The value corresponding to each key of the outer dictionary must in turn be a dictionary that stores the feedback data that is sent to the node defined by the key.
- On each processing node, the feedback data dictionary, when received, is merged with the data argument of the process_data function the next time the function is called.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |
processed_data | Tuple[Dict, int] | A tuple whose first entry is a dictionary storing the data received from a processing node, and whose second entry is the OM rank number of the node that processed the information. | required |
Returns:
Type | Description |
---|---|
Union[Dict[int, Dict[str, Any]], None] | Usually nothing. Optionally, a nested dictionary that can be used to send feedback data to the processing nodes. |
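The feedback format can be sketched with a collector that keeps a running mean and periodically broadcasts it. Everything specific here is an assumption for illustration: the class name `RunningMeanCollector`, the `feedback_interval` parameter, the keys `total_intensity` and `mean_intensity`, and the use of `node_rank=0` for the collecting node in the calls.

```python
from typing import Any, Dict, Tuple, Union


class RunningMeanCollector:
    """Hypothetical Processing class; only collect_data is shown."""

    def __init__(self, feedback_interval: int = 2) -> None:
        self._feedback_interval = feedback_interval
        self._num_events = 0
        self._sum = 0.0
        self.events_per_node: Dict[int, int] = {}

    def collect_data(
        self,
        *,
        node_rank: int,
        node_pool_size: int,
        processed_data: Tuple[Dict[str, Any], int],
    ) -> Union[Dict[int, Dict[str, Any]], None]:
        # Unpack the tuple returned by process_data on a processing node.
        received, sender_rank = processed_data
        self._num_events += 1
        self._sum += received["total_intensity"]
        self.events_per_node[sender_rank] = (
            self.events_per_node.get(sender_rank, 0) + 1
        )
        # Most calls return nothing: no feedback is sent.
        if self._num_events % self._feedback_interval != 0:
            return None
        mean = self._sum / self._num_events
        # Outer key 0 addresses all processing nodes at once; the inner
        # dictionary is the feedback data itself.
        return {0: {"mean_intensity": mean}}


collector = RunningMeanCollector(feedback_interval=2)
first = collector.collect_data(
    node_rank=0, node_pool_size=3, processed_data=({"total_intensity": 4.0}, 1)
)
second = collector.collect_data(
    node_rank=0, node_pool_size=3, processed_data=({"total_intensity": 6.0}, 2)
)
```

To target a single node instead, the outer key would be that node's rank, for example `{2: {"mean_intensity": mean}}`.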
end_processing_on_processing_node(*, node_rank, node_pool_size)
Executes end-of-processing actions on a processing node.
This function is called on each processing node at the end of the data processing, immediately before OM stops. It performs clean up and shut down operations (closing communication sockets, computing final statistics, etc.). This function usually does not return any value, but can optionally return a dictionary. If this happens, the dictionary is transferred to the collecting node before the processing node shuts down.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], None] | Usually nothing. Optionally, a dictionary storing information that must be sent to the collecting node. |
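A minimal sketch of the optional return value: a processing node that reports how many events it handled, and returns nothing if it handled none. The class name `CountingProcessing` and the keys `node_rank` and `num_processed` in the returned dictionary are assumptions made for this example.

```python
from typing import Any, Dict, Tuple, Union


class CountingProcessing:
    """Hypothetical Processing class showing the end-of-processing hook."""

    def __init__(self) -> None:
        self._num_processed = 0

    def process_data(
        self, *, node_rank: int, node_pool_size: int, data: Dict[str, Any]
    ) -> Tuple[Dict[str, Any], int]:
        self._num_processed += 1
        return ({}, node_rank)

    def end_processing_on_processing_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> Union[Dict[str, Any], None]:
        # Nothing to report: return None, so no final dictionary is produced.
        if self._num_processed == 0:
            return None
        # Otherwise return a final summary for the collecting node.
        return {"node_rank": node_rank, "num_processed": self._num_processed}


idle_node = CountingProcessing()
busy_node = CountingProcessing()
for _ in range(3):
    busy_node.process_data(node_rank=1, node_pool_size=3, data={})

idle_summary = idle_node.end_processing_on_processing_node(
    node_rank=2, node_pool_size=3
)
busy_summary = busy_node.end_processing_on_processing_node(
    node_rank=1, node_pool_size=3
)
```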
end_processing_on_collecting_node(*, node_rank, node_pool_size)
Executes end-of-processing actions on the collecting node.
This function is called on the collecting node at the end of the data processing, immediately before OM stops. It often performs clean up and shut down operations (closing communication sockets, computing final statistics, etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_rank | int | The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool. | required |
node_pool_size | int | The total number of nodes in the OM pool, including all the processing nodes and the collecting node. | required |