How to implement a Join from Relational Algebra using MapReduce?

There are several ways to implement a join with MapReduce:

  • broadcast join
  • reduce-side join


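Broadcast Join

  • used when one table is small enough to fit into memory
  • the small table is broadcast to each mapper and kept in memory there
  • each mapper reads blocks of the other (large) table and joins every record against the in-memory table, so no shuffle is needed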
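A minimal single-process sketch of a broadcast join, assuming the Employee(name, SSN) and Department(emplSSN, depName) tables from the example on this page, with Employee as the small table. The function names are hypothetical and no Hadoop API is involved; the dictionary stands in for the copy of the small table that each mapper keeps in memory.

```python
from collections import defaultdict


def build_hash_table(small_table):
    # Index the small table by its join key (SSN). This is the structure
    # each mapper would hold in memory after the broadcast.
    index = defaultdict(list)
    for name, ssn in small_table:
        index[ssn].append((name, ssn))
    return index


def broadcast_join_mapper(index, large_block):
    # Stream over one block of the large table and probe the in-memory index.
    # Joined rows are produced directly in the map phase: no shuffle, no reducer.
    for empl_ssn, dep_name in large_block:
        for name, ssn in index.get(empl_ssn, []):
            yield (name, ssn, dep_name)


employees = [("Sue", 999), ("Tony", 777)]  # small table, broadcast
departments_block = [(999, "Accounts"), (777, "Sales"), (777, "Marketing")]

index = build_hash_table(employees)
for row in broadcast_join_mapper(index, departments_block):
    print(row)
```

Since every mapper can finish its part of the join on its own, nothing is shuffled; the price is that the small table has to fit into the memory of every mapper.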

Hadoop MapReduce Implementation

Reduce-Side Join

  • preparation step
    • each mapper tags each record to identify which relation it comes from
  • the mapper outputs (id, record) for each record, where id is the value of the join attribute
    • during shuffling, all records with the same key are sent to the same reducer
  • each reducer joins the records that share a key
  • similar to a Hash Join in a DBMS
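The guarantee that equal keys meet at the same reducer comes from partitioning by a hash of the key. The sketch below is an assumption-laden illustration: `partition` and `shuffle` are hypothetical helpers, and `zlib.crc32` is used only because Python's built-in string hash is salted per process. Hadoop's default `HashPartitioner` follows the same idea using the key's Java `hashCode()` modulo the number of reduce tasks.

```python
import zlib


def partition(key, num_reducers):
    # Deterministic hash of the key, modulo the number of reducers.
    return zlib.crc32(str(key).encode("utf-8")) % num_reducers


def shuffle(mapped, num_reducers):
    # Route every (key, record) pair to one reducer and group it by key there.
    reducers = [{} for _ in range(num_reducers)]
    for key, record in mapped:
        reducers[partition(key, num_reducers)].setdefault(key, []).append(record)
    return reducers


mapped = [
    (999, ("Emp", "Sue", 999)),
    (777, ("Emp", "Tony", 777)),
    (999, ("Dep", 999, "Accounts")),
    (777, ("Dep", 777, "Sales")),
    (777, ("Dep", 777, "Marketing")),
]
for i, groups in enumerate(shuffle(mapped, num_reducers=2)):
    print(i, groups)
```

Which reducer gets which key depends on the hash, but each key always lands on exactly one reducer together with all of its records.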


  • it may lead to massive data re-distribution
  • when input is huge
  • even though data may be on one node it may be moved to others
  • need to take the cost of communication into account
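A back-of-the-envelope comparison of how many records each strategy moves over the network. This is a deliberately simplified, hypothetical cost model: it counts records rather than bytes, treats the reduce-side shuffle as an upper bound (some records may stay on the same node), and uses made-up table sizes.

```python
def reduce_side_shuffle_cost(large_records, small_records):
    # Upper bound for a reduce-side join: every record of both relations
    # passes through the shuffle.
    return large_records + small_records


def broadcast_cost(small_records, num_mappers):
    # Broadcast join: the small relation is copied to every mapper,
    # while the large relation is read where it is stored.
    return small_records * num_mappers


# Hypothetical sizes, chosen only for illustration.
large, small, mappers = 1_000_000_000, 100_000, 1_000
print(f"reduce-side shuffle: {reduce_side_shuffle_cost(large, small):,} records")
print(f"broadcast copies:    {broadcast_cost(small, mappers):,} records")
```

With these numbers the broadcast moves about ten times fewer records. If the "small" table grew to 10 million rows, the broadcast would need 10^10 copies while the shuffle would stay near 1.01 × 10^9, and the reduce-side join would be the cheaper choice.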


Example

Suppose we have the following schema:

  • Employee(name, SSN)
  • Department(emplSSN, depName)

We want to compute the following join:

  • $\text{Employee} \Join_{\text{SSN = emplSSN}} \text{Department}$
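To have a reference result to check a distributed version against, the join can first be evaluated on a single machine with a nested loop. This sketch assumes the sample rows used in this example (without their tags); `equi_join` is a hypothetical name. Both SSN and emplSSN are kept in the output, as a join with the condition SSN = emplSSN does.

```python
employee = [("Sue", 999), ("Tony", 777)]
department = [(999, "Accounts"), (777, "Sales"), (777, "Marketing")]


def equi_join(employee, department):
    # Nested-loop evaluation of Employee JOIN_{SSN = emplSSN} Department:
    # keep every pair of tuples whose SSN equals emplSSN.
    return [
        (name, ssn, empl_ssn, dep_name)
        for name, ssn in employee
        for empl_ssn, dep_name in department
        if ssn == empl_ssn
    ]


for row in equi_join(employee, department):
    print(row)
# ('Sue', 999, 999, 'Accounts')
# ('Tony', 777, 777, 'Sales')
# ('Tony', 777, 777, 'Marketing')
```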

Our tagged dataset, where each record starts with the name of the relation it comes from:

Emp Sue 999
Emp Tony 777
Dep 999 Accounts
Dep 777 Sales
Dep 777 Marketing
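The tagging step can be sketched as follows; `tag` is a hypothetical helper. In a real job the tag would typically be derived from the input the mapper is reading rather than passed in explicitly, as assumed here.

```python
def tag(table_name, rows):
    # Prefix each row with the name of the relation it came from, so that
    # records from both inputs can go through one map/reduce pipeline.
    return [(table_name, *row) for row in rows]


employee = [("Sue", 999), ("Tony", 777)]
department = [(999, "Accounts"), (777, "Sales"), (777, "Marketing")]

tagged = tag("Emp", employee) + tag("Dep", department)
for record in tagged:
    print(*record)
# Emp Sue 999
# Emp Tony 777
# Dep 999 Accounts
# Dep 777 Sales
# Dep 777 Marketing
```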

After applying map, which uses the join attribute (SSN for Emp records, emplSSN for Dep records) as the key, we get

999 (Emp, Sue, 999)
777 (Emp, Tony, 777)
999 (Dep, 999, Accounts)
777 (Dep, 777, Sales)
777 (Dep, 777, Marketing)
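A sketch of the mapper for this example, written as a plain Python generator (an assumption; it is not tied to any particular MapReduce framework). Because SSN and emplSSN sit at different positions in the tagged records, the mapper looks at the tag to pick the key.

```python
def mapper(record):
    # Tagged layouts: ("Emp", name, SSN) and ("Dep", emplSSN, depName).
    tag = record[0]
    if tag == "Emp":
        key = record[2]  # SSN
    elif tag == "Dep":
        key = record[1]  # emplSSN
    else:
        raise ValueError(f"unknown tag: {tag!r}")
    # Emit the whole tagged record so the reducer can tell the two sides apart.
    yield key, record


tagged = [
    ("Emp", "Sue", 999),
    ("Emp", "Tony", 777),
    ("Dep", 999, "Accounts"),
    ("Dep", 777, "Sales"),
    ("Dep", 777, "Marketing"),
]
for record in tagged:
    for key, value in mapper(record):
        print(key, value)
```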

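After the shuffle, the records are grouped by key, so the reducers receive

key=999 [(Emp, Sue, 999), (Dep, 999, Accounts)]
key=777 [(Emp, Tony, 777), (Dep, 777, Sales), (Dep, 777, Marketing)]

Each reducer then pairs every Emp record with every Dep record in its group, which gives the join result

(Sue, 999, 999, Accounts)
(Tony, 777, 777, Sales)
(Tony, 777, 777, Marketing)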
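A sketch of the reducer for this example, again as a plain generator with a hypothetical `reducer` name. It splits each group by tag and forms the cross product of the two sides, which is the join for that key; a key that appears on only one side produces nothing (inner join).

```python
def reducer(key, values):
    # Split the values for this key by relation tag.
    employees = [v for v in values if v[0] == "Emp"]
    departments = [v for v in values if v[0] == "Dep"]
    # Pair every employee with this SSN with every department row for it.
    for _, name, ssn in employees:
        for _, empl_ssn, dep_name in departments:
            yield (name, ssn, empl_ssn, dep_name)


groups = {
    999: [("Emp", "Sue", 999), ("Dep", 999, "Accounts")],
    777: [("Emp", "Tony", 777), ("Dep", 777, "Sales"), ("Dep", 777, "Marketing")],
}
for key, values in groups.items():
    for row in reducer(key, values):
        print(row)
# ('Sue', 999, 999, 'Accounts')
# ('Tony', 777, 777, 'Sales')
# ('Tony', 777, 777, 'Marketing')
```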

Python Implementation

Source: [1]

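import itertools
import operator

# Joins an 'order' table with a 'line_item' table on the order id.
# Written as generators (yield) instead of a framework-specific emit().
def mapper(record):
    key = record[1]  # record = [table_tag, order_id, ...]
    yield key, record

def reducer(key, list_of_values):
    # groupby only merges adjacent items, so sort the values by tag first
    values = sorted(list_of_values, key=operator.itemgetter(0))
    g = {tag: list(group) for tag, group in itertools.groupby(values, operator.itemgetter(0))}
    if 'order' not in g:  # inner join: no order record, no output
        return
    order = g['order'][0]  # assumes one order record per key
    # join the order with each of its line items
    for line_item in g.get('line_item', []):
        yield order + line_item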

Hadoop MapReduce Implementation

From AIM3:

High Level APIs

  • high-level APIs such as Pig, Hive, Flink, and Spark already provide join abstractions
  • so in practice there is rarely a need to implement joins by hand

