是python鸭

Hive常用调优

一.Map阶段调优

(一)Map输入调优

Map输入阶段主要考虑因素为:切片大小,输入文件数,hdfs block大小

体现在mapreduce运行阶段的现象主要为两种:

  • map加载数据时间过长
  • map执行时间过长

1.加载时间过长

引起此问题的原因主要为:加载小文件数量过多,输入切片需切割大量block块。

查看是否是小文件过多: hadoop fs -ls hdfs路径,如果有大量小于block块(128M)的文件存在就是小文件过多。

# 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;

# 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000; 

# 数据的最大分割单元大小;max的默认值是256MB,决定合并后输入文件的大小
set mapred.max.split.size=256000000;

# 开启map前小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

2.map执行时间过长

此问题体现为:map阶段各百分比运行时间均匀,整体较长,增加map数

set mapred.map.tasks =n;

map数的计算:

  • 默认map个数:default_num = total_size / block_size;
  • 期望大小:goal_num = mapred.map.tasks;
  • 设置处理的文件大小:split_size=max(mapred.min.split.size, block_size);split_num = total_size / split_size;
  • 计算map数:compute_map_num = min(split_num, max(default_num, goal_num))

设置map数总结如下:

  • 如果想增加map个数,则设置mapred.map.tasks为一个较大的值
  • 如果想减少map个数,则设置mapred.min.split.size为一个较大的值

3.map优化数据倾斜

(1) 数据打散

将key值添加随机值,生成新的key,散列到不同的map中,初步聚合计算完成后,再加一层处理逻辑,删除添加的随机值后再次聚合,此时已经合并过一次,数据量较少,不会产生倾斜。

(2) 小表join小表

set hive.auto.convert.join=true; set hive.mapjoin.smalltable.filesize=25*1024*1024 #默认值是25mb

这种情况可直接开启map join,将小表加载到内存中,在map端进行join,但是该方法要控制小表大小,会有OOM隐患。

(3) 大表join大表

开启skewjoin,倾斜数据进行map join,非倾斜数据进行普通join。只能针对单字段聚合。

set hive.optimize.skewjoin = true;
set hive.skewjoin.key=500000;  #key超过这个值视为数据倾斜,执行map join
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;

(二) shuffle优化

1.开启map端聚合(combiner)

map端聚合数据,减少reduce压力,速度快,但是量不能过大.

set hive.map.aggr=true
set hive.groupby.mapaggr.checkinterval = 100000   #(默认)执行聚合的条数
set hive.map.aggr.hash.min.reduction=0.5   
#(默认)如果hash表的容量与输入行数之比超过这个数,那么map端的hash聚合将被关闭,默认是0.5,设置为1可以保证hash聚合永不被关闭

2.map输出文件合并

控制输出给reduce的文件数,一般配合文件压缩使用。

hive.merge.size.per.task    #控制每个任务合并小文件后的文件大小
hive.merge.smallfiles.avgsize    #定义小文件大小
hive.merge.mapfiles    #是否合并map输出文件

(三) reduce优化

Reduce的个数对整个作业的运行性能有很大影响。

  •  如果Reduce设置的过大,那么将会产生很多小文件,对NameNode会产生一定的影响,而且整个作业的运行时间未必会减少;
  • 如果Reduce设置的过小,那么单个Reduce处理的数据将会加大,很可能会引起OOM异常。

1.reduce计算

  • numTasks = min(maxReducers, input.size / perReducer)
  • maxReducers = hive.exec.reducers.max
  • perReducer=hive.exec.reducers.bytes.per.reducer

以上是reduce计算公式,影响参数主要是以下两个:

  • hive.exec.reducers.bytes.per.reducer 每个reducer处理的数据量
  • hive.exec.reducers.max 每个任务最大的reduce数

2.设置reduce个数

两种方法,一种是修改每个reducer需处理的任务数据量,二是直接修改reduce task数量:

hive.exec.reducers.bytes.per.reducer
mapred.reduce.tasks

3.reduce优化数据倾斜

(1) sql优化数据倾斜:

常遇到的数据倾斜场景中,很多是由sql语句中不合理使用聚合函数和高级查询导致的。

需要reduce操作的主要有:

  • 聚合函数(sum,count,distinct…)
  • 高级查询(group by, join, distribute by, cluster by,order by…)

count(distinct )
count(distinct )在reduce阶段对来自于不同Map Task、相同Key的结果进行去重统计,这就会使所有. 相同key的数据全部汇总到一个reduce中,导致部分reduce处理数据量远远大于其他reduce,产生数据倾斜。
优化:先内层进行group by去重,再外层进行count

order by
order by 整体排序,会将所有的数据放到一个reduce里执行
优化:cluster by/distribute by sort by

in/join
部分场景会涉及到两张表相同key的关联,left semi join可以更好的优化join/in,右表有重复值得情况下 left semi join 只产生一条,join 会产生多条,也会导致 left semi join 的性能更高。
优化: left semi join

(2) ruduce端负载均衡

hive.groupby.skewindata = true  

生成的查询计划会有两个MR Job:

  • 第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;
  • 第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合操作。

(3) reduce输出优化

hive.merge.mapredfiles

控制是否合并reduce输出文件,Map-Reduce的任务结束时合并小文件,结合reduce数量一起使用

  *压缩格式和存储格式

1. 输出结果压缩
1)Map输出阶段压缩

set hive.exec.compress.intermediate=true;
set mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec;

2)Reduce输出阶段压缩

set hive.exec.compress.output=true;
mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

2. 总结

  • 数据格式:orc | parquet
  • 压缩格式:sanppy
  • 使用压缩方式要注意是否支持切分

(四) 其他

1. Jvm重用

mapred.job.reuse.jvm.num.tasks

预先申请task插槽,直到所有task全部运行完(整个job结束)才会释放资源
场景:小文件个数过多,task个数过多
慎用!会导致长时间占用资源不释放

2. 推测执行

hive.mapred.map.tasks.speculative.execution
hive.mapred.reduce.tasks.speculative.execution

在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
慎用!防止bug导致某个任务卡住

3. Job优化

1)并行执行
每个查询被Hive转化为多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间

set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;

2)本地执行

set hive.exec.mode.local.auto=true;  

条件:
A. Job的输入数据大小必须小于参数
hive.exec.mode.local.auto.inputbytes.max(默认128MB)
B. Job的map数必须小于参数
hive.exec.mode.local.auto.tasks.max(默认4)
C. Job的reduce数必须为0或者1

3)取数与计算分开

内层只做取数,计算逻辑放到外层,尽量做到取数与计算分层进行,多层进行计算,会产生多余的job

4. 动态分区

hive.exec.dynamici.partition=true;  #开启动态分区,默认是false
set hive.exec.dynamic.partition.mode=nonstrict; #开启允许所有分区都是动态的,否则必须要有静态分区才能使用

动态分区会产生大量小文件

5.分桶表

每一个表或者分区,Hive可以进一步组织成桶。也就是说,桶为细粒度的数据范围划分。

分桶规则:对分桶字段值进行哈希,哈希值除以桶的个数求余,余数决定了该条记录在哪个桶中,也就是余数相同的在一个桶中。分桶不会改变原有表和原有分区目录的组织方式。只是更改了数据在文件中的分布。

优点:1、提高join查询效率 2、提高抽样效率

Hive常用调优已关闭评论