Add chunk iterator API to osiris_log
To allow readers to read chunks incrementally rather than in one go.

osiris_log:read_chunk_parsed/2 has been updated to use the chunk
iterator API as well, but with a full-chunk read-ahead configuration.

osiris_log:chunk_iterator/2 tries to estimate a read-ahead amount based
on the "credit hint" that can be passed to indicate how many entries
are likely to be read in one go.
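
As a rough usage sketch (not part of this commit), a reading process
could drive the new API as follows; read_with_credit/2 and
take_entries/3 are hypothetical helper names, and Log0 is assumed to be
a reader state obtained from osiris_log:init_offset_reader/2:

%% Illustration only: read up to Credit entries from the next chunk
%% using the chunk iterator API introduced by this commit.
read_with_credit(Log0, Credit) when is_integer(Credit), Credit > 0 ->
    case osiris_log:chunk_iterator(Log0, Credit) of
        {end_of_stream, Log} ->
            {[], Log};
        {ok, _Header, Iter, Log} ->
            {take_entries(Credit, Iter, []), Log};
        {error, _} = Err ->
            Err
    end.

%% Pull at most N {Offset, Entry} pairs out of the chunk iterator;
%% Entry is either a binary or an unparsed {batch, ...} tuple.
take_entries(0, _Iter, Acc) ->
    lists:reverse(Acc);
take_entries(N, Iter0, Acc) ->
    case osiris_log:iterator_next(Iter0) of
        end_of_chunk ->
            lists:reverse(Acc);
        {{Offset, Entry}, Iter} ->
            take_entries(N - 1, Iter, [{Offset, Entry} | Acc])
    end.

A real reader would normally keep hold of the remaining iterator until
the consumer grants more credit, rather than discarding it as this
sketch does.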
kjnilsson committed Oct 24, 2023
1 parent 755e0d7 commit bdc0848
Showing 3 changed files with 260 additions and 73 deletions.
5 changes: 4 additions & 1 deletion src/osiris.erl
@@ -74,6 +74,8 @@
batch() |
{filter_value(), iodata() | batch()}.

%% returned when reading
-type entry() :: binary() | batch().
-type reader_options() :: #{transport => tcp | ssl,
chunk_selector => all | user_data,
filter_spec => osiris_bloom:filter_spec()
@@ -89,7 +91,8 @@
retention_spec/0,
timestamp/0,
writer_id/0,
data/0]).
data/0,
entry/0]).

-spec start_cluster(config()) ->
{ok, config()} |
258 changes: 186 additions & 72 deletions src/osiris_log.erl
@@ -29,6 +29,9 @@
init_data_reader/2,
init_offset_reader/2,
read_header/1,
chunk_iterator/1,
chunk_iterator/2,
iterator_next/1,
read_chunk/1,
read_chunk_parsed/1,
read_chunk_parsed/2,
@@ -375,7 +378,8 @@
index_files => [file:filename_all()],
filter_size => osiris_bloom:filter_size()
}.
-type record() :: {offset(), osiris:data()}.
-type record() :: {offset(), osiris:entry()}.
-type offset_entry() :: {offset(), osiris:entry()}.
-type offset_spec() :: osiris:offset_spec().
-type retention_spec() :: osiris:retention_spec().
-type header_map() ::
@@ -452,6 +456,7 @@
-opaque state() :: #?MODULE{}.

-export_type([state/0,
chunk_iterator/0,
range/0,
config/0,
counter_spec/0,
@@ -1364,6 +1369,136 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) ->
Err
end.

-record(iterator, {fd :: file:io_device(),
next_offset :: offset(),
%% entries left
num_left :: non_neg_integer(),
%% any trailing data from last read
%% we try to capture at least the size of the next record
data :: undefined | binary(),
next_record_pos :: non_neg_integer()}).
-opaque chunk_iterator() :: #iterator{}.
-define(REC_MATCH_SIMPLE(Len, Rem),
<<0:1, Len:31/unsigned, Rem/binary>>).
-define(REC_MATCH_SUBBATCH(CompType, NumRecs, UncompressedLen, Len, Rem),
<<1:1, CompType:3/unsigned, _:4/unsigned,
NumRecs:16/unsigned,
UncompressedLen:32/unsigned,
Len:32/unsigned, Rem/binary>>).

-define(REC_HDR_SZ_SIMPLE_B, 4).
-define(REC_HDR_SZ_SUBBATCH_B, 11).
-define(ITER_READ_AHEAD_B, 64).


-spec chunk_iterator(state()) ->
{ok, header_map(), chunk_iterator(), state()} |
{end_of_stream, state()} |
{error, {invalid_chunk_header, term()}}.
chunk_iterator(State) ->
chunk_iterator(State, 1).

-spec chunk_iterator(state(), pos_integer() | all) ->
{ok, header_map(), chunk_iterator(), state()} |
{end_of_stream, state()} |
{error, {invalid_chunk_header, term()}}.
chunk_iterator(#?MODULE{cfg = #cfg{},
mode = #read{type = RType,
chunk_selector = Selector}
} = State0, CreditHint)
when (is_integer(CreditHint) andalso CreditHint > 0) orelse
is_atom(CreditHint) ->
%% reads the next chunk of unparsed chunk data
case catch read_header0(State0) of
{ok,
#{type := ChType,
chunk_id := ChId,
crc := Crc,
num_entries := NumEntries,
num_records := NumRecords,
data_size := DataSize,
filter_size := FilterSize,
position := Pos,
next_position := NextPos} = Header,
#?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} ->
State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
position = NextPos}},
case needs_handling(RType, Selector, ChType) of
true ->
DataPos = Pos + ?HEADER_SIZE_B + FilterSize,
Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint,
DataSize, NumEntries),
Iterator = #iterator{fd = Fd,
data = Data,
next_offset = ChId,
num_left = NumEntries,
next_record_pos = DataPos},
{ok, Header, Iterator, State};
false ->
%% skip
chunk_iterator(State, CreditHint)
end;
Other ->
Other
end.

-spec iterator_next(chunk_iterator()) ->
end_of_chunk | {offset_entry(), chunk_iterator()}.
iterator_next(#iterator{num_left = 0}) ->
end_of_chunk;
iterator_next(#iterator{fd = Fd,
next_offset = NextOffs,
num_left = Num,
data = ?REC_MATCH_SIMPLE(Len, Rem0),
next_record_pos = Pos} = I0) ->
{Record, Rem} =
case Rem0 of
<<Record0:Len/binary, Rem1/binary>> ->
{Record0, Rem1};
_ ->
%% not enough in Rem0 to read the entire record
%% so we need to read it from disk
{ok, <<Record0:Len/binary, Rem1/binary>>} =
file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B,
Len + ?ITER_READ_AHEAD_B),
{Record0, Rem1}
end,

I = I0#iterator{next_offset = NextOffs + 1,
num_left = Num - 1,
data = Rem,
next_record_pos = Pos + ?REC_HDR_SZ_SIMPLE_B + Len},
{{NextOffs, Record}, I};
iterator_next(#iterator{fd = Fd,
next_offset = NextOffs,
num_left = Num,
data = ?REC_MATCH_SUBBATCH(CompType, NumRecs,
UncompressedLen,
Len, Rem0),
next_record_pos = Pos} = I0) ->
{Data, Rem} =
case Rem0 of
<<Record0:Len/binary, Rem1/binary>> ->
{Record0, Rem1};
_ ->
%% not enough in Rem0 to read the entire record
%% so we need to read it from disk
{ok, <<Record0:Len/binary, Rem1/binary>>} =
file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B,
Len + ?ITER_READ_AHEAD_B),
{Record0, Rem1}
end,
Record = {batch, NumRecs, CompType, UncompressedLen, Data},
I = I0#iterator{next_offset = NextOffs + NumRecs,
num_left = Num - 1,
data = Rem,
next_record_pos = Pos + ?REC_HDR_SZ_SUBBATCH_B + Len},
{{NextOffs, Record}, I};
iterator_next(#iterator{fd = Fd,
next_record_pos = Pos} = I) ->
{ok, Data} = file:pread(Fd, Pos, ?ITER_READ_AHEAD_B),
iterator_next(I#iterator{data = Data}).

-spec read_chunk(state()) ->
{ok, binary(), state()} |
{end_of_stream, state()} |
@@ -1410,42 +1545,30 @@ read_chunk_parsed(State) ->
{ok, header_map(), [record()], state()} |
{end_of_stream, state()} |
{error, {invalid_chunk_header, term()}}.
read_chunk_parsed(#?MODULE{mode = #read{type = RType,
chunk_selector = Selector}} = State0,
HeaderOrNot) ->
%% reads the next chunk of entries, parsed
%% NB: this may return records before the requested index,
%% that is fine - the reading process can do the appropriate filtering
case catch read_header0(State0) of
{ok,
#{type := ChType,
chunk_id := ChId,
crc := Crc,
num_records := NumRecords,
data_size := DataSize,
position := Pos,
filter_size := FilterSize,
next_position := NextPos} = Header,
#?MODULE{fd = Fd, mode = #read{next_offset = _ChId} = Read} = State1} ->
{ok, Data} = file:pread(Fd, Pos + ?HEADER_SIZE_B + FilterSize, DataSize),
validate_crc(ChId, Crc, Data),
State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
position = NextPos}},

case needs_handling(RType, Selector, ChType) of
true when HeaderOrNot == no_header ->
%% parse data into records
{parse_records(ChId, Data, []), State};
true ->
{ok, Header, parse_records(ChId, Data, []), State};
false ->
%% skip
read_chunk_parsed(State, HeaderOrNot)
end;
Ret ->
Ret
read_chunk_parsed(#?MODULE{mode = #read{}} = State0,
HeaderOrNot) ->
%% the Header parameter isn't used anywhere in RabbitMQ so is ignored
case chunk_iterator(State0, all) of
{end_of_stream, _} = Eos ->
Eos;
{ok, _H, I0, State1} when HeaderOrNot == no_header ->
Records = iter_all_records(iterator_next(I0), []),
{Records, State1};
{ok, Header, I0, State1} when HeaderOrNot == with_header ->
Records = iter_all_records(iterator_next(I0), []),
{ok, Header, Records, State1}
end.

iter_all_records(end_of_chunk, Acc) ->
lists:reverse(Acc);
iter_all_records({{ChId, {batch, _Num, 0, _Size, Data}}, I}, Acc0) ->
%% TODO validate that sub batch is correct
Acc = parse_subbatch(ChId, Data, Acc0),
iter_all_records(iterator_next(I), Acc);
iter_all_records({X, I}, Acc0) ->
Acc = [X | Acc0],
iter_all_records(iterator_next(I), Acc).

is_valid_chunk_on_disk(SegFile, Pos) ->
%% read a chunk from a specified location in the segment
%% then checks the CRC
@@ -1625,42 +1748,17 @@ next_chunk_pos(Fd, Pos) ->
_Reserved:24>>} = file:pread(Fd, Pos, ?HEADER_SIZE_B),
Pos + ?HEADER_SIZE_B + FSize + Size + TSize.

parse_records(_Offs, <<>>, Acc) ->
%% TODO: this could probably be changed to body recursive
lists:reverse(Acc);
parse_records(Offs,
<<0:1, %% simple
Len:31/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
parse_records(Offs + 1, Rem, [{Offs, Data} | Acc]);
parse_records(Offs,
<<1:1, %% sub batch (uncompressed)
0:3/unsigned, %% compression type
_:4/unsigned, %% reserved
NumRecs:16/unsigned,
_UncompressedLen:32/unsigned,
Len:32/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
Recs = parse_records(Offs, Data, []),
parse_records(Offs + NumRecs, Rem, lists:reverse(Recs) ++ Acc);
parse_records(Offs,
<<1:1, %% sub batch
CompType:3/unsigned, %% compression type
_:4/unsigned, %% reserved
NumRecs:16/unsigned,
UncompressedLen:32/unsigned,
Len:32/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
%% return the first offset of the sub batch and the batch, unparsed
%% as we don't want to decompress on the server
parse_records(Offs + NumRecs, Rem,
[{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]).

parse_subbatch(_Offs, <<>>, Acc) ->
Acc;
parse_subbatch(Offs,
<<0:1, %% simple
Len:31/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
parse_subbatch(Offs + 1, Rem, [{Offs, Data} | Acc]).


sorted_index_files(#{index_files := IdxFiles}) ->
%% cached
@@ -3000,7 +3098,23 @@ dump_crc_check(Fd) ->
dump_crc_check(Fd)
end.


iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) ->
%% no point reading ahead if there is only one entry to be read at this
%% time
undefined;
iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries)
when Credit == all orelse NumEntries == 1 ->
{ok, Data} = file:pread(Fd, Pos, DataSize),
validate_crc(ChunkId, Crc, Data),
Data;
iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) ->
%% read ahead, assumes roughly equal entry sizes which may not be the case
%% TODO round up to nearest block?
%% We can only practically validate CRC if we read the whole data
Credit = min(Credit0, NumEntries),
Size = DataSize div NumEntries * Credit,
{ok, Data} = file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B),
Data.

-ifdef(TEST).

