Max's Output

Archive for July 2010

Managing Indexes in Cassandra using Async Triggers

with 6 comments

[This post is written by Maxim Grinev and Martin Hentschel]

Suppose you are building a Cassandra application and you want to speed up your queries via indexing. Cassandra does not support secondary indexes out of the box, but storing redundant data (in a different layout) gives you the same effect. The main drawback is that your application (the code that writes to the DB) needs to take care of managing the index: every time you write to the DB, you also need to update the index. This noticeably increases the response time seen by the users of your web application.

In Figure 1, whenever a user request forces a write to the database (Step 1), the application also updates the index. Because at least two operations on the database layer are needed, the response time of the request increases. The advantage of this approach is that your index is always in sync with the data: any query to the index will see the latest result (Step 2).

Figure 1: Application inserts data and builds index in one step.

We recently proposed an extension to Cassandra, which we call Async triggers. An Async trigger will listen on a column family inside Cassandra. Whenever a modification to this column family is made, the trigger will be scheduled for an asynchronous execution. In our case, the logic to build and maintain the index shifts from the application to the trigger. This means the application has less work to do and can return faster. The response time of a user request will be reduced.

In Figure 2, a write to the database (Step 1) returns as soon as the write is finished. An Async trigger is scheduled to update the index. This trigger runs some time after the response has been sent to the client. This also means that for a short period of time the data and the index will be out of sync (i.e. inconsistent). A query to the index (Step 2) might not always see the latest results. We believe that this is acceptable for many web applications. For example, in Twitter it is totally fine if a search ignores tweets that were posted less than a second ago.

Figure 2: An Async trigger maintains the index.
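The flow in Figure 2 can be sketched in plain Java. This is only an in-memory illustration, not the trigger patch itself: the class `AsyncIndexSketch`, its two maps and the single-threaded executor are assumptions made for the sketch, and the user names are made up.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a data column family and its index; not Cassandra API.
final class AsyncIndexSketch {
    private final Map<String, String> users = new ConcurrentHashMap<>();      // user id -> name
    private final Map<String, Set<String>> index = new ConcurrentHashMap<>(); // name -> user ids
    private final ExecutorService triggerPool = Executors.newSingleThreadExecutor();

    // Step 1: write the data, schedule the index update, return immediately.
    void writeUser(String userId, String name) {
        users.put(userId, name);
        triggerPool.execute(() -> {
            index.computeIfAbsent(name, n -> new ConcurrentSkipListSet<>()).add(userId);
        });
    }

    // Step 2: query the index; it may lag behind the data for a short time.
    Set<String> findByName(String name) {
        return index.getOrDefault(name, Set.of());
    }

    // Demo helper: stop accepting work and wait for scheduled index updates.
    boolean drain() {
        triggerPool.shutdown();
        try {
            return triggerPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncIndexSketch db = new AsyncIndexSketch();
        db.writeUser("2", "Sue");
        db.writeUser("4", "Sue");
        db.drain();
        System.out.println(db.findByName("Sue"));
    }
}
```

A call to `findByName` made before `drain()` may return an incomplete result, which is exactly the short window of inconsistency described above.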

Of course, you may get similarly good response times using other architectures, for example by explicitly using queues to separate writes to the database from the maintenance of indexes. We are currently doing research on the advantages and disadvantages of Async triggers with respect to such architectures.

Example: Index users by name

Here is a concrete example of how to index a user database not only by user id, but also by name (a secondary index). Figure 3 shows a possible layout of column families in Cassandra. The first column family “Users” stores data about users. Each row is naturally indexed by its row key (in our case it is the user id). The second column family “Index” stores redundant data to quickly retrieve users by their name. For example if you want to look up the name “Sue”, you will find two users with ids 2 and 4.

Figure 3: Database layout storing users by id and name.

Using an Async trigger to update the index whenever there is a write to the Users column family involves two steps. First, we need to implement the trigger; second, we need to specify the column family that the trigger will listen on.

To implement a trigger we implement the execute method of the ITrigger interface. First, we connect to the local Cassandra instance. The trigger executes within Cassandra, so no network overhead is involved. Then we get the user name of the user that has just been inserted (or modified). The user id is provided by the key parameter. We can then insert this user id into the respective user name row. (Note that I have removed any exception handling and null checks for ease of reading.)

public class UpdateIndex implements ITrigger
{
    public void execute(byte[] key, ColumnFamily cf)
    {
        // connect to local Cassandra instance
        CassandraServer client = new CassandraServer();
        client.set_keyspace("TriggerExample");

        // get user name
        byte[] userName = cf.getColumn("name".getBytes()).value();

        // insert the user id into the index
        ColumnParent parent = new ColumnParent("Index");
        byte[] userId = key;
        long timestamp = System.currentTimeMillis();
        Column indexedValueColumn = new Column(userId,
                 "1".getBytes(), new Clock(timestamp));
        client.insert(userName, parent, indexedValueColumn, ConsistencyLevel.ONE);
    }
}

It remains to specify that this trigger should listen on the “Users” column family. The following entry needs to be added to the cassandra.yaml file.

triggers:
    - name: UpdateIndex
      keyspace: TriggerExample
      column_family: Users
      implementation: UpdateIndex

That’s it. The complete example source code along with appropriate scripts to run our example can be found in the directory contrib/trigger_example.

Currently our extension to Cassandra is under submission. In order to try Async triggers and this example, find the patch here: https://issues.apache.org/jira/browse/CASSANDRA-1311

Written by maxgrinev

July 23, 2010 at 3:23 pm

Posted in Cassandra

Extending Cassandra with Asynchronous Triggers

with 17 comments

[This post is written by Maxim Grinev and Martin Hentschel on the work done at Systems Group @ ETH Zurich]

Motivation

Latency matters! Amazon found every 100ms of latency cost them 1% in sales. Google found an extra .5 seconds in search page generation time dropped traffic by 20%. Even the smallest delay kills user satisfaction. You can find more on the importance of latency here and here.

The underlying database contributes significantly to the overall application response time, especially on update operations, as they cannot be sped up via caching. A common approach to reducing the database-imposed latency is to break a request into sub-operations that are executed asynchronously, so that the application acknowledges the request without waiting for all the sub-operations to complete. This approach allows for tremendous latency reduction and also keeps the latency predictable even when sub-operation execution time varies significantly. The implication of asynchronous execution is eventual consistency of data. But eventual consistency is already a common practice that meets the requirements of many Web applications.

Asynchronous execution is typically implemented using queues in front of a database. Yet, integrating asynchronous execution into a database provides a number of benefits:

  1. Much easier programming model and simplified system management – the developer/administrator has to deal with one system (i.e. database) instead of two (i.e. queue and database)
  2. Asynchronous execution can be used to automate internal database operations with reduced latency – for example, updating indexes.

We have extended the Cassandra database system with support for Asynchronous Triggers (CASSANDRA-1311). Asynchronous triggers are a basic mechanism that can be used to implement various use cases of asynchronous execution of application code on the database side. Cassandra and Async triggers are a perfect match, as both take advantage of eventual data consistency.

Cassandra Async Triggers

Like a traditional database trigger, a Cassandra Async trigger is a procedure that is automatically executed by the database in response to certain events on a particular database object (e.g. a table or view). The distinguishing feature of an Async trigger is that the database responds to the client on successful update execution without waiting for the triggers to be executed, thus reducing response latency.

More precisely, Cassandra Async triggers can be described as follows:

  • A trigger is set on a column family and is executed in case of any update to the column family. Cassandra triggers are “after” triggers. A trigger is executed after the update operation that fires the trigger and can see the results of the update.
  • Trigger procedures are implemented in Java. The application developer implements the execute method of the ITrigger interface.
  • Cassandra Async triggers are mutation-level triggers. A trigger is executed for each mutation issued to the column family. For example, if a trigger is fired by batch_mutate it will be executed for each mutation separately. Each trigger is parametrized by the mutation that fires it.
  • In contrast to traditional triggers, which are synchronous, Cassandra Async triggers are asynchronous. The database acknowledges update execution to the client after the update is executed and the fired triggers are submitted for execution. Actual execution of fired triggers happens after the acknowledgement to the client. It allows saving latency but leads to eventual consistency of data.
  • Our implementation guarantees that triggers are executed at least once. The system is responsible for handling failures during trigger execution so that the client does not need to care about them. This makes client-side code easier to develop and maintain. Note that triggers may be executed more than once, which requires triggers to be idempotent. This is a requirement for any Cassandra update, though. We chose not to implement exactly-once semantics because it introduces significant overhead.
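The at-least-once guarantee in the last point can be illustrated with a small in-memory sketch. The `Trigger` interface below is a hypothetical stand-in and deliberately not the ITrigger signature; `deliver` simply replays the same mutation several times to mimic repeated execution after a failure.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class IdempotentTriggerSketch {
    // Hypothetical callback shape for this sketch only.
    interface Trigger {
        void execute(String key, Map<String, String> columns);
    }

    // Index: user name -> user ids.
    final Map<String, Set<String>> index = new TreeMap<>();

    // Idempotent: adding the same id to a set twice leaves the set unchanged.
    final Trigger updateIndex = (key, columns) ->
        index.computeIfAbsent(columns.get("name"), n -> new TreeSet<>()).add(key);

    // At-least-once delivery: the same mutation may reach the trigger several times.
    void deliver(String key, Map<String, String> columns, int times) {
        for (int i = 0; i < times; i++) {
            updateIndex.execute(key, columns);
        }
    }

    public static void main(String[] args) {
        IdempotentTriggerSketch sketch = new IdempotentTriggerSketch();
        sketch.deliver("2", Map.of("name", "Sue"), 3);
        System.out.println(sketch.index);
    }
}
```

Because the trigger only adds an id to a set, replaying it is harmless. A trigger that incremented a counter instead would not survive repeated execution.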

Applications

Async triggers are a powerful mechanism that allows for many interesting applications:

1) Index and Materialized View Support
Cassandra does not support secondary indexes. To index data other than by record key or column name, the application developer needs to store data redundantly in a separate column family. In this case triggers can be used to update secondary indexes whenever records in the original column family are modified.

Similar to indexes, materialized views store data redundantly. So they also can be supported via triggers. Find general guidelines on managing redundant data (indexes and materialized views) in Cassandra in my post “Do You Really Need SQL to Do It All in Cassandra?”.

Implementing index and view maintenance synchronously means coupling update and maintenance operations together. This will increase response time of each update operation depending on the number of indexes. Using an asynchronous mechanism will keep the response time constant.

2) Online Analytics
Async triggers can be effectively used to propagate changes (in near real-time) from the “operational” part of the database to the “analytical” part without reducing response time for operational updates. It opens up a way for many new interesting applications! Note that delay in the data propagation caused by asynchronous triggers is negligible to the majority of analytical applications. It is online (near real-time) analytics in comparison, for example, with offline MapReduce analytics.

3) Specific application: Data propagation in social networks
Data propagation in social networks (e.g. sending a user’s posts to all her friends) has proven to be scalable only with the push-on-change model (i.e. redundant data propagation via updates with simple queries) as opposed to the pull-on-demand model (i.e. combining data from normalized storage on demand via queries). Twitter uses the push-on-change model: it employs queues to implement asynchronous data propagation. Using just a few triggers, one can easily implement a production version of a Twitter-like network.

Asynchronous triggers have been submitted as a contribution to Cassandra. You can find the patch at CASSANDRA-1311.

Written by maxgrinev

July 23, 2010 at 3:22 pm

Posted in Cassandra

Update Idempotency: Why It is Important in Cassandra Applications

with 16 comments

When you develop an application for Cassandra you should be aware of the following fact: even when the client observes the failure of an update, it is still possible that the update has been executed successfully. The cause of this anomalous behavior is that Cassandra does not support transactional rollback. But please do not rush to judgement. There is a reason for that. Cassandra is a distributed database, so rollback would require support for distributed transactions, which have a performance cost and do not scale well. So if you want it fast and scalable, you have to handle this anomaly.

Before we discuss a solution let’s look at examples when the anomaly might happen:

  1. In Cassandra data are (usually) replicated. You can specify how many replicas must be successfully updated for the whole update to be considered successful (this keeps the data writable even when some replica nodes are down or unreachable due to network partitioning). Still, if fewer nodes confirm the update than required, Cassandra will return an error to the application saying the write failed. But it will not clean up or roll back the update on those nodes where it was successfully executed. For example, if the replication factor is 3, the required number of nodes to be updated is 2, and only 1 node was actually updated, the application will receive an error. Despite the error, subsequent reads can see the update. Moreover, the mechanism of eventual update propagation (i.e. read repair, which is triggered on the first subsequent read) will update the other replicas with a value whose write was reported to the client as failed. Note that this should not happen often, as Cassandra does go out of its way (via gossip-based node failure detection) to make sure the cluster is healthy enough to execute the update. But it is possible.
  2. The whole update can be successfully executed but the return message is lost.
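The first case can be made concrete with a toy model of three replicas. Everything here is an assumption for illustration: `PartialWriteSketch` is not Cassandra code, and the `acknowledges` flags stand for replicas that do or do not apply and confirm a write once the coordinator has sent it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartialWriteSketch {
    // One map per replica; acknowledges[i] says whether replica i applies and confirms writes.
    final List<Map<String, String>> replicas = new ArrayList<>();
    final boolean[] acknowledges;

    PartialWriteSketch(boolean... acknowledges) {
        this.acknowledges = acknowledges;
        for (int i = 0; i < acknowledges.length; i++) {
            replicas.add(new HashMap<>());
        }
    }

    // Returns true only if at least `required` replicas confirmed the write.
    // Replicas that applied the write keep it either way: there is no rollback.
    boolean write(String key, String value, int required) {
        int acks = 0;
        for (int i = 0; i < replicas.size(); i++) {
            if (acknowledges[i]) {
                replicas.get(i).put(key, value);
                acks++;
            }
        }
        return acks >= required;
    }

    String readFrom(int replica, String key) {
        return replicas.get(replica).get(key);
    }

    public static void main(String[] args) {
        // Replication factor 3, two confirmations required, only one replica responds.
        PartialWriteSketch cluster = new PartialWriteSketch(true, false, false);
        System.out.println("write succeeded: " + cluster.write("k", "v", 2));
        System.out.println("replica 0 sees: " + cluster.readFrom(0, "k"));
    }
}
```

The client is told the write failed, yet a read served by the first replica returns the value.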

The standard way to handle this anomaly is to retry the failed update until it succeeds. As a result, the same update can be executed several times! If the update increments a counter, the counter value becomes incorrect. Here we come to the main point of this post: all your updates should be idempotent (i.e. repeated applications of an update have the same effect as one). Designing updates to be idempotent is the standard discipline for coping with repeated updates. Read the great articles Life beyond Distributed Transactions: an Apostate’s Opinion and Building on Quicksand by Pat Helland, which stress the importance of idempotent updates in highly scalable systems.

To ensure idempotence (i.e. to guarantee that processing retried updates is harmless), the database should be designed to remember that an update has been processed. Typically this can be achieved by storing identifiers that uniquely identify an update. For example, suppose we want to count how many times each URL has been posted on Twitter. Instead of just storing the mapping of URLs to counters (i.e. a column family URL_statistics where each record has a URL as its key and a single column holding the counter as its value), a solution can be to store the mapping of each URL to the IDs of the tweets that contain the URL (i.e. a column family URL_Tweets where each record has a URL as its key, columns represent tweets, column names are the tweet IDs, and column values are not used). URL counters are then computed on retrieval by counting tweet IDs.

It is a good idea to store tweet IDs as column names so that Cassandra automatically eliminates duplicates: a repeated update is simply ignored in this case (use this great Cassandra feature to make your updates idempotent!). You can easily come up with other ways to design the database, but one way or another you have to store some information to ensure update idempotence.
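The difference between the two designs shows up as soon as an update is retried. The sketch below models both column families with in-memory maps; the class and method names are made up for the illustration, and a `TreeSet` plays the role of column names that collapse duplicates.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class UrlCountSketch {
    // URL_statistics: URL -> counter. Not idempotent under retries.
    final Map<String, Integer> urlStatistics = new HashMap<>();
    // URL_Tweets: URL -> tweet IDs stored as column names. Duplicates collapse.
    final Map<String, Set<String>> urlTweets = new HashMap<>();

    void recordWithCounter(String url) {
        urlStatistics.merge(url, 1, Integer::sum);
    }

    void recordWithTweetId(String url, String tweetId) {
        urlTweets.computeIfAbsent(url, u -> new TreeSet<>()).add(tweetId);
    }

    int countFromCounter(String url) {
        return urlStatistics.getOrDefault(url, 0);
    }

    // The counter is computed on retrieval by counting tweet IDs.
    int countFromTweetIds(String url) {
        return urlTweets.getOrDefault(url, Set.of()).size();
    }

    public static void main(String[] args) {
        UrlCountSketch stats = new UrlCountSketch();
        String url = "http://example.com";
        // The client retries the same update because the first attempt looked failed.
        for (int attempt = 0; attempt < 2; attempt++) {
            stats.recordWithCounter(url);
            stats.recordWithTweetId(url, "tweet-1");
        }
        System.out.println("counter design:  " + stats.countFromCounter(url));
        System.out.println("tweet-id design: " + stats.countFromTweetIds(url));
    }
}
```

After one retry the counter design reports two postings for a single tweet, while the tweet-ID design still reports one.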

It is common that Cassandra applications are not initially designed for idempotence. At first, small-scale deployments do not exhibit these subtle problems and work fine. Only as time passes and deployments grow do the problems manifest, and the applications then have to be reworked to handle them. Do it right from the beginning.

Written by maxgrinev

July 12, 2010 at 1:02 am

Posted in Cassandra

Do You Really Need SQL to Do It All in Cassandra?

with 78 comments

NoSQL database systems are designed for scalability. The downside is a primitive key-value data model and, as the name suggests, no support for SQL. It might sound like a serious limitation: how can I “select”, “join”, “group” and “sort” the data? This post explains how all these operations can be quite naturally and efficiently implemented in one of the most famous NoSQL systems, Cassandra.

To understand this post you need to know the Cassandra data model. You can find a quick introduction in my previous post. The power of the Cassandra data model is that it extends a basic key-value store with efficient data nesting (via columns and super columns). It means that you can read/update a column (or a super column) without retrieving the whole record. Below I describe how we can exploit data nesting to support various query operations.

Let’s consider a basic example: departments and employees in a one-to-many relationship. So we have two column families: Emps and Deps. In Emps employee IDs are used as keys and there are Name, Birthdate, and City columns. In Deps keys are department IDs and the single column is Name.

1) Select
For example: select * from Emps where Birthdate = '25/04/1975'
To support this query we need to add one more column family named Birthdate_Emps, in which the key is a date and the column names are the IDs of the employees born on that date. The values are not used here and can be an empty byte array (denoted “-”). Every time an employee is inserted into or deleted from Emps, we need to update Birthdate_Emps accordingly. To execute the query we just need to retrieve all the columns for the key '25/04/1975' from Birthdate_Emps.

Notice that Birthdate_Emps is essentially an index that allows us to execute the query very efficiently. And this index is scalable, as it is distributed across Cassandra nodes. You can go even further to speed up the query by redundantly storing information about employees (i.e. the employee’s columns from Emps) in Birthdate_Emps. In this case employee IDs become the names of super columns that contain the corresponding employee columns.
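As a rough in-memory sketch of this select, the two column families can be modeled with maps. `BirthdateIndexSketch` and its methods are invented for the illustration, and only the Birthdate column of Emps is kept to stay short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

final class BirthdateIndexSketch {
    // Emps: employee id -> birthdate (other columns omitted for brevity).
    final Map<String, String> emps = new HashMap<>();
    // Birthdate_Emps: birthdate -> employee ids stored as column names.
    final Map<String, SortedSet<String>> birthdateEmps = new HashMap<>();

    void insert(String empId, String birthdate) {
        delete(empId); // drop a stale index entry if the employee already exists
        emps.put(empId, birthdate);
        birthdateEmps.computeIfAbsent(birthdate, d -> new TreeSet<>()).add(empId);
    }

    void delete(String empId) {
        String birthdate = emps.remove(empId);
        if (birthdate != null) {
            SortedSet<String> ids = birthdateEmps.get(birthdate);
            ids.remove(empId);
            if (ids.isEmpty()) {
                birthdateEmps.remove(birthdate);
            }
        }
    }

    // select * from Emps where Birthdate = ? becomes a read of a single index row.
    SortedSet<String> selectByBirthdate(String birthdate) {
        return birthdateEmps.getOrDefault(birthdate, new TreeSet<>());
    }

    public static void main(String[] args) {
        BirthdateIndexSketch db = new BirthdateIndexSketch();
        db.insert("e1", "25/04/1975");
        db.insert("e2", "25/04/1975");
        db.insert("e3", "01/01/1980");
        System.out.println(db.selectByBirthdate("25/04/1975"));
    }
}
```

The write path does the extra work of keeping both structures in step, which is the cost of making the read a single key lookup.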

2) Join
For example: select * from Emps e, Deps d where e.dep_id = d.dep_id
What does join essentially mean? It constructs records that represent relationships between entities. Such relationships can be easily (and even more naturally) represented via nesting. To do that, add a column family Dep_Emps in which the key is a department ID and the column names are the IDs of the corresponding employees.
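A minimal sketch of such a join, again with in-memory maps standing in for the column families. The class name, the `join` method and the "department:employee" output format are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

final class DepEmpsSketch {
    final Map<String, String> deps = new HashMap<>(); // Deps: department id -> Name
    final Map<String, String> emps = new HashMap<>(); // Emps: employee id -> Name
    // Dep_Emps: department id -> employee ids stored as column names.
    final Map<String, SortedSet<String>> depEmps = new HashMap<>();

    void addDep(String depId, String name) {
        deps.put(depId, name);
    }

    void addEmp(String empId, String name, String depId) {
        emps.put(empId, name);
        depEmps.computeIfAbsent(depId, d -> new TreeSet<>()).add(empId);
    }

    // Joined rows for one department: one Dep_Emps row plus lookups by key.
    List<String> join(String depId) {
        List<String> rows = new ArrayList<>();
        for (String empId : depEmps.getOrDefault(depId, new TreeSet<>())) {
            rows.add(deps.get(depId) + ":" + emps.get(empId));
        }
        return rows;
    }

    public static void main(String[] args) {
        DepEmpsSketch db = new DepEmpsSketch();
        db.addDep("d1", "Sales");
        db.addEmp("e1", "Ann", "d1");
        db.addEmp("e2", "Bob", "d1");
        System.out.println(db.join("d1"));
    }
}
```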

3) Group By
For example: select count(*) from Emps group by City
From an implementation viewpoint Group By is very similar to the select/indexing described above. You just need to add a column family City_Emps with cities as keys and employee IDs as column names. In this case you count the number of employees on retrieval. Alternatively, you can have a single column named count whose value is the pre-calculated number of employees in the city.
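The count-on-retrieval variant might look like the following sketch, where a map of sets stands in for City_Emps. The names are illustrative, and the city values are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

final class CityEmpsSketch {
    // City_Emps: city -> employee ids stored as column names.
    final Map<String, Set<String>> cityEmps = new HashMap<>();

    void addEmp(String empId, String city) {
        cityEmps.computeIfAbsent(city, c -> new TreeSet<>()).add(empId);
    }

    // select count(*) from Emps group by City, counted on retrieval.
    SortedMap<String, Integer> countByCity() {
        SortedMap<String, Integer> result = new TreeMap<>();
        cityEmps.forEach((city, ids) -> result.put(city, ids.size()));
        return result;
    }

    public static void main(String[] args) {
        CityEmpsSketch db = new CityEmpsSketch();
        db.addEmp("e1", "Zurich");
        db.addEmp("e2", "Zurich");
        db.addEmp("e3", "Basel");
        db.addEmp("e2", "Zurich"); // a retried insert does not change the count
        System.out.println(db.countByCity());
    }
}
```

Counting IDs on retrieval stays correct when an insert is retried. A pre-calculated count column is cheaper to read but needs extra care to stay correct under retries.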

4) Order By
To keep data sorted in Cassandra you can use two mechanisms: (a) records can be sorted by keys using OrderPreservingPartitioner with range queries (more on this in Cassandra: RandomPartitioner vs OrderPreservingPartitioner); (b) to keep nested data sorted you can use the automatically supported ordering of column names.
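Both mechanisms can be mimicked with sorted maps. This is only an analogy under the assumption that keys and column names compare as strings: the outer `TreeMap` plays the role of rows kept in key order, and each inner `TreeMap` plays the role of columns ordered by name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

final class OrderedStorageSketch {
    // Row key -> (column name -> value), both levels kept sorted.
    final TreeMap<String, TreeMap<String, String>> rows = new TreeMap<>();

    void put(String rowKey, String columnName, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(columnName, value);
    }

    // (a) Range query over row keys: fromKey inclusive, toKey exclusive.
    SortedMap<String, TreeMap<String, String>> rowRange(String fromKey, String toKey) {
        return rows.subMap(fromKey, toKey);
    }

    // (b) Column names of one row come back in sorted order.
    List<String> columnNames(String rowKey) {
        return new ArrayList<>(rows.getOrDefault(rowKey, new TreeMap<>()).keySet());
    }

    public static void main(String[] args) {
        OrderedStorageSketch store = new OrderedStorageSketch();
        store.put("b", "x", "1");
        store.put("a", "z", "1");
        store.put("a", "m", "1");
        store.put("c", "x", "1");
        System.out.println(store.rowRange("a", "c").keySet());
        System.out.println(store.columnNames("a"));
    }
}
```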

To support all these operations we store redundant data optimized for each particular query. It has two implications:
1) You must know queries in advance (i.e. no support for ad-hoc queries). However, typically in Web applications and enterprise OLTP applications queries are well known in advance, few in number, and do not change often. Read Mike Stonebraker convincingly talking about that.  BTW, Constraint Tree Schema, described in the latter paper, also exploits nesting to organize data for predefined queries.

2) We shift the burden from querying to updating, because what we essentially do is maintain materialized views (i.e. pre-computed results of queries). But this makes a lot of sense with Cassandra, as Cassandra is very much optimized for updates (thanks to eventual consistency and “log-structured” storage borrowed from Google BigTable). So we can use fast updates to speed up query execution. Moreover, use cases typical for social applications have proven to be scalable only with the push-on-change model (i.e. preliminary data propagation via updates with simple queries, the approach taken in this post) as opposed to the pull-on-demand model (i.e. data are stored normalized and combined by queries on demand, the classical relational approach). On push-on-change versus pull-on-demand read WHY ARE FACEBOOK, DIGG, AND TWITTER SO HARD TO SCALE?

Written by maxgrinev

July 12, 2010 at 12:56 am

Posted in Cassandra

A Quick Introduction to the Cassandra Data Model

with 101 comments

Further reading: for an in-depth introduction see Understanding the Cassandra Data Model at datastax.com

For newcomers, the Cassandra data model is a mess. Even experienced database developers spend quite a bit of time learning it. There are great articles on the Web that explain the model. Read WTF is a SuperColumn? An Intro to the Cassandra Data Model and my favorite one, Installing and using Apache Cassandra With Java. This blog post is my attempt to explain the Cassandra model to those who would like to understand the key ideas in 15 minutes or less.

In a nutshell, the Cassandra data model can be described as follows:

1) Cassandra is based on a key-value model

A database consists of column families. A column family is a set of key-value pairs. I know the terminology is confusing, but so far it is just a basic key-value model. Drawing an analogy with relational databases, you can think of a column family as a table and a key-value pair as a record in a table.

2) Cassandra extends basic key-value model with two levels of nesting

At the first level the value of a record is in turn a sequence of key-value pairs. These nested key-value pairs are called columns where key is the name of the column. In other words you can say that a record in a column family has a key and consists of columns. This level of nesting is mandatory – a record must contain at least one column (so in the first point above value of a record was an intermediate notion as value is actually a sequence of columns).

At the second level, which is optional, the value of a nested key-value pair can be a sequence of key-value pairs as well. When the second level of nesting is present, the outer key-value pairs are called super columns, with the key being the name of the super column, and the inner key-value pairs are called columns.

3) The names of both columns and super columns can be used in two ways: as names or as values (usually reference value).

First, names can play the role of attribute names. For example, the name of a column in a record about a User can be Email. That is how we are used to thinking about columns in relational databases.

Second, names can also be used to store values! For example, column names in a record that represents a Blog can be identifiers of the posts of this blog, and the corresponding column values are the posts themselves. You can really use column (or super column) names to store values because (a) theoretically there is no limit on the number of columns (or super columns) for any given record and (b) names are byte arrays, so you can encode any value in them.

4) Columns and super columns are stored ordered by names.

You can specify sorting behavior by defining how Cassandra treats the names of (super) columns (recall that a name is just a byte array). A name can be treated as Bytes Type, Long Type, Ascii Type, UTF8 Type, Lexical UUID Type, or Time UUID Type.

So now you know everything you need to know. Let’s consider a classic :) example of a Twitter database to demonstrate the points.

Column family Tweets contains records representing tweets. The key of a record is of Time UUID type and is generated when the tweet is received (we will use this feature in the User_Timelines column family below). The records consist of columns (no super columns here). Columns simply represent attributes of tweets. So it is very similar to how one would store it in a relational database.

The next example is User_Timelines (i.e. tweets posted by a user). Records are keyed by user IDs (referenced by User_ID columns in the Tweets column family). User_Timelines demonstrates how column names can be used to store values, tweet IDs in this case. The type of column names is defined as Time UUID. It means that tweet IDs are kept ordered by the time of posting. That is very useful, as we usually want to show the last N tweets for a user. Values of all columns are set to an empty byte array (denoted “-”) as they are not used.
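To make the "last N tweets" read concrete, here is a small in-memory sketch. It assumes a plain `long` timestamp as a stand-in for the Time UUID tweet ID, and `UserTimelinesSketch` with its methods is invented for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UserTimelinesSketch {
    private static final byte[] EMPTY = new byte[0]; // column values are unused

    // User_Timelines: user id -> column names (tweet IDs) kept sorted.
    // A long timestamp stands in for the Time UUID tweet ID.
    final Map<String, TreeMap<Long, byte[]>> userTimelines = new HashMap<>();

    void addTweet(String userId, long tweetId) {
        userTimelines.computeIfAbsent(userId, u -> new TreeMap<>()).put(tweetId, EMPTY);
    }

    // The last n tweets of a user, newest first.
    List<Long> lastTweets(String userId, int n) {
        List<Long> result = new ArrayList<>();
        TreeMap<Long, byte[]> row = userTimelines.getOrDefault(userId, new TreeMap<>());
        for (Long tweetId : row.descendingKeySet()) {
            if (result.size() == n) {
                break;
            }
            result.add(tweetId);
        }
        return result;
    }

    public static void main(String[] args) {
        UserTimelinesSketch db = new UserTimelinesSketch();
        db.addTweet("u1", 30L);
        db.addTweet("u1", 10L);
        db.addTweet("u1", 20L);
        System.out.println(db.lastTweets("u1", 2));
    }
}
```

Because the column names are kept sorted, the read walks only the newest entries and never has to sort the whole row.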

To demonstrate super columns let us assume that we want to collect statistics about URLs posted by each user. For that we need to group all the tweets posted by a user by URLs contained in the tweets. It can be stored using super columns as follows.

In User_URLs the names of the super columns are used to store URLs and the names of the nested columns are the corresponding tweet IDs.
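The nesting of User_URLs can be pictured as three levels of sorted maps and sets. The sketch below is only an in-memory model: the class and method names are made up, and `long` values again stand in for tweet IDs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class UserUrlsSketch {
    // User_URLs: user id -> super column name (URL) -> column names (tweet IDs).
    final Map<String, TreeMap<String, TreeSet<Long>>> userUrls = new HashMap<>();

    void addTweet(String userId, String url, long tweetId) {
        userUrls.computeIfAbsent(userId, u -> new TreeMap<>())
                .computeIfAbsent(url, s -> new TreeSet<>())
                .add(tweetId);
    }

    // Statistic for one user and URL: how many of the user's tweets contain it.
    int tweetCount(String userId, String url) {
        TreeMap<String, TreeSet<Long>> row = userUrls.get(userId);
        if (row == null || !row.containsKey(url)) {
            return 0;
        }
        return row.get(url).size();
    }

    public static void main(String[] args) {
        UserUrlsSketch db = new UserUrlsSketch();
        db.addTweet("u1", "http://a.example", 1L);
        db.addTweet("u1", "http://a.example", 2L);
        db.addTweet("u1", "http://b.example", 3L);
        System.out.println(db.tweetCount("u1", "http://a.example"));
    }
}
```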

Important note: currently Cassandra automatically supports indexes for column names but does not support indexes for the names of super columns. In our example it means that you cannot efficiently retrieve/update tweet ids by URL.

[Update: The above note is incorrect! It is subcolumn names that are not indexed inside super columns. Supercolumn names are always indexed. This is great news, as it enables the use case of data denormalization to speed up queries. For more on this, see the first comment by Jonathan Ellis below. I cover denormalization use cases in my next post.]

Let me know if I missed anything or something is unclear.

Written by maxgrinev

July 9, 2010 at 9:52 pm

Posted in Uncategorized
