4.4. Adding StagesΒΆ

We have looked at creating a simple bag of tasks application, adding data movement, creating BoT with varying bag size. What’s next? Let’s add more stages to the pattern!

You can download the entire script for this section here or find it in your virtualenv under share/radical.ensemblemd/user_guide/scripts.

In this section, we will modify the example from the previous section by adding another stage, which concatenates two files and transfers the output back to the localhost. One of the files is staged from localhost and the other file is the file from stage_1.The kernel for concatenation of files is already a part of Ensemble MD. This is how our stage_2 looks:

k = Kernel(name="misc.cat")
k.upload_input_data = ['./input_file_2.txt > file2.txt']
k.copy_input_data = ['$STAGE_1/temp.txt > file1.txt']
k.arguments = ["--file1=file1.txt","--file2=file2.txt"]
k.download_output_data = ['./file1.txt > output_file.txt']

Also need to specify the number of stages during pattern object creation:

A few points to note:

  • We upload a secondary file in stage_2 called input_file_2.txt.
  • As part of the pattern, the data at each stage can be referred by $STAGE_*. So in stage_2 we refer to the output of stage_1 by $STAGE_1. We create a copy of the temp.txt created in stage_1 and rename it to file1.txt
  • We concatenate file2.txt to file1.txt and download file1.txt

All the data references can be found here.

To run the script, simply execute the following from command line:

RADICAL_ENTK_VERBOSE=REPORT python add_stages.py

You can generate a more verbose output by setting RADICAL_ENTK_VERBOSE=INFO.

A look at the complete code in this section:

import sys
import os
import json

from radical.ensemblemd import Kernel
from radical.ensemblemd import PoE
from radical.ensemblemd import EnsemblemdError
from radical.ensemblemd import ResourceHandle

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') == None:

class MyApp(PoE):

	def __init__(self, stages,instances):
		 PoE.__init__(self, stages,instances)

	def stage_1(self, instance):
		k = Kernel(name="misc.hello")
		k.upload_input_data = ['./input_file.txt > temp.txt']
		k.arguments = ["--file=temp.txt"]
		k.download_output_data = ['./temp.txt > output_file_{0}.txt'.format(instance)]
		return k

	def stage_2(self, instances):

		k = Kernel(name="misc.cat")
		k.upload_input_data = ['./output_file_2.txt > file2.txt']
		k.copy_input_data = ['$STAGE_1/temp.txt > file1.txt']
		k.arguments = ["--file1=file1.txt","--file2=file2.txt"]
		k.download_output_data = ['./file1.txt > output_file.txt']
		return k

if __name__ == "__main__":

	# use the resource specified as argument, fall back to localhost
	if   len(sys.argv)  > 2: 
		print 'Usage:\t%s [resource]\n\n' % sys.argv[0]
	elif len(sys.argv) == 2: 
		resource = sys.argv[1]
		resource = 'local.localhost'


		with open('%s/config.json'%os.path.dirname(os.path.abspath(__file__))) as data_file:    
			config = json.load(data_file)

		# Create a new resource handle with one resource and a fixed
		# number of cores and runtime.
		cluster = ResourceHandle(

				access_schema = config[resource]['schema'],
				queue = config[resource]['queue'],

		os.system('/bin/echo Welcome! > input_file.txt')

		# Allocate the resources.

		# Set the 'instances' of the BagofTasks to 16. This means that 16 instances
		# of each BagofTasks step are executed.
		app = MyApp(stages=2,instances=16)


	except EnsemblemdError, er:

		print "Ensemble MD Toolkit Error: {0}".format(str(er))
		raise # Just raise the execption again to get the backtrace

		# Deallocate the resources. 


And that’s pretty much all there is to Ensemble Toolkit. The rest is really all about defining your application logic with the different patterns and Kernels. We will go through some examples throughout the rest of this User guide.