Amazon EMR 云计算服务编程实践
EMR是Amazon亚马逊云计算平台提供的一项服务,用户可以在此平台使用亚马逊强大的计算资源执行Map Reduce程序。由于Map Reduce很多情况下都是做的海量数据文本统计类的并行计算任务,需要耗费很多时间,使用云计算则可以大大加快执行速度。
EMR建立在Amazon S3和Amazon EC2的 基础上。用户提交一个map程序和一个reduce程序,同时提交需要处理的数据文件作为输入。这些都上传到Amazon S3云端存储平台,在EMR中指定相应的S3路径,就可以开始做数据处理了。EMR会根据用户指定的规模配置,开启一个EC2集群,在每个节点上运行 Hadoop。运行结束后用户可以从S3获取结果数据。下面以word count单词统计任务为例,介绍具体操作过程。
1. 登录Amazon AWS控制台,选择新建任务(点击查看大图)
2. 在运行Map Reduce任务前,我们需要先将Map和Reduce程序,以及待分析的输入文件上传到S3中
3. 回到新建任务上来,输入任务信息
4. 程序位置、输入文件位置等信息,这些文件都预先上传到了S3里
Mapper和Reducer程序,请参考Word Count。
5. 配置EC2实例相关参数
6. 此步可跳过
7. 所有任务信息汇总确认
8. 监控Map Reduce任务运行状态
9. 任务执行完毕
10. 从S3中查看或下载输出结果
Amazon的云计算平台提供了很多API,使得开发者能根据自己的需要与云端交互。boto(Amazon官方介绍)就是一个python API,作为一个脚本语言,使用python可以以简短的代码写出原型程序,快速实现或检验自己的想法。下面是使用boto与Amazon EMR交互的例子。
#!/usr/bin/python
#
# Amazon EMR Interface
# A program to run and monitor streaming job flow on Amazon EMR
import time
connected = 0
jobid =0
def connect():
access_key = raw_input('Your access key:').strip()
secret_key = raw_input('Your secret key:').strip()
from boto.emr.connection import EmrConnection
global conn
conn = EmrConnection(access_key, secret_key)
global connected
connected = 1
def run():
if connected == 0:
print 'Not connected!'
elif connected == 1:
s_mapper = raw_input('Path of the mapper:').strip()
s_reducer = raw_input('Path of the reducer:').strip()
s_input = raw_input('Path of input files:').strip()
s_output = raw_input('Path for storing output files:').strip()
from boto.emr.step import StreamingStep
step = StreamingStep(name='My steps', mapper=s_mapper, reducer=s_reducer, input=s_input, output=s_output)
jf_name = raw_input('Name of job flow:').strip()
jf_log = raw_input('Path of logs:').strip()
instance_type = raw_input('Type of instance:').strip()
instance_num = raw_input('Number of instance:').strip()
global jobid
jobid = conn.run_jobflow(name=jf_name,log_uri=jf_log, steps=[step], slave_instance_type=instance_type,num_instances=int(instance_num))
print 'jobid = '+ jobid
while True:
time.sleep(5)
status = conn.describe_jobflow(jobid)
print status.state
if 'ENDED' in status.state:
break
def showMenu():
title = '''
Amazon EMR Service
connect Connect to Amazon EMR
run Input job flow info and run on Amazon EMR
quit Quit
Enter choice:'''
while True:
choice = raw_input(title).strip().lower()
choices = ['connect','run','quit']
if choice not in choices:
print('Input Error!')
else:
if choice == 'quit':
break
elif choice == 'connect':
connect()
elif choice == 'run':
run()
if __name__ == '__main__':
showMenu()
|

QQ咨询









