Managing Indexes in Cassandra using Async Triggers
[This post is written by Maxim Grinev and Martin Hentschel]
Suppose you are building a Cassandra application and you want to speed up your queries via indexing. Cassandra does not support secondary indexes out of the box, but storing redundant data (in a different layout) gives you the same effect. The main drawback is that your application (the code that writes to the DB) has to manage the index: every time you write to the DB, you also need to update the index. This noticeably increases the response time seen by users of your web application.
In Figure 1, whenever a user request forces a write to the database (Step 1), the application also updates the index. Because at least two operations on the database layer are needed, the response time of this request increases. The advantage of this approach is that your index is always in sync with the data. Any query to the index will see the latest result (Step 2).
Figure 1: Application inserts data and builds index in one step.
We recently proposed an extension to Cassandra, which we call Async triggers. An Async trigger will listen on a column family inside Cassandra. Whenever a modification to this column family is made, the trigger will be scheduled for an asynchronous execution. In our case, the logic to build and maintain the index shifts from the application to the trigger. This means the application has less work to do and can return faster. The response time of a user request will be reduced.
In Figure 2, a write to the database (Step 1) returns as soon as the write is finished. An Async trigger is scheduled to update the index. This trigger will run some time after the response to the client. This also means that for a short period of time, the data and the index will be out of sync (i.e. inconsistent). A query to the index (Step 2) might not always see the latest results. We believe that this is acceptable for many web applications. For example, in Twitter it is totally fine if a search ignores tweets that were posted less than a second ago.
Figure 2: An Async trigger maintains the index.
Of course you may get similarly good response times using other architectures, for example by explicitly using queues to separate writes to the database from maintenance of indexes. We are currently doing research on the advantages and disadvantages of Async triggers with respect to such architectures.
Example: Index users by name
Here is a concrete example of how to index a user database not only by user id, but also by name (a secondary index). Figure 3 shows a possible layout of column families in Cassandra. The first column family “Users” stores data about users. Each row is naturally indexed by its row key (in our case it is the user id). The second column family “Index” stores redundant data to quickly retrieve users by their name. For example if you want to look up the name “Sue”, you will find two users with ids 2 and 4.
Figure 3: Database layout storing users by id and name.
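To make the layout concrete, here is a minimal in-memory sketch in Java. It is not Cassandra code: the class `UserIndexLayout` and its methods are hypothetical names invented for illustration, and plain maps stand in for the “Users” and “Index” column families. It only shows how the redundant “Index” rows relate to the “Users” rows, including the clean-up that is needed when a user is renamed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the two column families; not a Cassandra API.
class UserIndexLayout {
    // "Users": row key = user id, columns = attribute name -> value
    final Map<String, Map<String, String>> users = new HashMap<>();
    // "Index": row key = user name, column names = user ids (column values unused)
    final Map<String, SortedSet<String>> index = new HashMap<>();

    void putUser(String userId, String name) {
        Map<String, String> row = users.computeIfAbsent(userId, k -> new HashMap<>());
        String oldName = row.put("name", name);
        // If the user was renamed, drop the stale entry from the old name's row.
        if (oldName != null && !oldName.equals(name)) {
            SortedSet<String> ids = index.get(oldName);
            if (ids != null) {
                ids.remove(userId);
            }
        }
        index.computeIfAbsent(name, k -> new TreeSet<>()).add(userId);
    }

    // Secondary-index lookup: all user ids stored under the given name.
    SortedSet<String> idsByName(String name) {
        return index.getOrDefault(name, Collections.emptySortedSet());
    }
}
```

After inserting users 2 and 4 with the name “Sue”, `idsByName("Sue")` returns both ids, which mirrors the lookup described above.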
Using an Async trigger to update the index whenever there is a write to the users column family involves two steps. First, we need to implement the trigger and second, we need to specify the column family that the trigger will listen on.
To implement a trigger we will implement the execute method of the ITrigger interface. First, we connect to the local Cassandra instance. The trigger executes within Cassandra, so no network overhead is involved. Then we get the user name of the user that has just been inserted (or modified). The user id is provided by the key parameter. We can then insert this user id into the respective user name row. (Note that I have removed any exception handling or null checks for ease of reading.)
public class UpdateIndex implements ITrigger {

    public void execute(byte[] key, ColumnFamily cf) {
        // connect to local Cassandra instance
        CassandraServer client = new CassandraServer();
        client.set_keyspace("TriggerExample");

        // get user name
        byte[] userName = cf.getColumn("name".getBytes()).value();

        // insert the user id into the index
        ColumnParent parent = new ColumnParent("Index");
        byte[] userId = key;
        long timestamp = System.currentTimeMillis();
        Column indexedValueColumn = new Column(userId, "1".getBytes(), new Clock(timestamp));
        client.insert(userName, parent, indexedValueColumn, ConsistencyLevel.ONE);
    }
}
It remains to specify that this trigger should listen on the “Users” column family. The following entry needs to be added to the cassandra.yaml file.
triggers:
    - name: UpdateIndex
      keyspace: TriggerExample
      column_family: Users
      implementation: UpdateIndex
That’s it. The complete example source code along with appropriate scripts to run our example can be found in the directory contrib/trigger_example.
Currently our extension to Cassandra is under submission. In order to try Async triggers and this example, find the patch here: https://issues.apache.org/jira/browse/CASSANDRA-1311
Extending Cassandra with Asynchronous Triggers
[This post is written by Maxim Grinev and Martin Hentschel on the work done at Systems Group @ ETH Zurich]
Motivation
Latency matters! Amazon found every 100ms of latency cost them 1% in sales. Google found an extra .5 seconds in search page generation time dropped traffic by 20%. Even the smallest delay kills user satisfaction. You can find more on the importance of latency here and here.
The underlying database contributes significantly to the overall application response time, especially on update operations, as they cannot be sped up via caching. A common approach to reducing the database-imposed latency is to break a request into sub-operations which are executed asynchronously, so that the application acknowledges the request without waiting for all the sub-operations to be executed. This approach allows for tremendous latency reduction and also makes the latency predictable even when sub-operation execution time varies significantly. The implication of asynchronous execution is eventual consistency of data. But eventual consistency is already a common practice that meets the requirements of many Web applications.
Asynchronous execution is typically implemented using queues in front of a database. Yet, integrating asynchronous execution into a database provides a number of benefits:
- Much easier programming model and simplified system management – the developer/administrator has to deal with one system (i.e. database) instead of two (i.e. queue and database)
- Asynchronous execution can be used to automate internal database operations with reduced latency – for example, updating indexes.
We have extended the Cassandra database system with support for Asynchronous Triggers (CASSANDRA-1311). Asynchronous triggers are a basic mechanism that can be used to implement various use cases of asynchronous execution of application code on the database side. Cassandra and Async triggers are a perfect match, as both take advantage of eventual consistency.
Cassandra Async Triggers
Like a traditional database trigger, a Cassandra Async trigger is a procedure that is automatically executed by the database in response to certain events on a particular database object (e.g. table or view). The distinguishing feature of an Async trigger is that the database responds to the client on successful execution of the update without waiting for triggers to be executed, thus reducing response latency.
More precisely, Cassandra Async triggers can be described as follows:
- A trigger is set on a column family and is executed in case of any update to the column family. Cassandra triggers are “after” triggers. A trigger is executed after the update operation that fires the trigger and can see the results of the update.
- Trigger procedures are implemented in Java. The application developer implements the execute method of the ITrigger interface.
- Cassandra Async triggers are mutation-level triggers. A trigger is executed for each mutation issued to the column family. For example, if a trigger is fired by batch_mutate it will be executed for each mutation separately. Each trigger is parametrized by the mutation that fires it.
- In contrast to traditional triggers, which are synchronous, Cassandra Async triggers are asynchronous. The database acknowledges update execution to the client after the update is executed and the fired triggers are submitted for execution. Actual execution of fired triggers happens after the acknowledgement to the client. It allows saving latency but leads to eventual consistency of data.
- Our implementation guarantees that triggers are executed at least once. The system is responsible for handling failures during trigger execution, so the client does not need to care about them. This makes client-side code easier to develop and maintain. Note that triggers may be executed more than once, which requires triggers to be idempotent. This is a requirement for any Cassandra update anyway. We chose not to implement exactly-once semantics because it introduces substantial overhead.
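The semantics listed above can be illustrated with a small stand-alone Java sketch. This is not the CASSANDRA-1311 implementation: `AsyncTriggerSketch`, its nested `Trigger` interface, the retry limit, and the in-memory map standing in for a column family are all assumptions made for illustration. It shows an “after” trigger that is scheduled per write, a write that returns before the trigger runs, and retries that can cause the trigger to execute more than once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of asynchronous "after" triggers with retries.
class AsyncTriggerSketch {
    // Simplified stand-in for a trigger interface (assumed signature).
    interface Trigger {
        void execute(String key, Map<String, String> mutation);
    }

    private static final int MAX_ATTEMPTS = 5; // assumed bound; a real system would persist pending triggers
    final Map<String, Map<String, String>> columnFamily = new ConcurrentHashMap<>();
    private final ExecutorService triggerPool = Executors.newSingleThreadExecutor();
    private final Trigger trigger;

    AsyncTriggerSketch(Trigger trigger) {
        this.trigger = trigger;
    }

    // Applies the mutation, schedules the trigger, and returns without waiting for it.
    void write(String key, Map<String, String> mutation) {
        columnFamily.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).putAll(mutation);
        Map<String, String> copy = new HashMap<>(mutation);
        triggerPool.execute(() -> runWithRetries(key, copy));
    }

    // Re-runs a failed trigger, so the trigger body may execute more than once.
    private void runWithRetries(String key, Map<String, String> mutation) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                trigger.execute(key, mutation);
                return;
            } catch (RuntimeException e) {
                // failed attempt: loop around and retry
            }
        }
    }

    // Waits for all scheduled triggers to finish (used here only to observe the end state).
    void awaitTriggers() {
        triggerPool.shutdown();
        try {
            triggerPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A trigger that adds the row key to a set (as the index trigger does with column names) stays correct under such retries, whereas a trigger that increments a counter would not.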
Applications
Async triggers are a powerful mechanism that allows for many interesting applications:
1) Index and Materialized View Support
Cassandra does not support secondary indexes. To index data other than by record key or column name, the application developer needs to store data redundantly in a separate column family. In this case triggers can be used to update secondary indexes whenever records in the original column family are modified.
Similar to indexes, materialized views store data redundantly. So they also can be supported via triggers. Find general guidelines on managing redundant data (indexes and materialized views) in Cassandra in my post “Do You Really Need SQL to Do It All in Cassandra?”.
Implementing index and view maintenance synchronously means coupling update and maintenance operations together. This will increase response time of each update operation depending on the number of indexes. Using an asynchronous mechanism will keep the response time constant.
2) Online Analytics
Async triggers can be effectively used to propagate changes (in near real-time) from the “operational” part of the database to the “analytical” part without reducing response time for operational updates. It opens up a way for many new interesting applications! Note that delay in the data propagation caused by asynchronous triggers is negligible to the majority of analytical applications. It is online (near real-time) analytics in comparison, for example, with offline MapReduce analytics.
3) Specific application: Data propagation in social networks
Data propagation in social networks (e.g. sending a user’s posts to all her friends) has proven to scale only with the push-on-change model (i.e. redundant data propagation via updates, with simple queries), as opposed to the pull-on-demand model (i.e. combining data from normalized storage on demand via queries). Twitter uses the push-on-change model: it employs queues to implement asynchronous data propagation. Using just a few triggers, one can easily implement a production version of a Twitter-like network.
Asynchronous triggers have been submitted as a contribution to Cassandra. You can find the patch at CASSANDRA-1311.
Update Idempotency: Why It is Important in Cassandra Applications
When you develop an application for Cassandra, you should be aware of the following fact: even when the client observes the failure of an update, it is still possible that this update has been executed successfully. The cause of this anomalous behavior is that Cassandra does not support transactional rollback. But please do not rush to judgement; there is a reason for that. Cassandra is a distributed database, so rollback would require support for distributed transactions, which have a performance cost and do not scale well. So if you want it fast and scalable, you have to handle this anomaly.
Before we discuss a solution let’s look at examples when the anomaly might happen:
- In Cassandra data are (usually) replicated. You can specify how many replicas must be successfully updated for the whole update to be considered successful (this keeps data writable even when some replica nodes are down or unreachable due to network partitioning). Still, if fewer nodes confirm the update than required, Cassandra returns an error to the application saying the write failed. But it does not clean up or roll back the update on those nodes where it was successfully executed. For example, if the replication factor is 3, the required number of updated nodes is 2, and only 1 node was actually updated, the application receives an error. Despite the error, subsequent reads can see the update. Moreover, the mechanism of eventual update propagation (i.e. read repair, which is triggered on the first subsequent read) will update the other replicas with a value that was reported to the client as a failed write. Note that this should not happen often, as Cassandra goes out of its way (via gossip-based node failure detection) to make sure the cluster is healthy enough to execute the update. But it is possible.
- The whole update can be successfully executed but the return message is lost.
The solution is to retry the failed update until it succeeds. As a result, the same update can be executed several times! If the update increments a counter, the counter value becomes incorrect. Here we come to the main point of this post: all your updates should be idempotent (i.e. repeated applications of an update have the same effect as one). Designing updates to be idempotent is the standard discipline for coping with repeated updates. Read the great articles Life beyond Distributed Transactions: an Apostate’s Opinion and Building on Quicksand by Pat Helland, which stress the importance of idempotent updates in highly scalable systems.
To ensure idempotence (i.e. guarantee that processing a retried update is harmless) the database should be designed to remember that the update has been processed. Typically this is achieved by storing identifiers that uniquely identify an update. For example, suppose we want to count how many times each URL has been posted on Twitter. Instead of just storing the mapping of URLs to counters (i.e. column family URL_statistics where each record has a URL as a key and a single column holding the counter as its value), a solution can be to store the mapping of each URL to the IDs of the tweets which contain the URL (i.e. column family URL_Tweets where each record has a URL as a key, columns represent tweets, column names are the tweet IDs, and column values are not used). URL counters are then computed on retrieval by counting tweet IDs.
It is a good idea to store tweet IDs as column names so that Cassandra automatically eliminates duplicates – a repeated update will be ignored in this case (use this great Cassandra feature to make your updates idempotent!). You can easily come up with other ways to design the database, but one way or another you have to store some information to ensure update idempotence.
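The difference between the two designs can be sketched in a few lines of Java. This is an in-memory illustration only: the class `UrlTweetsSketch` and its method names are hypothetical, a map of integers stands in for the URL_statistics column family, and a map of sets stands in for URL_Tweets (a set models the fact that a row cannot hold the same column name twice).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory comparison of a counter-based and an ID-based design.
class UrlTweetsSketch {
    // URL_statistics style: URL -> counter (not idempotent under retries)
    final Map<String, Integer> urlStatistics = new HashMap<>();
    // URL_Tweets style: URL -> tweet IDs used as column names (idempotent)
    final Map<String, Set<String>> urlTweets = new HashMap<>();

    // Each call adds one, so a retried update is counted twice.
    void incrementCounter(String url) {
        urlStatistics.merge(url, 1, Integer::sum);
    }

    // Writing the same tweet ID again leaves the row unchanged.
    void recordTweet(String url, String tweetId) {
        urlTweets.computeIfAbsent(url, k -> new TreeSet<>()).add(tweetId);
    }

    // The counter is computed on retrieval by counting tweet IDs.
    int count(String url) {
        return urlTweets.getOrDefault(url, Collections.emptySet()).size();
    }
}
```

If the client retries a write for the same tweet because the first acknowledgement was lost, `incrementCounter` ends up at 2 while `count` still reports 1.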
It is common that Cassandra applications are not initially designed for idempotence. At first, small-scale deployments do not exhibit these subtle problems and work fine. Only as time passes and the deployments expand do the problems manifest, and the applications then have to be reworked to handle them. Do it right from the beginning.
Do You Really Need SQL to Do It All in Cassandra?
NoSQL database systems are designed for scalability. The downside is a primitive key-value data model and, as the name suggests, no support for SQL. It might sound like a serious limitation – how can I “select”, “join”, “group” and “sort” the data? This post explains how all these operations can be quite naturally and efficiently implemented in one of the most famous NoSQL systems – Cassandra.
To understand this post you need to know the Cassandra data model. You can find a quick introduction in my previous post. The power of the Cassandra data model is that it extends a basic key-value store with efficient data nesting (via columns and super columns). It means that you can read/update a column (or a super column) without retrieving the whole record. Below I describe how we can exploit data nesting to support various query operations.
Let’s consider a basic example: departments and employees with a one-to-many relationship. So we have two column families: Emps and Deps. In Emps employee IDs are used as keys and there are Name, Birthdate, and City columns. In Deps keys are department IDs and the single column is Name.
1) Select
For example: select * from Emps where Birthdate = '25/04/1975'
To support this query we need to add one more column family named Birthdate_Emps in which the key is a date and the column names are the IDs of those employees that were born on that date. The values are not used here and can be an empty byte array (denoted “-”). Every time an employee is inserted into or deleted from Emps we need to update Birthdate_Emps. To execute the query we just need to retrieve all the columns for the key '25/04/1975' from Birthdate_Emps.
Notice that Birthdate_Emps is essentially an index that allows us to execute the query very efficiently. And this index is scalable as it is distributed across Cassandra nodes. You can go even further to speed up the query by redundantly storing information about employees (i.e. the employee’s columns from Emps) in Birthdate_Emps. In this case employee IDs become names of super columns that contain the corresponding employee columns.
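As a rough illustration of this indexing scheme, the following Java sketch models Emps and Birthdate_Emps with plain in-memory maps. It is not a Cassandra client: `BirthdateIndexSketch` and its methods are hypothetical names, and a sorted set of employee IDs plays the role of the column names in a Birthdate_Emps row.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical in-memory model of Emps and Birthdate_Emps.
class BirthdateIndexSketch {
    // Emps: employee ID -> columns (Name, Birthdate, City)
    final Map<String, Map<String, String>> emps = new HashMap<>();
    // Birthdate_Emps: date -> employee IDs (column names; values unused)
    final Map<String, SortedSet<String>> birthdateEmps = new HashMap<>();

    void insertEmp(String id, String name, String birthdate, String city) {
        deleteEmp(id); // keeps the index consistent if the ID already exists
        Map<String, String> row = new HashMap<>();
        row.put("Name", name);
        row.put("Birthdate", birthdate);
        row.put("City", city);
        emps.put(id, row);
        birthdateEmps.computeIfAbsent(birthdate, k -> new TreeSet<>()).add(id);
    }

    void deleteEmp(String id) {
        Map<String, String> row = emps.remove(id);
        if (row == null) {
            return;
        }
        SortedSet<String> ids = birthdateEmps.get(row.get("Birthdate"));
        if (ids != null) {
            ids.remove(id);
        }
    }

    // select * from Emps where Birthdate = ?
    List<Map<String, String>> selectByBirthdate(String birthdate) {
        List<Map<String, String>> result = new ArrayList<>();
        for (String id : birthdateEmps.getOrDefault(birthdate, Collections.emptySortedSet())) {
            result.add(emps.get(id));
        }
        return result;
    }
}
```

The query reads one index row and then fetches each matching employee; storing the employee columns redundantly inside the index row would remove the second step.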
2) Join
For example: select * from Emps e, Deps d where e.dep_id = d.dep_id
What does join essentially mean? It constructs records that represent relationships between entities. Such relationships can be easily (and even more naturally) represented via nesting. To do that, add a column family Dep_Emps in which the key is a department ID and the column names are the IDs of the corresponding employees.
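A minimal Java sketch of this nesting, again with in-memory maps rather than Cassandra calls, might look as follows. `DepEmpsSketch` and its methods are hypothetical, and only the Name column of each column family is modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory model of Deps, Emps and the nested Dep_Emps column family.
class DepEmpsSketch {
    final Map<String, String> deps = new HashMap<>(); // Deps: department ID -> Name
    final Map<String, String> emps = new HashMap<>(); // Emps: employee ID -> Name (other columns omitted)
    // Dep_Emps: department ID -> employee IDs (column names)
    final Map<String, SortedSet<String>> depEmps = new TreeMap<>();

    void addDep(String depId, String name) {
        deps.put(depId, name);
    }

    void addEmp(String empId, String name, String depId) {
        emps.put(empId, name);
        depEmps.computeIfAbsent(depId, k -> new TreeSet<>()).add(empId);
    }

    // Produces one "department name / employee name" string per joined record.
    List<String> join() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, SortedSet<String>> e : depEmps.entrySet()) {
            for (String empId : e.getValue()) {
                out.add(deps.get(e.getKey()) + " / " + emps.get(empId));
            }
        }
        return out;
    }
}
```

The join result is read straight off the Dep_Emps rows; no matching of dep_id values happens at query time.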
3) Group By
For example: select count(*) from Emps group by City
From an implementation viewpoint Group By is very similar to the select/indexing described above. You just need to add a column family City_Emps with cities as keys and employee IDs as column names. In this case you count the number of employees on retrieval. Or you can have a single column named count whose value is the pre-calculated number of employees in the city.
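Both variants can be compared in a short Java sketch. It uses in-memory maps, and `CityEmpsSketch` and its methods are hypothetical. Note one assumption: the sketch keeps the ID set alongside the counter so that it can detect a repeated insert; with only a count column, a retried insert would be counted twice.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of City_Emps with both counting variants.
class CityEmpsSketch {
    // City_Emps: city -> employee IDs (column names)
    final Map<String, Set<String>> cityEmps = new HashMap<>();
    // Alternative: city -> pre-calculated "count" column
    final Map<String, Integer> cityCount = new HashMap<>();

    void addEmp(String empId, String city) {
        // Bump the counter only when the ID is new for this city.
        if (cityEmps.computeIfAbsent(city, k -> new HashSet<>()).add(empId)) {
            cityCount.merge(city, 1, Integer::sum);
        }
    }

    // Variant 1: count the column names when the query runs.
    int countOnRetrieval(String city) {
        return cityEmps.getOrDefault(city, Collections.emptySet()).size();
    }

    // Variant 2: read the pre-calculated value.
    int precalculatedCount(String city) {
        return cityCount.getOrDefault(city, 0);
    }
}
```

Counting on retrieval costs a scan of the row per query, while the pre-calculated column moves that cost to each update.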
4) Order By
To keep data sorted in Cassandra you can use two mechanisms: (a) records can be sorted by keys using OrderPreservingPartitioner with range queries (more on this in Cassandra: RandomPartitioner vs OrderPreservingPartitioner); (b) nested data can be kept sorted using the automatically supported ordering of column names.
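The two levels of ordering can be mimicked in Java with nested sorted maps. This is only an analogy: `SortedRowsSketch` is a hypothetical class, the outer `TreeMap` stands in for key order under an order-preserving partitioner, the inner `TreeMap` stands in for column-name order within a row, and Java's natural `String` ordering is assumed in place of whatever comparator the column family is configured with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory analogy for sorted keys and sorted column names.
class SortedRowsSketch {
    // row key -> (column name -> value), both levels kept sorted
    final NavigableMap<String, NavigableMap<String, String>> rows = new TreeMap<>();

    void put(String key, String column, String value) {
        rows.computeIfAbsent(key, k -> new TreeMap<>()).put(column, value);
    }

    // Range query over row keys, both bounds inclusive.
    List<String> keyRange(String from, String to) {
        return new ArrayList<>(rows.subMap(from, true, to, true).keySet());
    }

    // Column names of one row, in sorted order.
    List<String> columnNames(String key) {
        NavigableMap<String, String> row = rows.get(key);
        if (row == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(row.keySet());
    }
}
```

Insertion order does not matter in either map; the sorted structure returns keys and column names in order on every read.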
To support all these operations we store redundant data optimized for each particular query. It has two implications:
1) You must know queries in advance (i.e. no support for ad-hoc queries). However, typically in Web applications and enterprise OLTP applications queries are well known in advance, few in number, and do not change often. Read Mike Stonebraker convincingly talking about that. BTW, Constraint Tree Schema, described in the latter paper, also exploits nesting to organize data for predefined queries.
2) We shift the burden from querying to updating because what we essentially do is supporting materialized views (i.e. pre-computed results of queries). But it makes a lot of sense in case of using Cassandra as Cassandra is very much optimized for updates (thanks to eventual consistency and “log-structured” storage borrowed from Google BigTable). So we can use fast updates to speed up query execution. Moreover, use-cases typical for social applications are proven to be only scalable with push-on-change model (i.e. preliminary data propagation via updates with simple queries – the approach taken in this post) in comparison with pull-on-demand model (i.e. data are stored normalized and combined by queries on demand – classical relational approach). On push-on-change versus pull-on-demand read WHY ARE FACEBOOK, DIGG, AND TWITTER SO HARD TO SCALE?
Cassandra for Enterprises
There is a huge hype about relational databases (MySQL) versus NoSQL systems (Cassandra). After Digg’s successful migration in September 2009 and Twitter’s announcement in March 2010, the whole Web has gone insanely mad about Cassandra. “NoSQL vs. RDBMS: Let the flames begin!” is just one of hundreds of blog posts on the topic.
As there have been a number of successful migrations to Cassandra (Digg is not the only example) and Facebook has been using it for years, we can conclude that it just works. On the other hand, Cassandra has clearly not reached maturity yet (you can easily feel it by trying to install Cassandra and looking at its stop-server script). There is still not enough understanding of all the pros and cons of using it. So how widely it will be adopted for Web applications is still an open question – for more on this read “I Can’t Wait for NoSQL to Die”.
It is interesting that enterprise software companies have also been paying attention from the very beginning of the NoSQL movement. Of course they are not talking about Cassandra yet, but they are definitely interested in the Amazon Dynamo principles on which Cassandra is based. For example, read “Principles for Inconsistency” from SAP. So we can expect the NoSQL hype to expand to enterprise software soon. As Amazon Dynamo uses MySQL or Berkeley Database as storage, we might foresee an Enterprise Version of Cassandra using Oracle or DB2 as storage. Is that where we are going now? LOL