Join Types in Pig

This blog post is on joins! That trivial but extremely useful relational operation I know you’re all familiar with!

Inner join, equi-join, natural join, theta-join, outer join, left-outer join, right-outer join, full-outer join, self join, semi-join

I bet you remember the definitions and can tell the differences as easily as you recite the multiplication tables… Right! Once upon a time, I could too… just before my undergrad databases exam… hmmm…

Honestly, I’ve always found it hard to remember the specific details of all the different types of joins available, and I always need to refresh the concepts whenever I need to use a specific type. (Oh, how much I love Wikipedia, hell yeah I do! :p)

 

Joins in Map-Reduce

No matter how common and seemingly trivial, join operations have always been a headache for Map-Reduce users. A simple Google search for “map-reduce join operation” returns several blog posts, presentations and papers. The problem originates from Map-Reduce’s static Map-Shuffle-Sort-Reduce pipeline and its single-input second-order functions. The challenge is finding the most effective way to “fit” the join operation into this programming model.

There are two common strategies, and both consist of a single Map-Reduce job:

  • Reducer-side join: In this strategy, the map phase serves as a preparation phase. The mapper reads records from both inputs and tags each record with a label indicating its origin. It then emits each record with the join key as the key. Each reducer then receives all records that share the same key, checks the origin of each record and generates the cross product. Slides 21-22 from this ETH presentation provide a very clear example. (A Pig-level sketch of both strategies follows after this list.)

 

  • Mapper-side join: The alternative comes with the introduction of Hadoop’s distributed cache. This facility can be used to broadcast one of the inputs to all mappers and perform the join in the map phase. Quite obviously, though, this technique only makes sense when one of the inputs is small enough to fit in the distributed cache!
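
To make this concrete in Pig terms (all relation and file names below are hypothetical), the default JOIN compiles to the reducer-side strategy, while the mapper-side strategy corresponds to Pig’s replicated join, discussed in detail further down:

-- Hypothetical inputs: a large click log and a small user table.
users  = LOAD 'users.tsv'  AS (id:chararray, name:chararray);
clicks = LOAD 'clicks.tsv' AS (user_id:chararray, url:chararray);

-- Reducer-side strategy: the default JOIN shuffles both inputs on the join key.
j1 = JOIN clicks BY user_id, users BY id;

-- Mapper-side strategy: the second input is shipped to every mapper, so the
-- join completes in the map phase (this is Pig's replicated join, see below).
j2 = JOIN clicks BY user_id, users BY id USING 'replicated';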

 

Joins in Pig

Fortunately, Pig users do not need to program the join operations themselves as Pig Latin offers the JOIN statement. Also, since Pig is a high-level abstraction that aims to hide low-level implementation details, they do not need to care about the join strategy… Or do they?

Pig users can use the JOIN operator together with the USING keyword in order to select the join execution strategy. Pig offers the following advanced join techniques:

  • Fragment-Replicate Join: USING 'replicated'

It is advised to use this technique when a small table that fits in memory needs to be joined with a significantly larger table. The small table will be loaded into the memory of each machine using the distributed cache, while the large table will be fragmented and distributed to the mappers. No reduce phase is required, as the join can be implemented entirely in the map phase. This type of join only supports inner and left-outer joins, since the right-most table is always the one that gets replicated. Pig implements this join by creating two map-only jobs. During the first one, the distributed cache is set up and the small input is broadcast to all machines. The second one actually performs the join operation.

The user must keep in mind that the second table in the statement is the one loaded into memory; for example, in the statement:

joined = JOIN A BY $0, B BY $0 USING 'replicated';

B is the input that will be loaded into memory. Extra care is needed for one more reason when using this type of join: Pig will not check beforehand whether the specified input fits in memory, so if it doesn’t, the job will fail with a runtime error!
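
As a rough sketch (file and relation names are made up for illustration), a fragment-replicate join of a large fact table against a small dimension table could look like this:

-- The first (large) relation is fragmented across mappers; the second (small)
-- relation is the one replicated to every machine.
pageviews = LOAD 'pageviews' AS (user_id:chararray, url:chararray);
user_dims = LOAD 'user_dims' AS (user_id:chararray, name:chararray);

inner_j = JOIN pageviews BY user_id, user_dims BY user_id USING 'replicated';

-- Left-outer is also supported; unmatched rows of the fragmented (left) input are kept.
outer_j = JOIN pageviews BY user_id LEFT OUTER, user_dims BY user_id USING 'replicated';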

  • Merge Join: USING 'merge'

You should use this type of join when both inputs are already sorted by the join key. It is a variation of the well-known sort-merge algorithm, where the sort has already been performed 🙂

In order to execute this join, Pig will first run an initial Map-Reduce job that samples the second input and builds an index mapping join-key values to the HDFS block that contains them. The second job takes the first input and uses the index to locate the correct block for the keys it is looking for. For each key, all records with that particular key are kept in memory and used to do the join. In other words, two pointers are maintained, one per input, and since both inputs are sorted, only one index lookup is needed per map task to seek to the right starting point.
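
A minimal sketch, assuming both (hypothetical) inputs are already stored sorted on the join key, for example because they are the output of an ORDER BY:

-- Both inputs are assumed to already be sorted on user_id when stored.
events   = LOAD 'events_sorted'   AS (user_id:chararray, event:chararray);
profiles = LOAD 'profiles_sorted' AS (user_id:chararray, country:chararray);

-- Pig indexes the second input; map tasks reading the first input then seek
-- into the second input via that index and merge the two sorted streams.
j = JOIN events BY user_id, profiles BY user_id USING 'merge';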

  • Skew Join: USING 'skewed'

The third and last type of join provided by Pig is the skew join. It is quite common for some keys in a dataset to be a lot more popular than others, that is, for most of the values to correspond to a very small set of keys. Using the default algorithm in such a case would significantly overload some of the reducers in the system.

In order to overcome this problem, one can use Pig’s skew join. Pig will first sample one of the inputs, searching for the popular keys whose records would not fit in memory. Records of the remaining keys are handled by a default join. Records that belong to one of the keys identified as popular, however, are split across a number of reducers. The records of the other input that correspond to the split keys are replicated to each reducer that handles part of that key.

Skew is supported in one input only. If both tables have skew, the algorithm will still work, but will be significantly slower.

However, extra care should be taken when using this type of join! This algorithm breaks the Map-Reduce convention that all records with the same key are processed by the same reducer! This could be dangerous or yield unexpected results if one tries to use an operation that depends on all records with the same key being in the same part file!
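
A minimal sketch (relation and file names are hypothetical) of a skew join where a handful of user ids dominate one of the inputs:

-- A click log in which a few user_ids account for most of the records.
clicks = LOAD 'clicks' AS (user_id:chararray, url:chararray);
users  = LOAD 'users'  AS (user_id:chararray, name:chararray);

-- Pig samples an input, spreads the records of the popular keys over several
-- reducers, and replicates the matching records of the other input to them.
j = JOIN clicks BY user_id, users BY user_id USING 'skewed';

-- Because a popular key now ends up in more than one part file, downstream
-- steps must not rely on "one key = one part file" semantics after a skew join.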

     

Thoughts…

Pig’s philosophy states that “Pigs are domestic animals”, meaning that users should be able to control and modify its behaviour. This is one of the reasons why Pig does not have an optimizer to choose among the available join strategies and leaves this choice to the user. However, this choice implies that users have a deep understanding of how the different techniques work, as well as adequate information about the format and distribution of the data they want to join. If this is not the case, a wrong choice will almost surely lead to severe execution overhead.

My scepticism comes from the high-level nature that such a system is supposed to offer. What do the users of such systems know, and what should they know? In my understanding, the whole point of a high-level abstraction is to hide implementation details and low-level information about how the underlying framework works. And honestly speaking, I can’t see how an optimizer would conflict with Pig’s philosophy of being a “domestic animal”. Maybe it could simply be designed so that it can be disabled.

How is all this related to my thesis? The truth is that I will probably have no time at all to look into this any further. On the other hand, it is interesting to point out that Stratosphere offers an almost natural way of expressing joins and other relational operations using its Input Contracts. The Match Contract essentially maps to an inner join, and the PACT compiler can choose the most effective execution strategy to implement it. The CoGroup Input Contract can be used to realize outer and anti-joins, while the Cross Contract can be used to implement arbitrary theta-joins.

I personally find these kinds of issues really intriguing, and although I will probably have to “push” them into “future work”, I now have something to look forward to after my thesis is done =)

     

I hope it will be Spring already by the next time I post!

Until then, happy coding!

V.

     

PS: For more info on Pig’s advanced relational operations, here is da book!
