Changing the Spark Context of an existing RDD

Spark RDDs are supposed to be Resilient. If something bad happens whilst computing, we can recover! At least, that’s the idea.

scala> val myRdd = sc.parallelize(Seq(1,2,3))
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> sc.stop

If we stop the spark context for any reason, we now find our RDD is useless!

scala> myRdd.first
java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1316)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1339)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1353)
at org.apache.spark.rdd.RDD.take(RDD.scala:1098)

This isn’t good at all! Let’s make a new spark context.

scala> val sc = new org.apache.spark.SparkContext("local[8]", "new context")
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@542beecb

We now need to inject this back into our RDD. The spark context is stored in a private field, so we have to reach for reflection.

val rddClass = classOf[org.apache.spark.rdd.RDD[_]]
val scField = rddClass.getDeclaredField("_sc") // spark context stored in _sc
scField.setAccessible(true) // now we can access it

Now we just set the spark context.

scField.set(myRdd, sc)

Observe that this works!

scala> myRdd.sum
res5: Double = 6.0
scala> myRdd.first
res6: Int = 1

This is quite scary and probably should not be used for anything real. Additionally, if we had an RDD with many dependencies, we’d have to crawl those dependencies and swap the context out in every place (I think), along the lines of the sketch below.
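
As a rough sketch of that crawl (my own illustration building on the reflection trick above, not anything Spark provides; the helper name swapContext is made up), the same private field can be patched recursively by walking RDD.dependencies:

// walk an RDD and everything it depends on, pointing each one at the new context
def swapContext(rdd: org.apache.spark.rdd.RDD[_], newSc: org.apache.spark.SparkContext): Unit = {
  val scField = classOf[org.apache.spark.rdd.RDD[_]].getDeclaredField("_sc") // same private field as above
  scField.setAccessible(true)
  scField.set(rdd, newSc)
  rdd.dependencies.foreach(dep => swapContext(dep.rdd, newSc)) // recurse into parent RDDs
}

// swapContext(myRdd, sc)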

Another approach might be to instead produce a dynamic proxy for the spark context which allows you to point at some true spark context, and then just swap it out there.
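
SparkContext is a concrete class rather than an interface, so java.lang.reflect.Proxy can’t wrap it directly; a loose sketch of the swap-out idea (my own illustration, not anything Spark provides) is a small holder that application code always goes through:

class SparkContextHolder(initial: org.apache.spark.SparkContext) {
  @volatile private var underlying = initial
  def current: org.apache.spark.SparkContext = underlying                        // always returns the live context
  def swap(newSc: org.apache.spark.SparkContext): Unit = { underlying = newSc }  // point at a freshly built context
}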

What are we actually trying to do here? If we have a long-running application which allows users to create RDDs, it would be nice to be able to recover from spark cluster bounces. We could keep track of the operations required to produce the RDDs in the first place (which is arguably a better approach), but I decided to spend thirty minutes poking around anyway, and was pleasantly surprised at the (illusion of) progress I made!

How Spark does Class Loading

Using the spark shell, you can define classes on the fly and then use them in your distributed computation.

Contrived Example
scala> class Vector2D(val x: Double, val y: Double) extends Serializable {
| def length = Math.sqrt(x*x + y*y)
| }
defined class Vector2D
scala> val sourceRDD = sc.parallelize(Seq((3,4), (5,12), (8,15), (7,24)))
sourceRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:13
scala> sourceRDD.map(x => new Vector2D(x._1, x._2)).map(_.length).collect()
14/03/30 09:21:59 INFO SparkContext: Starting job: collect at <console>:17
...snip...
res1: Array[Double] = Array(5.0, 13.0, 17.0, 25.0)

In order for the remote executors here to actually run your code, they must have knowledge of the Vector2D class, yet they’re running on a different JVM (and probably a different physical machine). How do they get it?

  • we choose a directory on disk to store the class files
  • a virtual directory is created at SparkIMain:101
  • a Scala compiler is instantiated with this directory as the output directory at SparkIMain:299
  • this means that whenever a class is defined in the REPL, the class file is written to disk
  • an HTTP server is created to serve the contents of this directory at SparkIMain:102
  • we can see info about the HTTP server in the logs:
    14/03/23 23:39:21 INFO HttpFileServer: HTTP File server directory is /var/folders/8t/bc2vylk13j14j13cccpv9r6r0000gn/T/spark-1c7fbed7-5c87-4c2c-89e8-be95c2c7ac54
    14/03/23 23:39:21 INFO Executor: Using REPL class URI: http://192.168.1.83:61626
  • the HTTP server URL is stored in the Spark config, which is shipped out to the executors
  • the executors install a URL classloader pointing at the HTTP class server at Executor:74

For the curious, we can figure out the URL of a particular class and then go and check it out in a browser or with curl.

import scala.reflect.ClassTag

def urlOf[T: ClassTag] = {
  val clazz = implicitly[ClassTag[T]].runtimeClass // runtimeClass rather than the deprecated erasure
  Seq(sc.getConf.get("spark.repl.class.uri"),
      clazz.getPackage.getName,
      clazz.getName).mkString("/")
}

Do it yourself

It’s pretty trivial to replicate this ourselves. In Spark’s case a Scala compiler writes the class files to disk, but if we want to serve classes from a fairly normal JVM with a fairly standard classloader, we don’t even need to bother with writing to disk: we can grab the class bytes using getResourceAsStream. It doesn’t require any Scala magic either; here is an example class server in Java using Jetty:

// Imports assume Jetty 9, the javax.servlet API and commons-io; the
// NetworkTrafficSelectChannelConnector import depends on your Jetty version.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;

class ClasspathClassServer {
    private Server server = null;
    private int port = -1;

    void start() throws Exception {
        System.out.println("Starting server...");
        if (server != null) {
            return;
        }

        server = new Server();
        // port 0 lets Jetty pick any free port; we read the real port back after start()
        NetworkTrafficSelectChannelConnector connector = new NetworkTrafficSelectChannelConnector(server);
        connector.setPort(0);
        server.addConnector(connector);

        ClasspathResourceHandler classpath = new ClasspathResourceHandler();

        server.setHandler(classpath);
        server.start();

        port = connector.getLocalPort();
        System.out.println("Running on port " + port);
    }

    class ClasspathResourceHandler extends AbstractHandler {
        @Override
        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
                    throws IOException, ServletException {
            System.out.println("Serving target: " + target);

            try {
                // the target is "/" followed by a fully-qualified class name; load the
                // class locally and stream its .class file straight off our classpath
                Class<?> clazz = Class.forName(target.substring(1));
                InputStream classStream = clazz.getResourceAsStream('/' + clazz.getName().replace('.', '/') + ".class");

                response.setContentType("application/octet-stream"); // raw class bytes, not text
                response.setStatus(HttpServletResponse.SC_OK);
                baseRequest.setHandled(true);

                OutputStream os = response.getOutputStream();

                IOUtils.copy(classStream, os);
            } catch (Exception e) {
                System.out.println("Exception: " + e.getMessage());
                baseRequest.setHandled(false);
            }
        }
    }
}

It’s then just a matter of setting up a classloader on the other side that pulls its classes from this server!
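
As a rough sketch of that other side (my own illustration, with placeholder host, port and class names): a stock java.net.URLClassLoader requests paths like /com/example/Foo.class, whereas the handler above resolves its target with Class.forName, so this minimal loader asks for the dotted class name directly.

import java.io.ByteArrayOutputStream
import java.net.URL

// fetches class bytes from the class server above and defines them locally
class HttpClassLoader(baseUrl: String, parent: ClassLoader) extends ClassLoader(parent) {
  override def findClass(name: String): Class[_] = {
    val in = new URL(baseUrl + "/" + name).openStream() // e.g. http://host:port/com.example.Foo
    try {
      val buffer = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      var read = in.read(chunk)
      while (read != -1) { buffer.write(chunk, 0, read); read = in.read(chunk) }
      val bytes = buffer.toByteArray
      defineClass(name, bytes, 0, bytes.length)
    } finally in.close()
  }
}

// val loader = new HttpClassLoader("http://host:port", getClass.getClassLoader)
// val clazz  = loader.loadClass("com.example.Vector2D")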

Further Examples

http://quoiquilensoit.blogspot.com/2008/02/generic-compute-server-in-scala-with.html: An example of using a similar technique to write a ‘compute server’ in scala – somewhat akin to a very stripped down version of Spark.

Connecting to Vertica from Spark

So you have a lot of data in Vertica, and you want to do analytics beyond what’s easily expressible in vSQL, at scale, without writing nasty C++ UDFs; or perhaps you already have a lot of data sitting in HDFS to join against.

Enter Spark.

1. Grab the Vertica JDBC drivers and Hadoop connectors from the Vertica support portal and put them on your Spark classpath (e.g. via ADD_JARS)

2. Use something like this class

import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.LongWritable
import com.vertica.hadoop._
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.conf.Configuration
class Vertica(val hostnames: String,
              val database: String,
              val username: String,
              val password: String,
              val port: String = "5433") extends Serializable {

    def configuration:Configuration = {
        val conf = new Configuration
        conf.set("mapred.vertica.hostnames", hostnames)
        conf.set("mapred.vertica.database", database)
        conf.set("mapred.vertica.username", username)
        conf.set("mapred.vertica.password", password)
        conf.set("mapred.vertica.port", port)
        conf
    }

    // note: `sc` here is the SparkContext provided by the Spark shell
    def query(sql: String):RDD[VerticaRecord] = {
        val job = new Job(configuration)
        job.setInputFormatClass(classOf[VerticaInputFormat])

        VerticaInputFormat.setInput(job, sql)
        sc.newAPIHadoopRDD(job.getConfiguration, classOf[VerticaInputFormat], classOf[LongWritable], classOf[VerticaRecord]).map(_._2)
    }
}

3. Voilà!

val vertica = new Vertica("my-node-1,my-node-2,my-node-3", "my-db", "username", "password")
val v:RDD[VerticaRecord] = vertica.query("select date, category, sum(amount) from my_transaction_table group by date, category;")