博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
根据用户指定的日期范围,统计各个区域下的最热门的top3商品,最后将结果写入MySQL表中(一)
阅读量:3960 次
发布时间:2019-05-24

本文共 4968 字,大约阅读时间需要 16 分钟。

 user_click_action表

create table user_click_action(click_date string,user_id string,session_id  string,page_id string,action_time string,search_keyword string,click_category_id string,click_product_id string,order_category_ids string,order_product_ids string,pay_category_ids string,pay_product_ids string,city_id string ) row format delimited fields terminated by ",";

部分数据截图

product_info表 

create table product_info(product_id string,product_name string,extend_info string)row format delimited fields terminated by ",";

部分数据截图

使用本地导入的方式,将数据导入hive(也可以使用直接上传到HDFS对应表路径)

load data local inpath '/home/spark/product_info.txt' into table product_info;

city_info表(注意city_info表是在mysql中创建的)

create table city_info(city_id string,city_name varchar(255),area varchar(255));
insert into city_info(city_id,city_name,area) values (0,'北京','华北'),(1,'上海','华东'),(2,'南京','华东'),(3,'广州','华南'),(4,'三亚','华南'),(5,'武汉','华中'),(6,'长沙','华中'),(7,'西安','西北'),(8,'成都','西南'),(9,'哈尔滨','东北');

IDEA编写Application,使Mysql和Hive表进行交互

第一步、江湖规矩,先上pom文件(注意一下版本问题)

2.11
2.4.3
2.6.0
org.apache.spark
spark-core_${scala.version}
${spark.version}
org.apache.spark
spark-streaming_${scala.version}
${spark.version}
org.apache.spark
spark-sql_${scala.version}
${spark.version}
org.apache.spark
spark-hive_${scala.version}
${spark.version}
mysql
mysql-connector-java
5.1.47

第二步、resource目录hive-site.xml

注意:自己配置的metastore名称(可在hive安装目录下的conf目录下查看),用户名及密码

javax.jdo.option.ConnectionURL
jdbc:mysql://hadoop:3306/metastore
JDBC connect string for a JDBC metastore
javax.jdo.option.ConnectionDriverName
com.mysql.jdbc.Driver
Driver class name for a JDBC metastore
javax.jdo.option.ConnectionUserName
root
username to use against metastore database
javax.jdo.option.ConnectionPassword
1234
password to use against metastore database

第三步、开工啦,编写Application

import org.apache.log4j.{Level, Logger}import org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.sql.{Row, SparkSession}object HiveDriver {  def main(args: Array[String]): Unit = {        //设置输出日志级别    Logger.getLogger("org").setLevel(Level.WARN)        //获取sparksession    val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()    //hive库名    session.sql("use day03")        //指定时间范围的TOPN    val beginTime = "2019-12-20 00.00.00"    val endTime = "2019-12-20 12.30.01"        //从hive的user_click_action表选出有用字段,城市ID,点击时间,点击商品ID    val df = session.sql("select city_id,action_time,click_product_id from user_click_action" +      " where click_product_id != 'null' and action_time >= '"+beginTime+"' and action_time <= '"+endTime+"'")        //把DF转化为RDD,这里的转化是为了和mysql中的city_info 进行内连接,RDDjoin的条件要求必须是    //(k,v)类型的元组,所以下面这个rdd的k为city_id,v为DF中的每一行    val user_click_action = df.rdd.map(x=>{      (x.getString(0),x)    })        //获取mysql连接    val url = "jdbc:mysql://hadoop:3306/day03"    val properties = new Properties()    properties.setProperty("user","root")    properties.setProperty("password","1234")    //这个mysql也是df,相当于从mysql创建df的方式    val mysql = session.read.jdbc(url,"city_info",properties)        //和上面一致,转换为RDD    val city_info = mysql.rdd.map(x=>{      (x.getString(0),x)    })        //将两个RDD进行join操作    val TopN = city_info.join(user_click_action)    //将RDD中的内容map取出,并转换为行,相当于表中多行    val topN = TopN.map(x=>{      val cityID = x._1      val cityName = x._2._1.getString(1)      val area = x._2._1.getString(2)      val productId = x._2._2.getString(2)      Row(cityID,cityName,area,productId)    })        //定义表结构    val schema = StructType(List(      StructField("city_id",StringType),      StructField("city_name",StringType),      StructField("area",StringType),      StructField("product_id",StringType)    ))        //创建DF,利用topN    val joinedDF = session.createDataFrame(topN,schema)    //设置表名    joinedDF.createTempView("TopN")        //查看临时结果    val end =  session.sql("select * from TopN limit 10")    end.show()    //df.show()    session.stop()  }}

未完待续。。。。。


华丽的分割线。。。

我莱填坑了,哈哈哈哈

详见  

转载地址:http://jaazi.baihongyu.com/

你可能感兴趣的文章
linux下shell获取不到PID
查看>>
sort详解
查看>>
linux,shell中if else if的写法,if elif
查看>>
shell中单引号、双引号、反引号的区别
查看>>
shell脚本死循环方法
查看>>
shell中$*和$@的区别
查看>>
log4cxx 的编译安装过程和使用
查看>>
简单邮件系统程序
查看>>
STL里的multimap使用详解
查看>>
STL 库其中的 std::string用法总结
查看>>
模态对话框的销毁过程与非模态对话的几种销毁方法
查看>>
C++实现http下载 && 24点计算编码风格
查看>>
memcached了解使用和常用命令详解
查看>>
GDB调试各功能总结
查看>>
"undefined reference to" 多种可能出现的问题解决方法
查看>>
类结构定义
查看>>
Windows下关于多线程类 CSemaphore,CMutex,CCriticalSection,CEvent,信号量CSemaphore的使用介绍
查看>>
图像处理基本算法(汇总)以及实现
查看>>
C++编程获取本机网卡信息 本机IP 包括Windows和Linux
查看>>
23种设计模式详解及C++实现
查看>>