07 Mar 2016
Spark SQL: Relational Data Processing in Spark
Spark SQL is successor of Shark, SQL engine built on top of Spark. Spark SQL makes two contributions: (1) DataFrame (2) Catalyst.
DataFrame
- Inspired from DataFrame in R, Python Pandas. But unlike R and Python, and like Spark, DataFrames are lazily evaluated lending the opportunity to optimize relational queries (e.g. computing multiple aggregates in one pass over data).
- Bridges the gap between two different models of computation in Spark: (1) procedural, and (2) declarative (e.g. SQL). Spark SQL allows seamless intermixing of procedural and declarative APIs.
- Collection of structured records (analogous to tables in RDBMS). Can be manipulated through common APIs of RDDs. Similar to RDDs, DataFrames are lazily evaluated i.e., unmaterialized and embodies a logical plan to construct them and it is possible to materialize DataFrames by calling “actions”. During lazy evaluation, all the operations applied on DataFrames build up abstract syntax tree (AST). Before execution, the tree is passed to Catalyst to optimization. Logical plan of DataFrames is evaluated eagerly. In other words, if the user references a column which does not exist, the error is notified to user immediately not in the query optimization phase. This helps developers to immediately notice and debug simple errors.
- DataFrames support all basic SQL data-types. In addition, they support complex data types like structs, arrays. maps, union.
- Beside APIs, DataFrames also support native SQL syntax.
- Spark SQL can also cache DataFrames in memory in columnar format (column-major order) which is more compact than JVM/Python objects. Also columnar format enables efficient filtering and projection of rows.
- DataFrames also support User Defined Function (UDF). UDFs do not need special syntax or tools and can be created by simply lambda function in Scala.
Catalyst
- Catalyst is an extensible query optimizer. User can add new optimization rule, new data source, data-source specific rules, and new data types.
- Catalyst optimizer is a rule-based optimizer. The AST is rewritten based on the rules until it reaches fixed-point - the tree stops changing upon applying the re-write rules. Logical optimizations include constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, and other rules. Physical optimizations include pipelining projections or filters into one map operation, pushing operations from the logical plan into data sources that support predicate or projection pushdown.
- Catalyst generates Java bytecode (also type checked) which in turn transformed into native instructions in run-time by JVM JIT compiler to speed-up the query execution. Without native code generation AST has to be interpreted for each row of data, traversing AST and virtual function calls would make the execution very slow.
- User Defined Types: Catalyst provide options to user for defining user defined data types that constructed from Catalyst built-in types. Built-in types are optimized to store in columnar compressed format, so user defined datatypes have the same efficiency of built-in types by simply composing from them. To define a new type, user has to provide conversions methods from object of that type to a Catalyst row and vice-versa
Discussion
- Impala also optimizes and generates native code from query during run-time. Maybe that is one of the reason there is not much difference in performance between Spark SQL and Impala.
References
07 Mar 2016
Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
The Case Against MapReduce
- Many applications (almost all machine learning algorithms) are iterative. Analysis is essentially a sequence of “iteration” where the result/output of previous iteration is used as input for next iteration.
- The MR way of computing iterative algorithms is series of MR job. After each MR job, intermediate results are saved in disk and then loaded again for the next iteration MR job. Although from fault tolerance perspective, this is beneficial because all intermediate results are stored in disk; so if a machine fails, only the work in done in current iteration is lost. But the cost of storing all intermediate data in disk outweighs the benefit of fault tolerance which occurs infrequently in practice.
Resilient Distributed Datasets (RDDs)
- RDDs are distributed parallel data structures that can be partitioned among different machines, provide fault tolerance. RDDs provide rich set of high level APIs to programmer; Spark authors argue that these APIs often enough to express computation of large number of application and more expressive than existing frameworks like MR, HaLoop.
- RDD can be created two ways: (1) from a data source (2) from other RDDs by transformations.
- Computation on RDD is divided into two types: (1) transformations (2) actions. Transformations (e.g. map, join, filter) on RDDs are lazily evaluated; Spark keeps the lineage (log) of a RDD that how it is derived from other RDDs. Lineage also provides fault tolerance for RDDs, because if a RDD is lost in case of failure, the information for recomputing the RDD is already available. Actions (e.g. collect, take, count) are evaluated eagerly. In other words, Spark only starts computation when it encounters “actions”; otherwise Spark keeps lineage of RDDs as in lazy evaluation manner.
- Lazy evaluation in Spark is essential for setting up the data pipeline, setting up directed acyclic graph (DAG) of a job where nodes represent computation to be performed and edges represent data-flow between tasks. DAG model of data pipeline was first introduced in Dryad [1].
- In case of failures, only the lost partitions of RDDs will be recomputed from the lineage information of RDDs. Unlike other frameworks, where the whole application has to be rolled back to the last checkpoint, Spark only recomputes the lost paritions.
- For iterative applications, online analytics, intermediate results i.e., RDDs can be ‘persisted’ in memory, disk. Users can also prioritize specify which RDDs should be spilled to disk first.
Discussion
- RDD concept is interesting because now fault tolerance is not a property of a job, but it is a property of RDD. Computations are expressed in functions of RDD, so for computations (transformation of RDD) that are very expensive, Spark provides an API to checkpoint those RDDs instead of checkpointing the whole state of a job.
- The design choice of Spark is interesting. One of the assumption behind Spark was opposite of mantra popular in that time. For fault-tolerance, most distributed frameworks checkpoints i.e., stores intermediate results into the disk, even in case of failures of task, computation is not lost. In other words, computation i.e., CPU time is expensive relative to the time spent in checkpointing i.e., disk IO. Spark went against this mantra assuming disk IO is more expensive than CPU time. Therefore, it is better to just recompute the whole computation in case of failure instead of checkpointing periodically. Although a recent paper [2] and Project Tungsten of Spark showed that CPU is the bottleneck for Spark not disk or network. But this does not invalidate Spark assumption. The CPU bottleneck is due to serialization, deserialization of JVM objects, virtual method calling which would not make a difference even if we switch to classic checkpointing approach.
References
[1] Dryad
[2] Making Sense of Performance in Data Analytics Frameworks
07 Mar 2016
One Trillion Edges: Graph Processing at Facebook-Scale
Background
- Apache Giraph has started as open source implementation of Google Pregel [2]. I have covered the Pregel paper here. If you are new to distributed graph processing systems, please first cover the summary of Pregel.
- In this review, I will discuss Facebook experience and modification computation abstraction of Pregel model. Implementation details like parallelization support in nodes to enable more local parallel computation, memory optimization is left out.
Sharded Aggregators
- Aggregators in Giraph is implemented using Zookeeper znode (a type of data storage in Zookeeper). But Zookeeper has inherent limitation of maximum one megabytes znode. To circumvent this, workers can directly send values to master, but the master becomes the bottleneck. Therefore, “sharded aggregator” is introduced where workers are assigned randomly in shards. A worker becomes the leader of shards and compute the aggregator of its assigned shards. Now aggregation is balanced between master and the shared leaders.
Master and Worker Computation
- “The methods preSuperstep(), postSuperstep(), preApplication(), and postApplication() were added to the Computation class and have access to the worker state.” - It seems very application (k-means clustering) specific. I am skeptical how useful these methods are.
- In Pregel, the Master abstains from actual graph processing, instead focuses on global coordination of workers.
- Facebook modified Giraph to introduce computation in master node. Master node computes central, and global computation and the result of this is available via aggregators to all workers in the following superstep.
- This model is very useful for lightweight task in global scope i.e., applies to all worker nodes. One simple example each iteration-bounded PageRank where the workers check the number of supersteps performed and “vote to halt” if desired number of computation is completed. This check can be easily performed in master after each superstep removing the complexities to check in every worker.
Composable Computation
- Many graph application perform different type of computation each implementing different program logic in different stages. One “Computation” per vertex model in Pregel leads to code-bloat (a switch statement with different type of computation depending on the stage) in vertex computation. Giraph separates the computation itself from the vertex, computation is defined in separate class and the vertex can choose which type computation to perform at current superstep.
- Pregel model allows only message and message combiner type. Giraph splits this into two different message type: incoming message type and outgoing message type for each computation class. Giraph ensures types of computation, messages and combiners match at run-time.
Superstep Splitting
- In some graph applications, vertices can send messages in pattern that exceeds the memory of a worker. Messages that are aggregatable i.e., commutative and associative, combiners solve this problem by combining multiple messages into one message. But some messages are not
- In case of such message patterns, a “logical” superstep is split into multiple supersteps. A vertex sends only a fraction of all messages, and the receiving vertices can partially compute its state based on the received messages. Similar to aggregators, vertex update computation must be commutative and associative.
Discussion
- Varying degree of parallelism in graph processing systems is a huge issue. Simple graph computation like PageRank converges slowly because only a small sub-graph still has not converged yet. Stanford GPS [4] tries to address the issue of slow convergence in several graph algorithms.
- IBM has introduced “Think Like a Graph” [5] abstraction for graph processing. I am curious the centralized computation in master compares to this paradigm.
[1] One Trillion Edges: Graph Processing at Facebook-Scale
[2] Pregel
[3] Apache Giraph
[4] GPS
[5] From “Think Like a Vertex” to “Think Like a Graph”
07 Mar 2016
Goal
- Sharing a cluster between multiple cluster computing frameworks like YARN, MPI.
- To share a cluster among multiple frameworks, statically partition the cluster into sub-clusters and assign each framework a sub-cluster for computation. It is obvious how static partitioning leads to cluster under-utilization or overloaded. Other way is to introduce another layer of VM (e.g. AWS) and dynamically partition them. It increases complexity to manage another layer of VMs and inefficient.
- Frameworks like YARN, Dryad employ fine grained resource sharing model. But there is no way to share fine grained resource across the frameworks.
Overview
- Mesos is a thin resource sharing layer that enables fine-grained sharing across diverse cluster computing frameworks. It provides client frameworks a common interface for accessing cluster resources.
- Like YARN, Mesos is a resource negotiator for frameworks. Each framework is responsible for scheduling its own allocated resource. Advantage is each framework can schedule to its domain specific workloads instead of one-size-fits-all scheduler for all workloads. This is known as “two-level scheduling”.
- Mesos offers resources to the frameworks in “resource offer”. A resource offer is a bundle of resources that a framework can allocate on a cluster node to run tasks. Mesos decides how many resources to offer each framework, based on an organizational policy such as fair sharing, while frameworks decide which resources to accept and which tasks to run on them.
- Many frameworks are optimized for data locality - tasks are executed in the node where data is located. A framework can reject resource offers if the offer does not satisfy the constraint of data-locality and so on. Delay scheduling can be utilized where a framework waits for limited time to acquire resources on nodes storing the data. This is effective because most of the tasks in Mesos are short-lived.
Allocation Modules
- Resource allocation decisions are provided by pluggable allocation modules (e.g. fair sharing, priority sharing) in Mesos. An organization can roll out its own resource allocation policy without need to change Mesos itself.
- Allocation can be revoked if tasks do not complete in short period of time since Mesos strictly focuses short-lived tasks. Mesos regains resources from long running tasks by killing them. Before killing a task, Mesos gives the framework a grace period to save its state or clean-up.
- Allocation modules decide whether resources should be revoked from frameworks since resource revocation heavily depends on organizational policy.
- Guaranteed allocation: Killing tasks has negligible impact on framework like Hadoop which is designed keeping fault tolerance in mind. But for frameworks like MPI, killing tasks can be fatal since tasks are highly synchronized and inter-dependent. For these frameworks, Mesos introduces “guaranteed allocation” - allocation offer that a framework can use without losing tasks. Tasks are never killed if the job utilizes resources below guaranteed allocation. One or more tasks can be killed if resource usage goes above guaranteed allocation.
Isolation
- Isolates framework resources using existing OS containers e.g. Linux containers, Solaris jails.
Fault Tolerance
- Soft State: Master can recompute its internal state from messages from slaves and framework schedulers.
- Hot-standby: Masters is shadowed by several hot standbys. Upon current master’s failure, Zookeeper selects a new master from the standbys. New master constructs internal state from the messages.
- Mesos also reports task, executor failures to the frameworks’ scheduler.
- What if the Framework Master/Scheduler fails: Mesos allows each framework to register multiple backup scheduler. When the current scheduler fails, Mesos notifies another scheduler to take over. Frameworks are responsible for sharing states between their schedulers.
Discussion
- Mesos is designed for frameworks for short-lived jobs e.g. batch analytics. But cluster is still statically partitioned between log-running jobs (e.g. web-servers, database servers) and batch analytics. It is possible to go one step above and also handle services and batch analytics in same resource negotiator.
- Mesos is pre-emptive that means it prevents conflict over resources beforehand. But study indicate that users of frameworks hence the frameworks themselves widely over-estimates resource needed for finishing a job i.e., users claim more resources than actually needed for finishing a job. This leads to servers under-utilization. Google Omega Scheduler [3] solves this problem by optimistic resource allocation i.e., more resources are promised to frameworks than actually available. Conflicts are resolved only after conflicts over resource actually occurs.
References
[1] Mesos
[2] Hadoop
[3] Dryad
[4] Omega
07 Mar 2016
Pregel: A System for Large-Scale Graph Processing
- Developed at Google for distributed graph processing
- Bulk Synchronous Parallel (BSP) parallel computing model
- Distributed message passing system
Why Graph Computation Is Different
- Poor locality of memory access
- Very little work has to be done per vertex
- Degree of parallelism changes substantially over the course of execution
Why not MapReduce
- Even though much data might be unchanged from iteration to iteration, the data must be reloaded and reprocessed at each iteration, wasting I/O, network bandwidth, and processor resources.
- The termination condition might involve the detection of when a fixed point is reached. The condition itself might require an extra MapReduce job on each iteration, again increasing resource use in terms of scheduling extra tasks, reading extra data from disk, and moving data across the network.
Bulk Synchronous Parallel (BSP) Model
- Computations are consist of a sequence of iterations, called superstep.
- Supersteps end with barrier synchronization
- During a superstep S, framework calls user-defined “Computation” function on every “logical worker” (in Pregel, a vertex) that works on local (worker’s own piece of) data in parallel.
- All communication is from superstep S to superstep S+1.
- Ensure that programs are inherently free of classic concurrency bugs e.g. deadlocks, data-races. Performance of BSP model is predictable.
Pregel Model of Computation
- “Think like a Vertex” - vertex centric computation. No edge centric computations
- Algorithm terminates when all vertices vote to halt and there are no messages to deliver.
- Initially every vertex is active.
- All active vertices participate in the computation in a superstep. “Computation” function specifies behaviour at a single vertex V and a single superstep S. During “Computation”, a vertex can mutate state of its own and outgoing edges.
- At the end of each superstep a vertex can send message to other vertices in the graph. Messages are typically sent along outgoing edges. The messages are delivered at the beginning of next superstep. Message may be sent to any vertex whose identifier is known.
- A vertex deactivates itself by vote to halt. Deactivated vertices are not allowed to participate in computation. Vertices are reactivated upon receiving message.
- Graph state is kept in-memory, occasional saving checkpointing to disk for fault tolerance
Message Passing
- Vertices communicate directly with one another by message.
- A vertex can send any number of messages.
- Each message consists of a message value, and the destination vertex.
- There is no guaranteed order of messages.
- Messages are guaranteed be delivered and not duplicated.
Master
- Partitions the input graph and assigns one or more partitions to each worker.
- Responsible for coordinating the workers.
- Keeps list of all workers known to be alive, worker’s unique identifiers, addressing informations and which partition of the graph is assigned to the worker.
- Size of data structure is proportional to the number of partitions not the number of vertices, number of edges.
- Maintains statistics of the progress of computation and the state of the graph.
- Runs HTTP server for user monitoring.
Worker
- Maintains the current state of assigned partition(s) of the graph, responsible for computation of the assigned vertices, and delivers messages.
Message Combiner
- Sending message incurs overhead which can be reduced in some cases.
- System calls “Combine()” for several messages intended for a vertex V into a single message containing the combined message.
- User defined, application specific.
- Not enabled by default.
- No guarantee which messages will be combined, or in what order. Therefore, combiners should be enabled for commutative and associative operations.
Aggregator
- Provides mechanism for global communication, monitoring, and data.
- Each vertex can provide a value to aggregator in each superstep S, the system combines these values using a reduction operator, and resulting value is made available to all vertices at next superstep S+1.
- Only reduces input values from a single superstep.
- Possible to define a sticky aggregator that uses input values from all supersteps.
- Aggregator operation should be commutative and associative.
Fault Tolerance
- Achieved through checkpointing.
- Upon failures, instead of re-computing all vertices from last checkpoint, only the lost partition is re-computed.
Graph Mutation
- Input graph can be mutated in run-time.
- Mutations become effective in the superstep after the requests are issued.
- Within superstep, removals are performed first. All mutations are before “Computation()”. First edge removal, vertex removal. Additions are after removal, first vertex addition, edge addition.
- Local mutations (mutating own edges) becomes immediately effective since there is no reason of conflicts.
Experiment
- Naive Single-Source-Shortest-Path was used
- The time for initializing the cluster, generating the test graphs in-memory, and verifying results is not included in the measurements
- Checkpointing was disabled
- Default hash partitioning was used
Critique
- No fault tolerance for master is described in the paper
- How varying degree of parallelism i.e., load-balancing among workers over the course of execution is handled not mentioned
- How the message delivery is guaranteed is not mentioned
- Given the power-law distribution of real world networks, static hash partitioning is sub-optimal. This is is addressed in PowerGraph paper