MapReduce Streaming
1. Streaming
http://hadoop.apache.org/docs/r1.2.1/streaming.html#Hadoop+Streaming
1.1 Introduction
Hadoop Streaming lets map and reduce programs written in non-Java languages such as Ruby, Python, PHP, and C++ run on a Hadoop cluster. The map/reduce programs only have to follow one convention: read from standard input (stdin) and write to standard output (stdout).
MapReduce and HDFS are implemented in Java, so Hadoop provides Java programming interfaces by default. The Streaming framework allows programs written in any language to run as map/reduce tasks in Hadoop MapReduce, which makes it easy to port existing programs to the Hadoop platform.
1.2 How it works
1.3 Advantages
- High development efficiency
  - Easy to port to the Hadoop platform: the program only has to read data from standard input and write results to standard output in an agreed format
  - An existing standalone program can run as a distributed job on Hadoop with only minor changes
  - Easy to debug on a single machine: **cat input | mapper | sort | reducer > output**
- Program runtime efficiency
  - For CPU-intensive computation, programs written in languages such as C/C++ may be somewhat faster than equivalent Java programs
- Convenient resource control by the platform
  - The Streaming framework can flexibly limit the memory and other resources an application uses, e.g. through limit-style mechanisms
1.4 Limitations
- By default Streaming can only process text data. A good way to handle binary data is to base64-encode the binary keys and values, turning them into text.
- The two extra rounds of data copying and parsing (splitting) introduce some overhead.
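The base64 workaround for binary data can be sketched as follows. This is an illustrative Python 3 snippet, not part of Streaming itself, and the function names `encode_record`/`decode_record` are made up for the example:

```python
import base64

def encode_record(key, value):
    # Base64 text contains neither '\t' nor '\n', so arbitrary bytes
    # become safe for Streaming's line- and tab-oriented protocol.
    return "%s\t%s" % (base64.b64encode(key).decode("ascii"),
                       base64.b64encode(value).decode("ascii"))

def decode_record(line):
    # Inverse transform, applied when reading the record back.
    k, v = line.rstrip("\n").split("\t")
    return base64.b64decode(k), base64.b64decode(v)
```

The price is roughly a one-third size increase plus the encode/decode work on every record.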
1.5 Workflow
The Hadoop Streaming workflow is roughly as follows:

- hadoop-streaming.jar registers a new job with the Hadoop cluster, passing in the input path, output path, and other parameters
- When a mapper starts, Hadoop Streaming feeds the input file to its stdin line by line
- The user-written mapper reads each line from stdin and processes it
- The mapper writes its intermediate results to stdout; in Python, print writes to stdout by default, though you can also address the stream explicitly. For each line written to stdout, Hadoop by default splits it into a key/value pair on '\t'.
- After the mappers finish, Hadoop automatically performs partition, sort, and group to prepare for the reduce tasks
- Hadoop Streaming passes the intermediate results to the reducer line by line
- The user-written reducer reads each line from stdin and processes it
- The reducer likewise writes its results to stdout
- Hadoop stores the final output under the output path
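The whole contract described above is just line-oriented text with tab-separated key/value pairs. A minimal Python 3 sketch of both sides of that contract (the function names are illustrative, and the word-count logic is only an example):

```python
def map_lines(lines):
    # Mapper side: one input line in, zero or more "key\tvalue" lines out.
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)

def reduce_lines(lines):
    # Reducer side: records arrive sorted by key, so consecutive lines
    # with the same key form one group; sum each group's counts.
    current, total = None, 0
    for line in lines:
        key, val = line.split("\t")
        if key != current:
            if current is not None:
                yield "%s\t%d" % (current, total)
            current, total = key, 0
        total += int(val)
    if current is not None:
        yield "%s\t%d" % (current, total)
```

Wired to stdin/stdout (`for line in sys.stdin: ...`), these two functions are a complete Streaming job; the `sorted()` step Hadoop performs between them is what makes the reducer's single pass correct.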
1.6 Command-line usage
Option | Required/Optional | Description |
---|---|---|
-input | required | input file path |
-output | required | output file path |
-mapper | required | user-written mapper program; can be an executable file or a script |
-reducer | required | user-written reducer program; can be an executable file or a script |
-file | optional | ship a file with the submitted job, e.g. an input file the mapper or reducer needs, such as a configuration file or a dictionary |
-partitioner | optional | user-defined partitioner program |
-combiner | optional | user-defined combiner program (must be implemented in Java) |
Option and usage | Required/Optional | Description |
---|---|---|
-input — supports `*` wildcards to select multiple files or directories; repeat -input to give several inputs | required | the mapper's input data; files must be uploaded to HDFS manually before the job is submitted |
-output — the path must not already exist, otherwise it is treated as another job's output | required | HDFS path for the reducer output; must not exist yet, but must always be configured in the script |
-mapper <executable command or Java class> — -mapper "python map.py", -mapper "bash map.sh", -mapper "perl map.perl" | required | the mapper program |
-reducer <executable command or Java class> — -reducer "python reducer.py", -reducer "bash reducer.sh", -reducer "perl reducer.sh" | optional | the reducer program; leave it out if no reduce step is needed |
-combiner <executable command or Java class> — -combiner "python map.py", -combiner "bash map.sh", -combiner "perl map.perl" | optional | combiner program that processes the mapper output |
-file <local mapper/reducer programs and other files the program needs> — -file map.py -file reduce.py -file white_list | optional; for small local files | distributes local files to the compute nodes; the files are packaged and submitted as part of the job, and all distributed files end up in the job's dedicated directory on each node: jobcache/job_xxx/jar |
-cacheFile hdfs://master:9000/cachefile_dir/white_list | optional | distributes HDFS files; put the programs and auxiliary files the job needs on HDFS first, then give the HDFS path and the file is copied to each compute node, again under the job's dedicated directory jobcache/job_xxx/jar |
-cacheArchive "hdfs://master:9000/w.tar.gz#WLDIR" | optional | distributes a compressed HDFS archive; the archive may contain a directory structure |
-numReduceTasks <number> — -numReduceTasks 2 | optional | number of reducers for this job |
-inputformat | optional | custom InputFormat class; defaults to TextInputFormat |
-outputformat | optional | custom OutputFormat class; defaults to TextOutputFormat |
-cmdenv name=value | optional | environment variables passed to the streaming command |
2. Experiments
2.1 Word count
Hadoop itself is written in Java, so the most direct way to use it is to implement the Mapper and Reducer in Java, configure a MapReduce job, and submit it to the cluster for computation. But many developers are not familiar with Java and instead have hands-on experience in scripting-friendly languages such as C++, Shell, Python, Ruby, PHP, or Perl. For these developers, Hadoop Streaming provides a tool for processing data on a Hadoop cluster: the jar hadoop-streaming-*.jar.
Hadoop Streaming uses Unix standard input and output as the interface between Hadoop and programs in other languages: a program written in another language simply takes standard input as its input and standard output as its output. On these streams, key and value are separated by a Tab, and on the reducer's standard input the Hadoop framework guarantees that records arrive sorted by key.
How do you use the Hadoop Streaming tool? We can print its usage from the command line, as shown below.
```
[root@master ~]# hadoop jar /usr/local/hadoop-2.6.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.5.jar
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that count the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.
```
```
[root@master ~]# mkdir -p /usr/local/src/tmp/
[root@master ~]# cd /usr/local/src/tmp/
[root@master tmp]# cp /etc/hosts /etc/passwd .
[root@master tmp]# hadoop fs -ls /
Found 8 items
drwxr-xr-x - root supergroup 0 2019-06-24 11:01 /hbase
-rw-r--r-- 3 root supergroup 225 2019-06-23 17:13 /hosts
drwxr-xr-x - root supergroup 0 2019-06-26 20:22 /inputdata
drwxr-xr-x - root supergroup 0 2019-06-26 20:21 /outputdata
drwxr-xr-x - root supergroup 0 2019-06-26 20:26 /outputdata1
drwxr-xr-x - root supergroup 0 2019-06-26 20:27 /outputdata2
drwx-wx-wx - root supergroup 0 2019-06-26 20:23 /tmp
drwxr-xr-x - root supergroup 0 2019-06-27 20:55 /user
[root@master tmp]# hadoop fs -mkdir /streaming
[root@master tmp]# hadoop fs -put /etc/hosts /etc/passwd /streaming
[root@master tmp]# hadoop fs -ls /streaming
Found 2 items
-rw-r--r-- 3 root supergroup 225 2019-06-29 15:56 /streaming/hosts
-rw-r--r-- 3 root supergroup 997 2019-06-29 15:56 /streaming/passwd
```
```
[root@master tmp]# cat word_count_mapper.py
#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    words = filter(lambda word: word, line.split())
    for word in words:
        print '%s\t%s' % (word, 1)
```
```
[root@master tmp]# cat word_count_reducer.py
#!/usr/bin/env python
import sys
from operator import itemgetter

wc_dict = {}
for line in sys.stdin:
    line = line.strip()
    word, count = line.split()
    try:
        count = int(count)
        wc_dict[word] = wc_dict.get(word, 0) + count
    except ValueError:
        pass

sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
for word, count in sorted_dict:
    print '%s\t%s' % (word, count)
```
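The reducer above accumulates every word in an in-memory dict, which is fine for small outputs but does not exploit the guarantee that reducer input arrives sorted by key. A hypothetical streaming alternative (Python 3 sketch, not one of the scripts used in this experiment) using itertools.groupby can emit each total as soon as a key's run of records ends:

```python
import sys
from itertools import groupby

def sum_sorted(lines):
    # lines: an iterable of "word\tcount" records already sorted by word,
    # exactly what Hadoop delivers to the reducer. Because equal keys are
    # adjacent, groupby sees each word as one contiguous group.
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(parsed, key=lambda kv: kv[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))

def main():
    # Wiring the generator to stdin/stdout gives a drop-in Streaming reducer.
    for out in sum_sorted(sys.stdin):
        print(out)
```

This keeps memory usage constant in the number of distinct keys, at the cost of relying on (and only working with) sorted input.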
```
[root@master tmp]# cat run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input /streaming/ \
    -output /streaming_output \
    -mapper word_count_mapper.py \
    -reducer word_count_reducer.py \
    -numReduceTasks 2 \
    -file *.py
```
Manual test:
```
[root@master tmp]# cat hosts | python word_count_mapper.py
127.0.0.1 1
localhost 1
localhost.localdomain 1
localhost4 1
localhost4.localdomain4 1
::1 1
localhost 1
localhost.localdomain 1
localhost6 1
localhost6.localdomain6 1
192.168.186.10 1
master 1
192.168.186.11 1
slave1 1
192.168.186.12 1
slave2 1
[root@master tmp]# cat hosts | python word_count_mapper.py | python word_count_reducer.py
127.0.0.1 1
192.168.186.10 1
192.168.186.11 1
192.168.186.12 1
::1 1
localhost 2
localhost.localdomain 2
localhost4 1
localhost4.localdomain4 1
localhost6 1
localhost6.localdomain6 1
master 1
slave1 1
slave2 1
```
After the MapReduce logic passes the manual test, run it with Hadoop.
Run on Hadoop:
```
[root@master tmp]# sh run.sh
19/06/29 16:05:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [word_count_mapper.py, word_count_reducer.py, /tmp/hadoop-unjar3187438588282227932/] [] /tmp/streamjob6299520794908170392.jar tmpDir=null
19/06/29 16:05:34 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 16:05:34 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 16:05:35 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/29 16:05:35 INFO mapreduce.JobSubmitter: number of splits:3
19/06/29 16:05:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561552001692_0003
19/06/29 16:05:35 INFO impl.YarnClientImpl: Submitted application application_1561552001692_0003
19/06/29 16:05:35 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561552001692_0003/
19/06/29 16:05:35 INFO mapreduce.Job: Running job: job_1561552001692_0003
19/06/29 16:05:45 INFO mapreduce.Job: Job job_1561552001692_0003 running in uber mode : false
19/06/29 16:05:45 INFO mapreduce.Job: map 0% reduce 0%
19/06/29 16:05:56 INFO mapreduce.Job: map 100% reduce 0%
19/06/29 16:06:03 INFO mapreduce.Job: map 100% reduce 50%
19/06/29 16:06:04 INFO mapreduce.Job: map 100% reduce 100%
19/06/29 16:06:04 INFO mapreduce.Job: Job job_1561552001692_0003 completed successfully
19/06/29 16:06:04 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1415
		FILE: Number of bytes written=556266
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1868
		HDFS: Number of bytes written=1271
		HDFS: Number of read operations=15
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Job Counters
		Launched map tasks=3
		Launched reduce tasks=2
		Data-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=28325
		Total time spent by all reduces in occupied slots (ms)=9302
		Total time spent by all map tasks (ms)=28325
		Total time spent by all reduce tasks (ms)=9302
		Total vcore-milliseconds taken by all map tasks=28325
		Total vcore-milliseconds taken by all reduce tasks=9302
		Total megabyte-milliseconds taken by all map tasks=29004800
		Total megabyte-milliseconds taken by all reduce tasks=9525248
	Map-Reduce Framework
		Map input records=28
		Map output records=48
		Map output bytes=1307
		Map output materialized bytes=1439
		Input split bytes=260
		Combine input records=0
		Combine output records=0
		Reduce input groups=46
		Reduce shuffle bytes=1439
		Reduce input records=48
		Reduce output records=46
		Spilled Records=96
		Shuffled Maps =6
		Failed Shuffles=0
		Merged Map outputs=6
		GC time elapsed (ms)=1527
		CPU time spent (ms)=11410
		Physical memory (bytes) snapshot=1034854400
		Virtual memory (bytes) snapshot=10570633216
		Total committed heap usage (bytes)=756547584
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=1608
	File Output Format Counters
		Bytes Written=1271
19/06/29 16:06:04 INFO streaming.StreamJob: Output directory: /streaming_output
```
Check the results:
```
[root@master tmp]# hadoop fs -ls /
Found 10 items
drwxr-xr-x - root supergroup 0 2019-06-24 11:01 /hbase
-rw-r--r-- 3 root supergroup 225 2019-06-23 17:13 /hosts
drwxr-xr-x - root supergroup 0 2019-06-26 20:22 /inputdata
drwxr-xr-x - root supergroup 0 2019-06-26 20:21 /outputdata
drwxr-xr-x - root supergroup 0 2019-06-26 20:26 /outputdata1
drwxr-xr-x - root supergroup 0 2019-06-26 20:27 /outputdata2
drwxr-xr-x - root supergroup 0 2019-06-29 15:56 /streaming
drwxr-xr-x - root supergroup 0 2019-06-29 16:06 /streaming_output
drwx-wx-wx - root supergroup 0 2019-06-26 20:23 /tmp
drwxr-xr-x - root supergroup 0 2019-06-27 20:55 /user
[root@master tmp]# hadoop fs -ls /streaming_output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2019-06-29 16:06 /streaming_output/_SUCCESS
-rw-r--r-- 3 root supergroup 737 2019-06-29 16:06 /streaming_output/part-00000
-rw-r--r-- 3 root supergroup 534 2019-06-29 16:06 /streaming_output/part-00001
```
View the output:
```
[root@master tmp]# hadoop fs -text /streaming_output/part-00000
127.0.0.1 1
192.168.186.10 1
192.168.186.12 1
::1 1
SSH:/var/empty/sshd:/sbin/nologin 1
bus:/:/sbin/nologin 1
chrony:x:998:996::/var/lib/chrony:/sbin/nologin 1
daemon:x:2:2:daemon:/sbin:/sbin/nologin 1
dbus:x:81:81:System 1
for 1
games:x:12:100:games:/usr/games:/sbin/nologin 1
halt:x:7:0:halt:/sbin:/sbin/halt 1
localhost 2
localhost4 1
localhost6 1
mail:x:8:12:mail:/var/spool/mail:/sbin/nologin 1
message 1
nobody:x:99:99:Nobody:/:/sbin/nologin 1
operator:x:11:0:operator:/root:/sbin/nologin 1
polkitd:x:999:998:User 1
postfix:x:89:89::/var/spool/postfix:/sbin/nologin 1
shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown 1
slave2 1
sshd:x:74:74:Privilege-separated 1
systemd-network:x:192:192:systemd 1
user:/run/saslauthd:/sbin/nologin 1
[root@master tmp]# hadoop fs -text /streaming_output/part-00001
192.168.186.11 1
Management:/:/sbin/nologin 1
Network 1
Server:/var/lib/mysql:/bin/false 1
User:/var/ftp:/sbin/nologin 1
adm:x:3:4:adm:/var/adm:/sbin/nologin 1
bin:x:1:1:bin:/bin:/sbin/nologin 1
ftp:x:14:50:FTP 1
localhost.localdomain 2
localhost4.localdomain4 1
localhost6.localdomain6 1
lp:x:4:7:lp:/var/spool/lpd:/sbin/nologin 1
master 1
mysql:x:27:27:MySQL 1
ntp:x:38:38::/etc/ntp:/sbin/nologin 1
polkitd:/:/sbin/nologin 1
root:x:0:0:root:/root:/bin/bash 1
saslauth:x:997:76:Saslauthd 1
slave1 1
sync:x:5:0:sync:/sbin:/bin/sync 1
```
2.2 -file
If an executable, script, or configuration file that the program needs does not exist on the cluster's compute nodes, it must first be distributed to the cluster before the computation can succeed. Hadoop provides a mechanism for automatically distributing files and archives; you only need to set the corresponding options when submitting a Streaming job.
- If the file to distribute is local and has no directory structure, use the -file /path/to/FILENAME option to distribute the local file /path/to/FILENAME to every compute node.
  - Inside the Streaming program the file can then be accessed as ./FILENAME.
  - A locally executable file other than the specified mapper or reducer program may arrive without the executable bit set, so run chmod +x ./FILENAME in a wrapper script such as mapper.sh and submit with -mapper "mapper.sh".
- Also take note of the directory ./mapred/local/taskTracker/root/jobcache/job_201704060437_xxxx
A word count that only counts the words in a given white list:
map.py
```python
#!/usr/bin/python
import sys
import time

def read_local_file_func(f):
    word_set = set()
    file_in = open(f, 'r')
    for line in file_in:
        word = line.strip()
        word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            # debug leftovers, disabled:
            #time.sleep(100)
            #print "===="
            word = s.strip()
            if word != "" and (word in word_set):
                #print s + "\t" + "1"
                #print '\t'.join([s, "1"])
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
red.py
```python
#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
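Both map.py and red.py end with the same dispatch idiom: the first command-line argument names the function to run, and the remaining arguments are passed to it. That is what makes an invocation like -mapper "python map.py mapper_func white_list" work. The idiom in isolation (a Python 3 sketch; `upper` is just a stand-in worker function):

```python
import sys

def upper(s):
    # Stand-in worker; in map.py this role is played by mapper_func.
    return s.upper()

def dispatch(argv):
    # argv[1] selects a function defined in this module by name;
    # argv[2:] become its positional arguments.
    func = getattr(sys.modules[__name__], argv[1])
    return func(*argv[2:])

if __name__ == "__main__" and len(sys.argv) > 1:
    print(dispatch(sys.argv))
```

One script can thus expose several entry points (mapper, reducer, helpers) and let the Streaming command line choose among them.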
run.sh
```bash
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list

#$HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 -output $OUTPUT_PATH -mapper "python map.py mapper_func white_list" -reducer "python red.py reduer_func" -jobconf "mapred.reduce.tasks=3" -file ./map.py -file ./red.py -file ./white_list
```
white_list
```
suitable
against
recent
```
Manual test:
```
[root@master mr_file_broadcast]# cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func > cmz.out
[root@master mr_file_broadcast]# cat cmz.out
against 93
recent 2
suitable 2
```
Upload the input file:

```
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 4 items
-rw-r--r-- 3 root supergroup 1896621 2019-06-21 13:54 /1.data
drwxr-xr-x - root supergroup 0 2019-06-21 14:03 /output
-rw-r--r-- 3 root supergroup 944 2019-06-20 19:50 /passwd
drwx------ - root supergroup 0 2019-06-21 14:03 /tmp
[root@master mr_file_broadcast]# hadoop fs -put The_Man_of_Property.txt /
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 5 items
-rw-r--r-- 3 root supergroup 1896621 2019-06-21 13:54 /1.data
-rw-r--r-- 3 root supergroup 632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x - root supergroup 0 2019-06-21 14:03 /output
-rw-r--r-- 3 root supergroup 944 2019-06-20 19:50 /passwd
drwx------ - root supergroup 0 2019-06-21 14:03 /tmp
```
Run on Hadoop:
```
[root@master mr_file_broadcast]# bash run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_file_broadcast': No such file or directory
19/06/21 21:11:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/21 21:11:55 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/21 21:11:55 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map.py, ./red.py, ./white_list, /tmp/hadoop-unjar5356561627923561485/] [] /tmp/streamjob514955250889635986.jar tmpDir=null
19/06/21 21:11:55 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 21:11:55 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 21:11:56 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 21:11:56 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 21:11:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561122558196_0002
19/06/21 21:11:57 INFO impl.YarnClientImpl: Submitted application application_1561122558196_0002
19/06/21 21:11:57 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561122558196_0002/
19/06/21 21:11:57 INFO mapreduce.Job: Running job: job_1561122558196_0002
19/06/21 21:12:03 INFO mapreduce.Job: Job job_1561122558196_0002 running in uber mode : false
19/06/21 21:12:03 INFO mapreduce.Job: map 0% reduce 0%
19/06/21 21:12:09 INFO mapreduce.Job: map 100% reduce 0%
19/06/21 21:12:17 INFO mapreduce.Job: map 100% reduce 67%
19/06/21 21:12:18 INFO mapreduce.Job: map 100% reduce 100%
19/06/21 21:12:18 INFO mapreduce.Job: Job job_1561122558196_0002 completed successfully
19/06/21 21:12:18 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1182
		FILE: Number of bytes written=556758
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=636491
		HDFS: Number of bytes written=31
		HDFS: Number of read operations=15
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=3
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=8635
		Total time spent by all reduces in occupied slots (ms)=16566
		Total time spent by all map tasks (ms)=8635
		Total time spent by all reduce tasks (ms)=16566
		Total vcore-milliseconds taken by all map tasks=8635
		Total vcore-milliseconds taken by all reduce tasks=16566
		Total megabyte-milliseconds taken by all map tasks=8842240
		Total megabyte-milliseconds taken by all reduce tasks=16963584
	Map-Reduce Framework
		Map input records=2866
		Map output records=97
		Map output bytes=970
		Map output materialized bytes=1200
		Input split bytes=188
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=1200
		Reduce input records=97
		Reduce output records=3
		Spilled Records=194
		Shuffled Maps =6
		Failed Shuffles=0
		Merged Map outputs=6
		GC time elapsed (ms)=1042
		CPU time spent (ms)=9350
		Physical memory (bytes) snapshot=1057755136
		Virtual memory (bytes) snapshot=10571837440
		Total committed heap usage (bytes)=654311424
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=636303
	File Output Format Counters
		Bytes Written=31
19/06/21 21:12:18 INFO streaming.StreamJob: Output directory: /output_file_broadcast
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 6 items
-rw-r--r-- 3 root supergroup 1896621 2019-06-21 13:54 /1.data
-rw-r--r-- 3 root supergroup 632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x - root supergroup 0 2019-06-21 14:03 /output
drwxr-xr-x - root supergroup 0 2019-06-21 21:12 /output_file_broadcast
-rw-r--r-- 3 root supergroup 944 2019-06-20 19:50 /passwd
drwx------ - root supergroup 0 2019-06-21 14:03 /tmp
[root@master mr_file_broadcast]# hadoop fs -ls /output_file_broadcast/
Found 4 items
-rw-r--r-- 3 root supergroup 0 2019-06-21 21:12 /output_file_broadcast/_SUCCESS
-rw-r--r-- 3 root supergroup 22 2019-06-21 21:12 /output_file_broadcast/part-00000
-rw-r--r-- 3 root supergroup 9 2019-06-21 21:12 /output_file_broadcast/part-00001
-rw-r--r-- 3 root supergroup 0 2019-06-21 21:12 /output_file_broadcast/part-00002
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00000
against 93
suitable 2
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00001
recent 2
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00002
```
You can inspect the job's tasks on the other machines as well: besides the YARN web UI, you can look directly at the local filesystem.
Inspect the tasks while the job is running:
```
[root@slave1 tmp]# pwd
/usr/local/hadoop-2.6.5/tmp
[root@slave1 tmp]# tree .
.
└── nm-local-dir
    ├── filecache
    ├── nmPrivate
    │   └── application_1561122558196_0003
    │       ├── container_1561122558196_0003_01_000001
    │       │   ├── container_1561122558196_0003_01_000001.pid
    │       │   ├── container_1561122558196_0003_01_000001.tokens
    │       │   └── launch_container.sh
    │       ├── container_1561122558196_0003_01_000002
    │       │   ├── container_1561122558196_0003_01_000002.pid
    │       │   ├── container_1561122558196_0003_01_000002.tokens
    │       │   └── launch_container.sh
    │       └── container_1561122558196_0003_01_000003
    │           ├── container_1561122558196_0003_01_000003.pid
    │           ├── container_1561122558196_0003_01_000003.tokens
    │           └── launch_container.sh
    └── usercache
        └── root
            ├── appcache
            │   └── application_1561122558196_0003
            │       ├── container_1561122558196_0003_01_000001
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── jobSubmitDir
            │       │   │   ├── job.split -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/12/job.split
            │       │   │   └── job.splitmetainfo -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/10/job.splitmetainfo
            │       │   ├── job.xml -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/13/job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── container_1561122558196_0003_01_000002
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── container_1561122558196_0003_01_000003
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── filecache
            │       │   ├── 10
            │       │   │   └── job.splitmetainfo
            │       │   ├── 11
            │       │   │   └── job.jar
            │       │   │       └── job.jar
            │       │   ├── 12
            │       │   │   └── job.split
            │       │   └── 13
            │       │       └── job.xml
            │       └── work
            └── filecache
                ├── 10
                │   └── map.py
                ├── 11
                │   └── white_list
                ├── 12
                │   └── red.py
                ├── 13
                │   └── map.py
                ├── 14
                │   └── white_list
                └── 15
                    └── red.py

35 directories, 45 files
```
2.3 -cacheFile
Again the white-list word count, but this time the file (e.g. a dictionary file) lives on HDFS, and we want each compute node to treat it as a local file during computation. Use the -cacheFile hdfs://host:port/path/to/file#linkname option to cache the file on the compute nodes; the Streaming program then accesses it as ./linkname.
```
[root@master mr_cachefile_broadcast]# pwd
/root/mr_cachefile_broadcast
[root@master mr_cachefile_broadcast]# hadoop fs -put white_list /
[root@master mr_cachefile_broadcast]# hadoop fs -ls /
Found 7 items
-rw-r--r-- 3 root supergroup 1896621 2019-06-21 13:54 /1.data
-rw-r--r-- 3 root supergroup 632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x - root supergroup 0 2019-06-21 14:03 /output
drwxr-xr-x - root supergroup 0 2019-06-21 21:38 /output_file_broadcast
-rw-r--r-- 3 root supergroup 944 2019-06-20 19:50 /passwd
drwx------ - root supergroup 0 2019-06-21 14:03 /tmp
-rw-r--r-- 3 root supergroup 24 2019-06-21 21:57 /white_list
```
map.py
```python
#!/usr/bin/python
import sys

def read_local_file_func(f):
    word_set = set()
    file_in = open(f, 'r')
    for line in file_in:
        word = line.strip()
        word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
red.py
```python
#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
run.sh
```bash
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachefile_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func ABC" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -cacheFile "hdfs://master:9000/white_list#ABC" \
    -file "./map.py" \
    -file "./red.py"

#-cacheFile "$HDFS_FILE_PATH#WH" \
```
In hdfs://master:9000/white_list#ABC, the ABC after the # is the link name for white_list; in the invocation above the program refers to the file simply as ABC. The link name can be anything, but it is best to pick something self-explanatory.
Run on Hadoop:
```
[root@master mr_cachefile_broadcast]# bash run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_cachefile_broadcast': No such file or directory
19/06/21 22:08:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/21 22:08:16 WARN streaming.StreamJob: -cacheFile option is deprecated, please use -files instead.
19/06/21 22:08:16 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/21 22:08:16 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/21 22:08:16 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar8224574872481683209/] [] /tmp/streamjob7921590089314550150.jar tmpDir=null
19/06/21 22:08:17 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 22:08:17 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 22:08:17 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 22:08:17 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 22:08:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561122558196_0005
19/06/21 22:08:18 INFO impl.YarnClientImpl: Submitted application application_1561122558196_0005
19/06/21 22:08:18 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561122558196_0005/
19/06/21 22:08:18 INFO mapreduce.Job: Running job: job_1561122558196_0005
19/06/21 22:20:45 INFO mapreduce.Job: Job job_1561122558196_0005 running in uber mode : false
19/06/21 22:20:45 INFO mapreduce.Job: map 0% reduce 0%
19/06/21 22:20:49 INFO mapreduce.Job: map 100% reduce 0%
19/06/21 22:20:55 INFO mapreduce.Job: map 100% reduce 100%
19/06/21 22:20:55 INFO mapreduce.Job: Job job_1561122558196_0005 completed successfully
19/06/21 22:20:56 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1176
		FILE: Number of bytes written=445494
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=636491
		HDFS: Number of bytes written=31
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=2
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=5072
		Total time spent by all reduces in occupied slots (ms)=6478
		Total time spent by all map tasks (ms)=5072
		Total time spent by all reduce tasks (ms)=6478
		Total vcore-milliseconds taken by all map tasks=5072
		Total vcore-milliseconds taken by all reduce tasks=6478
		Total megabyte-milliseconds taken by all map tasks=5193728
		Total megabyte-milliseconds taken by all reduce tasks=6633472
	Map-Reduce Framework
		Map input records=2866
		Map output records=97
		Map output bytes=970
		Map output materialized bytes=1188
		Input split bytes=188
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=1188
		Reduce input records=97
		Reduce output records=3
		Spilled Records=194
		Shuffled Maps =4
		Failed Shuffles=0
		Merged Map outputs=4
		GC time elapsed (ms)=268
		CPU time spent (ms)=3520
		Physical memory (bytes) snapshot=880959488
		Virtual memory (bytes) snapshot=8461725696
		Total committed heap usage (bytes)=558891008
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=636303
	File Output Format Counters
		Bytes Written=31
19/06/21 22:20:56 INFO streaming.StreamJob: Output directory: /output_cachefile_broadcast
```
2.4 -cacheArchive
If the files to distribute have a directory structure, you can pack the whole directory into an archive, upload it to HDFS, and then distribute the archive with the -cacheArchive hdfs://host:port/path/to/archivefile#linkname option.
```
[root@master tp2]# tree .
.
├── map.py
├── red.py
├── run.sh
└── tmp
    ├── white_list_1
    └── white_list_2

1 directory, 5 files
[root@master tmp]# cat white_list_1
the
man
give
[root@master tmp]# cat white_list_2
have
big
[root@master tmp]# tar zcf white_list.tar.gz *
[root@master tmp]# ls
white_list_1 white_list_2 white_list.tar.gz
[root@master tmp]# hadoop fs -put white_list.tar.gz /
[root@master tmp]# hadoop fs -ls /white_list.tar.gz
-rw-r--r-- 3 root supergroup 159 2019-06-29 21:00 /white_list.tar.gz
[root@master tmp]# cd ..
[root@master tp2]# ls
map.py red.py run.sh tmp
```
map.py
```python
#!/usr/bin/python
import os
import sys
import gzip
import time

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list

def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        #time.sleep(100)
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
red.py
```python
#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
run.sh
```shell
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/cachefilearchive.txt"
OUTPUT_PATH="/mp_output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf "mapred.job.name=cache_file_archive_demo" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"
```
Explanation
```
-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz"
-mapper "python map.py mapper_func WH.gz"
```

1. `WH.gz` is an alias for the file before the `#`. In principle the name is arbitrary, but a descriptive one is best.
2. Referring to `WH.gz` in the mapper is therefore referring to `hdfs://master:9000/w.tar.gz`.
Running on Hadoop
```
[root@master tp2]# sh run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /mp_output_cachearchive_broadcast
19/06/29 22:29:21 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 22:29:22 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 22:29:22 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 22:29:22 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 22:29:22 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar4879077363844241273/] [] /tmp/streamjob4932232602831946314.jar tmpDir=null
19/06/29 22:29:22 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:29:22 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:29:23 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 22:29:23 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 22:29:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0003
19/06/29 22:29:23 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0003
19/06/29 22:29:23 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0003/
19/06/29 22:29:23 INFO mapreduce.Job: Running job: job_1561818301923_0003
19/06/29 22:29:28 INFO mapreduce.Job: Job job_1561818301923_0003 running in uber mode : false
19/06/29 22:29:28 INFO mapreduce.Job: map 0% reduce 0%
19/06/29 22:29:33 INFO mapreduce.Job: map 100% reduce 0%
19/06/29 22:29:40 INFO mapreduce.Job: map 100% reduce 50%
19/06/29 22:29:41 INFO mapreduce.Job: map 100% reduce 100%
19/06/29 22:29:41 INFO mapreduce.Job: Job job_1561818301923_0003 completed successfully
19/06/29 22:29:41 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=181
        FILE: Number of bytes written=446268
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=460
        HDFS: Number of bytes written=26
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters
        Killed map tasks=1
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=5701
        Total time spent by all reduces in occupied slots (ms)=8328
        Total time spent by all map tasks (ms)=5701
        Total time spent by all reduce tasks (ms)=8328
        Total vcore-milliseconds taken by all map tasks=5701
        Total vcore-milliseconds taken by all reduce tasks=8328
        Total megabyte-milliseconds taken by all map tasks=5837824
        Total megabyte-milliseconds taken by all reduce tasks=8527872
    Map-Reduce Framework
        Map input records=4
        Map output records=20
        Map output bytes=129
        Map output materialized bytes=193
        Input split bytes=182
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=193
        Reduce input records=20
        Reduce output records=4
        Spilled Records=40
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=251
        CPU time spent (ms)=6840
        Physical memory (bytes) snapshot=832233472
        Virtual memory (bytes) snapshot=8458309632
        Total committed heap usage (bytes)=553123840
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=278
    File Output Format Counters
        Bytes Written=26
19/06/29 22:29:41 INFO streaming.StreamJob: Output directory: /mp_output_cachearchive_broadcast
```
Killing a MapReduce job
Because the job runs on the cluster, stopping the local script (e.g. with Ctrl-C) does not stop the job; it keeps running in the background and must be killed explicitly.

```
[root@master tp2]# bash run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /mp_output_cachearchive_broadcast
19/06/29 22:38:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 22:38:16 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 22:38:16 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 22:38:16 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 22:38:16 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar5517120324769277489/] [] /tmp/streamjob7237556585252410281.jar tmpDir=null
19/06/29 22:38:16 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:38:17 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:38:17 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 22:38:17 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 22:38:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0004
19/06/29 22:38:18 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0004
19/06/29 22:38:18 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0004/
19/06/29 22:38:18 INFO mapreduce.Job: Running job: job_1561818301923_0004
19/06/29 22:38:23 INFO mapreduce.Job: Job job_1561818301923_0004 running in uber mode : false
19/06/29 22:38:23 INFO mapreduce.Job: map 0% reduce 0%
19/06/29 22:38:33 INFO mapreduce.Job: map 67% reduce 0%
^C[root@master tp2]# hadoop job -kill job_1561818301923_0004
DEPRECATED: Use of this script to execute mapred command is deprecated. Instead use the mapred command for it.
19/06/29 22:40:32 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
Killed job job_1561818301923_0004
```
2.5 compression¶
- When the output volume is large, Hadoop's built-in compression support reduces network bandwidth and storage consumption.
- Compression can be enabled for the map output, i.e. the intermediate results.
- Compression can be enabled for the reduce output, i.e. the final results.
- Both whether to compress and which codec to use are configurable.
- Compressing map output mainly reduces the amount of data transferred over the network during the shuffle.
- Compressing reduce output mainly reduces the HDFS storage occupied by the results.
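Gzip-compressed part files produced by such a job are ordinary gzip files, so they can also be inspected off-cluster (e.g. after `hadoop fs -get`) with Python's standard `gzip` module; `hadoop fs -text` does the equivalent decompression on the cluster side. A minimal Python 3 sketch, with a made-up stand-in file since no real job output is available here:

```python
# Sketch: read a gzip part file the way you would after `hadoop fs -get`.
import gzip
import os
import tempfile

# Build a stand-in part file; real ones come from the job's output directory.
path = os.path.join(tempfile.mkdtemp(), "part-00000.gz")
with gzip.open(path, "wt") as f:
    f.write("hadoop\t50\njava\t51\n")    # tab-separated key/value lines

# Decompress transparently and parse the key/value pairs.
with gzip.open(path, "rt") as f:
    counts = dict(line.strip().split("\t") for line in f)
print(counts)
```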
```
[root@master mr_compression]# ls
map.py  red.py  run_2.sh  run.sh
```
map.py
```python
#!/usr/bin/python
import os
import sys
import gzip

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list

def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            #if word != "" and (word in word_set):
            if word != "":
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
red.py
```python
#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```
run.sh
```shell
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"
```
Explanation
```shell
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \           # the final output will contain 10 compressed files
    -jobconf "mapred.job.name=cachefile_demo" \   # job name shown in the YARN web UI
    -jobconf "mapred.compress.map.output=true" \  # compress map output, using the gzip codec
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \      # compress reduce output, using the gzip codec
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \  # distribute an archive already on HDFS to the tasks
    -file "./map.py" \   # ship the local map program to the compute nodes
    -file "./red.py"     # ship the local reduce program to the compute nodes
```
Running on Hadoop
```
[root@master mr_compression]# sh run
sh: run: No such file or directory
[root@master mr_compression]# sh run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_cachearchive_broadcast': No such file or directory
19/06/29 23:22:04 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 23:22:05 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 23:22:05 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.compress.map.output is deprecated. Instead, use mapreduce.map.output.compress
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar8973310826152543911/] [] /tmp/streamjob4114282878628426821.jar tmpDir=null
19/06/29 23:22:05 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 23:22:05 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 23:22:06 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 23:22:06 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 23:22:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0005
19/06/29 23:22:07 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0005
19/06/29 23:22:07 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0005/
19/06/29 23:22:07 INFO mapreduce.Job: Running job: job_1561818301923_0005
19/06/29 23:22:12 INFO mapreduce.Job: Job job_1561818301923_0005 running in uber mode : false
19/06/29 23:22:12 INFO mapreduce.Job: map 0% reduce 0%
19/06/29 23:22:21 INFO mapreduce.Job: map 100% reduce 0%
19/06/29 23:22:37 INFO mapreduce.Job: map 100% reduce 10%
19/06/29 23:22:41 INFO mapreduce.Job: map 100% reduce 17%
19/06/29 23:22:42 INFO mapreduce.Job: map 100% reduce 27%
19/06/29 23:22:44 INFO mapreduce.Job: map 100% reduce 47%
19/06/29 23:22:46 INFO mapreduce.Job: map 100% reduce 70%
19/06/29 23:22:49 INFO mapreduce.Job: map 100% reduce 90%
19/06/29 23:22:50 INFO mapreduce.Job: map 100% reduce 100%
19/06/29 23:22:50 INFO mapreduce.Job: Job job_1561818301923_0005 completed successfully
19/06/29 23:22:50 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=114091
        FILE: Number of bytes written=1594014
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=80938
        HDFS: Number of read operations=36
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=20
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=2
        Launched reduce tasks=11
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=14495
        Total time spent by all reduces in occupied slots (ms)=200288
        Total time spent by all map tasks (ms)=14495
        Total time spent by all reduce tasks (ms)=200288
        Total vcore-milliseconds taken by all map tasks=14495
        Total vcore-milliseconds taken by all reduce tasks=200288
        Total megabyte-milliseconds taken by all map tasks=14842880
        Total megabyte-milliseconds taken by all reduce tasks=205094912
    Map-Reduce Framework
        Map input records=2866
        Map output records=111783
        Map output bytes=852872
        Map output materialized bytes=142941
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=16984
        Reduce shuffle bytes=142941
        Reduce input records=111783
        Reduce output records=16984
        Spilled Records=223566
        Shuffled Maps =20
        Failed Shuffles=0
        Merged Map outputs=20
        GC time elapsed (ms)=5246
        CPU time spent (ms)=50310
        Physical memory (bytes) snapshot=2004160512
        Virtual memory (bytes) snapshot=25431437312
        Total committed heap usage (bytes)=1276641280
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=636303
    File Output Format Counters
        Bytes Written=80938
19/06/29 23:22:50 INFO streaming.StreamJob: Output directory: /output_cachearchive_broadcast
```

View the reduce output after the job completes:

```
[root@master mr_compression]# hadoop fs -ls /output_cachearchive_broadcast
Found 11 items
-rw-r--r--   3 root supergroup          0 2019-06-29 23:29 /output_cachearchive_broadcast/_SUCCESS
-rw-r--r--   3 root supergroup       8091 2019-06-29 23:29 /output_cachearchive_broadcast/part-00000.gz
-rw-r--r--   3 root supergroup       7949 2019-06-29 23:29 /output_cachearchive_broadcast/part-00001.gz
-rw-r--r--   3 root supergroup       8067 2019-06-29 23:29 /output_cachearchive_broadcast/part-00002.gz
-rw-r--r--   3 root supergroup       8244 2019-06-29 23:29 /output_cachearchive_broadcast/part-00003.gz
-rw-r--r--   3 root supergroup       7913 2019-06-29 23:29 /output_cachearchive_broadcast/part-00004.gz
-rw-r--r--   3 root supergroup       8147 2019-06-29 23:29 /output_cachearchive_broadcast/part-00005.gz
-rw-r--r--   3 root supergroup       7912 2019-06-29 23:29 /output_cachearchive_broadcast/part-00006.gz
-rw-r--r--   3 root supergroup       8257 2019-06-29 23:29 /output_cachearchive_broadcast/part-00007.gz
-rw-r--r--   3 root supergroup       8349 2019-06-29 23:29 /output_cachearchive_broadcast/part-00008.gz
-rw-r--r--   3 root supergroup       8009 2019-06-29 23:29 /output_cachearchive_broadcast/part-00009.gz
```
2.6 Global sort, single reducer¶
Output with a single reducer
```
[root@master mr_allsort_1reduce_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_1reduce_python]# cat map_sort.py
#!/usr/local/bin/python
import sys

#base_count = 10000
base_count = 99999

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]
    new_key = base_count - int(key)
    #new_key = base_count + int(key)
    print "%s\t%s" % (new_key, val)
[root@master mr_allsort_1reduce_python]# cat red_sort.py
#!/usr/local/bin/python
import sys

#base_value = 10000
base_value = 99999

for line in sys.stdin:
    key, val = line.strip().split('\t')
    #print str(int(key) - base_value) + "\t" + val
    print str(base_value - int(key)) + "\t" + val
[root@master mr_allsort_1reduce_python]# cat run.sh
#set -e -x
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_A="/a.txt"
INPUT_FILE_PATH_B="/b.txt"
OUTPUT_SORT_PATH="/output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -jobconf "mapred.reduce.tasks=1" \
    -file ./map_sort.py \
    -file ./red_sort.py
```
Ascending vs. descending
Descending (as in the code above): the mapper emits `base_count - int(key)` and the reducer restores the key with `print str(base_value - int(key)) + "\t" + val`.
Ascending (the commented-out lines): the mapper emits `base_count + int(key)` and the reducer restores the key with `print str(int(key) - base_value) + "\t" + val`.
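The reason a base offset is needed at all: the shuffle compares keys as strings, so numeric keys sort incorrectly ("10" before "9"). Adding a fixed base pads every key to the same width, making string order match numeric order; subtracting from the base reverses it. A small Python 3 sketch of the trick used by `map_sort.py`/`red_sort.py`:

```python
# Demonstrate why base_count makes string-sorted keys numerically ordered.
keys = [1, 3, 5, 9, 10]
base = 10000

plain = sorted(str(k) for k in keys)         # lexicographic: "10" before "9"
asc = sorted(str(base + k) for k in keys)    # same-width keys: ascending order
desc = sorted(str(base - k) for k in keys)   # inverted keys: descending order

print(plain)                          # lexicographic order is wrong
print([int(k) - base for k in asc])   # recover ascending numeric order
print([base - int(k) for k in desc])  # recover descending numeric order
```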
Running on Hadoop
```
[root@master mr_allsort_1reduce_python]# sh run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_sort': No such file or directory
19/06/30 00:14:35 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 00:14:36 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 00:14:36 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar5718119915866454712/] [] /tmp/streamjob1423052824935220100.jar tmpDir=null
19/06/30 00:14:36 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:14:36 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:14:37 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1561818301923_0007
19/06/30 00:14:37 ERROR streaming.StreamJob: Error Launching job : Input path does not exist: hdfs://master:9000/a.txt
Input path does not exist: hdfs://master:9000/b.txt
Streaming Command Failed!
[root@master mr_allsort_1reduce_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_1reduce_python]# hadoop fs -put a.txt b.txt /
[root@master mr_allsort_1reduce_python]# sh run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_sort': No such file or directory
19/06/30 00:15:03 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 00:15:03 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 00:15:03 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar5542916557294836465/] [] /tmp/streamjob810285156858649674.jar tmpDir=null
19/06/30 00:15:04 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:15:04 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:15:04 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/30 00:15:04 INFO mapreduce.JobSubmitter: number of splits:3
19/06/30 00:15:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0008
19/06/30 00:15:05 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0008
19/06/30 00:15:05 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0008/
19/06/30 00:15:05 INFO mapreduce.Job: Running job: job_1561818301923_0008
19/06/30 00:15:10 INFO mapreduce.Job: Job job_1561818301923_0008 running in uber mode : false
19/06/30 00:15:10 INFO mapreduce.Job: map 0% reduce 0%
19/06/30 00:15:17 INFO mapreduce.Job: map 33% reduce 0%
19/06/30 00:15:18 INFO mapreduce.Job: map 100% reduce 0%
19/06/30 00:15:22 INFO mapreduce.Job: map 100% reduce 100%
19/06/30 00:15:22 INFO mapreduce.Job: Job job_1561818301923_0008 completed successfully
19/06/30 00:15:22 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1419
        FILE: Number of bytes written=445371
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1173
        HDFS: Number of bytes written=899
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=16708
        Total time spent by all reduces in occupied slots (ms)=2409
        Total time spent by all map tasks (ms)=16708
        Total time spent by all reduce tasks (ms)=2409
        Total vcore-milliseconds taken by all map tasks=16708
        Total vcore-milliseconds taken by all reduce tasks=2409
        Total megabyte-milliseconds taken by all map tasks=17108992
        Total megabyte-milliseconds taken by all reduce tasks=2466816
    Map-Reduce Framework
        Map input records=101
        Map output records=101
        Map output bytes=1211
        Map output materialized bytes=1431
        Input split bytes=228
        Combine input records=0
        Combine output records=0
        Reduce input groups=101
        Reduce shuffle bytes=1431
        Reduce input records=101
        Reduce output records=101
        Spilled Records=202
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=660
        CPU time spent (ms)=3980
        Physical memory (bytes) snapshot=931897344
        Virtual memory (bytes) snapshot=8446369792
        Total committed heap usage (bytes)=674758656
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=945
    File Output Format Counters
        Bytes Written=899
19/06/30 00:15:22 INFO streaming.StreamJob: Output directory: /output_sort
[root@master mr_allsort_1reduce_python]# hadoop fs -ls /output_sort
Found 2 items
-rw-r--r--   3 root supergroup          0 2019-06-30 00:15 /output_sort/_SUCCESS
-rw-r--r--   3 root supergroup        899 2019-06-30 00:15 /output_sort/part-00000
[root@master mr_allsort_1reduce_python]# hadoop fs -text /output_sort/part-00000 | head
100     java
99      hadoop
98      java
97      hadoop
96      java
95      hadoop
94      java
93      hadoop
92      java
91      hadoop
```
2.7 Global sort, multiple reducers¶
```
[root@master mr_allsort_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_python]# cat a.txt
1       hadoop
3       hadoop
5       hadoop
7       hadoop
9       hadoop
[root@master mr_allsort_python]# cat b.txt
0       java
2       java
4       java
6       java
8       java
10      java
[root@master mr_allsort_python]# cat map_sort.py
#!/usr/local/bin/python
import sys

base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]
    new_key = base_count + int(key)
    red_idx = 1
    if new_key < (10000 + 10010) / 2:
        red_idx = 0
    print "%s\t%s\t%s" % (red_idx, new_key, val)
[root@master mr_allsort_python]# cat red_sort.py
#!/usr/local/bin/python
import sys

base_count = 10000

for line in sys.stdin:
    idx_id, key, val = line.strip().split('\t')
    new_key = int(key) - base_count
    print '\t'.join([str(new_key), val])
[root@master mr_allsort_python]# cat run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_A="/a.txt"
INPUT_FILE_PATH_B="/b.txt"
OUTPUT_SORT_PATH="/cmz_output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -file ./map_sort.py \
    -file ./red_sort.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
```
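The routing logic in `map_sort.py` above can be sketched in plain Python 3 (no Hadoop involved): every key is shifted by `base_count` and the known key range [10000, 10010] is split at its midpoint, so reducer 0 receives the lower half and reducer 1 the upper half. Each part file is then a contiguous, internally sorted slice, and concatenating them yields a global sort.

```python
# Sketch of the range-partitioning idea behind map_sort.py / red_sort.py.
base = 10000

def route(key):
    """Pick the reducer index the way map_sort.py does."""
    new_key = base + int(key)
    return 0 if new_key < (10000 + 10010) // 2 else 1

records = [("1", "hadoop"), ("3", "hadoop"), ("5", "hadoop"),
           ("0", "java"), ("2", "java"), ("8", "java"), ("10", "java")]
buckets = {0: [], 1: []}
for key, val in records:
    buckets[route(key)].append((base + int(key), val))

# Within each bucket the shuffle sorts by the (same-width) shifted key.
for idx in buckets:
    buckets[idx].sort()
print([(k - base, v) for k, v in buckets[0]])  # lower half of the key range
print([(k - base, v) for k, v in buckets[1]])  # upper half of the key range
```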
Run
```
[root@master mr_allsort_python]# sh run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /cmz_output_sort
19/06/30 12:01:12 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 12:01:12 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 12:01:12 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar2785950129081995646/] [] /tmp/streamjob990761707239727652.jar tmpDir=null
19/06/30 12:01:12 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:01:13 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:01:13 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/30 12:01:13 INFO mapreduce.JobSubmitter: number of splits:2
19/06/30 12:01:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0015
19/06/30 12:01:13 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0015
19/06/30 12:01:13 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0015/
19/06/30 12:01:13 INFO mapreduce.Job: Running job: job_1561818301923_0015
19/06/30 12:01:17 INFO mapreduce.Job: Job job_1561818301923_0015 running in uber mode : false
19/06/30 12:01:17 INFO mapreduce.Job: map 0% reduce 0%
19/06/30 12:01:23 INFO mapreduce.Job: map 100% reduce 0%
19/06/30 12:01:28 INFO mapreduce.Job: map 100% reduce 100%
19/06/30 12:01:28 INFO mapreduce.Job: Job job_1561818301923_0015 completed successfully
19/06/30 12:01:28 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=187
        FILE: Number of bytes written=444696
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=240
        HDFS: Number of bytes written=88
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=4630
        Total time spent by all reduces in occupied slots (ms)=5008
        Total time spent by all map tasks (ms)=4630
        Total time spent by all reduce tasks (ms)=5008
        Total vcore-milliseconds taken by all map tasks=4630
        Total vcore-milliseconds taken by all reduce tasks=5008
        Total megabyte-milliseconds taken by all map tasks=4741120
        Total megabyte-milliseconds taken by all reduce tasks=5128192
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=153
        Map output materialized bytes=199
        Input split bytes=152
        Combine input records=0
        Combine output records=0
        Reduce input groups=11
        Reduce shuffle bytes=199
        Reduce input records=11
        Reduce output records=11
        Spilled Records=22
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=207
        CPU time spent (ms)=3010
        Physical memory (bytes) snapshot=834572288
        Virtual memory (bytes) snapshot=8461389824
        Total committed heap usage (bytes)=573571072
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=88
    File Output Format Counters
        Bytes Written=88
19/06/30 12:01:28 INFO streaming.StreamJob: Output directory: /cmz_output_sort
[root@master mr_allsort_python]# hadoop fs -text /cmz_output_sort/part-00001
5       hadoop
6       java
7       hadoop
8       java
9       hadoop
10      java
[root@master mr_allsort_python]# hadoop fs -text /cmz_output_sort/part-00000
0       java
1       hadoop
2       java
3       hadoop
4       java
```
2.8 Custom sorting¶
partition: the bucketing step. Keys emitted by the map are distributed to reducers by the partitioner, so the partitioner is the bucketing component; the platform's default hash partitioning is usually used, but a user-specified partitioner is also possible.
key: the field(s) to sort on. Rows in the same bucket with the same key are sorted next to each other.
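Conceptually, `KeyFieldBasedPartitioner` hashes only the selected partition fields to choose a reducer, while the full key still drives sorting within each reducer. A Python 3 sketch of the idea, using Python's built-in `hash` as a stand-in for Hadoop's hash function (which differs in detail):

```python
# Conceptual sketch of KeyFieldBasedPartitioner, not Hadoop's exact algorithm.
def partition(key_fields, num_reducers):
    # Stand-in for Hadoop's hashCode-based bucketing of the partition fields.
    return hash(key_fields) % num_reducers

num_reducers = 3
rows = ["e.5.1.45", "e.5.1.23", "e.5.9.22", "e.9.4.5"]
# With mapred.text.key.partitioner.options=-k2,3 and separator '.',
# the partition key is fields 2-3 of each row.
buckets = [partition(tuple(r.split(".")[1:3]), num_reducers) for r in rows]

# e.5.1.45 and e.5.1.23 share partition fields ("5", "1"), so they are
# guaranteed to reach the same reducer and sort next to each other there.
print(buckets[0] == buckets[1])
```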
Example: run real jobs with different parameter combinations to demonstrate how these options are used.
```
[root@master mr_allsort_python2]# ls
aaa.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_python2]# hadoop fs -ls /aaa.txt
-rw-r--r--   3 root supergroup         60 2019-06-30 12:22 /aaa.txt
[root@master mr_allsort_python2]# cat aaa.txt
d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3
[root@master mr_allsort_python2]# cat map_sort.py
#!/usr/local/bin/python
import sys

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]
    print "%s\t%s" % (key, val)
[root@master mr_allsort_python2]# cat red_sort.py
#!/usr/local/bin/python
import sys

for line in sys.stdin:
    print line.strip()
[root@master mr_allsort_python2]# cat run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_A="/aaa.txt"
OUTPUT_SORT_PATH="/output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_SORT_PATH \
    -mapper "cat" \
    -reducer "cat" \
    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.map.output.field.separator=. \
    -jobconf mapred.text.key.partitioner.options=-k2,3 \
    -jobconf mapred.reduce.tasks=3
```
Run
```shell
[root@master mr_allsort_python2]# sh run.sh
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /output_sort
19/06/30 12:29:37 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/tmp/hadoop-unjar1891359708360072922/] [] /tmp/streamjob5747616032102106944.jar tmpDir=null
19/06/30 12:29:37 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:29:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:29:38 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/30 12:29:38 INFO mapreduce.JobSubmitter: number of splits:2
19/06/30 12:29:38 INFO Configuration.deprecation: mapred.text.key.partitioner.options is deprecated. Instead, use mapreduce.partition.keypartitioner.options
19/06/30 12:29:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/30 12:29:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0016
19/06/30 12:29:39 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0016
19/06/30 12:29:39 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0016/
19/06/30 12:29:39 INFO mapreduce.Job: Running job: job_1561818301923_0016
19/06/30 12:29:44 INFO mapreduce.Job: Job job_1561818301923_0016 running in uber mode : false
19/06/30 12:29:44 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 12:29:48 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 12:29:53 INFO mapreduce.Job:  map 100% reduce 33%
19/06/30 12:29:54 INFO mapreduce.Job:  map 100% reduce 67%
19/06/30 12:29:55 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 12:29:55 INFO mapreduce.Job: Job job_1561818301923_0016 completed successfully
19/06/30 12:29:55 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=92
		FILE: Number of bytes written=548228
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=246
		HDFS: Number of bytes written=60
		HDFS: Number of read operations=15
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters
		Killed reduce tasks=1
		Launched map tasks=2
		Launched reduce tasks=3
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=4835
		Total time spent by all reduces in occupied slots (ms)=8760
		Total time spent by all map tasks (ms)=4835
		Total time spent by all reduce tasks (ms)=8760
		Total vcore-milliseconds taken by all map tasks=4835
		Total vcore-milliseconds taken by all reduce tasks=8760
		Total megabyte-milliseconds taken by all map tasks=4951040
		Total megabyte-milliseconds taken by all reduce tasks=8970240
	Map-Reduce Framework
		Map input records=7
		Map output records=7
		Map output bytes=60
		Map output materialized bytes=110
		Input split bytes=156
		Combine input records=0
		Combine output records=0
		Reduce input groups=6
		Reduce shuffle bytes=110
		Reduce input records=7
		Reduce output records=7
		Spilled Records=14
		Shuffled Maps =6
		Failed Shuffles=0
		Merged Map outputs=6
		GC time elapsed (ms)=344
		CPU time spent (ms)=4810
		Physical memory (bytes) snapshot=976941056
		Virtual memory (bytes) snapshot=10577739776
		Total committed heap usage (bytes)=656932864
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=90
	File Output Format Counters
		Bytes Written=60
19/06/30 12:29:55 INFO streaming.StreamJob: Output directory: /output_sort
[root@master mr_allsort_python2]# hadoop fs -ls /output_sort
Found 4 items
-rw-r--r--   3 root supergroup          0 2019-06-30 12:29 /output_sort/_SUCCESS
-rw-r--r--   3 root supergroup          8 2019-06-30 12:29 /output_sort/part-00000
-rw-r--r--   3 root supergroup         26 2019-06-30 12:29 /output_sort/part-00001
-rw-r--r--   3 root supergroup         26 2019-06-30 12:29 /output_sort/part-00002
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00000
e.9.4	5
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00001
a.7.2	6
d.1.5	23
e.5.9	22
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00002
e.5.1	45
e.5.1	23
f.8.3	3
```
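Why do both `e.5.1` records land in the same part file while `e.5.9` ends up elsewhere? With `mapred.text.key.partitioner.options=-k2,3`, the `KeyFieldBasedPartitioner` hashes only fields 2 through 3 of the key, so records that agree on those fields go to the same reducer. A Python 3 sketch of that idea (it uses the Java `String.hashCode` convention; Hadoop's exact byte-level hashing may differ):

```python
def java_string_hashcode(s):
    """Re-implementation of Java's String.hashCode() in Python 3."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # fold back to a signed 32-bit int, as in Java
    return h - 0x100000000 if h > 0x7FFFFFFF else h

def partition(key, num_reducers=3, sep="."):
    # -k2,3: hash only fields 2..3 of the dot-separated key
    subkey = sep.join(key.split(sep)[1:3])
    return (java_string_hashcode(subkey) & 0x7FFFFFFF) % num_reducers

# keys sharing fields 2-3 ("5.1") always map to the same bucket
print(partition("e.5.1") == partition("e.5.1"))  # True
```

The reducer count (`mapred.reduce.tasks=3`) is what makes three part files appear in the first place; the partitioner only decides which of the three each key goes to.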
Explanation:
```shell
[root@master mr_allsort_python2]# cat run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/aaa.txt"
OUTPUT_SORT_PATH="/output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH  # remove the output directory if it already exists

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \                           # HDFS input path
    -output $OUTPUT_SORT_PATH \                           # HDFS output path
    -mapper "cat" \                                       # pass input through unchanged
    -reducer "cat" \                                      # pass reducer input through unchanged
    -jobconf stream.num.map.output.key.fields=3 \         # first 3 fields of each line form the key
    -jobconf stream.map.output.field.separator=. \        # split fields on "."
    -jobconf mapred.text.key.partitioner.options=-k2,3 \  # partition on key fields 2-3 (e.g. the "5.1" in e.5.1)
    -jobconf mapred.reduce.tasks=3                        # 3 reducers, hence 3 buckets
```
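As noted earlier, streaming jobs can be debugged on a single machine with `cat input | mapper | sort | reducer > output`. A rough local equivalent of this job is shown below. Both mapper and reducer are `cat`, and plain `sort -t. -k2,3` only approximates the cluster behavior: it reproduces the key-field ordering but not the hash partitioning into three buckets.

```shell
# Recreate the sample input from the walkthrough
printf 'd.1.5.23\ne.9.4.5\ne.5.9.22\ne.5.1.45\ne.5.1.23\na.7.2.6\nf.8.3.3\n' > aaa.txt

# cat input | mapper | sort | reducer > output
# -t. -k2,3 sorts on dot-separated fields 2 through 3, mirroring -k2,3 in the job
cat aaa.txt | sort -t. -k2,3 | cat > output_local.txt

cat output_local.txt
```

Lines sharing fields 2-3 (the two `e.5.1.*` records) come out adjacent, which is the grouping the cluster job achieves by sending them to the same reducer.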
2.9 join¶
3. MapReduce tasks¶
Besides checking MapReduce jobs at the command line, we can also monitor them through YARN's web UI.
Web UI address:
http://master:8088/cluster