MapReduce Streaming

1. streaming

https://hadoop.apache.org/docs/r2.7.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html

http://hadoop.apache.org/docs/r1.2.1/streaming.html#Hadoop+Streaming

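1.1 Introduction

  Hadoop Streaming lets map and reduce programs written in non-Java languages such as Ruby, Python, PHP, and C++ run on a Hadoop cluster. The only requirement is that each program reads its input from standard input (stdin) and writes its output to standard output (stdout).

  • MapReduce and HDFS are implemented in Java and expose Java programming interfaces by default
  • The Streaming framework allows programs written in any language to be used in Hadoop MapReduce
  • Streaming makes it easy to port existing programs to the Hadoop platform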

1.2 How it works

[Figure: MapReduce computation flow]

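1.3 Advantages

  • High development efficiency
    • Easy to port to the Hadoop platform: a program only needs to read data from standard input and write to standard output in the expected format
    • An existing single-machine program can run as a distributed job on Hadoop with minor changes
    • Easy to debug on a single machine: **cat input | mapper | sort | reducer > output**

  • Runtime efficiency
    • For CPU-intensive computation, programs written in languages such as C/C++ may run faster than equivalent Java programs

  • Easy resource control by the platform
    • The Streaming framework can flexibly cap the memory and other resources an application uses, for example through limit settings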

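1.4 Limitations

  • By default Streaming only handles text data. To process binary data, a workable approach is to base64-encode the binary keys and values so that they become text
  • Data is copied and parsed (split) twice, which adds some overhead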
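
The base64 workaround for binary data can be sketched as follows. This is only an illustration: the helper names encode_record and decode_record are hypothetical, and the sketch assumes the job keeps the default tab separator between key and value.

```python
import base64


def encode_record(key, value):
    """Turn a binary key/value pair into one tab-separated text line."""
    k = base64.b64encode(key).decode("ascii")
    v = base64.b64encode(value).decode("ascii")
    # The base64 alphabet contains neither tabs nor newlines,
    # so the line always has exactly one separator.
    return k + "\t" + v


def decode_record(line):
    """Recover the original binary key/value pair from a text line."""
    k, v = line.rstrip("\n").split("\t", 1)
    return base64.b64decode(k), base64.b64decode(v)


line = encode_record(b"\x00\xff\tkey", b"val\nue")
print(decode_record(line) == (b"\x00\xff\tkey", b"val\nue"))
```

Equal keys still encode to equal strings, so grouping by key keeps working. The sort order of the encoded keys, however, is generally not the byte order of the original keys.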

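1.5 Workflow

The Hadoop Streaming workflow is roughly as follows:

  • hadoop-streaming.jar registers a new job with the Hadoop cluster, passing in the input path, the output path, and other settings
  • When a mapper starts, Hadoop Streaming feeds the input file to its stdin line by line
  • The mapper we write reads each line from stdin and processes it
  • The mapper writes its intermediate results to stdout. In Python the print statement writes to stdout by default, and the output stream can also be specified explicitly. Hadoop splits each line written to stdout into a key-value pair, using '\t' as the default separator (the text before the first tab is the key, the rest is the value)
  • After the mappers finish, Hadoop automatically partitions, sorts, and groups the results in preparation for the reduce tasks
  • Hadoop Streaming passes the intermediate results to the reducer line by line
  • The reducer we write reads each line from stdin and processes it
  • When processing finishes, the reducer also writes its results to stdout
  • Hadoop stores the output under the output path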
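
The steps above can be imitated on a single machine. The following is a minimal sketch, not Hadoop's implementation: the function names are hypothetical, the data lives in memory instead of stdin/stdout, and a plain sort stands in for Hadoop's partition, sort, and group phase.

```python
from itertools import groupby


def mapper(lines):
    """Map step: emit one tab-separated (word, 1) line per word."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def reducer(sorted_lines):
    """Reduce step: sum the counts of each run of identical keys."""
    pairs = (line.split("\t", 1) for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield "%s\t%d" % (key, sum(int(value) for _, value in group))


def run_local(lines):
    """Imitate: cat input | mapper | sort | reducer."""
    return list(reducer(sorted(mapper(lines))))


print(run_local(["a b a", "b c"]))
```

The reducer relies on identical keys arriving next to each other, which is what the sort between the two steps provides.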

1.6 Hadoop command usage

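Option        Required/Optional  Description
-input        Required           Input file path
-output       Required           Output file path
-mapper       Required           User-written mapper program; may be an executable or a script
-reducer      Optional           User-written reducer program; may be an executable or a script
-file         Optional           Packs a file into the submitted job, typically something the mapper or reducer needs as input, such as a configuration file or a dictionary
-partitioner  Optional           User-defined partitioner program
-combiner     Optional           User-defined combiner program (had to be implemented in Java in older Hadoop releases)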
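
The options in more detail:

-input <path>  (required)
    Input data for the mapper. The files must be uploaded to HDFS manually before the job is submitted.
    The * wildcard is supported to match multiple files or directories, and -input may be repeated to specify several input files or directories.

-output <path>  (required)
    HDFS path where the reducer output is stored. It must always be specified in the script, but the path itself must not already exist; otherwise it is treated as the output of another job.

-mapper <executable command or Java class>  (required)
    The mapper program. Examples:
    -mapper "python map.py"
    -mapper "bash map.sh"
    -mapper "perl map.perl"

-reducer <executable command or Java class>  (optional)
    The reducer program. Leave it out when no reduce processing is needed. Examples:
    -reducer "python reducer.py"
    -reducer "bash reducer.sh"
    -reducer "perl reducer.perl"

-combiner <executable command or Java class>  (optional)
    Combiner program that processes the mapper output. Examples:
    -combiner "python map.py"
    -combiner "bash map.sh"
    -combiner "perl map.perl"

-file <local mapper or reducer program, or another file the program needs>  (optional)
    Ships small local files to the compute nodes. The files are packed and submitted as part of the job, and all distributed files end up in the same job-specific directory on the datanode: jobcache/job_xxx/jar. Examples:
    -file map.py
    -file reduce.py
    -file white_list

-cacheFile <HDFS file URI>
    Distributes a file stored on HDFS. Programs and auxiliary files the job needs are first placed on HDFS; given the HDFS path, the file is copied to the compute nodes and also placed in the job-specific directory: jobcache/job_xxx/jar. Example:
    -cacheFile hdfs://master:9000/cachefile_dir/white_list

-cacheArchive <HDFS archive URI>
    Distributes a compressed archive stored on HDFS; the archive may contain a directory structure. Example:
    -cacheArchive "hdfs://master:9000/w.tar.gz#WLDIR"

-numReduceTasks <number>  (optional)
    Number of reducers for the job. Example:
    -numReduceTasks 2

-inputformat <class>  (optional)
    Custom InputFormat class; the default is TextInputFormat.

-outputformat <class>  (optional)
    Custom OutputFormat class; the default is TextOutputFormat.

-cmdenv name=value  (optional)
    Environment variable passed to the streaming commands.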
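
As an illustration of how these options combine, the sketch below assembles a streaming command line in Python. The helper build_streaming_args is hypothetical and not part of Hadoop; it only uses options listed above, and the paths are taken from the experiments later in this document.

```python
import shlex


def build_streaming_args(hadoop_cmd, streaming_jar, inputs, output, mapper,
                         reducer=None, files=(), num_reduce_tasks=None):
    """Assemble the argument list for a Hadoop Streaming job."""
    args = [hadoop_cmd, "jar", streaming_jar]
    for path in inputs:  # -input may be repeated
        args += ["-input", path]
    args += ["-output", output, "-mapper", mapper]
    if reducer is not None:  # the reducer is optional
        args += ["-reducer", reducer]
    if num_reduce_tasks is not None:
        args += ["-numReduceTasks", str(num_reduce_tasks)]
    for path in files:  # one -file per local file to ship
        args += ["-file", path]
    return args


args = build_streaming_args(
    "/usr/local/hadoop-2.6.5/bin/hadoop",
    "/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar",
    inputs=["/streaming/"],
    output="/streaming_output",
    mapper="python word_count_mapper.py",
    reducer="python word_count_reducer.py",
    files=["word_count_mapper.py", "word_count_reducer.py"],
    num_reduce_tasks=2,
)
# Quote each argument so the printed line can be pasted into a shell.
print(" ".join(shlex.quote(a) for a in args))
```

Building the list first and quoting at the end keeps commands such as "python word_count_mapper.py" together as a single argument.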

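2. Experiments

2.1 Word count

  Hadoop is written in Java, so the most direct approach is to implement the Mapper and Reducer in Java, configure a MapReduce job, and submit it to the cluster. Many developers, however, are less familiar with Java and have hands-on experience with other languages such as C++, Shell, Python, Ruby, PHP, or Perl. Hadoop Streaming gives these developers a tool for processing data on a Hadoop cluster: the hadoop-streaming-*.jar package.

  Hadoop Streaming uses Unix standard input and output as the interface between Hadoop and programs written in other languages, so such a program only needs to read its input from standard input and write its output to standard output. On these streams the key and the value are separated by a tab, and the Hadoop framework guarantees that the reducer's standard input is sorted by key.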
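
The tab convention can be made concrete with a small parsing sketch. The function name split_key_value is hypothetical. The sketch assumes the default behaviour described in the Hadoop Streaming documentation: the text up to the first tab is the key, and a line without a tab is treated as a key with a null value.

```python
def split_key_value(line, separator="\t"):
    """Split one streaming line into (key, value) at the first separator."""
    line = line.rstrip("\n")
    key, found, value = line.partition(separator)
    # Without a separator the whole line is the key and there is no value.
    return key, (value if found else None)


print(split_key_value("localhost\t1\n"))
print(split_key_value("key\tfirst\tsecond\n"))
print(split_key_value("no-tab-line\n"))
```

Only the first tab matters here, so a value may itself contain further tabs.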

  Before running a Streaming job, we can list the example programs bundled with Hadoop from the command line, as shown below.

[root@master ~]# hadoop jar /usr/local/hadoop-2.6.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.5.jar
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that count the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.
From these we pick a computation that can also be done with Hadoop Streaming: counting word frequencies, that is, WordCount. First, prepare the test data set as shown below.

[root@master ~]# mkdir -p /usr/local/src/tmp/
[root@master ~]# cd /usr/local/src/tmp/
[root@master tmp]# cp /etc/hosts /etc/passwd .
[root@master tmp]# hadoop fs -ls /
Found 8 items
drwxr-xr-x   - root supergroup          0 2019-06-24 11:01 /hbase
-rw-r--r--   3 root supergroup        225 2019-06-23 17:13 /hosts
drwxr-xr-x   - root supergroup          0 2019-06-26 20:22 /inputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:21 /outputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:26 /outputdata1
drwxr-xr-x   - root supergroup          0 2019-06-26 20:27 /outputdata2
drwx-wx-wx   - root supergroup          0 2019-06-26 20:23 /tmp
drwxr-xr-x   - root supergroup          0 2019-06-27 20:55 /user
[root@master tmp]# hadoop fs -mkdir /streaming
[root@master tmp]# hadoop fs -put /etc/hosts /etc/passwd /streaming
[root@master tmp]# hadoop fs -ls /streaming
Found 2 items
-rw-r--r--   3 root supergroup        225 2019-06-29 15:56 /streaming/hosts
-rw-r--r--   3 root supergroup        997 2019-06-29 15:56 /streaming/passwd
[root@master tmp]# 
Next, we use Python to run the MapReduce job. The Mapper is implemented in Python in the file word_count_mapper.py:
[root@master tmp]# cat word_count_mapper.py 
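#!/usr/bin/env python
# Mapper: emit "<word>\t1" for every whitespace-separated word on stdin.

import sys

for line in sys.stdin:
    # split() with no argument already drops empty strings.
    for word in line.split():
        # A parenthesized single-argument print works on Python 2 and 3.
        print('%s\t%s' % (word, 1))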
The Reducer is implemented in Python in the file word_count_reducer.py, shown below:
[root@master tmp]# cat word_count_reducer.py 
#!/usr/bin/env python
# Reducer: sum the counts per word and print the totals sorted by word.

import sys

wc_dict = {}

for line in sys.stdin:
    try:
        # Split inside the try block so malformed lines are skipped as well.
        word, count = line.strip().split()
        wc_dict[word] = wc_dict.get(word, 0) + int(count)
    except ValueError:
        pass

# The keys are unique, so sorting the items sorts by word.
for word, count in sorted(wc_dict.items()):
    print('%s\t%s' % (word, count))
[root@master tmp]# cat run.sh 
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input /streaming/ \
-output /streaming_output \
-mapper word_count_mapper.py \
-reducer word_count_reducer.py \
-numReduceTasks 2 \
-file *.py


Manual test
[root@master tmp]# cat hosts | python word_count_mapper.py 
127.0.0.1   1
localhost   1
localhost.localdomain   1
localhost4  1
localhost4.localdomain4 1
::1 1
localhost   1
localhost.localdomain   1
localhost6  1
localhost6.localdomain6 1
192.168.186.10  1
master  1
192.168.186.11  1
slave1  1
192.168.186.12  1
slave2  1
[root@master tmp]# cat hosts | python word_count_mapper.py |python word_count_reducer.py 
127.0.0.1   1
192.168.186.10  1
192.168.186.11  1
192.168.186.12  1
::1 1
localhost   2
localhost.localdomain   2
localhost4  1
localhost4.localdomain4 1
localhost6  1
localhost6.localdomain6 1
master  1
slave1  1
slave2  1

After the manual MapReduce test succeeds, run the job on Hadoop.

Running on Hadoop
[root@master tmp]# sh run.sh 
19/06/29 16:05:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [word_count_mapper.py, word_count_reducer.py, /tmp/hadoop-unjar3187438588282227932/] [] /tmp/streamjob6299520794908170392.jar tmpDir=null
19/06/29 16:05:34 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 16:05:34 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 16:05:35 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/29 16:05:35 INFO mapreduce.JobSubmitter: number of splits:3
19/06/29 16:05:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561552001692_0003
19/06/29 16:05:35 INFO impl.YarnClientImpl: Submitted application application_1561552001692_0003
19/06/29 16:05:35 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561552001692_0003/
19/06/29 16:05:35 INFO mapreduce.Job: Running job: job_1561552001692_0003
19/06/29 16:05:45 INFO mapreduce.Job: Job job_1561552001692_0003 running in uber mode : false
19/06/29 16:05:45 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 16:05:56 INFO mapreduce.Job:  map 100% reduce 0%
19/06/29 16:06:03 INFO mapreduce.Job:  map 100% reduce 50%
19/06/29 16:06:04 INFO mapreduce.Job:  map 100% reduce 100%
19/06/29 16:06:04 INFO mapreduce.Job: Job job_1561552001692_0003 completed successfully
19/06/29 16:06:04 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1415
        FILE: Number of bytes written=556266
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1868
        HDFS: Number of bytes written=1271
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=2
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=28325
        Total time spent by all reduces in occupied slots (ms)=9302
        Total time spent by all map tasks (ms)=28325
        Total time spent by all reduce tasks (ms)=9302
        Total vcore-milliseconds taken by all map tasks=28325
        Total vcore-milliseconds taken by all reduce tasks=9302
        Total megabyte-milliseconds taken by all map tasks=29004800
        Total megabyte-milliseconds taken by all reduce tasks=9525248
    Map-Reduce Framework
        Map input records=28
        Map output records=48
        Map output bytes=1307
        Map output materialized bytes=1439
        Input split bytes=260
        Combine input records=0
        Combine output records=0
        Reduce input groups=46
        Reduce shuffle bytes=1439
        Reduce input records=48
        Reduce output records=46
        Spilled Records=96
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=1527
        CPU time spent (ms)=11410
        Physical memory (bytes) snapshot=1034854400
        Virtual memory (bytes) snapshot=10570633216
        Total committed heap usage (bytes)=756547584
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=1608
    File Output Format Counters 
        Bytes Written=1271
19/06/29 16:06:04 INFO streaming.StreamJob: Output directory: /streaming_output

Check the results

[root@master tmp]# hadoop fs -ls /
Found 10 items
drwxr-xr-x   - root supergroup          0 2019-06-24 11:01 /hbase
-rw-r--r--   3 root supergroup        225 2019-06-23 17:13 /hosts
drwxr-xr-x   - root supergroup          0 2019-06-26 20:22 /inputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:21 /outputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:26 /outputdata1
drwxr-xr-x   - root supergroup          0 2019-06-26 20:27 /outputdata2
drwxr-xr-x   - root supergroup          0 2019-06-29 15:56 /streaming
drwxr-xr-x   - root supergroup          0 2019-06-29 16:06 /streaming_output
drwx-wx-wx   - root supergroup          0 2019-06-26 20:23 /tmp
drwxr-xr-x   - root supergroup          0 2019-06-27 20:55 /user
[root@master tmp]# hadoop fs -ls /streaming_output
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-06-29 16:06 /streaming_output/_SUCCESS
-rw-r--r--   3 root supergroup        737 2019-06-29 16:06 /streaming_output/part-00000
-rw-r--r--   3 root supergroup        534 2019-06-29 16:06 /streaming_output/part-00001

View the contents
[root@master tmp]# hadoop fs -text /streaming_output/part-00000 
127.0.0.1   1
192.168.186.10  1
192.168.186.12  1
::1 1
SSH:/var/empty/sshd:/sbin/nologin   1
bus:/:/sbin/nologin 1
chrony:x:998:996::/var/lib/chrony:/sbin/nologin 1
daemon:x:2:2:daemon:/sbin:/sbin/nologin 1
dbus:x:81:81:System 1
for 1
games:x:12:100:games:/usr/games:/sbin/nologin   1
halt:x:7:0:halt:/sbin:/sbin/halt    1
localhost   2
localhost4  1
localhost6  1
mail:x:8:12:mail:/var/spool/mail:/sbin/nologin  1
message 1
nobody:x:99:99:Nobody:/:/sbin/nologin   1
operator:x:11:0:operator:/root:/sbin/nologin    1
polkitd:x:999:998:User  1
postfix:x:89:89::/var/spool/postfix:/sbin/nologin   1
shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown    1
slave2  1
sshd:x:74:74:Privilege-separated    1
systemd-network:x:192:192:systemd   1
user:/run/saslauthd:/sbin/nologin   1
[root@master tmp]# hadoop fs -text /streaming_output/part-00001
192.168.186.11  1
Management:/:/sbin/nologin  1
Network 1
Server:/var/lib/mysql:/bin/false    1
User:/var/ftp:/sbin/nologin 1
adm:x:3:4:adm:/var/adm:/sbin/nologin    1
bin:x:1:1:bin:/bin:/sbin/nologin    1
ftp:x:14:50:FTP 1
localhost.localdomain   2
localhost4.localdomain4 1
localhost6.localdomain6 1
lp:x:4:7:lp:/var/spool/lpd:/sbin/nologin    1
master  1
mysql:x:27:27:MySQL 1
ntp:x:38:38::/etc/ntp:/sbin/nologin 1
polkitd:/:/sbin/nologin 1
root:x:0:0:root:/root:/bin/bash 1
saslauth:x:997:76:Saslauthd 1
slave1  1
sync:x:5:0:sync:/sbin:/bin/sync 1
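
Which output file a word lands in is decided during the partition step. The sketch below is an assumption-laden illustration, not Hadoop's code: it assumes the job uses the default hash partitioner and that the key is hashed byte by byte, starting from 1 and multiplying by 31 with 32-bit wraparound. The function names are hypothetical.

```python
def text_hash(key):
    """32-bit hash over the UTF-8 bytes of the key (assumed scheme)."""
    h = 1
    for b in key.encode("utf-8"):
        signed = b - 256 if b > 127 else b  # Java bytes are signed
        h = (31 * h + signed) & 0xFFFFFFFF  # wrap around like a 32-bit int
    return h


def partition(key, num_reduce_tasks):
    """Clear the sign bit, then take the remainder by the reducer count."""
    return (text_hash(key) & 0x7FFFFFFF) % num_reduce_tasks


for word in ["master", "slave1", "slave2"]:
    print("%s -> part-%05d" % (word, partition(word, 2)))
```

Under these assumptions the sketch assigns slave2 to part-00000 and both master and slave1 to part-00001, which agrees with the listings above.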

2.2 file

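  If the executables, scripts, or configuration files a program needs do not exist on the cluster's compute nodes, they must first be distributed to the cluster before the computation can succeed. Hadoop provides a mechanism that distributes files and archives automatically; you only need to set the corresponding options when launching the Streaming job.

  • If the file to distribute is local and has no directory structure, use the -file /path/to/FILENAME option, which ships the local file /path/to/FILENAME to every compute node.
  • Inside the Streaming program the file can be accessed as ./FILENAME
  • Local executables other than the designated mapper or reducer program may lose their execute permission after distribution. Run chmod +x ./FILENAME in a wrapper program such as mapper.sh to restore it, and then set -mapper "mapper.sh".
  • Also take note of this directory: ./mapred/local/taskTracker/root/jobcache/job_201704060437_xxxx

Word count restricted to the words in a whitelist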

map.py
#!/usr/bin/python
import sys
# import time  # only needed for the disabled debug sleep below

def read_local_file_func(f):
    # Load the whitelist file into a set, one word per line.
    word_set = set()
    with open(f, 'r') as file_in:
        for line in file_in:
            word_set.add(line.strip())
    return word_set

def mapper_func(white_list_fd):
    # Emit "<word>\t1" for every input word found in the whitelist.
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            # Debug aids, left disabled: they stall the task and pollute the output.
            # time.sleep(100)
            # print("====")
            word = s.strip()
            if word != "" and (word in word_set):
                print("%s\t%s" % (s, 1))

if __name__ == "__main__":
    # Usage: python map.py mapper_func white_list
    # Look up the function named by the first argument and pass it the rest.
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
red.py
#!/usr/bin/python
import sys

def reduer_func():
    # Sum the counts of each word; the input lines must arrive sorted by word.
    current_word = None
    total = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word is not None and current_word != word:
            # The key changed: emit the total of the previous word.
            print("%s\t%s" % (current_word, total))
            total = 0
        current_word = word
        total += int(val)

    # Emit the last word, unless the input was empty.
    if current_word is not None:
        print("%s\t%s" % (current_word, total))

if __name__ == "__main__":
    # Usage: python red.py reduer_func
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list


#$HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 -output $OUTPUT_PATH -mapper "python map.py mapper_func white_list" -reducer "python red.py reduer_func" -jobconf "mapred.reduce.tasks=3" -file ./map.py -file ./red.py -file ./white_list
white_list
suitable
against
recent

Manual test

[root@master mr_file_broadcast]# cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func > cmz.out
[root@master mr_file_broadcast]# cat cmz.out 
against 93
recent  2
suitable    2
Upload the input file to HDFS
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 4 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp
[root@master mr_file_broadcast]# hadoop fs -put The_Man_of_Property.txt /
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 5 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup     632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp

Hadoop test
[root@master mr_file_broadcast]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_file_broadcast': No such file or directory
19/06/21 21:11:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/21 21:11:55 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/21 21:11:55 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map.py, ./red.py, ./white_list, /tmp/hadoop-unjar5356561627923561485/] [] /tmp/streamjob514955250889635986.jar tmpDir=null
19/06/21 21:11:55 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 21:11:55 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 21:11:56 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 21:11:56 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 21:11:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561122558196_0002
19/06/21 21:11:57 INFO impl.YarnClientImpl: Submitted application application_1561122558196_0002
19/06/21 21:11:57 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561122558196_0002/
19/06/21 21:11:57 INFO mapreduce.Job: Running job: job_1561122558196_0002
19/06/21 21:12:03 INFO mapreduce.Job: Job job_1561122558196_0002 running in uber mode : false
19/06/21 21:12:03 INFO mapreduce.Job:  map 0% reduce 0%
19/06/21 21:12:09 INFO mapreduce.Job:  map 100% reduce 0%
19/06/21 21:12:17 INFO mapreduce.Job:  map 100% reduce 67%
19/06/21 21:12:18 INFO mapreduce.Job:  map 100% reduce 100%
19/06/21 21:12:18 INFO mapreduce.Job: Job job_1561122558196_0002 completed successfully
19/06/21 21:12:18 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1182
        FILE: Number of bytes written=556758
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=31
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=3
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=8635
        Total time spent by all reduces in occupied slots (ms)=16566
        Total time spent by all map tasks (ms)=8635
        Total time spent by all reduce tasks (ms)=16566
        Total vcore-milliseconds taken by all map tasks=8635
        Total vcore-milliseconds taken by all reduce tasks=16566
        Total megabyte-milliseconds taken by all map tasks=8842240
        Total megabyte-milliseconds taken by all reduce tasks=16963584
    Map-Reduce Framework
        Map input records=2866
        Map output records=97
        Map output bytes=970
        Map output materialized bytes=1200
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=1200
        Reduce input records=97
        Reduce output records=3
        Spilled Records=194
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=1042
        CPU time spent (ms)=9350
        Physical memory (bytes) snapshot=1057755136
        Virtual memory (bytes) snapshot=10571837440
        Total committed heap usage (bytes)=654311424
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=31
19/06/21 21:12:18 INFO streaming.StreamJob: Output directory: /output_file_broadcast
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 6 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup     632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
drwxr-xr-x   - root supergroup          0 2019-06-21 21:12 /output_file_broadcast
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp
[root@master mr_file_broadcast]# hadoop fs -ls /output_file_broadcast/
Found 4 items
-rw-r--r--   3 root supergroup          0 2019-06-21 21:12 /output_file_broadcast/_SUCCESS
-rw-r--r--   3 root supergroup         22 2019-06-21 21:12 /output_file_broadcast/part-00000
-rw-r--r--   3 root supergroup          9 2019-06-21 21:12 /output_file_broadcast/part-00001
-rw-r--r--   3 root supergroup          0 2019-06-21 21:12 /output_file_broadcast/part-00002
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00000
against 93
suitable    2
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00001
recent  2
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00002

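Besides the YARN web UI, the running tasks can also be inspected directly on the file system of the other machines.

Inspecting the tasks during execution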
[root@slave1 tmp]# pwd
/usr/local/hadoop-2.6.5/tmp
[root@slave1 tmp]# tree .
.
└── nm-local-dir
    ├── filecache
    ├── nmPrivate
    │   └── application_1561122558196_0003
    │       ├── container_1561122558196_0003_01_000001
    │       │   ├── container_1561122558196_0003_01_000001.pid
    │       │   ├── container_1561122558196_0003_01_000001.tokens
    │       │   └── launch_container.sh
    │       ├── container_1561122558196_0003_01_000002
    │       │   ├── container_1561122558196_0003_01_000002.pid
    │       │   ├── container_1561122558196_0003_01_000002.tokens
    │       │   └── launch_container.sh
    │       └── container_1561122558196_0003_01_000003
    │           ├── container_1561122558196_0003_01_000003.pid
    │           ├── container_1561122558196_0003_01_000003.tokens
    │           └── launch_container.sh
    └── usercache
        └── root
            ├── appcache
            │   └── application_1561122558196_0003
            │       ├── container_1561122558196_0003_01_000001
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── jobSubmitDir
            │       │   │   ├── job.split -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/12/job.split
            │       │   │   └── job.splitmetainfo -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/10/job.splitmetainfo
            │       │   ├── job.xml -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/13/job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── container_1561122558196_0003_01_000002
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── container_1561122558196_0003_01_000003
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── filecache
            │       │   ├── 10
            │       │   │   └── job.splitmetainfo
            │       │   ├── 11
            │       │   │   └── job.jar
            │       │   │       └── job.jar
            │       │   ├── 12
            │       │   │   └── job.split
            │       │   └── 13
            │       │       └── job.xml
            │       └── work
            └── filecache
                ├── 10
                │   └── map.py
                ├── 11
                │   └── white_list
                ├── 12
                │   └── red.py
                ├── 13
                │   └── map.py
                ├── 14
                │   └── white_list
                └── 15
                    └── red.py

35 directories, 45 files

2.3 cachefile

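  This is again a word count restricted to the words in a whitelist. If a file (such as a dictionary file) is stored in HDFS and should be treated as a local file on every compute node during the computation, use the option -cacheFile hdfs://host:port/path/to/file#linkname to cache the file on the compute nodes. The Streaming program then accesses the file as ./linkname.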
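
How the #linkname part relates to the rest of the URI can be shown with Python's standard URL parser. This is only an illustration of the naming convention, not of how Hadoop parses the option; the helper name split_cache_uri is hypothetical.

```python
from urllib.parse import urlsplit


def split_cache_uri(uri):
    """Return (HDFS path, link name) for a URI of the form path#linkname."""
    parts = urlsplit(uri)
    # The fragment after '#' is the name the task sees in its working directory.
    return parts.path, parts.fragment


hdfs_path, link_name = split_cache_uri("hdfs://master:9000/white_list#ABC")
print(hdfs_path)           # file location on HDFS
print("./" + link_name)    # path the mapper opens locally
```

The mapper command therefore receives the link name, not the HDFS path, as its file argument.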

[root@master mr_cachefile_broadcast]# pwd
/root/mr_cachefile_broadcast
[root@master mr_cachefile_broadcast]# hadoop fs -put white_list /
[root@master mr_cachefile_broadcast]# hadoop fs -ls /
Found 7 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup     632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
drwxr-xr-x   - root supergroup          0 2019-06-21 21:38 /output_file_broadcast
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp
-rw-r--r--   3 root supergroup         24 2019-06-21 21:57 /white_list

map.py
#!/usr/bin/python
import sys

def read_local_file_func(f):
    # Load the whitelist file into a set, one word per line.
    word_set = set()
    with open(f, 'r') as file_in:
        for line in file_in:
            word_set.add(line.strip())
    return word_set


def mapper_func(white_list_fd):
    # Emit "<word>\t1" for every input word found in the whitelist.
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print("%s\t%s" % (s, 1))


if __name__ == "__main__":
    # Usage: python map.py mapper_func ABC
    # Look up the function named by the first argument and pass it the rest.
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
red.py
#!/usr/bin/python

import sys

def reduer_func():
    # Sum the counts of each word; the input lines must arrive sorted by word.
    current_word = None
    total = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is not None and current_word != word:
            # The key changed: emit the total of the previous word.
            print("%s\t%s" % (current_word, total))
            total = 0

        current_word = word
        total += int(val)

    # Emit the last word, unless the input was empty.
    if current_word is not None:
        print("%s\t%s" % (current_word, total))


if __name__ == "__main__":
    # Usage: python red.py reduer_func
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachefile_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func ABC" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf  "mapred.job.name=cachefile_demo" \
    -cacheFile "hdfs://master:9000/white_list#ABC" \
    -file "./map.py" \
    -file "./red.py"

    #-cacheFile "$HDFS_FILE_PATH#WH" \

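In hdfs://master:9000/white_list#ABC, the ABC after the # is the link name that stands for white_list, and the mapper command above refers to the file simply as ABC. Any name works, but a self-explanatory one is best.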

Running on Hadoop
[root@master mr_cachefile_broadcast]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_cachefile_broadcast': No such file or directory
19/06/21 22:08:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/21 22:08:16 WARN streaming.StreamJob: -cacheFile option is deprecated, please use -files instead.
19/06/21 22:08:16 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/21 22:08:16 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/21 22:08:16 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar8224574872481683209/] [] /tmp/streamjob7921590089314550150.jar tmpDir=null
19/06/21 22:08:17 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 22:08:17 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/21 22:08:17 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 22:08:17 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 22:08:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561122558196_0005
19/06/21 22:08:18 INFO impl.YarnClientImpl: Submitted application application_1561122558196_0005
19/06/21 22:08:18 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561122558196_0005/
19/06/21 22:08:18 INFO mapreduce.Job: Running job: job_1561122558196_0005
19/06/21 22:20:45 INFO mapreduce.Job: Job job_1561122558196_0005 running in uber mode : false
19/06/21 22:20:45 INFO mapreduce.Job:  map 0% reduce 0%
19/06/21 22:20:49 INFO mapreduce.Job:  map 100% reduce 0%
19/06/21 22:20:55 INFO mapreduce.Job:  map 100% reduce 100%
19/06/21 22:20:55 INFO mapreduce.Job: Job job_1561122558196_0005 completed successfully
19/06/21 22:20:56 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1176
        FILE: Number of bytes written=445494
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=31
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=5072
        Total time spent by all reduces in occupied slots (ms)=6478
        Total time spent by all map tasks (ms)=5072
        Total time spent by all reduce tasks (ms)=6478
        Total vcore-milliseconds taken by all map tasks=5072
        Total vcore-milliseconds taken by all reduce tasks=6478
        Total megabyte-milliseconds taken by all map tasks=5193728
        Total megabyte-milliseconds taken by all reduce tasks=6633472
    Map-Reduce Framework
        Map input records=2866
        Map output records=97
        Map output bytes=970
        Map output materialized bytes=1188
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=1188
        Reduce input records=97
        Reduce output records=3
        Spilled Records=194
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=268
        CPU time spent (ms)=3520
        Physical memory (bytes) snapshot=880959488
        Virtual memory (bytes) snapshot=8461725696
        Total committed heap usage (bytes)=558891008
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=31
19/06/21 22:20:56 INFO streaming.StreamJob: Output directory: /output_cachefile_broadcast

2.4 cacheArchive

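  If the files to distribute have a directory structure, pack the whole directory into an archive, upload it to HDFS, and distribute the archive with -cacheArchive hdfs://host:port/path/to/archivefile#linkname.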

[root@master tp2]# tree .
.
├── map.py
├── red.py
├── run.sh
└── tmp
    ├── white_list_1
    └── white_list_2

1 directory, 5 files

[root@master tmp]# cat white_list_1
the
man
give
[root@master tmp]# cat white_list_2
have
big
[root@master tmp]# tar zcf white_list.tar.gz *
[root@master tmp]# ls
white_list_1  white_list_2  white_list.tar.gz
[root@master tmp]# hadoop fs -put white_list.tar.gz /
[root@master tmp]# hadoop fs -ls /white_list.tar.gz 
-rw-r--r--   3 root supergroup        159 2019-06-29 21:00 /white_list.tar.gz
[root@master tmp]# cd ..
[root@master tp2]# ls
map.py  red.py  run.sh  tmp
map.py
#!/usr/bin/python

import os
import sys
import gzip
import time

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list


def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
    #time.sleep(100)
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print "%s\t%s" % (s, 1)



if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
red.py
#!/usr/bin/python

import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word == None:
            current_word = word

        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))



if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/cachefilearchive.txt"
OUTPUT_PATH="/mp_output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf  "mapred.job.name=cache_file_archive_demo" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"

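Explanation

-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" 
-mapper "python map.py mapper_func WH.gz" 
1. WH.gz is an alias for the archive named before the #. In principle any name works, but a descriptive one is best.
2. Referring to WH.gz means referring to hdfs://master:9000/w.tar.gz. The archive is unpacked on the task node, so map.py treats WH.gz as a directory and reads every file inside it.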
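
What the mapper sees under the alias can be imitated locally. The sketch below (Python 3, an illustration rather than the tutorial's code) builds a small archive, unpacks it into a directory named after the alias, and reads every file in it. `read_words_from_dir` is a hypothetical helper, and the explicit extract step only stands in for the unpacking that the framework performs on each node.

```python
import os
import tarfile
import tempfile


def read_words_from_dir(directory):
    """Collect one word per line from every regular file in `directory`."""
    words = set()
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            with open(path, "r") as handle:
                words.update(line.strip() for line in handle if line.strip())
    return words


base = tempfile.mkdtemp()

# Build a small archive, similar to running `tar zcf` inside tmp/.
src = os.path.join(base, "src")
os.mkdir(src)
for name, content in [("white_list_1", "the\nman\ngive\n"),
                      ("white_list_2", "have\nbig\n")]:
    with open(os.path.join(src, name), "w") as handle:
        handle.write(content)
archive = os.path.join(base, "w.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
    for name in sorted(os.listdir(src)):
        tar.add(os.path.join(src, name), arcname=name)

# Stand-in for the framework: unpack the archive into a directory that
# carries the alias name.
alias_dir = os.path.join(base, "WH.gz")
os.mkdir(alias_dir)
with tarfile.open(archive, "r:gz") as tar:
    tar.extractall(alias_dir)

white_list = read_words_from_dir(alias_dir)
print(sorted(white_list))
```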
Running on Hadoop
[root@master tp2]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /mp_output_cachearchive_broadcast
19/06/29 22:29:21 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 22:29:22 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 22:29:22 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 22:29:22 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 22:29:22 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar4879077363844241273/] [] /tmp/streamjob4932232602831946314.jar tmpDir=null
19/06/29 22:29:22 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:29:22 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:29:23 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 22:29:23 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 22:29:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0003
19/06/29 22:29:23 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0003
19/06/29 22:29:23 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0003/
19/06/29 22:29:23 INFO mapreduce.Job: Running job: job_1561818301923_0003
19/06/29 22:29:28 INFO mapreduce.Job: Job job_1561818301923_0003 running in uber mode : false
19/06/29 22:29:28 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 22:29:33 INFO mapreduce.Job:  map 100% reduce 0%
19/06/29 22:29:40 INFO mapreduce.Job:  map 100% reduce 50%
19/06/29 22:29:41 INFO mapreduce.Job:  map 100% reduce 100%
19/06/29 22:29:41 INFO mapreduce.Job: Job job_1561818301923_0003 completed successfully
19/06/29 22:29:41 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=181
        FILE: Number of bytes written=446268
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=460
        HDFS: Number of bytes written=26
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Killed map tasks=1
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=5701
        Total time spent by all reduces in occupied slots (ms)=8328
        Total time spent by all map tasks (ms)=5701
        Total time spent by all reduce tasks (ms)=8328
        Total vcore-milliseconds taken by all map tasks=5701
        Total vcore-milliseconds taken by all reduce tasks=8328
        Total megabyte-milliseconds taken by all map tasks=5837824
        Total megabyte-milliseconds taken by all reduce tasks=8527872
    Map-Reduce Framework
        Map input records=4
        Map output records=20
        Map output bytes=129
        Map output materialized bytes=193
        Input split bytes=182
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=193
        Reduce input records=20
        Reduce output records=4
        Spilled Records=40
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=251
        CPU time spent (ms)=6840
        Physical memory (bytes) snapshot=832233472
        Virtual memory (bytes) snapshot=8458309632
        Total committed heap usage (bytes)=553123840
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=278
    File Output Format Counters 
        Bytes Written=26
19/06/29 22:29:41 INFO streaming.StreamJob: Output directory: /mp_output_cachearchive_broadcast
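Killing a MapReduce job
The job runs on the cluster in the background. Stopping run.sh with Ctrl+C ends only the client script; the job itself keeps running until it is killed explicitly.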
[root@master tp2]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /mp_output_cachearchive_broadcast
19/06/29 22:38:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 22:38:16 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 22:38:16 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 22:38:16 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 22:38:16 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar5517120324769277489/] [] /tmp/streamjob7237556585252410281.jar tmpDir=null
19/06/29 22:38:16 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:38:17 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 22:38:17 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 22:38:17 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 22:38:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0004
19/06/29 22:38:18 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0004
19/06/29 22:38:18 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0004/
19/06/29 22:38:18 INFO mapreduce.Job: Running job: job_1561818301923_0004
19/06/29 22:38:23 INFO mapreduce.Job: Job job_1561818301923_0004 running in uber mode : false
19/06/29 22:38:23 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 22:38:33 INFO mapreduce.Job:  map 67% reduce 0%
^C[root@master tp2]# hadoop job -kill job_1561818301923_0004
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.

19/06/29 22:40:32 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
Killed job job_1561818301923_0004
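
The job ID needed for the kill command appears in the client log on the "Running job:" line. As a small sketch (Python 3; `find_job_id` and `kill_command` are hypothetical helpers), the ID can be extracted from captured log lines and turned into the `mapred job -kill` form that the deprecation notice above recommends. The sketch only builds the command; it does not run it.

```python
import re

# Matches the line in which the client announces the job it submitted.
JOB_ID_PATTERN = re.compile(r"Running job: (job_\d+_\d+)")


def find_job_id(log_lines):
    """Return the first job ID announced in the client log, or None."""
    for line in log_lines:
        match = JOB_ID_PATTERN.search(line)
        if match:
            return match.group(1)
    return None


def kill_command(job_id):
    """Build the argument list for `mapred job -kill <job_id>`."""
    return ["mapred", "job", "-kill", job_id]


log = [
    "19/06/29 22:38:18 INFO mapreduce.Job: The url to track the job: "
    "http://master:8088/proxy/application_1561818301923_0004/",
    "19/06/29 22:38:18 INFO mapreduce.Job: Running job: job_1561818301923_0004",
]
job_id = find_job_id(log)
print(" ".join(kill_command(job_id)))
```

The returned list could be handed to `subprocess.run` on a machine where the `mapred` command is on the PATH.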

2.5 compression

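  • When the output is large, Hadoop's built-in compression can be used to reduce network bandwidth and storage consumption.
  • The map output (the intermediate results) can be compressed.
  • The reduce output (the final results) can be compressed.
  • For each of them you can choose whether to compress and which codec to use.
  • Compressing the map output mainly reduces the amount of data sent over the network during the shuffle.
  • Compressing the reduce output mainly reduces the HDFS storage taken up by the results.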
[root@master mr_compression]# ls
map.py  red.py  run_2.sh  run.sh
map.py
#!/usr/bin/python

import os
import sys
import gzip


def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list


def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            #if word != "" and (word in word_set):
            if word != "":
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
red.py
#!/usr/bin/python

import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word == None:
            current_word = word

        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))



if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
run.sh
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf  "mapred.job.name=cachefile_demo" \
    -jobconf  "mapred.compress.map.output=true" \
    -jobconf  "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf  "mapred.output.compress=true" \
    -jobconf  "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"
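Explanation (an annotated copy of run.sh; the comments after the trailing backslashes are for reading only and would break line continuation if the script were run in this form)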
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \   # 10 reducers, so the output contains 10 compressed part files
    -jobconf  "mapred.job.name=cachefile_demo" \ # job name, as shown in the YARN web UI
    -jobconf  "mapred.compress.map.output=true" \ # compress the map output (intermediate results)
    -jobconf  "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \ # codec for the map output: gzip
    -jobconf  "mapred.output.compress=true" \  # compress the final (reduce) output
    -jobconf  "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \ # codec for the final output: gzip
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \   # distribute an archive that already exists on HDFS to the tasks
    -file "./map.py" \ # ship the local map program to the compute nodes
    -file "./red.py"  # ship the local reduce program to the compute nodes
Running on Hadoop
[root@master mr_compression]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_cachearchive_broadcast': No such file or directory
19/06/29 23:22:04 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 23:22:05 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 23:22:05 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.compress.map.output is deprecated. Instead, use mapreduce.map.output.compress
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar8973310826152543911/] [] /tmp/streamjob4114282878628426821.jar tmpDir=null
19/06/29 23:22:05 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 23:22:05 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/29 23:22:06 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 23:22:06 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 23:22:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0005
19/06/29 23:22:07 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0005
19/06/29 23:22:07 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0005/
19/06/29 23:22:07 INFO mapreduce.Job: Running job: job_1561818301923_0005
19/06/29 23:22:12 INFO mapreduce.Job: Job job_1561818301923_0005 running in uber mode : false
19/06/29 23:22:12 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 23:22:21 INFO mapreduce.Job:  map 100% reduce 0%
19/06/29 23:22:37 INFO mapreduce.Job:  map 100% reduce 10%
19/06/29 23:22:41 INFO mapreduce.Job:  map 100% reduce 17%
19/06/29 23:22:42 INFO mapreduce.Job:  map 100% reduce 27%
19/06/29 23:22:44 INFO mapreduce.Job:  map 100% reduce 47%
19/06/29 23:22:46 INFO mapreduce.Job:  map 100% reduce 70%
19/06/29 23:22:49 INFO mapreduce.Job:  map 100% reduce 90%
19/06/29 23:22:50 INFO mapreduce.Job:  map 100% reduce 100%
19/06/29 23:22:50 INFO mapreduce.Job: Job job_1561818301923_0005 completed successfully
19/06/29 23:22:50 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=114091
        FILE: Number of bytes written=1594014
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=80938
        HDFS: Number of read operations=36
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=20
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=2
        Launched reduce tasks=11
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=14495
        Total time spent by all reduces in occupied slots (ms)=200288
        Total time spent by all map tasks (ms)=14495
        Total time spent by all reduce tasks (ms)=200288
        Total vcore-milliseconds taken by all map tasks=14495
        Total vcore-milliseconds taken by all reduce tasks=200288
        Total megabyte-milliseconds taken by all map tasks=14842880
        Total megabyte-milliseconds taken by all reduce tasks=205094912
    Map-Reduce Framework
        Map input records=2866
        Map output records=111783
        Map output bytes=852872
        Map output materialized bytes=142941
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=16984
        Reduce shuffle bytes=142941
        Reduce input records=111783
        Reduce output records=16984
        Spilled Records=223566
        Shuffled Maps =20
        Failed Shuffles=0
        Merged Map outputs=20
        GC time elapsed (ms)=5246
        CPU time spent (ms)=50310
        Physical memory (bytes) snapshot=2004160512
        Virtual memory (bytes) snapshot=25431437312
        Total committed heap usage (bytes)=1276641280
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=80938
19/06/29 23:22:50 INFO streaming.StreamJob: Output directory: /output_cachearchive_broadcast
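
The log above names a replacement for every deprecated property and recommends -D over -jobconf. The mapping can be written down once and reused, as in this Python 3 sketch. `RENAMED` is copied from the Configuration.deprecation messages in this tutorial's logs; `to_generic_options` is a hypothetical helper that only builds an argument list.

```python
# Old name -> new name, copied from the Configuration.deprecation
# messages printed by the job client in the logs of this tutorial.
RENAMED = {
    "mapred.reduce.tasks": "mapreduce.job.reduces",
    "mapred.job.name": "mapreduce.job.name",
    "mapred.compress.map.output": "mapreduce.map.output.compress",
    "mapred.map.output.compression.codec":
        "mapreduce.map.output.compress.codec",
    "mapred.output.compress": "mapreduce.output.fileoutputformat.compress",
    "mapred.output.compression.codec":
        "mapreduce.output.fileoutputformat.compress.codec",
}


def to_generic_options(jobconf_pairs):
    """Turn ["name=value", ...] into ["-D", "newname=value", ...]."""
    options = []
    for pair in jobconf_pairs:
        name, _, value = pair.partition("=")
        options += ["-D", "%s=%s" % (RENAMED.get(name, name), value)]
    return options


print(to_generic_options([
    "mapred.reduce.tasks=10",
    "mapred.compress.map.output=true",
]))
```

When the command line is rewritten this way, generic options such as -D have to be placed before the streaming options (-input, -output, -mapper and so on).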

Inspecting the reduce output after the job has finished
[root@master mr_compression]# hadoop fs -ls /output_cachearchive_broadcast
Found 11 items
-rw-r--r--   3 root supergroup          0 2019-06-29 23:29 /output_cachearchive_broadcast/_SUCCESS
-rw-r--r--   3 root supergroup       8091 2019-06-29 23:29 /output_cachearchive_broadcast/part-00000.gz
-rw-r--r--   3 root supergroup       7949 2019-06-29 23:29 /output_cachearchive_broadcast/part-00001.gz
-rw-r--r--   3 root supergroup       8067 2019-06-29 23:29 /output_cachearchive_broadcast/part-00002.gz
-rw-r--r--   3 root supergroup       8244 2019-06-29 23:29 /output_cachearchive_broadcast/part-00003.gz
-rw-r--r--   3 root supergroup       7913 2019-06-29 23:29 /output_cachearchive_broadcast/part-00004.gz
-rw-r--r--   3 root supergroup       8147 2019-06-29 23:29 /output_cachearchive_broadcast/part-00005.gz
-rw-r--r--   3 root supergroup       7912 2019-06-29 23:29 /output_cachearchive_broadcast/part-00006.gz
-rw-r--r--   3 root supergroup       8257 2019-06-29 23:29 /output_cachearchive_broadcast/part-00007.gz
-rw-r--r--   3 root supergroup       8349 2019-06-29 23:29 /output_cachearchive_broadcast/part-00008.gz
-rw-r--r--   3 root supergroup       8009 2019-06-29 23:29 /output_cachearchive_broadcast/part-00009.gz
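
Each part file is an ordinary gzip file, so once it has been copied to the local machine it can be read with Python's gzip module. The sketch below (Python 3) assumes the "word<TAB>count" layout that red.py writes. `read_counts` is a hypothetical helper, and the sample file is generated locally in place of a real part-00000.gz.

```python
import gzip
import os
import tempfile


def read_counts(path):
    """Parse a gzip-compressed "word<TAB>count" file into a dict."""
    counts = {}
    with gzip.open(path, "rt") as handle:
        for line in handle:
            word, _, count = line.rstrip("\n").partition("\t")
            counts[word] = int(count)
    return counts


# Write a tiny stand-in for a part-0000N.gz file and read it back.
sample = os.path.join(tempfile.mkdtemp(), "part-00000.gz")
with gzip.open(sample, "wt") as handle:
    handle.write("big\t3\nhave\t7\n")

print(read_counts(sample))
```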

2.6 Global sort with a single reducer

  Output through a single reducer

[root@master mr_allsort_1reduce_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_1reduce_python]# cat map_sort.py 
#!/usr/local/bin/python

import sys

#base_count = 10000
base_count = 99999

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    new_key = base_count - int(key)
    #new_key = base_count + int(key)
    print "%s\t%s" % (new_key, val)

[root@master mr_allsort_1reduce_python]# cat red_sort.py 
#!/usr/local/bin/python

import sys

#base_value = 10000
base_value = 99999

for line in sys.stdin:
    key, val = line.strip().split('\t')
    #print str(int(key) - base_value) + "\t" + val
    print str(base_value - int(key)) + "\t" + val


[root@master mr_allsort_1reduce_python]# cat run.sh 
#set -e -x
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/a.txt"
INPUT_FILE_PATH_B="/b.txt"

OUTPUT_SORT_PATH="/output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -jobconf "mapred.reduce.tasks=1" \
    -file ./map_sort.py \
    -file ./red_sort.py

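Ascending and descending order

Descending (the active code): map_sort.py emits base_count - int(key) with base_count = 99999, and red_sort.py restores the key with print str(base_value - int(key)) + "\t" + val
Ascending (the commented-out lines): map_sort.py emits base_count + int(key) with base_count = 10000, and red_sort.py restores the key with print str(int(key) - base_value) + "\t" + val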
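
The reason for shifting the keys is that streaming keys are compared as text by default, so "10" would sort before "2". Mapping every key to a number of the same width makes text order agree with numeric order. A minimal Python 3 sketch of the descending variant follows; `map_key` and `restore_key` are hypothetical names, and `sorted` on strings stands in for the shuffle sort.

```python
BASE = 99999  # same constant as base_count / base_value in the scripts


def map_key(key):
    """Mapper side: invert the key so that larger numbers sort first."""
    return str(BASE - int(key))


def restore_key(new_key):
    """Reducer side: undo the inversion."""
    return BASE - int(new_key)


records = [(1, "hadoop"), (10, "java"), (2, "java"), (100, "java")]

# Plain string keys sort lexicographically, which is not numeric order.
print(sorted(str(k) for k, _ in records))

# Simulate map -> shuffle sort (string comparison) -> reduce.
shuffled = sorted((map_key(k), v) for k, v in records)
result = [(restore_key(k), v) for k, v in shuffled]
print(result)
```

The trick only holds while all shifted keys have the same number of digits, which is why the ascending variant uses the smaller base 10000.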
Running on Hadoop
[root@master mr_allsort_1reduce_python]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_sort': No such file or directory
19/06/30 00:14:35 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 00:14:36 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 00:14:36 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar5718119915866454712/] [] /tmp/streamjob1423052824935220100.jar tmpDir=null
19/06/30 00:14:36 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:14:36 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:14:37 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1561818301923_0007
19/06/30 00:14:37 ERROR streaming.StreamJob: Error Launching job : Input path does not exist: hdfs://master:9000/a.txt
Input path does not exist: hdfs://master:9000/b.txt
Streaming Command Failed!
[root@master mr_allsort_1reduce_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_1reduce_python]# hadoop fs -put a.txt  b.txt /
[root@master mr_allsort_1reduce_python]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_sort': No such file or directory
19/06/30 00:15:03 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 00:15:03 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 00:15:03 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar5542916557294836465/] [] /tmp/streamjob810285156858649674.jar tmpDir=null
19/06/30 00:15:04 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:15:04 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 00:15:04 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/30 00:15:04 INFO mapreduce.JobSubmitter: number of splits:3
19/06/30 00:15:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0008
19/06/30 00:15:05 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0008
19/06/30 00:15:05 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0008/
19/06/30 00:15:05 INFO mapreduce.Job: Running job: job_1561818301923_0008
19/06/30 00:15:10 INFO mapreduce.Job: Job job_1561818301923_0008 running in uber mode : false
19/06/30 00:15:10 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 00:15:17 INFO mapreduce.Job:  map 33% reduce 0%
19/06/30 00:15:18 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 00:15:22 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 00:15:22 INFO mapreduce.Job: Job job_1561818301923_0008 completed successfully
19/06/30 00:15:22 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1419
        FILE: Number of bytes written=445371
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1173
        HDFS: Number of bytes written=899
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=16708
        Total time spent by all reduces in occupied slots (ms)=2409
        Total time spent by all map tasks (ms)=16708
        Total time spent by all reduce tasks (ms)=2409
        Total vcore-milliseconds taken by all map tasks=16708
        Total vcore-milliseconds taken by all reduce tasks=2409
        Total megabyte-milliseconds taken by all map tasks=17108992
        Total megabyte-milliseconds taken by all reduce tasks=2466816
    Map-Reduce Framework
        Map input records=101
        Map output records=101
        Map output bytes=1211
        Map output materialized bytes=1431
        Input split bytes=228
        Combine input records=0
        Combine output records=0
        Reduce input groups=101
        Reduce shuffle bytes=1431
        Reduce input records=101
        Reduce output records=101
        Spilled Records=202
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=660
        CPU time spent (ms)=3980
        Physical memory (bytes) snapshot=931897344
        Virtual memory (bytes) snapshot=8446369792
        Total committed heap usage (bytes)=674758656
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=945
    File Output Format Counters 
        Bytes Written=899
19/06/30 00:15:22 INFO streaming.StreamJob: Output directory: /output_sort
[root@master mr_allsort_1reduce_python]# hadoop fs -ls /output_sort
Found 2 items
-rw-r--r--   3 root supergroup          0 2019-06-30 00:15 /output_sort/_SUCCESS
-rw-r--r--   3 root supergroup        899 2019-06-30 00:15 /output_sort/part-00000
[root@master mr_allsort_1reduce_python]# hadoop fs -text /output_sort/part-00000|head
100 java
99  hadoop
98  java
97  hadoop
96  java
95  hadoop
94  java
93  hadoop
92  java
91  hadoop

2.7 Global sort with multiple reducers

[root@master mr_allsort_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_python]# cat a.txt 
1   hadoop
3   hadoop
5   hadoop
7   hadoop
9   hadoop
[root@master mr_allsort_python]# cat b.txt 
0   java
2   java
4   java
6   java
8   java
10  java
[root@master mr_allsort_python]# cat map_sort.py 
#!/usr/local/bin/python

import sys

base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    new_key = base_count + int(key)

    red_idx = 1
    if new_key < (10000 + 10010) / 2:
        red_idx = 0

    print "%s\t%s\t%s" % (red_idx, new_key, val)

[root@master mr_allsort_python]# cat red_sort.py 
#!/usr/local/bin/python

import sys

base_count = 10000

for line in sys.stdin:
    idx_id, key, val = line.strip().split('\t')

    new_key = int(key) - base_count
    print '\t'.join([str(new_key), val])

[root@master mr_allsort_python]# cat run.sh 
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/a.txt"
INPUT_FILE_PATH_B="/b.txt"

OUTPUT_SORT_PATH="/cmz_output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -file ./map_sort.py \
    -file ./red_sort.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
Run
[root@master mr_allsort_python]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /cmz_output_sort
19/06/30 12:01:12 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 12:01:12 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 12:01:12 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar2785950129081995646/] [] /tmp/streamjob990761707239727652.jar tmpDir=null
19/06/30 12:01:12 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:01:13 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:01:13 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/30 12:01:13 INFO mapreduce.JobSubmitter: number of splits:2
19/06/30 12:01:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0015
19/06/30 12:01:13 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0015
19/06/30 12:01:13 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0015/
19/06/30 12:01:13 INFO mapreduce.Job: Running job: job_1561818301923_0015
19/06/30 12:01:17 INFO mapreduce.Job: Job job_1561818301923_0015 running in uber mode : false
19/06/30 12:01:17 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 12:01:23 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 12:01:28 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 12:01:28 INFO mapreduce.Job: Job job_1561818301923_0015 completed successfully
19/06/30 12:01:28 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=187
        FILE: Number of bytes written=444696
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=240
        HDFS: Number of bytes written=88
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=4630
        Total time spent by all reduces in occupied slots (ms)=5008
        Total time spent by all map tasks (ms)=4630
        Total time spent by all reduce tasks (ms)=5008
        Total vcore-milliseconds taken by all map tasks=4630
        Total vcore-milliseconds taken by all reduce tasks=5008
        Total megabyte-milliseconds taken by all map tasks=4741120
        Total megabyte-milliseconds taken by all reduce tasks=5128192
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=153
        Map output materialized bytes=199
        Input split bytes=152
        Combine input records=0
        Combine output records=0
        Reduce input groups=11
        Reduce shuffle bytes=199
        Reduce input records=11
        Reduce output records=11
        Spilled Records=22
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=207
        CPU time spent (ms)=3010
        Physical memory (bytes) snapshot=834572288
        Virtual memory (bytes) snapshot=8461389824
        Total committed heap usage (bytes)=573571072
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=88
    File Output Format Counters 
        Bytes Written=88
19/06/30 12:01:28 INFO streaming.StreamJob: Output directory: /cmz_output_sort
[root@master mr_allsort_python]# hadoop fs -text /cmz_output_sort/part-00001
5   hadoop
6   java
7   hadoop
8   java
9   hadoop
10  java
[root@master mr_allsort_python]# hadoop fs -text /cmz_output_sort/part-00000
0   java
1   hadoop
2   java
3   hadoop
4   java
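
The job above can be checked on a single machine before it is submitted. The sketch below is a local Python 3 simulation, not Hadoop code: the function names are made up for illustration, and the shuffle is approximated by routing each record to the bucket named in its first field and then sorting each bucket by the text of the second field. It illustrates why the mapper adds 10000 (keys are compared as text, so they need equal width) and why reading part-00000 followed by part-00001 gives a globally sorted result.

```python
# Local simulation of the two-reducer total sort (Python 3, not Hadoop code).
BASE_COUNT = 10000
MIDPOINT = (10000 + 10010) / 2  # same split point as map_sort.py


def mapper(line):
    """Mimic map_sort.py: emit (reducer index, padded key, value)."""
    key, val = line.split()
    new_key = BASE_COUNT + int(key)
    red_idx = 0 if new_key < MIDPOINT else 1
    return red_idx, str(new_key), val


def reducer(record):
    """Mimic red_sort.py: drop the reducer index and undo the offset."""
    _, key, val = record
    return "%d\t%s" % (int(key) - BASE_COUNT, val)


def run_job(lines, num_reducers=2):
    """Partition on the first field, sort each bucket by key text, then reduce."""
    buckets = [[] for _ in range(num_reducers)]
    for line in lines:
        record = mapper(line)
        buckets[record[0]].append(record)
    return [[reducer(r) for r in sorted(b, key=lambda r: r[1])] for b in buckets]


a_txt = ["1\thadoop", "3\thadoop", "5\thadoop", "7\thadoop", "9\thadoop"]
b_txt = ["0\tjava", "2\tjava", "4\tjava", "6\tjava", "8\tjava", "10\tjava"]
parts = run_job(a_txt + b_txt)
for idx, part in enumerate(parts):
    print("part-%05d" % idx, part)

# Text comparison without the offset versus with it.
print(sorted(["9", "10"]), sorted(["10009", "10010"]))
```

Because every key below the split point goes to reducer 0 and every other key goes to reducer 1, each output file is sorted on its own and the files are also ordered relative to each other.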

2.8 Custom sorting

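Partition: the bucketing step. Each key emitted by the user's mapper is routed by the partitioner to one of the reducers, so the partitioner acts as the bucket assigner. The platform's default hash partitioning is normally used, but users can also specify their own.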

Key: the field to sort on. Lines that fall in the same bucket and have the same key are sorted next to each other.
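
The two roles can be sketched in a few lines of Python 3. This is only an illustration: `bucket_of` and `shuffle` are hypothetical helpers, and `zlib.crc32` stands in for the platform's hash function, so the bucket numbers will not match what Hadoop assigns.

```python
import zlib
from collections import defaultdict


def bucket_of(partition_key, num_reducers):
    """Hypothetical stand-in for a hash partitioner (crc32, not Hadoop's hash)."""
    return zlib.crc32(partition_key.encode("utf-8")) % num_reducers


def shuffle(records, num_reducers):
    """records: (partition_key, sort_key, value) tuples -> {bucket: sorted records}."""
    buckets = defaultdict(list)
    for rec in records:
        buckets[bucket_of(rec[0], num_reducers)].append(rec)
    # Inside one bucket, records are ordered by sort key, so equal keys are adjacent.
    return {b: sorted(recs, key=lambda r: r[1]) for b, recs in buckets.items()}


records = [("5.1", "e.5.1", "45"), ("9.4", "e.9.4", "5"),
           ("5.1", "e.5.1", "23"), ("7.2", "a.7.2", "6")]
result = shuffle(records, 3)
for bucket, recs in sorted(result.items()):
    print(bucket, recs)
```

Records that share a partition key always land in the same bucket, whatever hash is used; the sort key only decides their order inside that bucket.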

Example: the job below is actually run with a combination of these parameters to demonstrate how they are used.

[root@master mr_allsort_python2]# ls
aaa.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_python2]# hadoop fs -ls /aaa.txt
-rw-r--r--   3 root supergroup         60 2019-06-30 12:22 /aaa.txt
[root@master mr_allsort_python2]# cat aaa.txt 
d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3
[root@master mr_allsort_python2]# cat map_sort.py 
#!/usr/local/bin/python

import sys

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    print "%s\t%s" % (key, val)

[root@master mr_allsort_python2]# cat red_sort.py 
#!/usr/local/bin/python

import sys

for line in sys.stdin:
    print line.strip()

[root@master mr_allsort_python2]# cat run.sh 
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/aaa.txt"

OUTPUT_SORT_PATH="/output_sort"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_SORT_PATH \
    -mapper "cat" \
    -reducer "cat" \
    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.map.output.field.separator=. \
    -jobconf mapred.text.key.partitioner.options=-k2,3 \
    -jobconf mapred.reduce.tasks=3

Run
[root@master mr_allsort_python2]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /output_sort
19/06/30 12:29:37 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/tmp/hadoop-unjar1891359708360072922/] [] /tmp/streamjob5747616032102106944.jar tmpDir=null
19/06/30 12:29:37 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:29:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.186.10:8032
19/06/30 12:29:38 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/30 12:29:38 INFO mapreduce.JobSubmitter: number of splits:2
19/06/30 12:29:38 INFO Configuration.deprecation: mapred.text.key.partitioner.options is deprecated. Instead, use mapreduce.partition.keypartitioner.options
19/06/30 12:29:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/30 12:29:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0016
19/06/30 12:29:39 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0016
19/06/30 12:29:39 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0016/
19/06/30 12:29:39 INFO mapreduce.Job: Running job: job_1561818301923_0016
19/06/30 12:29:44 INFO mapreduce.Job: Job job_1561818301923_0016 running in uber mode : false
19/06/30 12:29:44 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 12:29:48 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 12:29:53 INFO mapreduce.Job:  map 100% reduce 33%
19/06/30 12:29:54 INFO mapreduce.Job:  map 100% reduce 67%
19/06/30 12:29:55 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 12:29:55 INFO mapreduce.Job: Job job_1561818301923_0016 completed successfully
19/06/30 12:29:55 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=92
        FILE: Number of bytes written=548228
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=246
        HDFS: Number of bytes written=60
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=2
        Launched reduce tasks=3
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=4835
        Total time spent by all reduces in occupied slots (ms)=8760
        Total time spent by all map tasks (ms)=4835
        Total time spent by all reduce tasks (ms)=8760
        Total vcore-milliseconds taken by all map tasks=4835
        Total vcore-milliseconds taken by all reduce tasks=8760
        Total megabyte-milliseconds taken by all map tasks=4951040
        Total megabyte-milliseconds taken by all reduce tasks=8970240
    Map-Reduce Framework
        Map input records=7
        Map output records=7
        Map output bytes=60
        Map output materialized bytes=110
        Input split bytes=156
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=110
        Reduce input records=7
        Reduce output records=7
        Spilled Records=14
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=344
        CPU time spent (ms)=4810
        Physical memory (bytes) snapshot=976941056
        Virtual memory (bytes) snapshot=10577739776
        Total committed heap usage (bytes)=656932864
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=90
    File Output Format Counters 
        Bytes Written=60
19/06/30 12:29:55 INFO streaming.StreamJob: Output directory: /output_sort
[root@master mr_allsort_python2]# hadoop fs -ls /output_sort
Found 4 items
-rw-r--r--   3 root supergroup          0 2019-06-30 12:29 /output_sort/_SUCCESS
-rw-r--r--   3 root supergroup          8 2019-06-30 12:29 /output_sort/part-00000
-rw-r--r--   3 root supergroup         26 2019-06-30 12:29 /output_sort/part-00001
-rw-r--r--   3 root supergroup         26 2019-06-30 12:29 /output_sort/part-00002
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00000
e.9.4   5
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00001
a.7.2   6
d.1.5   23
e.5.9   22
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00002
e.5.1   45
e.5.1   23
f.8.3   3

Explanation

[root@master mr_allsort_python2]# cat run.sh 
HADOOP_CMD="/usr/local/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/aaa.txt"

OUTPUT_SORT_PATH="/output_sort"

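# Remove the output directory if it already exists
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
# The notes are kept above the command, because a comment placed after a
# trailing "\" would break the line continuation.
#   -input / -output: HDFS input path and HDFS output directory
#   -mapper "cat" / -reducer "cat": pass records through unchanged
#   stream.num.map.output.key.fields=3: the first 3 fields of each map output line form the key
#   stream.map.output.field.separator=.: fields are separated by "."
#   mapred.text.key.partitioner.options=-k2,3: bucket on key fields 2 to 3 (e.g. 5.1);
#       this option is read by KeyFieldBasedPartitioner (see -partitioner in 2.7),
#       which this script does not pass
#   mapred.reduce.tasks=3: three reducers, i.e. three buckets
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_SORT_PATH \
    -mapper "cat" \
    -reducer "cat" \
    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.map.output.field.separator=. \
    -jobconf mapred.text.key.partitioner.options=-k2,3 \
    -jobconf mapred.reduce.tasks=3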
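
What the separator and key-field options do to each line of aaa.txt can be reproduced locally. The Python 3 sketch below is an approximation written for illustration; `split_key_value` and `partition_fields` are hypothetical helpers, not part of Hadoop. It cuts each line into key and value the way the two stream.* options describe, and shows which part of the key a -k2,3 partitioner option would select.

```python
def split_key_value(line, separator=".", num_key_fields=3):
    """Split one map output line into (key, value): the first N fields form the key."""
    fields = line.split(separator)
    key = separator.join(fields[:num_key_fields])
    value = separator.join(fields[num_key_fields:])
    return key, value


def partition_fields(key, separator=".", start=2, end=3):
    """Return key fields start..end (1-based, inclusive), as selected by -k2,3."""
    return separator.join(key.split(separator)[start - 1:end])


lines = ["d.1.5.23", "e.9.4.5", "e.5.9.22", "e.5.1.45",
         "e.5.1.23", "a.7.2.6", "f.8.3.3"]
for line in lines:
    key, value = split_key_value(line)
    print("%s\t%s\tpartition on: %s" % (key, value, partition_fields(key)))
```

The key and value columns printed here correspond to the two columns seen in the part-0000N files above, for example e.5.1 and 45.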

2.9 join

3. MapReduce jobs

  Besides checking MapReduce jobs on the system itself, we can also view them through the YARN web UI.

Web UI address:

http://master:8088/cluster

MapReduce
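
The same job list can also be read programmatically. The Python 3 sketch below assumes the ResourceManager at master:8088 exposes its REST API under /ws/v1/cluster/apps and that the response uses the usual apps/app layout. The helper names and the sample payload are made up for illustration, and the network call is only defined here, not executed.

```python
import json
from urllib.request import urlopen

RM_URL = "http://master:8088"  # same host and port as the web UI above


def apps_url(base_url, state=None):
    """Build the ResourceManager REST URL for listing applications."""
    url = base_url.rstrip("/") + "/ws/v1/cluster/apps"
    return url + ("?states=" + state if state else "")


def summarize(payload):
    """Extract (id, name, state, finalStatus) from a cluster/apps response."""
    apps = (payload.get("apps") or {}).get("app") or []
    return [(a.get("id"), a.get("name"), a.get("state"), a.get("finalStatus"))
            for a in apps]


def fetch_apps(base_url=RM_URL, state=None):
    """Query a live ResourceManager; needs network access to the cluster."""
    with urlopen(apps_url(base_url, state), timeout=10) as resp:
        return summarize(json.loads(resp.read().decode("utf-8")))


# Hand-written sample in the assumed response layout, not real cluster output.
sample = {"apps": {"app": [{"id": "application_1561818301923_0016",
                            "name": "streamjob.jar",
                            "state": "FINISHED",
                            "finalStatus": "SUCCEEDED"}]}}
print(apps_url(RM_URL, "FINISHED"))
print(summarize(sample))
```

On a machine that can reach the cluster, calling fetch_apps(state="FINISHED") would return one tuple per finished application, provided the assumptions above hold for the installed Hadoop version.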