MapReduce它可以编写应用措施来处理惩罚海量数据,并行,大集群的普通硬件,以靠得住的方法的框架。
MapReduce是什么?
MapReduce是一种处理惩罚技能和措施模子基于Java的漫衍式计较。 MapReduce算法包括了两项重要任务,即Map 和 Reduce。Map回收了一组数据,并将其转换成另一组数据,个中,各个元件被解析成元组(键/值对)。其次,淘汰任务,这需要从Map 作为输入并组合那些数据元构成的一组小的元组输出。作为MapReduce体现的名称的序列在Map功课之后执行reduce任务。
MapReduce主要利益是,它很容易大局限数据处理惩罚在多个计较节点。下面MapReduce模子中,数据处理惩罚的原语被称为映射器和减速器。解析数据处理惩罚应用到映射器和减速器有时是普通的。可是编写MapReduce形式的应用,扩展应用措施运行在几百,几千,甚至几万机集群中的仅仅是一个设置的变动。这个简朴的可扩展性是吸引了浩瀚措施员利用MapReduce模子。
算法
凡是MapReduce典型是基于向发送计较机数据的位置!
MapReduce打算分三个阶段执行,即映射阶段,shuffle阶段,并淘汰阶段。
映射阶段:映射或映射器的事情是处理惩罚输入数据。一般输入数据是在文件或目次的形式,而且被存储在Hadoop的文件系统(HDFS)。输入文件被通报到由线映射器成果线路。映射器处理惩罚该数据,并建设数据的若干小块。
淘汰阶段:这个阶段是:Shuffle阶段和Reduce阶段的组合。减速器的事情是处理惩罚该来自映射器中的数据。处理惩罚之后,它发生一组新的输出,这将被存储在HDFS。
在一个MapReduce事情,Hadoop的发送Map和Reduce任务到集群的相应处事器。
框架打点数据通报譬喻发出任务的所有节点之间的集群周围的具体信息,验证任务完成,和复制数据。
大部门的计较产生在与在当地磁盘上,可以淘汰网络通信量数据的节点。
给定的任务完成后,将群集收集并淘汰了数据,以形成一个符合的功效,而且将其发送回Hadoop处事器。
输入和输出(Java透视图)
MapReduce框架上的<key, value>对操纵,也就是框架视图的输入事情作为一组<key, value>对,并发生一组<key, value>对作为功课的输出可以在差异的范例。
键和值类在框架连载的方法,因此,需要实现接口。另外,键类必需实现可写,可比的接口,以利便框架排序。MapReduce事情的输入和输出范例:(输入)<k1, v1> ->映射 – ><k2, v2>-> reduce – ><k3, v3>(输出)。
输入 | 输出 | |
---|---|---|
Map | <k1, v1> | list (<k2, v2>) |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
术语
PayLoad – 应用措施实现映射和淘汰成果,形成事情的焦点。
Mapper – 映射器的输入键/值对映射到一组中间键/值对。
NamedNode – 节点打点Hadoop漫衍式文件系统(HDFS)。
DataNode – 节点数据出此刻任那里理惩罚产生之前。
MasterNode – 节点地址JobTracker运行并接管来自客户端功课请求。
SlaveNode – 节点地址Map和Reduce措施运行。
JobTracker – 调治功课并跟踪功课分派给任务跟踪器。
Task Tracker – 跟踪任务和陈诉状态的JobTracker。
Job -措施在整个数据集映射器和减速的执行。
Task – 一个映射措施的执行或对数据的一个片断的减速器。
Task Attempt – 一种实验的特定实例在SlaveNode执行任务。
示例场景
#p#分页标题#e#
下面给出是关于一个组织的电耗损量的数据。它包括了每月的用电量,各年的平均。
Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
#p#分页标题#e#
假如上述数据作为输入,我们需要编写应用措施来处理惩罚它而发生的功效,如发明最大利用量,最低利用年份,依此类推。这是一个轻松取胜用于记录有限数目标编程器。他们将编写简朴地逻辑,以发生所需的输出,而且将数据通报到写入的应用措施。
可是,代表一个特定状态下所有的大局限财富的电力耗损数据。
#p#分页标题#e#
当我们编写应用措施来处理惩罚这样的大量数据,
为了办理这些问题,利用MapReduce框架。
输入数据
#p#分页标题#e#
上述数据被生存为 sample.txt 并作为输入。输入文件看起来如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
示例措施
下面给出的是利用MapReduce框架的样本数据的措施。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(Eleunits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
生存上述措施作为ProcessUnits.java。编译和执行的措施如下的说明。
编译和执行历程单元措施
让我们假设是在Hadoop的用户(如/home/hadoop)的主目次。
凭据下面给出编译和执行上面措施的步调。
第1步
下面的呼吁是建设一个目次来存储编译的Java类。
$ mkdir units
第2步
#p#分页标题#e#
下载Hadoop-core-1.2.1.jar,它用于编译和执行MapReduce措施。会见以下链接 http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载JAR。假设下载的文件夹是 /home/hadoop/.
第3步
下面的呼吁用于编译ProcessUnits.java措施并建设一个jar措施。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
第4步
下面的呼吁用来建设一个输入目次在HDFS中。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
第5步
下面的呼吁用于复拟定名sample.txt在HDFS输入目次中输入文件。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
第6步
下面的呼吁用来验证在输入目次中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
第7步
下面的呼吁用于通过从输入目次以输入文件来运行Eleunit_max应用。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
期待一段时间,直到执行文件。在执行后,如下图所示,输出将包括输入支解的数目,映射任务数,减速器任务的数量等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
第8步
#p#分页标题#e#
下面的呼吁用来验证在输出文件夹所得文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
第9步
下面的呼吁是用来查察输出Part-00000文件。该文件由HDFS发生。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是由MapReduce的措施所发生的输出。
1981 34 1984 40 1985 45
第10步
以下呼吁用于从HDFS输出文件夹复制到当地文件系统举办阐明。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要呼吁
所有的Hadoop呼吁是由$HADOOP_HOME/bin/hadoop呼吁挪用。不带任何参数运行Hadoop剧本打印所有呼吁的描写。
Usage : hadoop [–config confdir] COMMAND
下表列出了可用的选项及其说明。
操纵 | 描写 |
---|---|
namenode -format | 名目化DFS文件系统。 |
secondarynamenode | 运行DFS二次名称节点。 |
namenode | 运行DFS名称节点。 |
datanode | 运行DFS的Datanode。 |
dfsadmin | 运行DFS打点客户端。 |
mradmin | 运行映射,淘汰打点客户端。 |
fsck | 运行DFS文件系统查抄东西。 |
fs | 运行一个通用的文件系统的用户客户端。 |
balancer | 运行集群均衡东西。 |
oiv | 合用于离线FsImage查察器的fsimage。 |
fetchdt | 从NameNode获取团令牌。 |
jobtracker | 运行MapReduce事情跟踪节点。 |
pipes | 运行管道的事情。 |
tasktracker | 运行MapReduce任务跟踪节点。 |
historyserver | 运行功课汗青记录处事器作为一个独立的守护历程。 |
job | 哄骗MapReduce事情。 |
queue | 获取有关功课行列信息。 |
version | 打印版本。 |
jar <jar> | 运行一个jar文件。 |
distcp <srcurl> <desturl> | 复制文件或目次的递归。 |
distcp2 <srcurl> <desturl> | DistCp第2版。 |
archive -archiveName NAME -p | 建设一个Hadoop的归档。 |
<parent path> <src>* <dest> | |
classpath | 打印需要获得Hadoop jar和所需要的库的类路径。 |
daemonlog | 为每个守护历程获取/配置日志级别 |
如何与MapReduce事情互动
Usage: hadoop job [GENERIC_OPTIONS]
#p#分页标题#e#
以下是在一个Hadoop的功课可用通用选项。
GENERIC_OPTIONS | 描写 |
---|---|
-submit <job-file> | 提交功课。 |
status <job-id> | 打印映射,并淘汰完成的百分比以及所有的事情的计数器。 |
counter <job-id> <group-name> <countername> | 打印的计数器值。 |
-kill <job-id> | 终止任务。 |
-events <job-id> <fromevent-#> <#-of-events> | 打印吸收到JobTracker为给定范畴内的事件的具体信息。 |
-history [all] <jobOutputDir> – history < jobOutputDir> | 打印功课的具体信息,未能终止提示具体信息。有关功课的更多具体信息,如每个任务取得乐成的任务,任务可以实验通过指定[all]选项中查察。 |
-list[all] | 显示所有功课。-list 只显示尚未完成的功课。 |
-kill-task <task-id> | 终止任务。终止任务不计入失败的实验。 |
-fail-task <task-id> | 失败的任务。失败的任务都算对失败的实验。 |
set-priority <job-id> <priority> | 变动功课的优先级。答允优先级值:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
要查察功课的状态
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
要查察功课汗青在output-dir
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
终止任务
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004