Contents
0x00 Tutorial Content
0x01 Project Analysis
    1. Project Review
    2. Project Goals
0x02 Implementation
    1. Group by cookie
    2. Group by user
    3. Sort the logs by time
    4. Cut sessions
    5. Generate sessions
    6. Check the current results
    7. Implement the domain_label field
    8. Implement the cookie_label field
    9. Save the results
    10. Fix the errors
0x03 Results
0xFF Summary
0x00 Tutorial Content
Project analysis
Implementation
Results

Previous tutorial: Session Cutting for the Website User Behavior Analysis Project (Part 1). There we did a lot of preparation, the last step being the filtering of invalid data. Let's now review how our data has been transformed so far.
0x01 Project Analysis
1. Project Review
Data pipeline recap: (raw data) => rawRDD => parsedLogRDD
a. The raw data:
#type|server time|cookie|ip|url
pageview|2017-09-04 12:00:00|cookie1|127.0.0.3|https://www.baidu.com
click|2017-09-04 12:00:02|cookie1|127.0.0.3|https://www.baidu.com
pageview|2017-09-04 12:00:01|cookie2|127.0.0.4|https://www.baidu.com
click|2017-09-04 12:00:04|cookie1|127.0.0.3|https://www.baidu.com
pageview|2017-09-04 12:00:02|cookie2|127.0.0.4|http://news.baidu.com
click|2017-09-04 12:00:03|cookie2|127.0.0.4|http://news.baidu.com
pageview|2017-09-04 12:00:04|cookie2|127.0.0.4|http://music.baidu.com/?fr=tieba
pageview|2017-09-04 12:45:01|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
click|2017-09-04 12:45:02|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
click|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
hhhh|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
3333ss|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
b. After loading the data we get rawRDD, which we then try to transform into the following format:
None
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"})
Some({"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "hhhh", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
Some({"log_type": "3333ss", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"})
c. But this is still not the format we want, so we transformed it into parsedLogRDD:
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}
{"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"}
{"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
{"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
{"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}
d. The values inside parsedLogRDD correspond one-to-one with the fields of the TrackerLog object, which looks like this:
TrackerLog(pageview,2017-09-04 12:00:00,cookie1,127.0.0.3,https://www.baidu.com)
...
2. Project Goals
a. Type of session cutting
We want to cut sessions, and session cutting necessarily happens at the cookie level or the user level: we cut per cookie or per user, and a single cookie or user can have multiple sessions.
If this is hard to follow, read on first and come back once you have seen the results.
0x02 Implementation
1. Group by cookie
Here we first group by cookie and then cut per user: we look at how many cookies there are (roughly, how many users), then cut each user's logs into sessions. By default a new session starts whenever two consecutive logs are at least 30 minutes apart.
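Before diving into the Spark code, the 30-minute rule itself can be sketched in plain Scala on bare epoch-millisecond timestamps. This is an illustrative sketch, not the project's actual API; the names `SessionCutSketch` and `cut` are made up here.

```scala
object SessionCutSketch {
  // 30-minute inactivity gap, in milliseconds
  val SessionGapMs: Long = 30L * 60 * 1000

  // Split an already time-sorted sequence of epoch-millisecond timestamps
  // into sessions: a gap of >= 30 minutes starts a new session.
  def cut(sorted: Seq[Long]): Vector[Vector[Long]] =
    sorted.foldLeft(Vector.empty[Vector[Long]]) { (sessions, t) =>
      sessions.lastOption.flatMap(_.lastOption) match {
        case Some(prev) if t - prev < SessionGapMs =>
          sessions.init :+ (sessions.last :+ t) // small gap: extend current session
        case _ =>
          sessions :+ Vector(t)                 // first log or big gap: new session
      }
    }
}
```

For example, three logs within a few seconds followed by one 45 minutes later yield two sessions, which matches the cookie1 data in this tutorial.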
a. Group by cookie
val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] =
  parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
After grouping, our data looks like the following, i.e. grouped by cookie:
cookie1 -> Iterator(trackerLog1, trackerLog2, ...)
cookie2 -> Iterator(trackerLog4, trackerLog5, ...)
Each cookie now forms one key-value pair whose value is an iterable of TrackerLog objects, all sharing that cookie.
b. The actual output looks like this:
(cookie2,CompactBuffer({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "ip": "127.0.0.4", "url": "https://www.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:03", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://news.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie2", "ip": "127.0.0.4", "url": "http://music.baidu.com/?fr=tieba"}))
(cookie1,CompactBuffer({"log_type": "pageview", "log_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "click", "log_server_time": "2017-09-04 12:00:04", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://www.baidu.com"}, {"log_type": "pageview", "log_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}, {"log_type": "click", "log_server_time": "2017-09-04 12:45:02", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}, {"log_type": "click", "log_server_time": "2017-09-04 12:45:03", "cookie": "cookie1", "ip": "127.0.0.3", "url": "https://tieba.baidu.com/index.html"}))
cookie2 has 4 records and cookie1 has 6.
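The grouping behaviour above can be reproduced with plain Scala collections. This is only an analogue on toy (cookie, logType) pairs, not the project's real TrackerLog records:

```scala
// All records with the same cookie collapse into one (cookie, records) entry,
// just like Spark's groupBy produced (cookie, CompactBuffer(...)) above.
val logs = Seq(
  ("cookie1", "pageview"), ("cookie2", "pageview"),
  ("cookie1", "click"),    ("cookie2", "click"),
  ("cookie1", "click")
)
val grouped: Map[String, Seq[(String, String)]] = logs.groupBy(_._1)
```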
2. Group by user
After grouping by cookie, we still need to cut by user, i.e. start a new session at each 30-minute gap.
a. Cut each cookie's logs into sessions per user
//4. Group by user
val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>
  // process one user's logs
  val processor = new OneUserTrackerLogsProcessor(iter.toArray)
  processor.buildSessions()
}
sessionRDD.collect().foreach(println)
b. For now, extract part of the logic into a new class, OneUserTrackerLogsProcessor; we will fill in its logic later.
package com.shaonaiyi.spark.session

/**
 * @Author: shaonaiyi@163.com
 * @Date: 2019/12/14 20:38
 * @Description: Convert each user's trackerLogs into trackerSessions
 */
class OneUserTrackerLogsProcessor(trackerLogs: Array[TrackerLog]) {

  def buildSessions(): Array[TrackerSession] = {
    //1. cut sessions
    //2. generate sessions
    Array()
  }
}
By now the structure of sessionRDD should be clear: the key is the cookie and the value is a TrackerSession; this step merely transforms the value.
3. Sort the logs by time
a. When the gap between two of a user's logs exceeds 30 minutes, we mark a new session, so we need to compare log times, which means sorting the logs first. Add this line to OneUserTrackerLogsProcessor:
private val sortedTrackerLogs = trackerLogs.sortBy(trackerLog => trackerLog.getLogServerTime.toString)
4. Cut sessions
a. The full code of OneUserTrackerLogsProcessor at this point:
package com.shaonaiyi.session

import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.commons.lang3.time.FastDateFormat

import scala.collection.mutable.ArrayBuffer

/**
 * @Author: shaonaiyi@163.com
 * @Date: 2019/12/14 20:38
 * @Description: Convert each user's trackerLogs into trackerSessions
 */
class OneUserTrackerLogsProcessor(trackerLogs: Array[TrackerLog]) {

  private val sortedTrackerLogs = trackerLogs.sortBy(trackerLog => trackerLog.getLogServerTime.toString)
  private val dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  //1. cut sessions
  val oneCuttingSessionLogs = new ArrayBuffer[TrackerLog]() // logs of the session currently being cut
  val initBuilder = ArrayBuffer.newBuilder[ArrayBuffer[TrackerLog]] // logs of all finished sessions

  def buildSessions(): Array[TrackerSession] = {
    sortedTrackerLogs.foldLeft((initBuilder, Option.empty[TrackerLog])) {
      case ((builder, preLog), currLog) =>
        val currLogTime = dateFormat.parse(currLog.getLogServerTime.toString).getTime
        if (preLog.nonEmpty &&
          currLogTime - dateFormat.parse(preLog.get.getLogServerTime.toString).getTime >= 30 * 60 * 1000) {
          // cut off a new session
          builder += oneCuttingSessionLogs.clone()
          oneCuttingSessionLogs.clear()
        }
        oneCuttingSessionLogs += currLog
        (builder, Some(currLog))
    }
    //2. generate sessions
    Array()
  }
}
b. After sorting by time we need to compare times, but the log's time format cannot be compared directly, so we convert it to a timestamp using the FastDateFormat class. Note that the import must be exactly the one below:
import org.apache.commons.lang3.time.FastDateFormat
c. For the two logs being compared, here called preLog and currLog, we only need to compare their times. The very first log has nothing to compare against, so it is skipped. If the current log's time minus the previous log's time is at least 30 minutes, we write the logs collected so far (oneCuttingSessionLogs) into builder, i.e. into initBuilder, which holds the logs of every finished session.
d. After writing into initBuilder, clear oneCuttingSessionLogs and then append currLog to it.
e. Each fold step returns the builder of already-cut sessions together with the current log.
f. Now add a new variable, cuttedSessionLogsResult, to extract the result we want:
val cuttedSessionLogsResult = sortedTrackerLogs.foldLeft((init.......}._1.result()
g. The last session, if any, must be added as well:
if (oneCuttingSessionLogs.nonEmpty) {
  cuttedSessionLogsResult += oneCuttingSessionLogs
}
In the end, each group of logs has been re-cut into session after session, and one session can contain multiple logs.
The type of cuttedSessionLogsResult is: ArrayBuffer[ArrayBuffer[TrackerLog]]
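The foldLeft pattern used in steps c through g can be exercised end to end in plain Scala, on bare epoch-millisecond timestamps instead of TrackerLog objects. This is a minimal sketch of the same accumulator shape, (builder of finished sessions, previous log), followed by `._1.result()` and the final flush; `FoldCutSketch` and its local buffers are illustrative names, and the article's version keeps the buffers as class fields instead:

```scala
import scala.collection.mutable.ArrayBuffer

object FoldCutSketch {
  val GapMs: Long = 30L * 60 * 1000

  // Cut an already-sorted list of epoch-millisecond timestamps into sessions.
  def cut(sortedTimes: Seq[Long]): ArrayBuffer[ArrayBuffer[Long]] = {
    val oneCuttingSessionLogs = new ArrayBuffer[Long]()          // session being cut
    val initBuilder = ArrayBuffer.newBuilder[ArrayBuffer[Long]]  // finished sessions

    val result = sortedTimes.foldLeft((initBuilder, Option.empty[Long])) {
      case ((builder, preLog), currLog) =>
        if (preLog.nonEmpty && currLog - preLog.get >= GapMs) {
          builder += oneCuttingSessionLogs.clone() // close the current session
          oneCuttingSessionLogs.clear()
        }
        oneCuttingSessionLogs += currLog
        (builder, Some(currLog))
    }._1.result()

    if (oneCuttingSessionLogs.nonEmpty) result += oneCuttingSessionLogs // flush the last session
    result
  }
}
```

Without the final flush, the session still open when the fold ends would be silently lost, which is exactly why step g is needed.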
5. Generate sessions
Our sessions are now cut; next we flesh the cut sessions out into the TrackerSession we want, which means assembling the data. Recall from the previous tutorial, Session Cutting for the Website User Behavior Analysis Project (Part 1), that our goal is the following two tables.
The TrackerLog table, with fields:
log_type
log_server_time
cookie
ip
url
The TrackerSession table, with fields:
session_id
session_server_time
cookie
cookie_label
ip
landing_url
pageview_count
click_count
domain
domain_label
So now we piece the fields together one by one.
a. The code:
//2. generate sessions
cuttedSessionLogsResult.map { case sessionLogs =>
  val session = new TrackerSession()
  session.setSessionId(UUID.randomUUID().toString)
  session.setSessionServerTime(sessionLogs.head.getLogServerTime)
  session.setCookie(sessionLogs.head.getCookie)
  session.setIp(sessionLogs.head.getIp)

  val pageviewLogs = sessionLogs.filter(_.getLogType.toString.equals("pageview"))
  if (pageviewLogs.length == 0) {
    session.setLandingUrl("-")
  } else {
    session.setLandingUrl(pageviewLogs.head.getUrl)
  }
  session.setPageviewCount(pageviewLogs.length)

  val clickLogs = sessionLogs.filter(_.getLogType.toString.equals("click"))
  session.setClickCount(clickLogs.length)

  if (pageviewLogs.length == 0) {
    session.setDomain("-")
  } else {
    val url = new URL(pageviewLogs.head.getUrl.toString) // requires java.util.UUID and java.net.URL imports
    session.setDomain(url.getHost)
  }
  session
}
b. Remove the original Array() and change the return type of buildSessions to ArrayBuffer:
def buildSessions(): ArrayBuffer[TrackerSession] = {
6. Check the current results
At this point, the cookie_label and domain_label fields are still missing.
a. Make TrackerSession implement the Serializable interface, then run; you get the following results.
(cookie2,{"session_id": "38059172-e0aa-4d37-97da-12778a5a**55", "session_server_time": "2017-09-04 12:00:01", "cookie": "cookie2", "cookie_label": null, "ip": "127.0.0.4", "landing_url": "https://www.baidu.com", "pageview_count": 3, "click_count": 1, "domain": "www.baidu.com", "domain_label": null})
(cookie1,{"session_id": "218fdf54-8b34-484d-b53b-0769ea5d1421", "session_server_time": "2017-09-04 12:00:00", "cookie": "cookie1", "cookie_label": null, "ip": "127.0.0.3", "landing_url": "https://www.baidu.com", "pageview_count": 1, "click_count": 2, "domain": "www.baidu.com", "domain_label": null})
(cookie1,{"session_id": "ec2f3a38-3335-45f5-99f8-c27947bca687", "session_server_time": "2017-09-04 12:45:01", "cookie": "cookie1", "cookie_label": null, "ip": "127.0.0.3", "landing_url": "https://tieba.baidu.com/index.html", "pageview_count": 1, "click_count": 2, "domain": "tieba.baidu.com", "domain_label": null})
b. Explanation: parsedLogRDD has ten records in total, and we now get 3 sessions. Look at the pageview_count and click_count fields of each session: (3+1)+(1+2)+(1+2) = 10 records. In other words, cookie2 has one session containing 3 pageview events and 1 click event, while cookie1's logs contain a gap of more than 30 minutes, so they were cut into two sessions.
7. Implement the domain_label field
a. In the session results above, the cookie_label and domain_label fields are still null, so we need to compute them; let's do domain_label first. The domain_label data set is small, so it could be kept in a traditional database; since this is only a demo, I simply hard-code it. Add this to SessionCutETL:
// Website domain label data. This is only a demo; in practice it could live in a database.
val domainLabelMap = Map(
  "www.baidu.com" -> "level1",
  "www.taobao.com" -> "level2",
  "jd.com" -> "level3",
  "youku.com" -> "level4"
)
b. Because this data set is small, we can also broadcast it:
// broadcast
val domainLabelMapB = sc.broadcast(domainLabelMap)
c. Pass domainLabelMapB into the buildSessions function as a parameter, changing these two lines:
processor.buildSessions(domainLabelMapB.value)

def buildSessions(domainLabelMap: Map[String, String]): ArrayBuffer[TrackerSession] = {
d. Set domainLabel by looking up the label for the session's domain, using "-" when there is none:
val domainLabel = domainLabelMap.getOrElse(session.getDomain.toString, "-")
session.setDomainLabel(domainLabel)
e. Run it and check the results: the domain labels are now populated.
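The fallback behaviour of this lookup can be checked in isolation. A tiny sketch with the same map contents as above (`labelOf` is a made-up helper name, not part of the project):

```scala
// getOrElse returns the label when the domain is in the map, and "-" otherwise.
val domainLabelMap = Map(
  "www.baidu.com"  -> "level1",
  "www.taobao.com" -> "level2",
  "jd.com"         -> "level3",
  "youku.com"      -> "level4"
)
def labelOf(domain: String): String = domainLabelMap.getOrElse(domain, "-")
```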
8. Implement the cookie_label field
a. Load the cookie_label data
//5. label each session's cookie
val cookieLabelRDD: RDD[(String, String)] = sc.textFile("data/cookie_label.txt").map { case line =>
  val temp = line.split("\\|")
  (temp(0), temp(1)) // (cookie, cookie_label)
}
b. Both sessionRDD and cookieLabelRDD are keyed by cookie, so they can be joined. All of sessionRDD's data must be kept; we are only attaching cookieLabelRDD's data to it:
val joinRDD: RDD[(String, (TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)
val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map { case (cookie, (session, cookieLabelOpt)) =>
  if (cookieLabelOpt.nonEmpty) {
    session.setCookieLabel(cookieLabelOpt.get)
  } else {
    session.setCookieLabel("-")
  }
  session
}
cookieLabeledSessionRDD.collect().foreach(println)
After a left outer join, the value from cookieLabelRDD may be absent, so it comes back as an Option[String].
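The left-outer-join semantics can be mimicked with plain Scala collections to see why the right side is an Option. This sketch uses toy data with a hypothetical label value "fix"; `leftOuterJoin` here is a hand-rolled stand-in for Spark's operator, not Spark itself:

```scala
// Every left-side record survives the join; the right-side value comes back
// as an Option — None when the cookie has no label.
def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Map[K, W]): Seq[(K, (V, Option[W]))] =
  left.map { case (k, v) => (k, (v, right.get(k))) }

val sessions = Seq(("cookie1", "session-a"), ("cookie2", "session-b"))
val labels   = Map("cookie1" -> "fix")

val labeled = leftOuterJoin(sessions, labels).map {
  case (_, (session, labelOpt)) => (session, labelOpt.getOrElse("-"))
}
```

cookie2 has no entry on the right, so its session falls back to "-", exactly as in the Spark code above.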
c. Run it and check the results: the cookie labels are now populated.
9. Save the results
a. Since we save in Parquet format, one more dependency is needed; don't forget it!
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>
b. Save parsedLogRDD
//6. save the data
//6.1 save TrackerLog, i.e. parsedLogRDD
val trackerLogOutputPath = "data/output/trackerLog"
AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerLog.SCHEMA$)
parsedLogRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerLogOutputPath,
  classOf[Void], classOf[TrackerLog],
  classOf[AvroParquetOutputFormat[TrackerLog]]
)
c. Save cookieLabeledSessionRDD
//6.2 save TrackerSession, i.e. cookieLabeledSessionRDD
val trackerSessionOutputPath = "data/output/trackerSession"
AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerSession.SCHEMA$)
cookieLabeledSessionRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerSessionOutputPath,
  classOf[Void], classOf[TrackerSession],
  classOf[AvroParquetOutputFormat[TrackerSession]]
)
d. Run it and you will see errors: the first one has been there all along, the second one is new.
10. Fix the errors
a. See my other article: Installing Hadoop locally on Windows.
0x03 Results
a. Delete the folder generated by the failed run, otherwise it will fail again.
b. Delete the data/output/trackerLog folder, then run again to get the expected results.
0xFF Summary
The data transformation process is fairly involved; try it yourself, work out where each piece comes from and goes, and read through it several times. Please like and follow to get the full tutorial series for the Website User Behavior Analysis project!
About the author: 邵奈一, full-stack engineer, market observer, column editor | 公众号 | 微信 | 微博 | CSDN | 简书 |
Bonus: 邵奈一's technical blog navigation.
邵奈一. Original work; please credit the source when reposting.