From bdc084805dca6cdfd04efd1be6658cafa0fc0f37 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 23 Oct 2023 14:18:51 +0100 Subject: [PATCH] Add chunk iterator API to osiris_log To allow readers to read chunks incrementally rather than in one go. osiris_log:read_chunk_parsed/2 has been updated to use the chunk iterator API as well but with a full chunk read ahead configuration. osiris_log:chunk_iterator/2 tries to estimate a read ahead amount based on the "credit hint" that can be passed to indicate how many entries are likely to be read in one go. --- src/osiris.erl | 5 +- src/osiris_log.erl | 258 +++++++++++++++++++++++++++----------- test/osiris_log_SUITE.erl | 70 +++++++++++ 3 files changed, 260 insertions(+), 73 deletions(-) diff --git a/src/osiris.erl b/src/osiris.erl index ff3cbf4..ed88dc9 100644 --- a/src/osiris.erl +++ b/src/osiris.erl @@ -74,6 +74,8 @@ batch() | {filter_value(), iodata() | batch()}. +%% returned when reading +-type entry() :: binary() | batch(). -type reader_options() :: #{transport => tcp | ssl, chunk_selector => all | user_data, filter_spec => osiris_bloom:filter_spec() @@ -89,7 +91,8 @@ retention_spec/0, timestamp/0, writer_id/0, - data/0]). + data/0, + entry/0]). -spec start_cluster(config()) -> {ok, config()} | diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 9bae498..8473037 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -29,6 +29,9 @@ init_data_reader/2, init_offset_reader/2, read_header/1, + chunk_iterator/1, + chunk_iterator/2, + iterator_next/1, read_chunk/1, read_chunk_parsed/1, read_chunk_parsed/2, @@ -375,7 +378,8 @@ index_files => [file:filename_all()], filter_size => osiris_bloom:filter_size() }. --type record() :: {offset(), osiris:data()}. +-type record() :: {offset(), osiris:entry()}. +-type offset_entry() :: {offset(), osiris:entry()}. -type offset_spec() :: osiris:offset_spec(). -type retention_spec() :: osiris:retention_spec(). -type header_map() :: @@ -452,6 +456,7 @@ -opaque state() :: #?MODULE{}. -export_type([state/0, + chunk_iterator/0, range/0, config/0, counter_spec/0, @@ -1364,6 +1369,136 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> Err end. +-record(iterator, {fd :: file:io_device(), + next_offset :: offset(), + %% entries left + num_left :: non_neg_integer(), + %% any trailing data from last read + %% we try to capture at least the size of the next record + data :: undefined | binary(), + next_record_pos :: non_neg_integer()}). +-opaque chunk_iterator() :: #iterator{}. +-define(REC_MATCH_SIMPLE(Len, Rem), + <<0:1, Len:31/unsigned, Rem/binary>>). +-define(REC_MATCH_SUBBATCH(CompType, NumRec, UncompLen, Len, Rem), + <<1:1, CompType:3/unsigned, _:4/unsigned, + NumRecs:16/unsigned, + UncompressedLen:32/unsigned, + Len:32/unsigned, Rem/binary>>). + +-define(REC_HDR_SZ_SIMPLE_B, 4). +-define(REC_HDR_SZ_SUBBATCH_B, 11). +-define(ITER_READ_AHEAD_B, 64). + + +-spec chunk_iterator(state()) -> + {ok, header_map(), chunk_iterator(), state()} | + {end_of_stream, state()} | + {error, {invalid_chunk_header, term()}}. +chunk_iterator(State) -> + chunk_iterator(State, 1). + +-spec chunk_iterator(state(), pos_integer() | all) -> + {ok, header_map(), chunk_iterator(), state()} | + {end_of_stream, state()} | + {error, {invalid_chunk_header, term()}}. 
+chunk_iterator(#?MODULE{cfg = #cfg{},
+                        mode = #read{type = RType,
+                                     chunk_selector = Selector}
+                        } = State0, CreditHint)
+  when (is_integer(CreditHint) andalso CreditHint > 0) orelse
+       is_atom(CreditHint) ->
+    %% reads the next chunk of unparsed chunk data
+    case catch read_header0(State0) of
+        {ok,
+         #{type := ChType,
+           chunk_id := ChId,
+           crc := Crc,
+           num_entries := NumEntries,
+           num_records := NumRecords,
+           data_size := DataSize,
+           filter_size := FilterSize,
+           position := Pos,
+           next_position := NextPos} = Header,
+         #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} ->
+            State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
+                                                    position = NextPos}},
+            case needs_handling(RType, Selector, ChType) of
+                true ->
+                    DataPos = Pos + ?HEADER_SIZE_B + FilterSize,
+                    Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint,
+                                           DataSize, NumEntries),
+                    Iterator = #iterator{fd = Fd,
+                                         data = Data,
+                                         next_offset = ChId,
+                                         num_left = NumEntries,
+                                         next_record_pos = DataPos},
+                    {ok, Header, Iterator, State};
+                false ->
+                    %% skip
+                    chunk_iterator(State, CreditHint)
+            end;
+        Other ->
+            Other
+    end.
+
+-spec iterator_next(chunk_iterator()) ->
+    end_of_chunk | {offset_entry(), chunk_iterator()}.
+iterator_next(#iterator{num_left = 0}) ->
+    end_of_chunk;
+iterator_next(#iterator{fd = Fd,
+                        next_offset = NextOffs,
+                        num_left = Num,
+                        data = ?REC_MATCH_SIMPLE(Len, Rem0),
+                        next_record_pos = Pos} = I0) ->
+    {Record, Rem} =
+        case Rem0 of
+            <<Record0:Len/binary, Rem1/binary>> ->
+                {Record0, Rem1};
+            _ ->
+                %% not enough in Rem0 to read the entire record
+                %% so we need to read it from disk
+                {ok, <<Record0:Len/binary, Rem1/binary>>} =
+                    file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B,
+                               Len + ?ITER_READ_AHEAD_B),
+                {Record0, Rem1}
+        end,
+
+    I = I0#iterator{next_offset = NextOffs + 1,
+                    num_left = Num - 1,
+                    data = Rem,
+                    next_record_pos = Pos + ?REC_HDR_SZ_SIMPLE_B + Len},
+    {{NextOffs, Record}, I};
+iterator_next(#iterator{fd = Fd,
+                        next_offset = NextOffs,
+                        num_left = Num,
+                        data = ?REC_MATCH_SUBBATCH(CompType, NumRecs,
+                                                   UncompressedLen,
+                                                   Len, Rem0),
+                        next_record_pos = Pos} = I0) ->
+    {Data, Rem} =
+        case Rem0 of
+            <<Record0:Len/binary, Rem1/binary>> ->
+                {Record0, Rem1};
+            _ ->
+                %% not enough in Rem0 to read the entire record
+                %% so we need to read it from disk
+                {ok, <<Record0:Len/binary, Rem1/binary>>} =
+                    file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B,
+                               Len + ?ITER_READ_AHEAD_B),
+                {Record0, Rem1}
+        end,
+    Record = {batch, NumRecs, CompType, UncompressedLen, Data},
+    I = I0#iterator{next_offset = NextOffs + NumRecs,
+                    num_left = Num - 1,
+                    data = Rem,
+                    next_record_pos = Pos + ?REC_HDR_SZ_SUBBATCH_B + Len},
+    {{NextOffs, Record}, I};
+iterator_next(#iterator{fd = Fd,
+                        next_record_pos = Pos} = I) ->
+    {ok, Data} = file:pread(Fd, Pos, ?ITER_READ_AHEAD_B),
+    iterator_next(I#iterator{data = Data}).
+
 -spec read_chunk(state()) ->
     {ok, binary(), state()} |
     {end_of_stream, state()} |
@@ -1410,42 +1545,30 @@ read_chunk_parsed(State) ->
                        {ok, header_map(), [record()], state()} |
                        {end_of_stream, state()} |
                        {error, {invalid_chunk_header, term()}}.
-read_chunk_parsed(#?MODULE{mode = #read{type = RType,
-                                        chunk_selector = Selector}} = State0,
-                  HeaderOrNot) ->
-    %% reads the next chunk of entries, parsed
-    %% NB: this may return records before the requested index,
-    %% that is fine - the reading process can do the appropriate filtering
-    case catch read_header0(State0) of
-        {ok,
-         #{type := ChType,
-           chunk_id := ChId,
-           crc := Crc,
-           num_records := NumRecords,
-           data_size := DataSize,
-           position := Pos,
-           filter_size := FilterSize,
-           next_position := NextPos} = Header,
-         #?MODULE{fd = Fd, mode = #read{next_offset = _ChId} = Read} = State1} ->
-            {ok, Data} = file:pread(Fd, Pos + ?HEADER_SIZE_B + FilterSize, DataSize),
-            validate_crc(ChId, Crc, Data),
-            State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
-                                                    position = NextPos}},
-
-            case needs_handling(RType, Selector, ChType) of
-                true when HeaderOrNot == no_header ->
-                    %% parse data into records
-                    {parse_records(ChId, Data, []), State};
-                true ->
-                    {ok, Header, parse_records(ChId, Data, []), State};
-                false ->
-                    %% skip
-                    read_chunk_parsed(State, HeaderOrNot)
-            end;
-        Ret ->
-            Ret
+read_chunk_parsed(#?MODULE{mode = #read{}} = State0,
+                  HeaderOrNot) ->
+    %% NB: the with_header variant isn't used anywhere in RabbitMQ
+    case chunk_iterator(State0, all) of
+        {end_of_stream, _} = Eos ->
+            Eos;
+        {ok, _H, I0, State1} when HeaderOrNot == no_header ->
+            Records = iter_all_records(iterator_next(I0), []),
+            {Records, State1};
+        {ok, Header, I0, State1} when HeaderOrNot == with_header ->
+            Records = iter_all_records(iterator_next(I0), []),
+            {ok, Header, Records, State1}
     end.
 
+iter_all_records(end_of_chunk, Acc) ->
+    lists:reverse(Acc);
+iter_all_records({{ChId, {batch, _Num, 0, _Size, Data}}, I}, Acc0) ->
+    %% TODO validate that sub batch is correct
+    Acc = parse_subbatch(ChId, Data, Acc0),
+    iter_all_records(iterator_next(I), Acc);
+iter_all_records({X, I}, Acc0) ->
+    Acc = [X | Acc0],
+    iter_all_records(iterator_next(I), Acc).
+
 is_valid_chunk_on_disk(SegFile, Pos) ->
     %% read a chunk from a specified location in the segment
     %% then checks the CRC
@@ -1625,42 +1748,17 @@ next_chunk_pos(Fd, Pos) ->
       _Reserved:24>>} = file:pread(Fd, Pos, ?HEADER_SIZE_B),
     Pos + ?HEADER_SIZE_B + FSize + Size + TSize.
 
-parse_records(_Offs, <<>>, Acc) ->
-    %% TODO: this could probably be changed to body recursive
-    lists:reverse(Acc);
-parse_records(Offs,
-              <<0:1, %% simple
-                Len:31/unsigned,
-                Data:Len/binary,
-                Rem/binary>>,
-              Acc) ->
-    parse_records(Offs + 1, Rem, [{Offs, Data} | Acc]);
-parse_records(Offs,
-              <<1:1, %% simple
-                0:3/unsigned, %% compression type
-                _:4/unsigned, %% reserved
-                NumRecs:16/unsigned,
-                _UncompressedLen:32/unsigned,
-                Len:32/unsigned,
-                Data:Len/binary,
-                Rem/binary>>,
-              Acc) ->
-    Recs = parse_records(Offs, Data, []),
-    parse_records(Offs + NumRecs, Rem, lists:reverse(Recs) ++ Acc);
-parse_records(Offs,
-              <<1:1, %% simple
-                CompType:3/unsigned, %% compression type
-                _:4/unsigned, %% reserved
-                NumRecs:16/unsigned,
-                UncompressedLen:32/unsigned,
-                Len:32/unsigned,
-                Data:Len/binary,
-                Rem/binary>>,
-              Acc) ->
-    %% return the first offset of the sub batch and the batch, unparsed
-    %% as we don't want to decompress on the server
-    parse_records(Offs + NumRecs, Rem,
-                  [{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]).
+
+parse_subbatch(_Offs, <<>>, Acc) ->
+    Acc;
+parse_subbatch(Offs,
+               <<0:1, %% simple
+                 Len:31/unsigned,
+                 Data:Len/binary,
+                 Rem/binary>>,
+               Acc) ->
+    parse_subbatch(Offs + 1, Rem, [{Offs, Data} | Acc]).
+
 sorted_index_files(#{index_files := IdxFiles}) ->
     %% cached
@@ -3000,7 +3098,23 @@ dump_crc_check(Fd) ->
             dump_crc_check(Fd)
     end.
 
-
+iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) ->
+    %% no point reading ahead if there is only one entry to be read at this
+    %% time
+    undefined;
+iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries)
+  when Credit == all orelse NumEntries == 1 ->
+    {ok, Data} = file:pread(Fd, Pos, DataSize),
+    validate_crc(ChunkId, Crc, Data),
+    Data;
+iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) ->
+    %% read ahead, assumes roughly equal entry sizes which may not be the case
+    %% TODO round up to nearest block?
+    %% We can only practically validate CRC if we read the whole data
+    Credit = min(Credit0, NumEntries),
+    Size = DataSize div NumEntries * Credit,
+    {ok, Data} = file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B),
+    Data.
 
 -ifdef(TEST).
 
diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl
index 498ad3c..12f1204 100644
--- a/test/osiris_log_SUITE.erl
+++ b/test/osiris_log_SUITE.erl
@@ -36,6 +36,8 @@ all_tests() ->
      write_batch_with_filters_variable_size,
      subbatch,
      subbatch_compressed,
+     iterator_read_chunk,
+     iterator_read_chunk_mixed_sizes_with_credit,
      read_chunk_parsed,
      read_chunk_parsed_2,
      read_chunk_parsed_multiple_chunks,
@@ -86,6 +88,18 @@ all_tests() ->
      overview
     ].
 
+-define(assertMatch_(Guard, Expr),
+        begin
+            case (Expr) of
+                Guard -> ok;
+                X__V -> erlang:error({assertMatch,
+                                      [{module, ?MODULE},
+                                       {line, ?LINE},
+                                       {expression, (??Expr)},
+                                       {pattern, (??Guard)},
+                                       {value, X__V}]})
+            end
+        end).
 
 groups() ->
 
@@ -303,6 +317,52 @@ subbatch_compressed(Config) ->
     osiris_log:close(R1),
     ok.
 
+iterator_read_chunk(Config) ->
+    Conf = ?config(osiris_conf, Config),
+    S0 = osiris_log:init(Conf),
+    Shared = osiris_log:get_shared(S0),
+    RConf = Conf#{shared => Shared},
+    {ok, R0} = osiris_log:init_offset_reader(0, RConf),
+    {end_of_stream, R1} = osiris_log:chunk_iterator(R0),
+    IOData = <<0:1, 2:31/unsigned, "hi", 0:1, 2:31/unsigned, "h0">>,
+    CompType = 0, %% no compression
+    Batch = {batch, 2, CompType, byte_size(IOData), IOData},
+    EntriesRev = [Batch,
+                  <<"ho">>,
+                  {<<"filter">>, <<"hi">>}],
+    {ChId, _S1} = write_committed(EntriesRev, S0),
+    {ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1),
+    HoOffs = ChId + 1,
+    BatchOffs = ChId + 2,
+    {{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0),
+    {{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1),
+    {{BatchOffs, Batch}, I} = osiris_log:iterator_next(I2),
+    ?assertMatch(end_of_chunk, osiris_log:iterator_next(I)),
+    ok.
+
+iterator_read_chunk_mixed_sizes_with_credit(Config) ->
+    Conf = ?config(osiris_conf, Config),
+    S0 = osiris_log:init(Conf),
+    Shared = osiris_log:get_shared(S0),
+    RConf = Conf#{shared => Shared},
+    {ok, R0} = osiris_log:init_offset_reader(0, RConf),
+    {end_of_stream, R1} = osiris_log:chunk_iterator(R0),
+    Big = crypto:strong_rand_bytes(100_000),
+    EntriesRev = [Big,
+                  <<"ho">>,
+                  {<<"filter">>, <<"hi">>}],
+    {ChId, _S1} = write_committed(EntriesRev, S0),
+    %% this is a less than ideal case where we have one large and two very
+    %% small entries in the same chunk; the read ahead only covers the small entries
+    {ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1, 2),
+    HoOffs = ChId + 1,
+    BigOffs = ChId + 2,
+    {{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0),
+    {{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1),
+    {{BigOffs, Big}, I} = osiris_log:iterator_next(I2),
+    ?assertMatch(end_of_chunk, osiris_log:iterator_next(I)),
+    ok.
+ read_chunk_parsed(Config) -> Conf = ?config(osiris_conf, Config), S0 = osiris_log:init(Conf), @@ -1702,3 +1762,13 @@ make_trailer(Type, K, V) -> set_shared(#{shared := Ref}, Value) -> osiris_log_shared:set_committed_chunk_id(Ref, Value). + +%% writes and commits the chunk +write_committed(Entries, S0) -> + ChId = osiris_log:next_offset(S0), + S = osiris_log:write(Entries, S0), + Shared = osiris_log:get_shared(S0), + osiris_log_shared:set_committed_chunk_id(Shared, ChId), + osiris_log_shared:set_last_chunk_id(Shared, ChId), + {ChId, S}. +
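Usage note (not part of the patch): the sketch below shows how a reader could drive the chunk iterator API described in the commit message, based on the specs added above. It assumes a reader state obtained from osiris_log:init_offset_reader/2 (as in the tests); read_one_chunk/1, drain/2 and the credit hint of 16 are illustrative names and values only.

read_one_chunk(Log0) ->
    %% ask for an iterator over the next chunk, hinting that roughly
    %% 16 entries will be consumed straight away
    case osiris_log:chunk_iterator(Log0, 16) of
        {end_of_stream, Log} ->
            {[], Log};
        {ok, _Header, Iter0, Log} ->
            {drain(osiris_log:iterator_next(Iter0), []), Log};
        {error, _} = Err ->
            Err
    end.

drain(end_of_chunk, Acc) ->
    lists:reverse(Acc);
drain({{Offset, Entry}, Iter}, Acc) ->
    %% Entry is an osiris:entry(): a binary or an unparsed {batch, ...} tuple
    drain(osiris_log:iterator_next(Iter), [{Offset, Entry} | Acc]).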