This article covers:
- How to load complex data types (array) into Hive
- The difference between loading data into a partition and adding a partition in Hive
- A Hive UDF jar reference that does not take effect
- Running Hive jobs in parallel
- Verifying whether two join formulations return the same result
How to load complex data types (array) into Hive
Create the Hive table, where click_array is of type array<string>:
create table temp.dws_search_by_program_set_count_his(
  program_set_id string,
  click_array array<string>)
row format delimited
fields terminated by ','
collection items terminated by '#'
lines terminated by '\n';
Note:
- When creating the table, be sure to specify row format delimited. Here fields are separated by commas and array elements by '#'.
Data format:
100051130,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051133,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051134,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051136,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051138,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051140,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051157,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051161,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051163,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
Now load the data:
load data local inpath '/home/gold/dws_search_by_program_set_count_his.csv' overwrite into table temp.dws_search_by_program_set_count_his;
Result: each row is loaded with click_array parsed into an array of strings.
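To spot-check the loaded data, a query along these lines reads individual array elements and the array length (using the table created above):
```sql
select program_set_id,
       click_array[0]    as first_click,  -- first element of the array
       size(click_array) as click_len     -- number of elements
from temp.dws_search_by_program_set_count_his
limit 5;
```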
The difference between loading data into a partition and adding a partition in Hive
load data works by moving the source file to the partition location. If the file is already sitting at the partition location, load data fails, and you have to use add partition instead:
ALTER TABLE dws.dws_device_box_info_his_v2 ADD partition(province_alias='js',dt='20190701')
location 'hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701';
If you use load data instead:
load data inpath 'hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701'
overwrite into table dws.dws_device_box_info_his_v2 partition(province_alias='js',dt='20190701');
it fails with:
```shell
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask. Unable to move source hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701 to destination hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701
```
As the error shows, the file cannot be moved because it is already at the destination path.
Summary:
- If the file is already at the partition location, use add partition.
- If the file is not at the partition location, use load data.
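Either way, you can check afterwards whether the partition metadata was registered; a minimal check on the table above:
```sql
-- list the partitions Hive knows about for this table
show partitions dws.dws_device_box_info_his_v2;
```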
For more details, see the article on Hive complex types (struct, array, map), which covers all three.
A Hive UDF jar reference that does not take effect
Background
The original Hive UDF jar lives at /opt/hive/auxlib/udf.jar. To test new code, a second jar was built at /opt/hive/auxlib/udf1.jar.
But no matter how the UDF was recreated, the new code in udf1.jar was never picked up.
Cause
udf1.jar and udf.jar contain a class with the same package path and class name. Even though add jar was run again when referencing udf1.jar, Hive does not drop the original udf.jar from its resources. When the UDF is created, the referenced class also exists in the original udf.jar, so Hive falls back to the udf.jar loaded at startup rather than udf1.jar.
Solution
Make sure the class name or package path differs between the old and new jars.
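A minimal sketch of the workaround. The function name my_udf_v2 and the class com.example.udf.MyUdfV2 below are hypothetical; the point is that the class path does not clash with any class inside the udf.jar loaded at startup:
```sql
add jar /opt/hive/auxlib/udf1.jar;
-- the class path is unique to udf1.jar, so Hive cannot resolve it
-- from the original udf.jar and must load the new code
create temporary function my_udf_v2 as 'com.example.udf.MyUdfV2';
```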
Running Hive jobs in parallel
One SQL script produced more than 20 jobs, and they always ran one after another, so the whole run took over 40 minutes.
Hive can be told to run independent jobs of one query in parallel. This increases resource consumption, so use it with caution when the query reads large detail tables.
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=<max number of concurrent jobs>;
With 5 concurrent jobs, the run time dropped to under 10 minutes.
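A sketch of the kind of query shape that benefits, with hypothetical tables t_a and t_b: the two aggregation jobs have no dependency on each other, so with the settings below they are launched concurrently (5 is the value used in the run above):
```sql
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=5;

-- the two group-by jobs are independent and can run at the same time
select * from (select dt, count(1) as cnt from t_a group by dt) a
union all
select * from (select dt, count(1) as cnt from t_b group by dt) b;
```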
Are the results of the following two joins equal?
There are three tables A(id, name), B(id, name), C(id, name),
where A's name is always null and B's columns are both non-null.
A inner join C on a.id = c.id
union all
B join C on b.id = c.id and b.name = c.name
Is this equal to
(A union all B) t1
join C t2
on t1.id = t2.id
where t1.name is null or t1.name = t2.name
Verification:
create table temp.gjc_test_111(id int,name string)
create table temp.gjc_test_222(id int,name string)
create table temp.gjc_test_333(id int,name string)
insert overwrite table temp.gjc_test_111 values
(1,null),
(2,null),
(3,null),
(4,null),
(5,null)
insert overwrite table temp.gjc_test_222 values
(1,'a'),
(1,'b'),
(3,'c'),
(4,'d'),
(5,'e')
insert overwrite table temp.gjc_test_333 values
(1,'a'),
(1,'b'),
(4,'c'),
(4,'d'),
(5,'e')
select * from
(select * from temp.gjc_test_111
union all
select * from temp.gjc_test_222) t1
inner join
temp.gjc_test_333 t2
on t1.id = t2.id
where t1.name is null or t1.name = t2.name
select * from
temp.gjc_test_111 a inner join temp.gjc_test_333 b
on a.id = b.id
union all
select * from
temp.gjc_test_222 a inner join temp.gjc_test_333 c
on a.id = c.id and a.name = c.name
After running both, the two queries return the same result.
Writing a Python UDF to check whether a string is valid JSON
# -*- coding: utf-8 -*-
## add file /home/19190845/udf.py
## select transform(fbdeal_id,orderdata) USING 'python udf.py' AS (fbdeal_id,orderdata) from app.app_onedata_oms_orders_t_bdeal_da;
import sys
import json


def is_json(myjson):
    # return True if the string parses as JSON
    try:
        json.loads(myjson)
    except ValueError:
        return False
    return True


for line in sys.stdin:
    detail = line.strip().split("\t")
    if len(detail) != 2:
        continue
    deal_id = detail[0]
    orderData = detail[1]
    # only emit rows whose orderdata is NOT valid JSON
    if not is_json(orderData):
        print(deal_id + "\t" + orderData)
add file /home/19190845/udf.py
select transform(fbdeal_id,orderdata) USING 'python udf.py' AS (fbdeal_id,orderdata) from app.app_onedata_oms_orders_t_bdeal_da;
The statements above work with the Hive MR and Tez execution engines, as well as with the spark-sql command line.
Hive columns to rows
You can use map together with lateral view explode to create new column names; this is very handy.
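A minimal sketch of the pattern (the inline literals are made up), before the full production query below: explode(map(...)) turns several aggregate columns of one row into one row per key/value pair.
```sql
select t.id, kv.refer_enum, kv.busi_cnt
from (select 1 as id, 10 as cnt2, 20 as cnt3, 30 as cnt4) t
lateral view explode(map('2-2.99', cnt2,
                         '3-3.99', cnt3,
                         '4-4.99', cnt4)) kv as refer_enum, busi_cnt;
```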
select model_code,
fact_rate,
item_code,
quota_name,
refer_enum,
busi_cnt
from
(select model_code,
'POP001' AS fact_rate,
item_code,
case when item_code = 'POP00101' then '商品满意度'
when item_code = 'POP00103' then '物流配送'
when item_code = 'POP00104' then '服务满意度'
else 'unknown'
end as quota_name,
count(distinct if(item_value >= 2 and item_value <= 2.99,business_id,null)) as cnt2,
count(distinct if(item_value >= 3 and item_value <= 3.99,business_id,null)) as cnt3,
count(distinct if(item_value >= 4 and item_value <= 4.99,business_id,null)) as cnt4,
count(distinct if(item_value >= 5 and item_value <= 5.99,business_id,null)) as cnt5,
count(distinct if(item_value >= 6 and item_value <= 6.99,business_id,null)) as cnt6,
count(distinct if(item_value >= 7 and item_value <= 7.99,business_id,null)) as cnt7,
count(distinct if(item_value >= 8 and item_value <= 8.99,business_id,null)) as cnt8,
count(distinct if(item_value >= 9 and item_value <= 10,business_id,null)) as cnt9
from adm.adm_business_growth_comment_item_score_da
where dt = '${stat_date}'
and item_code in ('POP00101','POP00103','POP00104')
group by model_code,
item_code) a
lateral view explode(map('2-2.99', cnt2,
'3-3.99', cnt3,
'4-4.99', cnt4,
'5-5.99', cnt5,
'6-6.99', cnt6,
'7-7.99', cnt7,
'8-8.99', cnt8,
'9-10', cnt9)) b as refer_enum, busi_cnt
Alternatively you can use the str_to_map function, though it is arguably no simpler than using map directly:
select model_code,
fact_rate,
item_code,
quota_name,
refer_enum,
busi_cnt
from
(select model_code,
'POP001' AS fact_rate,
item_code,
case when item_code = 'POP00101' then '商品满意度'
when item_code = 'POP00103' then '物流配送'
when item_code = 'POP00104' then '服务满意度'
else 'unknown'
end as quota_name,
count(distinct if(item_value >= 2 and item_value <= 2.99,business_id,null)) as cnt2,
count(distinct if(item_value >= 3 and item_value <= 3.99,business_id,null)) as cnt3,
count(distinct if(item_value >= 4 and item_value <= 4.99,business_id,null)) as cnt4,
count(distinct if(item_value >= 5 and item_value <= 5.99,business_id,null)) as cnt5,
count(distinct if(item_value >= 6 and item_value <= 6.99,business_id,null)) as cnt6,
count(distinct if(item_value >= 7 and item_value <= 7.99,business_id,null)) as cnt7,
count(distinct if(item_value >= 8 and item_value <= 8.99,business_id,null)) as cnt8,
count(distinct if(item_value >= 9 and item_value <= 10,business_id,null)) as cnt9
from adm.adm_business_growth_comment_item_score_da
where dt = '${stat_date}'
and item_code in ('POP00101','POP00103','POP00104')
group by model_code,
item_code) a
LATERAL VIEW
EXPLODE(
STR_TO_MAP(
CONCAT(
'2-2.99=',CAST (cnt2 AS STRING),
'&3-3.99=',CAST (cnt3 AS STRING),
'&4-4.99=',CAST (cnt4 AS STRING),
'&5-5.99=',CAST (cnt5 AS STRING),
'&6-6.99=',CAST (cnt6 AS STRING),
'&7-7.99=',CAST (cnt7 AS STRING),
'&8-8.99=',CAST (cnt8 AS STRING),
'&9-10=',CAST (cnt9 AS STRING)
)
,'&', '=')
) lateral_table AS refer_enum, busi_cnt
;
Hive data skew
Here the skew was caused by partner_id = 3, which had several million rows. The fix is to handle partner_id = 3 separately and add a random bucket number to spread the join across reducers:
-- take the last ad before the order was placed; link the order-submitting session to the ad
drop table if exists tmp.tmp_adm_tracker_order_dt_de_3;
create table tmp.tmp_adm_tracker_order_dt_de_3
stored as orc
as
select * from (
select *,
row_number() over(partition by ftrade_id,partner_id order by front_time desc) as rank -- latest ad
from (
select
t1.*,
null as rand_num,
t2.ad_type, -- ad type
t2.ad_id, -- ad id
t2.keyword_id, -- bid keyword id
t2.keyword_contect , -- bid keyword content
t2.sku_id as search_ad_sku_id , -- search ad sku id
t2.is_premium , -- whether the bid is premium
front_time
from (
select * from tmp.tmp_adm_tracker_order_dt_de_1
where partner_id <> 3) t1
left join (
select ad_type, -- ad type
ad_id, -- ad id
keyword_id, -- bid keyword id
keyword_contect , -- bid keyword content
sku_id , -- search ad sku id
is_premium , -- whether the bid is premium
partner_id ,
front_time
from adm.adm_tracker_ad_click_dt_de
where dt = '${v_dt}' and partner_id <> 3
) t2
on t1.partner_id = t2.partner_id
where to_unix_timestamp(t1.ftrade_gen_time)*1000 > t2.front_time
union all
select
t1.*,
t2.ad_type, -- ad type
t2.ad_id, -- ad id
t2.keyword_id, -- bid keyword id
t2.keyword_contect , -- bid keyword content
t2.sku_id as search_ad_sku_id , -- search ad sku id
t2.is_premium , -- whether the bid is premium
front_time
from (
select *,
(row_number() over(partition by 1 order by 1 desc) % 100) as rand_num -- skewed key: assign a bucket number used to re-join
from tmp.tmp_adm_tracker_order_dt_de_1
where partner_id = 3) t1
left join (
select ad_type, -- ad type
ad_id, -- ad id
keyword_id, -- bid keyword id
keyword_contect , -- bid keyword content
sku_id , -- search ad sku id
is_premium , -- whether the bid is premium
partner_id ,
front_time,
(row_number() over(partition by 1 order by 1 desc) % 100) as rand_num -- skewed key: assign a bucket number used to re-join
from adm.adm_tracker_ad_click_dt_de
where dt = '${v_dt}' and partner_id = 3
) t2
on t1.partner_id = t2.partner_id
and t1.rand_num = t2.rand_num
where to_unix_timestamp(t1.ftrade_gen_time)*1000 > t2.front_time
) t ) tt
where rank = 1
;
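The same salting idea, distilled from the query above into a minimal sketch (fact_t, dim_t and hot_key are hypothetical names): both sides of the skewed key value get a bucket number, and the bucket is added to the join key so the hot key is spread over many reducers.
```sql
select f.*, d.*
from (select *, (row_number() over(partition by 1 order by 1) % 100) as bucket
      from fact_t where hot_key = 3) f
left join (select *, (row_number() over(partition by 1 order by 1) % 100) as bucket
           from dim_t where hot_key = 3) d
  on f.hot_key = d.hot_key and f.bucket = d.bucket;
```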
Hive optimization tips
Controlling the number of reducers
There are three ways to control the number of reducers in Hive:
set hive.exec.reducers.bytes.per.reducer=<number>
set hive.exec.reducers.max=<number>
set mapreduce.job.reduces=<number>
Of these, set mapreduce.job.reduces=<number> has the highest priority, set hive.exec.reducers.max=<number> comes next, and set hive.exec.reducers.bytes.per.reducer=<number> has the lowest priority.
Since Hive 0.14, the default amount of data handled per reducer is 256 MB.
More reducers is not automatically better: every reducer writes its own output file, and a flood of small files wastes HDFS space and resources.
Too few reducers means a single reducer may have to process a huge amount of data (which is exactly what happens with data skew), losing the benefit of Hadoop's divide-and-conquer model and possibly causing OOM errors.
How many reducers to use depends on the workload; different scenarios call for different settings.
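For example, a job could be tuned like this (the values below are illustrative, not recommendations):
```sql
-- let Hive derive the reducer count from input size (~256 MB each), capped at 200
set hive.exec.reducers.bytes.per.reducer=268435456;
set hive.exec.reducers.max=200;
-- or force an exact number for this query; this overrides the two settings above
set mapreduce.job.reduces=50;
```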
Use map join
set hive.auto.convert.join = true
or use the MAPJOIN hint:
select /*+ MAPJOIN(table_a) */
a.*,
b.*
from table_a a
join table_b b
on a.id = b.id
;
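With auto conversion on, whether a join is actually converted depends on how small the small table is; the threshold can be tuned (25000000 bytes is the commonly cited default, shown here only as an illustration):
```sql
set hive.auto.convert.join = true;
-- tables smaller than this many bytes are eligible to be the map-join side
set hive.mapjoin.smalltable.filesize = 25000000;
```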
Use distinct + union all instead of union
Use
select count(distinct order_id,user_id,order_type)
from (
select order_id,user_id,order_type from orders where order_type='0' union all
select order_id,user_id,order_type from orders where order_type='1' union all
select order_id,user_id,order_type from orders where order_type='1'
)a;
instead of
select count(*)
from(
select order_id,user_id,order_type from orders where order_type='0' union
select order_id,user_id,order_type from orders where order_type='0' union
select order_id,user_id,order_type from orders where order_type='1')t;
General ways to handle data skew
Symptoms of data skew: the job sits at 99% progress for a long time, only a few reducer tasks are still running, and those tasks read and write a very large amount of data (more than 10 GB). This happens most often during aggregations.
A general fix: set hive.groupby.skewindata=true;
This splits one MapReduce job into two MapReduce jobs.
The most common manual technique is to add a random value to the key so that the data is spread evenly across all reducers.
Replace count(distinct) with group by.
Use left semi join instead of in/exists.
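Minimal sketches of the last two rewrites, using hypothetical tables orders and vip_users:
```sql
-- count(distinct user_id) rewritten as group by + count, so the distinct values
-- are not funneled through a single reducer
select count(1)
from (select user_id from orders group by user_id) t;

-- "user_id in (select ...)" rewritten as a left semi join
select a.*
from orders a
left semi join vip_users b on a.user_id = b.user_id;
```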
How Hive implements joins
The Hive execution engine translates HQL into MapReduce jobs. If several tables are joined on the same column, the joins are compiled into a single MapReduce job; otherwise they are compiled into multiple MapReduce jobs.
For example:
SELECT a.val,
b.val,
c.val
FROM a
JOIN b
ON (a.key = b.key1)
JOIN c
ON (c.key = b.key1)
;
is compiled into a single MapReduce job, while
SELECT a.val,
b.val,
c.val
FROM a
JOIN b
ON (a.key = b.key1)
JOIN c
ON (c.key = b.key2)
;
is compiled into two MapReduce jobs.
Joins in Hive fall into two kinds: common join (the join is done in the reduce phase) and map join (the join is done in the map phase).
Common join
Map phase
- Read the source tables. The map output key is the column from the join on condition; if the join has several keys, their combination becomes the key.
- The map output value holds the columns needed downstream (those used in select or where), plus a tag marking which table the value came from.
- Records are sorted by key.
Shuffle phase
- Keys are hashed and the key/value pairs are routed to reducers by hash value, so rows with the same key from both tables land in the same reducer.
Reduce phase
- The join is performed per key, using the tag to tell rows from the different tables apart.
Map join
- Task A is a local task that runs on the client. It scans the small table b, builds a HashTable from it, writes it to a local file, and then loads that file into the DistributedCache.
- Task B is a map-only MR job. Its map tasks scan the big table a; in the map phase each record of a is probed against the HashTable of b from the DistributedCache, and the join result is emitted directly.
Because a map join has no reduce phase, the map tasks write the result files directly: there are as many result files as map tasks.
For more details, see the article Hive Join 的原理与机制 (on the principles and mechanics of Hive joins).