EDITING BOARD
RO
EN
×
▼ BROWSE ISSUES ▼
Issue 33

Hadoop MapReduce deep diving and tuning

Tudor Lăpușan
Java & Big Data developer @ Telenav



PROGRAMMING

MapReduce is the main batch processing framework from Apache Hadoop project. It was developed by Google and in 2004 they published an article describing the MapReduce concept.

In 2006, Dug Cutting succeeded to implement this concept and put it into an Apache project, namely Apache Hadoop. First release happened in 14 Sep, 2007.

This was the beginning of the Big Data for everyone, starting from just simple curious people to any kind of company. Soon Apache Hadoop1 reached a very strong community and big players, as well, such as Yahoo, Facebook, Ebay, IBM, Linkedin and others2.

For easy adaptation by the world, other frameworks were developed on top of MapReduce, which are much easier to learn and work with. One example is Apache Hive3, which was developed at Facebook. Because almost anyone from computer science has SQL knowledge, Facebook developed Hive, which allowed them to query and analyze their datasets by simply using HiveQL language, very similar with SQL. This way, anyone from the Facebook team with SQL knowledge had the ability to use the power of MapReduce.

MapReduce general view.

MapReduce is a distributed framework, which works on commodity hardware and it is used for data processing. It has two main phases, Map and Reduce and another phase, Shuffle, which is not so well known, but in some of use cases, it can slow down or boost your entire execution.

For the majority of use cases of data processing using MapReduce framework, the Map phase goes through the entire dataset and applies more filters and the Reduce phase is the place where we actually apply our algorithms.

To better understand how MapReduce works, I recommend reading more about the MapReduce HelloWorld, the Wordcount 4example. It simply finds out the frequency of each word from a datasets. The beauty of MapReduce is that the same code which works for a dataset of few MBs can work on much bigger ones, TBs, PBs or even more, without any code modification in our program. This is due to the nature of MapReduce distributed execution, which automatically takes care of work distribution and task failure.

Bellow, you can see the pseudo-code representation of the Wordcount example.

mapper (filename, file-contents):
 for each word in file-contents:
   emit (word,1)

reducer (word, values):
  sum=0
  for each value in values:
    sum=sum + value
     emit (word, sum)

In the next picture, you can see the general process of MapReduce for Wordcount execution. Each map phase receives its input and prepares intermediary key as pairs of (key,value), where the key is the actual word and the value is the word's current frequency, namely 1. Shuffling phase guarantees that all pairs with the same key will serve as input for only one reducer, so in reduce phase we can very easily calculate the frequency of each word.

MapReduce deep dive.

First of all, the next configuration properties and steps implied into MapReduce tuning refer to MapReduce V1. There is a new MapReduce version, V2, which can have very few changes. It is supposed to have more than basic MapReduce knowledge to understand the next sections.

As I just already mentioned, into a complete MapReduce execution, there are two main phases, map and reduce, and another phase, shuffle, between them.

Map side.

Each Map phase receives as input a block (input split) from a file stored into HDFS. Default value for a block file is 64 MB. If the entire file size is less than 64 MB, the Map phase will receive as input the entire file.

When the Map phase starts to produce output, it is not written directly to the disk. The process is more involved and takes advantage of the RAM memory by allocating a buffer where the intermediary results are stored. By default, the size of this buffer is 100 MB, but it can be tuned by changing the io.sort.mb property. When more than 80% of the buffer size if fulfilled, a background process will spill the content to disk. The 80% threshold can be changed as well using the io.sort.spill.percent property.

Before the data is spilled to the disk, it is partitioned based on the number of reduce processes. For each partition, an in memory sorting by key is executed and also if a combiner function is available, it is run on the output of the sorting process. Having a combiner function helps us compact the map output, so we'll have less data to write to the disk and to transfer through the network. Each time the buffer memory threshold is reached, a new spill file is created, so in the majority of map executions, at the end, we can have multiple spill files into a map execution.

After the map phase is finished, all the spill files are merged into a single partitioned and sorted output file. It is also recommended to compress the map output as it is written to disk to speed up the disk writing, to save disk space and also to reduce the amount of data transferred to the reducers. Compression option is disabled by default, but it can be changed very easily by setting the mapred.compress.map.output property to true. Supported compression algorithms are DEFLATE, gzip, bzip2, LZO, LZ4 and Snappy.

The Reduce phase takes its input through a fetch data method using the HTTP protocol. Let's see what happens on the reduce side.

Reduce side.

After the map execution is finished, it informs the job tracker, who knows to which reducers to send each partition. Furthermore, the reduce needs the map output from several map tasks, so it starts copying their outputs as soon they are finished.

The map outputs are copied directly to the educe task JVM' memory if they are small enough. If not, they are copied to the disk. When the in-memory buffer reaches a threshold size (controlled by mapred.job.shuffle.merge.percent), or reaches a threshold number of map outputs (mapred.inmem.merge.threshold), it is merged and spilled to the disk. If a combiner is specified, it will be run during the merge, to reduce the amount of data written to the disk. If we end up having multiple spill files to disk, they are also merged into larger, sorted files to save some time for later on.

When all the map tasks are finished and their outputs are copied to reduce tasks, we are going into reduce merge phase, which merges all map outputs, maintaining their sort order by key. The result of this merge serves as input for reduce phase. During the reduce phase, the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output file system, typically HDFS.

The shuffle phase means all the processes from the point where the map produces output to where the reduce consumes it. In other words, shuffle phase implies sorting, merging and copying data between the map and the reduce phases.

MapReduce configuration tuning

After we saw MapReduce internal steps and we have a better understanding of them, we can now start to improve the overall MapReduce execution.

Now I'm going to give you some general advice how to tune your MapReduce execution.

Generally, it is better to give to shuffle phase as much memory as possible, so the data will be processed into RAM instead of disk. Because the shuffle phase is using RAM memory from the memory assigned to the map and reduce phases, we should be careful to let enough memory for the map and reduce execution. This is why it is best to write out the map and reduce functions to use as little memory as possible (by avoiding the accumulation of values in a map, for example).

The amount of memory given to each map and reduce execution is given by the mapred.child.java.opts property. We should give as much memory as possible to them, but also not to exceed the quantity of RAM server memory.

On the map side, the best performance can be obtained by avoiding multiple spills to the disk, one is optimal. For this, we should detect the size of map output and change the corresponding properties (e.g., io.sort.mb) to minimize the number of spill files to the disk.

On the reduce side, the best performance is obtained when the intermediate data can reside entirely in the memory. By default, this does not happen, since for the general case, all the memory is reserved for the reduce function. But if your reduce function has light memory requirements, setting the right properties may boost your performance. For this, take a look at mapred.inmem.merge.threshold and mapred.job.reduce.input.buffer.percent properties.

Conclusion

If you just want to give it a try to MapReduce framework I don't recommend you to bother with the above tuning because the default configuration works good enough. But if you are really working with big data sets and want to wait only for 3 days instead of 5 days for your analyze results, I strongly recommend to take tuning into consideration. Here in Cluj-Napoca, we have a strong BigData5 community where we started to have relevant topics and workshops about BigData. Just join us if you want to discover more! Next meet up6 will be the biggest one until now, we have awesome speakers from Bucharest and Timisoara, who will talk about Elasticsearch, Spark, Tachyon and Machine Learning.

VIDEO: ISSUE 109 LAUNCH EVENT

Sponsors

  • Accenture
  • BT Code Crafters
  • Bosch
  • Betfair
  • MHP
  • BoatyardX
  • .msg systems
  • P3 group
  • Ing Hubs
  • Cognizant Softvision
  • Colors in projects

VIDEO: ISSUE 109 LAUNCH EVENT

Tudor Lăpușan wrote also