EuroSys 2013 Takeaway

Spring is finally around the corner in Berlin and I’ve just come back after attending EuroSys 2013 in Prague. This was the first time I had the chance to attend such a big international conference and it was a truly eye-opening and highly motivating experience.

I went to EuroSys as a Google conference and travel grant winner. Once in a while, Google offers travel grants to students so that they can attend big conferences. I heard about these grants from Iuliia, a dear friend and Google Student Ambassador at KTH. You still have time to apply for three more conferences taking place in Europe in 2013!


I believe that the most important takeaway from EuroSys was the chance to meet and discuss with so many brilliant and interesting people. It feels so good to be surrounded by smart people, full of energy, ideas and willingness to share their experiences, talk about their work and spread their enthusiasm. I was particularly happy to meet again with people I hadn’t seen for years, old classmates and colleagues I had lost contact with. It was also a pleasant surprise to see quite a few female students and scientists participating, even though only a tiny fraction of them were speakers.

The workshops and main presentations had a bit of everything, covering almost every aspect of systems research. Within one day, you would hear about MapReduce, graph algorithms, machine learning, DHTs, Paxos, consistency levels, compilers, schedulers, Star Trek and cat pictures :p

The program consisted of high-quality presentations, with most of the speakers being well prepared and capable of holding the audience’s attention. Unsurprisingly, some were instantly ready to answer awkward questions with hidden backup slides, while others managed to deflect tricky questions with “work-in-progress” or “let’s-take-this-offline” kinds of answers.


As a distributed systems and big data person, I personally enjoyed most the large-scale distributed computation track, as well as the Replication and Concurrency and Parallelism topics. On the other hand, I can’t help but deeply admire people working on lower-level topics, such as operating systems, virtualization and compilers. Therefore, the whole program was very interesting to me.

In the list of my favorite papers from the conference (and the ones I would recommend you read), I would include Optimus, BlinkDB (best paper award), Presto, Mizan, Maygh and Omega (best student paper award).


Regarding the organization, I believe it was a very successful conference. I know first-hand how hard it is to organize an event of such scale and to meet every participant’s expectations. The organizers were very helpful and friendly, and I believe they managed to make everyone feel comfortable and happy to be there. The event was held at the Technical Library building, on the beautiful campus of the Czech Technical University. Some people thought the main hall was a bit small. The truth is that some additional chairs had been placed on the balconies on the sides of the room, but even when sitting up there it was easy to see the slides and hear the speakers. These side chairs also served people who arrived late or wanted to check their e-mails in more privacy :p

The venue was very close to the metro, easy to find and easily accessible from the city center.
I know well that in any kind of event, one of the most important factors that determines success is of course food! When people are hungry, they always complain, while when people eat well, they’re always in a good mood. And I personally loved the food 😉 In between sessions, there were coffee breaks with nice snacks, tea and coffee. The lunch was also great, with desserts and the friendly staff being my favorites.

And of course, there was the banquet dinner, at the Municipal House of Prague, in the historical center.
The fancy decor of a beautiful historical building, plenty of wine and coq au vin as the main dish resulted in lively conversations. As I happened to be sitting at a table with warm-hearted Mediterranean people, I assure you the conversations covered everything from football to army stories and funny computer-scientist stereotypes.


As I arrived on Friday evening, I had the chance to walk around Prague and visit a few sights before the conference started. Prague is a magical city and now definitely one of my favorites in Europe. The old town reminded me a bit of Athens, with narrow streets and hidden pubs inside gardens or courtyards. One can really appreciate the architecture and the well-preserved old buildings and bridges across the Vltava river. I would totally recommend a visit to the castle, the cathedral, the palace and the surrounding gardens, the Charles bridge and the Lennon wall. Finally, don’t miss the chance to try local food, such as the delicious beef with creamy sauce, dumplings and cranberries, and of course local beer, such as the Velkopopovicky dark sweet beer.

I feel very lucky and grateful to both Google and Iuliia, and I am now more encouraged than ever to work hard and write and submit a lot of papers! By the way, EuroSys 2014 is taking place in Amsterdam!
See you there maybe?

Happy reading and coding,

V.

Εγώ πάντως το κρύο το συνήθισα (I, for one, have gotten used to the cold)

Prompted by “Εγώ πάντως νιώθω κρύο” (“I, for one, feel cold”), a piece I read on lifo that got me thinking 🙂

I never liked the cold either. The truth is that I complain about Stockholm’s weather all the time.

Here it has been snowing non-stop for the last three months. Every now and then the temperature rises a little and it rains. The snow turns to ice, the ice turns to slush. And then it snows again and everything starts over. It’s not that this country doesn’t have seasons. You just have to notice the colors around you. Honestly, I’d say that here, more vividly than anywhere else, I can tell when the seasons change.


There are jobs here, and the differences from working conditions in Greece are huge. Working hours are flexible, there is no stress, you don’t work overtime, you don’t eat your lunch in front of the computer. Your work is evaluated and your achievements are recognized. The deductions from your salary translate directly into public services that you enjoy every day.

And no, I don’t feel like a foreigner. Obviously many things seem strange at first, simply because you are used to something different. But why insist that what you are used to is what’s right? New images, new experiences and new people. If you embrace them, you won’t feel like a foreigner anymore.

I didn’t leave Greece because I was bored, nor because of the crisis. In fact, nothing was wrong with my life before I left. I had a wonderful job with good pay, good friends, a home, a car. An opportunity for a master’s came up and I didn’t think about it much; I left. Of course, back then I believed I would return after my studies. I didn’t expect to be able to go back and not want to! Life “abroad” is nice, it’s different, people really are open-minded. They are polite, friendly, educated. There is no xenophobia or suspicion. Almost all of my friends are scattered across different parts of Europe. And that is wonderful, because flights are cheap and I can grab a plane for a weekend and visit them.

I am 26 and more ambitious than ever. Within my first year away, all my dreams and plans multiplied. I opened my eyes and suddenly felt that the whole world is mine, all of it ahead of me. Homeland? What is a homeland? No, of course you cannot live in today’s Greece as a young person with degrees and have the money your parents invested in your education pay off. So don’t sit there and keep wasting it. Leave!

The fruit and vegetables may not come from my grandmother’s garden and may not smell of soil. And in the summer the temperature may barely reach 20 degrees. But the city is beautiful and green and clean. And if you don’t know your neighbors, have you ever thought of cooking something and going to knock on their door?

As for how Greeks are treated by the media, I think it is pretty much the same everywhere. And honestly, I often prefer their media to our rotten, ridiculous ones.

I didn’t grow up abruptly at all. If anything, I still feel 18. Instead of thinking about my annual leave and my days at the beach, I think about which concert to go to next week. So many bands that would never come to Greece, and here our problem is which one to see first. I think about which city to book tickets for next weekend. I think about which company to apply to for a summer internship. My microcosm has become a macrocosm. It is vast, full of opportunities, challenges, knowledge and experiences. I miss my grandmother too, and my village, and coffee with friends in the square. But, come on, Athens is only three hours away! It really isn’t that hard to visit every now and then 🙂

The future is uncertain and unknown. Amazing and exciting! The world is vast and we have only seen a tiny piece of it. Leave, dare, meet people, embrace things. Love your grandmother, but for heaven’s sake, don’t stay in your village watering flowers!

Β.

Paper of the week: HAIL

Only Aggressive Elephants are Fast Elephants

The main character of this post’s paper is poor Bob, an analyst, and the story is about his adventures with elephants, i.e. Hadoop MapReduce. Bob’s queries often reveal interesting information or anomalies that need to be examined further, spawning new queries and therefore triggering new MapReduce jobs. In such a scenario, the problem stems from slow query runtimes, due to the lack of proper schemas and data indexing. Poor Bob ends up drinking too much coffee waiting for his queries to finish, while also having to deal with his angry boss! The paper proposes inexpensive index creation on Hadoop data attributes, in order to reduce execution times in exploratory MapReduce use cases.

The main idea comes from the observation that HDFS keeps three replicas of each block by default. Why not keep each of these copies in a different sort order and with a different index? If a query happens to filter data on an attribute for which an index exists, we gain a significant speedup!

As usual, it’s not as easy as it sounds. How can indexes be created during upload time, so that no additional scan is required? How does HDFS need to be modified in order to distinguish the different replicas? How can MapReduce applications exploit the sort orders and indexes effectively? And how can we make the scheduler aware?

HAIL (Hadoop Aggressive Indexing Library) answers all these questions.

 

System Overview

HAIL is a Hadoop modification which keeps each physical replica of an HDFS block in a different sort order and creates clustered indexes on HDFS blocks when uploading the data. The HAIL client converts each HDFS block to binary PAX format and sends it to three datanodes. Then, each node sorts the data contained in the block using a different sort order. Sorting and indexing happens in main memory. Each datanode creates a different clustered index for each data block and stores it with the sorted data. The process is shown below:

Hail_architecture
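To make the idea concrete, here is a minimal, self-contained sketch (not HAIL code) of what a single datanode conceptually does with its copy of a block: sort the rows by the attribute assigned to that replica and build a clustered index from attribute value to row position. The record layout, the attribute names and the index structure are simplified assumptions for illustration.

```java
import java.util.*;

// Toy sketch: each replica of the same block sorts its rows by a different
// attribute and builds a clustered index (value -> first row position).
public class ReplicaIndexSketch {

    record Row(int id, int age, long salary) {}   // hypothetical attributes

    static NavigableMap<Long, Integer> buildReplica(List<Row> block,
                                                    Comparator<Row> order,
                                                    java.util.function.ToLongFunction<Row> key) {
        block.sort(order);                         // per-replica sort happens in memory
        NavigableMap<Long, Integer> index = new TreeMap<>();
        for (int pos = 0; pos < block.size(); pos++) {
            index.putIfAbsent(key.applyAsLong(block.get(pos)), pos);
        }
        return index;                              // clustered index over the sorted block
    }

    public static void main(String[] args) {
        List<Row> block = new ArrayList<>(List.of(
                new Row(3, 41, 52000), new Row(1, 25, 70000), new Row(2, 33, 61000)));

        // Replica 1 is ordered by age, replica 2 by salary; a third copy could use id.
        var byAge    = buildReplica(new ArrayList<>(block), Comparator.comparingInt(Row::age), r -> r.age());
        var bySalary = buildReplica(new ArrayList<>(block), Comparator.comparingLong(Row::salary), r -> r.salary());

        // A query filtering on salary would be routed to the replica indexed on salary.
        System.out.println("age >= 30 starts at position " + byAge.ceilingEntry(30L).getValue());
        System.out.println("salary >= 60000 starts at position " + bySalary.ceilingEntry(60000L).getValue());
    }
}
```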

 

Upload Pipeline

HAIL modifies the upload pipeline of HDFS to support index creation and sorting. One major difference is how the HAIL client parses each file into rows. In HDFS, each block is defined by a constant number of bytes, while HAIL parses the file based on content, never splitting a row between two blocks. After the blocks have been defined, each one of them is converted to a binary PAX representation.

HAIL also modifies the checksum mechanism of HDFS. In HDFS, only the datanode receiving the last replica verifies the checksums and then acknowledges the packets back to the other datanodes. In HAIL, however, no checksum or data is flushed to disk right away, as sorting has to happen first. A typical block size is 64 MB, which allows HAIL to perform several block sortings and index creations entirely in memory. After sorting, indexing and metadata creation, each datanode computes its own checksums and the data is flushed to disk.

At this point, I have to make clear that there is no impact on the fault-tolerance properties of HDFS. Data is only reorganized inside the same block, so in case of a failure no information is lost except the index of that particular replica.

 

Query Pipeline

From the application perspective, a MapReduce program requires only a few changes in order to use HAIL. Bob needs to annotate his map function and specify a selection predicate and the projected attributes. In other words, he specifies the values to filter the input records by and the fields he wants the query to return as a result. This information helps the runtime schedule tasks on nodes containing the appropriate indexes. The annotation also simplifies the map function, since Bob no longer has to include the filtering himself.
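As an illustration, a HAIL-style map function might look roughly like the sketch below. The annotation name and its fields are invented here to convey the idea (the paper defines its own annotation syntax); the point is that the filtering logic has moved out of the map body and into metadata the runtime can act on.

```java
// Illustrative only: the annotation name and fields are hypothetical, not HAIL's real API.
@interface IndexedQuery {
    String filterAttribute();
    String filterValue();
    String[] projection();
}

class BobsJob {
    // The runtime (not shown) would use the annotation to pick a replica whose
    // clustered index matches filterAttribute, and to prune columns to 'projection'.
    @IndexedQuery(filterAttribute = "salary", filterValue = ">= 60000", projection = {"id", "age"})
    void map(long rowId, String[] projectedFields /*, output collector omitted */) {
        // No explicit "if (salary >= 60000)" check here: records reaching the map
        // function have already been filtered via the index.
        // emit(rowId, projectedFields);
    }
}
```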

From HAIL’s perspective, the record-splitting policy needs to be modified, as each replica is now of a different size due to the indexes. HAIL groups several data blocks inside one input split, resulting in fewer map tasks. If the job requires a full scan or no relevant index exists, HAIL falls back to the standard Hadoop mechanisms.

 

Evaluation

The paper provides an extensive evaluation. The system is compared to Hadoop and Hadoop++ (previous work by the same group). Experiments test upload performance, the impact of index creation, scaling, query performance and the impact of the new splitting mechanism. Six different clusters and two different datasets are used. The applications include queries with varying degrees of selectivity.

The results show that HAIL matches or often outperforms Hadoop at data upload, even though it has to sort blocks and create indexes; they also demonstrate the efficiency of the PAX representation. Query execution times decrease significantly when appropriate indexes are present, and having fewer map tasks reduces end-to-end job runtimes. However, when a map task fails, more data needs to be re-processed, slowing down recovery.

 

Links

 

Happy reading and coding,

V.

    Paper of the week: Themis

    An I/O Efficient MapReduce

    This week’s paper is named after a Titaness of Greek mythology, Themis, protector of order and law. In our context, however, Themis is a MapReduce-inspired system whose main goal is to minimize I/O operations.

    It is true that many MapReduce jobs are I/O-bound, and it would certainly benefit performance if we could find a mechanism to limit I/O operations. To be precise, Themis aims to reduce disk I/O, i.e. to process records in memory and spill records to disk as rarely as possible.

    In order to achieve this goal, Themis makes substantially different design choices from common MapReduce systems. The three most important features that enable this reduction in disk I/O are:

    1. Relaxation of fault-tolerance guarantees

    Classical MapReduce implementations offer fault tolerance at the task level, i.e. if a node fails, the tasks that were running on it are re-scheduled and executed on another node. The rest of the tasks, running or completed, are unaffected. This model allows for fast recovery and works well in very large clusters, where failures are common. However, the creators of Themis argue that clusters with thousands of machines are not that common in the real world. Surely Google, Facebook, Yahoo! and a couple of others run MapReduce jobs at such scale, but the majority of Hadoop adopters have much smaller deployments. Assuming a small to mid-size cluster, Themis offers fault tolerance at the job level only, i.e. if a node fails during execution, the whole job needs to be re-executed. This decision greatly simplifies the implementation, allows data to be pipelined and eliminates time-consuming checkpoints.

    2. Dynamic memory management

    In order to minimize disk operations, Themis aims to process a record entirely as soon as it is read from disk. To achieve this, the system needs a mechanism to ensure that there will be enough memory throughout the duration of the record processing. Themis implements a dynamic memory manager with the following pluggable policies:

    • Pool-based management

    This policy views the available memory as a pool of fixed-size, pre-allocated buffers. The application can reserve a buffer from the pool and return it when it is no longer needed. If a worker requests a buffer from an empty pool, it blocks until a buffer is returned by another worker. The obvious advantage of this policy is its simplicity: all memory allocation and pool setup happens at startup (see the sketch after this list). On the other hand, this static partitioning limits the maximum record size supported and sacrifices flexibility.

    • Quota-based management

    The second policy is designed to better handle records of varying size. It controls the flow of data among computational stages, so that stages receiving input do not get overwhelmed. To achieve this, the manager uses queues between stages. A quota is the amount of memory allocated between a producer and a consumer stage. If the producer is about to exceed its quota, it is stalled until the corresponding consumer has processed enough data.

    • Constraint-based management

    The third and most sophisticated policy dynamically adjusts memory allocation based on worker requests and the currently available memory. The manager monitors memory usage by each worker and accepts requests for memory allocation. If enough memory is available, the request is satisfied. Worker requests are prioritized based on their position in the execution graph. This policy adds significant overhead to the execution time and performs well when the number of allocation requests is relatively small.
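To make the first and simplest policy concrete, here is a minimal sketch of a pool-based manager, assuming fixed-size buffers pre-allocated at startup and workers that block when the pool runs dry. It illustrates the general idea only, not Themis code; the fixed buffer size is also why this policy caps the maximum record size.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy pool-based memory manager: all buffers are allocated up front;
// a worker blocks on checkOut() until some other worker returns a buffer.
public class BufferPool {
    private final BlockingQueue<byte[]> pool;

    public BufferPool(int numBuffers, int bufferSizeBytes) {
        pool = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            pool.add(new byte[bufferSizeBytes]);   // static pre-allocation at startup
        }
    }

    public byte[] checkOut() throws InterruptedException {
        return pool.take();                        // blocks if the pool is empty
    }

    public void checkIn(byte[] buffer) {
        pool.add(buffer);                          // return the buffer for reuse
    }

    public static void main(String[] args) throws InterruptedException {
        BufferPool pool = new BufferPool(4, 1 << 20);   // 4 buffers of 1 MiB each
        byte[] buf = pool.checkOut();
        // ... fill 'buf' with records, hand it to the next stage ...
        pool.checkIn(buf);
    }
}
```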

    3. Per-node disk-I/O management

    A disk scheduler, running on every Themis node, manages the way data is grouped and written to disk. The scheduler organizes data into large batches in order to reduce disk accesses.

     

    System Overview

    Like many other systems, Themis is implemented as a dataflow graph consisting of stages, which are grouped into phases and executed in parallel by workers.

    Phase Zero samples the input data in a distributed fashion and extracts information about the distribution of records and keys. This phase is necessary to ensure that the partitions in each of the following stages will be small enough to be processed and sorted in memory. Figuring out the distribution of the input data is fairly simple; however, the intermediate data also needs to be considered. Themis applies the map function to a subset of the input data in order to generate a subset of the intermediate data, which can be used to approximate the intermediate distribution. Both range and hash partitioning mechanisms are supported.
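A rough sketch of what such planning boils down to, under simplifying assumptions (keys modelled as longs, a plain sorted sample instead of a distributed one): pick range boundaries as evenly spaced quantiles of the sampled intermediate keys, so each partition ends up roughly the same size and fits in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of phase-zero-style planning: given a sample of intermediate keys
// (obtained by running the map function on a small subset of the input),
// pick range-partition boundaries so that partitions are balanced.
public class RangePartitionPlanner {

    static long[] pickBoundaries(List<Long> sampledKeys, int numPartitions) {
        Collections.sort(sampledKeys);
        long[] boundaries = new long[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            // evenly spaced quantiles of the sampled key distribution
            boundaries[i - 1] = sampledKeys.get(i * sampledKeys.size() / numPartitions);
        }
        return boundaries;   // partition p holds keys in [boundaries[p-1], boundaries[p])
    }

    static int partitionOf(long key, long[] boundaries) {
        int p = 0;
        while (p < boundaries.length && key >= boundaries[p]) p++;
        return p;
    }

    public static void main(String[] args) {
        List<Long> sample = new ArrayList<>(List.of(5L, 42L, 7L, 99L, 13L, 64L, 28L, 81L));
        long[] b = pickBoundaries(sample, 4);
        System.out.println("key 50 goes to partition " + partitionOf(50L, b));
    }
}
```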

    Phase One implements the mapping and shuffling, but is broken down into more fine-grained steps, shown in the diagram below:

     

    Themis_phase1

     

    The Reader, Mapper and Sender form the “producer side” of the phase, and their functionality is as simple as their names suggest. The rest of the stages form the “consumer side” of the phase, and this is where the disk scheduler described in the previous section comes into the picture.

    Phase Two consists of the sorting and reduce steps. Each partition is sorted by key, always keeping the result in memory. The stages of Phase Two are shown below:

    Themis_phase2

     

    Evaluation

    Themis is evaluated using several applications, such as sorting, WordCount, PageRank and CloudBurst. Both synthetic and real data are used for the evaluation. Experiments are run on a fairly small 20-node cluster, or even on a single node if the data is small enough. The experiments test disk performance and the memory-management policies, and compare Themis with Hadoop. The comparison with Hadoop shows that I/O-bound applications can greatly benefit from Themis’ design. However, Themis does not use a distributed file system or any kind of replication, and it is not clear whether these factors were taken into account in the measurements.

     

    Thoughts

    In my opinion, Themis is still quite an immature system with some important limitations that need to be addressed. However, it clearly shows that Hadoop is not the best fit for all cluster sizes and types, and that many deployments could benefit greatly from alternative designs.

     

    Happy coding and reading,

    V.

    Paper of the week: HaLoop

    Efficient Iterative Data Processing on Large Clusters

    In this post I’m presenting HaLoop, yet another modified version of Hadoop, as the name reveals. HaLoop aims to overcome the limitations of Hadoop MapReduce that make iterative applications inefficient and hard to develop.

    The typical way of writing an iterative application in Hadoop MapReduce is to develop one MapReduce job per iteration and to write a driver program that manages the jobs (see the sketch after the list below). There are several disadvantages to this approach, including:

    • manual orchestration of the jobs
    • no explicit way to define a termination condition. For simplicity, many developers specify the number of iterations beforehand. However, it is often very difficult to predict how many iterations will be enough to reach a sufficiently accurate solution, especially when the termination condition is some kind of convergence criterion.
    • re-loading and re-processing of data that remains invariant across iterations
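For reference, the manual driver mentioned above looks roughly like the sketch below with the standard Hadoop API; the mapper and reducer classes and the paths are placeholders. Each iteration is a separate job whose output becomes the next job’s input, and the number of iterations is fixed up front because there is no built-in convergence check.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Manual orchestration of an iterative computation in plain Hadoop:
// one MapReduce job per iteration, chained by hand in a driver program.
public class IterativeDriver {

    // Placeholder mapper/reducer: a real application supplies its own logic.
    public static class RankMapper extends Mapper<Text, Text, Text, Text> { }
    public static class RankReducer extends Reducer<Text, Text, Text, Text> { }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int maxIterations = 10;                        // guessed in advance: no convergence test

        String input = "/data/rank/iter0";
        for (int i = 0; i < maxIterations; i++) {
            String output = "/data/rank/iter" + (i + 1);

            Job job = Job.getInstance(conf, "rank-iteration-" + i);
            job.setJarByClass(IterativeDriver.class);
            job.setMapperClass(RankMapper.class);
            job.setReducerClass(RankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));

            if (!job.waitForCompletion(true)) {
                System.exit(1);                        // whole chain fails if one job fails
            }
            input = output;                            // this iteration's output feeds the next one
        }
    }
}
```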

    HaLoop extends Hadoop’s API to support loops and easy expression of termination conditions. It also proposes caching and indexing mechanisms and a novel loop-aware scheduler that can exploit cached data.

    System Overview

    As seen in the figure below, HaLoop reuses most of Hadoop’s master-slave architecture, including HDFS as its distributed file system. HaLoop adds a loop-control module to the master node, which is responsible for launching new jobs as iterations and for checking the termination condition, if one is specified. The task scheduler and task tracker are modified to become loop-aware, while an indexing and a caching module are added.

    Haloop

    Programming Model

    HaLoop’s API was built to support recursive programs of the following general form:

    R_{i+1} = R_0 ∪ (R_i ⋈ L)

    where R_0 contains the initial values and L is a loop-invariant dataset.

    The API provides ways to specify a termination condition. A loop can finish either when the maximum number of iterations has been reached or when the difference between the results of two consecutive iterations falls below some threshold. HaLoop does not specify any high-level language for writing applications. A developer needs to specify the loop body, consisting of one or more map-reduce pairs. Optionally, they can also define a termination condition and the loop-invariant data.
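The convergence-style condition can be pictured with a small sketch (data structures simplified, not HaLoop’s API): compare the results of two consecutive iterations and stop once the total change drops below a threshold.

```java
import java.util.Map;

// Sketch of a fixpoint-style termination test: stop when results change
// less than a threshold between two consecutive iterations.
public class ConvergenceCheck {

    static boolean converged(Map<String, Double> previous, Map<String, Double> current, double threshold) {
        double delta = 0.0;
        for (Map.Entry<String, Double> e : current.entrySet()) {
            delta += Math.abs(e.getValue() - previous.getOrDefault(e.getKey(), 0.0));
        }
        return delta < threshold;   // e.g. total change in PageRank mass below epsilon
    }

    public static void main(String[] args) {
        System.out.println(converged(Map.of("a", 0.50), Map.of("a", 0.505), 0.01)); // prints true
    }
}
```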

    Scheduling

    HaLoop’s scheduler is built to send tasks that access the same data in different iterations to the same physical machines. This is what the authors call inter-iteration locality. Assuming a problem of the general form above, the mappers that were assigned partitions of the L dataset only need to compute their results once, and those results only need to be sent to the corresponding reducers after the first iteration. The only requirement for the scheduler to work is that the number of reduce tasks remains constant throughout the program execution.

    Caching and Indexing

    HaLoop’s caching and indexing mechanisms operate at three levels:

    • Reducer Input Cache

    If a reducer receives data from a table that has been declared loop-invariant and its input cache is enabled, it stores this data on its local disk for future use and creates an index on it. In a subsequent iteration, when the reducer receives tuples that need to be passed to the user-specified reduce function, it also searches its cache for values corresponding to the same key. Both cached and new tuples are then passed to the reduce function (see the sketch after this list).

    • Reducer Output Cache

    The reducer output cache stores the latest local result of the computation on each node. The aim of this cache is to reduce the cost of evaluating termination conditions. When all reducers have their local results cached, they can send them to the master node, which decides whether the termination condition is fulfilled.

    • Mapper Input Cache

    The goal of the mapper input cache is to avoid as many non-local reads as possible during non-initial iterations. If during the first iteration a mapper reads a remote split, it will cache it for possible future use.
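Here is a rough sketch of the reducer-input-cache path described above, with the on-disk cache and the tuple types simplified into plain Java collections: tuples from the invariant table are remembered in the first iteration and, in later iterations, merged with the newly shuffled tuples for the same key before the reduce function is called.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy reducer-side cache for loop-invariant data: values cached locally
// (modelled here as a map) are merged with newly shuffled values per key.
public class ReducerInputCacheSketch {
    private final Map<String, List<String>> cachedInvariantValues = new HashMap<>();

    // First iteration: remember the invariant table's tuples for this key.
    void cache(String key, String value) {
        cachedInvariantValues.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Later iterations: combine cached and freshly received tuples before reducing.
    void reduceWithCache(String key, List<String> newValues,
                         BiConsumer<String, List<String>> reduceFn) {
        List<String> all = new ArrayList<>(newValues);
        all.addAll(cachedInvariantValues.getOrDefault(key, List.of()));
        reduceFn.accept(key, all);
    }

    public static void main(String[] args) {
        ReducerInputCacheSketch cache = new ReducerInputCacheSketch();
        cache.cache("node42", "L:edge-to-7");                   // iteration 1: invariant tuple cached
        cache.reduceWithCache("node42", List.of("R:rank=0.3"),  // iteration 2: only new tuples shuffled
                (k, vs) -> System.out.println(k + " -> " + vs));
    }
}
```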

    Evaluation

    PageRank, Descendant Query and K-means are the applications used for the evaluation, and they are compared against plain Hadoop implementations. Each cache type is evaluated separately. The evaluation shows that the Reducer Input Cache significantly reduces the I/O of the MapReduce shuffle phase. This is very important, as for a lot of applications the most time-consuming step is communicating data from mappers to reducers. The Reducer Output Cache is also beneficial and reduces the cost of computing the termination condition by up to 40%. The Mapper Input Cache only marginally benefits execution.

    Links

    Happy coding and reading,

    V.

    Early survey conclusions…

    How to write a MapReduce-related paper:

    1. Find a feature of MapReduce which is terrible for a *specific* setup or class of applications

    Tips:

    • Read the motivation of the MapReduce system
    • Specify set of goals, set of target applications and set of appropriate physical setups for MapReduce
    • Choose *anything* outside those sets

     

    2. Change this feature using the simplest solution

    Tips:

    • Download Hadoop 
    • Locate the class with the corresponding functionality 
    • Write your own implementation 
    • Build

     

    3. Perform evaluation against vanilla MapReduce, using *only* the setup and applications your version was built for.

    Tips: This is probably the hardest step…

    • Find the optimal configuration for your system, i.e. number of nodes, data sizes, data formats, etc.
    • Use default configuration for Hadoop
    • Keep tuning the configuration until your system outperforms Hadoop

     

    4. Publish

    Tips:

    • Find a cool name for your system 
    • Write a long, descriptive paper 
    • Focus on the limitations of MapReduce that your system overcomes 
    • Include a colourful architecture diagram, showing your system as an additional layer to the Hadoop stack (even though it’s only a couple of modified classes)
    • Publish

     

    Do research they said.. it will be fun they said.. it will be novel they said…

     

    V.

    Sector and Sphere

    Design and Implementation of a High-Performance Data Cloud

    This week’s post discusses Sector and Sphere, two systems built at the University of Illinois, which together provide a framework for executing data-intensive applications in the cloud.

    The vision of the project is to provide a computing cloud which can be geographically distributed. The authors build on the assumption that available clusters are connected by high-speed networks and that data inputs and query outputs are relatively large.

     

    Sector

    Sector provides the storage layer of the system. It assumes a large number of well-connected nodes, located in the same or different datacenters. Datasets are divided into Sector slices, which are also the units of replication in the system. An overview of the Sector architecture is shown below:

    Sector

    Sector is one of the very few large-scale data storage services that address security issues. Security is provided by an external dedicated security server, which maintains user credentials, file access information and the IP addresses of authorized nodes. The actual service is provided by a master node and a set of slave nodes. The master stores metadata, monitors the slaves and handles client requests. Clients connect to the master over SSL. The master then contacts the security server to verify the user’s credentials and obtain their access privileges. When a client requests to read a file, the master checks permissions and elects a slave node to serve the request. Communication between clients and slaves, as well as among slaves, is always coordinated by the master node.

    Sector relies on the native local file system of each node to store files. Each Sector slice is stored as a separate file in the native file system and is not split further (e.g. into blocks, as in Hadoop). Apart from metadata, the master also stores information about the current topology and the available resources of the slaves. The master also periodically checks the number of existing copies of each file in the system. If this falls below the specified replication factor, it chooses a slave node to create a new copy.

    Sector uses UDP (with a reliable message-passing library on top) for messages and UDT for data transfers. 

     

    Sphere

    Sphere provides the computing layer of the system. It transparently manages data movements, message-passing, scheduling and fault-tolerance.

    Sphere’s computing paradigm is based on stream-processing. In this context, each slave corresponds to an ALU in a GPU or a core in a CPU. A Sphere program takes a stream as input and produces one or more streams as output. Each stream is divided into segments, which are assigned to different Sphere Processing Engines (SPEs) for execution. The computing paradigm is shown in the following diagram:

    Sphere

    One could easily implement MapReduce-like computations in Sphere. When writing a Sphere UDF, one can specify a bucket ID for each record of the output. This way, one can imitate the shuffling phase of MapReduce.
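A tiny sketch of the bucket-ID trick (Sphere’s real UDF interface differs): tag every output record with a bucket derived from a hash of its key, so a later stage that reads one bucket sees all records sharing a key, which is exactly what MapReduce’s shuffle provides.

```java
// Sketch only: Sphere's real UDF interface differs. The idea: tag every output
// record with a bucket ID derived from its key, so that a later processing stage
// reading one bucket sees all records sharing a key -- i.e. a MapReduce-style shuffle.
public class BucketUdfSketch {
    static final int NUM_BUCKETS = 16;

    // Conceptual per-record UDF: decide which output bucket a record belongs to.
    static int bucketFor(String record) {
        String key = record.split(",")[0];                  // assume the key is the first field
        return Math.floorMod(key.hashCode(), NUM_BUCKETS);  // same key -> same bucket
    }

    public static void main(String[] args) {
        System.out.println("'url42,1' -> bucket " + bucketFor("url42,1"));
        System.out.println("'url42,7' -> bucket " + bucketFor("url42,7")); // lands in the same bucket
    }
}
```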

    When a client sends a data-processing request to the master, it receives a list of slaves available for computation. The client selects the nodes it needs and starts an SPE on each of them. While an SPE is running, it periodically reports its progress back to the client.

    The Sphere client is responsible for splitting the input into segments and distributing them uniformly to the available SPEs. Data locality and data-access concurrency are the main goals of the scheduling rules. In order to detect failures, the Sphere client monitors the running SPEs and uses timeouts to declare them failed. There is no checkpointing mechanism, so the segment of a failed SPE has to be re-scheduled and re-computed completely. Sphere also uses a technique very similar to MapReduce’s to deal with stragglers.

     

    Thoughts

    Overall, the design of Sector and Sphere is straightforward, and a number of decisions seem to have been made in order to avoid implementation complexity. Sector performs best when dealing with a small number of relatively large files. The programming model is more general and flexible than MapReduce, though lower-level. The Terasort benchmark is used for evaluation and there is a comparison with Hadoop MapReduce, although not an extensive one.

     

    Links

     

    Happy reading and coding,

    V.

      Paper of the week: Incoop

      MapReduce for Incremental Computations

      This week I am writing about Incoop, a system developed to support incremental computations. Incoop transparently extends Hadoop MapReduce, i.e. existing applications can be executed on it without changing any code.

      Incoop’s motivation comes from the observation that a large number of MapReduce jobs need to be run repeatedly with slightly different (most often augmented) input. This obviously leads to redundant computations and inefficiencies. To overcome this problem, one normally has to specially design a MapReduce application to handle this case by storing and reusing state across multiple runs. Incoop’s goal is to handle such cases without demanding any extra effort from the programmer.

      Incoop extends Hadoop to support incremental computations by making three important modifications:

      • Inc-HDFS: A modified HDFS which splits data depending on file contents instead of size
      • Contraction Phase: An additional computation phase added before the Reduce phase, used to control task granularity
      • Memoization-aware Scheduler: An improved scheduler which takes into account data locality of previously computed results while also using a work-stealing algorithm.

      System Overview

      The main goal of Incoop is to support incremental computations while offering transparency and efficiency. Transparency means being backwards compatible, which limits the modifications one can make to the original MapReduce model. Efficiency, on the other hand, is mainly determined by the granularity of the elements that can be reused.

      To achieve these goals, Incoop modifies HDFS and the MapReduce scheduler and also adds a memoization server to the architecture. The memoization server simply stores a mapping from the input of a computation to its output. Whenever a task runs, it queries the server to find out whether the result it is about to produce already exists. A high-level view of the system is shown below:

      Incoop_arch
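A minimal sketch of the memoization idea, with storage and hashing details as simplified assumptions: a task identifies its computation by a digest of its code version and its input chunk, and asks the server whether that exact result has already been produced.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.Map;
import java.util.Optional;

// Toy memoization server: maps a fingerprint of (task code, input chunk) to the
// location of a previously produced result, so the task can be skipped entirely.
public class MemoizationSketch {
    private final Map<String, String> resultLocationByFingerprint = new HashMap<>();

    static String fingerprint(String taskVersion, byte[] inputChunk) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        md.update(taskVersion.getBytes(StandardCharsets.UTF_8));
        md.update(inputChunk);
        return HexFormat.of().formatHex(md.digest());
    }

    Optional<String> lookup(String fp)          { return Optional.ofNullable(resultLocationByFingerprint.get(fp)); }
    void remember(String fp, String resultPath) { resultLocationByFingerprint.put(fp, resultPath); }

    public static void main(String[] args) throws Exception {
        MemoizationSketch server = new MemoizationSketch();
        String fp = fingerprint("wordcount-map-v1", "the quick brown fox".getBytes(StandardCharsets.UTF_8));
        server.remember(fp, "/results/chunk-000");    // stored after the first run
        System.out.println("cached result: " + server.lookup(fp).orElse("none, must recompute"));
    }
}
```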

      Inc-HDFS

      Incremental HDFS mainly differs from the original HDFS in the way data is partitioned into chunks. In the original HDFS, data is divided into fixed-size blocks, and these blocks are also the units of computation for map tasks. This design is not suitable for incremental computations, as the slightest change in the input could affect all subsequent blocks. One could propose computing differences between the entire input files, but this choice would be too expensive in our case.

      What Inc-HDFS does is divide data into chunks based on content instead of size. The idea comes from the data-deduplication area and uses fingerprints to match patterns and mark chunk boundaries. With this approach, a change in the input will most probably affect only the specific chunk of data it touches, while the rest can be reused.
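Here is a minimal sketch of content-defined chunking, assuming a naive window hash (real systems, and presumably Inc-HDFS, use a proper rolling fingerprint): a chunk boundary is declared wherever the fingerprint of the last few bytes matches a pattern, so boundaries follow the content rather than fixed offsets, and an edit only disturbs the chunk it lands in.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy content-defined chunking: a boundary is placed wherever the fingerprint of a
// small sliding window matches a pattern, so boundaries depend on content, not offsets.
public class ContentChunkerSketch {
    static final int WINDOW = 16;             // bytes fingerprinted at each position
    static final int MASK   = (1 << 6) - 1;   // boundary on average every ~64 bytes (toy value)

    static List<Integer> boundaries(byte[] data) {
        List<Integer> cuts = new ArrayList<>();
        for (int i = WINDOW; i < data.length; i++) {
            int fp = 0;
            for (int j = i - WINDOW; j < i; j++) {   // naive window hash; real systems use a rolling hash
                fp = fp * 31 + (data[j] & 0xff);
            }
            if ((fp & MASK) == 0) {
                cuts.add(i);                         // content decides where a chunk ends
            }
        }
        return cuts;
    }

    public static void main(String[] args) {
        byte[] original = "line-a\nline-b\nline-c\nline-d\nline-e\n".repeat(20).getBytes(StandardCharsets.UTF_8);
        byte[] edited   = ("extra\n" + new String(original, StandardCharsets.UTF_8)).getBytes(StandardCharsets.UTF_8);
        // After prepending data, later boundaries shift by a constant offset but stay at the
        // same content positions, so most chunks (and their map results) can be reused.
        System.out.println(boundaries(original).size() + " vs " + boundaries(edited).size() + " boundaries");
    }
}
```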

      Incremental MapReduce

      In the computation part, map-task results are stored persistently and references to them are inserted into the memoization server. If a running map task is part of an incremental computation, it first queries the memoization server and retrieves any result that already exists.

      Making the reduce phase support incremental computations is a bit more complicated than in the map case. That is because we do not have control over the input to the reducers, as it is defined directly by the mappers’ output. For this reason, Incoop adds an additional phase to the model, right before the reduce phase. This Contraction phase leverages the idea of combiners to break the reduce task into a tree hierarchy of smaller tasks. The process runs recursively until the last level, where the reduce function is applied. In order to obtain a data partitioning suitable for reuse, content-based partitioning is again performed at every level of combiners.

      Scheduler

      The default MapReduce scheduler assigns tasks to nodes based on data locality and availability of execution slots. Data locality is a desirable factor in the case of Incoop as well, but memoized data need to be taken into account. The memoization-aware scheduler tries to schedule tasks on the nodes that contain data which can be reused. However, this approach might create load imbalance, in case some data is very popular, and lead to straggler tasks. To avoid this situation, the scheduler implements a simple work-stealing algorithm. When a node runs out of work, the scheduler will locate the node with the largest task queue and delegate a task to the idle node.

      Thoughts

      Incoop’s design decisions can spark a lot of discussion on several levels, and thorough evaluation and profiling are needed to measure overheads and locate bottlenecks. For example, the content-based partitioning in Inc-HDFS could possibly lead to overloaded map tasks. Moreover, the performance gains are questionable in the case where even one map task needs to be re-executed: since the reduce phase cannot start before all map tasks have finished, the speedup inevitably depends on how slow that map task is.

      The paper contains an extensive evaluation section which I will not describe here. The authors use five applications, both data and computation intensive, and try to answer the above questions and study the tradeoffs of their approach.

      Happy reading and coding,

      V.

      Paper of the week: Nectar

      Automatic Management of Data and Computation in Datacenters

      This week’s paper, Nectar, comes from Microsoft Research and provides a way to address some very fundamental problems that occur in modern datacenters. Nectar is fundamentally built to interact with Dryad and LINQ.

      Wait! Don’t stop reading!

      I know you know that Microsoft dropped Dryad a while ago and admitted Hadoop’s victory, blah blah blah. But trust me, the ideas in this paper go beyond the specific platform and it’s worth reading nevertheless 🙂

       

      Motivation

      Nectar basically deals with data management in large datacenters. Manual data management can cause a lot of pain, including data loss and wasted resources due to careless or oblivious users. Especially in big-data scenarios, where user applications generate large amounts of output data, it is common for this data to be accessed only once and then forgotten, so the user never deletes it. Consequently, large parts of the datacenter’s storage are occupied by data that is never or rarely accessed.

       

      System Overview

      Nectar automates data management by associating the data with the computation which created it. Datasets can be uniquely identified by the computations that produced them. All successfully completed programs run on the datacenter are saved in a special store and can be used in order to re-create deleted data. In this way, data and computations can be used interchangeably: High-value data is cached and can be used instead of computation when possible and low-value data is garbage-collected and can be retrieved in case it is needed, by re-running the computation that produced it.

      Nectar’s input is a LINQ program. Like many of the high-level analysis languages, LINQ’s operators are functional, i.e. they accept a dataset as input, do a transformation on it and output a different dataset. This property allows Nectar to create the association between datasets and programs. 

      Datasets are classified in Nectar as primary or derived. Primary datasets are created once and are not garbage-collected. Derived datasets are created from primary or other derived datasets. A mapping is created between them and the program that produced them and they can be automatically deleted and re-created. In a scenario where we want to analyze website click logs, the raw click logs files would be primary, while all datasets produced by analysis on the click logs, e.g. most popular URLs, would be derived.

      The Nectar architecture consists of a client-side program rewriter, a cache server, a garbage-collection service and a datastore on the underlying distributed file system, TidyFS. When a DryadLINQ program is submitted for execution, it first goes through a rewriting step. The rewriter consults the cache and tries to transform the submitted program into a more efficient one, using a cost estimator to choose the best alternative. The rewriter also decides which intermediate datasets to cache, and it always creates a cache entry for the final output of a computation. Rewriting is useful in two scenarios: (a) replacing sub-expressions with cached datasets and (b) incremental computations, where cached data can be reused to save computation time. After the rewriting, the optimized program is compiled into a Dryad job and executed.
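The core of the rewriting step can be pictured with a small sketch; the identifiers and the flat map are simplified assumptions, not Nectar’s implementation. A derived dataset is keyed by the pair (program fragment, input dataset), so an identical sub-computation can be replaced by a pointer to an already materialized result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy view of Nectar-style rewriting: a derived dataset is identified by the pair
// (program fragment, input dataset), so identical sub-computations can be replaced
// by a pointer to an already materialized result.
public class RewriterSketch {
    record ComputationKey(String programFragmentHash, String inputDatasetId) {}

    private final Map<ComputationKey, String> cache = new HashMap<>();

    Optional<String> rewrite(String programFragmentHash, String inputDatasetId) {
        return Optional.ofNullable(cache.get(new ComputationKey(programFragmentHash, inputDatasetId)));
    }

    void addCacheEntry(String programFragmentHash, String inputDatasetId, String derivedDatasetPath) {
        cache.put(new ComputationKey(programFragmentHash, inputDatasetId), derivedDatasetPath);
    }

    public static void main(String[] args) {
        RewriterSketch rewriter = new RewriterSketch();
        rewriter.addCacheEntry("groupby-url-count:v1", "clicklogs-2013-04", "/cache/popular-urls-2013-04");
        // A later job containing the same sub-expression over the same input is rewritten
        // to read the cached derived dataset instead of recomputing it.
        System.out.println(rewriter.rewrite("groupby-url-count:v1", "clicklogs-2013-04")
                                   .orElse("cache miss: run the sub-expression"));
    }
}
```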

      The cache service is distributed and datacenter-wide. It serves lookup requests and, in the case of a miss, has a mechanism for backtracking through the necessary computations in order to recreate the requested dataset. It also implements a cache-replacement policy, keeping track of the least frequently accessed datasets.

      The garbage collector runs in the background without interfering with program execution or other datacenter jobs. It identifies datasets that are not referenced by any cache entry and deletes them. 

       

      Evaluation Results

      Nectar is deployed and evaluated on an internal 240-node cluster at Microsoft Research, and it was also evaluated using execution logs from 25 other production clusters. It was found that Nectar could eliminate up to 7,000 hours of daily computation.

      Overall, we could summarize Nectar’s advantages as following:

      • Space savings by caching, garbage collection and on-demand re-creation of datasets
      • Execution time savings by sub-computation sharing and support for incremental computations
      • Easy content management and dataset retrieval

      Implementation details and an extensive evaluation can be found in the paper.

       

      Thoughts

      Personally, I love caching. I believe it’s a brilliant idea and I think we should use it more often, wherever applicable. And especially in big data systems. This recent (and enlightening) study on MapReduce workloads clearly shows that we can only benefit from caching.

       

      Happy coding and reading,

      V.