4.2. Adding Data

Now that we have a basic example running, let’s add some data to the application. The kernel in the first stage creates a file if it does not already exist and appends Hello World to it. In this section, we first transfer an input file (“input_file.txt”) from localhost to the resource, append Hello World to it there, and then transfer it back to localhost.

You can download the complete code for this section here or find it in your virtualenv under share/radical.ensemblemd/user_guide/scripts.

To do this, we just have to set the upload_input_data and download_output_data properties of the kernel. This is how our new kernel should look:

k = Kernel(name="misc.hello")
k.upload_input_data = ['./input_file.txt']
k.arguments = ["--file=input_file.txt"]
k.download_output_data = ['./input_file.txt']
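To make the data flow concrete, the sketch below imitates the three steps on the local filesystem only: copy the input into a scratch directory (standing in for the remote working directory), append a line to it, and copy it back. It does not use Ensemble MD Toolkit. The function name simulate_round_trip, the scratch directory, and the exact text appended are assumptions made for illustration.

```python
import os
import shutil
import tempfile


def simulate_round_trip(local_path, message="Hello World\n"):
    """Imitate upload -> kernel run -> download for a single file.

    Hypothetical helper: the toolkit performs the real staging itself.
    """
    sandbox = tempfile.mkdtemp()  # stands in for the remote working directory
    try:
        remote_path = os.path.join(sandbox, os.path.basename(local_path))

        # Step 1 (upload_input_data): copy the local file into the sandbox.
        shutil.copy(local_path, remote_path)

        # Step 2 (kernel): append to the file, creating it if it is missing.
        with open(remote_path, "a") as handle:
            handle.write(message)

        # Step 3 (download_output_data): copy the result back.
        shutil.copy(remote_path, local_path)
    finally:
        # Remove the scratch directory whether or not the steps succeeded.
        shutil.rmtree(sandbox)
```

Calling simulate_round_trip on a file that contains a greeting leaves the same file with the extra line appended, which is the effect the three kernel properties above are meant to produce on a real resource.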

It is also possible to rename files while staging them. Let’s rename “input_file.txt” to “temp.txt”: the local file is still named “input_file.txt”, but its copy on the remote resource is called “temp.txt”. We will also save the downloaded file as “output_file.txt”. Let’s look at how we can do that:

k = Kernel(name="misc.hello")
k.upload_input_data = ['./input_file.txt > temp.txt']
k.arguments = ["--file=temp.txt"]
k.download_output_data = ['./temp.txt > output_file.txt']

We can simply rename files by using the ‘>’ operator. The convention is as follows:

['old_name > new_name']
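As a rough illustration of this convention, the following sketch splits a staging entry into its two names. The helper split_staging_directive is hypothetical and is not part of the toolkit; it also assumes that an entry without ‘>’ keeps the same name on both sides.

```python
def split_staging_directive(directive):
    """Split 'old_name > new_name' into (old_name, new_name).

    Hypothetical helper for illustration only. When no '>' is present,
    the name is assumed to stay unchanged.
    """
    source, separator, target = directive.partition(">")
    source = source.strip()
    # Without a separator there is no rename, so reuse the source name.
    target = target.strip() if separator else source
    return source, target


for entry in ["./input_file.txt > temp.txt",
              "./temp.txt > output_file.txt",
              "./input_file.txt"]:
    print(split_staging_directive(entry))
```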

All the data staging properties of the Kernel can be found in the API discussion here.

To run the script, simply execute the following from the command line:

RADICAL_ENTK_VERBOSE=REPORT python add_data.py

You can generate a more verbose output by setting RADICAL_ENTK_VERBOSE=INFO.
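A script can also supply a default itself while still honouring a value exported in the shell. The snippet below is a minimal sketch of that idea using only the standard library; the variable name verbosity is an assumption for illustration.

```python
import os

# Keep a value the user exported; otherwise fall back to REPORT.
os.environ.setdefault("RADICAL_ENTK_VERBOSE", "REPORT")

verbosity = os.environ["RADICAL_ENTK_VERBOSE"]
print(verbosity)
```

With this in place, running the script with RADICAL_ENTK_VERBOSE=INFO set on the command line keeps INFO, while running it without the variable uses REPORT.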

The complete code for this section:

import sys
import os
import json

from radical.ensemblemd import Kernel
from radical.ensemblemd import PoE
from radical.ensemblemd import EnsemblemdError
from radical.ensemblemd import ResourceHandle

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
	os.environ['RADICAL_ENTK_VERBOSE'] = 'REPORT'


class MyApp(PoE):

	def __init__(self, stages, instances):
		PoE.__init__(self, stages, instances)

	def stage_1(self, instance):
		k = Kernel(name="misc.hello")
		k.upload_input_data = ['./input_file.txt > temp.txt']
		k.arguments = ["--file=temp.txt"]
		k.download_output_data = ['./temp.txt > output_file.txt']
		return k

if __name__ == "__main__":

	# use the resource specified as argument, fall back to localhost
	if   len(sys.argv)  > 2: 
		print 'Usage:\t%s [resource]\n\n' % sys.argv[0]
		sys.exit(1)
	elif len(sys.argv) == 2: 
		resource = sys.argv[1]
	else: 
		resource = 'local.localhost'

	try:

		with open('%s/config.json'%os.path.dirname(os.path.abspath(__file__))) as data_file:    
			config = json.load(data_file)


		# Create a new resource handle with one resource and a fixed
		# number of cores and runtime.
		cluster = ResourceHandle(
				resource=resource,
				cores=config[resource]["cores"],
				walltime=15,
				#username=None,

				project=config[resource]['project'],
				access_schema = config[resource]['schema'],
				queue = config[resource]['queue'],
				database_url='mongodb://rp:rp@ds015335.mlab.com:15335/rp',
			)

		os.system('/bin/echo Welcome! > input_file.txt')

		# Allocate the resources.
		cluster.allocate()

		# Set the 'instances' of the pattern to 1. This means that 1 instance
		# of each stage is executed.
		app = MyApp(stages=1,instances=1)

		cluster.run(app)

	except EnsemblemdError as er:

		print "Ensemble MD Toolkit Error: {0}".format(str(er))
		raise # Re-raise the exception to get the backtrace

	try:
		# Deallocate the resources.
		cluster.deallocate()

	except Exception:
		# Cleanup is best effort: ignore errors raised while deallocating.
		pass
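The script reads its per-resource settings from a config.json file located next to it. The sketch below shows one shape that file could take, inferred only from the keys the script looks up (cores, project, schema, queue). Every value is a placeholder, and the helper resource_settings is hypothetical.

```python
import json

# Keys mirror the lookups in the script; every value is a placeholder.
example_config = {
    "local.localhost": {
        "cores": 1,
        "project": None,
        "schema": None,
        "queue": None,
    }
}

REQUIRED_KEYS = ("cores", "project", "schema", "queue")


def resource_settings(config, resource):
    """Return the settings for one resource, checking the expected keys.

    Hypothetical helper: it fails early with a readable message instead
    of a bare KeyError in the middle of the ResourceHandle call.
    """
    if resource not in config:
        raise KeyError("no entry for resource %r in config" % resource)
    entry = config[resource]
    missing = [key for key in REQUIRED_KEYS if key not in entry]
    if missing:
        raise KeyError("config for %r lacks: %s" % (resource, ", ".join(missing)))
    return entry


# Print the structure as it would appear in config.json.
print(json.dumps(example_config, indent=4, sort_keys=True))
```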