44 Hive实战
lix_uan 591 1

需求描述

  • 统计视频观看数Top10

  • 计视频类别热度Top10

  • 统计出视频观看数最高的20个视频的所属类别以及类别包含Top20视频的个数

  • 统计视频观看数Top50所关联视频的所属类别Rank

  • 计每个类别中的视频热度Top10,以Music为例

  • 统计每个类别视频观看数Top10

  • 统计上传视频最多的用户Top10以及他们上传的视频观看次数在前20的视频

数据结构

视频表

字段 备注 详细描述
videoId 视频唯一id(String) 11位字符串
uploader 视频上传者(String) 上传视频的用户名String
age 视频年龄(int) 视频在平台上的整数天
category 视频类别(Array 上传视频指定的视频分类
length 视频长度(Int) 整形数字标识的视频长度
views 观看次数(Int) 视频被浏览的次数
rate 视频评分(Double) 满分5分
Ratings 流量(Int) 视频的流量,整型数字
conments 评论数(Int) 一个视频的整数评论数
relatedId 相关视频id(Array 相关视频的id,最多20个

用户表

字段 备注 字段类型
uploader 上传者用户名 string
videos 上传视频数 int
friends 朋友数量 int

ETL

封装工具类

public class ETLUtil {
    /**
     * 数据清洗方法
     */
    public static  String  etlData(String srcData){
        StringBuffer resultData = new StringBuffer();
        //1. 先将数据通过\t 切割
        String[] datas = srcData.split("\t");
        //2. 判断长度是否小于9
        if(datas.length <9){
            return null ;
        }
        //3. 将数据中的视频类别的空格去掉
        datas[3]=datas[3].replaceAll(" ","");
        //4. 将数据中的关联视频id通过&拼接
        for (int i = 0; i < datas.length; i++) {
            if(i < 9){
                //4.1 没有关联视频的情况
                if(i == datas.length-1){
                    resultData.append(datas[i]);
                }else{
                    resultData.append(datas[i]).append("\t");
                }
            }else{
                //4.2 有关联视频的情况
                if(i == datas.length-1){
                    resultData.append(datas[i]);
                }else{
                    resultData.append(datas[i]).append("&");
                }
            }
        }
        return resultData.toString();
    }
}  

Mapper

  /**
 * 清洗谷粒影音的原始数据
 * 清洗规则
 *  1. 将数据长度小于9的清洗掉
 *  2. 将数据中的视频类别中间的空格去掉   People & Blogs
 *  3. 将数据中的关联视频id通过&符号拼接
 */
public class EtlMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    private Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       //获取一行
        String line = value.toString();
        //清洗
        String resultData = ETLUtil.etlData(line);

        if(resultData != null) {
            //写出
            k.set(resultData);
            context.write(k,NullWritable.get());
        }
    }
}

Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EtlDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job  = Job.getInstance(conf);
        job.setJarByClass(EtlDriver.class);
        job.setMapperClass(EtlMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.waitForCompletion(true);
    }
}  
  • 将ETL程序打包为etl.jar,并上传到Linux的/opt/module/hive/datas目录下

上传原始数据

cd /opt/module/hive/datas
hadoop fs -mkdir -p  /gulivideo/video
hadoop fs -mkdir -p  /gulivideo/user
hadoop fs -put gulivideo/user/user.txt   /gulivideo/user
hadoop fs -put gulivideo/video/*.txt   /gulivideo/video

ETL数据

hadoop jar  etl.jar  cn.lixuan.hive.etl.EtlDriver /gulivideo/video /gulivideo/video/output

准备表

需要准备的表

  • 创建原始数据表
    • gulivideo_ori,gulivideo_user_ori
  • 创建最终表
    • gulivideo_orc、gulivideo_user_orc

gulivideo_ori

create table gulivideo_ori(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as textfile;

gulivideo_user_ori

create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited 
fields terminated by "\t" 
stored as textfile;

gulivideo_orc

create table gulivideo_orc(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
stored as orc
tblproperties("orc.compress"="SNAPPY");

gulivideo_user_orc

create table gulivideo_user_orc(
    uploader string,
    videos int,
    friends int)
row format delimited 
fields terminated by "\t" 
stored as orc
tblproperties("orc.compress"="SNAPPY");

向ori表插入数据

load data inpath "/gulivideo/video/output" into table gulivideo_ori;
load data inpath "/gulivideo/user" into table gulivideo_user_ori;

向orc表插入数据

insert into table gulivideo_orc select * from gulivideo_ori;
insert into table gulivideo_user_orc select * from gulivideo_user_ori;

安装Tez引擎

  • Tez是一个Hive的运行引擎,性能优于MR

44 Hive实战

# 将tez安装包拷贝到集群,并解压tar包
mkdir /opt/module/tez
tar -zxvf /opt/software/tez-0.10.1-SNAPSHOT-minimal.tar.gz -C /opt/module/tez

# 上传tez依赖到HDFS
hadoop fs -mkdir /tez
hadoop fs -put /opt/software/tez-0.10.1-SNAPSHOT.tar.gz /tez

# 新建tez-site.xml
vim $HADOOP_HOME/etc/hadoop/tez-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>tez.lib.uris</name>
    <value>${fs.defaultFS}/tez/tez-0.10.1-SNAPSHOT.tar.gz</value>
</property>
<property>
     <name>tez.use.cluster.hadoop-libs</name>
     <value>true</value>
</property>
<property>
     <name>tez.am.resource.memory.mb</name>
     <value>1024</value>
</property>
<property>
     <name>tez.am.resource.cpu.vcores</name>
     <value>1</value>
</property>
<property>
     <name>tez.container.max.java.heap.fraction</name>
     <value>0.4</value>
</property>
<property>
     <name>tez.task.resource.memory.mb</name>
     <value>1024</value>
</property>
<property>
     <name>tez.task.resource.cpu.vcores</name>
     <value>1</value>
</property>
</configuration>
# 修改Hadoop环境变量
vim $HADOOP_HOME/etc/hadoop/shellprofile.d/tez.sh

# 添加Tez的Jar包相关信息
hadoop_add_profile tez
function _tez_hadoop_classpath
{
    hadoop_add_classpath "$HADOOP_HOME/etc/hadoop" after
    hadoop_add_classpath "/opt/module/tez/*" after
    hadoop_add_classpath "/opt/module/tez/lib/*" after
}
# 修改Hive的计算引擎
vim $HIVE_HOME/conf/hive-site.xml
<property>
    <name>hive.execution.engine</name>
    <value>tez</value>
</property>
<property>
    <name>hive.tez.container.size</name>
    <value>1024</value>
</property>
# 解决日志Jar包冲突
rm /opt/module/tez/lib/slf4j-log4j12-1.7.10.jar

业务分析

统计视频观看数Top10

SELECT 
     videoId,
     views 
FROM 
     gulivideo_orc
ORDER BY 
     views DESC 
LIMIT 10;

统计视频类别热度Top10

SELECT 
    t1.category_name , 
    COUNT(t1.videoId) hot
FROM 
(
SELECT 
    videoId, 
    category_name 
FROM 
    gulivideo_orc 
lateral VIEW explode(category) gulivideo_orc_tmp AS category_name
) t1
GROUP BY 
    t1.category_name 
ORDER BY
    hot 
DESC 
LIMIT 10

统计视频观看数最高的20个视频的所属类别以及类别包含Top20视频的个数

SELECT 
    t2.category_name,
    COUNT(t2.videoId) video_sum
FROM 
(
SELECT
    t1.videoId,
    category_name
FROM 
(
SELECT 
    videoId, 
    views ,
    category 
FROM 
    gulivideo_orc
ORDER BY 
    views 
DESC 
LIMIT 20 
) t1
lateral VIEW explode(t1.category) t1_tmp AS category_name
) t2
GROUP BY t2.category_name

统计视频观看数Top50所关联视频的所属类别排序

SELECT
   t6.category_name,
   t6.video_sum,
   rank() over(ORDER BY t6.video_sum DESC ) rk
FROM
(
SELECT
   t5.category_name,
   COUNT(t5.relatedid_id) video_sum
FROM
(
SELECT
  t4.relatedid_id,
  category_name
FROM
(
SELECT 
  t2.relatedid_id ,
  t3.category 
FROM 
(
SELECT 
   relatedid_id
FROM 
(
SELECT 
   videoId, 
   views,
   relatedid 
FROM 
   gulivideo_orc
ORDER BY
   views 
DESC 
LIMIT 50
)t1
lateral VIEW explode(t1.relatedid) t1_tmp AS relatedid_id
)t2 
JOIN 
   gulivideo_orc t3 
ON 
 t2.relatedid_id = t3.videoId 
) t4 
lateral VIEW explode(t4.category) t4_tmp AS category_name
) t5
GROUP BY
  t5.category_name
ORDER BY 
  video_sum
DESC 
) t6

统计每个类别中的视频热度Top10,以Music为例

SELECT 
    t1.videoId, 
    t1.views,
    t1.category_name
FROM 
(
SELECT
    videoId,
    views,
    category_name
FROM gulivideo_orc
lateral VIEW explode(category) gulivideo_orc_tmp AS category_name
)t1    
WHERE 
    t1.category_name = "Music" 
ORDER BY 
    t1.views 
DESC 
LIMIT 10

统计每个类别视频观看数Top10

SELECT 
  t2.videoId,
  t2.views,
  t2.category_name,
  t2.rk
FROM 
(
SELECT 
   t1.videoId,
   t1.views,
   t1.category_name,
   rank() over(PARTITION BY t1.category_name ORDER BY t1.views DESC ) rk
FROM    
(
SELECT
    videoId,
    views,
    category_name
FROM gulivideo_orc
lateral VIEW explode(category) gulivideo_orc_tmp AS category_name
)t1
)t2
WHERE t2.rk <=10

统计上传视频最多的用户Top10以及他们上传的视频观看次数在前20的视频

SELECT 
   t2.videoId,
   t2.views,
   t2.uploader
FROM
(
SELECT 
   uploader,
   videos
FROM gulivideo_user_orc 
ORDER BY 
   videos
DESC
LIMIT 10    
) t1
JOIN gulivideo_orc t2 
ON t1.uploader = t2.uploader
ORDER BY 
  t2.views 
DESC
LIMIT 20

常见错误及解决方案

更换Tez引擎后,执行任务卡住,可以尝试调节容量调度器的资源调度策略

  • 将$HADOOP_HOME/etc/hadoop/capacity-scheduler.xml文件中的

    <property>
        <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
        <value>0.1</value>
        <description>
          Maximum percent of resources in the cluster which can be used to run 
          application masters i.e. controls number of concurrent running
          applications.
        </description>
    </property>
  • 改成

    <property>
        <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
        <value>1</value>
        <description>
          Maximum percent of resources in the cluster which can be used to run 
          application masters i.e. controls number of concurrent running
          applications.
        </description>
    </property>

hive默认的输入格式处理是CombineHiveInputFormat,会对小文件进行合并

set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

# 可以采用HiveInputFormat就会根据分区数输出相应的文件
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

JVM堆内存溢出

  • java.lang.OutOfMemoryError: Java heap space

  • 在yarn-site.xml中加入如下代码

    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>2048</value>
    </property>
    <property>
          <name>yarn.scheduler.minimum-allocation-mb</name>
          <value>2048</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>2.1</value>
    </property>
    <property>
        <name>mapred.child.java.opts</name>
        <value>-Xmx1024m</value>
    </property>

虚拟内存限制

  • yarn-site.xml中添加

    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
     </property>
评论区

索引目录