Prompted by "Εγώ πάντως νιώθω κρύο" ("I, for one, feel cold"), which I read on lifo and which got me thinking 🙂
I never liked the cold either. The truth is that I complain about the Stockholm weather all the time.
Here it has been snowing constantly for the last 3 months. Every now and then the temperature rises a little and it rains. The snow turns to ice, the ice turns to slush. Then it snows again and it all starts over. It's not that this country has no seasons. You only need to notice the colours around you. Honestly, I'd say that here, more clearly than anywhere else, I can tell when the seasons change.
There are jobs, and the differences from working conditions in Greece are huge. Working hours are flexible, there is no stress, you don't work overtime, and you don't eat your lunch in front of the computer. Your work is evaluated and your achievements are recognised. The deductions from your salary translate directly into state benefits that you enjoy every day.
And no, I don't feel like a foreigner. Obviously many things seem strange at first, simply because you are used to things being different. But why insist that what you are used to is the right way? New sights, new experiences and new people. If you accept them, you will no longer feel like a foreigner.
I didn't leave Greece because I was bored. Nor because of the crisis. In fact, nothing was wrong in my life before I left. I had a wonderful, well-paid job, good friends, a house, a car. An opportunity for a master's came up and I didn't think about it much; I left. Of course, back then I believed I would return after my studies. I didn't expect to be able to go back and not want to! "Abroad" is nice, it is different, and people really are open-minded. They are polite, friendly and educated. There is no xenophobia or suspicion. Almost all my friends are scattered across different places in Europe. And that is wonderful, because flights are cheap and I can grab a plane for the weekend and visit them.
I'm 26 and I have never been more ambitious. Within the first year after I left, all my dreams, plans and projects multiplied. I opened my eyes and suddenly felt that the whole world was mine, all of it in front of me. Homeland? What is a homeland? No, of course in today's Greece you cannot live as a young person with degrees and make the money your parents invested in your education pay off. So don't stay there and keep wasting it. Leave!
Maybe the fruit and vegetables don't come from my grandmother's garden and don't smell of soil. And maybe in summer the temperature barely reaches 20. But the city is beautiful, green and clean. And if you don't know your neighbours, have you ever thought of cooking something and knocking on their door?
As for how Greeks are portrayed in the media, I think it's almost the same everywhere. And honestly, I often prefer their media to our own rotten, ridiculous ones.
I didn't grow up abruptly at all. On the contrary, I still feel 18. Instead of thinking about my days off and my trips to the beach, I think about which concert to go to next week. So many bands that would never come to Greece, and here we struggle to decide which one to see first. I think about which city to book tickets to for next weekend. I think about which company to apply to for a summer internship. My microcosm became a macrocosm. It is vast, full of opportunities, challenges, knowledge and experiences. I miss my grandmother too, and my village, and coffee with friends in the square. But, for goodness' sake, Athens is only 3 hours away! It's not that hard to visit every now and then 🙂
The future is uncertain and unknown. Amazing and exciting! The world is vast and we have seen only a tiny piece of it. Leave, dare, explore, accept. Love your grandmother, but for God's sake, don't stay in your village watering the flowers!
There will be no paper of the week for the next few weeks.
I’ll resume as soon as possible!
Happy reading and coding,
Only Aggressive Elephants are Fast Elephants
The main character of this post's paper is poor Bob, an analyst, and the story is about his adventures with elephants, i.e. Hadoop MapReduce. Bob's queries often reveal interesting information or anomalies that need further examination, spawning new queries and therefore triggering new MapReduce jobs. In such a scenario, the problem is slow query runtimes, caused by the lack of proper schemas and data indexing. Poor Bob ends up drinking too much coffee while waiting for his queries to finish, and he also has to deal with his angry boss! The paper proposes inexpensive index creation on Hadoop data attributes, in order to reduce execution times in exploratory MapReduce use cases.
The main idea comes from the observation that HDFS keeps three replicas of each block by default. Why not keep each of these copies in a different sort order and with a different index? If our query happens to filter data by an attribute for which an index exists, we gain a large speedup!
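To make the idea concrete, here is a minimal Python sketch, assuming a toy model in which each of the three replicas of a block simply records which attribute it is sorted and indexed on. `Replica`, `pick_replica` and the attribute names are hypothetical and are not part of HAIL or HDFS.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Replica:
    """Toy model of one physical copy of an HDFS block (hypothetical)."""
    datanode: str
    sort_attribute: str  # attribute this copy is sorted and indexed on


def pick_replica(replicas: List[Replica], filter_attribute: str) -> Optional[Replica]:
    """Return a replica whose index matches the query's filter attribute.

    None means no useful index exists, so the task would fall back to a
    full scan of any replica, exactly as in plain Hadoop.
    """
    for replica in replicas:
        if replica.sort_attribute == filter_attribute:
            return replica
    return None


replicas = [
    Replica("dn1", "visitDate"),
    Replica("dn2", "sourceIP"),
    Replica("dn3", "adRevenue"),
]
print(pick_replica(replicas, "sourceIP"))   # Replica(datanode='dn2', sort_attribute='sourceIP')
print(pick_replica(replicas, "userAgent"))  # None
```

With three replicas, a block can serve index scans on up to three different attributes at no extra storage cost beyond the indexes themselves.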
As usual, it's not as easy as it sounds. How can indexes be created during upload, so that no additional scan is required? How does HDFS need to be modified in order to distinguish the different replicas? How can MapReduce applications exploit the sort orders and indexes effectively? And how can we make the scheduler aware of them?
HAIL (Hadoop Aggressive Indexing Library) answers all these questions.
HAIL is a Hadoop modification which keeps each physical replica of an HDFS block in a different sort order and creates clustered indexes on HDFS blocks when uploading the data. The HAIL client converts each HDFS block to binary PAX format and sends it to three datanodes. Then, each node sorts the data contained in the block using a different sort order. Sorting and indexing happen in main memory. Each datanode creates a different clustered index for each data block and stores it with the sorted data. The process is shown below:
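What each datanode does with its copy of a block can be sketched in Python, assuming rows are plain tuples and using a sparse clustered index with one entry every `stride` rows. `to_pax`, `sort_and_index` and `range_lookup` are hypothetical names; HAIL's real binary PAX layout and index format are considerably more involved.

```python
import bisect
from typing import List, Sequence, Tuple

Row = Tuple


def to_pax(rows: Sequence[Row]) -> List[list]:
    """Store a block column by column (PAX-like): one list per attribute."""
    return [list(column) for column in zip(*rows)]


def sort_and_index(rows: Sequence[Row], key_col: int, stride: int = 2):
    """Sort the block on one attribute and build a sparse clustered index.

    The index keeps every `stride`-th key with its row offset; this is enough
    because the data itself is stored in key order.
    """
    sorted_rows = sorted(rows, key=lambda r: r[key_col])
    pax = to_pax(sorted_rows)
    keys = pax[key_col]
    index = [(keys[i], i) for i in range(0, len(keys), stride)]
    return pax, index


def range_lookup(pax, index, key_col: int, lo, hi) -> List[Row]:
    """Return rows with lo <= key <= hi, using the index to skip ahead."""
    index_keys = [k for k, _ in index]
    # Start at the last indexed entry whose key is below lo (or at row 0).
    pos = bisect.bisect_left(index_keys, lo)
    start = index[pos - 1][1] if pos > 0 else 0
    keys = pax[key_col]
    result = []
    for i in range(start, len(keys)):
        if keys[i] > hi:
            break  # sorted order lets us stop early
        if keys[i] >= lo:
            result.append(tuple(column[i] for column in pax))
    return result


block = [(3, "c", 30), (1, "a", 10), (2, "b", 20), (5, "e", 50), (4, "d", 40)]
# Replica 1 is sorted on column 0; another replica could use column 2.
pax, idx = sort_and_index(block, key_col=0)
print(range_lookup(pax, idx, 0, 2, 4))  # [(2, 'b', 20), (3, 'c', 30), (4, 'd', 40)]
```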
HAIL modifies the upload pipeline of HDFS to support index creation and sorting. One major difference is how the HAIL client parses each file into rows. In HDFS, each block is defined by a constant number of bytes, while HAIL parses the file based on content, never splitting a row between two blocks. After the blocks have been defined, each one of them is converted to a binary PAX representation.
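The difference between the two splitting strategies can be illustrated with a small Python sketch, assuming newline-terminated text records. The function names are made up for this example, and the tiny block size stands in for HDFS's 64MB blocks.

```python
from typing import List


def split_fixed(data: bytes, block_size: int) -> List[bytes]:
    """HDFS-style splitting: cut every block_size bytes, regardless of content."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def split_into_blocks(data: bytes, block_size: int) -> List[bytes]:
    """Content-aware splitting: blocks of at most block_size bytes, cut only
    at record boundaries, so a record never spans two blocks. A single record
    larger than block_size gets a block of its own."""
    blocks: List[bytes] = []
    current = bytearray()
    for record in data.splitlines(keepends=True):
        if current and len(current) + len(record) > block_size:
            blocks.append(bytes(current))
            current = bytearray()
        current += record
    if current:
        blocks.append(bytes(current))
    return blocks


data = b"1,alice\n2,bob\n3,carol\n"
print(split_fixed(data, 10))        # [b'1,alice\n2,', b'bob\n3,caro', b'l\n']
print(split_into_blocks(data, 10))  # [b'1,alice\n', b'2,bob\n', b'3,carol\n']
```

Only the second form lets each datanode parse, sort and index its block on its own, because every block contains whole rows.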
HAIL also modifies the checksum mechanism of HDFS. In HDFS, only the datanode receiving the last replica verifies the checksums and then acknowledges the packets back to the other datanodes. In HAIL, however, neither checksums nor data can be flushed to disk before the block has been sorted. A typical block size is 64MB, which allows HAIL to perform several block sorts and index creations completely in memory. After sorting, indexing and metadata creation, each datanode computes its own checksums and the data is flushed to disk.
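Why each datanode must compute its own checksums can be shown with a short Python sketch: the same rows stored in two different sort orders produce different chunk checksums. It assumes HDFS's default of one checksum per 512 bytes (`dfs.bytes-per-checksum`) and uses plain CRC32 from `zlib` as a stand-in for whatever checksum type the cluster is configured with.

```python
import zlib
from typing import List

BYTES_PER_CHECKSUM = 512  # HDFS default for dfs.bytes-per-checksum


def chunk_checksums(block: bytes, chunk_size: int = BYTES_PER_CHECKSUM) -> List[int]:
    """CRC32 of every chunk_size-byte chunk of a block."""
    return [zlib.crc32(block[i:i + chunk_size]) for i in range(0, len(block), chunk_size)]


rows = [b"%04d,row\n" % i for i in range(200)]        # 200 rows of 9 bytes
replica_a = b"".join(sorted(rows))                    # one sort order
replica_b = b"".join(sorted(rows, reverse=True))      # a different sort order

# Same content, different layout: the checksums computed by the client
# before sorting would not match either replica after sorting.
print(chunk_checksums(replica_a) == chunk_checksums(replica_b))  # False
```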
At this point, I should make clear that there is no impact on the fault-tolerance properties of HDFS. Data is only reorganized inside the same block; therefore, in case of failure, no information is lost except for the index of that particular replica.
From the application's perspective, a MapReduce program requires only a few changes in order to use HAIL. Bob needs to annotate his map function, specifying a selection predicate and the projected attributes. In other words, he needs to specify the values used to filter the input records and the fields he wants the query to return. This information helps the runtime schedule tasks on nodes containing the appropriate indexes. The annotation also simplifies the map function, since Bob no longer has to include the filtering logic.
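A minimal Python sketch of the idea, assuming a decorator-style annotation. `hail_query`, `run_map` and the record fields are hypothetical stand-ins: HAIL itself targets Hadoop's Java API, and its exact annotation syntax is not reproduced here.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Record = Dict[str, object]


def hail_query(selection: Callable[[Record], bool], projection: List[str]):
    """Hypothetical annotation: attach a selection predicate and a projection
    to a map function so that the runtime, not the map code, applies them."""
    def decorate(map_fn):
        map_fn.selection = selection
        map_fn.projection = projection
        return map_fn
    return decorate


@hail_query(selection=lambda r: 1999 <= r["year"] <= 2000,
            projection=["sourceIP"])
def bob_map(record: Record) -> Iterator[Tuple[object, int]]:
    # No filtering here: only qualifying, projected records ever arrive.
    yield record["sourceIP"], 1


def run_map(map_fn, records: Iterable[Record]) -> List[Tuple[object, int]]:
    """Toy runtime: apply the annotated filter and projection, then map.
    In HAIL the filter would be answered through a clustered index rather
    than by this linear scan."""
    out: List[Tuple[object, int]] = []
    for record in records:
        if map_fn.selection(record):
            projected = {field: record[field] for field in map_fn.projection}
            out.extend(map_fn(projected))
    return out


records = [
    {"sourceIP": "10.0.0.1", "year": 1998},
    {"sourceIP": "10.0.0.2", "year": 1999},
    {"sourceIP": "10.0.0.3", "year": 2000},
]
print(run_map(bob_map, records))  # [('10.0.0.2', 1), ('10.0.0.3', 1)]
```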
From HAIL's perspective, the splitting policy needs to be modified, as each replica now has a different size because of its index. HAIL groups several data blocks inside one input split, resulting in fewer map tasks. If the job requires a full scan or no relevant index exists, HAIL falls back to standard Hadoop mechanisms.
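The grouping and the fallback can be sketched in Python under a simplified model in which we know, for every block, which datanode holds the replica indexed on each attribute. `make_splits`, the layout dictionary and `blocks_per_split` are hypothetical; HAIL's actual splitting logic is not reproduced here.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Hypothetical layout: block id -> {datanode: attribute its replica is indexed on}
Layout = Dict[str, Dict[str, str]]
Split = Tuple[Optional[str], List[str]]


def make_splits(layout: Layout, filter_attr: Optional[str],
                blocks_per_split: int) -> List[Split]:
    """Group blocks whose matching index lives on the same datanode.

    Returns (datanode, [block ids]) pairs. Blocks without a matching index,
    or jobs without a filter, get one split per block, as in plain Hadoop
    (datanode None meaning "any replica").
    """
    by_node: Dict[str, List[str]] = defaultdict(list)
    fallback: List[Split] = []
    for block, replicas in sorted(layout.items()):
        node = next((n for n, attr in replicas.items()
                     if filter_attr is not None and attr == filter_attr), None)
        if node is None:
            fallback.append((None, [block]))
        else:
            by_node[node].append(block)
    splits: List[Split] = []
    for node, blocks in sorted(by_node.items()):
        for i in range(0, len(blocks), blocks_per_split):
            splits.append((node, blocks[i:i + blocks_per_split]))
    return splits + fallback


layout = {
    "b1": {"dn1": "visitDate", "dn2": "sourceIP", "dn3": "adRevenue"},
    "b2": {"dn1": "visitDate", "dn2": "sourceIP", "dn3": "adRevenue"},
    "b3": {"dn2": "visitDate", "dn3": "sourceIP", "dn1": "adRevenue"},
}
print(make_splits(layout, "sourceIP", blocks_per_split=2))
# [('dn2', ['b1', 'b2']), ('dn3', ['b3'])]  -> 2 map tasks instead of 3
```

The trade-off mentioned in the evaluation follows directly: a larger split means fewer tasks, but more work to redo if one of them fails.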
The paper provides an extensive evaluation. The system is compared to Hadoop and Hadoop++ (previous work of the same group). Experiments test upload performance, the impact of index creation, scaling, query performance and the impact of the new splitting mechanism. Six different clusters and two different datasets are used. Applications include queries with varying degrees of selectivity.
The results show that HAIL matches or often outperforms Hadoop in data uploading, even though it has to sort blocks and create indexes, which demonstrates the efficiency of the PAX representation. Query execution times decrease significantly when appropriate indexes are present, and having fewer map tasks reduces end-to-end job runtimes. However, if a map task fails, more data needs to be re-processed, which slows down recovery.
Happy reading and coding,
An I/O Efficient MapReduce
This week's paper is named after a Titaness of Greek mythology, Themis, protector of order and law. In our context, however, Themis is a MapReduce-inspired system whose main goal is to minimize I/O operations.
It is true that many MapReduce jobs are I/O-bound, and performance would certainly benefit from a mechanism that limits I/O operations. At this point, I need to clarify that Themis aims to reduce disk I/O, i.e. to process records in memory and spill records to disk as rarely as possible.
In order to achieve this goal, Themis needs to make substantially different design choices from common MapReduce systems. The three most important features that enable disk I/O elimination are:
1. Relaxation of fault-tolerance guarantees
Classical MapReduce implementations offer fault-tolerance at the task level, i.e. if a node fails, the tasks that were running on this node are re-scheduled and executed on another node. The rest of the tasks, running or completed, are unaffected. This model allows for fast recovery and works well in very large clusters, where failures are common. However, the creators of Themis argue that clusters with thousands of machines are not that common in the real world. Surely Google, Facebook, Yahoo! and a couple of others run MapReduce jobs on clusters of such scale, but the majority of Hadoop adopters have much smaller deployments. Assuming a small to mid-size cluster, Themis offers fault-tolerance at the job level only, i.e. if a node fails during execution, the whole job needs to be re-executed. This decision greatly simplifies the implementation, allows data to be pipelined between stages and eliminates time-consuming checkpointing.
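The cluster-size argument can be made concrete with a back-of-the-envelope Python sketch. It assumes that each node fails independently during one job attempt with a fixed probability, and pessimistically that a failed attempt is only noticed at the end, so every attempt costs the full runtime. The failure probability used below is an illustrative assumption, not a number from the paper.

```python
def expected_job_time(runtime_hours: float, nodes: int, p_node_failure: float) -> float:
    """Pessimistic expected runtime under job-level fault tolerance.

    An attempt succeeds only if none of the nodes fails, which happens with
    probability (1 - p) ** nodes. The number of attempts is then geometric,
    so the expected total time is runtime / (1 - p) ** nodes.
    """
    p_success = (1.0 - p_node_failure) ** nodes
    return runtime_hours / p_success


# A one-hour job, assuming a 0.01% chance that any given node fails during it.
for n in (20, 100, 1000, 4000):
    print(n, round(expected_job_time(1.0, n, 0.0001), 3))
```

Under these assumptions, restarting the whole job costs well under 1% extra time at 20 nodes, but the overhead grows to roughly 50% at a few thousand nodes, which is where task-level recovery pays off.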
2. Dynamic memory management
In order to minimize disk operations, Themis aims to process a record entirely as soon as it is read from disk. To achieve this, the system needs a mechanism to ensure that enough memory remains available for as long as the record is being processed. Themis implements a dynamic memory manager with the following pluggable policies:
- Pool-based management
This policy views the available memory as a pool of fixed-size, pre-allocated buffers. The application can reserve a buffer from the pool and return it when it is no longer needed. If a worker requests a buffer from an empty pool, it blocks until another worker returns a buffer. The obvious advantage of this policy is its simplicity: all memory allocation and pool setup happens at startup. On the other hand, this static partitioning limits the maximum supported record size and sacrifices flexibility.
- Quota-based management
The second policy is designed to better handle records of varying size. It controls the flow of data among computational stages, so that stages receiving input do not get overwhelmed. To achieve this, the manager uses queues between stages. A quota is the amount of memory allocated between a producer and a consumer stage. If the producer is about to exceed its quota, it is stalled until the corresponding consumer has processed enough data.
- Constraint-based management
The third and most sophisticated policy dynamically adjusts memory allocation based on worker requests and currently available memory. The manager monitors the memory usage of each worker and accepts requests for memory allocation. If enough memory is available, the request is satisfied. Worker requests are prioritized based on their position in the execution graph. This policy adds significant overhead to the execution time, so it only performs well when the number of allocation requests is relatively small.
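The first two policies can be sketched in Python with the standard `queue` and `threading` modules. `BufferPool` and `Quota` are hypothetical names, this is not Themis code, and the constraint-based policy is left out because it depends on the execution graph.

```python
import queue
import threading


class BufferPool:
    """Pool-based policy: fixed-size buffers allocated once, at startup."""

    def __init__(self, num_buffers: int, buffer_size: int):
        self._free: "queue.Queue[bytearray]" = queue.Queue()
        for _ in range(num_buffers):
            self._free.put(bytearray(buffer_size))

    def acquire(self, timeout=None) -> bytearray:
        # Blocks while the pool is empty, until another worker releases a buffer
        # (raises queue.Empty if a timeout is given and expires).
        return self._free.get(timeout=timeout)

    def release(self, buf: bytearray) -> None:
        self._free.put(buf)


class Quota:
    """Quota-based policy: cap the bytes in flight between a producer and a consumer.
    A single request larger than the whole quota would wait forever."""

    def __init__(self, limit_bytes: int):
        self._limit = limit_bytes
        self._used = 0
        self._cond = threading.Condition()

    def produce(self, nbytes: int) -> None:
        # Stall the producer until the consumer has freed enough of the quota.
        with self._cond:
            self._cond.wait_for(lambda: self._used + nbytes <= self._limit)
            self._used += nbytes

    def consume(self, nbytes: int) -> None:
        with self._cond:
            self._used -= nbytes
            self._cond.notify_all()

    @property
    def used(self) -> int:
        return self._used


pool = BufferPool(num_buffers=2, buffer_size=4096)
first, second = pool.acquire(), pool.acquire()
try:
    pool.acquire(timeout=0.1)  # pool is empty: a real worker would block here
except queue.Empty:
    print("pool exhausted")
pool.release(first)

quota = Quota(limit_bytes=100)
quota.produce(60)
quota.consume(60)  # the consumer drains data and frees quota
quota.produce(80)
print(quota.used)  # 80
```

The pool trades flexibility for zero runtime allocation, while the quota adapts to record sizes at the cost of synchronization on every transfer.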
3. Per-node disk-I/O management
A disk scheduler, running on every Themis node, manages the way data is grouped and written to disk. The scheduler organizes data in large batches in order to reduce disk accesses.
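The batching idea can be illustrated with a small Python sketch that buffers records per output file and only "writes" when a buffer reaches the batch size. `BatchingWriter` is hypothetical, disk writes are simulated by recording their sizes, and Themis's real scheduler also decides the order in which batches hit the disk.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class BatchingWriter:
    """Buffer records per output file and write each file in large batches."""

    def __init__(self, batch_bytes: int):
        self.batch_bytes = batch_bytes
        self.buffers: Dict[str, bytearray] = defaultdict(bytearray)
        self.writes: List[Tuple[str, int]] = []  # (file, size) of each simulated write

    def append(self, path: str, record: bytes) -> None:
        buf = self.buffers[path]
        buf += record
        if len(buf) >= self.batch_bytes:
            self._flush(path)

    def _flush(self, path: str) -> None:
        buf = self.buffers[path]
        if buf:
            # A real scheduler would issue one large sequential write here.
            self.writes.append((path, len(buf)))
            buf.clear()

    def close(self) -> None:
        for path in list(self.buffers):
            self._flush(path)


writer = BatchingWriter(batch_bytes=8192)
for i in range(1000):
    writer.append(f"part-{i % 4}", b"x" * 100)  # 1000 records over 4 partitions
writer.close()
print(len(writer.writes))  # 16 disk writes instead of 1000
```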
Like many other systems, Themis is implemented as a dataflow graph consisting of stages which are grouped in phases and executed in parallel by workers.
Phase Zero samples input data in a distributed fashion and extracts information about the distribution of records and keys. This phase is necessary to ensure that partitions in each of the following stages will be small enough to be processed and sorted in memory. Figuring out the distribution of input data is fairly simple; however, intermediate data also needs to be considered. Themis applies the map function to a subset of the input data in order to generate a subset of intermediate data, which can be used to approximate the intermediate distribution. Both range and hash partitioning mechanisms are supported.
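A minimal Python sketch of this sampling step, assuming range partitioning: run the user's map function on a random sample of the input, sort the intermediate keys it produces, and take evenly spaced quantiles as partition boundaries. The function names and the word-count map are hypothetical, and the sampling here is local rather than distributed.

```python
import bisect
import random
from typing import Callable, Iterable, List, Sequence, Tuple


def sample_boundaries(records: Sequence, map_fn: Callable[[object], Iterable[Tuple]],
                      num_partitions: int, sample_size: int, seed: int = 0) -> List:
    """Estimate range-partition boundaries of the *intermediate* keys."""
    rng = random.Random(seed)
    sample = rng.sample(list(records), min(sample_size, len(records)))
    keys = sorted(key for record in sample for key, _ in map_fn(record))
    step = len(keys) / num_partitions
    return [keys[int(step * i)] for i in range(1, num_partitions)]


def partition_of(key, boundaries: List) -> int:
    """Index of the range partition that a key falls into."""
    return bisect.bisect_right(boundaries, key)


def wc_map(line: str):
    for word in line.split():
        yield word, 1


lines = [f"w{i % 50:02d} w{(i * 7) % 50:02d}" for i in range(1000)]
boundaries = sample_boundaries(lines, wc_map, num_partitions=4, sample_size=200)
print(boundaries, partition_of("w00", boundaries))
```

Because the boundaries come from the map output rather than from the raw input, each reduce-side partition should be roughly the same size, which is what allows it to be sorted in memory later.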
Phase One implements mapping and shuffling, but it is split into more fine-grained steps, shown in the diagram below:
The Reader, Mapper and Sender form the "producer side" of the phase, and their functionality is as simple as their names suggest. The remaining stages form the "consumer side" of the phase, and this is where the disk scheduler, described in the previous section, comes into the picture.
Phase Two consists of sorting and reducing. Each partition is sorted by key, always keeping the result in memory. The stages of Phase Two are shown below:
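The in-memory sort-and-reduce step of Phase Two can be sketched in a few lines of Python, assuming, as Phase Zero is meant to guarantee, that a whole partition fits in memory. `sort_and_reduce` and `sum_reduce` are hypothetical names for this example.

```python
from itertools import groupby
from operator import itemgetter
from typing import Callable, List, Tuple


def sort_and_reduce(partition: List[Tuple], reduce_fn: Callable) -> List:
    """Sort one partition by key in memory, then reduce each key group."""
    partition.sort(key=itemgetter(0))  # no spilling: the partition fits in RAM
    output = []
    for key, group in groupby(partition, key=itemgetter(0)):
        output.extend(reduce_fn(key, (value for _, value in group)))
    return output


def sum_reduce(key, values):
    yield key, sum(values)


print(sort_and_reduce([("b", 1), ("a", 1), ("b", 1)], sum_reduce))  # [('a', 1), ('b', 2)]
```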
Themis is evaluated using several applications, such as sorting, WordCount, PageRank and CloudBurst. Both synthetic and real data are used for evaluation. Experiments are run on a fairly small, 20-node cluster, or even on a single node if the data is small enough. The experiments test disk performance and the memory management policies, and compare Themis with Hadoop. The comparison with Hadoop shows that I/O-bound applications can greatly benefit from Themis' design. However, Themis does not use a distributed file system or any kind of replication, and it is not clear whether those factors were taken into account when taking measurements.
In my opinion, Themis is quite an immature system with some important limitations that still need to be addressed. However, it clearly shows that Hadoop is not the best fit for all cluster sizes and types, and that many deployments could benefit greatly from alternative designs.
Happy coding and reading,
Efficient Iterative Data Processing on Large Clusters
In this post I'm presenting HaLoop, yet another modified version of Hadoop, as the name reveals. HaLoop aims to overcome the limitations of Hadoop MapReduce that make iterative applications inefficient and hard to develop.
The typical way of writing an iterative application in Hadoop MapReduce is to develop one MapReduce job for each iteration, plus a driver program to manage the jobs. This approach has several disadvantages, including:
- manual orchestration of the jobs
- no explicit way to define a termination condition. For simplicity, many developers specify the number of iterations beforehand. However, it is often very difficult to predict how many iterations are needed to reach a sufficiently accurate solution, especially when the termination condition is some kind of convergence criterion.
- re-loading and re-processing of data that remain invariant across iterations
HaLoop extends Hadoop's API to support loops and easy expression of termination conditions. It also proposes caching and indexing mechanisms and a novel loop-aware scheduler which can exploit cached data.
As seen in the figure below, HaLoop reuses most of Hadoop's master-slave architecture, including HDFS as its distributed file system. HaLoop adds a control module to the master node, which is responsible for launching new jobs as iterations and for checking the termination condition, if one is specified. The task scheduler and task tracker are modified to become loop-aware, and indexing and caching modules are added.
HaLoop’s API was built to support recursive programs of the following general form:
$$R_{i+1} = R_0 \cup (R_i \bowtie L)$$
where $R_0$ contains the initial values and $L$ is loop-invariant.
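A small worked instance of this recursion, in the spirit of a descendant query: $R_0$ is a set of starting people, $L$ is a loop-invariant "parent, child" relation, and the join is followed by a projection onto the child, so each iteration adds one more generation. This is an illustrative Python sketch with made-up data, evaluated on a single machine rather than as MapReduce jobs.

```python
from typing import Set, Tuple

Edge = Tuple[str, str]


def join_step(r: Set[str], l: Set[Edge]) -> Set[str]:
    """R_i ⋈ L on person = edge source, projected onto the edge target."""
    return {dst for (src, dst) in l if src in r}


def fixpoint(r0: Set[str], l: Set[Edge], max_iterations: int = 100) -> Set[str]:
    r = set(r0)
    for _ in range(max_iterations):
        nxt = r0 | join_step(r, l)  # R_{i+1} = R_0 ∪ (R_i ⋈ L)
        if nxt == r:                # nothing new: fixpoint reached
            break
        r = nxt
    return r


edges = {("ann", "bob"), ("bob", "cat"), ("cat", "dan"), ("eve", "fay")}
print(sorted(fixpoint({"ann"}, edges)))  # ['ann', 'bob', 'cat', 'dan']
```

Note that `edges` never changes between iterations; that is exactly the loop-invariant data HaLoop avoids reloading and reshuffling.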
The API provides ways to specify a termination condition. A loop can finish when either the maximum number of iterations has been reached or the difference between the result of two consecutive iterations is below some threshold. HaLoop does not specify any high-level language for writing applications. A developer needs to specify the loop body, consisting of one or more map-reduce pairs. Optionally, they can also define a termination condition and loop-invariant data.
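The shape of such a loop can be sketched in Python: the body is a list of map-reduce pairs, and the loop stops at a maximum iteration count or when a user-supplied distance between two consecutive results falls below a threshold. `run_loop`, `run_map_reduce` and the Newton-step body are hypothetical illustrations, not HaLoop's Java API.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Pair = Tuple[object, object]
MapFn = Callable[[object, object], Iterable[Pair]]
ReduceFn = Callable[[object, List[object]], Iterable[Pair]]


def run_map_reduce(data: Dict, map_fn: MapFn, reduce_fn: ReduceFn) -> Dict:
    """One in-memory map-reduce pass over a key -> value dictionary."""
    groups: Dict[object, List[object]] = defaultdict(list)
    for key, value in data.items():
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    result: Dict = {}
    for key, values in groups.items():
        for out_key, out_value in reduce_fn(key, values):
            result[out_key] = out_value
    return result


def run_loop(data: Dict, body: List[Tuple[MapFn, ReduceFn]], max_iterations: int,
             distance: Optional[Callable[[Dict, Dict], float]] = None,
             threshold: float = 0.0) -> Tuple[Dict, int]:
    """Run the loop body until the distance between two consecutive results
    drops below threshold, or until max_iterations is reached."""
    for iteration in range(1, max_iterations + 1):
        previous = data
        for map_fn, reduce_fn in body:  # one or more map-reduce pairs
            data = run_map_reduce(data, map_fn, reduce_fn)
        if distance is not None and distance(previous, data) < threshold:
            return data, iteration
    return data, max_iterations


# Toy loop body: one Newton step towards sqrt(2) per iteration.
def newton_map(key, x):
    yield key, (x + 2.0 / x) / 2.0


def identity_reduce(key, values):
    yield key, values[0]


def max_change(old: Dict, new: Dict) -> float:
    return max(abs(new[k] - old[k]) for k in new)


result, iterations = run_loop({"sqrt2": 1.0}, [(newton_map, identity_reduce)],
                              max_iterations=50, distance=max_change, threshold=1e-9)
print(result, iterations)
```

Without a distance function, the same loop degrades to the fixed iteration count that developers typically hard-code in a Hadoop driver.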
HaLoop's scheduler is built to run tasks that access the same data in different iterations on the same physical machines. This is what the authors call inter-iteration locality. Assuming a problem of the general form above, the mappers that were assigned partitions of the L dataset only need to compute their results once, and those results only need to be sent to the corresponding reducers once, in the first iteration. The only requirement for the scheduler to work is that the number of reduce tasks remains constant throughout the program execution.
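A minimal Python sketch of inter-iteration locality, assuming keys are hash-partitioned across a fixed number of reducers and the scheduler remembers where each reduce partition ran. `partition_for` and `LoopAwareScheduler` are hypothetical names for this illustration.

```python
import zlib
from typing import Dict, List


def partition_for(key: str, num_reducers: int) -> int:
    # A deterministic hash (unlike Python's salted hash()) so that a key maps
    # to the same reduce partition in every iteration.
    return zlib.crc32(key.encode()) % num_reducers


class LoopAwareScheduler:
    """Remember which node ran each reduce partition and reuse it later."""

    def __init__(self, nodes: List[str]):
        self.nodes = nodes
        self.placement: Dict[int, str] = {}

    def assign(self, partition: int) -> str:
        if partition not in self.placement:
            # First iteration: simple round-robin placement.
            self.placement[partition] = self.nodes[partition % len(self.nodes)]
        return self.placement[partition]


NUM_REDUCERS = 4  # must stay constant, or keys would move to other partitions
scheduler = LoopAwareScheduler(["node-a", "node-b", "node-c"])
first = [scheduler.assign(p) for p in range(NUM_REDUCERS)]
second = [scheduler.assign(p) for p in range(NUM_REDUCERS)]
print(first == second)  # True: each partition returns to the node caching its data
```

Changing the number of reducers between iterations would change `partition_for` for most keys, so data cached on a node would no longer match the partition it is asked to reduce; this is why the requirement above exists.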
Caching and Indexing
HaLoop’s caching and indexing mechanisms are built in three levels:
- Reducer Input Cache
If a reducer receives data from a table that has been declared loop-invariant and its input cache is enabled, it stores this data on its local disk for future use and creates an index on it. In subsequent iterations, when the reducer receives tuples that need to be passed to the user-specified reduce function, it also searches its cache for values with the same key. Both cached and new tuples are passed to the reduce function.
- Reducer Output Cache
The reducer output cache stores the latest local result of the computation on this node. The aim of this cache is to reduce the costs of computing termination conditions. When all reducers have their local result cached, they can send them over to the master node, which can decide if the termination condition is fulfilled.
- Mapper Input Cache
The goal of the mapper input cache is to avoid as many non-local reads as possible during non-initial iterations. If during the first iteration a mapper reads a remote split, it will cache it for possible future use.
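The reducer input cache, the most impactful of the three, can be sketched in Python. The cache here is an in-memory dictionary standing in for HaLoop's indexed local-disk cache, and `ReducerInputCache` and `reduce_with_cache` are hypothetical names.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

Pair = Tuple[object, object]


class ReducerInputCache:
    """Keep loop-invariant reducer input locally, indexed by key."""

    def __init__(self):
        self._index: Dict[object, List[object]] = defaultdict(list)

    def fill(self, invariant_tuples: Iterable[Pair]) -> None:
        # First iteration only: the invariant data arrives through the shuffle once.
        for key, value in invariant_tuples:
            self._index[key].append(value)

    def lookup(self, key) -> List[object]:
        return self._index.get(key, [])


def reduce_with_cache(cache: ReducerInputCache, new_tuples: Iterable[Pair],
                      reduce_fn: Callable[[object, List[object]], Iterable[Pair]]) -> List[Pair]:
    """Later iterations: only the changing data is shuffled; the invariant
    values for each key come from the local cache."""
    grouped: Dict[object, List[object]] = defaultdict(list)
    for key, value in new_tuples:
        grouped[key].append(value)
    out: List[Pair] = []
    for key, values in grouped.items():
        out.extend(reduce_fn(key, cache.lookup(key) + values))  # cached + new
    return out


cache = ReducerInputCache()
cache.fill([("a", "L1"), ("b", "L2")])  # loop-invariant tuples, cached once
print(reduce_with_cache(cache, [("a", "R1")], lambda k, vs: [(k, vs)]))
# [('a', ['L1', 'R1'])]
```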
PageRank, Descendant Query and K-means are the applications used for evaluation, and they are compared against Hadoop implementations. Each cache type was evaluated separately. The evaluation shows that the Reducer Input Cache significantly reduces the I/O of the MapReduce shuffle phase. This is very important, as for many applications the most time-consuming step is communicating data from mappers to reducers. The Reducer Output Cache is also beneficial and reduces the cost of evaluating the termination condition by up to 40%. The Mapper Input Cache benefits execution only marginally.
Happy coding and reading,
How to write a MapReduce-related paper:
1. Find a feature of MapReduce which is terrible for a *specific* setup or class of applications
- Read the motivation of the MapReduce system
- Specify set of goals, set of target applications and set of appropriate physical setups for MapReduce
- Choose *anything* outside those sets
2. Change this feature using the simplest solution
- Download Hadoop
- Locate the class with the corresponding functionality
- Write your own implementation
3. Perform evaluation against vanilla MapReduce, using *only* the setup and applications your version was built for.
Tips: This is probably the hardest step…
- Find the optimal configuration for your system, i.e. number of nodes, data sizes, data formats, etc.
- Use default configuration for Hadoop
- Keep tuning the configuration until your system outperforms Hadoop
- Find a cool name for your system
- Write a long, descriptive paper
- Focus on the limitations of MapReduce that your system overcomes
- Include a colourful architecture diagram, showing your system as an additional layer to the Hadoop stack (even though it’s only a couple of modified classes)
Do research they said.. it will be fun they said.. it will be novel they said…