Changing the Spark Context of an existing RDD

Spark¬†RDDs are supposed to be Resilient. If something bad happens whilst computing, we can recover! At least, that’s the idea.

scala> val myRdd = sc.parallelize(Seq(1,2,3))
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> sc.stop

If we stop the spark context for any reason, we now find our RDD is useless!

scala> myRdd.first
java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1316)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1339)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1353)
at org.apache.spark.rdd.RDD.take(RDD.scala:1098)

This isn’t good at all! Let’s make a new spark context.

scala> val sc = new org.apache.spark.SparkContext("local[8]", "new context")
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@542beecb

We now need to inject this back into our RDD. The spark context is stored in a private field, so we have to reach for reflection.

val rddClass = classOf[org.apache.spark.rdd.RDD[_]]
val scField = rddClass.getDeclaredField("_sc") // spark context stored in _sc
scField.setAccessible(true) // now we can access it

Now we just set the spark context.

scField.set(myRdd, sc)

Observe that this works!

scala> myRdd.sum
res5: Double = 6.0
scala> myRdd.first
res6: Int = 1

This is quite scary and probably should not be used for anything real. Additionally we had an RDD with many dependencies, we’d have to crawl the the dependencies and swap it out in every place (I think).


Another approach might be to instead produce a dynamic proxy for the spark context which allows you to point at some true spark context, and then just swap it out there.

What are we actually trying to do here? If we have a long-running application which allows users to create RDDs, it would be nice to be able to recover from spark cluster bounces. We could keep track of the operations required to produce the RDDs in the first place (which is arguably a better approach) but I decided to spend thirty minutes poking around anyway, and was pleasantly surprised at the (illusion of) progress I made!

Changing the Spark Context of an existing RDD

Connecting to Vertica from Spark

So you have a lot of data in vertica, and you want to do analytics beyond what’s easily expressible in vSQL, at scale, without writing nasty C++ UDFs; or perhaps you already have a lot of data already sitting in HDFS to join against.

Enter spark.

1. Grab the vertica jbdc drivers and hadoop connectors from the vertica support portal and put them on your spark classpath (e.g. via ADD_JARS)

2. Use something like this class

import org.apache.spark.rdd.RDD
import com.vertica.hadoop._
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.conf.Configuration
class Vertica(val hostnames: String,
              val database: String,
              val username: String,
              val password: String,
              val port: String = "5433") extends Serializable {

    def configuration:Configuration = {
        val conf = new Configuration
        conf.set("mapred.vertica.hostnames", hostnames)
        conf.set("mapred.vertica.database", database)
        conf.set("mapred.vertica.username", username)
        conf.set("mapred.vertica.password", password)
        conf.set("mapred.vertica.port", port)

    def query(sql: String):RDD[VerticaRecord] = {
        val job = new Job(configuration)

        VerticaInputFormat.setInput(job, sql)
        sc.newAPIHadoopRDD(job.getConfiguration, classOf[VerticaInputFormat], classOf[LongWritable], classOf[VerticaRecord]).map(_._2)

3. Voilà!

val vertica = new Vertica("my-node-1,my-node-2,my-node-3", "my-db", "username", "password")
val v:RDD[VerticaRecord] = vertica.query("select date, category, sum(amount) from my_transaction_table group by date, category;")
Connecting to Vertica from Spark