Run WordCount with Scala and Spark on HDInsight

Previously we solved the word count problem with Scala and Spark.
The next step is to deploy our solution to HDInsight, using Spark, HDFS, and Scala.

We shall provision a Spark cluster.

Since HDInsight exposes Azure storage through an HDFS-compatible interface, we can use an Azure storage account as the cluster's file system.

Then we choose our instance types.

And we are ready to create the Spark cluster.

Our data shall be uploaded to the HDFS file system.
To do so we will upload our text files to the Azure storage account, which is integrated with HDFS.

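For more information on managing a storage account with the Azure CLI, check the official guide. Any text file will work; the command below uploads it to the sparkclusterscala container as the blob example/data/mytextfile.txt.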

azure storage blob upload mytextfile.txt sparkclusterscala example/data/mytextfile.txt

Since the input now lives in HDFS-backed Azure storage, we change the original script to read it through a wasb:/// path:

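// Read the file from the cluster's default storage
val text = sc.textFile("wasb:///example/data/mytextfile.txt")
// Split lines into words, pair each word with 1, then sum the pairs per word
val counts = text.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// Bring the results back to the driver
counts.collect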
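
To see what the three transformations do without a cluster, the same pipeline can be sketched on plain Scala collections. This is only an illustration: LocalWordCount and countWords are hypothetical names, and groupBy followed by a sum stands in for Spark's reduceByKey.

```scala
object LocalWordCount {

  // Same shape as the Spark script: split lines, pair each word with 1, sum per word.
  // groupBy + sum is a local stand-in for reduceByKey (an assumption of this sketch).
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

For example, LocalWordCount.countWords(Seq("a b a", "b c")) yields a map with a and b counted twice and c once.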

Then we can upload our Scala script to the head node using scp:

scp WordCountscala.scala demo@{your cluster}-ssh.azurehdinsight.net:/home/demo/WordCountscala.scala

Running the script is again straightforward:

spark-shell -i WordCountscala.scala

Once the task is done we are presented with the Spark prompt, where we can save our results to the HDFS file system.

scala> counts.saveAsTextFile("/wordcount_results")

Then, back in the head node's shell, we do a quick check:

hdfs dfs -ls /wordcount_results/
hdfs dfs -text /wordcount_results/part-00000
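
Since saveAsTextFile writes the string form of each element, every line in the part files should look like (word,count). If the results need to be read back into pairs, a minimal parsing sketch could look like the following; ResultLineParser is a hypothetical helper, not part of Spark.

```scala
import scala.util.Try

object ResultLineParser {

  // Parses a line such as "(the,3)" into Some(("the", 3)).
  // Splits on the LAST comma so that words containing commas survive.
  // Returns None for anything that does not match the expected shape.
  def parse(line: String): Option[(String, Int)] = {
    val trimmed = line.trim
    if (trimmed.startsWith("(") && trimmed.endsWith(")")) {
      val body = trimmed.substring(1, trimmed.length - 1)
      val idx = body.lastIndexOf(',')
      if (idx < 0) None
      else Try(body.substring(idx + 1).toInt).toOption.map(n => (body.substring(0, idx), n))
    } else None
  }
}
```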

Run Scala implemented Storm topologies on HDInsight

Previously we set up a Storm topology implemented in Scala in order to count words.

What comes next is uploading our topology to HDInsight.

So we shall proceed to create a Storm cluster on HDInsight.

Then we choose the instance types.

The next step is to upload our jar file to the head node in order to deploy it. We can use scp for this purpose.

scp target/scala-2.12/ScalaStorm-assembly-1.0.jar {your user}@{your azure endpoint}:/home/demo

Now we can ssh to our Storm cluster’s head node and issue the storm command. The last argument is the name under which the topology is submitted.

storm jar ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-stream-scala

Then we can check our topology by navigating to the Storm UI at https://{your cluster}.azurehdinsight.net/stormui

WordCount with Storm and Scala

Apache Storm is a free and open source distributed realtime computation system running on the JVM.
To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
We will focus on the same word count concept, but for real-time cases, and implement a word count topology with Apache Storm. Our source code will be based on the official Storm examples.

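Storm works with spouts and bolts: spouts are the sources of a stream of tuples, while bolts consume tuples, process them, and may emit new ones.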
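
Before touching the Storm API, the data flow can be modelled with plain Scala iterators. This is a rough sketch under stated assumptions, not how Storm executes a topology: StreamSketch and its methods are hypothetical names, and everything runs in a single thread.

```scala
import scala.collection.mutable

object StreamSketch {

  // "Spout": a source of sentences
  def spout(sentences: Seq[String]): Iterator[String] = sentences.iterator

  // First "bolt": one sentence in, many words out
  def split(in: Iterator[String]): Iterator[String] =
    in.flatMap(_.split("\\s+")).filter(_.nonEmpty)

  // Second "bolt": emits (word, running count) for every incoming word
  def count(in: Iterator[String]): Iterator[(String, Int)] = {
    val counts = mutable.Map.empty[String, Int]
    in.map { word =>
      val updated = counts.getOrElse(word, 0) + 1
      counts(word) = updated
      (word, updated)
    }
  }
}
```

Chaining the three, as in StreamSketch.count(StreamSketch.split(StreamSketch.spout(Seq("a b", "a")))), emits one (word, count) pair per incoming word, with the count growing each time a word repeats.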

First we shall implement a spout which will emit fake data events, in our case sentences.

package com.gkatzioura.scala.storm


import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector:SpoutOutputCollector = _
  var _rand:Random = _

  override def nextTuple(): Unit = {

    Utils.sleep(100)

    val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away",
      "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }

}

The next step is to implement a bolt which splits the sentences into words and emits each word.

package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance

    boundary.setText(sentence)
    var start = boundary.first
    var end = boundary.next

    // Stop as soon as next returns DONE, before it is used as a substring index
    while (end != BreakIterator.DONE) {

      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (word.nonEmpty) {
        collector.emit(new Values(word))
      }
      start = end
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
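
The splitting logic can be tried outside Storm. The sketch below pulls the BreakIterator loop into a standalone function; SentenceSplitter is a hypothetical name, and the loop checks for DONE before using the boundary as a substring index.

```scala
import java.text.BreakIterator
import scala.collection.mutable.ListBuffer

object SentenceSplitter {

  // Returns the non-whitespace segments that BreakIterator finds in the sentence.
  def words(sentence: String): List[String] = {
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    val out = ListBuffer.empty[String]
    var start = boundary.first
    var end = boundary.next

    // Each [start, end) range is one segment; whitespace-only segments are dropped
    while (end != BreakIterator.DONE) {
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (word.nonEmpty) out += word
      start = end
      end = boundary.next
    }
    out.toList
  }
}
```

Calling SentenceSplitter.words("the cow jumped") returns the three words as a list.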

Next comes the word count bolt.

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt{

  val counts = scala.collection.mutable.Map[String,Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {

    val word = input.getString(0)

    // Increment the running count for this word
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)

    // Emit the count for this word only (boxed, since Values takes Objects), not the whole map
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    declarer.declare(new Fields("word", "count"))
  }

}

The final step is to create our topology, which handles both running locally and submitting to a cluster.

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology  {

  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder
    // The last argument of setSpout/setBolt is the parallelism hint
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    // Sentences can go to any split task
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    // The same word must always reach the same count task
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

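    if (args != null && args.length > 0) {
      // A topology name was given: submit to the cluster
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    }
    else {
      // No arguments: run in an in-process local cluster for ten seconds
      conf.setMaxTaskParallelism(3)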
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
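
The count bolt relies on fieldsGrouping so that every occurrence of a word reaches the same task, which is what keeps each per-task map consistent. As a simplified model only (Storm's actual routing is internal to the framework, and FieldsGroupingSketch is a hypothetical name), the idea can be pictured as hashing the field value modulo the number of tasks:

```scala
object FieldsGroupingSketch {

  // Assumption of this sketch: route by hash of the field value modulo the task count.
  // floorMod keeps the result non-negative even for negative hash codes.
  def taskFor(word: String, numTasks: Int): Int =
    Math.floorMod(word.hashCode, numTasks)
}
```

With 12 count tasks, FieldsGroupingSketch.taskFor("moon", 12) returns the same index on every call, whereas a shuffle grouping would spread occurrences of a word over different tasks and split its count.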

Now we shall build our app. To do so we need to include the assembly plugin in our project/plugins.sbt file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

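Our build.sbt file is as follows. storm-core is marked as provided so that it is not bundled into the assembly jar, since the cluster supplies it.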

name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

And then we issue a build:

sbt clean compile assembly

You can find the source code on GitHub.

In the next post we shall deploy our Storm app to HDInsight.