Skip to content

Commit

Permalink
Fix lifetime issues and some namings
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Dec 18, 2024
1 parent 88c0ecb commit d391375
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 27 deletions.
40 changes: 37 additions & 3 deletions tiledb/sm/query/readers/result_tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ ResultTile::ResultTile(
coord_func_ = &ResultTile::zipped_coord;
}

ResultTile::~ResultTile() {
try {
// Wait for all tasks to be done
wait_all_attrs();
wait_all_coords();
} catch (...) {
return;
}
}

/* ****************************** */
/* API */
/* ****************************** */
Expand Down Expand Up @@ -130,7 +140,7 @@ void ResultTile::erase_tile(const std::string& name) {

// Handle attribute tile
for (auto& at : attr_tiles_) {
if (at.first == name) {
if (at.second.has_value() && at.first == name) {
at.second.reset();
return;
}
Expand Down Expand Up @@ -262,8 +272,32 @@ ResultTile::TileTuple* ResultTile::tile_tuple(const std::string& name) {
}

void ResultTile::wait_all_coords() const {
for (auto& coord_tile : coord_tiles_) {
coord_tile.second->fixed_tile().data_as<char>();
for (auto& at : coord_tiles_) {
auto& tile_tuple = at.second;
if (tile_tuple.has_value()) {
tile_tuple.value().fixed_tile().data();
if (tile_tuple.value().var_tile_opt().has_value()) {
tile_tuple.value().var_tile_opt().value().data();
}
if (tile_tuple.value().validity_tile_opt().has_value()) {
tile_tuple.value().validity_tile_opt().value().data();
}
}
}
}

void ResultTile::wait_all_attrs() const {
for (auto& at : attr_tiles_) {
const auto& tile_tuple = at.second;
if (tile_tuple.has_value()) {
tile_tuple.value().fixed_tile().data();
if (tile_tuple.value().var_tile_opt().has_value()) {
tile_tuple.value().var_tile_opt().value().data();
}
if (tile_tuple.value().validity_tile_opt().has_value()) {
tile_tuple.value().validity_tile_opt().value().data();
}
}
}
}

Expand Down
15 changes: 14 additions & 1 deletion tiledb/sm/query/readers/result_tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,16 @@ class ResultTile {
return validity_tile_.value();
}

/** @returns Var tile. */
const std::optional<Tile>& var_tile_opt() const {
return var_tile_;
}

/** @returns Validity tile. */
const std::optional<Tile>& validity_tile_opt() const {
return validity_tile_;
}

/** @returns Fixed tile. */
const Tile& fixed_tile() const {
return fixed_tile_;
Expand Down Expand Up @@ -457,7 +467,7 @@ class ResultTile {
DISABLE_MOVE_AND_MOVE_ASSIGN(ResultTile);

/** Default destructor. */
~ResultTile() = default;
virtual ~ResultTile();

/* ********************************* */
/* API */
Expand Down Expand Up @@ -746,6 +756,9 @@ class ResultTile {
/* Waits for all coord tiles results to be available */
void wait_all_coords() const;

/* Waits for all attr tiles results to be available */
void wait_all_attrs() const;

protected:
/* ********************************* */
/* PROTECTED ATTRIBUTES */
Expand Down
8 changes: 4 additions & 4 deletions tiledb/sm/tile/tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ TileBase::TileBase(
const uint64_t cell_size,
const uint64_t size,
tdb::pmr::memory_resource* resource,
const bool ignore_tasks)
const bool skip_waiting_on_io_task)
: resource_(resource)
, data_(tdb::pmr::make_unique<std::byte>(resource_, size))
, size_(size)
, cell_size_(cell_size)
, format_version_(format_version)
, type_(type)
, ignore_tasks_(ignore_tasks) {
, skip_waiting_on_io_task_(skip_waiting_on_io_task) {
/*
* We can check for a bad allocation after initialization without risk
* because none of the other member variables use its value for their own
Expand Down Expand Up @@ -211,7 +211,7 @@ WriterTile::WriterTile(

void TileBase::read(
void* const buffer, const uint64_t offset, const uint64_t nbytes) const {
if (!ignore_tasks_) {
if (!skip_waiting_on_io_task_) {
if (unfilter_data_compute_task_.valid()) {
throw_if_not_ok(unfilter_data_compute_task_.wait());
} else {
Expand Down Expand Up @@ -312,7 +312,7 @@ void WriterTile::write_var(const void* data, uint64_t offset, uint64_t nbytes) {

uint64_t Tile::load_chunk_data(
ChunkData& unfiltered_tile, uint64_t expected_original_size) {
// std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
assert(filtered());

Deserializer deserializer(filtered_data(), filtered_size());
Expand Down
38 changes: 19 additions & 19 deletions tiledb/sm/tile/tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,28 @@ class TileBase {
* @param cell_size The cell size.
* @param size The size of the tile.
* @param resource The memory resource to use.
* @param data_io_task The I/O task to wait on for data to be valid.
* @param skip_waiting_on_io_task whether to skip waiting on I/O tasks and
* directly access data() or block. By default is false, so by default we
* block waiting. Used when we create generic tiles or in testing.
*/
TileBase(
const format_version_t format_version,
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
tdb::pmr::memory_resource* resource,
const bool ignore_tasks = false);
const bool skip_waiting_on_io_task = false);

DISABLE_COPY_AND_COPY_ASSIGN(TileBase);
DISABLE_MOVE_AND_MOVE_ASSIGN(TileBase);

~TileBase() {
// TODO: destructor should not throw, catch any exceptions
virtual ~TileBase() {
if (unfilter_data_compute_task_.valid()) {
auto st = unfilter_data_compute_task_.wait();
try {
auto st = unfilter_data_compute_task_.wait();
} catch (...) {
return;
}
}
}

Expand Down Expand Up @@ -118,7 +123,7 @@ class TileBase {

/** Returns the internal buffer. */
inline void* data() const {
if (!ignore_tasks_) {
if (!skip_waiting_on_io_task_) {
if (unfilter_data_compute_task_.valid()) {
throw_if_not_ok(unfilter_data_compute_task_.wait());
} else {
Expand Down Expand Up @@ -193,18 +198,12 @@ class TileBase {
/** The tile data type. */
Datatype type_;

const bool ignore_tasks_;
/** Whether to block waiting for io data to be ready before accessing data()
*/
const bool skip_waiting_on_io_task_;

/** Compute task to check and block on if unfiltered data is ready. */
mutable ThreadPool::SharedTask unfilter_data_compute_task_;

/**
* Lock for checking task, since this tile can be used by multiple threads.
* The ThreadPool::SharedTask lets multiple threads copy the task, but it
* doesn't let multiple threads access a single task itself. Due to this we
* need a mutext since the tile will be accessed by multiple threads.
*/
mutable std::recursive_mutex unfilter_data_compute_task_mtx_;
};

/**
Expand Down Expand Up @@ -285,9 +284,12 @@ class Tile : public TileBase {
DISABLE_COPY_AND_COPY_ASSIGN(Tile);

~Tile() {
// TODO: destructor should not throw, catch any exceptions
if (unfilter_data_compute_task_.valid()) {
auto st = unfilter_data_compute_task_.wait();
try {
auto st = unfilter_data_compute_task_.wait();
} catch (...) {
return;
}
}
}

Expand Down Expand Up @@ -519,7 +521,6 @@ class WriterTile : public TileBase {
* @param cell_size The cell size.
* @param size The size of the tile.
* @param memory_tracker The memory tracker to use.
* @param data_io_task The I/O task to wait on for data to be valid.
*/
WriterTile(
const format_version_t format_version,
Expand All @@ -536,7 +537,6 @@ class WriterTile : public TileBase {
* @param cell_size The cell size.
* @param size The size of the tile.
* @param resource The memory resource to use.
* @param data_io_task The I/O task to wait on for data to be valid.
*/
WriterTile(
const format_version_t format_version,
Expand Down

0 comments on commit d391375

Please sign in to comment.