(Created page with "== MapReduce/Joins == How to implement a Join from Relational Algebra using MapReduce? There are several types of joins: * broadcast join * reduce-side join == Broa...")
 
(No difference)

Latest revision as of 18:19, 30 December 2015

MapReduce/Joins

How to implement a Join from Relational Algebra using MapReduce?

There are several types of joins:

  • broadcast join
  • reduce-side join


Broadcast-Join

  • when one table is small enough to fit into memory
  • small one is broadcasted to each mapper and kept in memory there
  • go through blocks of other one and do the join


Hadoop MapReduce Implementation



Reduce-Side Join

  • preparation step
    • each mapper tags each record to identify which entity it is
  • mapper outputs (id, record) for each record
    • same keys will be copied to same reducer during shuffling
  • each reducer does the join based on equal kets
  • similar to Hash Join in DBMS

note

  • it may lead to massive data re-distribution
  • when input is huge
  • even though data may be on one node it may be moved to others
  • need to take the cost of communication into account


Example

Suppose we have the following schema:

  • Employee(name, SSN)
  • Department(emplSSN, depName)

We want to have the following join:

  • $\text{Employee} \Join_\text{SSN = emplSSN} \text{Department}$

Our tagged dataset

Emp Sue 999
Emp Tony 777
Dep 999 Accounts
Dep 777 Sales
Dep 777 Marketing


After applying map we get

999 (Emp, Sue, 999)
777 (Emp, Tony, 777)
999 (Dep, 999, Accounts)
777 (Dep, 777, Sales)
777 (Dep, 777, Marketing)


And finally after the reduce stage we get

key=999 [(Emp, Sue, 999), (Dep, 999, Accounts)]
key=777 [(Emp, Tony, 777), (Dep, 777, Sales), (Dep, 777, Marketing)]


Python Implementation

Source: [1]

def mapper(record):
  id = record[1]
  emit(id, record)

def reducer(key, list_of_values):
  grouped = itertools.groupby(list_of_values, operator.itemgetter(0))
  g = {k: list(v) for (k, v) in grouped}
  order = g['order'][0]

  for line_item in g['line_item']:
    emit(order + line_item)


Hadoop MapReduce Implementation

From AIM3:


High Level APIs

  • High level APIs such as Pig/Hive or Flink/Spark already provide join abstractions
  • so there's no need to implement them

Sources