Measuring Throughput
Note: I'm trying out enabling comments. Behave! I reserve the right to disable them again for any reason including "the very idea of someone commenting made me anxious."
We talked recently about why it’s easy to use a closed-loop benchmark to inappropriately measure latency, and why an open-loop benchmark is a better choice for this, generally.
Today, let's try to measure throughput.
Consider a database (or similar system) that's a black box, including any queueing and request dispatching system. We don't have any insight into those things.
We'd like to know the maximum throughput (queries per second) that this database can handle. Assuming no funny business, this is a question that has an answer, more or less. If you fix the like, distribution on the individual queries themselves, there is a steady-state number of queries-per-second that we can handle.
Let's bring back our database model from last time.
type Database struct {
requests chan Request
}
func NewDatabase() Database {
return Database{
requests: make(chan Request),
}
}
// Clients call this function to simulate a request.
func (d *Database) Request() {
response := make(chan Response)
d.requests <- Request{response: response}
<-response
}
const maxConcurrentRequests = 5
// Run is a long-running function that processes requests.
func (d *Database) Run() chan struct{} {
close := make(chan struct{})
// Our database is modern and concurrent, so we can handle a number of
// concurrent requests at once, but not too many. If too many come in, some
// will have to wait.
semaphore := make(chan struct{}, maxConcurrentRequests)
go func() {
for {
select {
case r := <-d.requests:
semaphore <- struct{}{}
go func() {
d.processRequest(r)
<-semaphore
}()
case <-close:
return
}
}
}()
return close
}
func (d *Database) processRequest(r Request) {
time.Sleep(10 * time.Millisecond)
r.response <- Response{}
}
Our open-loop benchmark looks like this:
// Returns the average latency observed.
func openLoop(db *Database, qps float64) float64 {
var wg sync.WaitGroup
ticker := time.NewTicker(10 * time.Millisecond)
done := make(chan struct{})
rate := 1.0 / qps
debt := 0.0
previousTime := time.Now()
sumLatency := 0.0
requests := 0
latencies := make(chan float64)
wg.Add(1)
go func() {
for {
select {
case <-done:
wg.Done()
return
case latency := <-latencies:
requests += 1
sumLatency += latency
case <-ticker.C:
debt += time.Since(previousTime).Seconds()
previousTime = time.Now()
// We need to catch up if we're behind.
for debt >= rate {
debt -= rate
go func() {
start := time.Now()
db.Request()
latencies <- time.Since(start).Seconds()
}()
}
}
}
}()
time.Sleep(10 * time.Second)
close(done)
wg.Wait()
return sumLatency / float64(requests)
}
Recall that the way an unbounded queueing system like this responds to an open-loop benchmark is that if the request generation rate is greater than the request processing rate, the queue, and accordingly, latency, will grow without bound. If the request generation rate is within the request processing rate, then the queue will be effectively empty all of the time.
So, what we're interested in observing is the value of qps
above which causes our latencies to grow without bound. We can find this with a simple gallop search, where we keep doubling the qps until we exceed the target average latency, followed by a binary search to find the exact location.
The configuration of our server is such that each request takes 10ms, and we allow up to five concurrent requests at any given time. So in theory, the server should be able to handle at most 500 queries per second. But it will probably be a little less than that.
func main() {
// Use the average latency as a signal that the system is overloaded.
targetLatency := 0.015
run := 0
// Gallop search to find the right ballpark.
qps := 1.0
for {
db := NewDatabase()
closeDb := db.Run()
latency := openLoop(&db, qps)
fmt.Printf("%d,%f,%f\n", run, qps, latency)
run++
closeDb <- struct{}{}
if latency > targetLatency {
break
}
qps *= 2
}
// Then we can do a binary search to find the maximum QPS that meets the target
// latency.
low := qps / 2
high := qps
for {
if high-low < 10.0 {
break
}
db := NewDatabase()
closeDb := db.Run()
qps = (low + high) / 2
latency := openLoop(&db, qps)
fmt.Printf("%d,%f,%f\n", run, qps, latency)
run++
closeDb <- struct{}{}
if latency > targetLatency {
high = (low + high) / 2
} else {
low = (low + high) / 2
}
}
}
If we run this, we get some fun data. I've been cosplaying as a data scientist again and re-learning how to use Jupyter for visualizing data, check this bad boy out. Green dots are successful runs, red dots are unsuccessful:
We get a maximum observed QPS of 480. Which is a pretty reasonable number based on our back-of-the-napkin estimate of "a little bit less than 500."
Cool, so this worked! It was a little finicky, though. We had to do this search algorithm, that was tracking the observed latency, which is a little finicky, and one outlier result could have messed up the whole search procedure, so in practice we'd probably want to run each of these trials multiple times, or something.
There's an easier to do this, it turns out. We talked last time about how using a closed-loop benchmark is dangerous for measuring latency, but it turns out it can actually be very effective for measuring throughput. The property that the benchmarking harness goes easier on the database as the database slows down is actually a benefit here: as we ramp up the number of workers, the load just eases up, rather than the database completely falling over and queueing unboundedly. This means that the database, chugging through requests as fast as it can, should max out its throughput without building up an unmanageable number of requests and dying like it would with an open-loop benchmark.
(Of course, if the number of workers was really high, we might start to overload the number of resources that the database used to manage them, but in a real-world scenario it would start shedding them. This is a consequence of the way that with sufficiently many workers, a closed-loop benchmark starts to resemble an open-loop benchmark.)
Here's our closed-loop benchmark, modified to take the number of workers as a parameter and returning the observed QPS.
// Returns the observed QPS.
func closedLoop(db *Database, workers int) float64 {
txnCounts := make([]int, workers)
var wg sync.WaitGroup
chans := make([]chan struct{}, workers)
for i := 0; i < workers; i++ {
wg.Add(1)
done := make(chan struct{})
chans[i] = done
txns := 0
go func() {
for {
select {
case <-done:
txnCounts[i] = txns
wg.Done()
return
default:
db.Request()
txns++
}
}
}()
}
time.Sleep(10 * time.Second)
for i := 0; i < workers; i++ {
chans[i] <- struct{}{}
close(chans[i])
}
wg.Wait()
total := 0
for _, cnt := range txnCounts {
total += cnt
}
return float64(total) / float64(seconds)
}
Running this, we get some more data that is fun to plot (I'll be using words like "regression" soon):
Which puts us at around the same value as before, around 470 QPS. Note that the QPS scales linearly with the concurrency until we saturate the maximum number of queries in flight at any one time (five) at which point it pretty dramatically caps out.
Of course, this is a very simplified example, and we knew the value we should find at the end of it all ("a little less than 500").
I believe intuitively that these two strategies will, in an idealized system, always arrive at the same value, but I don't have a proof of this fact. I guess maybe it's just a direct consequence of there being a maximum throughput of the system in the first place? That sounds true, I guess. If you have a more convincing reason then let me know.
This is a trick I learned from Denis Rystsov (of CASPaxos), and I think it's a pretty good one—I'm terrified of mushy real-world systems and benchmarking them and I think pre-built experiments like this are useful tools to have in your repertoire. Both kinds of benchmarks (open and closed) are useful! And it's important to know when to use one over the other, and when one will provide misleading results.