Moving beyond MapReduce for Graphs

Many problems are naturally expressed as graphs, such as the web, transport networks and social media. However, MapReduce, a common large-scale distributed processing framework, is ill suited to such tasks. MapReduce requires data chunks to be processed independently; in other words, every job must already hold all the information it needs to compute its result. This independence does not hold for graphs, where a job might depend on previous computation or on information held by neighbouring jobs. To address this issue, Google’s Pregel architecture employs a message passing system, creating a “large-scale graph processing” framework.

Another problem is that MapReduce moves data around, for example by shuffling, in order to process it. For graphs, this means partitions being shipped between machines, incurring high network overhead. A similar problem arises when MapReduce jobs are chained to implement iterative graph algorithms: “MapReduce requires passing the entire state of the graph from one stage to the next - in general requiring much more communication and associated serialization overhead.”

Finally, “coordinating the steps of a chained MapReduce adds programming complexity that is avoided by Pregel’s iteration over supersteps”. MapReduce is not iterative: each job performs a single pass, and it is left to the user to implement iteration by chaining multiple MapReduce jobs. For real-world graph algorithms running at scale, this quickly becomes complicated, not only to implement but also to debug when things go wrong.

Pregel

Google Pregel Architecture

The user’s algorithm is distributed to many machines, one of which becomes the master. The master partitions the graph and assigns the partitions to workers. A worker can hold multiple graph partitions, each containing a set of vertices and their outgoing edges; an edge may point to any vertex in the graph, possibly one stored on another worker.
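
As a rough sketch of how such an assignment might look, the snippet below hashes each vertex ID to a partition, in the spirit of the default hash(ID) mod N scheme described in the Pregel paper; the function names (partition_of, assign_partitions) are illustrative, not part of any real API.

```python
# Minimal sketch of hash-based partitioning, assuming the paper's default
# hash(ID) mod N scheme. Function and variable names are illustrative only.

def partition_of(vertex_id, num_partitions):
    """Map a vertex to a partition purely from its ID."""
    return hash(vertex_id) % num_partitions

def assign_partitions(out_edges, num_partitions):
    """Group each vertex and its outgoing edges into a partition.

    Outgoing edges may point to vertices stored in other partitions;
    only the source vertex decides where its adjacency list lives.
    """
    partitions = {p: {} for p in range(num_partitions)}
    for vertex_id, edges in out_edges.items():
        partitions[partition_of(vertex_id, num_partitions)][vertex_id] = edges
    return partitions

if __name__ == "__main__":
    graph = {0: [1, 2], 1: [2], 2: [0, 3], 3: []}
    print(assign_partitions(graph, num_partitions=2))
```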

Once the setup is complete, the master tells each worker to execute a superstep. At the beginning of a superstep, workers save their current state to persistent storage for fault tolerance. Workers then call Compute(), the user-defined function, for each active vertex in their partitions, sending and receiving messages asynchronously. When a worker is done computing, it tells the master it has finished and reports how many vertices will be active in the next superstep.
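
To make the vertex-centric Compute() model concrete, here is a minimal sketch of a vertex program that propagates the maximum value through the graph, in the spirit of the example from the Pregel paper; the class and method names below are stand-ins for the framework’s real vertex API (which is a C++ template), not the actual interface.

```python
# Minimal sketch of a vertex program that spreads the maximum value through
# the graph. The class stands in for the framework's vertex API; method names
# are illustrative only.

class MaxValueVertex:
    def __init__(self, value, out_edges=None):
        self.value = value
        self.out_edges = out_edges or []   # destination vertex IDs
        self.outbox = []                   # (destination, message) pairs
        self.active = True

    def send_message_to_all_neighbours(self, msg):
        self.outbox.extend((dst, msg) for dst in self.out_edges)

    def vote_to_halt(self):
        self.active = False

    def compute(self, messages, superstep):
        # Adopt the largest value seen so far, our own or a neighbour's.
        changed = superstep == 0           # always announce our value once
        for msg in messages:
            if msg > self.value:
                self.value = msg
                changed = True
        if changed:
            # Only bother the neighbours when our value actually improved.
            self.send_message_to_all_neighbours(self.value)
        # Vote to halt; an incoming message in a later superstep reactivates us.
        self.vote_to_halt()
```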

For message passing, two queues are maintained, one for superstep S and one for S+1, so that a worker can receive messages while computing the current superstep. Any message sent in superstep S is delivered in superstep S+1, which avoids locking at the message level. There is no guarantee on the order in which messages are received. While computing, a worker can send messages to other workers, including itself, depending on where the destination vertex is stored. Buffered messages are flushed as a single network message once a buffer size threshold is reached, reducing network traffic. If the user specifies an optional combiner, messages can be combined either outbound or inbound, reducing network usage and storage respectively. For example, several messages can be combined into one if the user only cares about the sum of their values.
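
As a sketch of the combiner idea for the sum example above, the snippet below folds all messages bound for the same vertex into one; the SumCombiner class and combine_outgoing helper are hypothetical names that only mirror the spirit of Pregel’s Combine() hook, not its actual interface.

```python
# Sketch of a sum combiner: messages destined for the same vertex are folded
# into a single message before being buffered or delivered. Names are
# hypothetical, not Pregel's real API.

class SumCombiner:
    def combine(self, a, b):
        return a + b

def combine_outgoing(messages, combiner):
    """Collapse (destination, value) pairs so each destination gets one message."""
    combined = {}
    for dst, value in messages:
        combined[dst] = value if dst not in combined else combiner.combine(combined[dst], value)
    return list(combined.items())

if __name__ == "__main__":
    outbound = [(7, 1.0), (7, 2.5), (3, 0.5), (7, 1.5)]
    print(combine_outgoing(outbound, SumCombiner()))   # [(7, 5.0), (3, 0.5)]
```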

The user can also specify aggregators to collect results at each superstep. Each worker supplies values to its aggregator instance, producing a single partially reduced local value. At the end of the superstep, the workers’ local values are combined into a global value using a tree-based reduction and delivered to the master.
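
A minimal sketch of that aggregation flow, under assumed names: each worker reduces its vertices’ values into a local partial, and the partials are then reduced into one global value (tree-shaped in Pregel, a flat reduce here for brevity).

```python
# Sketch of a sum aggregator: each worker reduces the values supplied by its
# vertices into a local partial, and the partials are reduced again into a
# single global value for the master. Names are illustrative only, and the
# global step is a flat reduce rather than Pregel's tree-based reduction.

from functools import reduce

class SumAggregator:
    identity = 0

    @staticmethod
    def combine(a, b):
        return a + b

def local_aggregate(values, aggregator):
    """Partial reduction performed by one worker during a superstep."""
    return reduce(aggregator.combine, values, aggregator.identity)

def global_aggregate(partials, aggregator):
    """Reduction of the workers' partial values into the global value."""
    return reduce(aggregator.combine, partials, aggregator.identity)

if __name__ == "__main__":
    worker_values = [[1, 2, 3], [10], [4, 4]]
    partials = [local_aggregate(v, SumAggregator) for v in worker_values]
    print(global_aggregate(partials, SumAggregator))   # 24
```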

When all workers have completed a superstep, the master initiates the next one. This process repeats until there are no active vertices left: a vertex becomes inactive once it votes to halt, and stays inactive unless an incoming message wakes it up again. When every vertex is inactive, the algorithm terminates.
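
The master’s control loop can be summarised roughly as follows; the Worker class is a trivial in-memory stand-in for the real RPC-coordinated workers, so this is a sketch of the termination logic rather than the actual implementation.

```python
# Sketch of the master's superstep loop: run supersteps until no vertex will
# be active in the next one. The Worker class is a stand-in whose activity
# counts are pre-scripted purely so the example runs.

class Worker:
    def __init__(self, active_per_superstep):
        self._script = list(active_per_superstep)

    def checkpoint(self):
        pass  # real workers persist their partition state here

    def run_superstep(self, superstep):
        # Returns how many of this worker's vertices will be active next superstep.
        return self._script[superstep] if superstep < len(self._script) else 0

def run_to_completion(workers):
    superstep = 0
    while True:
        for w in workers:
            w.checkpoint()                    # save state for fault tolerance
        active_next = sum(w.run_superstep(superstep) for w in workers)
        if active_next == 0:
            break                             # every vertex has voted to halt
        superstep += 1
    return superstep + 1                      # number of supersteps executed

if __name__ == "__main__":
    print(run_to_completion([Worker([3, 1, 0]), Worker([2, 0, 0])]))  # 3
```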

Alternative to Message Passing

An alternative to message passing is a state passing model in which sent messages are stored locally and only exchanged at the end of a superstep. In other words, the model piggybacks on the synchronisation barrier between supersteps to exchange the state of the message queues, rather than exchanging messages during computation. The immediate advantage is that workers incur no network traffic while computing. At the end of a superstep, messages can be sent in bulk, in batches larger than the current buffer size, amortising the handshake overhead of opening and closing multiple connections to different machines. All messages destined for a given worker can then be sent over a single network connection.
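
A rough sketch of this deferred exchange, assuming a hypothetical DeferredOutbox: messages are only buffered locally during Compute(), grouped by destination worker, and each per-worker batch is shipped as one bulk transfer at the barrier.

```python
# Sketch of the state passing idea: during Compute(), messages are only queued
# locally, grouped by destination worker, and each per-worker batch is shipped
# in a single transfer at the superstep barrier. All names are hypothetical.

from collections import defaultdict

class DeferredOutbox:
    def __init__(self, partition_of):
        self._partition_of = partition_of       # vertex ID -> worker ID
        self._buffers = defaultdict(list)       # worker ID -> queued messages

    def send(self, dst_vertex, message):
        # No network traffic here: just buffer locally during computation.
        self._buffers[self._partition_of(dst_vertex)].append((dst_vertex, message))

    def flush(self, transport):
        # At the barrier, one bulk transfer per destination worker.
        for worker_id, batch in self._buffers.items():
            transport(worker_id, batch)
        self._buffers.clear()

if __name__ == "__main__":
    outbox = DeferredOutbox(partition_of=lambda v: v % 2)
    for dst, msg in [(4, "a"), (5, "b"), (6, "c")]:
        outbox.send(dst, msg)
    outbox.flush(lambda wid, batch: print(f"worker {wid} <- {batch}"))
```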

The disadvantage is a longer pause between supersteps. State passing performs worse whenever computation is cheaper than communication: for example, if vertices only compute the average of their neighbours’ values, fetching those values outweighs computing the average, and the bulk of the state ends up being transferred at the end of the first superstep. Perhaps the user could specify, depending on the algorithm, when messages should be exchanged, prioritising either computation or communication. In this way, the message passing of a Pregel execution could be fine-tuned to the user’s algorithm and dataset.
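
One way such a knob might look, purely as an assumption: a flush policy setting that switches between eager, threshold-based delivery and delivery deferred to the barrier. Neither the setting nor its names exist in Pregel.

```python
# Hypothetical flush-policy knob: the user chooses whether buffered messages
# are sent eagerly once a size threshold is hit, or held until the barrier.

from enum import Enum

class FlushPolicy(Enum):
    EAGER = "eager"          # send once the message buffer threshold is hit
    AT_BARRIER = "barrier"   # hold everything until the superstep barrier

def should_flush(policy, buffered_bytes, threshold_bytes, at_barrier):
    """Decide whether to ship the current buffer to its destination worker."""
    if policy is FlushPolicy.AT_BARRIER:
        return at_barrier
    return at_barrier or buffered_bytes >= threshold_bytes
```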

Scalability Limitations

A common limitation of large-scale distributed frameworks is the network traffic that message passing creates. For dense graphs, the network can become a bottleneck when many messages are passed during computation. Similarly, if a vertex has a very high degree, a single worker has to handle all of its traffic whenever it sends or receives messages, slowing that worker down and increasing the wait at the superstep barrier.

Another limitation is the superstep barrier itself, where every worker must wait for all others to complete before proceeding to the next superstep. When scaled horizontally, the increased probability of machine failure means the computation can wait a long time until all workers have finished and are ready for the next superstep. For example, while one worker is being recovered another can fail, triggering yet another recovery during which the overall computation stagnates. Unlike MapReduce, where other branches of the computation, such as reduction, can continue, Pregel has to wait for the barrier to be passed.