MapReduce

1. Introduction

  MapReduce is a distributed computing framework for processing massive amounts of data. It is a programming model for parallel computation over large data sets (typically larger than 1 TB). Its central concepts, "Map" and "Reduce", are borrowed from functional programming languages, along with features taken from vector programming languages. The model makes it easy for programmers with no experience in distributed parallel programming to run their programs on a distributed system. In current software implementations, the user specifies a Map function that transforms a set of key/value pairs into a set of intermediate key/value pairs, and a concurrent Reduce function that merges all intermediate values that share the same key.
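
The model is easy to see in miniature. Below is a minimal single-process Python sketch of the Map/shuffle/Reduce pipeline for word counting, using only the standard library; map_fn, reduce_fn, and mapreduce are illustrative names, not part of any Hadoop API.

from collections import defaultdict

def map_fn(key, line):
    # Map: emit one (word, 1) pair for every word in the input line.
    return [(word, 1) for word in line.split()]

def reduce_fn(word, counts):
    # Reduce: merge all values that share the same key.
    return (word, sum(counts))

def mapreduce(records):
    groups = defaultdict(list)
    for key, value in records:               # map phase
        for k, v in map_fn(key, value):
            groups[k].append(v)              # shuffle: group by key
    return [reduce_fn(k, groups[k]) for k in sorted(groups)]  # reduce phase

print(mapreduce(enumerate(["the weather is good", "today is good"])))
# [('good', 2), ('is', 2), ('the', 1), ('today', 1), ('weather', 1)]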

1.1 Motivation

  • Massive data sets cannot be processed on a single machine because of hardware resource limits.
  • Yet scaling a single-machine program out to run distributed on a cluster greatly increases its complexity and development difficulty.
  • With a MapReduce framework in place, developers can concentrate almost entirely on business logic and leave the complexity of distributed computing to the framework.

The framework takes care of:

  • distributed data storage
  • job scheduling
  • fault tolerance
  • inter-machine communication and other complex problems

1.2 Principles

  MapReduce is a computing model, framework, and platform for parallel processing of big data. The name carries three layers of meaning:

  • MapReduce is a high-performance, cluster-based parallel computing platform (Cluster Infrastructure). It lets ordinary commodity servers form a distributed, parallel computing cluster of tens, hundreds, or even thousands of nodes.
  • MapReduce is a parallel computing and runtime software framework (Software Framework). It provides a large but well-designed framework that automatically parallelizes a computation: it partitions the data and the tasks, assigns and executes tasks on cluster nodes, and collects the results. Low-level system details of parallel computing, such as distributed data storage, data communication, and fault tolerance, are handled by the system, greatly reducing the burden on software developers.
  • MapReduce is a parallel programming model and methodology (Programming Model & Methodology). Drawing on the design ideas of the functional programming language Lisp, it offers a simple way to write parallel programs: the basic parallel computation is expressed with the two functions Map and Reduce, and abstract operations and parallel programming interfaces make programming and processing large data sets simple and convenient.

1.3 Core idea

  • Divide and conquer
    • decompose the problem
    • solve the subproblems
    • merge the results

Mapped onto MapReduce:

  • Divide: map
    • break the complex problem down into a number of simple tasks

  • Combine: reduce
    • merge the partial results into the final answer
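
The divide/solve/merge pattern in miniature, as a plain single-process Python sketch (no Hadoop involved): the input is decomposed into chunks, each chunk is solved independently, and the partial results are merged.

from functools import reduce

def split(data, n):
    # Decompose: cut the input into n roughly equal chunks.
    k = (len(data) + n - 1) // n
    return [data[i:i + k] for i in range(0, len(data), k)]

def solve(chunk):
    # Solve: process each chunk independently (the "map" step).
    return sum(chunk)

def merge(a, b):
    # Merge: combine two partial results (the "reduce" step).
    return a + b

partials = [solve(c) for c in split(list(range(1, 101)), 4)]  # parallelizable
print(reduce(merge, partials))  # 5050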

1.4 Execution flow

The execution flow of the MapReduce computing framework:

(figure: MapReduce computation flow)

1.4.1 MapReduce structure

  When a complete MapReduce program runs in distributed mode, there are three kinds of instance processes:

  • 1. MRAppMaster: coordinates the scheduling and state of the whole job
  • 2. MapTask: handles the entire data-processing flow of the map phase
  • 3. ReduceTask: handles the entire data-processing flow of the reduce phase

Execution steps:

  • Files on HDFS are read; each InputSplit is assigned one Mapper task, every line of the split is parsed into a <k,v> pair, and the map function is called once per pair, e.g. <line0, a b c c>, <line1, d e f>.
  • The user overrides map(), which receives each <k,v> pair above and emits new <k,v> pairs; for the first split this gives <a,1>,<b,1>,<c,1>,<c,1>.
  • The map output is first held in a cache: each map task has a circular in-memory buffer for its output (100 MB by default, set by the io.sort.mb property). When the buffer reaches the spill threshold of 0.8 (io.sort.spill.percent), the contents are spilled to a configured local directory.
  • Before being spilled to disk, the output is partitioned (Partitioner). With one reducer there is a single partition; in general the partition is chosen by hashing the key modulo the number of reduce tasks, e.g. <0,a,1>,<0,b,1>,<0,c,1>,<0,c,1>.
  • After partitioning, the pairs in each partition are sorted by key and then grouped; grouping means the values of the same key are collected into one set. Sorted: <a,1>,<b,1>,<c,1>,<c,1>; grouped: <a,{1}>,<b,{1}>,<c,{1,1}>.
  • (Optional) The grouped data can be pre-aggregated on the map side with a Combiner (see the sketch after this list).
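
A minimal single-process sketch of this map-side spill pipeline (partition, sort, group, optional combine), assuming the default hash-partitioner behavior; none of this is Hadoop API, it only mirrors the data movement:

from collections import defaultdict
from itertools import groupby

NUM_REDUCES = 1
pairs = [("a", 1), ("b", 1), ("c", 1), ("c", 1)]   # one map task's output

# Partition: by default, hash(key) mod the number of reduce tasks.
partitions = defaultdict(list)
for k, v in pairs:
    partitions[hash(k) % NUM_REDUCES].append((k, v))

for p, records in sorted(partitions.items()):
    records.sort(key=lambda kv: kv[0])                 # sort by key
    grouped = [(k, [v for _, v in g])                  # group values per key
               for k, g in groupby(records, key=lambda kv: kv[0])]
    combined = [(k, sum(vs)) for k, vs in grouped]     # optional combiner
    print((p, grouped, combined))
# (0, [('a', [1]), ('b', [1]), ('c', [1, 1])], [('a', 1), ('b', 1), ('c', 2)])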

1.4.2 MapReduce runtime flow

  • 1. When a MapReduce program starts, the first process launched is the MRAppMaster. From the job's description, the MRAppMaster computes how many MapTask instances are needed and asks the cluster for machines to start that many MapTask processes.
  • 2. Once started, each MapTask process works through the data in its assigned input split.
  • 3. When the MRAppMaster observes that all MapTask processes have finished, it starts the number of ReduceTask processes specified by the client's parameters and tells each ReduceTask which range of data (which partition) to process.
  • 4. Once started, each ReduceTask process fetches the relevant MapTask output files from the machines where the map tasks ran (locations supplied by the MRAppMaster), merge-sorts them locally, groups the KV pairs that share a key, calls the user-defined reduce() method once per group, collects the output KV pairs, and finally writes the result data to external storage through the user-specified OutputFormat.

The shuffle in more detail:

  • The outputs of the map tasks are copied over the network, partition by partition, to the corresponding reduce nodes; this transfer is the shuffle.
  • Data copied from the map side is first written into a buffer on the reduce side. When the buffer fills past a threshold it is written to disk, going through partition, combine, and sort steps; if several spill files build up on disk they are merged, and the result of the final merge is fed straight into reduce rather than being written back to disk.
  • That final merge result is the input to the reduce task. Once the reduce input is determined, the whole shuffle is finished; reduce then runs its computation and stores the result in HDFS (a sketch of the merge-and-group step follows).
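
The reduce side's merge-and-group step can be sketched in a few lines, assuming each map task's fetched output is already sorted by key (as the framework guarantees); heapq.merge performs the k-way merge and groupby forms the per-key groups handed to reduce():

import heapq
from itertools import groupby

# Sorted output fetched from two map tasks for this partition.
map1 = [("a", 1), ("c", 1), ("c", 1)]
map2 = [("b", 1), ("c", 1)]

merged = heapq.merge(map1, map2)                  # k-way merge sort
for key, group in groupby(merged, key=lambda kv: kv[0]):
    values = [v for _, v in group]
    print((key, sum(values)))                     # one reduce() call per key
# ('a', 1)  ('b', 1)  ('c', 3)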

2. Case study

2.1 The JobTracker process

  • The master process. It accepts job submissions from clients, schedules tasks onto worker nodes, and provides management functions such as monitoring worker-node health and task progress. A MapReduce cluster has exactly one JobTracker, which normally runs on reliable hardware.

  • TaskTrackers report their health to the JobTracker through periodic heartbeats. Each heartbeat carries the number of map and reduce task slots available, the number occupied, and details of the running tasks. The JobTracker uses a thread pool to handle heartbeats and client requests concurrently.

2.2 The TaskTracker process

  • Accepts tasks assigned by the JobTracker, instantiates the user's program, runs the tasks locally, and periodically reports status back to the JobTracker. There is always exactly one TaskTracker on each worker node.
  • The JobTracker simply waits for the JobClient to submit jobs.
  • Every 3 seconds the TaskTracker sends a heartbeat to the JobTracker asking whether there is a task to run; if there is, the JobTracker dispatches a task to it.
  • In short, the slave actively solicits work from the master (see the sketch after this list).
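
A toy sketch of this pull-style heartbeat protocol, mirroring the fields described above (free/occupied slots, running tasks); every name here (Heartbeat, assign_task) is hypothetical, not Hadoop's actual RPC interface:

import time
from dataclasses import dataclass

@dataclass
class Heartbeat:
    tracker_id: str
    map_slots_free: int
    reduce_slots_free: int
    running_tasks: list

def assign_task(pending, hb):
    # JobTracker side: hand out a task only if the tracker has a free slot.
    if hb.map_slots_free > 0 and pending:
        return pending.pop(0)
    return None

pending_maps = ["map-0", "map-1"]
for _ in range(3):   # TaskTracker side: one heartbeat every 3 seconds
    hb = Heartbeat("tracker-1", map_slots_free=1, reduce_slots_free=1,
                   running_tasks=[])
    print("assigned:", assign_task(pending_maps, hb))
    time.sleep(3)
# assigned: map-0 / assigned: map-1 / assigned: None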

(figure: MapReduce computation flow)

2.3 Scheduling mode

  The default scheduler is a first-in, first-out queue (FIFO).

  • Five job priorities: very_high, high, normal, low, very_low (see the sketch below)
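
A minimal sketch of FIFO scheduling with priorities, assuming priority decides the order between jobs and submission time breaks ties within one priority (plain Python, no Hadoop API):

import heapq
from itertools import count

PRIORITY = {"very_high": 0, "high": 1, "normal": 2, "low": 3, "very_low": 4}
seq = count()    # submission order breaks ties within one priority
queue = []

def submit(job, priority="normal"):
    heapq.heappush(queue, (PRIORITY[priority], next(seq), job))

submit("job-a")
submit("job-b", "very_high")
submit("job-c")

while queue:
    _, _, job = heapq.heappop(queue)
    print(job)   # job-b, then job-a, then job-c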

2.4 Execution flow

(figure: MapReduce computation flow)

2.5 File distribution and packaging with -file

  Count the words (or other tokens) in the file 1.data, i.e., a wordcount. Here the job is restricted to counting only words on a given whitelist; the idea is illustrated below, followed by a sketch of a whitelist mapper.

(figure: MapReduce computation flow)
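
A sketch of how the whitelist could be shipped with the job: the whitelist file is distributed alongside the mapper (e.g. by adding -file ./white_list to run.sh), and the mapper emits only whitelisted words. The file name white_list is an assumption for illustration; this replaces the plain map_new.py shown later.

import sys

def read_whitelist(path):
    # The whitelist ships with the job via -file, so it appears in the
    # task's working directory under its base name.
    with open(path) as f:
        return set(word.strip() for word in f)

white = read_whitelist("white_list")   # hypothetical file name
for line in sys.stdin:
    for word in line.strip().split(' '):
        if word in white:              # count only whitelisted words
            print('%s\t1' % word)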

Map output

- Worker 1
  - (the 1), (weather 1), (is 1), (good 1)
- Worker 2
  - (today 1), (is 1), (good 1)
- Worker 3
  - (good 1), (weather 1), (is 1), (good 1)  

Reduce input

- Worker 1 
  - (the 1) 
- Worker 2 
  - (is 1), (is 1), (is 1) 
- Worker 3 
  - (weather 1), (weather 1) 
- Worker 4 
  - (today 1) 
- Worker 5 
  - (good 1), (good 1), (good 1), (good 1) 

Reduce output

- Worker 1
  - (the 1)
- Worker 2
  - (is 3)
- Worker 3
  - (weather 2)
- Worker 4
  - (today 1)
- Worker 5
  - (good 4) 
[root@master mapreduce]# ls
1.data  a.txt  map_new.py  map.py  red_new.py  red.py  result.local  run.sh  The_Man_of_Property.txt
[root@master mapreduce]# hadoop fs -put 1.data /
[root@master mapreduce]# hadoop fs -ls /
Found 2 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
The file to be counted, 1.data:
Preface
“My wife can see no one,” he muttered doggedly.
Young Jolyon answered gently: “I shouldn’t keep her a minute.”
Soames brushed by him and barred the way.
“She can see no one,” he said again.
Young Jolyon’s glance shot past him into the hall, and Soames turned. There in the drawing-room doorway stood Irene, her eyes were wild and eager, her lips were parted, her hands outstretched. In the sight of both men that light vanished from her face; her hands dropped to her sides; she stood like stone.
Soames spun round, and met his visitor’s eyes, and at the look he saw in them, a sound like a snarl escaped him. He drew his lips back in the ghost of a smile.
“This is my house,” he said; “I manage my own affairs. I’ve told you once — I tell you again; we are not at home.”
And in young Jolyon’s face he slammed the door.
The End

Code

[root@master mapreduce]# cat run.sh 
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map_new.py" \
    -reducer "python red_new.py" \
    -file ./map_new.py \
    -file ./red_new.py

[root@master mapreduce]# cat map_new.py 
import sys

# Mapper: emit "<word>\t1" for every space-separated word on stdin.
for line in sys.stdin:
    ss = line.strip().split(' ')
    for word in ss:
        print '\t'.join([word.strip(), '1'])

[root@master mapreduce]# cat red_new.py
import sys

# Reducer: streaming input arrives sorted by key, so equal words are
# adjacent; accumulate until the key changes, then flush the count.
cur_word = None
sum = 0
for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 2:
        continue
    word, cnt = ss
    if cur_word == None:
        cur_word = word
    if cur_word != word:
        # Key changed: emit the finished count and reset.
        print '\t'.join([cur_word, str(sum)])
        cur_word = word
        sum = 0
    sum += int(cnt)
# Flush the last group.
print '\t'.join([cur_word, str(sum)])

Manual test:

[root@master mapreduce]# cat The_Man_of_Property.txt | python map_new.py |sort -k1 |python red_new.py >cmz.output
View the top of the counted results:
[root@master mapreduce]# cat cmz.output | sort -k2 -rn |head
the 5144
of  3407
to  2782
and 2573
a   2543
he  2139
his 1912
was 1702
in  1694
had 1526

Cluster test

[root@master mapreduce]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output': No such file or directory
19/06/21 14:03:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./map_new.py, ./red_new.py, /tmp/hadoop-unjar7250046706565071409/] [] /tmp/streamjob644384274506457532.jar tmpDir=null
19/06/21 14:03:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 14:03:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 14:03:29 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 14:03:29 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 14:03:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561031036568_0001
19/06/21 14:03:29 INFO impl.YarnClientImpl: Submitted application application_1561031036568_0001
19/06/21 14:03:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561031036568_0001/
19/06/21 14:03:29 INFO mapreduce.Job: Running job: job_1561031036568_0001
19/06/21 14:03:37 INFO mapreduce.Job: Job job_1561031036568_0001 running in uber mode : false
19/06/21 14:03:37 INFO mapreduce.Job:  map 0% reduce 0%
19/06/21 14:03:44 INFO mapreduce.Job:  map 50% reduce 0%
19/06/21 14:03:45 INFO mapreduce.Job:  map 100% reduce 0%
19/06/21 14:03:49 INFO mapreduce.Job:  map 100% reduce 100%
19/06/21 14:03:50 INFO mapreduce.Job: Job job_1561031036568_0001 completed successfully
19/06/21 14:03:50 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=3229845
        FILE: Number of bytes written=6791387
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1900871
        HDFS: Number of bytes written=183609
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=9746
        Total time spent by all reduces in occupied slots (ms)=2902
        Total time spent by all map tasks (ms)=9746
        Total time spent by all reduce tasks (ms)=2902
        Total vcore-milliseconds taken by all map tasks=9746
        Total vcore-milliseconds taken by all reduce tasks=2902
        Total megabyte-milliseconds taken by all map tasks=9979904
        Total megabyte-milliseconds taken by all reduce tasks=2971648
    Map-Reduce Framework
        Map input records=8598
        Map output records=335454
        Map output bytes=2558931
        Map output materialized bytes=3229851
        Input split bytes=154
        Combine input records=0
        Combine output records=0
        Reduce input groups=16985
        Reduce shuffle bytes=3229851
        Reduce input records=335454
        Reduce output records=16984
        Spilled Records=670908
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=823
        CPU time spent (ms)=6340
        Physical memory (bytes) snapshot=709455872
        Virtual memory (bytes) snapshot=6344896512
        Total committed heap usage (bytes)=470810624
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=1900717
    File Output Format Counters 
        Bytes Written=183609
19/06/21 14:03:50 INFO streaming.StreamJob: Output directory: /output

View the results after the run:

[root@master mapreduce]# hadoop fs -ls /output
Found 2 items
-rw-r--r--   3 root supergroup          0 2019-06-21 14:03 /output/_SUCCESS
-rw-r--r--   3 root supergroup     183609 2019-06-21 14:03 /output/part-00000
[root@master mapreduce]# hadoop fs -text /output/part-00000 | sort -k2 -rn |head
the 15432
of  10221
to  8346
and 7719
a   7629
he  6417
his 5736
was 5106
in  5082
had 4578