
Hive Miscellaneous Notes

This post mainly covers:

  • How to load complex data types (array) into Hive
  • The difference between loading data into a partition and add partition
  • A Hive UDF jar that does not take effect
  • Running Hive jobs in parallel
  • Verifying whether two joins return the same result

How to load complex data types into Hive (array)

Create the Hive table:

create table temp.dws_search_by_program_set_count_his(
  program_set_id string,
  click_array array<string>)
row format delimited
fields terminated by ','
collection items terminated by '#'
lines terminated by '\n';

Here click_array is of type array<string>.

Note:

  • When creating the table, be sure to specify row format delimited. Here the fields are separated by commas and the array elements by #.

Data format:

100051130,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051133,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051134,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051136,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051138,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051140,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051157,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051161,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0
100051163,0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0#0

Now load the data:

load data local inpath '/home/gold/dws_search_by_program_set_count_his.csv' overwrite into table temp.dws_search_by_program_set_count_his;

Result:

(screenshot: hive-import-data)
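
To spot-check the loaded rows, a quick query against the array column (a minimal sketch; click_array[0] and size() are standard Hive array operations):

select program_set_id,
       click_array[0]    as first_click,   -- arrays are indexed from 0
       size(click_array) as click_cnt      -- number of elements
from temp.dws_search_by_program_set_count_his
limit 10;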

The difference between loading data into a partition and add partition

load data works by moving the file. If the file is already sitting at the partition's location, load data will fail; in that case use add partition instead:

ALTER TABLE dws.dws_device_box_info_his_v2 ADD partition(province_alias='js',dt='20190701') 
location 'hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701';

If you use load data instead:

load data inpath 'hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701' 
overwrite into table dws.dws_device_box_info_his_v2 partition(province_alias='js',dt='20190701');

it fails with:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask. Unable to move source hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701 to destination hdfs://ycluster-3/user/hive/warehouse/dws.db/dws_device_box_info_his_v2/province_alias=js/dt=20190701

As you can see, the error is about being unable to move the file, because it is already at that path.

Summary:

  • If the file is already at the partition's location, use add partition.
  • If the file is not at the partition's location, use load data.

For details, see the earlier post hive中的复杂类型struct、array、map, which covers struct, array, and map.
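
For reference, a minimal sketch of a table that also uses struct and map (the table and field names are illustrative):

create table temp.demo_complex_types (
  id   string,
  info struct<name:string, age:int>,   -- struct: a fixed set of named fields
  tags map<string, string>             -- map: key/value pairs
)
row format delimited
fields terminated by ','
collection items terminated by '#'
map keys terminated by ':'
lines terminated by '\n';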

A Hive UDF jar that does not take effect

Background

The original Hive UDF jar is /opt/hive/auxlib/udf.jar. To test some code changes I built a second jar, /opt/hive/auxlib/udf1.jar, but no matter how I recreated the UDF, the new code was never picked up.

Cause

udf1.jar and udf.jar contain the same Java class, with the same package path and class name. Even though ADD JAR was run again when switching to udf1.jar, Hive does not remove the original udf.jar from its resources. When the UDF is created, the referenced class also exists in the original udf.jar, so Hive defaults to the udf.jar it loaded at startup and never uses udf1.jar.

Solution

Give the class in the new jar a different class name or package path from the old one, as sketched below.
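
A minimal sketch of the workflow, assuming the class in udf1.jar has been given a new fully qualified name (the class and function names below are illustrative):

-- the new jar ships the same logic under a different class name
add jar /opt/hive/auxlib/udf1.jar;
drop temporary function if exists my_udf_v2;
create temporary function my_udf_v2 as 'com.example.udf.MyUdfV2';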

Running Hive jobs in parallel

I wrote a SQL script that produces more than 20 jobs, and each job only started after the previous one finished, so the whole run took over 40 minutes.
I recently found a way to let Hive run jobs concurrently. Note that this increases resource consumption; use it with care if the queries read large detail tables.

set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=<max number of concurrent jobs>;

With 5 concurrent jobs, the runtime dropped to under 10 minutes.
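
For reference, the settings corresponding to the 5-way concurrency mentioned above (a sketch; tune the number to your cluster's capacity):

set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=5;  -- run up to 5 independent stages at the same time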

Do these two joins return the same result?

There are three tables: A(id, name), B(id, name), C(id, name).
In A the name column is always null; in B both columns are non-null.

A inner join C on a.id = c.id 
union all
B join C on b.id = c.id and b.name = c.name

Is it equivalent to:

(A union all B) t1
join C t2
on t1.id = t2.id
where t1.name is null or t1.name = t2.name

Verification:

create table temp.gjc_test_111(id int,name string);
create table temp.gjc_test_222(id int,name string);
create table temp.gjc_test_333(id int,name string);
insert overwrite table temp.gjc_test_111 values
(1,null),
(2,null),
(3,null),
(4,null),
(5,null);
insert overwrite table temp.gjc_test_222 values
(1,'a'),
(1,'b'),
(3,'c'),
(4,'d'),
(5,'e');
insert overwrite table temp.gjc_test_333 values
(1,'a'),
(1,'b'),
(4,'c'),
(4,'d'),
(5,'e');

select * from 
(select * from temp.gjc_test_111 
union all 
select * from temp.gjc_test_222) t1
inner join 
temp.gjc_test_333 t2
on t1.id = t2.id
where t1.name is null or t1.name = t2.name

select * from 
temp.gjc_test_111 a inner join temp.gjc_test_333 b
on a.id = b.id
union all
select * from 
temp.gjc_test_222 a inner join temp.gjc_test_333 c
on a.id = c.id and a.name = c.name

Verified: the two queries return the same result.

A Python transform script (a "UDF" via TRANSFORM) that checks whether a string is valid JSON

# -*- coding: utf-8 -*-
## Usage in Hive:
##   add file /home/19190845/udf.py
##   select transform(fbdeal_id,orderdata) USING 'python udf.py'  AS (fbdeal_id,orderdata) from app.app_onedata_oms_orders_t_bdeal_da;
import sys
import json


def is_json(myjson):
    # Return True if the string parses as JSON, False otherwise.
    try:
        json.loads(myjson)
    except ValueError:
        return False
    return True


# Hive streams each input row to stdin as tab-separated columns.
for line in sys.stdin:
    detail = line.strip().split("\t")
    if len(detail) != 2:
        continue
    deal_id = detail[0]
    orderData = detail[1]
    # Only emit rows whose orderData is NOT valid JSON.
    if not is_json(orderData):
        print(str(deal_id) + "\t" + orderData)
add file /home/19190845/udf.py
select transform(fbdeal_id,orderdata) USING 'python udf.py'  AS (fbdeal_id,orderdata) from app.app_onedata_oms_orders_t_bdeal_da;

The statements above work with the Hive MR and Tez engines, and also in the spark-sql command line.

Hive: turning columns into rows

You can use map together with lateral view explode to synthesize new column names; very handy.

select  model_code,
        fact_rate,
        item_code,
        quota_name,
        refer_enum,
        busi_cnt
from
(select model_code,
       'POP001' AS fact_rate,
       item_code,
       case when item_code = 'POP00101' then '商品满意度'
            when item_code = 'POP00103' then '物流配送'
            when item_code = 'POP00104' then '服务满意度'
            else 'unknown'
       end as quota_name,
       count(distinct if(item_value >= 2 and item_value <= 2.99,business_id,null)) as cnt2,
       count(distinct if(item_value >= 3 and item_value <= 3.99,business_id,null)) as cnt3,
       count(distinct if(item_value >= 4 and item_value <= 4.99,business_id,null)) as cnt4,
       count(distinct if(item_value >= 5 and item_value <= 5.99,business_id,null)) as cnt5,
       count(distinct if(item_value >= 6 and item_value <= 6.99,business_id,null)) as cnt6,
       count(distinct if(item_value >= 7 and item_value <= 7.99,business_id,null)) as cnt7,
       count(distinct if(item_value >= 8 and item_value <= 8.99,business_id,null)) as cnt8,
       count(distinct if(item_value >= 9 and item_value <= 10,business_id,null)) as cnt9
from adm.adm_business_growth_comment_item_score_da
where dt = '${stat_date}'
  and item_code in ('POP00101','POP00103','POP00104')
group by model_code,
         item_code) a
lateral view explode(map('2-2.99', cnt2,
                         '3-3.99', cnt3,
                         '4-4.99', cnt4,
                         '5-5.99', cnt5,
                         '6-6.99', cnt6,
                         '7-7.99', cnt7,
                         '8-8.99', cnt8,
                         '9-10', cnt9)) b as refer_enum, busi_cnt

Or use the str_to_map function, although plain map feels simpler.

select  model_code,
        fact_rate,
        item_code,
        quota_name,
        refer_enum,
        busi_cnt
from
(select model_code,
       'POP001' AS fact_rate,
       item_code,
       case when item_code = 'POP00101' then '商品满意度'
            when item_code = 'POP00103' then '物流配送'
            when item_code = 'POP00104' then '服务满意度'
            else 'unknown'
       end as quota_name,
       count(distinct if(item_value >= 2 and item_value <= 2.99,business_id,null)) as cnt2,
       count(distinct if(item_value >= 3 and item_value <= 3.99,business_id,null)) as cnt3,
       count(distinct if(item_value >= 4 and item_value <= 4.99,business_id,null)) as cnt4,
       count(distinct if(item_value >= 5 and item_value <= 5.99,business_id,null)) as cnt5,
       count(distinct if(item_value >= 6 and item_value <= 6.99,business_id,null)) as cnt6,
       count(distinct if(item_value >= 7 and item_value <= 7.99,business_id,null)) as cnt7,
       count(distinct if(item_value >= 8 and item_value <= 8.99,business_id,null)) as cnt8,
       count(distinct if(item_value >= 9 and item_value <= 10,business_id,null)) as cnt9
from adm.adm_business_growth_comment_item_score_da
where dt = '${stat_date}'
  and item_code in ('POP00101','POP00103','POP00104')
group by model_code,
         item_code) a
LATERAL VIEW
    EXPLODE(
            STR_TO_MAP(
                    CONCAT(
                        '2-2.99=',CAST (cnt2 AS STRING),
                        '&3-3.99=',CAST (cnt3 AS STRING),
                        '&4-4.99=',CAST (cnt4 AS STRING),
                        '&5-5.99=',CAST (cnt5 AS STRING),
                        '&6-6.99=',CAST (cnt6 AS STRING),
                        '&7-7.99=',CAST (cnt7 AS STRING),
                        '&8-8.99=',CAST (cnt8 AS STRING),
                        '&9-10=',CAST (cnt9 AS STRING)
                    )
                ,'&', '=')
        ) lateral_table AS refer_enum, busi_cnt
;

Hive data skew

The skew here comes from partner_id = 3 having several million rows. The fix is to handle partner_id = 3 separately and join on an added random number.

-- take the last ad before the order was placed; link the order-submission session to the ad
drop table if exists tmp.tmp_adm_tracker_order_dt_de_3;
create  table tmp.tmp_adm_tracker_order_dt_de_3
stored as orc
as
select * from (
select *,
       row_number() over(partition by ftrade_id,partner_id order by front_time desc) as rank -- latest ad
from (
  select
       t1.*,
       null as rand_num,
       t2.ad_type,-- ad type
       t2.ad_id,-- ad id
       t2.keyword_id,-- bid keyword id
       t2.keyword_contect ,-- bid keyword content
       t2.sku_id as search_ad_sku_id  ,-- search ad sku id
       t2.is_premium                  ,-- premium flag
       front_time
  from (
  select * from tmp.tmp_adm_tracker_order_dt_de_1
  where partner_id <> 3)  t1
  left join (
      select ad_type,                      -- ad type
             ad_id,                        -- ad id
             keyword_id,                   -- bid keyword id
             keyword_contect ,             -- bid keyword content
             sku_id                      , -- search ad sku id
             is_premium                  , -- premium flag
             partner_id                  ,
             front_time
      from adm.adm_tracker_ad_click_dt_de
      where dt = '${v_dt}' and partner_id <> 3
  ) t2
    on t1.partner_id = t2.partner_id
 where to_unix_timestamp(t1.ftrade_gen_time)*1000 > t2.front_time

 union all

  select
       t1.*,
       t2.ad_type,-- ad type
       t2.ad_id,-- ad id
       t2.keyword_id,-- bid keyword id
       t2.keyword_contect ,-- bid keyword content
       t2.sku_id as search_ad_sku_id  ,-- search ad sku id
       t2.is_premium                  ,-- premium flag
       front_time
  from (
    select *,
    (row_number() over(partition by 1 order by 1 desc) % 100) as rand_num   -- data skew: re-join on a generated bucket number
    from tmp.tmp_adm_tracker_order_dt_de_1
    where partner_id = 3)  t1
  left join (
      select ad_type,                      -- ad type
             ad_id,                        -- ad id
             keyword_id,                   -- bid keyword id
             keyword_contect ,             -- bid keyword content
             sku_id                      , -- search ad sku id
             is_premium                  , -- premium flag
             partner_id                  ,
             front_time,
             (row_number() over(partition by 1 order by 1 desc) % 100) as rand_num  -- data skew: re-join on a generated bucket number
      from adm.adm_tracker_ad_click_dt_de
      where dt = '${v_dt}' and partner_id = 3
  ) t2
    on t1.partner_id = t2.partner_id
    and t1.rand_num = t2.rand_num
 where to_unix_timestamp(t1.ftrade_gen_time)*1000 > t2.front_time
 ) t ) tt
 where rank = 1
;

Hive optimization tips

Controlling the number of reducers

There are three ways to control the number of reducers in Hive:

set hive.exec.reducers.bytes.per.reducer=<number>
set hive.exec.reducers.max=<number>
set mapreduce.job.reduces=<number>

set mapreduce.job.reduces=<number> has the highest priority,
set hive.exec.reducers.max=<number> comes next, and
set hive.exec.reducers.bytes.per.reducer=<number> has the lowest priority.
Since Hive 0.14, the default amount of data handled per reducer is 256MB.
More reducers is not always better: each reducer produces its own output file, and a flood of small files wastes HDFS space.
Too few reducers, on the other hand, can leave a single reducer processing a huge amount of data (exactly what data skew looks like), losing Hadoop's divide-and-conquer benefit and possibly even hitting OOM errors.
How many reducers to use depends on the workload; different scenarios call for different choices.
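
A sketch of the three settings side by side (values are illustrative; when all are set, mapreduce.job.reduces wins):

set hive.exec.reducers.bytes.per.reducer=268435456;  -- ~256MB of input per reducer (the 0.14+ default)
set hive.exec.reducers.max=100;                       -- upper bound on the estimated reducer count
set mapreduce.job.reduces=10;                         -- explicit count; overrides the two settings above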

Use map join

set hive.auto.convert.join = true
or use a hint:

select /*+ MAPJOIN(table_a) */
       a.*,
       b.* 
from table_a a 
join table_b b 
on a.id = b.id
;

Use distinct + union all instead of union

Use:

select count(distinct order_id,user_id,order_type)
from (
select order_id,user_id,order_type from orders where order_type='0' union all
select order_id,user_id,order_type from orders where order_type='1' union all 
select order_id,user_id,order_type from orders where order_type='1'
)a;

instead of:

select count(*) 
from(
select order_id,user_id,order_type from orders where order_type='0' union
select order_id,user_id,order_type from orders where order_type='1' union
select order_id,user_id,order_type from orders where order_type='1')t;

General ways to deal with data skew

What data skew looks like: the job sits at 99% for a long time, only a few reducer tasks remain running, and the unfinished tasks read and write a very large amount of data, well over 10GB. It happens frequently during aggregations.
A general fix is set hive.groupby.skewindata=true;
which splits one map-reduce job into two.

The most common manual trick is to salt the key with a random value so that rows are spread evenly across all the reducers; a sketch follows.
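
A minimal sketch of salting a skewed group-by key (the table and column names are illustrative; the salt is dropped again in the second stage):

select skewed_key,
       sum(partial_cnt) as cnt
from (
  -- stage 1: aggregate on (key, salt) so no single reducer sees every row of the hot key
  select skewed_key,
         salt,
         count(1) as partial_cnt
  from (
    select skewed_key,
           cast(floor(rand() * 10) as int) as salt   -- 10-way salt; size it to the skew
    from some_table
  ) s
  group by skewed_key, salt
) t
-- stage 2: combine the partial counts per real key
group by skewed_key;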

Use group by instead of count(distinct)
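
For example, a sketch of the rewrite (reusing the orders table from the earlier example):

-- instead of:
select count(distinct user_id) from orders;

-- use a two-stage version that spreads the work across many reducers:
select count(1)
from (
  select user_id
  from orders
  group by user_id
) t;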

Use left semi join instead of in/exists
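
For example, a sketch of the rewrite (table_a/table_b follow the naming of the map join example above; the columns are illustrative):

-- instead of:
select a.* from table_a a where a.id in (select b.id from table_b b);

-- use:
select a.*
from table_a a
left semi join table_b b
on a.id = b.id;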

How Hive implements joins

The Hive execution engine "translates" HQL into map-reduce jobs. If several tables are joined on the same column, the joins are translated into a single map-reduce job; otherwise they become multiple map-reduce jobs.
For example:

SELECT a.val, 
       b.val, 
       c.val 
FROM a 
JOIN b 
ON (a.key = b.key1) 
JOIN c 
ON (c.key = b.key1)
;

is translated into a single map-reduce job, while

SELECT a.val, 
       b.val,
       c.val 
FROM a 
JOIN b 
ON (a.key = b.key1) 
JOIN c 
ON (c.key = b.key2)
;

is translated into two map-reduce jobs.

Joins in Hive fall into Common Join (the join is done in the reduce phase) and Map Join (the join is done in the map phase).

Common Join

  • Map phase
    Read the source tables. The map output key is the column from the join on condition; if there are multiple join keys, their combination is used as the key.
    The map output value contains the columns needed later (those referenced in select or where), plus a tag that marks which table the row came from.
    The output is sorted by key.

  • Shuffle phase
    Rows are hashed on the key and routed to reducers by that hash, so rows with the same key from both tables land in the same reducer.

  • Reduce phase
    The join is completed per key, with the tag used to tell rows from the different tables apart.

Map Join

  • First comes Task A, a Local Task that runs on the client. It scans the small table b, builds a HashTable from its data, writes that to a local file, and then loads the file into the DistributedCache.
  • Next comes Task B, a map-only MR job. Its MapTasks scan the large table a; in the map phase each record of a is probed against table b's HashTable from the DistributedCache and the join result is emitted directly.
    Because a Map Join has no reduce phase, the map tasks write the output directly: there are as many result files as there are map tasks.

For more details, see the article Hive Join 的原理与机制 (the principles and mechanics of Hive joins).