

ScalaTest a MapReduce using Akka

01.21.2013

For people in a hurry, here are the MapReduce with ScalaTest and Akka code and steps.

I was trying to learn Scala and wanted to kill several birds with one shot. Let me tell you, I am not disappointed: I feel comfortable working with Scala now. If you are coming from the Java world, Scala is comparatively more complex, but once you get past the initial hurdle you will like it. In one go, I wanted to learn Scala itself, ScalaTest, Akka, and SBT.

One use case I wanted to try out was a simple word count MapReduce, the "hello world" of MapReduce. MapReduce is a functional programming technique popularly associated with Hadoop parallel computing. There is a good MapReduce example using Java and Akka, a decent MapReduce example, another one here, and yet another one. The diagram below, from the source, describes the flow:

Word Count MapReduce with Akka and Java by Munish Gupta

In this example, I am taking advantage of Akka's Actor support to break a task into chunks, process them in parallel, and aggregate the final results.

To start with, SBT is a build tool similar to Maven that is used extensively for Scala development. Refer to project/plugins.sbt, which provides the integration with the Eclipse IDE. Once you get the code from GitHub, run the command below; you will notice that two files, .project and .classpath, get generated.

sbt eclipse
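
For reference, the Eclipse integration in project/plugins.sbt typically comes from the sbteclipse plugin. A minimal sketch of that file might look like the line below (the plugin version is only illustrative, not necessarily the one used in this project):

// project/plugins.sbt -- provides the "eclipse" task used above
// (version shown is an example; check the sbteclipse releases for a current one)
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")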

Now import the project into Eclipse via Import => "Existing Projects into Workspace". Once the project is imported into Eclipse, we can take advantage of IntelliSense and other IDE features and develop the application much more easily than by writing the Scala code in TextPad.

As always, I will start by writing a test. The lowest-level test is the aggregation test, which takes maps of words and the number of times each word has occurred, and aggregates them. I used WordSpec for this ScalaTest, as below:

"Aggregrate actor" must {
"send back Map message" in {
// create the aggregate Actor
val aggregateActor = system.actorOf(Props[AggregateActor]);
var map: Map[String, Int] = Map[String, Int]("ak" -> 1, "kp" -> 2)
aggregateActor ! map
var map1: Map[String, Int] = Map[String, Int]("ak" -> 1, "kp" -> 2)
aggregateActor ! map1
Thread.sleep(1000)
var output = Map("kp" -> 4, "ak" -> 2)
aggregateActor ! "DISPLAY_LIST"
expectMsg(output)
}
}
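
The test uses system and expectMsg, which come from Akka's TestKit together with ScalaTest. The spec class wrapping these tests should look roughly like the sketch below; the class name and mixed-in traits are my assumption of the usual 2013-era TestKit/WordSpec wiring (on newer ScalaTest versions you would mix in WordSpecLike and Matchers instead), not code copied from the repository:

import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, WordSpec}
import org.scalatest.matchers.MustMatchers

// Sketch of the TestKit + WordSpec wiring: "system" is the actor system the tests
// create actors on, and ImplicitSender makes the test the sender so expectMsg
// receives the replies sent back by the actors under test.
class MapReduceSpec extends TestKit(ActorSystem("MapReduceTest"))
  with ImplicitSender with WordSpec with MustMatchers with BeforeAndAfterAll {

  override def afterAll() {
    system.shutdown() // stop the actor system once the spec has run
  }

  // the "Aggregate actor", "Reduce actor", "Map actor" and controller tests go here
}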

Now I write a Reduce unit test, which takes a list of Result objects, creates a Map out of them, and publishes it to the Aggregate actor for further aggregation:

"Reduce actor" must {
"send back Map message" in {
// create the aggregate Actor
val aggregateActor = system.actorOf(Props[AggregateActor]);
// create the list of reduce Actors
val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)))
val list: List[Result] = List[Result](new Result("kp", 1), new Result("ak", 2))
reduceRouter ! list
val list1: List[Result] = List[Result](new Result("ak", 1), new Result("kp", 2))
reduceRouter ! list1
Thread.sleep(1000)
var output = Map("kp" -> 3, "ak" -> 3)
aggregateActor ! "DISPLAY_LIST"
expectMsg(output)
}
}
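
Both tests so far rely on the AggregateActor merging the partial word-count maps it receives and replying with the accumulated totals when it gets the "DISPLAY_LIST" message. A minimal sketch of such an actor, reconstructed from the behaviour the tests expect rather than copied from the repository, could be:

import akka.actor.Actor
import scala.collection.mutable

// Reconstruction of the aggregate behaviour the tests exercise (not the original source).
class AggregateActor extends Actor {
  val finalMap = mutable.Map[String, Int]()

  def receive = {
    // merge an incoming partial word-count map into the running totals
    case map: collection.Map[String, Int] =>
      map.foreach { case (word, count) =>
        finalMap(word) = finalMap.getOrElse(word, 0) + count
      }
    // reply with the aggregated counts so the test's expectMsg can assert on them
    case "DISPLAY_LIST" =>
      sender ! finalMap.toMap
  }
}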

Next, write a Map unit test that takes a line and creates Result objects. If you look carefully, the map and reduce actors are wrapped in Akka's RoundRobinRouter, so lines are processed by multiple actor instances in round-robin fashion:

"Map actor" must {
"send back Map message" in {
// create the aggregate Actor
val aggregateActor = system.actorOf(Props[AggregateActor]);
// create the list of reduce Actors
val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
// create the list of map Actors
val mapRouter = system.actorOf(Props(new MapActor(reduceRouter)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
var line = "Aditya Krishna Kartik Manjula"
mapRouter ! line
Thread.sleep(1000)
var output = Map("Kartik" -> 1, "Krishna" -> 1, "Aditya" -> 1, "Manjula" -> 1)
aggregateActor ! "DISPLAY_LIST"
expectMsg(output)
}
}
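
For completeness, the Result message used in these tests is essentially a word plus a count, and the MapActor mostly delegates to the evaluateExpression function shown further down. A rough sketch, with the class shape and stop-word list assumed rather than taken from the repository, could be:

import akka.actor.{Actor, ActorRef}

// Assumed shape of the message emitted by the map phase: a word and how often it was seen.
case class Result(word: String, noOfInstances: Int)

// Sketch of the map actor: turn an incoming line into Result objects and pass them on.
class MapActor(reduceActor: ActorRef) extends Actor {
  val STOP_WORDS = List("a", "an", "and", "is", "the", "to") // illustrative stop words

  def receive = {
    case line: String => reduceActor ! evaluateExpression(line)
  }

  // same idea as the evaluateExpression discussed later in the post
  def evaluateExpression(line: String): List[Result] =
    for (word <- line.split(" ").toList; if !STOP_WORDS.contains(word.toLowerCase))
      yield new Result(word, 1)
}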

We then write a ListController test, which exercises the flow end to end, integrating the map, reduce, and aggregate actors, and asserts on the values:

"List Reader Controller actor" must {
"send back Map message" in {
// create the aggregate Actor
 val aggregateActor = system.actorOf(Props[AggregateActor]);
// create the list of reduce Actors
 val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
// create the list of map Actors
 val mapRouter = system.actorOf(Props(new MapActor(reduceRouter)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
val controller = system.actorOf(Props(new ControllerActor(aggregateActor, mapRouter)))
val lineReadActor = system.actorOf(Props[LineReadActor])
var list = List[String]("Aditya Krishna Kartik Manjula", "Manjula Anand Aditya Kartik", "Anand Vani Phani Aditya", "Kartik Krishna Manjula Aditya", "Vani Phani Anand Manjula")
lineReadActor.tell(list, controller)
Thread.sleep(1000)
var output = Map("Anand" -> 3, "Kartik" -> 3, "EOF" -> 1, "Krishna" -> 2, "Vani" -> 2, "Phani" -> 2, "Aditya" -> 4, "Manjula" -> 4)
 aggregateActor ! "DISPLAY_LIST"
 expectMsg(output)
 }
 }

Finally, we will write a fileReaderActor and pass a large file to it to do the MapReduce.
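
The post does not list that actor, but a sketch of how a file-reading actor might feed lines into the map router could look like this (the actor name and message shape are my own illustration, not necessarily what the repository uses):

import scala.io.Source
import akka.actor.{Actor, ActorRef}

// Hypothetical sketch: read a file line by line and hand each line to the map router.
class FileReadActor(mapRouter: ActorRef) extends Actor {
  def receive = {
    case fileName: String =>
      val source = Source.fromFile(fileName)
      try {
        source.getLines().foreach(line => mapRouter ! line)
      } finally {
        source.close()
      }
  }
}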

Now if you look at the actual code, refer to MapActor.scala: there is a keyword called yield. yield helps you build a new collection from an existing one. The syntax is as below:

def evaluateExpression(line: String): List[Result] = {
  for (word <- line.split(" ").toList; if !STOP_WORDS.contains(word.toLowerCase))
    yield new Result(word, 1)
}
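
For example, assuming words such as "is" and "a" are in STOP_WORDS (the stop-word list itself is not shown in this post, so I am inferring it), evaluateExpression("Scala is a language") would yield List(Result("Scala", 1), Result("language", 1)).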

Referring to ReduceActor.scala, you will find code of the form "result =>"; this is a lambda expression. Lambda expressions are fundamental to functional programming languages like Scala:

import scala.collection.mutable.{HashMap, Map}

def reduce(list: List[Result]): Map[String, Int] = {
  // mutable map of word -> running count (Map and HashMap here are scala.collection.mutable)
  var results: Map[String, Int] = new HashMap[String, Int]

  list.foreach(result => {
    if (results.contains(result.word)) {
      results(result.word) += result.noOfInstances
    } else {
      results(result.word) = result.noOfInstances
    }
  })
  return results
}
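
To connect the lambda idea back to immutable collections, the same reduction can also be expressed without a mutable map by folding over the list. This is an alternative sketch of mine, not code from the repository:

// Alternative, immutable version of reduce() using foldLeft and a lambda
// (assumes the same Result(word, noOfInstances) shape used above).
def reduceImmutable(list: List[Result]): Map[String, Int] =
  list.foldLeft(Map.empty[String, Int]) { (counts, result) =>
    // add this Result's count to whatever has been accumulated for the word so far
    counts + (result.word -> (counts.getOrElse(result.word, 0) + result.noOfInstances))
  }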

Conclusion

In this example, I have used and learned:

  • Scala programming aspects like collections, the yield keyword, and lambda expressions
  • Akka's RoundRobinRouter for threading and concurrency
  • SBT for integration with Eclipse
  • ScalaTest for TDD


Published at DZone with permission of Krishna Prasad, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)