Skip to content

removed redundant and snapshot from Wal::BasicEncoder #74

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: pp
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ void load_protobuf_wal_and_save_gorilla_to_wal_with_redundants::execute(const Co
<< " ns" << std::endl;
}

auto redundant = wal.write(out);
out << redundant->encoders.size();
out << wal;
out.clear();
}

Expand Down
3 changes: 1 addition & 2 deletions pp/wal/encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ class GenericEncoder {
template <class Stats, class Output>
inline __attribute__((always_inline)) void finalize(Stats* stats, Output& out) {
write_stats(stats);

encoder_.write(out);
out << encoder_;
}
};

Expand Down
49 changes: 1 addition & 48 deletions pp/wal/tests/wal_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,53 +377,6 @@ TEST_F(WalFixture, BasicEncoderBasicDecoder) {
EXPECT_EQ(writer_latest_sample, reader.latest_sample());
}

TEST_F(WalFixture, Snapshots) {
using WALEncoder = PromPP::WAL::BasicEncoder<PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap<BareBones::Vector>>;
using WALDecoder = PromPP::WAL::BasicDecoder<PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap<BareBones::Vector>>;
WALEncoder encoder(2, 3);
PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap<BareBones::Vector> encoding_bimap;
WALDecoder decoder{encoding_bimap, PromPP::WAL::BasicEncoderVersion::kV3};
std::stringstream writer_stream;
std::vector<std::unique_ptr<WALEncoder::Redundant>> redundants;
std::ofstream devnull("/dev/null");
for (int i = 0; i < 10; ++i) { // segments
for (int j = 0; j < 10; ++j) { // samples
const LabelSetForTest label_set = generate_label_set();
const samples_sequence_type samples = generate_samples(1000, 1.123);
const TimeSeriesForTest timeseries = TimeSeriesForTest(label_set, samples);
encoder.add(timeseries);
}
if (i < 5) {
writer_stream << encoder;
writer_stream >> decoder;
decoder.process_segment([](uint32_t, uint64_t, double) {});
} else {
redundants.emplace_back(encoder.write(devnull));
}
}

std::stringstream stream_buffer;

// save wal
encoder.snapshot(redundants, stream_buffer);

EXPECT_GT(stream_buffer.tellp(), 0);

// read wal from snapshot
PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap<BareBones::Vector> encoding_bimap2;
WALDecoder decoder2{encoding_bimap2, PromPP::WAL::BasicEncoderVersion::kV3};
decoder2.load_snapshot(stream_buffer);

EXPECT_GT(stream_buffer.tellg(), 0);

// check label sets
std::stringstream reader_sbuf1, reader_sbuf2;
reader_sbuf1 << decoder.label_sets().checkpoint();
reader_sbuf2 << decoder2.label_sets().checkpoint();
EXPECT_EQ(reader_sbuf1.view(), reader_sbuf2.view());
EXPECT_EQ(decoder.sample_decoder().gorilla(), decoder2.sample_decoder().gorilla());
}

TEST_F(WalFixture, BasicEncoderMany) {
using WALEncoder = PromPP::WAL::BasicEncoder<PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap<BareBones::Vector>>;
using AddManyCallbackType = WALEncoder::add_many_generator_callback_type;
Expand Down Expand Up @@ -523,7 +476,7 @@ class CreateBasicEncoderFromBasicDecoderFixture : public ::testing::Test {
timeseries.label_set().add(label_set);

encoder.add(timeseries);
encoder.write(stream);
stream << encoder;

for (const auto decoder : decoders) {
EXPECT_NO_THROW(std::ispanstream(stream.view()) >> *decoder);
Expand Down
185 changes: 4 additions & 181 deletions pp/wal/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,21 +237,6 @@ class BasicEncoder {
}
};

struct __attribute__((__packed__)) EncoderWithID {
BareBones::Encoding::Gorilla::StreamEncoder<BareBones::Encoding::Gorilla::ZigZagTimestampEncoder<>, BareBones::Encoding::Gorilla::ValuesEncoder> encoder;
Primitives::LabelSetID id;
};

struct Redundant {
uint32_t segment;
uint32_t encoders_count;
checkpoint_type label_sets_checkpoint;
BareBones::Vector<EncoderWithID> encoders;

inline __attribute__((always_inline)) Redundant(uint32_t _segment, const checkpoint_type& _label_sets_checkpoint, uint32_t _encoders_count)
: segment(_segment), encoders_count(_encoders_count), label_sets_checkpoint(_label_sets_checkpoint) {}
};

template <Bitset BitsetType = roaring::Roaring>
struct StaleNaNsState {
void* parent;
Expand Down Expand Up @@ -291,7 +276,7 @@ class BasicEncoder {
uint16_t shard_id_ = 0;
uint8_t pow_two_of_total_shards_ = 0;

std::unique_ptr<Redundant> encode_segment(std::ostream& stream) {
void encode_segment(std::ostream& stream) {
BareBones::BitSequence gorilla_ts_bitseq, gorilla_v_bitseq;
BareBones::EncodedSequence<BareBones::Encoding::DeltaRLE<>> ls_id_delta_rle_seq;
BareBones::EncodedSequence<BareBones::Encoding::DeltaZigZagRLE<>> ts_delta_rle_seq;
Expand All @@ -314,16 +299,9 @@ class BasicEncoder {

gorilla_.resize(label_sets_.next_item_index());

auto redundant = std::make_unique<Redundant>(next_encoded_segment_, label_sets_checkpoint_, label_sets_.size());
constexpr Primitives::LabelSetID max_last_id = std::numeric_limits<Primitives::LabelSetID>::max();
Primitives::LabelSetID last_id = max_last_id;
const auto label_sets_checkpoint = label_sets_checkpoint_;
if (ts_delta_rle_is_worth_trying) {
buffer_.for_each([&](Primitives::LabelSetID ls_id, Primitives::Timestamp ts, Primitives::Sample::value_type v) {
assert(last_id == max_last_id || ls_id >= last_id);
if (last_id == max_last_id || last_id != ls_id) {
last_id = ls_id;
redundant->encoders.emplace_back(gorilla_[ls_id], ls_id);
}
gorilla_[ls_id].encode(ts - ts_base_, v, gorilla_ts_bitseq, gorilla_v_bitseq);

ls_id_delta_rle_seq.push_back(ls_id);
Expand All @@ -335,10 +313,6 @@ class BasicEncoder {
});
} else {
buffer_.for_each([&](Primitives::LabelSetID ls_id, Primitives::Timestamp ts, Primitives::Sample::value_type v) {
if (last_id == max_last_id || last_id != ls_id) {
last_id = ls_id;
redundant->encoders.emplace_back(gorilla_[ls_id], ls_id);
}
gorilla_[ls_id].encode(ts - ts_base_, v, gorilla_ts_bitseq, gorilla_v_bitseq);

ls_id_delta_rle_seq.push_back(ls_id);
Expand All @@ -364,8 +338,8 @@ class BasicEncoder {

// write new label sets
label_sets_checkpoint_ = label_sets_.checkpoint();
label_sets_bytes_ += label_sets_checkpoint_.save_size(&redundant->label_sets_checkpoint);
lz4stream_ << (label_sets_checkpoint_ - redundant->label_sets_checkpoint);
label_sets_bytes_ += label_sets_checkpoint_.save_size(&label_sets_checkpoint);
lz4stream_ << (label_sets_checkpoint_ - label_sets_checkpoint);
if constexpr (shrink_lss) {
label_sets_.shrink_to_checkpoint_size(label_sets_checkpoint_);
}
Expand Down Expand Up @@ -401,7 +375,6 @@ class BasicEncoder {
samples_ += buffer_.samples_count();
buffer_.clear();
++next_encoded_segment_;
return redundant;
}

static auto generate_uuid() {
Expand Down Expand Up @@ -603,108 +576,6 @@ class BasicEncoder {
wal.encode_segment(out);
return out;
}

template <class OutputStream>
inline __attribute__((always_inline)) std::unique_ptr<Redundant> write(OutputStream& out) {
return encode_segment(out);
}

template <std::ranges::bidirectional_range Iterator, typename OutputStream>
requires std::is_same_v<typename std::iterator_traits<typename Iterator::iterator>::value_type, Redundant*> ||
std::is_same_v<typename std::iterator_traits<typename Iterator::iterator>::value_type, std::unique_ptr<Redundant>>
inline __attribute__((always_inline)) void snapshot(Iterator& it, OutputStream& out) {
assert(next_encoded_segment_ > 0);
uint32_t segment = (std::begin(it) != std::end(it)) ? (*std::begin(it))->segment : next_encoded_segment_;
auto label_sets_checkpoint = (std::begin(it) != std::end(it)) ? (*std::begin(it))->label_sets_checkpoint : label_sets_.checkpoint();

auto encoders_count = segment == next_encoded_segment_ ? gorilla_.size() : (*it.begin())->encoders_count;
if (encoders_count > gorilla_.size()) {
throw BareBones::Exception(0xfd921d184ca372ee, "Encoder's Snapshot %d has more encoders in first redundant (%u) than already on the Writer (%u)", segment,
encoders_count, gorilla_.size());
}

decltype(gorilla_) encoders;
encoders.reserve(encoders_count);
std::ranges::copy(gorilla_ | std::ranges::views::take(encoders_count), std::back_inserter(encoders));
BareBones::Bitset changed;
changed.resize(encoders.size());

uint32_t redundant_segment_id = segment;
bool has_any_redundant_segment_id = false;
for (auto& redundant : it) {
// check that all redundant ids are sequential and throw error if not.
if (!has_any_redundant_segment_id) {
has_any_redundant_segment_id = true;
} else {
if (not(redundant->segment == redundant_segment_id + 1)) {
throw BareBones::Exception(0xcddf21b039fe5a18,
"The next redundant in encoders' snapshot (segment_id=%d) must be in order with previous redundant (segment_id=%d)",
redundant->segment, redundant_segment_id + 1);
}
redundant_segment_id = redundant->segment;
}

for (const auto& encoder_with_id : redundant->encoders) {
if (encoder_with_id.id >= encoders_count) {
continue;
}
if (!changed[encoder_with_id.id]) {
changed.set(encoder_with_id.id);
encoders[encoder_with_id.id] = encoder_with_id.encoder;
}
}
}

// after cycle there is a last segment id.
// if have any redundants check that it not equals current redundant_segment_id and next_encoded_segment_ - 1,
// redundants must be consistent
if (has_any_redundant_segment_id && redundant_segment_id != next_encoded_segment_ - 1) {
throw BareBones::Exception(0xc318a18809c8167e,
"The encoder's snapshot doesn't have the latest redundant with expected segment_id=%d, the last redundant has segment_id=%d",
(next_encoded_segment_ - 1), redundant_segment_id);
}

BareBones::Vector<
BareBones::Encoding::Gorilla::StreamDecoder<BareBones::Encoding::Gorilla::ZigZagTimestampDecoder<>, BareBones::Encoding::Gorilla::ValuesDecoder>>
decoders;

// move out the encoders into decoders.
for (auto& encoder : encoders) {
decoders.emplace_back(encoder.state());
}

auto original_exceptions = out.exceptions();
auto sg1 = std::experimental::scope_exit([&]() { out.exceptions(original_exceptions); });
out.exceptions(std::ifstream::failbit | std::ifstream::badbit);

// write version
out.put(1);

// write uuid
out.write(reinterpret_cast<const char*>(uuid_.as_bytes().data()), 16);

// write shard ID
out.write(reinterpret_cast<const char*>(&shard_id_), 2);

// Write total shards count (in power of two).
out.write(reinterpret_cast<const char*>(&pow_two_of_total_shards_), 1);

// write prev segment number
// It will be used in reader as a last processed segment number (which is 1 less than segment value)
segment = segment - 1;
out.write(reinterpret_cast<const char*>(&segment), sizeof(segment));

// write base label sets snapshot
label_sets_checkpoint.save(out);
// TODO: calculate additional segments' deltas, see !155#note_240342.

// write decoders
uint32_t decoders_count = decoders.size();
out.write(reinterpret_cast<const char*>(&decoders_count), sizeof(decoders_count));
for (auto& decoder : decoders) {
decoder.save(out);
}
}
};

struct SampleCrc {
Expand Down Expand Up @@ -957,54 +828,6 @@ class BasicDecoder {
this->pow_two_of_total_shards_ == reader.pow_two_of_total_shards_;
}

template <typename InputStream>
void load_snapshot(InputStream& in) {
assert(segment_gorilla_v_bitseq_.empty());

// the snapshot must be loaded from first segment! (from review)
// !155#note_241482
if (last_processed_segment_ + 1 != 0) {
throw BareBones::Exception(0x25fa0d279a79b3f6, "Can't load Snapshot into non-empty Decoder");
}

// read version
uint8_t version = in.get();

// return successfully, if stream is empty
if (in.eof())
return;

// check version
if (version != 1) {
throw BareBones::Exception(0xccd8f4f87758ca2f, "Invalid snapshot version (%d), only version 1 is supported", version);
}

auto original_exceptions = in.exceptions();
auto sg1 = std::experimental::scope_exit([&]() { in.exceptions(original_exceptions); });
in.exceptions(std::ifstream::failbit | std::ifstream::badbit | std::ifstream::eofbit);

// read uuid
auto uuid = read_uuid(in);

// associate uuid, assuming that it's a first segment
uuid_ = uuid;

// read shard ID
in.read(reinterpret_cast<char*>(&shard_id_), sizeof(shard_id_));

// read pow of two of total shards
in.read(reinterpret_cast<char*>(&pow_two_of_total_shards_), sizeof(pow_two_of_total_shards_));

// read segment number
in.read(reinterpret_cast<char*>(&last_processed_segment_), sizeof(last_processed_segment_));

// read label sets snapshot
label_sets_.load(in);

// read decoders
sample_decoder_.load(in);
}

inline __attribute__((always_inline)) const LabelSetsTable& label_sets() const { return label_sets_; }

inline __attribute__((always_inline)) uint32_t series() const { return label_sets_.size(); }
Expand Down
Loading