Databases are some of the most complex pieces of software conceived
since the advent of the computing age over half a century ago. [1]
Nearly every piece of technology ends up touching the database in some
way, shape, or form. Despite the ubiquity of databases in the software
stack, the vast majority of developers have been conditioned to treat
them as more or less a black box: complex, dense pieces of software
conjured up by arch-wizards and mavens ensconced in the rarefied elite
tiers of database companies or places like Google. The adage for the
rest of us, as it goes, is to never attempt to write your own database.
That said, despite their longevity, we have seen continued innovation in
the space, starting with Hadoop's arrival on the scene about two decades
ago. The ClickBench website now lists more than 50 databases in its
benchmark suite [2], and that's just the analytics engines. With the
recent trend of rewriting everything big data in Rust [3], not a month
goes by without an interesting new project trending on Hacker News.
In this post we will take a look at how easy (or hard) it is to build a
database using Apache DataFusion, and whether you can in fact, as a
mere mortal, realistically build a custom database and innovate around
the developer experience.
Most modern databases can be disaggregated into compute and storage
layers, with sophisticated query engines responsible for the “compute”
portion of the database. A query engine typically consists of a Query
Parser, Logical Plan generation, and Physical Plan generation; the
resulting Physical Plan is then run on the Execution Engine. A query
generally goes through multiple optimization phases during both logical
and physical plan generation. No matter what the target use case of the
end system is, the query engine more or less follows this model.
Given the decades of database research that have gone into each of these
individual layers, the bar for writing a functional query engine with
table-stakes features remains strikingly high, and you need to nail all
of that before you can get around to writing your use-case-specific
features. While there are many projects that help you write some of
these layers individually, Apache DataFusion remains the only game in
town that helps you with the entire gamut.
You can think of DataFusion as an extensible database development
toolkit. At its most basic level, you can use it as a query engine à la
DuckDB with its built-in SQL and DataFrame front ends, while at the same
time you can extend, or even wholesale replace, different layers to
build your own experience entirely.
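To make the "query engine à la DuckDB" baseline concrete, here is a minimal sketch of the built-in SQL and DataFrame front ends; the CSV path, table name, and column names are placeholders:

use datafusion::functions_aggregate::expr_fn::min;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register a CSV file as a table (placeholder path and schema).
    ctx.register_csv("example", "example.csv", CsvReadOptions::new())
        .await?;

    // SQL front end.
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
    df.show().await?;

    // The same query through the DataFrame front end.
    let df = ctx
        .table("example")
        .await?
        .aggregate(vec![col("a")], vec![min(col("b"))])?;
    df.show().await?;

    Ok(())
}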
In the rest of this post we will walk through how to extend DataFusion
to add your own operator to its execution engine, weave it through the
Physical and Logical planners, and expose it to the frontend.
Figure: DataFusion architecture
At Denormalized, we are building a DuckDB-like single-node experience
for stream processing applications. While DataFusion does have some
support for unbounded computations, it doesn't have a streaming window
operator. Windows are at the core of stream processing applications:
they provide an easy way to bucket infinite data streams into finite
buckets so that we can apply aggregations over them.
For this tutorial we will implement a simple window operator for
infinite streams. Our operator will have the following signature:
pub fn window(
    self,
    group_expr: Vec<Expr>,
    aggr_expr: Vec<Expr>,
    window_length: Duration,
    slide: Option<Duration>,
) -> Result<Self> { ... }
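As a usage sketch, assuming a hypothetical streaming DataFrame df and placeholder column names (the window method is the operator we are about to build, not an existing DataFusion API):

// Hypothetical usage: a 60 second window sliding every 10 seconds,
// aggregated per customer_id (all names are placeholders).
let windowed = df.window(
    vec![col("customer_id")],      // group_expr: keys to group by
    vec![avg(col("amount"))],      // aggr_expr: aggregates computed per window
    Duration::from_secs(60),       // window_length
    Some(Duration::from_secs(10)), // slide; None would make it a tumbling window
)?;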
Writing the Execution Plan
An ExecutionPlan represents a node in the DataFusion Physical Plan. This is where
the actual code for our custom computation goes. DataFusion's execution model
is pull-based, meaning that execution starts at the sinks and works its
way up the physical plan. Calling the execute() method on this trait
produces an asynchronous SendableRecordBatchStream of record batches
that incrementally computes a partition of the output by running
computations over the ExecutionPlan's input.
In our use case, the ExecutionPlan's execute() method returns a
GroupedWindowAggStream struct, which implements RecordBatchStream, a
schema-aware extension of the futures::Stream trait. The actual
computations are implemented within poll_next() of the Stream implementation.
impl RecordBatchStream for GroupedWindowAggStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for GroupedWindowAggStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The actual work happens in poll_next_inner (described below).
        self.poll_next_inner(cx)
    }
}
Here in our example, calling poll_next_inner for a streaming window
takes care of the following (a condensed sketch follows the list):
- Processing the incoming data.
- Accumulating rows into open windows.
- Updating the watermark (which sits behind a mutex).
- Opening new windows if required.
- Closing any triggering windows and producing the output RecordBatches from them.
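A heavily condensed sketch of how such a poll_next_inner could stitch those steps together; self.input is assumed to be the upstream SendableRecordBatchStream, and every other method on self (update_watermark, open_windows_if_needed, accumulate, close_triggered_windows) is a hypothetical name standing in for the steps above, not an existing DataFusion or Denormalized API:

// Sketch only: assumes `use futures::StreamExt;` is in scope and that the
// helper methods below are defined on the stream struct (hypothetical names).
fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
    loop {
        match futures::ready!(self.input.poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                self.update_watermark(&batch);       // update the watermark (behind a mutex)
                self.open_windows_if_needed(&batch); // open new windows if required
                self.accumulate(&batch);             // accumulate rows into the open windows
                if let Some(out) = self.close_triggered_windows() {
                    return Poll::Ready(Some(Ok(out))); // emit batches for closed windows
                }
                // nothing closed yet; keep pulling from the input
            }
            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            None => return Poll::Ready(None),        // input exhausted
        }
    }
}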
Hooking into the Physical Planner
Having created our custom ExecutionPlan, we need to make the Physical
Planner aware of its existence. Implementing an ExtensionPlanner that
knows how to plan our ExecutionPlan is all we need to do here.
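A sketch of that glue, using the async_trait crate that DataFusion's planner traits rely on; StreamingWindowNode and GroupedWindowAggExec are placeholder names for the logical node and the ExecutionPlan described in this post:

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};

struct StreamingWindowPlanner;

#[async_trait]
impl ExtensionPlanner for StreamingWindowPlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only handle our own node; returning Ok(None) lets other planners try.
        if let Some(window_node) = node.as_any().downcast_ref::<StreamingWindowNode>() {
            // Build the custom ExecutionPlan from the logical node and its planned input.
            let exec = GroupedWindowAggExec::try_new(physical_inputs[0].clone(), window_node)?;
            Ok(Some(Arc::new(exec)))
        } else {
            Ok(None)
        }
    }
}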
Extending the Logical Plan
Now that we have the custom ExecutionPlan implemented along with its
ExtensionPlanner, we need to add a companion node to the Logical Plan.
This not only allows us to expose the operator to the SQL/DataFrame
frontends, but also lets us hook into the logical optimizers for
optimizations such as predicate pushdown.
In DataFusion we first need to implement a user-defined Logical Plan
Node and then add a LogicalPlanExtension to the logical plan builder,
which exposes it to the SQL/DataFrame frontends.
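Concretely, once the user-defined node type exists, it is wrapped into the plan as a LogicalPlan::Extension. A minimal sketch, with StreamingWindowNode again standing in for the node implementing UserDefinedLogicalNodeCore:

use std::sync::Arc;

use datafusion::logical_expr::{Extension, LogicalPlan};

// `StreamingWindowNode` (hypothetical name) holds the input plan, the group and
// aggregate expressions, the window length and the slide, and implements
// UserDefinedLogicalNodeCore.
let plan = LogicalPlan::Extension(Extension {
    node: Arc::new(StreamingWindowNode::new(
        input_plan, group_expr, aggr_expr, window_length, slide,
    )),
});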
Logical Plan to Physical Plan
The last piece of the puzzle is the touch point where the logical plan
gets converted into the physical plan. For this, we will implement a
custom QueryPlanner that ensures that the physical planner is initialized with
the custom extensions we wrote for our ExecutionPlan.
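A sketch of such a QueryPlanner: it delegates to DataFusion's DefaultPhysicalPlanner but registers the ExtensionPlanner from the previous section (StreamingWindowPlanner):

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};

struct StreamingQueryPlanner {}

#[async_trait]
impl QueryPlanner for StreamingQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The default planner, extended with the streaming window
        // ExtensionPlanner defined earlier.
        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
            StreamingWindowPlanner {},
        )]);
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}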
Custom Optimization Rules
Since our operator implements a group-by aggregation, we need to ensure
that all the rows for a particular group end up in the same partition.
To that end, we will add a new physical optimizer rule that inserts a
hash-partitioning operator on our group keys.
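A hedged sketch of such a rule; the TreeNode transform API differs slightly across DataFusion versions, and GroupedWindowAggExec along with its input() and group_exprs() accessors is again a placeholder for our exec node:

use std::sync::Arc;

use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

struct EnsureHashPartititionOnGroupByForStreamingAggregates;

impl EnsureHashPartititionOnGroupByForStreamingAggregates {
    fn new() -> Self {
        Self
    }
}

impl PhysicalOptimizerRule for EnsureHashPartititionOnGroupByForStreamingAggregates {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Walk the plan bottom-up; whenever we find our streaming window exec,
        // hash-repartition its input on the group keys so that every row of a
        // given group lands in the same partition.
        plan.transform_up(|node| {
            if let Some(window) = node.as_any().downcast_ref::<GroupedWindowAggExec>() {
                let repartitioned: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
                    window.input().clone(),
                    Partitioning::Hash(
                        window.group_exprs().to_vec(),
                        config.execution.target_partitions,
                    ),
                )?);
                Ok(Transformed::yes(node.with_new_children(vec![repartitioned])?))
            } else {
                Ok(Transformed::no(node))
            }
        })
        .map(|t| t.data)
    }

    fn name(&self) -> &str {
        "ensure_hash_partition_on_group_by_for_streaming_aggregates"
    }

    fn schema_check(&self) -> bool {
        true
    }
}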
Putting it all together
Finally, all we need to do is construct a DataFusion session with the
custom QueryPlanner we wrote above, as well as the additional physical
optimizer rule we added, and voilà: we now have our own extended version
of DataFusion.
let state = SessionStateBuilder::new()
    .with_default_features()
    .with_config(config)
    .with_query_planner(Arc::new(StreamingQueryPlanner {}))
    .with_optimizer_rules(get_default_optimizer_rules())
    .with_physical_optimizer_rule(Arc::new(
        EnsureHashPartititionOnGroupByForStreamingAggregates::new(),
    ))
    .build();
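From here the extended state is used like any other DataFusion session; a short sketch (in recent DataFusion versions):

use datafusion::execution::context::SessionContext;

// Wrap the customized state in a SessionContext; SQL and DataFrame queries
// issued through it will now plan through our custom operator, planner,
// and optimizer rule.
let ctx = SessionContext::new_with_state(state);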