Map-Reduce is a paradigm of parallel computation that initially comes from [[Functional Programming]].

The main characteristics:
* it's scalable and fault tolerant: it scales linearly with the amount of data
* it's a data processing tool that can handle parallel processing of large volumes of data
* it typically runs on a [[Hadoop Distributed File System|Distributed File System]]


Main Idea:
* hide the details of parallel execution
* allow the user to focus only on data processing strategies
  
  
== Programming Model ==

The Map-Reduce model is simple. A programmer needs to specify two primitives:
* a '''Map''' function which produces an "intermediate" result
* a '''Reduce''' function which produces the final result
* and there's a '''shuffle''' stage in-between, which groups the intermediate results by key
 
  
=== Map Function ===

Map($[(k, v)]$) $\to [(k_2, v_2)]$

* input: a list of $(k, v)$ pairs
* the map function is applied to each pair in the input
* and it outputs a list of $(k_2, v_2)$ pairs
  
=== Shuffle ===

* from a list of $(k_2, v_2)$ pairs, form a list of $(k_2, [v_2])$ pairs
* i.e. group the intermediate results by key
  
=== Reduce Function ===

Reduce($[(k_2, [v_2])]$)

* now reduce is applied to each grouped pair $(k_2, [v_2])$, and it produces the final result
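
To make the data flow concrete, here's a minimal single-process sketch of the model in plain Python (no framework; <code>run_mapreduce</code> and its signature are just an illustration, not any library's API):

 from itertools import groupby
 from operator import itemgetter
 
 def run_mapreduce(records, mapper, reducer):
     # map phase: apply the mapper to every (k, v) pair
     intermediate = []
     for k, v in records:
         intermediate.extend(mapper(k, v))
     # shuffle phase: group the (k2, v2) pairs by key into (k2, [v2])
     intermediate.sort(key=itemgetter(0))
     grouped = [(k2, [v for _, v in vs])
                for k2, vs in groupby(intermediate, key=itemgetter(0))]
     # reduce phase: apply the reducer to each (k2, [v2]) pair
     return [reducer(k2, values) for k2, values in grouped]

In a real system each phase runs distributed across many machines; this only shows the semantics.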
  
== Program Execution ==

Processing:
* the input is broken into pieces
* each piece is given to a map task to execute
* there are one or more reduce tasks

{{void|https://raw.github.com/alexeygrigorev/ulb-adb-project-couchbd/master/report/images/map-reduce.png}}

http://hsto.org/files/9f4/126/a0c/9f4126a0c41c47c7a21c3f4888f5966e.png
  
=== [[Hadoop MapReduce]] ===
For the Hadoop implementation refer to [[Hadoop MapReduce#Map-Reduce Job Execution]]
  
  
== Example ==

=== Word Count ===

Word counting: we need to calculate how many occurrences of each word there are

* distribute all documents among $k$ computers
* for each document return a set of (word, freq) pairs (the map phase)
* now sum the occurrences of each word (the reduce phase)

Pseudo-code:

 def map(String input_key, String doc):
   for each word w in doc:
     EmitIntermediate(w, 1)
 
 def reduce(String output_key, Iterator output_vals):
   int res = 0
   for each v in output_vals:
     res += v
   Emit(res)
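
The same job as a self-contained Python sketch, with the three phases spelled out (whitespace tokenization and in-memory documents are assumptions of this illustration):

 from collections import defaultdict
 
 # map phase: emit (word, 1) for every word of every document
 docs = ["a rose is a rose", "a daisy is a flower"]
 pairs = [(w, 1) for doc in docs for w in doc.split()]
 
 # shuffle phase: group the emitted pairs by word
 grouped = defaultdict(list)
 for w, one in pairs:
     grouped[w].append(one)
 
 # reduce phase: sum the occurrences of each word
 counts = {w: sum(ones) for w, ones in grouped.items()}
 print(counts)  # {'a': 4, 'rose': 2, 'is': 2, 'daisy': 1, 'flower': 1}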
  
  
== High Level Languages ==

There are SQL-like languages that work on top of [[Hadoop]] and translate into a set of Map-Reduce jobs:
* [[Pig]]
* [[Hive]]


== Joins in Map-Reduce ==
See also: [[MapReduce/Joins]]

=== Broadcast Join ===
* when one table is small enough to fit into memory
* the small table is broadcast to each mapper and kept in memory there
* then go through the blocks of the other table and do the join
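
A minimal Python sketch of the idea (keeping the small table as a dict and the record layout are assumptions of this illustration):

 # the small table, broadcast to every mapper and kept in memory as a dict
 departments = {999: "Accounts", 777: "Sales"}
 
 # each mapper streams through its block of the big table
 def map_block(employee_block):
     for name, ssn in employee_block:
         dep = departments.get(ssn)  # in-memory lookup, no shuffle needed
         if dep is not None:
             yield (name, ssn, dep)
 
 print(list(map_block([("Sue", 999), ("Tony", 777)])))
 # [('Sue', 999, 'Accounts'), ('Tony', 777, 'Sales')]
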
=== Reduce-Side Join ===
* preparation step:
** each mapper tags each record to identify which entity it is
* the mapper outputs (id, record) for each record
** equal keys are copied to the same reducer during shuffling
* each reducer then does the join on the records with equal keys
* similar to a [[Physical Operators (databases)#(Partition) Hash Join|Hash Join]] in a DBMS

Note:
* it may lead to massive data re-distribution when the input is huge
* even though the data may be on one node, it may get moved to other nodes
* so we need to take the cost of communication into account


==== Example ====
Suppose we have the following schema:
* Employee(name, SSN)
* Department(emplSSN, depName)

We want to compute the following join:
* $\text{Employee} \Join_\text{SSN = emplSSN} \text{Department}$

Our tagged dataset:
{| class="wikitable"
|-
| Emp || Sue || 999
|-
| Emp || Tony || 777
|-
| Dep || 999 || Accounts
|-
| Dep || 777 || Sales
|-
| Dep || 777 || Marketing
|}

After applying the map we get:
{| class="wikitable"
|-
| 999 || (Emp, Sue, 999)
|-
| 777 || (Emp, Tony, 777)
|-
| 999 || (Dep, 999, Accounts)
|-
| 777 || (Dep, 777, Sales)
|-
| 777 || (Dep, 777, Marketing)
|}

And finally, after the reduce stage, we get:
{| class="wikitable"
|-
| key=999 || [(Emp, Sue, 999), (Dep, 999, Accounts)]
|-
| key=777 || [(Emp, Tony, 777), (Dep, 777, Sales), (Dep, 777, Marketing)]
|}

For additional code refer to this [http://code.google.com/p/stolzen/source/browse/trunk/courses/coursera/Introduction%20to%20Data%20Science/assignment3/p2_join.py implementation in Python]
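
A compact single-process Python sketch of this join (the record layout follows the tables above; the shuffle is collapsed into a dict for brevity):

 from collections import defaultdict
 
 # tagged records, exactly as in the first table: (tag, fields)
 records = [
     ("Emp", ("Sue", 999)),
     ("Emp", ("Tony", 777)),
     ("Dep", (999, "Accounts")),
     ("Dep", (777, "Sales")),
     ("Dep", (777, "Marketing")),
 ]
 
 # map phase: key each record by the join attribute (SSN / emplSSN)
 groups = defaultdict(lambda: {"Emp": [], "Dep": []})
 for tag, fields in records:
     key = fields[1] if tag == "Emp" else fields[0]
     groups[key][tag].append(fields)
 
 # reduce phase: within each key, combine employees with departments
 for key, g in groups.items():
     for name, ssn in g["Emp"]:
         for _, dep_name in g["Dep"]:
             print(name, ssn, dep_name)
 # Sue 999 Accounts
 # Tony 777 Sales
 # Tony 777 Marketing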


== Other Map-Reduce Examples ==

=== Matrix Multiplication ===
* Suppose we have two sparse matrices: $A$, an $L \times M$ matrix, and $B$, an $M \times N$ matrix
* We want to calculate $C = A \times B$

Map:
* for each element $(i, j) \in A$
** emit $((i, k), A[i, j])$ for $k \in 1..N$
* for each element $(j, k) \in B$
** emit $((i, k), B[j, k])$ for $i \in 1..L$

Reduce:
* key = $(i, k)$
* value = $\sum_j (A[i, j] \times B[j, k])$
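
A single-process Python sketch of this job (sparse matrices as dicts from index pairs to values are an assumption of this illustration; the sketch also emits the inner index $j$ with every value, since the reducer needs it to pair up the factors):

 from collections import defaultdict
 
 def matmul_mapreduce(A, B, L, M, N):
     # A: dict {(i, j): value} for an L x M matrix; B: dict {(j, k): value} for M x N
     # map phase: route every entry to all output cells it contributes to,
     # tagged with the matrix it came from and its inner index j
     intermediate = defaultdict(list)
     for (i, j), a in A.items():
         for k in range(N):
             intermediate[(i, k)].append(("A", j, a))
     for (j, k), b in B.items():
         for i in range(L):
             intermediate[(i, k)].append(("B", j, b))
 
     # reduce phase: for each cell (i, k), sum A[i, j] * B[j, k] over j
     C = {}
     for (i, k), entries in intermediate.items():
         a_vals = {j: v for tag, j, v in entries if tag == "A"}
         b_vals = {j: v for tag, j, v in entries if tag == "B"}
         total = sum(a_vals[j] * b_vals[j] for j in a_vals if j in b_vals)
         if total:
             C[(i, k)] = total
     return C
 
 A = {(0, 0): 1, (0, 1): 2, (1, 1): 3}   # [[1, 2], [0, 3]]
 B = {(0, 0): 4, (1, 0): 5, (1, 1): 6}   # [[4, 0], [5, 6]]
 print(matmul_mapreduce(A, B, L=2, M=2, N=2))
 # {(0, 0): 14, (0, 1): 12, (1, 0): 15, (1, 1): 18}
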
  
  
== MapReduce vs RDBMS ==

[[Relational Databases|RDBMS]]:
* Declarative query language
* Schemas
* Logical Data Independence
* [[Indexing (databases)|Indexing]]
* [[Logical Query Plan Optimization|Algebraic Optimization]]
* Caching / [[View Materialization|Materialized Views]]
* [[ACID]] and transactions

MapReduce:
* High scalability
* Fault tolerance
  

== Implementations ==
MapReduce is implemented in the following systems:
* [[Hadoop MapReduce]]
* [[CouchDB]] uses MapReduce for querying the database
* [[Spark]] and [[Flink]] also provide map and reduce functions
  
  
 
== See also ==
* [[Hadoop]] and [[Hadoop MapReduce]]
* [[Hadoop Distributed File System]]


== Sources ==
 
* [[Introduction to Data Science (coursera)]]
* Lee et al., Parallel Data Processing with MapReduce: A Survey [http://www.cs.arizona.edu/~bkmoon/papers/sigmodrec11.pdf]
* [[Hadoop: The Definitive Guide (book)]]

[[Category:Algorithms]]
[[Category:Hadoop]]
[[Category:MapReduce]]
