
[C++][Acero] Add the ability to merge already-sorted input nodes #38381

@JerAguilon

Describe the enhancement requested

I have a use case where I want to asof join two datasets. However, each dataset on the left- and right-hand side is sharded across N files. Each file is individually sorted by timestamp (ts) from top to bottom, but the files are not sorted relative to one another. For example, imagine a dataset composed of two files:

file A

ts,col
1,"foo"
3,"foo"
5,"foo"

file B

ts,col
2,"bar"
3,"bar"
6,"bar"

After merging

ts,col
1,"foo"
2,"bar"
3,"bar"
3,"foo"
5,"foo"
6,"bar"

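Sorting with order_by isn't tenable for huge datasets, because it has to accumulate the whole input before it can emit anything. Since every input is already sorted, it would be more efficient to stream each input table and emit batches in sorted order via a heap (a k-way merge).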
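For illustration, here is a minimal C++ sketch of the heap-based k-way merge described above. It is not the code from the draft PR: Row and MergeSorted are hypothetical names, and in-memory vectors of rows stand in for streamed record batches. The heap holds one cursor per input, so each row is visited once at a cost of O(log k) for k inputs. Ties on ts are broken by input index (an assumption; the issue does not specify a tie-breaking rule), so the two ts=3 rows come out as 3,"foo" then 3,"bar".

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical row type standing in for one row of a record batch.
struct Row {
  int64_t ts;
  std::string col;
};

// K-way merge of inputs that are each already sorted by ts.
// The min-heap holds one cursor per non-exhausted input, never the full dataset.
std::vector<Row> MergeSorted(const std::vector<std::vector<Row>>& inputs) {
  for (const auto& in : inputs) {
    // Precondition from the issue: every input is individually sorted by ts.
    assert(std::is_sorted(in.begin(), in.end(),
                          [](const Row& a, const Row& b) { return a.ts < b.ts; }));
  }
  // Heap entry: (ts, input index, position within that input).
  // std::greater turns std::priority_queue into a min-heap; equal ts values
  // are ordered by input index because tuples compare lexicographically.
  using Entry = std::tuple<int64_t, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  for (size_t i = 0; i < inputs.size(); ++i) {
    if (!inputs[i].empty()) heap.emplace(inputs[i][0].ts, i, 0);
  }
  std::vector<Row> out;
  while (!heap.empty()) {
    auto [ts, input, pos] = heap.top();
    heap.pop();
    out.push_back(inputs[input][pos]);
    // Advance this input's cursor and re-insert it if rows remain.
    if (pos + 1 < inputs[input].size()) {
      heap.emplace(inputs[input][pos + 1].ts, input, pos + 1);
    }
  }
  return out;
}
```

Calling MergeSorted({file_a, file_b}) with the two example files yields timestamps 1, 2, 3, 3, 5, 6. A streaming version would pull the next batch from an input whenever its cursor runs off the end of the current batch, instead of holding every row in memory.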

Merging N files this way is computationally similar to what asof_join_node.cc does: both can be implemented efficiently by buffering data from all input nodes and running a dedicated processing thread that emits data in sorted order.
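A rough sketch of that buffering scheme follows, again with hypothetical names (SortedMerger, MergeFromThreads) and plain timestamps instead of Arrow batches. Producer threads stand in for the upstream input nodes, and Run() plays the role of the processing thread. The invariant that makes this correct is that nothing can be emitted until every unfinished input has at least one buffered row, because an input that hasn't produced anything yet might still produce a smaller timestamp.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical sketch: one buffer per input plus a single consumer loop.
class SortedMerger {
 public:
  explicit SortedMerger(size_t num_inputs)
      : buffers_(num_inputs), finished_(num_inputs, false) {}

  // Called by input i for each timestamp it produces, in sorted order.
  void Push(size_t i, int64_t ts) {
    std::lock_guard<std::mutex> lock(mu_);
    assert(buffers_[i].empty() || buffers_[i].back() <= ts);  // input is sorted
    buffers_[i].push_back(ts);
    cv_.notify_one();
  }

  // Called by input i once it has no more data.
  void Finish(size_t i) {
    std::lock_guard<std::mutex> lock(mu_);
    finished_[i] = true;
    cv_.notify_one();
  }

  // Processing loop: returns the merged stream after every input has finished.
  std::vector<int64_t> Run() {
    std::vector<int64_t> out;
    std::unique_lock<std::mutex> lock(mu_);
    while (true) {
      // Block until each input has a buffered row or has finished.
      cv_.wait(lock, [this] { return Ready(); });
      // Linear scan for the smallest head; a heap would be used for large N.
      std::optional<size_t> best;
      for (size_t i = 0; i < buffers_.size(); ++i) {
        if (!buffers_[i].empty() &&
            (!best || buffers_[i].front() < buffers_[*best].front())) {
          best = i;
        }
      }
      if (!best) return out;  // all inputs finished and fully drained
      out.push_back(buffers_[*best].front());
      buffers_[*best].pop_front();
    }
  }

 private:
  bool Ready() const {
    for (size_t i = 0; i < buffers_.size(); ++i) {
      if (buffers_[i].empty() && !finished_[i]) return false;
    }
    return true;
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::vector<std::deque<int64_t>> buffers_;
  std::vector<bool> finished_;
};

// Driver: one producer thread per sorted input, merged on the calling thread.
std::vector<int64_t> MergeFromThreads(const std::vector<std::vector<int64_t>>& inputs) {
  SortedMerger merger(inputs.size());
  std::vector<std::thread> producers;
  for (size_t i = 0; i < inputs.size(); ++i) {
    producers.emplace_back([&merger, &inputs, i] {
      for (int64_t ts : inputs[i]) merger.Push(i, ts);
      merger.Finish(i);
    });
  }
  std::vector<int64_t> out = merger.Run();
  for (std::thread& t : producers) t.join();
  return out;
}
```

For simplicity the merge runs on the caller's thread and emits one value at a time while holding the lock; a real node would run on its own thread, emit whole batches downstream, and bound the per-input buffers to apply backpressure.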

I propose refactoring some of the internals of asof_join_node.cc so that they can be reused to implement this merge. I think this would unlock a lot of functionality that is currently available only in specialized databases like KDB.

I have a draft PR for the idea (#38380) and have tested it locally on O(100 GB) files.

I'd be curious to get opinions from @icexelloss, @bkietz, and @westonpace on this approach!

Component(s)

C++
