XDRush

Hadoop streaming使用总结(持续更新)

1 说明

随着大数据的发展,熟练掌握Hadoop和hadoop streaming已成为一个深度学习/机器学习工程师必备的技能。下面将自己在工作过程中接触到的hadoop streaming技巧做个记录,并持续更新中。

2 Hadoop应用技巧

2.1 Hadoop streaming实现二次排序

所谓二次排序是指先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序。在开发应用中往往有这样的需求:需要对key相同的所有value再进行按照某个字段排序输出。通常可以在redcue函数中自行实现,但是在value过多的情况下会出现内存溢出的问题(在互联网广告应用场景中,针对某个关键词通常pv较大,而且不是定值),因此需要借助框架本身的排序功能来实现二次排序的需求

实际上在Map端的排序中是先按照partition分区号排序,然后再按照key为键进行排序的,所以本质上Map端的排序就是二次排序。鉴于这一点,我们就可以通过自定义实现Partitioner来实现整个数据的二次排序,幸运的是Hadoop有这样的Partitioner实现类——KeyFieldBasedPartitioner,仅仅通过参数控制就可以实现二次排序,控制参数如下表所示:
KeyFieldBasedPartitioner参数列表

从上表中可以看到,KeyFieldBasedPartitioner有三个参数,前两个是基本参数,第三个是高级参数,使用KeyFieldBasedPartitioner实现二次排序问题的本质就是如何结合使用用于partition的KEY和用于排序的KEY来得到二次排序的目的。

首先介绍基本参数的使用。假设作业的输入每条数据有三个字段,字段之间使用冒号“:”作为分隔符,处理的要求是先按照第一个字段排序,第一字段相同的再按照第二字段排序。对于这样的需求可以这样实现:

  • 使用第一字段做partition,这样就可以保证第一字段相同的都在一个redcue中
  • 然后设置前两个字段整体做Mapper的输出KEY,也就是在框架排序阶段是以前两字段进行排序的,由于第一字段相同都在一个partition,又使用前两列进行排序相当于先按照第一字段排序,然后第一字段相同的再按照第二字段排序,这样设置用于partition的KEY和用于排序的KEY就可以实现二次排序的目的。

假设作业的输入每条数据有三个字段,字段之间使用冒号“:”作为分隔符,处理的要求是先按照第一个字段排序,第一字段相同的再按照第二字段排序。对于这样的需求可以这样实现:使用第一字段做partition,这样就可以保证第一字段相同的都在一个redcue中,然后设置前两个字段整体做Mapper的输出KEY,也就是在框架排序阶段是以前两字段进行排序的,由于第一字段相同都在一个partition,又使用前两列进行排序相当于先按照第一字段排序,然后第一字段相同的再按照第二字段排序,这样设置用于partition的KEY和用于排序的KEY就可以实现二次排序的目的。相关参数实现如下:

1
2
3
4
5
6
7
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D stream.num.map.output.key.fields=2
-D stream.map.output.field.separator=:
-D map.output.key.field.separator=:
-D num.key.fields.for.partition=1

在上述实现中,首先需要使用参数partitioner来指定KeyFieldBasedPartitioner作为MapReduce的Partitioner实现。参数stream.num.map.output.key.fields用于指定前两字段作为Mapper的输出KEY,用于排序,参数stream.map.output.field.separator用于指定输入数据的分隔符,这两个参数是Partitioner的基本控制参数。然后使用KeyFieldBasedPartitioner的控制参数来设置用于控制partition的KEY。

再举一个例子,比如我们需要对以下文本实现按前三列进行排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
aaaa 000003 126
aaaa 000000
bbbb 000002 532
aaaa 000002 432
aaaa 000002 897983
aaaa 000006 432
aaaa 000002
aaaa 000000 32423
aaaa 000003 896
aaaa 000003 126
aaaa 000006
bbbb 000002 432
bbbb 000002
bbbb 000003 4324

也就是最终我们期待的结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
aaaa 000003 126
aaaa 000003 126
aaaa 000003 896
bbbb 000003 4324
aaaa 000000
aaaa 000000 32423
aaaa 000002
aaaa 000002 432
aaaa 000002 897983
aaaa 000006
aaaa 000006 432
bbbb 000002
bbbb 000002 432
bbbb 000002 532

有了上面的说明,这个MR的job也就比较好写了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function retrieve_available_accid()
{
input="/context/personal/xudong/test_dir/"
output="/context/personal/xudong/test_dir_output/"
${hadoop} fs -rmr ${output}
${hadoop} streaming \
-D mapred.job.name="ADUR_xudong_stat_was" \
-D mapred.map.tasks=1 \
-D stream.num.map.output.key.fields=3 \
-D mapred.reduce.tasks=2 \
-D num.key.fields.for.partition=2 \
-D mapred.job.map.capacity=200 \
-D mapred.job.reduce.capacity=200 \
-D mapred.reduce.memory.limit=8000 \
-input ${input} \
-output ${output} \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-mapper "cat" \
-reducer "cat"
}

设置partition为2,表示按照前两列进行分桶;设置stream.num.map.output.key.fields=3表示按照前三列进行排序。

接着介绍一下高级参数的使用。在实际工作中往往也有这样的需求:需要指定KEY中的某个字段或某几个字段来做partition,同时还希望排序时不要按照默认的字母序而是按照数字大小或者自定义排序规则。这样的需求显然很难通过基本参数,因此就需要使用KeyFieldBasedPartitioner的高级参数mapred.text.key.partitioner.options来实现。

可以认为参数mapred.text.key.partitioner.options是基本参数num.key.fields.for.partition的高级版,不仅可以指定key中的前几个字段用做partition,而且还可以单独指定key中某个字段或者某几个字段一起做partition。方法如下:

1
-D mapred.text.key.partitioner.options=-pos1[,pos2]

表示从pos1字段的起始字符开始到pos2字段的结束字符来做partition,其中pos2是可选的,如果没有指定pos2的值,则默认表示为到pos1字段末尾字符截止。例如,指定KEY中的第一个字段来做partition,实现如下:

1
-D mapred.text.key.partitioner.options=-1,1

需要使用KEY中的第二个字段和第三个字段一起做partition,参数使用如下:

1
-D mapred.text.key.partitioner.options=-2,3

KeyFieldBasePartitioner的使用只会影响分桶并不会直接影响排序,例如指定了自定义partition参数mapred.text.key.partitioner.options=-2,3,那么KEY中第二字段和第三字段相同的记录一定会被分桶到相同的redcue中。

注意:参数mapred.text.key.partitioner.options和num.key.fields.for.partition不需要一起使用,一起使用则以num.key.fields.for.partition为准。

2.2 Hadoop获取mapper输入文件名

有时候我们在处理Hadoop任务时,不同输入文件的格式可能不一样,这时需要针对不同的文件做不同的处理。一个简单的区分文件方法就是获取输入文件名,根据文件名来做区分。

如果用python来写mapper,通过下面代码就可以获取输入mapper的文件名:

1
path = os.environ["map_input_file"]

如果用awk来下mapper,则可以通过下面的方式获取文件名:

1
if(match(ENVIRON["map_input_file"], "rid_file"))

不同的hadoop版本可能对应的key不一样,比如这个版本中的key是”map_input_file”;有些版本中的key可能是ENVIRON[“mapreduce_map_input_file”],具体版本可以查阅相关资料。

通过environ获取的文件名是HDFS下的完整路径:
hdfs://benz.hadoop.platform.sogou:18310/context/personal/xudong/test_dir

2.3 合理利用test命令

Hadoop fs -test -e命令用于检测HDFS上的文件是否存在,有时我们在线下执行crontab定时任务时,需要先检测HDFS上的文件是否存在,再决定后续处理逻辑,以防对先上服务造成影响。执行命令为:

1
hadoop fs -test -e HDFS_file_dir

返回值为0表示文件存在,为1表示文件不存在:
hadoop test命令

比如,/context/personal/xudong/test_dir文件存在,那么返回值就是0;
/context/personal/xudong/test_dir_xxx文件不存在,那么返回值就是1;

有些时候在线下流程中中,需要先判断HDFS文件是否生成,如果生成,继续执行后面操作;如果没有生成,则可能需要加入重试机制判断文件是否生成。比如像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for ((try_cnt=0;try_cnt<100;++try_cnt))
do
${HADOOP} fs -test -e ${BASE_DIR}/${DATE}.done
if [ $? -eq 0 ]
then
break
fi
if [ ${try_cnt} -gt 10 ]
then
echo "data of ${DATE} not ready"
fi
echo "try to fetch ${BASE_DIR}/${DATE}.done ${try_cnt} times"
sleep 300
done

2.3 Hadoop处理数据倾斜问题

在实际处理数据过程中,有时候会遇到某一类数据远远多于其他类数据的情况,比如:

1
2
3
4
5
6
7
A filed1 field2
B field1 field2
A field1 field2
A field1 field2
A field1 field2
A field1 field2
...

这个例子中A类数据明显多于B类数据,如果直接以第一个字段为key来分桶的话,所有A类数据将会被分到一个节点中,将会极大的加剧这个节点的计算和存储负担。遇到这种情况该怎么做呢?

首先可以先大概预估下A、B等其他类别数据的大致比例,以决定将A类数据分成多少份,以上例为例,假设A、B类数据比例大概为5:1,因此我们可以将A类数据随机均分为5份:

1
2
3
4
5
6
A 01 filed1 field2
B 01 field1 field2
A 04 field1 field2
A 05 field1 field2
A 02 field1 field2
A 03 field1 field2

引入第二个辅助字段,将A类数据随机的大概均分为5等分,然后以前两个字段为key来分桶,在reducer中通过合理的判断来区分数据真实来源。通过这种方式就能合理的解决Hadoop中的数据倾斜问题。