4.5. Adding Data

Data movement is an important part of any application. In this section, we look at how to move data between tasks.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

In the following example, we will create a Pipeline of two Stages, each with one Task. In the first stage, we will create a file of size 1 MB. In the next stage, we will perform a character count on the same file and write the result to another file. Finally, we will bring that output to the current location.

Since we already know how to create this workflow, we simply present the code snippet concerned with the data movement. The task in the second stage needs to perform two data movements: a) copy the data created in the first stage to its current directory and b) bring the output back to the directory where the script is executed. Below we present the statements that perform these operations.

# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.uid, s1.uid, t1.uid)]
# Download the output of the current task to the current location
t2.download_output_data = ['ccount.txt']
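The placeholder string is built with ordinary string formatting, so it can help to assemble it in one place. The following is a minimal sketch: `sandbox_path` is a hypothetical helper (not part of Ensemble Toolkit), it assumes the `$Pipeline_..._Stage_..._Task_...` form of the placeholder, and the uid strings in the usage lines are made-up stand-ins for the values that `p.uid`, `s1.uid` and `t1.uid` would supply.

```python
def sandbox_path(pipeline_uid, stage_uid, task_uid, filename):
    """Build a reference to `filename` in the directory of an earlier task.

    Hypothetical helper, not part of Ensemble Toolkit: it only formats a
    '$Pipeline_<uid>_Stage_<uid>_Task_<uid>/<file>' style string.
    """
    return '$Pipeline_%s_Stage_%s_Task_%s/%s' % (
        pipeline_uid, stage_uid, task_uid, filename)


# Made-up uid values, used only to show the shape of the result
example = sandbox_path('pipeline.0000', 'stage.0000', 'task.0000', 'output.txt')
print(example)
```

In the script for this section, the equivalent call would be `t2.copy_input_data = [sandbox_path(p.uid, s1.uid, t1.uid, 'output.txt')]`.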

Tip

For the purposes of this user guide, we have a MongoDB setup to use. Please run the following command to use it:

export RADICAL_PILOT_DBURL="mongodb://user:user@ds247688.mlab.com:47688/entk-docs"

To run the script, execute the following from the command line:

python add_data.py

You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

The complete code for this section is shown below:

from radical.entk import Pipeline, Stage, Task, AppManager, ResourceManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_VERBOSE'] = 'INFO'

if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object 
    s1 = Stage()

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()    
    t1.executable = ['/bin/bash']   
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt'] 

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)


    # Create another Stage object
    s2 = Stage()
    s2.name = 'Stage 2'

    # Create a Task object
    t2 = Task()
    t2.executable = ['/bin/bash']    
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']  
    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.uid, s1.uid, t1.uid)]
    # Download the output of the current task to the current location
    t2.download_output_data = ['ccount.txt']

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add Stage to the Pipeline
    p.add_stages(s2)


    # Create a dictionary describing the four mandatory keys:
    # resource, walltime, cores and project
    # resource is 'local.localhost' to execute locally
    res_dict = {

            'resource': 'local.localhost',
            'walltime': 10,
            'cores': 1,
            'project': '',
    }

    # Create Resource Manager object with the above resource description
    rman = ResourceManager(res_dict)

    # Create Application Manager
    appman = AppManager()

    # Assign resource manager to the Application Manager
    appman.resource_manager = rman

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.assign_workflow(set([p]))

    # Run the Application Manager
    appman.run()
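
To see what the two tasks and the two data movements amount to, the workflow can be mimicked without Ensemble Toolkit. The following is a plain-Python sketch under stated assumptions: the three temporary directories stand in for the two task directories and the directory where the script is executed, `run_local_sketch` is a hypothetical name, and the output format only approximates that of `uniq -c`. It does not call any Ensemble Toolkit API.

```python
import base64
import os
import shutil
import tempfile
from collections import Counter


def run_local_sketch(size=1000000):
    """Mimic the two tasks and both data movements with local directories."""
    root = tempfile.mkdtemp()
    task1_dir = os.path.join(root, 'task1')    # stands in for t1's directory
    task2_dir = os.path.join(root, 'task2')    # stands in for t2's directory
    launch_dir = os.path.join(root, 'launch')  # stands in for the script's directory
    for directory in (task1_dir, task2_dir, launch_dir):
        os.mkdir(directory)

    # Task 1: write `size` base64 characters of random data.
    # Unlike the base64 command-line tool, no line breaks are inserted.
    raw = os.urandom((size * 3) // 4 + 3)  # enough bytes to encode `size` characters
    with open(os.path.join(task1_dir, 'output.txt'), 'wb') as f:
        f.write(base64.b64encode(raw)[:size])

    # Movement (a): copy task 1's output into task 2's directory
    shutil.copy(os.path.join(task1_dir, 'output.txt'), task2_dir)

    # Task 2: count the occurrences of each character
    with open(os.path.join(task2_dir, 'output.txt'), 'rb') as f:
        counts = Counter(f.read().decode('ascii'))
    with open(os.path.join(task2_dir, 'ccount.txt'), 'w') as f:
        for char in sorted(counts):
            f.write('%7d %s\n' % (counts[char], char))

    # Movement (b): bring the result back to the launch directory
    shutil.copy(os.path.join(task2_dir, 'ccount.txt'), launch_dir)
    return os.path.join(launch_dir, 'ccount.txt'), counts


result_path, counts = run_local_sketch()
print(result_path, sum(counts.values()))
```

The temporary directory is left in place so that `ccount.txt` can be inspected; remove it with `shutil.rmtree` when it is no longer needed.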