Fragmented Data

(This is a follow-up to my earlier post on Distributed Data)

One of the more interesting design sessions today at the OpenStack Design Summit was focused on Nova Cells V2, which is the effort to rework the way cells work in Nova. Briefly, cells are a mechanism for allowing separate independent deployments to work as a single cloud, primarily as a way to provide horizontal scalability. They also have other uses for operators, but that’s the main reason for them. And as separate deployments, they have their own API service, conductor service, message queue, and database. There are several advantages that this kind of independence offers, with failure isolation being one of the biggest. By this I mean that if something goes wrong and a cell becomes unreachable, it doesn’t affect the performance of the remaining cells.

There are tradeoffs with any approach, and this one is no different. One glaring issue that came up at that session is that there is no simple way to get a global view of your cloud. The example that was discussed was the common case of listing all your instances, which would require querying each cell independently, aggregating the results, and then sorting the aggregated records. For small clouds the cost of this process is negligible, but as the size grows, so do the overhead and complexity. It is particularly problematic for something that requires multiple calls, like pagination. Let’s consider a site with thousands of instances spread across dozens of cells. Typically when querying a large list like that, the API will return the first few, and include a link for the next batch. With a fragmented database, this will require some form of centralized caching approach, or, if that’s not feasible or the cache is stale, re-running the same costly query, aggregation, and sorting process for each page of data requested. With that, any gain that might have been realized by separating the databases will be more than offset by the need for a way to efficiently recombine that data. This isn’t only a cost in memory and CPU for the API service to handle the aggregation and caching, which will only need to be borne by the larger cloud operating companies. It is an ongoing cost of complexity to the developers and maintainers of the Nova codebase to handle this, and every new part of Nova will be similarly difficult to fit.
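To make the pagination cost concrete, here is a minimal sketch in Python. It is not Nova code: the cell names, the instance records, and the `query_cell` and `list_instances` helpers are all made up for illustration, and the per-cell "database" is just a dictionary. The point is only that, without a cache, every page request has to query every cell again before it can merge and skip ahead.

```python
import heapq
from itertools import islice

# Hypothetical per-cell data: (created_at, instance_name) rows held by each cell.
CELLS = {
    "cell1": [("2016-04-01", "vm-a"), ("2016-04-05", "vm-d")],
    "cell2": [("2016-04-02", "vm-b"), ("2016-04-06", "vm-e")],
    "cell3": [("2016-04-03", "vm-c"), ("2016-04-07", "vm-f")],
}

stats = {"queries": 0}  # counts how many per-cell queries were issued


def query_cell(cell_name):
    """Stand-in for a database query against one cell; returns sorted rows."""
    stats["queries"] += 1
    return sorted(CELLS[cell_name])


def list_instances(page, page_size):
    """Return one page of the global, sorted instance list."""
    # Without a cache, each page request queries every cell again.
    per_cell = [query_cell(name) for name in CELLS]
    merged = heapq.merge(*per_cell)  # merge the already-sorted per-cell lists
    start = page * page_size
    return list(islice(merged, start, start + page_size))


first = list_instances(page=0, page_size=4)
second = list_instances(page=1, page_size=4)
```

Two pages over three cells cost six cell queries here; with dozens of cells and many pages, that multiplication is the overhead described above.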

There are other places where this fragmented database design will cause complexity, such as having the Scheduler require a database connection to every cell, and then query every cell on each request, followed by aggregating the results… see the pattern? Splitting a database to improve performance, or sharding, only makes sense if you shard along a line that logically separates the data so that each shard can be queried efficiently. We’re not doing that in the design of cells.

It’s not too late. There is a project that makes minimal changes to the oslo.db driver to allow replacing the SQLAlchemy and MySQL database that underpins Nova with a distributed database (they used Redis, but it doesn’t depend on Redis). It should really be investigated further before we create a huge pile of technical and design debt by fragmenting the data in Nova.

Distributed Data and Nova

Last year I wrote about the issues I saw with the design of the Nova Scheduler, and put forth a few proposals that I felt would address those issues. I’m not going to rehash them in depth here, but summarize instead:

  • The choice of having the state of compute nodes copied back to the scheduler over RPC was the source of the raciness observed when more than one scheduler was running. It would be better to have a database be the single source of truth.
  • The scheduler was created specifically for selecting hosts based on basic characteristics of VMs: RAM, disk, and VCPU. The growth of virtualization, though, has meant that we now need to select based on myriad other qualities of a host, and those don’t fit into the original ‘flavor’-based design. We could address that by creating Resource classes that encapsulated the knowledge of a resource’s characteristics, and which also “knew” how to both write the state of that resource to the database, and generate the query for selecting that resource from the database.
  • Nova spends an awful lot of effort trying to move state around, and to be honest, it doesn’t do it all that well. Instead of trying to re-invent a distributed data store, it should use something that is designed to do it, and which does it better than anything we could come up with.

But I’m pleased to report that some progress has been made, although not exactly in the manner that I believe will solve the issues long-term. True, there are now Resource classes that encapsulate the differences between different resources, but because the solution assumed that an SQL database was the only option, the classes reflect an inflexible structure that SQL demands. The process of squeezing all these different types of things into a rigid structure was brilliantly done, though, so it will most likely do just what is needed. But there is a glaring hole: the lack of a distributed data system. Until that issue is addressed, Nova developers will spend an inordinate amount of time trying to create one, and working around the limitations of an incomplete solution to this problem. Reading Chris Dent’s blog post on generic resource pools made this problem glaringly apparent to me: instead of a single, distributed data store, we are now making several separate databases: one in the API layer for data that applies across the cells, and a separate cell database for data that is just in that cell. And given that design choice, Chris is thinking about having a scheduler whose design mirrors that choice. This is simply adding complexity to deal with the complexity that has been added at another layer. Tracking the state of the cloud will now require knowing what bit of data is in which database, and I can guarantee you that as we move forward, this separation will be constantly changing as we run into situations where the piece of data we need is in the wrong place.

When I wrote last year, in the blog posts and subsequent mailing list discussions, I think the fatal mistake that I made was offering a solution instead of just outlining the problem. If I had limited it to “we need a distributed data store”, instead of “we need a distributed data store like Apache Cassandra”, I think much of the negative reaction could have been avoided. There are several such products out there, and who knows? Maybe one of them would be a much better solution than Cassandra. I only knew that I had gotten a proof-of-concept working with Cassandra, so I wanted to let everyone know that it was indeed possible. I was hoping that others would then present their preferred solution, and we could run a series of tests to evaluate them. And while several people did start discussing their ideas, the majority of the community heard ‘Cassandra’, which made them think ‘Java’, which soured the entire proposal in their minds.

So forget about Cassandra. It’s not the important thing. But please consider some distributed database for Nova instead of the current design. What does that design buy us, anyway? Failure isolation? So that if a cell goes down or is cut off from the internet, the rest can still continue? That’s exactly what distributed databases are designed to handle. Scalability? I doubt you could get much more scalable than Cassandra, which is used to run, among other things, Netflix and the Apple App Store. I’m sure that other distributed DBs scale as well as or better than MySQL. And with a distributed DB, you can then drop the notion of a separate API database and separate cell databases that all have to coordinate with each other to get the information they need, and you can avoid the endless discussions about, say, whether the RequestSpec (the data representing a request to build a VM) belongs in the API layer (since it was received there) or in the cell DB (since that’s where the instance associated with it lives). The data is in the database. Write to it. Query it. Stop making things more complicated than they need to be.

Moving Forward (carefully)

It’s a classic problem in software development: how to change a system to make it better without breaking existing deployments. That’s the battle that comes up regularly in the OpenStack ecosystem, and there aren’t any simple answers.

On the one hand, you’ve released software that has a defined interface: if you call a particular API method with certain values, you expect a particular result. If one day making that exact same call has a different result, users will be angry, and rightfully so.

On the other hand, nobody ever releases perfect software. Maybe the call described above works, but does so in a very unintuitive way, and confuses a lot of new users, causing them a great deal of frustration. Or maybe a very similar call gives a wildly different result, surprising users who didn’t expect it. We could just leave them as is, but that isn’t a great option. The idea of iterative software is to constantly make things better with each release.

Enter microversions: a controlled, opt-in approach to revising the API. If this is a new concept, read Sean Dague’s excellent summary of microversions. The concept is simple enough: the API won’t ever change, unless you explicitly ask it to. Let’s take the example of an inconsistent API call that we want to make consistent with other similar calls: we make the change, bump the microversion (let’s call this microversion number 36, just for example), and we’re done! Existing code that relies on the old behavior continues to work, but anyone who wants to take advantage of the improved API just has to specify that they want to use microversion 36 or later in their request header, and they get the new behavior. Done! What could be simpler?

Well, there are potential problems. Let’s continue with the example above, and assume that later on some really cool new feature is added to the API. Let’s assume that this is added in microversion 42. A user who might want to use this new feature sets their headers to request microversion 42, but now they may have a problem if other code still expects the inconsistent call that existed in pre-36 versions of the API. In other words, moving to a new microversion to get one specific change requires that you also accept all of the changes that were added before that one!
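The cumulative nature of microversions can be sketched in a few lines of Python. This is only an illustration: the change names and the `active_changes` helper are invented, and I’m using the plain numbers 36 and 42 from the example above rather than any real API’s version format.

```python
# Hypothetical API changes, keyed by the microversion that introduced them.
CHANGES = {
    36: "consistent_call",
    42: "cool_new_feature",
}


def active_changes(requested):
    """Return every change in effect for the requested microversion.

    Microversions are cumulative: asking for version N turns on every
    change introduced at or below N, not just the one added in N.
    """
    return [name for version, name in sorted(CHANGES.items())
            if version <= requested]


old_client = active_changes(35)   # still sees the old, inconsistent call
new_client = active_changes(42)   # gets the new feature AND the change from 36
```

A client that asks for 42 gets both entries, which is exactly the situation described above: there is no way to request the new feature while keeping the pre-36 behavior.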

In my opinion, that is a very small price to pay. Each microversion change has to be documented with a release note explaining the change, so before you jump into microversion 42, you have ample opportunity to learn what has changed in microversions 2-41, too. We really can’t spend too much mental effort on protecting the people who can’t be bothered to read the release notes, as the developers and reviewers have gone to great lengths to make sure that these changes are completely visible to anyone who cares to make the effort. We can’t assume that the way we did something years ago is going to work optimally forever; we need to be able to evolve the API as computing in general evolves, too. Static is just another word for ‘dead’ in this business. So let’s continue to provide a sane, controlled path forward for our users, and yes, it will take a little effort on their part, too. That’s perfectly OK.

Creating a Small-Scale Cassandra Cluster

My last post started a discussion about various possible ways to improve the Nova Scheduler, so I thought that I’d start putting together a proof-of-concept for the solution I proposed, namely, using Cassandra as the data store and replication mechanism. But there’s a problem: not everyone has the experience to set up Cassandra, so I thought I’d show you what I did. I’m using 3 small cloud instances on Digital Ocean, but you could set this up with some local VMs, too.

We’ll create 3 512MB droplets (that’s their term for VMs). The 512MB size is the smallest they offer (hey, this is a POC, not production!). I named mine ‘cass0’, ‘cass1’, and ‘cass2’. Choose a region near you, and in the “Select Image” section, click on the “Applications” tab. In the lower right side of the various options, you should see one for Docker (as of this writing, it’s “Docker 1.8.3 on 14.04”). Select that, and then below that select the “Private Networking” option; this will allow your Cassandra nodes to communicate more efficiently with each other. Add your SSH key, and go! In about a minute the instances should be ready, so click on their name to get to the instance information page. Click the word ‘Settings’ along the left side of the page, and you will see both the public and private IP addresses for that instance. Record those, as we’ll need them in a bit. I’ll refer to them as $IP_PRIVn for the instance cass(n); e.g., $IP_PRIV2 is the private IP address for cass2.

If you are using something other than Digital Ocean, such as Virtual Box or Rackspace or anything else, and you don’t have access to an image with Docker pre-installed, you’ll have to install it using either sudo apt-get install docker-engine or sudo yum install docker-engine.

Once the droplets are running, ssh into them (I use cssh to make this easier), and run the usual apt-get updates to pull all the security fixes. Reboot. Reconnect to each droplet, and then grab the latest Cassandra image for Docker by running: docker pull cassandra:latest. [EDIT – I realized that without using volumes, restarting the node would lose all the data. So here are the corrected steps.] Then you’ll create directories to use for Cassandra’s data and logs:

mkdir data
mkdir log

To set up your Cassandra cluster, first ssh into the cass0 instance. Then run the following to create your container:

# Run this from the directory that contains the data and log directories.
docker run --name node0 \
    -v "$PWD/data":/var/lib/cassandra \
    -v "$PWD/log":/var/log/cassandra \
    -e CASSANDRA_BROADCAST_ADDRESS=$IP_PRIV0 \
    -p 9042:9042 -p 7000:7000 \
    -d cassandra:latest

If you’re not familiar with Docker, what this does is create a container with the name ‘node0’ from the image cassandra:latest. It creates two volumes (the sections beginning with the -v parameter): the first maps the local ‘data’ directory to the container’s ‘/var/lib/cassandra’ directory (where Cassandra stores its data), and the second maps the local ‘log’ directory to where Cassandra would normally write its logs. It passes in the private IP address in the environment variable CASSANDRA_BROADCAST_ADDRESS; in Cassandra, the broadcast address is the address that the node advertises to the other nodes in the cluster. It also opens 2 ports: 9042 (the CQL query port) and 7000 (for intra-cluster communication). Now run docker ps -a to verify that the container is up and running.

For the other two nodes, you do something similar, but you also specify the CASSANDRA_SEEDS parameter to tell them how to join the cluster; this is the private IP address of the first node you just created. On cass1, run:

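# Run this from the directory that contains the data and log directories.
docker run --name node1 \
    -v "$PWD/data":/var/lib/cassandra \
    -v "$PWD/log":/var/log/cassandra \
    -e CASSANDRA_BROADCAST_ADDRESS=$IP_PRIV1 \
    -e CASSANDRA_SEEDS=$IP_PRIV0 \
    -p 9042:9042 -p 7000:7000 \
    -d cassandra:latest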

Then on cass2 run:

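# Run this from the directory that contains the data and log directories.
docker run --name node2 \
    -v "$PWD/data":/var/lib/cassandra \
    -v "$PWD/log":/var/log/cassandra \
    -e CASSANDRA_BROADCAST_ADDRESS=$IP_PRIV2 \
    -e CASSANDRA_SEEDS=$IP_PRIV0 \
    -p 9042:9042 -p 7000:7000 \
    -d cassandra:latest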

That’s it! You have a working 3-node Cassandra cluster. Now you can start playing around with it for your tests. I’m using the Python library for Cassandra to connect to my cluster, which you can install with pip install cassandra-driver. But working with that is the subject for another post in the future!

Re-imagining the Nova Scheduler

The Problem

OpenStack is a distributed, asynchronous system, and much of the difficulty in designing such a system is keeping the data about the state of the various components up-to-date and available across the entire system. There are several examples of this, but as I’m most familiar with the scheduler, let’s consider that and the data it needs in order to fulfill its role.

The Even Bigger Problem

There is no way that Nova could ever incrementally adopt a solution like this. It would require changing a huge part of the way things currently work all at once, which is why I’m not writing this as a spec, as it would generate a slew of -2s immediately. So please keep in mind that I am fully aware of this limitation; I only present it to help people think of alternative solutions, instead of always trying to incrementally refine an existing solution that will probably never get us where we need to be.

The Example: Nova Scheduler

The scheduler receives a request for resources, and then must select a provider of those resources (i.e., the host) that has the requested resources in sufficient amounts. It does this by querying the Nova compute_node table, and then updating an in-memory copy of that information with anything changed in the database. That means that there is a copy of the information in the compute node database held in memory by the scheduler, and that most of the queries it runs do not actually update anything, as the data doesn’t change that often. Then, once it has updated all of the hosts, it runs them through a series of filters to remove those that cannot fulfill the request. It then runs those that make it through the filters through a series of weighers to determine the best fit. This filtering and weighing process takes a small but finite amount of time, and while it is going on, other requests can be received and also processed in a similar manner. Once a host has been selected, a message is sent to the host (via the conductor, but let’s simplify things here) to claim the resources and build the requested virtual machine; this request can sometimes fail due to a classic race condition, where two requests for similar resources are received in a short period of time, and different threads handling the requests select the same host. To make things even more tenuous, in the case of cells each cell will have its own database, and keeping data synchronized across these cells can further complicate this process.
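The filter-then-weigh flow described above can be sketched roughly like this in Python. None of these names come from Nova: `HostState`, `Request`, the three filter functions, and the single weigher are simplified stand-ins I made up to show the shape of the process, not the actual scheduler code.

```python
from dataclasses import dataclass


@dataclass
class HostState:
    """Hypothetical in-memory copy of one compute node's free resources."""
    name: str
    free_ram_mb: int
    free_disk_gb: int
    free_vcpus: int


@dataclass
class Request:
    """Hypothetical resource request (roughly what a flavor describes)."""
    ram_mb: int
    disk_gb: int
    vcpus: int


def ram_filter(host, req):
    return host.free_ram_mb >= req.ram_mb


def disk_filter(host, req):
    return host.free_disk_gb >= req.disk_gb


def vcpu_filter(host, req):
    return host.free_vcpus >= req.vcpus


def free_ram_weigher(host, req):
    # Prefer the host that would be left with the most free RAM.
    return host.free_ram_mb - req.ram_mb


def select_host(hosts, req, filters, weighers):
    """Drop hosts that fail any filter, then pick the highest total weight."""
    candidates = [h for h in hosts if all(f(h, req) for f in filters)]
    if not candidates:
        return None
    return max(candidates, key=lambda h: sum(w(h, req) for w in weighers))


hosts = [
    HostState("host1", free_ram_mb=2048, free_disk_gb=100, free_vcpus=2),
    HostState("host2", free_ram_mb=8192, free_disk_gb=50, free_vcpus=8),
    HostState("host3", free_ram_mb=4096, free_disk_gb=10, free_vcpus=4),
]
filters = [ram_filter, disk_filter, vcpu_filter]
chosen = select_host(hosts, Request(2048, 20, 2), filters, [free_ram_weigher])
```

Here host3 is filtered out for lack of disk, and host2 wins the weighing over host1. Note that everything is decided against the in-memory copies, which is where the race described above comes from.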

Another big problem with this is that it is Nova-centric. It assumes that a request has a flavor, which is composed of RAM, CPU and ephemeral disk requirements, with possibly some other compute-related information. Work is being done now to create more generic Resource classes that the scheduler could use to allocate Cinder and Neutron resources, too. The bigger problem, though, is the sheer clumsiness of the design. Data is stored in one place, and each resource type will require a separate table to store its unique constraints. Then this data is perpetually passed around to the parts of the system that might need it. Updates to that data are likewise passed around, and a lot of code is in place to help ensure that these different copies of the data stay in sync. The design of the scheduler is inherently racy, because in the case of multiple schedulers (or multiple threads of the same service), none of the schedulers has any idea what any of the others are doing. It is common for similar requests to come in close to each other, and thus likely that the same host will be selected by different schedulers in those cases, since they are both using the same criteria to make that selection. In those cases, one request will build successfully, and the other will fail and have to be retried.
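The race is easy to reproduce in miniature. The following Python sketch is a toy model, not Nova code: the `actual` dictionary stands in for the real state of two hosts, and `pick_host` and `claim` are hypothetical helpers. Two schedulers each work from their own snapshot, apply the same criteria, and collide.

```python
# Free RAM in MB on the real hosts (hypothetical numbers).
actual = {"host1": 4096, "host2": 2048}


def pick_host(view, ram_mb):
    """Choose the host with the most free RAM that can fit the request."""
    fits = {host: free for host, free in view.items() if free >= ram_mb}
    return max(fits, key=fits.get) if fits else None


def claim(host, ram_mb):
    """What happens on the host: succeed only if the RAM is really free."""
    if actual[host] >= ram_mb:
        actual[host] -= ram_mb
        return True
    return False


# Two schedulers each take a snapshot before either request is placed.
view_a = dict(actual)
view_b = dict(actual)

# Same criteria plus same data means the same choice.
choice_a = pick_host(view_a, 3072)
choice_b = pick_host(view_b, 3072)

result_a = claim(choice_a, 3072)  # succeeds
result_b = claim(choice_b, 3072)  # fails and would have to be retried
```

Neither scheduler did anything wrong given what it knew; the failure comes from each one deciding against a private copy of the data.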

Current Direction

For the past year a great deal of work has been done to clean up the interface of the scheduler with Nova, and there are also other thoughts on how we can improve the current design to make it work a little better. While these are steps in the right direction, it very much feels like we are ignoring the big problem: the overall design is wrong. We are trying to implement a technology solution that has already been implemented, and not doing a very good job of it. Sure, it’s way, way better than things were a few years ago, but it isn’t good enough for what we need, and it seems clear that it will never be better than “good enough” under the current design.

Proposal

I propose replacing all the internal communication that handles the distribution and synchronization of data among the various parts of Nova with a system that is designed to do this natively. Apache Cassandra is a mature, proven database that is a great fit for this problem domain. It is a masterless design, with all nodes capable of full read and write access. It also provides extremely low overhead for writes, as well as low overhead for reads with correct data modeling. Its flexible data schemas will also enable the scheduler to support additional types of resources, not just compute as in the current design, without having to have different tables for each type. And since Cassandra is replicated across all clusters equally, different cells would be reading and writing to the same data, even with physically separate installations. Data updates are obviously not instant across the globe, but they are only limited by the connection speed.

Wait – a NoSQL database?

Well, yeah, but the NoSQL part isn’t the reason for suggesting Cassandra. It is the extremely fast, efficient replication of data across all clusters that makes it a great fit for this problem. The schemaless design of the database does have an advantage when it comes to the implementation, but many other products offer similar capabilities. It is the efficient replication combined with very high write capabilities that make it ideal.

Cassandra is used by some of the biggest sites in the world. It is the backbone of Apple’s AppStore and iTunes; Netflix uses Cassandra for its stream services database. And it is used by CERN and WalMart, two of the biggest OpenStack deployments.

Implementation

How would this work in practice? I have some ideas which I’ll outline here, but please keep in mind that this is not intended to be a full-on spec, nor is it the only possible design.

Resource Classes

Instead of limiting this to compute resources, we create the concept of resource type, and have each resource class define its properties. These will map to columns in the database, and given Cassandra’s schemaless design, will make representing different resource types much easier. There would be some columns in common with all resource types, and others that are specific to each type. The subclasses that define each resource type would enumerate their specific columns, as well as define the method for comparing to a request for that resource.
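As a rough illustration of that idea, here is a Python sketch of a base resource type with two subclasses. Everything in it is hypothetical: the class names, the column names, and the simple "at least this much" comparison are my own placeholders, and a real implementation would likely need richer comparison logic per type.

```python
class ResourceType:
    """Base class: columns shared by every resource type (names are made up)."""
    type_name = "generic"
    common_columns = ("provider_id", "resource_type")
    specific_columns = ()

    @classmethod
    def columns(cls):
        """All columns for this type: the common ones plus its own."""
        return cls.common_columns + cls.specific_columns

    @classmethod
    def matches(cls, record, request):
        """True if the record has at least the requested amount of each column."""
        unknown = set(request) - set(cls.specific_columns)
        if unknown:
            raise ValueError(f"unknown columns for {cls.type_name}: {sorted(unknown)}")
        return all(record.get(col, 0) >= amount for col, amount in request.items())


class ComputeResource(ResourceType):
    type_name = "compute"
    specific_columns = ("ram_mb", "disk_gb", "vcpus")


class VolumeResource(ResourceType):
    type_name = "volume"
    specific_columns = ("capacity_gb",)


node = {"provider_id": "host1", "resource_type": "compute",
        "ram_mb": 4096, "disk_gb": 40, "vcpus": 4}
fits = ComputeResource.matches(node, {"ram_mb": 2048, "vcpus": 2})
too_big = ComputeResource.matches(node, {"ram_mb": 8192})
```

Adding a new kind of resource then means adding a subclass with its own columns, rather than a new table and a new code path.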

Resource Providers

Resource providers are what the scheduler schedules. In our example here, the resource provider is a compute node.

Compute Nodes

Compute nodes would write their state to the database when the compute service starts up, and then update that record whenever anything significant changes. There should also be a periodic update to make sure things are in sync, but that shouldn’t be as critical as it is in the current system. What the node will write will consist of the resources available on the node, along with the resource type of ‘compute’. When a request to build an instance is received, the compute node will find the matching claim record, and after creating the new instance, delete that claim record and update its own record with its current state. Similarly, when an instance is destroyed, a write will update the record to reflect the newly-available resources. There will be no need for a compute node to use a Resource Tracker, as querying Cassandra for claim info will be faster and more reliable than trying to keep yet another object in sync.

Scheduler

Filters now work by comparing requested amounts of resources (for consumable resources) or host properties (for things like aggregates) with an in-memory copy of each compute node, and deciding if it meets the requirement. This is relatively slow and prone to race conditions, especially with multiple scheduler instances or threads. With this proposal, the scheduler will no longer maintain in-memory copies of HostState information. Instead, it will be able to query the data to find all hosts that match the requested resources, and then process those with additional filters if necessary. Each resource class will know its own database columns, and how to take a request object along with the enabled filters and turn it into the appropriate query. The query will return all matching resource providers, which can then be further processed by weighers in order to select the best fit. Note that in this design, bare metal hosts are considered a different resource type, so we will eliminate the need for the (host, node) tracking that is currently necessary to fit non-divisible resources into the compute resource model.

When a host is selected, the scheduler will write a claim record to the compute node table; this will be the same format as the compute node record, but with negative amounts to reflect reserved consumption. Therefore, at any time, the available resources on the host are the sum of the actual resources reported by the host and any claims on that host. In addition, when writing the claim, a Lightweight Transaction can be used to ensure that another thread hasn’t already claimed resources on the compute node, or that the state of that node hasn’t changed in any other way. This will greatly reduce (and possibly eliminate) the frequency of retries due to threads racing with each other.
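Here is a small Python model of the claim idea. It does not talk to Cassandra at all: `ProviderRecords` is an in-memory stand-in I made up, and its `version` counter is only an assumption about how the condition of a Lightweight Transaction might be expressed (a write that is applied only if the state is still what the writer last read).

```python
class ClaimConflict(Exception):
    """Raised when a conditional claim cannot be applied."""


class ProviderRecords:
    """In-memory stand-in for one provider's rows: a report plus claim rows."""

    def __init__(self, ram_mb):
        self.rows = [{"kind": "report", "ram_mb": ram_mb}]
        self.version = 0  # bumped on every successful write

    def available(self):
        # Claims are stored as negative amounts, so a plain sum gives
        # what is still free on the host.
        return sum(row["ram_mb"] for row in self.rows)

    def write_claim(self, ram_mb, expected_version):
        """Conditional write: apply only if nothing changed since we read."""
        if self.version != expected_version:
            raise ClaimConflict("state changed since it was read")
        if self.available() < ram_mb:
            raise ClaimConflict("not enough RAM left")
        self.rows.append({"kind": "claim", "ram_mb": -ram_mb})
        self.version += 1


node = ProviderRecords(ram_mb=4096)

# Two schedulers read the same state...
seen_by_a = node.version
seen_by_b = node.version

# ...but only the first conditional write is applied.
node.write_claim(3072, expected_version=seen_by_a)
try:
    node.write_claim(3072, expected_version=seen_by_b)
    second_claim_applied = True
except ClaimConflict:
    second_claim_applied = False
```

The losing scheduler finds out at write time, before any message is sent to the host, and can simply re-read and pick again.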

The remaining internal communication will remain the same. API requests will be passed to the conductor, which will hand them off to the scheduler. After the scheduler selects a host, it will send a message to that effect back to the conductor, which will then notify the appropriate host.

Summary

There is a distributed, reliable way to share data among disconnected systems, but for historical reasons, we do not use it. Instead, we have attempted to create a different approach and then tweak it as much as possible. It is my belief that these incremental improvements will not be sufficient to make this design work well enough, and that making the hard decision now to change course and adopt a different design will make OpenStack better in the long run.