The user_click_action table
create table user_click_action(
    click_date string,
    user_id string,
    session_id string,
    page_id string,
    action_time string,
    search_keyword string,
    click_category_id string,
    click_product_id string,
    order_category_ids string,
    order_product_ids string,
    pay_category_ids string,
    pay_product_ids string,
    city_id string
) row format delimited fields terminated by ",";
(Screenshot of sample data omitted.)
The product_info table
create table product_info(
    product_id string,
    product_name string,
    extend_info string
) row format delimited fields terminated by ",";
(Screenshot of sample data omitted.)
Load the data into Hive with a local import (alternatively, upload the files directly to each table's HDFS path):
load data local inpath '/home/spark/product_info.txt' into table product_info;
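The click log is presumably loaded the same way; the file name below is an assumption, since the original only shows the product_info load:

load data local inpath '/home/spark/user_click_action.txt' into table user_click_action;  -- path is assumed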
The city_info table (note that city_info is created in MySQL)
create table city_info(
    city_id varchar(255),
    city_name varchar(255),
    area varchar(255)
);
insert into city_info(city_id, city_name, area) values
    ('0','北京','华北'), ('1','上海','华东'), ('2','南京','华东'),
    ('3','广州','华南'), ('4','三亚','华南'), ('5','武汉','华中'),
    ('6','长沙','华中'), ('7','西安','西北'), ('8','成都','西南'),
    ('9','哈尔滨','东北');
Writing an application in IDEA to make the MySQL and Hive tables interact
Step 1: As the custom goes, the pom file comes first (watch the versions)
<properties>
    <scala.version>2.11</scala.version>
    <spark.version>2.4.3</spark.version>
    <!-- The 2.6.0 in the original is presumably the Hadoop version property -->
    <hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>
Step 2: hive-site.xml in the resources directory
Note: use the metastore database name you configured yourself (you can check it in the conf directory under your Hive installation), along with your own username and password.
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop:3306/metastore</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>1234</value>
        <description>password to use against metastore database</description>
    </property>
</configuration>
Step 3: Time to get to work and write the application
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object HiveDriver {
  def main(args: Array[String]): Unit = {
    // Set the log level so only warnings and above are printed
    Logger.getLogger("org").setLevel(Level.WARN)
    // Get a SparkSession with Hive support enabled
    val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
    // Hive database name
    session.sql("use day03")
    // Time range for the top-N query
    val beginTime = "2019-12-20 00.00.00"
    val endTime = "2019-12-20 12.30.01"
    // Select the useful columns from the Hive table user_click_action:
    // city ID, click time, and clicked product ID
    val df = session.sql("select city_id,action_time,click_product_id from user_click_action" +
      " where click_product_id != 'null' and action_time >= '" + beginTime +
      "' and action_time <= '" + endTime + "'")
    // Convert the DataFrame to an RDD so it can be inner-joined with city_info
    // from MySQL. An RDD join requires (k, v) tuples, so the key of this RDD
    // is city_id and the value is the whole DataFrame row.
    val user_click_action = df.rdd.map(x => (x.getString(0), x))
    // MySQL connection settings
    val url = "jdbc:mysql://hadoop:3306/day03"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "1234")
    // This is also a DataFrame, created from the MySQL table city_info
    val mysql = session.read.jdbc(url, "city_info", properties)
    // Same as above: convert to a keyed RDD
    val city_info = mysql.rdd.map(x => (x.getString(0), x))
    // Join the two RDDs on city_id
    val TopN = city_info.join(user_click_action)
    // Flatten each joined pair into a Row, one per output record
    val topN = TopN.map(x => {
      val cityID = x._1
      val cityName = x._2._1.getString(1)
      val area = x._2._1.getString(2)
      val productId = x._2._2.getString(2)
      Row(cityID, cityName, area, productId)
    })
    // Define the schema of the joined table
    val schema = StructType(List(
      StructField("city_id", StringType),
      StructField("city_name", StringType),
      StructField("area", StringType),
      StructField("product_id", StringType)
    ))
    // Build a DataFrame from the joined rows
    val joinedDF = session.createDataFrame(topN, schema)
    // Register it as a temporary view
    joinedDF.createTempView("TopN")
    // Peek at the intermediate result
    val end = session.sql("select * from TopN limit 10")
    end.show()
    session.stop()
  }
}
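As the ending below notes, the post stops at the join; the actual per-area top-N aggregation is still missing. Here is a minimal sketch of that step against the TopN view, assuming "top N" means the three most-clicked products per area (the cutoff of 3 is my assumption, not from the original):

// Sketch only: the original post stops before this step.
// Assumption: keep the 3 most-clicked products per area.
val result = session.sql(
  """
    |select area, product_id, click_count
    |from (
    |    select area, product_id, count(*) as click_count,
    |           row_number() over (partition by area order by count(*) desc) as rn
    |    from TopN
    |    group by area, product_id
    |) t
    |where rn <= 3
  """.stripMargin)
result.show()

product_info could be joined in the same way to attach product names, and result.write.jdbc(url, "area_top3", properties) would persist the outcome back to MySQL; the target table name and the choice to persist are likewise assumptions.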
To be continued...
Update: I'm back to fill in the gap, hahaha.
Reprinted from: http://jaazi.baihongyu.com/