AT A GLANCE
Apache’s Giraph project is said to be “a scalable, fault-tolerant implementation of graph-processing algorithms in Apache Hadoop clusters of up to thousands of computing nodes.” Giraph is in use at companies like Facebook and PayPal to help represent and analyze the billions (or even trillions) of connections across massive datasets. Giraph was inspired by Google’s Pregel framework and integrates well with Apache Accumulo, Apache HBase, Apache Hive, and Cloudera’s Impala.
As in the core MapReduce framework, Giraph hides all details related to data and workload distribution behind an API. As of February 2015, the Giraph API ran on top of MapReduce 1 (MR1) through a somewhat unstable mechanism, although a YARN-based implementation was planned.
In Giraph, a worker node (also called a slave node) is a host, either a physical or a virtualized server, that performs the computation and stores data in HDFS. Each worker loads the graph and keeps either the full graph or, in distributed graph analysis, only a part of it in memory. Very large graphs are partitioned and distributed across many worker nodes.
(Note: the term partition has a different meaning in Giraph than in the graph-partitioning literature. The partitioning of the graph is not necessarily the result of applying a graph-partitioning algorithm. Rather, it is a way to group vertices based on a hash of the vertex ID and the number of workers, comparable to the partitioning that is done during the shuffle-and-sort phase in MapReduce.)
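To make the hash-based grouping concrete, here is a minimal Python sketch. It is not Giraph's implementation: `partition_of`, `assign_partitions`, and the `partitions_per_worker` parameter are hypothetical, CRC32 stands in for a Java `hashCode()`, and the round-robin mapping of partitions to workers is an assumption made for illustration.

```python
import zlib


def partition_of(vertex_id, num_partitions):
    """Map a vertex ID to a partition with a stable hash (CRC32 of its string form)."""
    return zlib.crc32(str(vertex_id).encode("utf-8")) % num_partitions


def assign_partitions(vertex_ids, num_workers, partitions_per_worker=1):
    """Group vertices into hash partitions and hand partitions to workers round-robin.

    Returns {worker_index: {partition_index: [vertex_ids, ...]}}.
    """
    num_partitions = num_workers * partitions_per_worker
    layout = {w: {} for w in range(num_workers)}
    for vid in vertex_ids:
        p = partition_of(vid, num_partitions)
        worker = p % num_workers  # assumed round-robin owner of partition p
        layout[worker].setdefault(p, []).append(vid)
    return layout


if __name__ == "__main__":
    layout = assign_partitions(range(10), num_workers=3, partitions_per_worker=2)
    for worker, parts in layout.items():
        print(worker, parts)
```

Because the partition depends only on the vertex ID and the partition count, any worker can work out where a message for a given vertex must go without a central lookup, which is the same property a MapReduce partitioner relies on.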
A Giraph algorithm is an iterative sequence of “super-steps,” each consisting of a message-exchange phase followed by a phase in which the messages are aggregated and vertex or edge properties are updated. While vertices and edges are held in memory, the vertices exchange messages in parallel. Because neighboring vertices often reside on different workers, all worker nodes communicate and send each other many small messages, each usually carrying very little data.
After this message-exchange phase, the incoming messages are aggregated, the vertex and/or edge properties are updated, and the super-step is finished. The next super-step can then be executed, and the vertices communicate again. This approach is known as Bulk Synchronous Parallel (BSP) processing. Giraph's BSP model is vertex-centric and works with a configurable graph model G<I,V,E,M>, where I is the vertex ID type, V the vertex value type, E the edge value type, and M the message type; all of them implement the Writable interface, which is well known from the Hadoop API.
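The super-step cycle can be illustrated with a small single-process simulation. This is a sketch under stated assumptions, not Giraph's Java API: the names `Vertex`, `compute_max`, and `run_bsp` are hypothetical, edge values are omitted, and the vertex program is the classic “propagate the maximum value” example often used to explain Pregel-style systems. Messages sent in one super-step are delivered only after the barrier, and a vertex that votes to halt is reactivated when a message arrives.

```python
from collections import defaultdict


class Vertex:
    """A vertex with an ID, a value, outgoing edge targets, and a halted flag."""

    def __init__(self, vid, value, out_edges):
        self.id = vid
        self.value = value
        self.out_edges = list(out_edges)  # target IDs only; edge values omitted
        self.halted = False


def compute_max(vertex, messages, superstep, send):
    """Vertex program: keep the largest value seen and forward it when it grows."""
    new_value = max([vertex.value, *messages])  # aggregate incoming messages
    if superstep == 0 or new_value > vertex.value:
        vertex.value = new_value                # update the vertex property
        for target in vertex.out_edges:
            send(target, new_value)             # delivered in the next super-step
    vertex.halted = True                        # vote to halt until a message arrives


def run_bsp(vertices, compute, max_supersteps=100):
    """Run super-steps until all vertices have halted and no messages are pending.

    Returns the number of super-steps executed.
    """
    inbox = {}
    for superstep in range(max_supersteps):
        if not inbox and all(v.halted for v in vertices.values()):
            return superstep
        outbox = defaultdict(list)

        def send(target, message):
            outbox[target].append(message)

        for v in vertices.values():
            messages = inbox.get(v.id, [])
            if messages:
                v.halted = False                # an incoming message reactivates the vertex
            if not v.halted:
                compute(v, messages, superstep, send)
        inbox = outbox                          # barrier: swap message buffers
    return max_supersteps


if __name__ == "__main__":
    edges = {1: [2], 2: [1, 3], 3: [2, 4], 4: [3]}
    initial = {1: 3, 2: 6, 3: 2, 4: 1}
    graph = {vid: Vertex(vid, initial[vid], edges[vid]) for vid in initial}
    steps = run_bsp(graph, compute_max)
    print(steps, {vid: v.value for vid, v in graph.items()})
```

On this four-vertex chain with initial values 3, 6, 2, and 1, every vertex ends with the value 6 after four super-steps, and the loop stops because all vertices have halted and no messages are in flight. The graph is built once and stays in memory for the whole run.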
However, the iterative character of many graph algorithms is a poor fit for the MapReduce paradigm, especially if the graph is a very large one, like the full set of interlinked Wikipedia pages. In MapReduce, a data set is streamed into the mappers, and intermediate results are aggregated in the reducers. One MapReduce job can implement one super-step. For the next super-step, the whole graph structure, together with the intermediate state stored by the previous step, has to be loaded from HDFS again and written back at the end. Loading and storing the full graph between every pair of processing steps is a very large overhead. And let’s not forget that the intermediate data also requires local storage on each worker node while it passes through the shuffle-and-sort phase.
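For contrast, the following Python sketch expresses a simple graph computation (every vertex adopts the largest value among itself and the vertices that link to it) as a chain of MapReduce-style jobs, one per super-step. It is a toy under loud assumptions: “HDFS” is just a JSON string, the shuffle is an in-memory dictionary, vertex IDs are strings because JSON object keys must be, and `mapper`, `reducer`, `run_job`, and `run_chain` are hypothetical names. The point is that the mapper must re-emit each vertex's structure in every job so that the reducer can rebuild the graph, and the whole graph is deserialized and serialized again for each super-step.

```python
import json
from collections import defaultdict


def mapper(vid, record):
    """Emit the vertex's structure plus one message per outgoing edge."""
    # The structure must travel through the shuffle in every job,
    # otherwise the reducer could not rebuild the graph.
    yield vid, ("GRAPH", record)
    for target in record["edges"]:
        yield target, ("MSG", record["value"])


def reducer(vid, values):
    """Join the vertex structure with its incoming messages and update the value."""
    record, messages = None, []
    for kind, payload in values:
        if kind == "GRAPH":
            record = payload
        else:
            messages.append(payload)
    # Assumption: every edge target is a vertex of the graph, so record is never None.
    return {"value": max([record["value"], *messages]), "edges": record["edges"]}


def run_job(hdfs_in):
    """One MapReduce job implements one super-step: read, map, shuffle, reduce, write."""
    graph = json.loads(hdfs_in)                # load the full graph from "HDFS"
    shuffled = defaultdict(list)
    for vid, record in graph.items():
        for key, value in mapper(vid, record):
            shuffled[key].append(value)        # shuffle-and-sort: group by vertex ID
    result = {vid: reducer(vid, vals) for vid, vals in sorted(shuffled.items())}
    return json.dumps(result)                  # store the full graph back in "HDFS"


def run_chain(graph, supersteps):
    """Run one job per super-step and count bytes read from and written to "HDFS"."""
    data = json.dumps(graph)
    io_bytes = 0
    for _ in range(supersteps):
        io_bytes += len(data)                  # job input
        data = run_job(data)
        io_bytes += len(data)                  # job output
    return json.loads(data), io_bytes


if __name__ == "__main__":
    graph = {
        "1": {"value": 3, "edges": ["2"]},
        "2": {"value": 6, "edges": ["1", "3"]},
        "3": {"value": 2, "edges": ["2", "4"]},
        "4": {"value": 1, "edges": ["3"]},
    }
    final, io_bytes = run_chain(graph, supersteps=3)
    print({vid: v["value"] for vid, v in final.items()}, io_bytes)
```

The `io_bytes` counter grows linearly with the number of super-steps even though the edge lists never change, which is exactly the repeated load-and-store overhead described above.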
For very large graphs, it is inefficient to repeatedly store and load the more or less fixed structural data. In contrast, the BSP approach loads the graph only once, at the outset. After that, only messages created at runtime are passed between the worker nodes. To support fault-tolerant operation, the intermediate state of the graph can be checkpointed from time to time, but usually not after every super-step.
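A back-of-the-envelope model makes the difference in HDFS traffic explicit. The sketch below assumes, as a deliberate simplification, that each chained MapReduce super-step reads and writes the full graph once, while BSP reads the graph once, writes a checkpoint every `checkpoint_every` super-steps, and writes the final result once. Shuffle traffic, message volume, and compression are ignored, and the function names are hypothetical.

```python
def mapreduce_hdfs_io(graph_bytes, supersteps):
    """Chained jobs: every super-step reads and writes the full graph."""
    return supersteps * 2 * graph_bytes


def bsp_hdfs_io(graph_bytes, supersteps, checkpoint_every):
    """BSP: one initial load, periodic checkpoints, one final write.

    A checkpoint_every value <= 0 means checkpointing is disabled.
    """
    checkpoints = supersteps // checkpoint_every if checkpoint_every > 0 else 0
    return graph_bytes + checkpoints * graph_bytes + graph_bytes


if __name__ == "__main__":
    print(mapreduce_hdfs_io(100, 30))  # 6000
    print(bsp_hdfs_io(100, 30, 10))    # 500
```

For a hypothetical 100 GB graph and 30 super-steps, the MapReduce chain moves 6,000 GB through HDFS under this model, whereas BSP with a checkpoint every 10 super-steps moves 500 GB. The checkpoint interval is the knob that trades recovery time after a failure against this extra I/O.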