Issue 13: Working with Big Data
I've been heads down at work recently on some exciting new stuff I can't talk about just yet, but I've learned some nice tricks in the past couple of weeks!
What I'm Working On
Reducing Big Data Shuffle by Substituting Integers for Strings
Recently I was working on a BigQuery job that reads in 2.6 billion rows, expands some data in them, runs some GROUP BY statements, and generates 20 billion rows from that. This was too much for BigQuery, which cancelled the job with this error:
Resources exceeded during query execution: Your project or organization exceeded the maximum disk and memory limit available for shuffle operations. Consider provisioning more slots, reducing query concurrency, or using more efficient logic in this job.
The really painful part of working around this was that it took about 20 minutes for the query to fail each time. After various attempts, I finally came upon the solution.
Two of the columns in this data are extremely long strings, 64 and 128 characters respectively, and I had to use both of them as GROUP BY keys at different points in the query. The GROUP BY requires moving all the data for the same set of grouping keys to the same set of processing nodes. This is called a "shuffle" or a "repartition," and with this much data we're talking multiple TB of data moving around the system.
Of course (in retrospect), the best way to reduce the data to be shuffled around is not to rearrange the queries, but to reduce the amount of data there is. And as we all know, integers are smaller than strings.
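As a rough back-of-envelope check (assuming the string keys are ASCII, so one byte per character, and that the replacement IDs fit in 8-byte integers): 64 + 128 = 192 bytes of key data per row drops to 16 bytes, so over 20 billion rows that's roughly 3.8 TB of grouping keys shrinking to about 320 GB, before counting any of the other columns.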
So, the solution was to replace the strings with numbers, and then swap them back at the end as needed. It looks something like this.
WITH
  claim_numbers AS (
    SELECT ROW_NUMBER() OVER (ORDER BY claim_id) AS id, claim_id
    FROM claims_table
    GROUP BY claim_id
  ),
  patient_numbers AS (
    SELECT ROW_NUMBER() OVER (ORDER BY patient_id) AS id, patient_id
    FROM claims_table
    GROUP BY patient_id
  ),
  claims AS (
    SELECT cn.id AS claim_id, pn.id AS patient_id, other_columns
    FROM claims_table
    JOIN claim_numbers cn USING(claim_id)
    JOIN patient_numbers pn USING(patient_id)
  ),
  other_ctes AS (...)
-- And then you can join on claim_numbers and patient_numbers again if you need
-- to restore the strings.
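For illustration, that restore step might look roughly like this. Here final_results is a hypothetical placeholder for whatever the real query's last CTE produces, and other_columns stands in for the remaining fields, just as above:
SELECT
  cn.claim_id,    -- original 64-character string
  pn.patient_id,  -- original 128-character string
  fr.other_columns
FROM final_results fr
JOIN claim_numbers cn ON cn.id = fr.claim_id
JOIN patient_numbers pn ON pn.id = fr.patient_id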
Not only does this technique work well for reducing shuffling, it also speeds up large queries considerably. I don't recommend doing this all the time, but it can be a lifesaver in cases like the problem I was working on.
A Rust Stream Implementation
Another thing I've been working on recently is adding S3 output support to a Rust tool that we have at work. This tool generates multi-GB database files from the output of the query I was working on above, and while it currently writes them to disk before copying them to S3, the next generation of this data is going to be so large that writing to disk first becomes much less tenable.
The short version of the task here was that I needed to create an async Stream<Item = Result<Bytes, std::io::Error>> for the S3 client to read from. But the existing code predates Rust's good async support, and the data is just written ad hoc to the file instead of to a stream. I also didn't want to rewrite a bunch of working, fairly complex code as async right now.
After looking around in the tokio, tokio_util, and other crates for a solution to this, I finally just decided to write my own, and it turned out to be really easy. The Tokio mpsc channel allows writing from both asynchronous and synchronous contexts, and so this object wraps one and turns it into a stream in under 30 lines of code.
use bytes::Bytes;
use futures::stream::Stream;
use tokio::sync::mpsc;

pub struct BytesSource {
    receiver: mpsc::Receiver<Bytes>,
}

impl BytesSource {
    pub fn new() -> (mpsc::Sender<Bytes>, BytesSource) {
        BytesSource::with_capacity(100)
    }

    pub fn with_capacity(capacity: usize) -> (mpsc::Sender<Bytes>, BytesSource) {
        let (sender, receiver) = mpsc::channel(capacity);
        (sender, BytesSource { receiver })
    }
}

impl Stream for BytesSource {
    type Item = Result<Bytes, std::io::Error>;
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Wrap each received Bytes in Ok; a closed channel yields None, which ends the stream.
        self.receiver.poll_recv(cx).map(|item| item.map(Ok))
    }
}
The poll_next function just defers to the internal channel receiver, so everything works nice and easy.
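To show how the pieces fit together, here's a minimal usage sketch, not the actual tool's code: the chunks and the writer thread are made up for illustration, and it assumes BytesSource from above is in scope. The synchronous side pushes Bytes into the channel with blocking_send, and the async side just consumes the resulting stream.
use bytes::Bytes;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let (sender, mut source) = BytesSource::new();

    // Synchronous producer: the existing file-writing code could push its
    // buffers here via blocking_send instead of writing to disk.
    let writer = std::thread::spawn(move || {
        for chunk in ["some", "generated", "data"] {
            sender.blocking_send(Bytes::from(chunk)).unwrap();
        }
        // Dropping the sender closes the channel and ends the stream.
    });

    // Async consumer: in the real tool this is where the S3 client would
    // read the stream; here we just drain it.
    while let Some(chunk) = source.next().await {
        println!("got {} bytes", chunk.unwrap().len());
    }

    writer.join().unwrap();
}
The important part is that blocking_send works from plain threads, so the existing synchronous writer code doesn't have to become async to feed the stream.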
Recommended Reading and Videos
Not much here today, since I've been spending most of my free time attempting to write some long-delayed blog posts on the side. There's a mostly-done draft of an Actix Web middleware post that hopefully will get out next week.
Postgres 14 beta 1 is out and has a bunch of nice new features, so it'll be great to see the final release later this year. You can read the full change list or check out the official blog post for the highlights.
If you enjoyed this, I'd love it if you shared it with a friend (sign up here) or just replied to this email with your thoughts. Thanks for reading!