Creating a Small-Scale Cassandra Cluster

My last post started a discussion about various possible ways to improve the Nova Scheduler, so I thought that I’d start putting together a proof-of-concept for the solution I proposed, namely, using Cassandra as the data store and replication mechanism. But there’s a problem: not everyone has the experience to set up Cassandra, so I thought I’d show you what I did. I’m using 3 small cloud instances on Digital Ocean, but you could set this up with some local VMs, too.

We’ll create 3 512MB droplets (that’s their term for VMs). The 512MB size is the smallest they offer (hey, this is POC, not production!). I named mine ‘cass0’, ‘cass1’, and ‘cass2’. Choose a region near you, and in the “Select Image” section, click on the “Applications” tab. In the lower right side of the various options, you should see one for Docker (as of this writing, it’s “Docker 1.8.3 on 14.04”). Select that, and then below that select the “Private Networking” option; this will allow your Cassandra nodes to communicate more efficiently with each other. Add your SSH key, and go! In about a minute the instances should be ready, so click on their name to get to the instance information page. Click the word ‘Settings’ along the left side of the page, and you will see both the public and private IP addresses for that instance. Record those, as we’ll need them in a bit. I’ll refer to them as $IP_PRIVn for the instance cass(n); e.g., $IP_PRIV2 is the private IP address for cass2.

If you are using something other than Digital Ocean, such as Virtual Box or Rackspace or anything else, and you don’t have access to an image with Docker pre-installed, you’ll have to install it using either sudo apt-get install docker-engine or sudo yum install docker-engine.

Once the droplets are running, ssh into them (I use cssh to make this easier), and run the usual apt-get updates to pull all the security fixes. Reboot. Reconnect to each droplet, and then grab the latest Cassandra image for Docker by running: docker pull cassandra:latest. [EDIT – I realized that without using volumes, restarting the node would lose all the data. So here are the corrected steps.] Then you’ll create directories to use for Cassandra’s data and logs:

mkdir data
mkdir log

To set up your Cassandra cluster, first ssh into the cass0 instance. Then run the following to create your container:

docker run --name node0 
    -v data:/var/lib/cassandra 
    -v log:/var/log/cassandra 
    -e CASSANDRA_BROADCAST_ADDRESS=$IP_PRIV0 
    -p 9042:9042 -p 7000:7000 
    -d cassandra:latest

If you’re not familiar with Docker, what this does is create a container with the name ‘node0’ from the image cassandra:latest. It creates two volumes (the sections beginning with the -v parameter: the first maps the local ‘data’ directory to the container’s ‘/var/lib/cassandra’ directory (where Cassandra stores its data), and the second maps the local ‘log’ directory to where Cassandra would normally write its logs. It passes in the private IP address in environment variable CASSANDRA_BROADCAST_ADDRESS; in Cassandra, the broadcast address is what that node should use to communicate. It also opens 2 ports: 9042 (the CQL query port) and 7000 (for intra-cluster communication). Now run docker ps -a to verify that the container is up and running.

For the other two nodes, you do something similar, but you also specify the CASSANDRA_SEEDS parameter to tell them how to join the cluster; this is the private IP address of the first node you just created. On cass1, run:

docker run --name node1 
    -v data:/var/lib/cassandra 
    -v log:/var/log/cassandra 
    -e CASSANDRA_BROADCAST_ADDRESS=$IP_PRIV1 
    -e CASSANDRA_SEEDS=$IP_PRIV0 
    -p 9042:9042 -p 7000:7000 
    -d cassandra:latest

Then on cass2 run:

docker run --name node2 
    -v data:/var/lib/cassandra
    -v log:/var/log/cassandra 
    -e CASSANDRA_BROADCAST_ADDRESS=$IP_PRIV2 
    -e CASSANDRA_SEEDS=$IP_PRIV0 
    -p 9042:9042 -p 7000:7000 
    -d cassandra:latest

That’s it! You have a working 3-node Cassandra cluster. Now you can start playing around with it for your tests. I’m using the Python library for Cassandra to connect to my cluster, which you can install with pip install cassandra-driver. But working with that is the subject for another post in the future!

Re-imagining the Nova Scheduler

The Problem

OpenStack is a distributed, asynchronous system, and much of the difficulty in designing such a system is keeping the data about the state of the various components up-to-date and available across the entire system. There are several examples of this, but as I’m most familiar with the scheduler, let’s consider that and the data it needs in order to fulfill its role.

The Even Bigger Problem

There is no way that Nova could ever incrementally adopt a solution like this. It would require changing a huge part of the way things currently work all at once, which is why I’m not writing this as a spec, as it would generate a slew of -2s immediately. So please keep in mind that I am fully aware of this limitation; I only present it to help people think of alternative solutions, instead of always trying to incrementally refine an existing solution that will probably never get us where we need to be.

The Example: Nova Scheduler

The scheduler receives a request for resources, and then must select a provider of those resources (i.e., the host) that has the requested resources in sufficient amounts. It does this by querying the Nova compute_node table, and then updating an in-memory copy of that information with anything changed in the database. That means that there is a copy of the information in the compute node database held in memory by the scheduler, and that most of the queries it runs do not actually update anything, as the data doesn’t change that often. Then, once it has updated all of the hosts, it runs them through a series of filters to remove those that cannot fulfill the request. It then runs those that make it through the filters through a series of weighers to determine the best fit. This filtering and weighing process takes a small but finite amount of time, and while it is going on, other requests can be received and also processed in a similar manner. Once a host has been selected, a message is sent to the host (via the conductor, but let’s simplify things here) to claim the resources and build the requested virtual machine; this request can sometimes fail due to a classic race condition, where two requests for similar resources are received in a short period of time, and different threads handling the requests select the same host. To make things even more tenuous, in the case of cells each cell will have its own database, and keeping data synchronized across these cells can further complicate this process.

Another big problem with this is that it is Nova-centric. It assumes that a request has a flavor, which is comprised of RAM, CPU and ephemeral disk requirements, with possibly some other compute-related information. Work is being done now to create more generic Resource classes that the scheduler could use to allocate Cinder and Neutron resources, too. The bigger problem, though, is the sheer clumsiness of the design. Data is stored in one place, and each resource type will require a separate table to store its unique constraints. Then this data is perpetually passed around to the parts of the system that might need it. Updates to that data are likewise passed around, and a lot of code is in place to help insure that these different copies of the data stay in sync. The design of the scheduler is inherently racy, because in the case of multiple schedulers (or multiple threads of the same service), none of the schedulers has any idea what any of the others are doing. It is common for similar requests to come in close to each other, and thus likely that in those cases that the same host will be selected by different schedulers, since they are both using the same criteria to make that selection. In those cases, one request will build successfully, and the other will fail and have to be retried.

Current Direction

For the past year a great deal of work has been done to clean up the interface of the scheduler with nova, and there are also other thoughts on how we can improve the current design to make it work a little better. While these are steps in the right direction, it very much feels like we are ignoring the big problem: the overall design is wrong. We are trying to implement a technology solution that has already been implemented, and not doing a very good job of it. Sure, it’s way, way better than things were a few years ago, but it isn’t good enough for what we need, and it seems clear that it will never be better than “good enough” under the current design.

Proposal

I propose replacing all the internal communication that handles the distribution and synchronization of data among the various parts of Nova with a system that is designed to do this natively. Apache Cassandra is a mature, proven database that is a great fit for this problem domain. It is a masterless design, with all nodes capable of full read and write access. It also provide for extremely low overhead for writes, as well as low overhead for reads with correct data modeling. Its flexible data schemas will also enable the scheduler to support additional types of resources, not just compute as in the current design, without having to have different tables for each type. And since Cassandra is replicated across all clusters equally, different cells would be reading and writing to the same data, even with physically separate installations. Data updates are obviously not instant across the globe, but they are only limited by the connection speed.

Wait – a NoSQL database?

Well, yeah, but the NoSQL part isn’t the reason for suggesting Cassandra. It is the extremely fast, efficient replication of data across all clusters that makes it a great fit for this problem. The schemaless design of the database does have an advantage when it comes to the implementation, but many other products offer similar capabilities. It is the efficient replication combined with very high write capabilities that make it ideal.

Cassandra is used by some of the biggest sites in the world. It is the backbone of Apple’s AppStore and iTunes; Netflix uses Cassandra for its stream services database. And it is used by CERN and WalMart, two of the biggest OpenStack deployments.

Implementation

How would this work in practice? I have some ideas which I’ll outline here, but please keep in mind that this is not intended to be a full-on spec, nor is it the only possible design.

Resource Classes

Instead of limiting this to compute resources, we create the concept of resource type, and have each resource class define its properties. These will map to columns in the database, and give Cassandra’s schemaless design, will make representing different resource types much easier. There would be some columns in common with all resource types, and others that are specific to each type. The subclasses that define each resource type would enumerate their specific columns, as well as define the method for comparing to a request for that resource.

Resource Providers

Resources providers are what the scheduler schedules. In our example here, the resource provider is a compute node.

Compute Nodes

Compute nodes would write their state to the database when the compute service starts up, and then update that record whenever anything significant changes. There should also be a periodic update to make sure things are in sync, but that shouldn’t be as critical as it is in the current system. What the node will write will consist of the resources available on the node, along with the resource type of ‘compute’. When a request to build an instance is received, the compute node will find the matching claim record, and after creating the new instance delete that claim record and update its state with its current state. Similarly when an instance is destroyed, a write will update the record to reflect the newly-available resources. There will be no need for a compute node to use a Resource Tracker, as querying Cassandra for claim info will be faster and more reliable than trying to keep yet another object in sync.

Scheduler

Filters now work by comparing requested amounts of resources (for consumable resources) or host properties (for things like aggregates) with an in-memory copy of each compute node, and deciding if it meets the requirement. This is relatively slow and prone to race conditions, especially with multiple scheduler instances or threads. With this proposal, the scheduler will no longer maintain in-memory copies of HostState information. Instead, it will be able to query the data to find all hosts that match the requested resources, and then process those with additional filters if necessary. Each resource class will know its own database columns, and how to take a request object along with the enabled filters and turn it into the appropriate query. The query will return all matching resource providers, which can then be further processed by weighers in order to select the best fit. Note that in this design, bare metal hosts are considered a different resource type, so we will eliminate the need for the (host, node) tracking that is currently necessary to fit non-divisible resources into the compute resource model.

When a host is selected, the scheduler will write a claim record to the compute node table; this will be the same format as the compute node record, but with negative amounts to reflect reserved consumption. Therefore, at any time, the available resources on the host is the sum of the actual resources reported by the host along with any claims on that host. However, when writing the claim, a Lightweight Transaction can be used to ensure that another thread hasn’t already claimed resources on the compute node, or that the state of that node hasn’t changed in any other way. This will greatly reduce (and possibly eliminate) the frequency of retries due to threads racing with each other.

The remaining internal communication will remain the same. API requests will be passed to the conductor, which will hand them off to the scheduler. After the scheduler selects a host, it will send a message to that effect back to the conductor, which will then notify the appropriate host.

Summary

There is a distributed, reliable way to share data among disconnected systems, but for historical reasons, we do not use it. Instead, we have attempted to create a different approach and then tweak it as much as possible. It is my belief that these incremental improvements will not be sufficient to make this design work well enough, and that by making the hard decision now to change course and adopt a different design will make OpenStack better in the long run.