Apache SparkGetting started with Spark (now shipping inside CDH 5) is easy using this simple example.

Apache Spark is a general-purpose, cluster computing framework that, like MapReduce in Apache Hadoop, offers powerful abstractions for processing large datasets. For various reasons pertaining to performance, functionality, and APIs, Spark is already becoming more popular than MapReduce for certain types of workloads. (For more background about Spark, read this post.)

In this how-to, you’ll learn how to write, compile, and run a simple Spark program written in Scala on CDH 5 (in which Spark ships and is supported by Cloudera). The full code for the example is hosted at https://github.com/sryza/simplesparkapp.

Writing

Our example app will be a souped-up version of WordCount, the classic MapReduce example. In WordCount, the goal is to learn the distribution of letters in the most popular words in our corpus. That is, we want to:

  1. Read an input set of text documents
  2. Count the number of times each word appears
  3. Filter out all words that show up less than a million times
  4. For the remaining set, count the number of times each letter occurs

In MapReduce, this would require two MapReduce jobs, as well as persisting the intermediate data to HDFS in between them. In constrast, in Spark, you can write a single job in about 90 percent fewer lines of code.

Our input is a huge text file where each line contains all the words in a document, stripped of punctuation. The full Scala program looks like this:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName(“Spark Count”))
val threshold = args(1).toInt

// split each document into words
val tokenized = sc.textFile(args(0)).flatMap(_.split(” “))

// count the occurrence of each word
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

// filter out words with less than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)

// count characters
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

System.out.println(charCounts.collect().mkString(“, “))
}
}

Spark uses “lazy evaluation”, meaning that transformations don’t execute on the cluster until an “action” operation is invoked. Examples of action operations are collect, which pulls data to the client, and saveAsTextFile, which writes data to a filesystem like HDFS. Read more