Archive for May, 2011

Clearspring’s Big Data Architecture, Part 2

Wednesday, May 18th, 2011

This is part two of a four-part series on Clearspring’s approach to big data analytics. (See part 1.) This post will focus on building tree-based data structures to store the massive quantity of information that Clearspring receives each day. Trees have several useful characteristics that make them the ideal data structure for our needs. For example, trees provide a compressed representation, an index, of our data where a path through a tree represents a row of data. Trees can also be sharded and distributed throughout our clusters, which allows us to create massive trees composed of hundreds of different shards. At query time we can easily gather and merge information from each shard.

 

Tree Building Process

The foundation of our distributed analytics framework is the tree building process. We selected a tree-based storage structure for our processed data because we feel that it fills a useful middle ground between the data-model-centric view of row-oriented databases and the query-centric view more common with column-oriented databases.

The TreeBuilder consumes data from one or more data sources (typically a set of Stream Servers) and then builds a tree based on the input data and a job configuration that defines the desired tree structure. An example tree might look like:

One of the more interesting aspects of our tree data structure is the concept of data attachments. We often run into situations where we’d like to store certain bits of information in a way that is easily retrievable. Some nodes in a tree may have many millions of children, so answering questions like “which URLs were clicked the most times?” can be very expensive. This is where the data attachment comes into play. We can use operations from our recently open-sourced stream-lib project to find and store the “top” URLs as a data attachment on the parent node. This top element is updated dynamically any time new data is added to the tree: as each new record is processed, the data attachment is updated to reflect the current list of n top elements. Tops may be lossy, but if the top is large enough the results are generally accurate enough for our needs. Stream-lib is a very powerful library that makes it possible to count a huge number of elements in a distributed system, find the top-k elements from massive lists of data, and perform membership detection using Bloom filters. You can read more about stream-lib here or check out the source code on GitHub.
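To make the idea of a lossy “top” concrete, here is a minimal Python sketch of a bounded counter in the spirit of the Space-Saving algorithm. It is an illustration only and not stream-lib’s API; the class name LossyTop and its methods are hypothetical.

```python
class LossyTop:
    """Bounded 'top' counter in the spirit of the Space-Saving algorithm.

    Hypothetical illustration; this is not stream-lib's API.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.counts = {}

    def offer(self, item):
        if item in self.counts:
            self.counts[item] += 1
        elif len(self.counts) < self.capacity:
            self.counts[item] = 1
        else:
            # Evict the smallest entry; the newcomer inherits its count plus one.
            # This is where the structure becomes lossy (counts can overestimate).
            victim = min(self.counts, key=self.counts.get)
            self.counts[item] = self.counts.pop(victim) + 1

    def top(self, n):
        # Highest counts first, ties broken by item for a stable order.
        return sorted(self.counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


top = LossyTop(capacity=3)
for url in ["a", "b", "a", "c", "a", "b", "d"]:
    top.offer(url)
print(top.top(2))
```

The counter never holds more than capacity entries, which is what keeps the attachment cheap on a node with millions of children. The price is that rarely seen items can be evicted or overcounted, so a larger capacity gives more accurate tops.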

Let’s take a look at an example JSON configuration used to create a tree:

In the example above we create a simple tree with the root “urltree”. Underneath “urltree” we have two branches, “ymd” and “ym”, representing data broken down by day and by month. Each of those branches has two levels of children. The first level contains a list of dates and the second level has a list of URLs. The first level of both branches has one data attachment called “topurls”. This data attachment will contain a list of the URLs that were seen most often. So if we want to know which URL was seen the most times in the month of May or on a specific day in May, that data can be retrieved quickly without iterating over all of the children of the date node. The resulting tree with sample data may look like:
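The layout described here can be sketched in a few lines of Python. This is a hypothetical in-memory model, not the TreeBuilder itself: the Node class, the add_record helper, the yyyymmdd date strings, and the use of an exact Counter in place of a lossy top are all assumptions made for illustration.

```python
from collections import Counter


class Node:
    """One tree node with a hit count, children, and an optional attachment."""

    def __init__(self, name):
        self.name = name
        self.hits = 0
        self.children = {}
        self.top_urls = None  # stands in for the "topurls" data attachment

    def child(self, name):
        node = self.children.get(name)
        if node is None:
            node = self.children[name] = Node(name)
        return node


def add_record(root, date, url):
    """Insert one (date, url) record under both the 'ymd' and 'ym' branches."""
    for branch, key in (("ymd", date), ("ym", date[:6])):
        date_node = root.child(branch).child(key)
        date_node.hits += 1
        if date_node.top_urls is None:
            date_node.top_urls = Counter()
        date_node.top_urls[url] += 1       # update the attachment on the date node
        date_node.child(url).hits += 1     # second level: one child per URL


root = Node("urltree")
records = [
    ("20110517", "example.com/a"),
    ("20110517", "example.com/b"),
    ("20110518", "example.com/a"),
]
for date, url in records:
    add_record(root, date, url)

may = root.children["ym"].children["201105"]
print(may.top_urls.most_common(1))
```

Reading the top URL for May touches only the attachment on the “201105” node; none of that node’s URL children are visited.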

We use Berkeley DB JE as our underlying database for each job. Berkeley DB JE is a powerful database and provides a nice abstraction layer for the data structure we wanted to use. It is very common for us to have jobs that store more data than we can practically fit into memory, so we created a library called PageDB that provides an abstraction layer between Berkeley DB JE and the rest of our system. PageDB tracks and estimates memory consumption for each database that is paged into memory. It groups keys and their values together for efficient block-level compression, which reduces the disk IO required to store and retrieve data. Rather than storing first-class Java objects in memory, PageDB keeps the data in a compressed byte format, which is significantly more efficient than keeping those objects on the heap.
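The grouping-and-compression idea can be illustrated with a small Python sketch. It does not reflect PageDB’s actual format or API: the build_pages and lookup helpers, the JSON serialization, and the use of zlib are assumptions chosen to keep the example self-contained.

```python
import json
import zlib


def build_pages(items, page_size):
    """Group sorted (key, value) pairs into pages and compress each page as one block."""
    ordered = sorted(items.items())
    pages = []
    for start in range(0, len(ordered), page_size):
        chunk = ordered[start:start + page_size]
        blob = zlib.compress(json.dumps(chunk).encode("utf-8"))
        pages.append((chunk[0][0], blob))  # (first key in the page, compressed bytes)
    return pages


def lookup(pages, key):
    """Find the page whose key range could hold `key`, decompress it, and scan it."""
    candidate = None
    for first_key, blob in pages:
        if first_key <= key:
            candidate = blob
        else:
            break
    if candidate is None:
        return None
    for k, v in json.loads(zlib.decompress(candidate).decode("utf-8")):
        if k == key:
            return v
    return None


data = {f"example.com/page/{i:04d}": i for i in range(1000)}
pages = build_pages(data, page_size=100)
print(len(pages), lookup(pages, "example.com/page/0420"))
```

Because neighbouring keys share long prefixes, compressing them together as one block tends to do much better than compressing each entry on its own, and only the one page that might hold a key is ever decompressed.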

 

What’s Next?

Parts one and two of this series introduced Hydra, Stream Servers, and the tree-building process. These components provide the foundation necessary to discuss the distributed query engine we use to retrieve information from our clusters, which will be the topic of the next post in this series. If you’d like to work on problems like these, we are always hiring.

Big Data DC – Analytics at Clearspring

Thursday, May 12th, 2011

Last night Clearspring hosted a Big Data DC meeting at our offices in Tysons Corner, VA. Matt Abrams, Director of Data Analytics, presented to a crowd of around 40 people about how the data processing team breaks down 4-5 terabytes a day, including philosophies about design, speed and simplicity.

In case you missed out on the fun (that included pizza and beer!), here is the presentation:

Clearspring Raises $20MM from IVP

Tuesday, May 10th, 2011

We’re excited to announce that we’ve just closed an investment round led by Institutional Venture Partners (IVP). IVP has backed some of the best data-driven companies on the web, including Twitter and Zynga. 86 out of the 300 companies they last invested in have gone public.

In the last couple of years, we’ve grown tremendously across the board. AddThis is now used by over 9MM domains, reaching over 1B unique users monthly. It’s so popular that we’re now one of a handful of sites to have a Google PageRank of 10. We’re processing 10TB of data/day – every week, we process as much data as the entire digital Library of Congress.

AddThis has in turn fueled the growth of our Audience Platform, enabling hundreds of advertisers to deliver more relevant advertising. All the while we’ve been hiring as fast as we can to continue to support the needs of all the great companies that use our products.

We want to thank all of you for working with us over the years. We hope to show you just how thankful we are by putting this money to work on great new products. As always, if you have any ideas about how we can better serve publishers and advertisers, hit us up.

Look forward to continuing to rock (and roll) with all of you!

Cheers,

Hooman

 

Clearspring Listed in Lead411’s “2011 Hottest DC Companies”

Tuesday, May 10th, 2011

Clearspring is honored to be listed as one of Lead411’s Hottest DC Companies in 2011 for the tech industry. The list recognizes fast-growing companies and also includes LivingSocial, SBNation and OPower.

It’s great to be recognized among such a talented and hardworking group of companies. Clearspring is very proud to be a part of the DC tech community and cheers for the success of our neighbors.

CS Motor Club Featured in the Washington Post

Monday, May 9th, 2011

Clearspring’s CS Motor Club crew was featured in today’s Washington Post in an article about unconventional recruitment techniques. A handful of Clearspring team members with a love of cars started the Motor Club as a side hobby. They have since purchased a $500 car, or hooptie, and plan to compete in the 24 Hours of LeMons race this summer.

There has been talk of other CS clubs starting up, but nothing has yet materialized to rival the impressive results of the motor gang.

Of course, we are hiring, and it may take some fresh team members to grab the reins and get something going. So make sure to check out our job openings and send in your application. You could be the next founder of CS Knitting Club or CS Bowling Club!

Clearspring’s Big Data Architecture, Part 1

Thursday, May 5th, 2011

This is the first post in a four-part series that will describe the data analytics architecture Clearspring has created to process large amounts of semi-structured data.

On a daily basis, the Clearspring network yields tens of billions of new, unique data points. In storage terms, we ingest 4 to 5 TB of new data each day; that could easily double in 12-18 months. That data must be processed as live streams, re-processed from the archive as new algorithms are developed, and queryable in flexible ways. An interesting mix of batch, live and hybrid jobs is employed. To keep pace with a rapidly expanding network, the architecture must not only efficiently handle the current set of tasks, but it must also enable significant growth while guarding against hardware and software faults. To satisfy these needs, we built an entire software suite that leverages open source technologies and many in-house software components.

 

Desired System Characteristics and Challenges

  1. Efficient and reliable storage of large amounts of data on unreliable machines in a distributed system with fast writes, data partitioning and replication, and data streaming capabilities.
  2. Knowledge discovery. Our system consumes a large amount of raw data that in and of itself is not very interesting, so we need to be able to extract the interesting information by analyzing large streams of data from multiple sources.
  3. Management of distributed resources. We wanted the system to distribute jobs in the cluster, determine when it is safe to execute the job, report failures, and provide a nice GUI to visualize the current state of the cluster.
  4. Speed. The small things add up when you do them over a billion times. For example, if we have 2.5 billion messages and we shave 1 millisecond (0.001 seconds) off the time it takes to process a single event, we will have saved nearly 29 days of computation time. In our case, milliseconds matter!
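The arithmetic behind the “nearly 29 days” figure in the last item is easy to check. The short Python calculation below uses only the numbers quoted above; the variable names are ours.

```python
messages = 2_500_000_000          # 2.5 billion messages
saved_ms_per_message = 1          # 1 millisecond shaved off each one

saved_seconds = messages * saved_ms_per_message / 1000
saved_days = saved_seconds / 86_400   # 86,400 seconds in a day

print(f"{saved_seconds:,.0f} seconds = {saved_days:.1f} days")
```

That works out to 2.5 million seconds of compute, a little under 29 days.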

There are many systems available that provide some of the characteristics listed above, including Enterprise SANs, HDFS, OpenStack (Swift), CouchDB, Redis, ElephantDB, Voldemort, Hadoop, Cassandra, and others. We do use some of these projects for various components of our architecture. For example, we use Cassandra for our distributed counting system, which keeps track of how many times a URL is shared and serves over 200M view requests daily. However, the primary system we use for our distributed processing needs is a project we call Hydra.

 

Hydra

The Hydra system we created internally has four main functional areas:
  1. Capability to partition, store, and stream data in a distributed system.
  2. Software that takes an input data stream, processes that data (filtering, counting, membership detection, etc.) and then stores the batch output in Berkeley DB JE, which uses sequential writes and indexes the batch output for fast reads.
  3. A set of software modules that enable ad-hoc queries against tree-based data structures.
  4. Command and control subsystem responsible for monitoring and executing jobs in the cluster.

 

Streaming and Splitting Data

We have too much raw data to store on a single machine and we were not interested in purchasing an enterprise SAN capable of storing petabytes of data. While we looked at several distributed file systems like HDFS, GlusterFS, and others, we ended up going with a standard ext3 filesystem and implemented our own distribution and replication framework on top of it. There are two standard modules, Splitters and Stream Servers.

Splitter

The Splitter consumes an input stream from a data source. The data source could be a flat file on the local system, an input stream sourced by a Stream Server, a firehose of data exposed via an API, or any other source you can think of. The format of the source data from the stream is described to the Splitter (in the near future we will be using Zookeeper to store stream formatting information). Each Splitter consumes a subset of the data based on a hash function. The Splitter then uses the stream format information and the job configuration script (usually a JSON file) to filter, transform, and create data elements which will be stored in n partitions on the local file system. After a checkpoint is reached (all available input data has been consumed or some time limit has been reached), the files the Splitter stores are made available to other processes in the cluster via the Stream Servers.
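As a rough illustration of hash-based partitioning, the Python sketch below assigns each record to one of n partitions. The choice of CRC32 as the hash, the “url” field as the partition key, and the function names are assumptions for the example; the post does not say which hash function or key the Splitter uses.

```python
import zlib


def partition_for(key, partitions):
    """Map a key to a stable partition number (CRC32 chosen for illustration)."""
    return zlib.crc32(key.encode("utf-8")) % partitions


def split(records, partitions):
    """Distribute records into `partitions` buckets by hashing a hypothetical 'url' field."""
    buckets = {p: [] for p in range(partitions)}
    for record in records:
        buckets[partition_for(record["url"], partitions)].append(record)
    return buckets


records = [{"url": f"example.com/page/{i}", "event": "share"} for i in range(20)]
buckets = split(records, partitions=4)
print({p: len(rows) for p, rows in buckets.items()})
```

A hash that is stable across processes and machines matters here, because every Splitter has to agree on which partition a given key belongs to. Python’s built-in hash() is salted per process for strings, so it would not be suitable.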

Splitter processes are responsible for ensuring that they do not reprocess data that has already been consumed. They do this by storing metadata in their local job directory that indicates which files they’ve consumed and an offset indicating how far into those files the process has read. Because we only append to log files and never modify existing content, the Splitter process doesn’t need to re-read data it has already processed when more data is appended to a file. We feel having each client track its own state is a better approach than having the Stream Server attempt to track the current state of every consumer in the cluster.
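Client-side offset tracking for append-only files can be sketched as follows. The state file layout (a JSON map of file path to byte offset) and the read_new_lines helper are hypothetical; the post only says that consumed files and offsets are stored in the local job directory.

```python
import json
import os
import tempfile


def read_new_lines(log_path, state_path):
    """Return lines appended to log_path since the offset recorded in state_path."""
    state = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
    offset = state.get(log_path, 0)

    with open(log_path, "rb") as f:
        f.seek(offset)          # skip everything already consumed
        data = f.read()

    # Consume only complete lines; a partial trailing line waits for the next call.
    end = data.rfind(b"\n") + 1
    state[log_path] = offset + end
    with open(state_path, "w") as f:
        json.dump(state, f)
    return data[:end].decode("utf-8").splitlines()


workdir = tempfile.mkdtemp()
log = os.path.join(workdir, "events.log")
state = os.path.join(workdir, "consumed.json")

with open(log, "a") as f:
    f.write("event-1\nevent-2\n")
first = read_new_lines(log, state)

with open(log, "a") as f:
    f.write("event-3\n")
second = read_new_lines(log, state)

print(first, second)
```

The second call seeks straight to the saved offset, so the first two events are never read again. This only works because the log is append-only: bytes before the offset are guaranteed not to change.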

This simple setup can accomplish some very powerful things beyond the obvious capability of storing and partitioning raw data to a local node in the cluster. One useful function the Splitter provides is recombining many small files into a smaller number of time-sorted files. This can be accomplished by having n Splitter nodes consume from m sources where n < m. All of the data from the data sources will be consumed, but it will be persisted into a smaller number of files than originally required.
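A minimal Python sketch of this recombination, assuming each source is already sorted by timestamp and records are (timestamp, payload) tuples. The round-robin assignment of sources to Splitters is a simplification for the example, and the recombine function is hypothetical.

```python
import heapq


def recombine(sources, n):
    """Merge m time-sorted sources into n time-sorted outputs (n < m)."""
    outputs = []
    for i in range(n):
        assigned = sources[i::n]  # round-robin: splitter i takes every n-th source
        # heapq.merge lazily merges already-sorted inputs into one sorted stream.
        outputs.append(list(heapq.merge(*assigned)))
    return outputs


sources = [
    [(1, "a"), (4, "d")],
    [(2, "b"), (5, "e")],
    [(3, "c"), (6, "f")],
]
outputs = recombine(sources, n=2)
for out in outputs:
    print(out)
```

Three input files become two output files, each still in time order, and every record appears exactly once.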

Stream Server

Stream Servers provide access to the files stored on one node to any other node in the cluster. The Stream Server transmits raw bytes to clients because the files may be in a compressed format. This limits the number of times the data in the file is decoded and reduces the total amount of data transmitted over the network. This approach does prevent the Stream Server from applying fine-grained filters to the individual log lines. However, we think that this limitation is acceptable given the CPU cycles saved by only decoding the file once on the client side and the massive reduction in network IO.

Requests for data come into the Stream Server in the form of regular expressions which describe which files the client would like to receive.  Here is an example of what a request might look like:

 

 

The snippet above would tell the Stream Server to search every directory under ‘job/1/*/live/split/us’ for files whose names match the regular expression defined in the match field. The date format will be replaced at run-time with the set of dates the client is interested in, and the mod variable will be replaced with the modulus for the client. This approach enables a client to stream data for a specific partition and time range without requiring complex filtering on the server side.
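The substitution step can be illustrated with a small Python sketch. The {dates} and {mod} placeholder syntax, the file-name layout, and the build_matcher helper are invented for this example and are not the actual Stream Server request format.

```python
import re


def build_matcher(template, dates, mod):
    """Expand hypothetical {dates}/{mod} placeholders and compile the file-name regex."""
    date_alternation = "|".join(re.escape(d) for d in dates)
    pattern = (
        template
        .replace("{dates}", f"(?:{date_alternation})")
        .replace("{mod}", f"{mod:03d}")
    )
    return re.compile(pattern)


# Hypothetical layout: <yymmdd>-<partition>.log.gz
template = r"^{dates}-{mod}\.log\.gz$"
matcher = build_matcher(template, ["110517", "110518"], mod=7)

files = [
    "110517-007.log.gz",
    "110518-007.log.gz",
    "110518-008.log.gz",   # wrong partition
    "110519-007.log.gz",   # outside the requested dates
]
selected = [name for name in files if matcher.match(name)]
print(selected)
```

Because the dates and partition are baked into the file-name pattern, the server only has to match names and stream bytes; it never needs to open a file to decide whether a client wants it.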

 

Conclusion

This was part one in a series of posts where we will explain how Clearspring processes and stores multiple terabytes of data daily. In upcoming articles we will explore how and why we use a tree-based data structure, our distributed query system, and the command and control infrastructure that binds it all together. Subscribe to our RSS feed or follow us on Twitter to stay informed on upcoming articles.

 
