WordCount on Hadoop with Scala

Hadoop is a great technology built with Java, so its APIs can be called directly from Scala.

Today we will use Scala to implement a simple MapReduce job, which we will then run using HDInsight.

First we add the sbt-assembly plugin in project/assembly.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")


Then we add the Hadoop core dependency to our build.sbt file. We also configure the assembly merge strategy to avoid deduplicate errors when the fat jar is built.



assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
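
For orientation, here is a sketch of how a complete build.sbt might look. The `name`, `version` and `scalaVersion` values are assumptions inferred from the jar path target/scala-2.12/ScalaHadoop-assembly-1.0.jar mentioned later in this post. They are not taken from the actual project, so adjust them to your own setup.

```scala
// build.sbt (sketch). name, version and scalaVersion are assumptions
// inferred from the jar name ScalaHadoop-assembly-1.0.jar.
name := "ScalaHadoop"

version := "1.0"

// Assumed: any 2.12.x release matches the target/scala-2.12 output directory.
scalaVersion := "2.12.1"

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

assemblyMergeStrategy in assembly := {
  // Drop everything under META-INF (manifests, signature files), a common source of clashes.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // For any other path present in several jars, keep the first copy found.
  case _ => MergeStrategy.first
}
```

By default sbt-assembly names the fat jar after the project name and version, which is why these two settings determine the file name of the build output.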

We will use WordCount as an example.
The original Java example is ported to Scala.

package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {

  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  // Sums the counts emitted for each word; also used as the combiner below.
  class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }


  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReducer])
    job.setReducerClass(classOf[IntSumReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }

}
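
To see what the job computes without a cluster, the same map and reduce steps can be sketched with plain Scala collections. This is only an illustrative stand-in and not how Hadoop executes the job: `LocalWordCount`, `mapLine`, `reduceCounts` and `wordCount` are hypothetical names, and `groupBy` plays the role of the shuffle phase.

```scala
import java.util.StringTokenizer

object LocalWordCount {

  // Map step: emit a (word, 1) pair for every whitespace-separated token.
  def mapLine(line: String): List[(String, Int)] = {
    val itr = new StringTokenizer(line)
    var pairs = List.empty[(String, Int)]
    while (itr.hasMoreTokens()) {
      pairs = (itr.nextToken(), 1) :: pairs
    }
    pairs.reverse
  }

  // Reduce step: sum all counts collected for a single word.
  def reduceCounts(counts: Iterable[Int]): Int = counts.foldLeft(0)(_ + _)

  // Shuffle stand-in: group the pairs by word, then reduce each group.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(mapLine)
      .groupBy(_._1)
      .map { case (word, pairs) => word -> reduceCounts(pairs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("hello hadoop", "hello scala"))
    counts.toSeq.sortBy(_._1).foreach { case (word, count) => println(s"$word\t$count") }
  }
}
```

For the two sample lines, "hello" is counted twice while "hadoop" and "scala" are counted once each. The Hadoop job writes the same kind of word and count pairs to its output path.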

Then we build our example:

sbt clean compile assembly

Our new jar will reside at target/scala-2.12/ScalaHadoop-assembly-1.0.jar.

In the next post we shall run our code using Azure’s HDInsight.

You can find the code on GitHub.
