Map Slots Hadoop
It’s not uncommon for a beginner to think of Spark as a replacement for Hadoop. The term “Hadoop” is used interchangeably to refer to the Hadoop ecosystem, Hadoop MapReduce, or Hadoop HDFS. Apache Spark came in as a very strong contender to replace the Hadoop MapReduce computation engine.
(In Hadoop 1, the JobTracker assigns mapper tasks to specific processing slots.) The mapper task itself processes its input split one record at a time; in the figure, this lone record is represented by the key/value pair. In the Fair Scheduler, pools specify the minimum number of map slots and reduce slots, and a limit on running jobs. The Capacity Scheduler was originally designed by Yahoo. It supports several features that are also supported by the Fair Scheduler; for example, queues are allocated a fraction of the total resource capacity. I will continue the topic with the configuration of the available map and reduce slots in the cluster. Hadoop uses these slots to run map and reduce tasks, and the number of slots is fixed through configuration properties. Hadoop 1.0.3 is used on an Ubuntu 12.04 machine.
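In Hadoop 1, the per-TaskTracker slot counts are set in mapred-site.xml through the properties mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum (both default to 2). As a minimal sketch, the Python below builds such a configuration fragment with the standard library; the helper name `slot_config` and the values 4 and 2 are illustrative assumptions, not recommendations.

```python
import xml.etree.ElementTree as ET

def slot_config(map_slots: int, reduce_slots: int) -> str:
    """Build a mapred-site.xml <configuration> fragment that fixes the number of
    map and reduce slots on one Hadoop 1 TaskTracker (hypothetical helper)."""
    conf = ET.Element("configuration")
    for name, value in (
        ("mapred.tasktracker.map.tasks.maximum", map_slots),
        ("mapred.tasktracker.reduce.tasks.maximum", reduce_slots),
    ):
        prop = ET.SubElement(conf, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(conf, encoding="unicode")

# Illustrative values: 4 map slots and 2 reduce slots per TaskTracker.
print(slot_config(4, 2))
```

The TaskTracker reads these values when it starts, which is why the slot split is fixed for as long as it runs.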
The aim of this blog is to better understand what motivated Spark and how it successfully evolved into a strong contender to MapReduce.
We will divide this blog into three parts, plus an appendix:
- MapReduce computation engine in a nutshell
- Cons of MapReduce as motivation for Spark
  - Look at the drawbacks of MapReduce
  - How Spark addressed them
- How Spark works
  - Behind the scenes of a Spark application running in a cluster
- Appendix
  - Look at other attempts, like Corona, made to compensate for the downsides of the MapReduce engine
1. MapReduce (MR) computation in a nutshell
I won’t go deep into the details, but let’s take a bird’s-eye view of how Hadoop MapReduce works. The figure below shows a typical Hadoop cluster running two MapReduce applications. Each application’s Map (M) and Reduce (R) tasks are marked in black and white, respectively.
- NameNode and DataNode:
  - The NameNode and DataNodes together make up HDFS.
  - The NameNode only stores the metadata of HDFS, i.e., the list of all files in the file system (not their data) and where their blocks are stored across the cluster.
  - DataNodes store the actual data of files.
  - Each DataNode JVM sends a heartbeat to the NameNode JVM every 3 seconds.
- JobTracker (JT) and TaskTracker (TT):
  - The JobTracker JVM is the brain of the MapReduce engine. It runs on a master node, often the same machine as the NameNode.
  - The JobTracker splits jobs into map and reduce tasks and assigns them to TaskTrackers, which run on the DataNodes.
  - TaskTrackers run the tasks and report task status to the JobTracker.
  - Each TaskTracker has slots where tasks run. These slots are hard-coded to be either map slots or reduce slots; one cannot run a reduce task in a map slot or vice versa.
  - Parallelism in MapReduce is achieved by running multiple map and reduce tasks in parallel, each as a separate process (a child JVM) in a TaskTracker slot.
- Job execution: In a typical MapReduce application, we chain multiple map-reduce jobs together. Execution starts by reading a chunk of data from HDFS, running one phase of map-reduce computation, and writing the results back to HDFS; the next map-reduce job reads those results and writes its own output back to HDFS again. There is usually a loop in which this process runs over and over again.
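To make the fixed map/reduce slot split concrete, here is a toy Python model of a single TaskTracker, assuming 4 map slots and 2 reduce slots (illustrative numbers). The class and method names are hypothetical; the sketch only shows that a reduce task can never borrow a free map slot (or vice versa), and how much of the TaskTracker sits idle while an application is still in its map phase.

```python
from dataclasses import dataclass

@dataclass
class TaskTracker:
    """Toy Hadoop 1 TaskTracker whose slots are hard-wired to one task type."""
    map_slots: int
    reduce_slots: int
    running_maps: int = 0
    running_reduces: int = 0

    def try_assign(self, task_type: str) -> bool:
        """Place a task in a free slot of the matching type, if there is one."""
        if task_type == "map" and self.running_maps < self.map_slots:
            self.running_maps += 1
            return True
        if task_type == "reduce" and self.running_reduces < self.reduce_slots:
            self.running_reduces += 1
            return True
        return False  # no free slot of this type; slots of the other type are never borrowed

    def utilization(self) -> float:
        busy = self.running_maps + self.running_reduces
        return busy / (self.map_slots + self.reduce_slots)

tt = TaskTracker(map_slots=4, reduce_slots=2)
# Map phase: a long queue of map tasks, and no reduce tasks ready yet.
placed = sum(tt.try_assign("map") for _ in range(10))
print(placed, tt.utilization())  # 4 maps placed; the 2 reduce slots sit idle
```

Even with six more map tasks waiting, only two thirds of the slots are busy, which is the idle-capacity problem the next section describes.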
2. Cons of Map-Reduce as motivation for Spark
One can say that Spark has taken direct motivation from the downsides of the MapReduce computation system. Let’s look at the drawbacks of the MapReduce computation engine and how Spark addressed them:
- Parallelism via processes:
  - MapReduce: MapReduce doesn’t run map and reduce tasks as threads. They run as separate processes, which are heavyweight compared to threads.
  - Spark: Spark runs its tasks as threads inside a long-lived executor JVM.
- CPU utilization:
  - MapReduce: The TaskTracker slots where map and reduce tasks are executed are not generic slots that can run either kind of task. They are split into two types, one for map tasks and the other for reduce tasks. Why does this matter? When you start a MapReduce application, it might spend hours in just the map phase, and during that time none of the reduce slots are used. This is why your CPU utilization is not high: all those reduce slots are sitting empty. Facebook came up with an improvement that partly addresses this; if you are interested, see the Appendix below.
  - Spark: Similar to the TaskTracker in MapReduce, Spark has executor JVMs on each machine. But unlike the hard-coded map and reduce slots in a TaskTracker, executor slots are generic, and any task can run in them.
- Extensive reads and writes:
  - MapReduce: A large amount of intermediate results is written to HDFS and then read back from HDFS by the next job. The data handoff between any two chained jobs happens through reads and writes to HDFS.
  - Spark: Spark is an in-memory processing engine. It can keep data and intermediate results in memory (spilling to disk when memory runs short, and writing shuffle data to local disk), so chained stages do not have to round-trip through HDFS. This efficient use of memory is one of the main reasons Spark can be 10-100x faster, particularly for iterative workloads.
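The difference in how chained stages hand off data can be sketched in plain Python. This is an illustration only, with hypothetical function names: a temporary directory stands in for HDFS, and two chained "jobs" (a word count followed by a filter) either pass their intermediate result through a file, as chained MapReduce jobs do, or directly in memory, which is the handoff Spark aims for when the data fits in memory.

```python
import json
import os
import tempfile
from collections import Counter

def word_count(lines):
    """Stage 1: map each line to words and reduce them into counts."""
    return dict(Counter(word for line in lines for word in line.split()))

def frequent_words(counts, min_count):
    """Stage 2: keep only words that occur at least `min_count` times."""
    return {w: c for w, c in counts.items() if c >= min_count}

def run_chained_via_storage(lines, min_count, workdir):
    """MapReduce style: stage 1 writes its output to storage; stage 2 reads it back."""
    path = os.path.join(workdir, "stage1-output.json")  # stand-in for an HDFS path
    with open(path, "w") as f:
        json.dump(word_count(lines), f)
    bytes_written = os.path.getsize(path)
    with open(path) as f:
        counts = json.load(f)
    return frequent_words(counts, min_count), bytes_written

def run_chained_in_memory(lines, min_count):
    """Spark style (when data fits in memory): stage 2 consumes stage 1's result directly."""
    return frequent_words(word_count(lines), min_count)

data = ["a b a", "b c a", "d a b"]
with tempfile.TemporaryDirectory() as tmp:
    on_disk, written = run_chained_via_storage(data, 2, tmp)
in_memory = run_chained_in_memory(data, 2)
print(on_disk == in_memory, written)  # same answer; only the storage version paid for I/O
```

In a real cluster, each HDFS write is also replicated (three copies by default) and crosses the network, so the cost of this round trip grows with every job in the chain.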
Note: Facebook came up with Corona to address some of these cons, and it achieved a roughly 17% improvement in the time MapReduce takes to refill a freed slot. I’ve detailed it in the Appendix.
3. How Spark works:
Now that we have seen the disadvantages of MapReduce and how Spark addressed them, it’s time to jump in and briefly look at the internals of Spark. I’ll mainly try to cover what a Spark application running in a cluster looks like. The picture below depicts a Spark cluster:
Let’s look at the different components shown in the above picture:
- Spark Master, Worker and Executor JVMs: The Spark Master and Worker JVMs are the resource managers. All Worker JVMs register themselves with the Spark Master. They are lightweight; for example, the Master may use around 500 MB of RAM and a Worker around 1 GB.
- Master: The Master JVM’s job is to decide on and schedule the launch of Executor JVMs across the nodes. However, the Master does not launch executors itself.
- Worker: It is the Worker that heartbeats with the Master and launches executors as scheduled.
- Executor: An Executor JVM has generic slots where tasks run as threads. Data that tasks work on, such as cached partitions, is held in executor memory.
- Driver: When we start our Spark application with the spark-submit command, a driver starts, contacts the Spark Master to launch executors, and schedules the tasks on them. Basically, the driver represents our application and handles all the communication with Spark.
- Task: A task is the smallest unit of execution and works on one partition of our data. Spark sizes task slots in terms of cores: the --executor-cores setting defines how many tasks can run concurrently within an executor (with the default of one core per task, spark.task.cpus = 1). For example, if we set --executor-cores to six, we can run six simultaneous task threads within the executor JVM.
- Resilience: A Worker JVM’s job is only to launch Executor JVMs whenever the Master tells it to. If an executor crashes, the Worker reports it to the Master and a replacement executor is launched. If a Worker JVM crashes, the Master stops receiving its heartbeats, removes it, and schedules replacement executors on the remaining Workers. The Master can also restart the driver JVM (in cluster deploy mode with --supervise). But if a driver restarts, all of its executors have to restart too.
- Flexible distribution of CPU resources: By CPU resources, we mean the tasks/threads running within an executor. Let’s assume that the second machine in the cluster has a lot more RAM and CPU resources. Can we run more threads on this second machine? Yes! You can do that by editing the spark-env.sh file and setting SPARK_WORKER_CORES to 10 on the second machine. If the same setting is 6 on the other machines, the Master will launch 10 threads/tasks on the second machine and 6 on each of the remaining ones. You can even oversubscribe, i.e., set SPARK_WORKER_CORES higher than the machine’s physical core count. SPARK_WORKER_CORES tells the Worker JVM how many cores (task slots) it can give out to its executor JVMs.
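The executor model described in the list can be mimicked with a thread pool. In this minimal Python sketch (not Spark’s actual implementation), a hypothetical `run_stage` function plays the executor: `executor_cores` bounds how many tasks run at once as threads, any task can use any free thread, and each task processes one partition. A second hypothetical helper adds up per-worker core counts, using the 10/6 SPARK_WORKER_CORES example from the text on an assumed three-node cluster, to get the number of task slots the Master can hand out.

```python
from concurrent.futures import ThreadPoolExecutor

def run_stage(partitions, task, executor_cores):
    """Run one task per partition on a pool of `executor_cores` generic threads."""
    with ThreadPoolExecutor(max_workers=executor_cores) as pool:
        # map() returns results in partition order, whichever thread ran each task.
        return list(pool.map(task, partitions))

def total_task_slots(worker_cores):
    """Cores offered by each Worker (cf. SPARK_WORKER_CORES), summed across the cluster."""
    return sum(worker_cores.values())

partitions = [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]
partial_sums = run_stage(partitions, sum, executor_cores=6)
print(partial_sums, sum(partial_sums))

# Hypothetical three-node cluster: the bigger second machine offers 10 cores.
print(total_task_slots({"node1": 6, "node2": 10, "node3": 6}))
```

Python threads stand in here only for the scheduling shape (a fixed pool of interchangeable slots), not for Spark’s performance characteristics.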
Conclusion:
We’ve seen:
- The initial motivation behind Spark
- Why it evolved successfully as a strong contender to MapReduce
- Why Spark can be orders of magnitude faster than Hadoop’s traditional MapReduce system
- An overview of a Spark application running in a cluster
4. Appendix:
4.1 Corona - An attempt to make up for the downsides of MapReduce and improve CPU Utilization
Facebook came up with Corona to address the CPU utilization problem that MapReduce has. In their Hadoop cluster, Facebook was running hundreds of MapReduce (MR) jobs, with many more in the backlog waiting to run because all the MR slots were occupied by running jobs. Yet they noticed that their CPU utilization was pretty low (~60%). This was puzzling, because all the map (M) and reduce (R) slots were supposedly full and a large backlog was waiting for a free slot. What they found was that in traditional MR, once a map task finishes, the TaskTracker has to let the JobTracker know that there is an empty slot, and the JobTracker then allots that slot to the next task. This handshake between TaskTracker and JobTracker was taking ~15-20 seconds before the next task took up the freed slot. This is because the heartbeat interval between TaskTracker and JobTracker is 3 seconds: free slots are only reported once every 3 seconds, and the next task is not necessarily assigned on the very next heartbeat. So Facebook built Corona, a more aggressive scheduler that takes over job scheduling from the JobTracker. With MapReduce it took 66 seconds to refill a slot, while Corona took about 55 seconds (~17% less). Here, a slot corresponds to an M or R task process.
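The heartbeat arithmetic in the paragraph above can be sketched as follows. Assuming a freed slot is only reported at the next 3-second heartbeat, and that it is filled on some later heartbeat (the `heartbeats_needed` parameter is a hypothetical knob, not a Hadoop setting), the refill delay grows in 3-second steps; the same lines also check that going from 66 s to 55 s is roughly a 17% reduction.

```python
import math

HEARTBEAT_SECS = 3.0  # heartbeat interval quoted in the text

def slot_refill_delay(freed_at, heartbeats_needed=1, interval=HEARTBEAT_SECS):
    """Seconds between a slot being freed and a new task being placed in it,
    assuming the slot is filled on the `heartbeats_needed`-th heartbeat after
    it becomes free (hypothetical model)."""
    next_beat = (math.floor(freed_at / interval) + 1) * interval
    filled_at = next_beat + (heartbeats_needed - 1) * interval
    return filled_at - freed_at

def relative_improvement(before, after):
    return (before - after) / before

print(slot_refill_delay(1.0))                       # 2.0: filled on the very next heartbeat
print(slot_refill_delay(1.0, heartbeats_needed=6))  # 17.0: within the ~15-20 s range quoted
print(f"{relative_improvement(66, 55):.1%}")        # 16.7%, i.e. the ~17% quoted for Corona
```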
4.2. Legend:
- MR - MapReduce
- M - Map task
- R - Reduce task
- JT - JobTracker
- TT - TaskTracker
What is YARN?
YARN stands for Yet Another Resource Negotiator. Apache YARN is part of the core Hadoop project; it is Hadoop’s cluster resource management system. Cluster resource management means managing the resources of a Hadoop cluster, and by resources we mean memory, CPU, and so on. YARN took over cluster management from MapReduce, so MapReduce is streamlined to perform only data processing, which is what it does best. YARN was introduced in Hadoop 2 to improve the MapReduce implementation, but it is general enough to support other distributed computing paradigms as well.
With Hadoop 1 and older versions, you were limited to only running MapReduce jobs. This was great if the type of work you were performing fit well into the MapReduce processing model, but it was restrictive for those wanting to perform graph processing, iterative computing, or any other type of work.
In Hadoop 2, the scheduling pieces of MapReduce were externalized and reworked into a new component called YARN. The fundamental idea of YARN is to split the functionalities of resource management and job scheduling/monitoring into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM). Applications that wish to run on Hadoop are now implemented as YARN applications; as a result, MapReduce is now a YARN application.
YARN provides APIs for requesting and working with cluster resources, but these APIs are not typically used directly by user code. Instead, users write to higher-level APIs provided by distributed computing frameworks, which themselves are built on YARN and hide the resource management details from the user.
The figure below shows some distributed computing frameworks (MapReduce, Spark, and so on) running as YARN applications on the cluster compute layer (YARN) and the cluster storage layer (HDFS and HBase).
There is also a layer of applications that build on the frameworks shown above. Pig, Hive, and Crunch are all examples of processing frameworks that run on MapReduce, Spark, or Tez (or on all three), and don’t interact with YARN directly.
Why YARN?
MapReduce is a powerful distributed framework and programming model that allows batch-based parallelized work to be performed on a cluster of multiple nodes. Despite being very efficient at what it does, though, MapReduce has some disadvantages; principally that it’s batch-based, and as a result isn’t suited to real-time or even near-real-time data processing. Historically this has meant that processing models such as graph, iterative, and real-time data processing are not a natural fit for MapReduce.
YARN changes all of this by taking over the scheduling portions of MapReduce, and nothing else. At its core, YARN is a distributed scheduler and is responsible for two activities:
- Responding to a client’s request to create a container - A container is in essence a process, with a contract governing the physical resources that it’s permitted to use.
- Monitoring containers that are running, and terminating them if needed - Containers can be terminated if a YARN scheduler wants to free up resources so that containers from other applications can run, or if a container is using more than its allocated resources.
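The two responsibilities listed above, creating containers on request and monitoring them, can be illustrated with a toy Python model of one node’s resource pool. Everything here is hypothetical (class names, the 8 GB / 4 vcore node, the kill rule); real YARN splits this work between the ResourceManager and NodeManagers and enforces limits through its own mechanisms.

```python
from dataclasses import dataclass, field

@dataclass
class Container:
    cid: int
    memory_mb: int    # memory the container is allowed to use
    vcores: int
    used_mb: int = 0  # memory it is actually using, as seen by monitoring

@dataclass
class NodePool:
    """Toy model of one node's resources, handed out as containers."""
    memory_mb: int
    vcores: int
    containers: dict = field(default_factory=dict)
    next_id: int = 1

    def free(self):
        mem = self.memory_mb - sum(c.memory_mb for c in self.containers.values())
        cpu = self.vcores - sum(c.vcores for c in self.containers.values())
        return mem, cpu

    def request(self, memory_mb, vcores):
        """Activity 1: create a container if the requested resources are free."""
        free_mem, free_cpu = self.free()
        if memory_mb > free_mem or vcores > free_cpu:
            return None
        c = Container(self.next_id, memory_mb, vcores)
        self.containers[c.cid] = c
        self.next_id += 1
        return c

    def monitor(self):
        """Activity 2: terminate containers using more memory than they were allocated."""
        over = [cid for cid, c in self.containers.items() if c.used_mb > c.memory_mb]
        for cid in over:
            del self.containers[cid]
        return over

node = NodePool(memory_mb=8192, vcores=4)
a = node.request(2048, 1)
b = node.request(4096, 2)
print(node.request(4096, 1))        # None: only 2048 MB left on the node
b.used_mb = 5000                    # b exceeds its 4096 MB allocation
print(node.monitor(), node.free())  # [2] (6144, 3)
```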
YARN vs MapReduce
In MapReduce 1, there are two types of daemon that control the job execution process: a jobtracker and one or more tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.
In MapReduce 1, the jobtracker takes care of both job scheduling (matching tasks with tasktrackers) and task progress monitoring (keeping track of tasks, restarting failed or slow tasks, and doing task bookkeeping, such as maintaining counter totals).
By contrast, in YARN these responsibilities are handled by separate entities: the resource manager and an application master (one for each MapReduce job). In MapReduce 1, the jobtracker is also responsible for storing job history for completed jobs, although it is possible to run a job history server as a separate daemon to take the load off the jobtracker. In YARN, the equivalent role is played by the timeline server, which stores application history.
The mapping of components between MapReduce 1 and YARN is summarized in the table below:

| MapReduce 1 | YARN |
|---|---|
| Jobtracker | Resource manager, application master, timeline server |
| Tasktracker | Node manager |
| Slot | Container |
Benefits of Hadoop YARN
YARN was designed to address many of the limitations in MapReduce 1. The benefits to using YARN include the following:
- Scalability
YARN can run on larger clusters than MapReduce 1. MapReduce 1 hits scalability bottlenecks in the region of 4,000 nodes and 40,000 tasks, stemming from the fact that the jobtracker has to manage both jobs and tasks.
YARN overcomes these limitations by virtue of its split resource manager/application master architecture: it is designed to scale up to 10,000 nodes and 100,000 tasks. In contrast to the jobtracker, each instance of an application (like a MapReduce job) has a dedicated application master, which runs for the duration of the application.
- Availability
High availability (HA) is usually achieved by replicating the state needed for another daemon to take over the work needed to provide the service, in the event of the service daemon failing. However, the large amount of rapidly changing complex state in the jobtracker’s memory (each task status is updated every few seconds, for example) makes it very difficult to retrofit HA into the jobtracker service.
With the jobtracker’s responsibilities split between the resource manager and application master in YARN, making the service highly available became a divide-and-conquer problem: provide HA for the resource manager, then for YARN applications (on a per-application basis). And indeed, Hadoop 2 supports HA both for the resource manager and for the application master for MapReduce jobs.
- Utilization
In MapReduce 1, each tasktracker is configured with a static allocation of fixed-size slots, which are divided into map slots and reduce slots at configuration time. A map slot can only be used to run a map task, and a reduce slot can only be used for a reduce task.
In YARN, a node manager manages a pool of resources, rather than a fixed number of designated slots. MapReduce running on YARN will not hit the situation where a reduce task has to wait because only map slots are available on the cluster, which can happen in MapReduce 1. If the resources to run the task are available, then the application will be eligible for them.
Furthermore, resources in YARN are fine-grained, so an application can make a request for what it needs, rather than for an indivisible slot, which may be too big (which is wasteful of resources) or too small (which may cause a failure) for the particular task.
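The cost of indivisible slots can be put in numbers with a short sketch. The Python below compares a hypothetical 2048 MB MapReduce 1 slot with a request sized to the task; the task sizes are invented, and rounding each request up to an assumed 512 MB granularity is a simplification (real schedulers normalize requests against settings such as yarn.scheduler.minimum-allocation-mb).

```python
import math

SLOT_MB = 2048  # hypothetical fixed slot size in MapReduce 1

def slot_outcome(task_mb, slot_mb=SLOT_MB):
    """MapReduce 1: a task gets a whole slot; a task too big for it cannot fit."""
    if task_mb > slot_mb:
        return "fails", 0
    return "runs", slot_mb - task_mb  # memory left idle inside the slot

def container_outcome(task_mb, granularity_mb=512):
    """YARN-style request sized to the task, rounded up to `granularity_mb` (assumed)."""
    granted = math.ceil(task_mb / granularity_mb) * granularity_mb
    return "runs", granted - task_mb  # memory left idle inside the container

for task_mb in (512, 1500, 3072):
    print(task_mb, slot_outcome(task_mb), container_outcome(task_mb))
```

The small task wastes three quarters of its slot, the large one does not fit at all, while the sized requests waste at most one granularity step.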
- Multitenancy
In some ways, the biggest benefit of YARN is that it opens up Hadoop to other types of distributed application beyond MapReduce. MapReduce is just one YARN application among many. It is even possible for users to run different versions of MapReduce on the same YARN cluster, which makes the process of upgrading MapReduce more manageable.