
Apache Spark

AT A GLANCE

Along with phenomena such as the Docker container technology, Apache Spark emerged in 2014 as a new darling of the open source world, with widespread take-up by data teams and developers, backed by a highly active community and the startup Databricks.

Started in 2009 as a UC Berkeley research project to create a cluster computing framework for workloads poorly served by Hadoop, Spark went open source in 2010, and its September 2014 release counted more than 170 contributors.


The original goal of the Spark project was to provide a cluster computing framework that could handle interactive queries and iterative computation, such as machine-learning algorithms — two areas not addressed well by Hadoop at that point. Combining a high-speed, in-memory analytics engine with an elegant API for building data processing applications, Spark shines at iterative workloads, such as machine learning, that repeatedly access the same data sets.

Spark doesn’t replace Hadoop. Rather, it offers an alternative processing engine for workloads that are highly iterative. By avoiding costly writes to disk, Spark jobs often run orders of magnitude faster than Hadoop MapReduce. By “living” inside the Hadoop cluster, Spark uses Hadoop’s native structures (HDFS, HBase, etc.) for the end points of the data pipeline, reading raw data and storing final results.

Spark is designed to provide a more flexible model that supports many of the multi-pass applications that falter in MapReduce. It accomplishes this by taking advantage of memory whenever possible to reduce the amount of data written to and read from disk. Spark is not a tool for making MapReduce easier to use, like Pig or Hive; it is a complete replacement for MapReduce that includes its own work execution engine.

Spark is based on three core concepts, illustrated in the sketch that follows this list:

  • Resilient Distributed Dataset (RDD) – structures that contain the data you want to transform or analyze. They can either be read from an external source, such as a file or a database, or be created by a transformation.
  • Transformation – a transformation modifies an existing RDD to create a new RDD. For example, a filter that pulls ERROR messages out of a log file would be a transformation.
  • Action – an action analyzes an RDD and returns a single result. For example, an action would count the number of results identified by our ERROR filter.
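A minimal sketch in Scala tying the three concepts together (the application name and HDFS path are hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ErrorCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ErrorCount")
    val sc   = new SparkContext(conf)

    // RDD: a distributed collection read from an external source
    val lines = sc.textFile("hdfs:///logs/app.log")

    // Transformation: derive a new RDD holding only the ERROR messages
    val errors = lines.filter(_.contains("ERROR"))

    // Action: collapse the RDD into a single result returned to the driver
    println(s"ERROR lines: ${errors.count()}")

    sc.stop()
  }
}
```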

Additionally, Spark is built to run on top of Apache’s Mesos cluster manager. This allows Spark to operate on a cluster side by side with Hadoop, Message Passing Interface (MPI), Hypertable, and other applications, so an organization can develop hybrid workflows that benefit from both dataflow models without the cost, management, and interoperability concerns of running two independent clusters.

There is a decent quick-start at the Databricks site if you’re so inclined.

PROS

  • Elegant, consistent API for building data processing applications
  • Allows interactive querying and analysis of large data sets on Hadoop clusters
  • Runs iterative workloads orders of magnitude faster than Hadoop
  • Can be deployed in a Hadoop cluster in a stand-alone configuration, in YARN, in Hadoop MapReduce, or on Mesos
  • RDDs (resilient distributed data sets) can be shared with other Spark projects, allowing you to mix SQL, machine learning, streams, and graphs in the same program
  • Web UI provides all sorts of useful information about the Spark cluster and running tasks

CONS

  • No security
  • Poor documentation
  • No cluster resource management
  • Steep learning curve


HISTORY

Apache Spark got its start in 2009 at UC Berkeley’s AMPLab as a way to perform in-memory analytics on large data sets. At that time, Hadoop MapReduce was focused on large-scale data pipelines that were not iterative in nature. Building analytic models on MapReduce in 2009 was a very slow process, so AMPLab designed Spark to help developers perform interactive analysis of large data sets and to run iterative workloads, such as machine-learning algorithms, that repeatedly process the same data sets in RAM.

Yahoo has two large Spark projects, one for personalizing news pages for Web visitors and another for running analytics for advertising. For news personalization, the company uses Machine Learning (ML) algorithms running on Spark to figure out what individual users are interested in, and also to categorize news stories as they arise to figure out what types of users would be interested in reading them.

“When you do personalization, you need to react fast to what the user is doing and the events happening in the outside world. If you look at Yahoo’s home page, which news items are you going to show? You need to learn something about each news item as it comes in to see what users may like it, and you need to learn something about users as they click around to figure out that they’re interested in a topic,” says Databricks CTO Matei Zaharia.

To do this, Yahoo (a major contributor to Apache Spark) wrote a Spark ML algorithm in 120 lines of Scala. (Previously, its ML algorithm for news personalization was written in 15,000 lines of C++.) With just 30 minutes of training on a large data set of a hundred million records, the Scala ML algorithm was ready for business.

Yahoo’s second use case shows off the interactive capability of Hive on Spark (Shark). The Web giant wanted to use existing BI tools to view and query its advertising analytics data collected in Hadoop. “The advantage of this is Shark uses the standard Hive server API, so any tool that plugs into Hive, like Tableau, automatically works with Shark,” Zaharia says. “And as a result they were able to achieve this and can actually query their ad visit data interactively.”

RDDs: Resilient Distributed Datasets

The consistent use of RDDs throughout the stack is one of the features that make Spark so powerful. Both conceptually and in implementation, RDDs are quite simple; most of the methods in the RDD class are less than 20 lines long. At the core, RDDs are a collection of distributed records, backed by some form of persistent storage, along with a lineage of transformations.

Existing distributed memory abstractions such as key-value stores and databases allow fine-grained updates to mutable state. This forces the cluster to replicate data or log updates across machines to maintain fault tolerance, and both of these approaches incur substantial overhead for data-intensive workloads. RDDs instead have a restricted interface that only allows coarse-grained updates that apply the same operation to many data items (such as map, filter, or join). In Spark, driver programs are written as a series of transformations of RDDs, followed by actions on them. Transformations, as the name suggests, create new RDDs from existing ones by changing them in some way, such as by filtering the data according to some criteria. Actions work on the RDDs themselves. An action might be counting the number of instances of a data type or saving RDDs to a file.
[Figure: Spark RDD fault tolerance, showing a code snippet and the lineage it records.]
This allows Spark to provide fault tolerance through logs that simply record the transformations used to build a dataset instead of all the data. This high-level log is called a lineage, and the figure above shows a code snippet of the lineage being used. Since parallel applications, by their very nature, typically apply the same operations to a large portion of a dataset, the coarse-grained restriction is not as limiting as it might seem. In fact, the Spark paper showed that RDDs can efficiently express programming models from numerous separate frameworks, including MapReduce, DryadLINQ, SQL, Pregel, and HaLoop.

Spark also provides additional fault tolerance by letting the user request that any transformed RDD be persisted to disk. Data locality is maintained by allowing users to control data partitioning based on a key in each record. (One use of this partitioning scheme is to ensure that two datasets that will be joined are hashed in the same way.) Spark maintains scalability beyond memory limitations for scan-based operations by storing partitions that grow too large on disk.
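A small sketch of those controls, assuming an existing SparkContext `sc` and two hypothetical key-value data sets: the explicit partitioner hashes both sides the same way before the join, and the storage level spills oversized partitions to disk:

```scala
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._          // pair-RDD operations (Spark 1.x)
import org.apache.spark.storage.StorageLevel

val users = sc.textFile("hdfs:///data/users.csv")     // hypothetical input
  .map(line => (line.split(",")(0), line))            // key by the first field
  .partitionBy(new HashPartitioner(64))               // user-controlled partitioning
  .persist(StorageLevel.MEMORY_AND_DISK)              // spill to disk if RAM runs out

val events = sc.textFile("hdfs:///data/events.csv")
  .map(line => (line.split(",")(0), line))
  .partitionBy(new HashPartitioner(64))               // same hashing as `users`

// Co-partitioned inputs let the join proceed without a full shuffle.
val joined = users.join(events)
```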

Although you can’t modify an RDD, you can easily create a new one with different values. Immutability is an important property of distributed data sets; it means that we don’t have to worry about another thread or process changing the value of an RDD when we’re not looking — a traditional problem in multithreaded programming. It also means we can distribute RDDs throughout a cluster for execution without having to worry about synchronizing changes to the RDD across all nodes.

RDD immutability also plays a role in fault tolerance of Spark applications. Because each RDD keeps a history of computations to arrive at its current value and no other process could have made a change, recomputing an RDD that went down with a lost node is simply a matter of going back to the original persistent data partition and rerunning the computations on a different node. (Most partitions in Hadoop are persisted across multiple nodes.)

RDDs can be constructed from multiple types of data partitions. Most of the time data comes from HDFS, where the term “partition” is literal. However, RDDs can also be constructed from other persistent stores such as HBase, Cassandra, SQL databases (via JDBC), Hive ORC (Optimized Row Columnar) files, and other storage systems that expose a Hadoop InputFormat API. Regardless of the source of the RDD, the behavior is the same.

Another nice benefit of Spark is the ease with which RDDs can be shared with other Spark projects. Because RDDs are used throughout the Spark stack you can freely mix SQL, machine learning, streams, and graphs in the same program.

[Figure: Spark transformations are lazy; no results are computed until an action requires them to be returned to the user.]

The final note on Spark transformations: They’re lazy, meaning that no computation is performed until an action requires a result to be returned to the driver. This is mostly relevant when working at the interactive Scala shell. As RDDs are incrementally transformed, there’s no cost — until an action is performed. It is then that all values are computed and the results returned to the user. Further, because RDDs can be cached in memory, frequently used results don’t need to be recomputed each time.
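For example, in the Scala shell (where a SparkContext `sc` is already available), nothing touches the log file below until the first `count()`; the second action then reuses the cached RDD (the path is hypothetical):

```scala
// Transformations only record lineage; no data is read yet.
val errors = sc.textFile("hdfs:///logs/app.log")
  .filter(_.contains("ERROR"))
  .cache()                                   // keep the computed partitions in memory

// The first action triggers the whole computation...
val total = errors.count()

// ...while later actions reuse the cached result instead of rereading the file.
val timeouts = errors.filter(_.contains("timeout")).count()
```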

Executing Spark applications

In order to submit a Spark job to the cluster, the developer needs to execute the driver program and connect it to a cluster manager (also known as a cluster master). The cluster manager presents a consistent interface to the driver, so the same application can be run on any supported cluster type.

Spark currently supports dedicated Spark (stand-alone), Mesos, and YARN clusters. Each driver program running in the cluster allocates resources and schedules tasks independently of every other. While providing application isolation, this architecture makes it difficult for the cluster to efficiently manage RAM, Spark’s most precious resource. Multiple high-memory jobs submitted simultaneously can be starved of resources. Although the stand-alone cluster manager implements a simple resource scheduler, it offers only FIFO scheduling across applications, and it is not resource aware.
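In code, the choice of cluster manager comes down to the master URL the driver hands to its SparkConf; the host names and settings below are hypothetical examples, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The same driver code can target any supported cluster manager;
// only the master URL changes.
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("spark://master-host:7077")   // stand-alone; "mesos://host:5050" or "yarn-client" also work
  .set("spark.executor.memory", "4g")      // per-executor RAM, Spark's most contended resource
  .set("spark.cores.max", "16")            // cap total cores for this application (stand-alone/Mesos)

val sc = new SparkContext(conf)
```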

Generally speaking, Spark developers must be much closer to the metal than data analysts using higher-level applications like Hive or Pig. For example, because the driver is scheduling tasks on the executors, it needs to run close to these worker nodes to avoid network latency.

Both driver and cluster manager HA are important. If the driver dies, the job will stop. If the cluster master dies, no new jobs can be submitted, but existing stages will continue to execute. As of Spark 1.1, master HA is only available with stand-alone Spark clusters via ZooKeeper. There is no driver HA.

Squeezing maximum performance from a Spark cluster can be somewhat of a black art, involving experimentation with various combinations of options for driver, executors, memory, and cores and measuring the results to optimize CPU and RAM usage for a specific cluster manager. Little documentation exists for these kinds of operational tasks, and you’ll likely have to resort to scouring mailing lists and reading source code.

[Figure: The Spark application architecture. Spark can currently be deployed in a Spark stand-alone, YARN, or Mesos cluster; each driver program running in the cluster allocates and schedules tasks independently of every other.]

Monitoring and operations

Each driver has a Web UI, typically on port 4040, that displays all sorts of useful information about running tasks, schedulers, executors, stages, memory and storage usage, RDDs, and so on. This UI is primarily an information tool and not one for managing the Spark application or cluster. Nevertheless, it’s the go-to tool for debugging and performance tuning — nearly everything you need to understand about what’s happening in your application is here.

Although a great start, the Web UI can be rough around the edges. For example, viewing historical jobs requires navigating to a separate history server, except when running a stand-alone-mode cluster manager. But the biggest shortcoming is the lack of any operational information or control. Starting and stopping nodes, viewing their health, and other cluster-level statistics aren’t available. Running a Spark cluster is still a command-line operation.

[Figure: Spark’s Web UI provides a wealth of information about running tasks, but management of the cluster is done entirely from the command line.]

COMPARISONS

Spark vs. Tez

Beyond the fact that Spark and Apache’s Tez project both execute directed acyclic graphs (DAGs), the two frameworks are like apples and oranges, differing in both their audience and their design. Even so, I’ve seen a lot of confusion in IT departments around the differences between these two frameworks.

Tez is an application framework designed for application developers who need to write efficient, multistage MapReduce jobs. For example, in Hive 0.13, HQL (Hive Query Language) is parsed by the language compiler and rendered as a Tez DAG, which maps the flow of data to processing nodes for efficient execution. A Tez DAG is built up by the application, edge by edge, vertex by vertex. End users never need to know how to build a Tez DAG or even be aware of its existence.

The real difference between Spark and Tez lies in the implementations. In Spark applications, the same worker nodes are reused across iterations, eliminating the startup costs of a JVM. Spark worker nodes also cache variables, eliminating the need to reread and recompute values across iterations. This is what makes Spark well suited for iterative programming. The downside is that Spark applications consume cluster resources even when the cache is stale. It’s difficult to optimize resource consumption with Spark running on the cluster.

Tez, while supporting multistage job execution, does not have any real form of caching. Variables are cached to the extent that the planner will schedule jobs requiring values from previous stages on the same nodes if possible, but there isn’t a well-planned cross-iteration or broadcast variable mechanism in Tez. In addition, Tez jobs incur JVM startup overhead. Thus, Tez is best suited to processing very large data sets where startup time is a small part of the overall job processing time.

As is often the case, there’s a good deal of cross-pollination of ideas in the Hadoop community, and many of the best are making their way into other projects. For example, YARN-1197 will allow Spark executors to be dynamically resized, so they can return resources to the cluster when they’re no longer needed. Similarly, Stinger.next will bring the benefits of cross-query caching to traditional Hadoop applications like Hive.

An integrated analytics ecosystem

The underlying RDD abstraction of Spark forms the core data structure for the entire Spark ecosystem. With modules for machine learning (MLlib), data querying (Spark SQL), graph analysis (GraphX), and streaming (Spark Streaming), a developer can seamlessly use libraries from each in a single application.

For example, a developer can create an RDD from a file in HDFS, transform that RDD into a SchemaRDD, query it with Spark SQL, then feed the results into an MLlib library. Finally, the resulting RDD can be plugged into Spark Streaming to apply predictive modeling to a message feed. Doing all of that outside of Spark would require using multiple libraries, marshaling and transforming data structures, and putting a whole lot of work into deploying it. Mucking about with three or four separate applications not designed to work together isn’t for the faint of heart.
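A rough sketch of the first part of that pipeline, using Spark 1.x APIs from the Scala shell; the file path, case class, and query are hypothetical, and the streaming step is omitted:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val sqlContext = new SQLContext(sc)            // assumes an existing SparkContext `sc`
import sqlContext.createSchemaRDD              // implicit RDD -> SchemaRDD conversion (Spark 1.x)

case class Visit(userId: Int, adId: Int, dwellSeconds: Double)

// Plain RDD from HDFS, turned into a SchemaRDD via the case class.
val visits = sc.textFile("hdfs:///data/ad_visits.csv")
  .map(_.split(","))
  .map(f => Visit(f(0).toInt, f(1).toInt, f(2).toDouble))

visits.registerTempTable("visits")

// Query it with Spark SQL, then feed the result straight into MLlib.
val features = sqlContext
  .sql("SELECT adId, dwellSeconds FROM visits WHERE dwellSeconds > 1.0")
  .map(row => Vectors.dense(row.getInt(0).toDouble, row.getDouble(1)))

val model = KMeans.train(features, 5, 20)      // 5 clusters, 20 iterations
```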

The integrated stack makes Spark uniquely valuable for interactive data exploration and repeated application of functions to the same data set. Machine learning is the sweet spot for Spark, and the ability to share RDDs across the ecosystem transparently greatly simplifies writing and deploying modern data analytics applications.

However, these advantages don’t come without a price. Barely into 1.x, the system has a number of rough edges. The lack of security (Spark doesn’t run in Kerberized clusters and has no job-level access control), the lack of enterprise-grade operational features, poor documentation, and the requirement for rare skills mean that today Spark is best suited for early adopters or organizations that have a specific requirement for large-scale machine learning models and are willing to invest what it takes to build them.

The decision to deploy Spark comes down to “horses for courses.” For some, the benefits of adopting a high-speed, in-memory analytics engine today will be so compelling that the return on investment can be justified. For others, more mature tools, perhaps slightly slower, with enterprise-ready capabilities and people with the skills to care for and feed them will be a better organizational fit.

In either case, watch this space. Spark has introduced a number of innovative ideas into the big data processing market, with great momentum behind it. It will surely become a major player as it matures.

USE CASES

from Steven Nunez, InfoWorld

Writing Spark applications

Spark, written in Scala, provides a unified abstraction layer for data processing, making it a great environment for developing data applications. Spark comes with a choice of Scala, Java, and Python language bindings that are, for the most part, equivalent except at the bleeding edge, where only Scala implementations are available.

One of the nice features in Spark is the ability to work interactively from the Scala or Python console. This means you can try out code and immediately see the results of execution. This is handy both for debugging, where you can change a value and proceed again without going through a compile step, and for data exploration, where a typical process consists of tight loops of inspect-visualize-update.
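A typical interactive loop in spark-shell might look like the following, with a hypothetical CSV of sensor readings; each step can be inspected and rerun immediately:

```scala
// spark-shell starts with a ready-made SparkContext bound to `sc`.
val readings = sc.textFile("hdfs:///data/sensors.csv")
  .map(_.split(","))
  .map(f => (f(0), f(2).toDouble))             // (sensorId, temperature)

// Peek at a few records, adjust the parsing, and rerun without a compile step.
readings.take(5).foreach(println)

// Count how often each sensor reported a reading above 80 degrees.
val hotSensors = readings.filter(_._2 > 80.0).countByKey()
```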

Developers coming from other functional programming languages, such as LISP, Haskell, or F#, will have little difficulty adapting to Spark programming beyond learning the API. Thanks to the excellent collections system of Scala, applications written with Spark’s Scala API can be remarkably clean and concise. Adjusting to Spark programming mostly involves keeping in mind the distributed nature of the system and knowing when objects and functions need to be serializable.

Programmers coming from a background in procedural languages, like Java, may not find the functional paradigm so easy to pick up. Finding good Scala and functional programmers is one of the biggest challenges for organizations thinking about adopting Spark (or Hadoop for that matter).
