博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 实现自己的RDD,让代码更优雅
阅读量:6610 次
发布时间:2019-06-24

本文共 3740 字,大约阅读时间需要 12 分钟。

hot3.png

你是否在最初书写spark的代码时总是使用object 是否在为代码的重复而忧心,接下来的博客中,我会专注于spark代码简洁性。

1,什么事RDD,官网上有很全面的解释,在此不再赘述,不过我们需要从代码层面上理解什么事RDD,如果他是一个类,他又有哪些重要的属性和方法,现在列出以下几点:

    1)partitions():Get the array of partitions of this RDD, taking into account whether the

RDD is checkpointed or not. Partition是一个特质,分布在每一个excutor上的分区,都会有一个Partition实现类去做唯一标识。

    2)iterator():Internal method to this RDD; will read from cache if applicable, or otherwise compute it. This should not be called by users directly, but is available for implementors of custom subclasses of RDD. 这是一个RDD的迭代器,传入的参数是Partition和TaskContext,这样就可以在每一个Partition上执行相应的逻辑了。

    3)dependencies():Get the list of dependencies of this RDD,在1.6中,Dependency共有如下几个继承类,后续博文会详解它,感兴趣的读者可以直接阅读源码进一步了解

            180902_ApZb_2494265.png

    4)partitioner():此函数返回一个Option[Partitioner],如果RDD不是key-value pair RDD类型的数据,那么为None,我们和以自己实现这个抽象类。当时看到这里,我就在想为什么不能实现一个特质,而要用

抽象类,个人理解这是属于面向对象的东西了,类是实体的抽象爱,而接口则定义一些行为。

    5)preferredLocations():Optionally overridden by subclasses to specify placement preferences.

 

下面我们自己实现一个和Mysql交互的RDD,只涉及到上面说的部分函数,当然在生产环境中不建议这样做,除非你自己想把自己的mysql搞挂,此处只是演示,对于像Hbase之类的分布式数据库,逻辑类似。

package com.hypers.rddimport java.sql.{Connection, ResultSet}import org.apache.spark.annotation.DeveloperApiimport org.apache.spark.rdd.RDDimport org.apache.spark.{Logging, Partition, SparkContext, TaskContext}import scala.reflect.ClassTag//TODO 去重class HFAJdbcRDD[T: ClassTag]    (sc: SparkContext,     connection: () => Connection, //method     sql: String,     numPartittions: Int,     mapRow: (ResultSet) => T) extends RDD[T](sc, Nil) with Logging {    /**      * 若是这个Rdd是有父RDD 那么 compute一般会调用到iterator方法 将taskContext传递出去      * @param thePart      * @param context      * @return      */    @DeveloperApi    override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new Iterator[T] {        val part = thePart.asInstanceOf[HFAJdbcPartition]        val conn = connection()        //如果直接执行sql会使数据重复,因此此处使用分页        val stmt = conn.prepareStatement(String.format("%s limit %s,1",sql,thePart.index.toString), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)        logInfo("Get sql data size is " + stmt.getFetchSize)        val rs: ResultSet = stmt.executeQuery()        override def hasNext: Boolean = {            if(rs.next()){                true            }else{                conn.close()                false            }        }        override def next(): T = {            mapRow(rs)        }    }    /**      * 将一些信息传递到compute方法 例如sql limit 的参数      * @return      */    override protected def getPartitions: Array[Partition] = {        (0 until numPartittions).map { inx =>            new HFAJdbcPartition(inx)        }.toArray    }}private class HFAJdbcPartition(inx: Int) extends Partition {    override def index: Int = inx}

 

package com.hypers.rdd.executeimport java.sql.{DriverManager, ResultSet}import com.hypers.commons.spark.BaseJobimport com.hypers.rdd.HFAJdbcRDD//BaseJob里面做了sc的初始化,在此不做演示,您也可以自己new出sparkContextobject HFAJdbcTest extends BaseJob {    def main(args: Array[String]) {        HFAJdbcTest(args)    }    override def apply(args: Array[String]): Unit = {        val jdbcRdd = new HFAJdbcRDD[Tuple2[Int, String]](sc,            getConnection,            "select id,name from user where id<10",            3,            reseultHandler        )        logger.info("count is " + jdbcRdd.count())        logger.info("count keys " + jdbcRdd.keys.collect().toList)    }    def getConnection() = {        Class.forName("com.mysql.jdbc.Driver").newInstance()        DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")    }    def reseultHandler(rs: ResultSet): Tuple2[Int, String] = {        rs.getInt("id") -> rs.getString("name")    }}

 

转载于:https://my.oschina.net/u/2494265/blog/825151

你可能感兴趣的文章
【Lucene】Lucene通过CustomScoreQuery实现自定义评分
查看>>
linux 内核网络,数据接收流程图
查看>>
我的友情链接
查看>>
在windows下与linux虚拟机进行文件共享
查看>>
php 图形用户界面GUI 开发
查看>>
正则表达式详解
查看>>
linux文件与目录之权限对比
查看>>
LeetCode问题5
查看>>
AIX系列------ISO挂载
查看>>
如何打开被管理员禁止的注册表编辑器
查看>>
java根据经纬度计算距离
查看>>
MYSQL简单主从复制原理及实现
查看>>
U-Mail邮件服务器安全高效为政府信息化奠定基石
查看>>
sqlplus登陆方式说明
查看>>
窝里斗,只给微软看笑话
查看>>
递归函数打印斐波那契数列
查看>>
Too many open files 问题的解决
查看>>
CF976D. Degree Set
查看>>
I2C总线介绍及AT24C02驱动编写 笔记
查看>>
我的友情链接
查看>>