Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rr/sc 60366 sparse global order reader merge #5417

Open
wants to merge 167 commits into
base: main
Choose a base branch
from

Conversation

rroelke
Copy link
Contributor

@rroelke rroelke commented Jan 2, 2025

The story contains more details, but in brief this pull request adds an additional mode to the sparse global order reader in which we pre-process the minimum bounding rectangles of all tiles from all fragments to determine a single global order in which all of the tiles must be loaded.

This pre-processing step is implemented using a "parallel merge" algorithm which merges the tiles from each fragment (which are arranged in global order within the fragment).

Parallel Merge

The parallel merge code lives in tiledb/common/algorithm/parallel_merge.h. It is written generically to merge streams of a copyable type T using any type which can compare T (default is std::less<T> of course). An explanation of the algorithm is provided within the file.

The top-level function parallel_merge is asynchronous, i.e. it returns a future which can be polled to see how much of the merge has already completed. This enables callers to begin processing merged data from the head of the eventual output before the tail of the eventual output has finished.

Sparse Global Order Reader

We extend the sparse global order reader with a new configuration sm.query.sparse_global_order.preprocess_tile_merge. If nonzero, the sparse global order reader will run a parallel merge on the fragments to find the unified tile order and then use that to populate result tiles.

  • preprocess_compute_result_tile_order kicks off the parallel merge.
  • create_result_tiles_using_preprocess advances along the global tile order to create result tiles.

The fields which are used for the old "per fragment result tiles" mode have been encapsulated into their own struct to emphasize that their use does not overlap with this new mode.

create_result_tiles_using_preprocess does not need a per-fragment memory budget; instead it pulls tiles off of the globally ordered tile list until it has saturated the memory budget as much as it can.

Tiles in the unified global order are arranged on their lower bound. The upper bounds of the tiles in the list may be out of order. To prevent cells from tile A to be emitted out of order with cells from tile B, we augment add_next_cell_to_queue to check the lower bound of the tiles which have not populated result tiles yet.

The value of sm.query.sparse_global_order.preprocess_tile_merge configures the minimum amount of work that each parallel unit of the merge will do. This is so we can benchmark with different values without re-compiling; we will either want to recommend a value to customers, or choose one and flip this to a boolean.

Serialization

The unified global tile order is state which must be communicated back and forth between the client and REST server. We can either serialize this whole list (16 bytes per tile across all fragments) or we can re-compute the parallel merge each time we run a submit on the REST server side. The current implementation chooses the latter, assuming that smaller messages are preferred to the additional CPU overhead.

Testing

Testing of all changes is augmented using rapidcheck. With this library, rather than writing some test data examples, we write properties which contain generic claims about what the expected output must look like for a given input. The rapidcheck runtime generates arbitrary inputs to the property to test our claims.

The parallel merge algorithm is tested in unit_parallel_merge.cc and has rapidcheck properties implemented for each step of the algorithm.

The sparse global order reader tests are in unit-sparse-global-order-reader.cc. The gist is that we have a generic function CSparseGlobalOrderFx::run which writes a bunch of fragments, and then reads the data back in global order, comparing against an expected result. There's a fair bit of refactoring to support this. For 1D arrays we have tests Sparse global order reader: fragment skew, fragment interleave, and fragment many overlap which set up inputs which are expected to exercise some of the edge cases in the global order reader. And then we add rapidcheck 1D and rapidcheck 2D tests which generate totally arbitrary 1D and 2D inputs respectively.

Performance Results

I still have more to do here, but things are looking pretty good... will fill in more details here as I have them. Notes are here.


TYPE: FEATURE | BUG | IMPROVEMENT
DESC: sparse global order reader determine global order of result tiles

stdx::reverse_comparator<stdx::or_equal<GlobalCellCmp>> cmp(
stdx::or_equal<GlobalCellCmp>(array_schema_.domain()));
if (tile_queue.empty()) {
length = to_process.max_slab_length(global_order_lower_bound, cmp);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs some work. The intent of this code is correct (i.e. we must bound the slab here to avoid out-of-order results) but the implementation of that intent may not be.

Running on a real-world 2D array, with a highly selective subarray I observed horrid performance, and I believe this code here is responsible.
In my repro, the lower end of emit_bound is (3352576, 2) but the current coordinate from rc is (3353524, 1484). The tile extents are 2048 in both dimensions, so this coordinate occurs after emit_bound in the global order.

Surprisingly this doesn't lead to an incorrect result (or at least, a result which is different than with this code OFF), but it does cause the accumulation of lots of result slabs of length 1, which is horridly slow.

The next_global_order_tile is {fragment_idx_ = 121, tile_idx_ = 643}, what are the bounds of the previous tile which should occur earlier in global order?

Tile 642's MBR has a lower bound of (3352576, 40963) and an upper bound of (3354623, 55294).
Tile 643's MBR has a lower bound of (3352576, 2) and an upper bound of (3354905, 59234).

This smells kind of funky but it does check out if the MBR is the minimum bound on each dimension rather than the value of the minimum coordinate in the tile. Tile 643 straddles two tiles in dimension 0 so going up to 2 for dimension 1 is plausible.

And if that's the case (which I hope to confirm soon), then using the MBR here is not correct, we need a different way to get the safe-to-emit bound here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now mostly resolved - I say mostly because the changes I pushed in 2efe0fa are actually still not correct, but the code looks more similar to what I believe to be the correct code.

It is not only allowed, but commonplace for the MBRs of the tiles to be out of order with respect to each other.

I added a lengthy comment to preprocess_compute_result_tile_order which goes into detail about what the merge bound should be instead.

I'm not going to resolve this yet. I want to add another 2D test for the condition which may provoke this. But I think this is definitely review-able now. There's a FIXME comment which explains what the current problem might be.

Copy link
Member

@ypatia ypatia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review part 2, everything but the changes in the readers.

test/performance/tiledb_submit_a_b.cc Show resolved Hide resolved
test/performance/tiledb_submit_a_b.cc Show resolved Hide resolved
tiledb/type/range/range.h Show resolved Hide resolved
tiledb/sm/serialization/tiledb-rest.capnp Show resolved Hide resolved
tiledb/common/pmr.h Show resolved Hide resolved
tiledb/common/algorithm/test/main.cc Outdated Show resolved Hide resolved
tiledb/common/algorithm/test/compile_algorithm_main.cc Outdated Show resolved Hide resolved
tiledb/common/algorithm/parallel_merge.h Show resolved Hide resolved
tiledb/common/algorithm/parallel_merge.h Show resolved Hide resolved
@@ -118,6 +118,8 @@ const std::string Config::SM_MEMORY_BUDGET_VAR = "10737418240"; // 10GB
const std::string Config::SM_QUERY_DENSE_QC_COORDS_MODE = "false";
const std::string Config::SM_QUERY_DENSE_READER = "refactored";
const std::string Config::SM_QUERY_SPARSE_GLOBAL_ORDER_READER = "refactored";
const std::string Config::SM_QUERY_SPARSE_GLOBAL_ORDER_PREPROCESS_TILE_MERGE =
"128";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a TODO or a reference to a ticket to eventually

we will either want to recommend a value to customers, or choose one and flip this to a boolean.

tiledb/sm/query/readers/sparse_index_reader_base.h Outdated Show resolved Hide resolved
tiledb/sm/query/readers/sparse_global_order_reader.h Outdated Show resolved Hide resolved
tiledb/sm/query/readers/sparse_global_order_reader.h Outdated Show resolved Hide resolved
tiledb/sm/query/readers/sparse_global_order_reader.h Outdated Show resolved Hide resolved
if (tile_.has_value()) {
return (*tile_)->coord((*tile_)->cell_num() - 1, d);
} else {
return mbr_.value().coord(d);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (mbr_.has_value()) ?
same for line 166.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In its current state, one of the two optional fields must be valid. I would prefer to express that using the constructors.

However this is the one part of this code which I expect needs some more tweaks - this may look different when I am done with that

: fragment_idx_(fragment_idx) {
}

unsigned fragment_idx_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some more documentation on member variables and methods of this class would help.

Comment on lines +217 to +220
} else if (!preprocess_tile_order_.enabled_) {
return memory_used_for_coords_total_ != 0;
} else if (preprocess_tile_order_.has_more_tiles()) {
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if (!preprocess_tile_order_.enabled_) {
return memory_used_for_coords_total_ != 0;
} else if (preprocess_tile_order_.has_more_tiles()) {
return false;
} else if (preprocess_tile_order_.enabled_ && preprocess_tile_order_.has_more_tiles()) {
return false;

ratio_coords_.c_str(),
&error) == TILEDB_OK);
REQUIRE(error == nullptr);
REQUIRE(memory_.apply(config) == nullptr);

REQUIRE(tiledb_ctx_alloc(config, &ctx_) == TILEDB_OK);
Copy link
Member

@ypatia ypatia Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you added a vfs_test_setup_ I'd expect not to allocate a fresh context but update the vfs_test_setup_.ctx_ instead and set ctx_ to that. Something like:

  vfs_test_setup_.update_config(config.ptr().get());
  ctx_ = vfs_test_setup_.ctx_c;

See for example: https://github.com/TileDB-Inc/TileDB/blob/main/test/src/test-capi-consolidation-plan.cc#L73

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the link!

9ea370e

Copy link
Member

@ypatia ypatia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't claim I understood everything and was able to verify correctness or completeness of all the, again excellent, code of this PR. But the thorough testing gives me high confidence, so LGTM once tests pass :)

test/src/unit-sparse-global-order-reader.cc Outdated Show resolved Hide resolved
* *there are ways to get around this but they are not implemented.
*/
template <InstanceType Instance>
static bool can_complete_in_memory_budget(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea but I think that's too much logic to be the source of truth. We'd need to unit test this one too 😅 Anyway, no objection in keeping it, I was just thinking out loud.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one does get pretty heavy coverage! Since ::run calls it on success or failure it does get some nice if-and-only-if coverage.

The problem with this one, though, is that it makes a strong assumption about what the merge bound is, and as I have since learned that assumption is only true in one dimension.

And yeah, I'm not intending to figure out how to adapt this for two dimensions, given that...

* Data tile 1 has a MBR of [(1, 1), (5, 4)].
* Data tile 2 has a MBR of [(5, 1), (10, 4)].
*
* The lower bound of data tile 2's MBR is less than the upper bound
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the excellent analysis and documentation of each scenario and especially the tricky points here but also across this work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants