4.3. Adding Instances

So far, our example has had only one instance of the stage, i.e. a Bag of Tasks with a bag size of 1. Let’s now increase the bag size to 16 while keeping the workload the same as in the previous examples. This change is trivial: we simply specify the bag size as the number of instances when creating the pattern object:

app = MyApp(stages=1,instances=16)

The entire script for this section (add_instances.py) is available for download, and you can also find it in your virtualenv under share/radical.ensemblemd/user_guide/scripts.

Now 16 instances of stage_1 will be executed. Two things to note:

  • In the resource handle, we have acquired only 1 core, so these 16 instances will execute one at a time. With 2 cores, 2 instances would execute concurrently; with 16 cores, all instances would execute concurrently. Play around with the number of cores and see how the runtime of the script varies!
  • The output files of all instances are called output.txt, so each instance will overwrite the file written by the previous one. To differentiate the output files, we can simply use the instance number as an index in the filename.
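
The effect of the core count in the first note can be estimated with a small calculation. The sketch below is an idealized model, not part of the toolkit: it assumes each instance occupies exactly one core and that all instances take the same time, and the helper name execution_rounds is made up for illustration.

```python
def execution_rounds(instances, cores):
    """Idealized number of sequential rounds needed to run all instances.

    Assumes one core per instance and equal run times; real runs also
    include scheduling and staging overhead.
    """
    if instances < 0 or cores < 1:
        raise ValueError("instances must be >= 0 and cores >= 1")
    # Integer ceiling division: at most `cores` instances run per round.
    return (instances + cores - 1) // cores


for cores in (1, 2, 4, 16):
    print("cores=%2d -> rounds=%d" % (cores, execution_rounds(16, cores)))
```

Under these assumptions, 16 instances need 16 rounds on 1 core, 8 rounds on 2 cores, and a single round on 16 cores, which is why the runtime should drop as cores are added.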

Let’s change the name of the ‘staged out’ file by appending the instance number to it:

k.download_output_data = ['./temp.txt > output_file_{0}.txt'.format(instance)]
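
To see why the index matters, the following standalone sketch builds the download directive for each instance and counts the distinct local file names. It does not use the toolkit. The helper names are hypothetical, the fixed-name directive './temp.txt > output.txt' is assumed from the description above, and the instance numbers 1 to 16 are an assumption about how the toolkit numbers instances.

```python
def staging_directive(instance, indexed=True):
    """Build a directive like the one assigned to k.download_output_data."""
    if indexed:
        return './temp.txt > output_file_{0}.txt'.format(instance)
    # Assumed form of the fixed-name directive used before this section.
    return './temp.txt > output.txt'


def local_targets(directives):
    """Return the set of distinct local file names the directives write to."""
    return set(d.split('>')[1].strip() for d in directives)


# Assumption: instances are numbered 1..16; the toolkit decides the real values.
instances = list(range(1, 17))

fixed_names = local_targets(staging_directive(i, indexed=False) for i in instances)
indexed_names = local_targets(staging_directive(i) for i in instances)

print(len(fixed_names))    # a single shared name, so files overwrite each other
print(len(indexed_names))  # one distinct name per instance
```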

To run the script, simply execute the following from the command line:

RADICAL_ENTK_VERBOSE=REPORT python add_instances.py

You can generate a more verbose output by setting RADICAL_ENTK_VERBOSE=INFO.

As a result of the execution we will obtain 16 different output files.
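
One way to confirm this after a run is to count the files that match the naming scheme. This is a minimal sketch using only the standard library; the function name find_output_files is hypothetical, and it assumes the files were staged out into the directory you pass in (the current directory by default).

```python
import glob
import os


def find_output_files(directory='.'):
    """Return the sorted list of output_file_*.txt files in `directory`."""
    pattern = os.path.join(directory, 'output_file_*.txt')
    return sorted(glob.glob(pattern))


if __name__ == '__main__':
    files = find_output_files()
    print('found %d output files' % len(files))
```

The glob pattern avoids any assumption about whether instance numbering starts at 0 or 1.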

A look at the complete code in this section:

import sys
import os
import json

from radical.ensemblemd import Kernel
from radical.ensemblemd import PoE
from radical.ensemblemd import EnsemblemdError
from radical.ensemblemd import ResourceHandle

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
	os.environ['RADICAL_ENTK_VERBOSE'] = 'REPORT'


class MyApp(PoE):

	def __init__(self, stages, instances):
		PoE.__init__(self, stages, instances)

	def stage_1(self, instance):
		k = Kernel(name="misc.hello")
		k.upload_input_data = ['./input_file.txt > temp.txt']
		k.arguments = ["--file=temp.txt"]
		k.download_output_data = ['./temp.txt > output_file_{0}.txt'.format(instance)]
		return k

if __name__ == "__main__":

	# use the resource specified as argument, fall back to localhost
	if len(sys.argv) > 2:
		print('Usage:\t%s [resource]\n\n' % sys.argv[0])
		sys.exit(1)
	elif len(sys.argv) == 2:
		resource = sys.argv[1]
	else:
		resource = 'local.localhost'

	try:

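		# Load config.json from the directory that contains this script.
		config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
		with open(config_path) as data_file: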
			config = json.load(data_file)


		# Create a new resource handle with one resource and a fixed
		# number of cores and runtime.
		cluster = ResourceHandle(
				resource=resource,
				cores=config[resource]["cores"],
				walltime=15,
				#username=None,

				project=config[resource]['project'],
				access_schema = config[resource]['schema'],
				queue = config[resource]['queue'],
				database_url='mongodb://rp:rp@ds015335.mlab.com:15335/rp',
			)

		os.system('/bin/echo Welcome! > input_file.txt')

		# Allocate the resources.
		cluster.allocate()

		# Set the 'instances' of the Bag of Tasks to 16. This means that 16
		# instances of each stage are executed.
		app = MyApp(stages=1,instances=16)

		cluster.run(app)

	except EnsemblemdError as er:

		print("Ensemble MD Toolkit Error: {0}".format(str(er)))
		raise # Just raise the exception again to get the backtrace

	try:
		# Deallocate the resources. 
		cluster.deallocate()

	except Exception:
		# Ignore errors raised during deallocation.
		pass
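
The script reads its resource settings from a config.json file located next to it. The file itself is not shown in this section, so the sketch below only illustrates a structure that would satisfy the lookups in the script. The key names (cores, project, schema, queue) and the 'local.localhost' label come from the code above; every value is a placeholder and should be replaced with what your resource actually requires.

```python
import json

# Placeholder values: only the key names and the resource label are taken
# from the script; the values here are assumptions for illustration.
config = {
    'local.localhost': {
        'cores': 1,
        'project': None,
        'schema': None,
        'queue': None,
    }
}

# Serialize to the JSON text that would be stored in config.json.
text = json.dumps(config, indent=4, sort_keys=True)
print(text)

# The script looks up entries by resource label, like this:
resource = 'local.localhost'
loaded = json.loads(text)
print(loaded[resource]['cores'])
```

Raising the cores value in this file is the simplest way to experiment with how many instances run concurrently.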