Spark的分区机制的应用及PageRank算法的实现

Stella981
• 阅读 803

佩奇排名(PageRank),又称网页排名谷歌左侧排名,是一种由搜索引擎根据网页之间相互的超链接计算的技术,而作为网页排名的要素之一,以Google公司创办人拉里·佩奇(Larry Page)之姓来命名。Google用它来体现网页的相关性和重要性,在搜索引擎优化操作中是经常被用来评估网页优化的成效因素之一。

概念

Spark中有一个很重要的特性是对数据集在节点间的分区进行控制,因为在分布式系统中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。分区适用于那种基于类似join操作基于键的操作,并且一方的RDD数据是比较少变动且需要多次扫描的情况,这个时候可以对这个RDD做一个分区,最常用的是用Hash来进行分区,比如可以对RDD分100个区,此时spark会用每个键的hash值对100取模,然后把相同结果的放到同一个节点上。

Spark分区的讲解

现在用一个例子(来自《Learning Spark: Lightning-Fast Big Data Analysis》一书)来说明一下:

 1// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS. 2// This distributes elements of userData by the HDFS block where they are found, 3// and doesn't provide Spark with any way of knowing in which partition a 4// particular UserID is located. 5val sc = new SparkContext(...) 6val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist() 7 8// Function called periodically to process a logfile of events in the past 5 minutes; 9// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.10def processNewLogs(logFileName: String) {11  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)12  val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs13  val offTopicVisits = joined.filter {14    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components15      !userInfo.topics.contains(linkInfo.topic)16  }.count()17  println("Number of visits to non-subscribed topics: " + offTopicVisits)18}

上面的例子中,有两个RDD,userData的键值对是(UserID, UserInfo),UserInfo包含了一个该用户订阅的主题的列表,该程序会周期性地将这张表与一个小文件进行组合,这个小文件中存着过去五分钟内某个网站各用户的访问情况,由(UserID, LinkInfo)。现在,我们需要对用户访问其未订阅主题的页面进行统计。可以通过Spark的join()操作来完成这个功能,其中需要把UserInfo和LinkInfo的有序对根据UserID进行分组,如上代码。

可以看出,因为每次调用processNewLogs()时都需要执行一次join()操作,但是数据具体的shuffle对我们来说却是不可控的,也就是我们不知道spark是如何进行分区的。spark默认在执行join()的时候会将两个RDD的键的hash值都算出来,然后将该hash值通过网络传输到同一个节点上进行相同键值的记录的连接操作,如下图所示:

Spark的分区机制的应用及PageRank算法的实现

因为userData这个RDD里面的数据是几乎不会变动的,或者说是极少会变动的,且它的内容也比events大很多,所以每次都要对它进行shuffle的话,是没有必要且浪费时间的,实际上只需要进行一次shuffle就可以了。

所以,可以通过预先分区来解决这个问题:在进行join()之前,对userData使用partitionBy()转化操作,把它变成一个哈希分区的RDD:

1val sc = new SparkContext(...)2val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")3                 .partitionBy(new HashPartitioner(100))   // Create 100 partitions4                 .persist()

调用partitionBy()之后,spark就可以预先知道这个RDD是已经进行过哈希分区的了,等到执行join()之时,它就会利用这一点:只对events进行shuffle,将events中特定UserID的记录发送到userData对应分区的机器节点上去。这样的话,就减少了大量的重复的网络通信,程序性能也会大大提高。改进后的程序的执行过程如下:

Spark的分区机制的应用及PageRank算法的实现

还有一点,你们可能注意到了新代码里最后还调用了一个persist()方法,这是另一个小优化点:对于那些数据不常变动且数据量较大的RDD,在进行诸如join()这种连接操作的时候尽量用persist()来做缓存,以提高性能。另外,分区数目的设置也有讲究,分区数目决定了这个任务在执行连接操作时的并行度,所以一般来说这个数目应该和集群中的总核心数保持一致。

最后,可能有人会问,能不能对events也进行分区进一步提高程序性能?这是没有必要的,因为events RDD是本地变量,每次执行都会更新,所以对它进行分区没有意义,即便对这种一次性变量进行分区,spark依然需要进行一次shuffle,所以,这是没有必要的。

使用分区来加快PageRank算法

PageRank算法是一种从RDD分区获益的更复杂的算法,下面我们用它为例来进一步讲解Spark分区的使用。

如果有不清楚的PageRank算法的具体实现的可以参考我以前的一篇文章:hadoop下基于mapreduce实现pagerank算法

PageRank是一个迭代算法,因此它是一个能从RDD分区中获得性能加速的很好的例子,先上代码:

 1// Assume that our neighbor list was saved as a Spark objectFile 2val links = sc.objectFile[(String, Seq[String])]("links") 3              .partitionBy(new HashPartitioner(100)) 4              .persist() 5 6// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD 7// will have the same partitioner as links 8var ranks = links.mapValues(v => 1.0) 910// Run 10 iterations of PageRank11for (i <- 0 until 10) {12  val contributions = links.join(ranks).flatMap {13    case (pageId, (links, rank)) =>14      links.map(dest => (dest, rank / links.size))15  }16  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)17}1819// Write out the final ranks20ranks.saveAsTextFile("ranks")

这个算法维护两个RDD,一个的键值对是(pageID, linkList),包含了每个页面的出链指向的相邻页面列表(由pageID组成);另一个的键值对是(pageID, rank),包含了每个页面的当前权重值。算法流程如下:

  1. 将每个页面的权重值初始化为1.0;

  2. 在每次迭代中,对页面p,向其每个出链指向的页面加上一个rank(p) / neighborsSize(p)的贡献值contributionReceived;

  3. 将每个页面的权重值设置为:0.15 + 0.85 * contributionReceived。

不断迭代步骤2和3,过程中算法会逐渐收敛于每个页面的实际PageRank值,实际运行之时大概迭代10+次以上即可。

算法将ranksRDD的每个元素的值设置为1.0,然后在每次迭代中不断更新ranks变量:首先对ranksRDD和静态的linksRDD进行一次join()操作,来获取每个页面ID对应的相邻页面列表和当前的权重值,然后使用flatMap创建出『contributions』来记录每个页面对各相邻页面的贡献值。然后再把这些贡献值按照pageID分别累加起来,把该页面的权重值设为0.15 + 0.85 * contributionsReceived。

接下来分析下上述代码做的的一些优化点:

  1. linksRDD在每次迭代中都会和ranks发生连接操作,由于links是一个静态RDD(数据几乎不会变动),所以在一开始可以对它进行分区以减少网络shuffle,降低网络通信的开销。而且,linksRDD的字节数一般来说也会比ranks大很多,因为这个RDD包含了每个页面的出链指向的页面列表,类似于一个笛卡尔积的数量级。所以通过预先分区可以获得比原算法的普通MapReduce实现更好的性能;

  2. 用persist()方法缓存RDD,使得在每次迭代里都可以复用,进一步提高性能;

  3. 第一次创建ranks时,使用mapValues()而不是map(),保留了父RDD(links)的分区方式(因为map操作理论上可能会修改键值导致父RDD的分区不可用,所以map操作不保留父RDD的分区),这样第一次的join()操作的开销也会更小;

  4. 在循环体中,调用reduceByKey()后使用mapValues();因为reduceByKey()的结果已经是哈希分区的了,这样一来,下一次循环中将映射操作的结果再次与links进行连接时就会更加高效。

参考

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

本文分享自微信公众号 - 潘建锋(R136a1_Pan)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
java实现 PageRank算法
 PageRank算法是Google的核心搜索算法,在所有链接型文档搜索中有极大用处,而且在我们的各种关联系统中都有好的用法,比如专家评分系统,微博搜索/排名,SNS系统等。  PageRank算法的依据或思想:   1,被重要的网页链接的越多(外链) ,此网页就越重要   2,此网页对外的链接越少越重要   这两个依据不能
Stella981 Stella981
2年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这