Chapter 1. A new paradigm for Big Data

[p1-2]

Traditional systems, and the data management techniques associated with them, have failed to scale to Big Data.

To tackle the challenges of Big Data, a lot of new technologies have emerged, many of which have been grouped under the term NoSQL. In some ways, these new technologies are more complex than traditional databases, and in other ways they're simpler. These systems can scale to vastly larger sets of data, but using these technologies effectively requires a fundamentally new set of techniques. They aren't one-size-fits-all solutions.

Many of these Big Data systems were pioneered by Google, including distributed filesystems, the MapReduce computation framework, and distributed locking services.

Another notable pioneer in the space was Amazon, which created an innovative distributed key/value store called Dynamo. The open source community responded in the years following with Hadoop, HBase, MongoDB, Cassandra, RabbitMQ, and countless other projects.

This book is about complexity as much as it is about scalability. Some of the most basic ways people manage data in traditional systems like relational database management systems (RDBMSs) are too complex for Big Data systems. The simpler, alternative approach is the new paradigm for Big Data, dubbed the Lambda Architecture.

How this book is structured

This book is a theory book, focusing on how to approach building a solution to any Big Data problem. It is structured into theory and illustration chapters.

Scaling with a traditional database

The example in this section is a simple web analytics application, which tracks the number of pageviews for any URL a customer wishes to track. The customer's web page pings the application's web server with its URL every time a pageview is received. Additionally, the application should be able to tell you at any point what the top 100 URLs are by number of pageviews.

You start with a traditional relational schema for the pageviews, similar to the table below:

Column name   Type
id            integer
user_id       integer
url           varchar(255)
pageviews     bigint

Your back end consists of an RDBMS with a table of that schema and a web server. Whenever someone loads a web page being tracked by your application, the web page pings your web server with the pageview, and your web server increments the corresponding row in the database.
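As a concrete sketch of that write path (the table and column names follow the schema above; the JDBC connection details and the assumption that a row already exists for each tracked URL are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class PageviewTracker {
    private final Connection conn;

    public PageviewTracker(String jdbcUrl, String user, String password) throws SQLException {
        this.conn = DriverManager.getConnection(jdbcUrl, user, password);
    }

    // Called by the web server for every pageview ping it receives.
    public void recordPageview(int userId, String url) throws SQLException {
        String sql = "UPDATE pageviews SET pageviews = pageviews + 1 " +
                     "WHERE user_id = ? AND url = ?";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setInt(1, userId);
            stmt.setString(2, url);
            stmt.executeUpdate();   // one write per pageview, straight to the database
        }
    }
}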

The following subsections discuss what problems emerge as you evolve the application: you'll run into problems with both scalability and complexity.

Scaling with a queue

As traffic to your application grows, you start getting a lot of "Timeout error on inserting to the database" errors: the database can't keep up with the load, so write requests to increment pageviews are timing out.

Instead of having the web server hit the database directly, you insert a queue between the web server and the database. Whenever you receive a new pageview, that event is added to the queue. You then create a worker process that reads 100 events at a time off the queue, and batches them into a single database update. This is illustrated in the figure below:

Figure 1.2 Batching updates with queue and worker

This scheme resolves the timeout issues you were getting. If the database ever gets overloaded again, the queue will just get bigger instead of timing out to the web server and potentially losing data.
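A minimal sketch of the queue-and-worker scheme from figure 1.2. It uses an in-memory BlockingQueue and a hypothetical batchIncrement call standing in for a single bulk database update; a production queue would be a separate, durable system:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PageviewWorker implements Runnable {
    // Hypothetical database client; batchIncrement stands in for one bulk UPDATE.
    public interface Database { void batchIncrement(List<String> urls); }

    private static final int BATCH_SIZE = 100;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Database database;

    public PageviewWorker(Database database) { this.database = database; }

    // The web server calls this instead of hitting the database directly.
    public void enqueue(String url) { queue.offer(url); }

    @Override
    public void run() {
        while (true) {
            try {
                List<String> batch = new ArrayList<>(BATCH_SIZE);
                batch.add(queue.take());              // block until at least one event arrives
                queue.drainTo(batch, BATCH_SIZE - 1); // then grab up to 99 more
                database.batchIncrement(batch);       // one database round trip per batch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}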

Scaling by sharding the database

As your application continues to get more and more popular, the database gets overloaded again. Your worker can't keep up with the writes, and adding more workers to parallelize the updates doesn't help; the database is clearly the bottleneck.

The approach is to use multiple database servers and spread the table across all the servers. Each server will have a subset of the data for the table. This is known as horizontal partitioning or sharding. This technique spreads the write load across multiple machines.

The sharding technique you use is to choose the shard for each key by taking the hash of the key modulo the number of shards. Mapping keys to shards with a hash function distributes the keys uniformly across the shards (a sketch of this mapping follows the steps below). You do the following:

  1. Write a script to map over all the rows in your single database instance, and split the data into four shards. Since it takes a while to run this script, you turn off the worker that increments pageviews to avoid losing increments during the transition.
  2. Wrap a library around database-handling code that reads the number of shards from a configuration file, and redeploy all of your application code, since all application code needs to know how to find the shard for each key. You have to modify your top-100-URLs query to get the top 100 URLs from each shard and merge those together for the global top 100 URLs.
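A minimal sketch of the key-to-shard mapping (the class and method names are illustrative; in practice numShards would be read from the configuration file mentioned in step 2):

public class ShardRouter {
    private final int numShards;   // read from a configuration file in practice

    public ShardRouter(int numShards) { this.numShards = numShards; }

    // Choose the shard for a key: hash the key and take it modulo the number of shards.
    public int shardFor(String url) {
        return Math.floorMod(url.hashCode(), numShards);   // floorMod avoids negative results
    }
}

Because the shard is a function of numShards, every resharding changes where keys live, which is why a migration script like the one in step 1 has to be rerun each time the shard count grows.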

As the application gets more popular, you keep having to reshard the database into more shards to keep up with the write load:

Fault-tolerance issues begin

With so many shards, it becomes a frequent occurrence for the disk on one of the database machines to go bad. That portion of the data is unavailable while that machine is down. You do a couple of things to address this:

Corruption issues

You accidentally deploy a bug to production that increments the number of pageviews by two, instead of by one, for every URL. You don't notice until 24 hours later, but by then the damage is done. Your weekly backups don't help because there's no way of knowing which data got corrupted. After all this work trying to make your system scalable and tolerant of machine failures, your system has no resilience to a human making a mistake.

What went wrong?

As the application evolved, the system continued to get more and more complex: queues, shards, replicas, resharding scripts, etc. Developing applications on the data requires a lot more than just knowing the database schema; your code needs to know how to talk to the right shards, and if you make a mistake, there's nothing preventing you from reading from or writing to the wrong shard.

One problem is that your database is not self-aware of its distributed nature, so it can't help you deal with shards, replication, and distributed queries. All that complexity got pushed to you both in operating the database and developing the application code.

However, the worst problem is that the system is not engineered for human mistakes. As the system keeps getting more complex, it is more likely that a mistake will be made:

How will Big Data techniques help?

The Big Data techniques to be discussed address these scalability and complexity issues dramatically:

  1. The databases and computation systems for Big Data are aware of their distributed nature. Sharding and replication are handled for you.
    • Sharding: the logic is internalized in the database, preventing situations where you accidentally query the wrong shard.
    • Scaling: just add new nodes and the systems will automatically rebalance onto the new nodes.
  2. Make data immutable. Instead of storing the pageview counts as your core dataset, which you continuously mutate as new pageviews come in, you store the raw pageview information, which is never modified. When you make a mistake, you might write bad data, but at least you won't destroy good data. This is a much stronger human-fault tolerance guarantee than in a traditional system based on mutation. [p6]
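As a small illustration of the difference, an immutable raw pageview record might look like the following sketch (field names are assumptions); records are only ever appended, never updated in place:

public final class PageviewEvent {
    public final String url;
    public final String userId;
    public final long timestampMillis;   // when the pageview happened

    public PageviewEvent(String url, String userId, long timestampMillis) {
        this.url = url;
        this.userId = userId;
        this.timestampMillis = timestampMillis;
    }
    // There are no setters and no "increment" operation: a buggy deploy can append
    // bad records, but it cannot overwrite or destroy the good ones.
}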

NoSQL is not a panacea

Innovations in scalable data systems over the past decades include:

These systems can handle very large amounts of data, but with serious trade-offs:

These tools on their own are not a panacea. But when intelligently used in conjunction with one another, you can produce scalable systems for arbitrary data problems with human-fault tolerance and a minimum of complexity. This is the Lambda Architecture discussed throughout the book.

First principles

What does a data system do? An intuitive definition is:

A data system answers questions based on information that was acquired in the past up to the present.

Data is often used interchangeably with the word information. But for the remainder of this book, when we use the word data, we're referring to that special information from which everything else is derived.

The most general-purpose data system answers questions by looking at the entire dataset, which leads to the most general possible definition of a query:

query = function(all data)
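As an illustration, here is a query written literally as a function of all the data, reusing the PageviewEvent sketch from earlier: it scans every record to count pageviews for a URL in a time range.

import java.util.List;

public class OnTheFlyQuery {
    // query = function(all data): the entire dataset is an argument to the query.
    public static long pageviews(List<PageviewEvent> allData, String url,
                                 long startMillis, long endMillis) {
        return allData.stream()
                .filter(p -> p.url.equals(url))
                .filter(p -> p.timestampMillis >= startMillis && p.timestampMillis < endMillis)
                .count();
    }
}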

[p7]

The Lambda Architecture provides a general-purpose approach to implementing an arbitrary function on an arbitrary dataset and having the function return its results with low latency. This does not mean always using the same technologies to implement a database system; the Lambda Architecture defines a consistent approach to choosing those technologies and to wiring them together to meet your requirements.

Desired properties of a Big Data system

Not only must a Big Data system perform well and be resource-efficient, it must be easy to reason about as well.

Robustness and fault tolerance

Systems need to behave correctly despite any of the following situations:

These challenges make it difficult even to reason about what a system is doing. Part of making a Big Data system robust is avoiding these complexities so that you can easily reason about the system.

It's imperative for systems to be human-fault tolerant, which is an oft-overlooked property. In a production system, it's inevitable that someone will make a mistake, such as by deploying incorrect code that corrupts values in a database. If you build immutability and recomputation into the core of a Big Data system, the system will be innately resilient to human error by providing a clear and simple mechanism for recovery.

Low latency reads and updates

You need to be able to:

Scalability

Scalability is the ability to maintain performance in the face of increasing data or load by adding resources to the system. The Lambda Architecture is horizontally scalable across all layers of the system stack: scaling is accomplished by adding more machines.

Generalization

A general system can support a wide range of applications. Because the Lambda Architecture is based on functions of all data, it generalizes to all applications.

Extensibility

Extensible systems allow functionality to be added with a minimal development cost, without having to reinvent the wheel each time you add a related feature or make a change to how your system works.

Oftentimes a new feature or a change to an existing feature requires a migration of old data into a new format. Part of making a system extensible is making it easy to do large-scale migrations. Being able to do big migrations quickly and easily is core to the approach under discussion.

Ad hoc queries

Every large dataset has unanticipated value within it. Being able to mine a dataset arbitrarily gives opportunities for business optimization and new applications. Ultimately, you can't discover interesting things to do with your data unless you can ask arbitrary questions of it.

Minimal maintenance

Maintenance is the work required to keep a system running smoothly. This includes:

An important part of minimizing maintenance is choosing components that have as little implementation complexity as possible. You want to rely on components that have simple mechanisms underlying them. In particular, distributed databases tend to have very complicated internals. The more complex a system, the more likely something will go wrong, and the more you need to understand about the system to debug and tune it.

You combat implementation complexity by relying on simple algorithms and simple components. A trick employed in the Lambda Architecture is to push complexity out of the core components and into pieces of the system whose outputs are discardable after a few hours. The most complex components used, like read/write distributed databases, are in this layer where outputs are eventually discardable.

Debuggability

A Big Data system must provide the information necessary to debug the system when things go wrong. The key is to be able to trace, for each value in the system, exactly what caused it to have that value.

Debuggability is accomplished in the Lambda Architecture through the functional nature of the batch layer and by preferring to use recomputation algorithms when possible.

The problems with fully incremental architectures

Traditional architectures look like the figure below:

Figure 1.3 Fully incremental architecture

What characterizes these architectures is the use of read/write databases and maintaining the state in those databases incrementally as new data is seen. For example, an incremental approach to counting pageviews would be to process a new pageview by adding one to the counter for its URL. The vast majority of both relational and non-relational database deployments are done as fully incremental architectures. This has been true for many decades.

Fully incremental architectures are so widespread that many people don't realize it's possible to avoid their problems with a different architecture. This is called familiar complexity (complexity that's so ingrained, you don't even think to find a way to avoid it).

The problems with fully incremental architectures are significant. This section discusses:

You'll see that the fully incremental version is significantly worse in every respect.

Operational complexity

There are many complexities inherent in fully incremental architectures that create difficulties in operating production infrastructure. This section focuses on one: the need for read/write databases to perform online compaction, and what you have to do operationally to keep things running smoothly.

In a read/write database, as a disk index is incrementally added to and modified, parts of the index become unused. These unused parts take up space and eventually need to be reclaimed to prevent the disk from filling up. Reclaiming space as soon as it becomes unused is too expensive, so the space is occasionally reclaimed in bulk in a process called compaction.

Compaction is an intensive operation. The server places substantially higher demand on the CPU and disks during compaction, which dramatically lowers the performance of that machine during that time period. Databases such as HBase and Cassandra are well-known for requiring careful configuration and management to avoid problems or server lockups during compaction. The performance loss during compaction is a complexity that can even cause cascading failure: if too many machines compact at the same time, the load they were supporting will have to be handled by other machines in the cluster. This can potentially overload the rest of your cluster, causing total failure.

To manage compaction correctly, you have to:

The best way to deal with complexity is to get rid of that complexity altogether. The fewer failure modes you have in your system, the less likely it is that you'll suffer unexpected downtime. Dealing with online compaction is a complexity inherent to fully incremental architectures, but in a Lambda Architecture the primary databases don't require any online compaction.

Extreme complexity of achieving eventual consistency

Incremental architectures have another complexity when trying to make the system highly available. A highly available system allows for queries and updates even in the presence of machine or partial network failure.

Achieving high availability competes directly with another important property called consistency. A consistent system returns results that take into account all previous writes. The CAP theorem has shown that it's impossible to achieve both high availability and consistency in the same system in the presence of network partitions. Therefore, a highly available system sometimes returns stale results during a network partition.

In order for a highly available system to return to consistency once a network partition ends (known as eventual consistency), a lot of help is required from your application. [p11] Distributed databases achieve high availability by keeping multiple replicas of all information stored. When you keep many copies of the same information, that information is still available even if a machine goes down or the network gets partitioned, as shown in the figure below. During a network partition, a system that chooses to be highly available has clients update whatever replicas are reachable to them. This causes replicas to diverge and receive different sets of updates. Only when the partition goes away can the replicas be merged together into a common value.

Figure 1.4 Using replication to increase availability

Example: highly available counting *

For example, suppose you have two replicas with a count of 10 when a network partition begins. Suppose the first replica gets two increments and the second gets one increment. When it comes time to merge these replicas together, with values of 12 and 11, what should the merged value be? Although the correct answer is 13, there's no way to know just by looking at the numbers 12 and 11. They could have diverged at 11 (in which case the answer would be 12), or they could have diverged at 0 (in which case the answer would be 23).

To do highly available counting correctly, it's not enough to just store a count:

This is an amazing amount of complexity you have to deal with just to maintain a simple count.
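One common way to make such counts mergeable (named here as an illustration; the approach isn't spelled out above) is a grow-only counter, or G-Counter: each replica increments only its own slot, merging takes the per-replica maximum, and the count is the sum of the slots, so the 12-versus-11 ambiguity can't arise. The per-replica bookkeeping in this sketch is exactly the kind of extra complexity being described:

import java.util.HashMap;
import java.util.Map;

public class GCounter {
    // One slot per replica; a replica only ever increments its own slot.
    private final Map<String, Long> slots = new HashMap<>();

    public void increment(String replicaId) {
        slots.merge(replicaId, 1L, Long::sum);
    }

    // The logical count is the sum of all the slots.
    public long value() {
        return slots.values().stream().mapToLong(Long::longValue).sum();
    }

    // Merging after a partition takes the per-replica maximum, so no increment is lost.
    public void merge(GCounter other) {
        other.slots.forEach((replica, count) -> slots.merge(replica, count, Math::max));
    }
}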

In general, handling eventual consistency in incremental, highly available systems is unintuitive and prone to error. This complexity is innate to highly available, fully incremental systems. However, the Lambda Architecture structures itself in a different way that greatly lessens the burdens of achieving highly available, eventually consistent systems.

Lack of human-fault tolerance

The last problem with fully incremental architectures is their inherent lack of human-fault tolerance. An incremental system is constantly modifying the state it keeps in the database, which means a mistake can also modify the state in the database. Because mistakes are inevitable, the database in a fully incremental architecture is guaranteed to be corrupted.

This is one of the few complexities of fully incremental architectures that can be resolved without a complete rethinking of the architecture. Consider the two architectures shown in the following figure:

Figure 1.5 Adding logging to fully incremental architectures

In both cases, every event is permanently logged to an events datastore. By keeping every event, if a human mistake causes database corruption, you can go back to the events store and reconstruct the proper state for the database. Because the events store is immutable and constantly growing, redundant checks, like permissions, can be put in to make it highly unlikely for a mistake to trample over the events store. This technique is also core to the Lambda Architecture and is discussed in depth in Chapter 2 and Chapter 3.

Although fully incremental architectures with logging can overcome the human-fault tolerance deficiencies of those without logging, the logging cannot handle the other complexities that have been discussed.

Fully incremental solution vs. Lambda Architecture solution

One of the example queries implemented throughout the book serves as a great contrast between fully incremental and Lambda architectures. The query has to do with pageview analytics and is done on two kinds of incoming data: pageviews (a user ID, a URL, and a timestamp) and equivs (two user IDs that refer to the same person, such as when a visitor registers with an email address).

The goal of the query is to compute the number of unique visitors to a URL over a range of time. Queries should be up to date with all data and respond with minimal latency (less than 100 milliseconds). Below is the interface for the query:

long uniquesOverTime(String url, int startHour, int endHour)

If a person visits the same URL in a time range with two user IDs connected via equivs (even transitively), that should only count as one visit. A new equiv coming in can change the results for any query over any time range for any URL.

Instead of showing the details of the solutions, which would require covering many concepts (indexing, distributed databases, batch processing, HyperLogLog, and so on), we'll focus on the characteristics of the solutions and the striking differences between them. The best possible fully incremental solution is shown in detail in Chapter 10, and the Lambda Architecture solution is built up in Chapters 8, 9, 14, and 15.

The two solutions can be compared on three axes: accuracy, latency, and throughput. [p14] The Lambda Architecture solution is significantly better on all three, producing higher performance in every respect while also avoiding the complexity that plagues fully incremental architectures.

Lambda Architecture

Computing arbitrary functions on an arbitrary dataset in real time is not a simple problem. There's no single tool that provides a complete solution. Instead, you have to use a variety of tools and techniques to build a complete Big Data system.

The main idea of the Lambda Architecture is to build Big Data systems as a series of layers, as shown in the following figure.

Figure 1.6 Lambda Architecture

Each layer satisfies a subset of the properties and builds upon the functionality provided by the layers beneath it. Each layer requires a lot of work to design, implement, and deploy, but the high-level ideas of the whole system are easy to understand.

Starting from the query = function(all data) equation, you could ideally run your query functions on the fly to get the results. However, this would take a huge amount of resources and would be unreasonably expensive; it would be like having to read a petabyte dataset every time you wanted to answer a query about someone's current location.

The most obvious alternative approach is to precompute the query function into what is called the batch view. Instead of computing the query on the fly, you read the results from the precomputed view. The precomputed view is indexed so that it can be accessed with random reads:

batch view = function(all data)

query = function(batch view)

This system works as follows:

  1. Run a function on all the data to get the batch view.
  2. When you want to know the value for a query, run a function on that batch view.
  3. The batch view makes it possible to get the values you need from it very quickly, without having to scan everything in it.

For example, you're building a web analytics application, and you want to query the number of pageviews for a URL on any range of days. If you were computing the query as a function of all the data, you'd scan the dataset for pageviews for that URL within that time range, and return the count of those results.

Instead, the batch view approach (as shown in the figure below) works as follows:

  1. Run a function on all the pageviews to precompute an index from a key of [url, day] to the count of the number of pageviews for that URL for that day.
  2. To resolve the query, retrieve all values from that view for all days within that time range, and sum up the counts to get the result.

Figure 1.7 Architecture of the batch layer
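A minimal in-memory sketch of this precompute-then-query pattern, reusing the PageviewEvent sketch from earlier (a real batch view would be a distributed index in the serving layer, not a HashMap):

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PageviewBatchView {
    // batch view = function(all data): an index from [url, day] to pageview count.
    private final Map<String, Long> countsByUrlAndDay = new HashMap<>();

    public static PageviewBatchView build(List<PageviewEvent> allData) {
        PageviewBatchView view = new PageviewBatchView();
        for (PageviewEvent p : allData) {
            LocalDate day = Instant.ofEpochMilli(p.timestampMillis)
                    .atZone(ZoneOffset.UTC).toLocalDate();
            view.countsByUrlAndDay.merge(p.url + "|" + day, 1L, Long::sum);
        }
        return view;
    }

    // query = function(batch view): random reads on the precomputed index, summed over days.
    public long pageviews(String url, LocalDate start, LocalDate end) {
        long total = 0;
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            total += countsByUrlAndDay.getOrDefault(url + "|" + d, 0L);
        }
        return total;
    }
}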

Creating the batch view (with the approach described so far) is a high-latency operation, because it's running a function on all the data you have. By the time it finishes, a lot of new data will have collected that's not represented in the batch views, and the queries will be out of date by many hours. We will ignore this issue for the moment (because we'll be able to fix it), assume it's fine for queries to be out of date by a few hours, and continue exploring this idea of precomputing a batch view by running a function on the complete dataset.

Batch layer

The batch layer is the portion of the Lambda Architecture that implements the batch view = function(all data) equation.

The batch layer stores the master copy of the dataset and precomputes batch views on that master dataset, as shown in the figure below. The master dataset can be thought of as a very large list of records.

Figure 1.8 Batch layer

The batch layer needs to be able to do two things:

  1. Store an immutable, constantly growing master dataset.
  2. Compute arbitrary functions on that dataset.

This type of processing is best done using batch-processing systems. Hadoop is the canonical example of a batch-processing system.

The batch layer can be represented in pseudo-code like this:

function runBatchLayer():
  while(true):
    recomputeBatchViews()

The batch layer runs in a while(true) loop and continuously recomputes the batch views from scratch. This is the best way to think about the batch layer at the moment, though in reality, the batch layer is a little more involved.

The batch layer is simple to use:

The following is an example of a batch layer computation. You don't have to understand this code; the point is to show what an inherently parallel program looks like:

Api.execute(Api.hfsSeqfile("/tmp/pageview-counts"),
    new Subquery("?url", "?count")
        .predicate(Api.hfsSeqfile("/data/pageviews"),
            "?url", "?user", "?timestamp")
        .predicate(new Count(), "?count"));

This code computes the number of pageviews for every URL given an input dataset of raw pageviews:

Serving layer

The serving layer is a specialized distributed database that loads in a batch view (emitted by the batch layer as the result of its functions) and makes it possible to do random reads on it, as seen in the following figure. When new batch views are available, the serving layer automatically swaps those in so that more up-to-date results are available.

Figure 1.9 Serving layer

A serving layer database supports batch updates and random reads, but it doesn't need to support random writes. This is a very important point, as random writes cause most of the complexity in databases. By not supporting random writes, these databases are extremely simple. That simplicity makes them robust, predictable, easy to configure, and easy to operate. ElephantDB, the serving layer database discussed in this book, is only a few thousand lines of code.

Batch and serving layers satisfy almost all properties

The batch and serving layers support arbitrary queries on an arbitrary dataset with the trade-off that queries will be out of date by a few hours. It takes a new piece of data a few hours to propagate through the batch layer into the serving layer where it can be queried. Other than low latency updates, the batch and serving layers satisfy every property desired in a Big Data system, as outlined in Section 1.5:

The batch and serving layers satisfy almost all the properties you want with a simple and easy-to-understand approach. There are no concurrency issues to deal with, and it scales trivially. The only property missing is low latency updates. The final layer, the speed layer, fixes this problem.

Speed layer

Since the serving layer updates whenever the batch layer finishes precomputing a batch view, the only data not represented in the batch view is the data that came in while the precomputation was running. The purpose of the speed layer is to compensate for those last few hours of data, in order to have arbitrary functions computed on arbitrary data in real time. The goal of the speed layer is to ensure new data is represented in query functions as quickly as needed for the application requirements (see figure below).

Figure 1.10 Speed layer

The speed layer is similar to the batch layer in that it produces views based on data it receives, but has two major differences:

  1. The speed layer only looks at recent data, whereas the batch layer looks at all the data at once.
  2. In order to achieve the smallest latencies possible, the speed layer doesn't look at all the new data at once. Instead of recomputing the views from scratch like the batch layer does, it updates the realtime views incrementally as it receives new data. That is, the speed layer does incremental computation rather than the recomputation done in the batch layer.

The data flow on the speed layer can be formalized with the following equation:

realtime view = function(realtime view, new data)

A realtime view is updated based on new data and the existing realtime view.
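A minimal sketch of that incremental update, keeping the realtime view as a map from [url, day] to a count (a real speed layer would store this in a random-read/random-write database such as Cassandra):

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RealtimePageviewView {
    // realtime view = function(realtime view, new data): updated in place, one event at a time.
    private final Map<String, Long> countsByUrlAndDay = new ConcurrentHashMap<>();

    public void onNewPageview(String url, long timestampMillis) {
        LocalDate day = Instant.ofEpochMilli(timestampMillis)
                .atZone(ZoneOffset.UTC).toLocalDate();
        countsByUrlAndDay.merge(url + "|" + day, 1L, Long::sum);
    }

    public long pageviews(String url, LocalDate day) {
        return countsByUrlAndDay.getOrDefault(url + "|" + day, 0L);
    }
}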

The Lambda Architecture in full is summarized by these three equations:

batch view = function(all data)

realtime view = function(realtime view, new data)

query = function(batch view, realtime view)

A pictorial representation of these ideas is shown in the figure below:

Figure 1.11 Lambda Architecture diagram

Instead of resolving queries by just doing a function of the batch view, you resolve queries by looking at both the batch and realtime views and merging the results together.

The speed layer uses databases that support random reads and random writes, so they're orders of magnitude more complex than the databases in the serving layer, both in terms of implementation and operation. However, once data makes it through the batch layer into the serving layer, the corresponding results in the realtime views are no longer needed. This means you can discard pieces of the realtime view as they're no longer needed. This is a wonderful result, because the speed layer is far more complex than the batch and serving layers. This property of the Lambda Architecture is called complexity isolation, meaning that complexity is pushed into a layer whose results are only temporary. If anything ever goes wrong, you can discard the state for the entire speed layer, and everything will be back to normal within a few hours.

Example of the web analytics application (continued) *

To continue the example of building a web analytics application that supports queries about the number of pageviews over a range of days, recall that the batch layer produces batch views from [url, day] to the number of pageviews. The speed layer keeps its own separate view of [url, day] to number of pageviews. Whereas the batch layer recomputes its views by literally counting the pageviews, the speed layer updates its views by incrementing the count in the view whenever it receives new data. To resolve a query, you query both the batch and realtime views as necessary to satisfy the range of dates specified, and you sum up the results to get the final count. The little work that needs to be done to properly synchronize the results will be covered in a future chapter.
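A minimal sketch of that query-time merge, reusing the batch and realtime view sketches from earlier; the batchCutoff bookkeeping (which days the batch view fully covers) is simplified here:

import java.time.LocalDate;

public class UnifiedPageviewQuery {
    private final PageviewBatchView batchView;        // served by the serving layer
    private final RealtimePageviewView realtimeView;  // maintained by the speed layer
    private final LocalDate batchCutoff;              // last day fully represented in the batch view

    public UnifiedPageviewQuery(PageviewBatchView batchView,
                                RealtimePageviewView realtimeView,
                                LocalDate batchCutoff) {
        this.batchView = batchView;
        this.realtimeView = realtimeView;
        this.batchCutoff = batchCutoff;
    }

    // query = function(batch view, realtime view)
    public long pageviews(String url, LocalDate start, LocalDate end) {
        long total = 0;
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            if (!d.isAfter(batchCutoff)) {
                total += batchView.pageviews(url, d, d);   // days the batch view already covers
            } else {
                total += realtimeView.pageviews(url, d);   // recent days come from the speed layer
            }
        }
        return total;
    }
}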

Eventual accuracy *

Some algorithms are difficult to compute incrementally. The batch/speed layer split gives you the flexibility to use the exact algorithm on the batch layer and an approximate algorithm on the speed layer. The batch layer repeatedly overrides (corrects) the speed layer, so the approximation gets corrected and your system exhibits the property of eventual accuracy. For example, computing unique counts can be challenging if the sets of uniques get large. It's easy to do the unique count on the batch layer, because you look at all the data at once, but on the speed layer you might use a HyperLogLog set as an approximation.
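As a sketch of that split, the batch layer can afford an exact set of visitor IDs, while a speed-layer implementation of the same interface would be backed by a HyperLogLog (library-specific, so only noted in a comment here):

import java.util.HashSet;
import java.util.Set;

public interface UniqueVisitorCounter {
    void add(String userId);
    long estimate();

    // Exact counting is affordable in the batch layer, which sees all the data at once.
    class Exact implements UniqueVisitorCounter {
        private final Set<String> seen = new HashSet<>();
        public void add(String userId) { seen.add(userId); }
        public long estimate() { return seen.size(); }
    }
    // A speed-layer implementation would back this interface with a HyperLogLog,
    // trading a small, bounded error for constant memory.
}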

Performance and robustness *

The resulting system has both performance and robustness. [p20] You can still get low latency updates, but the complexity of achieving this doesn't affect the robustness of your results, because the speed layer is transient. The transient nature of the speed layer gives you the flexibility to be very aggressive when it comes to making trade-offs for performance. For computations that can be done exactly in an incremental fashion, the system is fully accurate.

Recent trends in technology

A number of trends in technology deeply influence the ways Big Data systems can be built.

CPUs aren't getting faster

CPUs have hit the physical limits of clock speed. That means that if you want to scale to more data, you must be able to parallelize your computation. This has led to the rise of shared-nothing parallel algorithms and their corresponding systems, such as MapReduce. Instead of just trying to scale by buying a better machine (known as vertical scaling), systems scale by adding more machines (known as horizontal scaling).

Elastic clouds

Elastic clouds, also known as Infrastructure as a Service (IaaS), are another trend in technology. Amazon Web Services (AWS) is the most notable elastic cloud. Elastic clouds allow you to:

Elastic clouds have the following advantages:

Vibrant open source ecosystem for Big Data

There are five categories of open source projects under discussion.

As these open source projects have matured, companies have formed around some of them to provide enterprise support. For example:

Enterprise support:

Company products:

Example application: SuperWebAnalytics.com

This book discusses building an example Big Data application. The data management layer is built for a Google Analytics–like service. The service will be able to track billions of pageviews per day.

The service will support a variety of different metrics. Each metric will be supported in real time. The metrics range from simple counting metrics to complex analyses of how visitors are navigating a website.

The following metrics are to be supported:

The layers that store, process, and serve queries to the application will be built out.

Summary

[p23]

The problems of scaling a relational system with traditional techniques (e.g., sharding) go beyond scaling itself: the system becomes more complex to manage, extend, and even understand. The upcoming chapters focus as much on robustness as on scalability, and show that when you build things the right way, both robustness and scalability are achievable in the same system.

The benefits of data systems built using the Lambda Architecture go beyond just scaling:

The next chapter discusses how to build the Lambda Architecture. You'll start at the very core of the stack with how you model and schematize the master copy of your dataset.

Doubts and Solutions

Verbatim

p10 on Operational complexity

In a read/write database, as a disk index is incrementally added to and modified, parts of the index become unused. These unused parts take up space and eventually need to be reclaimed to prevent the disk from filling up. Reclaiming space as soon as it becomes unused is too expensive, so the space is occasionally reclaimed in bulk in a process called compaction.

Question: What is a disk index?