Skip to content

Commit

Permalink
Non tip-isa-batch changes from #251
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 21, 2020
1 parent f4f1e1a commit 539e7ef
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 146 deletions.
67 changes: 32 additions & 35 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ Store | Holds Events for a
Stream | Ordered sequence of Events in a Store
Synchronous Query | Consistent read direct from Stream State (breaking CQRS and coupling implementation to the State used to support the Decision process). See CQRS, Query, Reactions

## CosmosDb
## CosmosDB

Term | Description
-------------------------|------------
Change Feed | set of query patterns enabling the running of continuous queries reading Items (documents) in a Range (physical partition) in order of their last update
Change Feed Processor | Library from Microsoft exposing facilities to Project from a Change Feed, maintaining Offsets per Range of the Monitored Container in a Lease Container
Container | logical space in a CosmosDb holding [loosely] related Items (aka Documents). Items bear logical Partition Keys. Formerly Collection. Can be allocated Request Units.
Container | logical space in a CosmosDB holding [loosely] related Items (aka Documents). Items bear logical Partition Keys. Formerly Collection. Can be allocated Request Units.
CosmosDB | Microsoft Azure's managed document database system
Database | Group of Containers. Can be allocated Request Units.
DocumentDb | Original offering of CosmosDB, now entitled the SQL Query Model, `Microsoft.Azure.DocumentDb.Client[.Core]`
Expand Down Expand Up @@ -485,7 +485,7 @@ events on a given category of stream:
When using a Store with support for synchronous unfolds and/or snapshots, one
will typically implement two further functions in order to avoid having every
`'event` in the stream be loaded and processed in order to build the `'state`
per Decision or Query (versus a single cheap point read from CosmosDb to read
per Decision or Query (versus a single cheap point read from CosmosDB to read
the _tip_):

- `isOrigin: 'event -> bool`: predicate indicating whether a given `'event` is
Expand Down Expand Up @@ -519,7 +519,7 @@ When running a decision process, we have the following stages:

b. if there is a conflict, obtain the conflicting events [that other
writers have produced] since the Position used in step 1, `fold` them into
our `state`, and go back to 2 (aside: the CosmosDb stored procedure can
our `state`, and go back to 2 (aside: the CosmosDB stored procedure can
send them back immediately at zero cost or latency, and there is [a
proposal for EventStore to afford the same
facility](https://github.com/EventStore/EventStore/issues/1652))
Expand Down Expand Up @@ -1457,7 +1457,7 @@ Key aspects relevant to the Equinox programming model:

- Projections can be managed by either tailing streams (including the synthetic
`$all` stream) or using the Projections facility - there's no obvious reason
to wrap it, aside from being able to uniformly target CosmosDb (i.e. one
to wrap it, aside from being able to uniformly target CosmosDB (i.e. one
could build an `Equinox.EventStore.Projection` library and an `eqx project
stats es` with very little code).

Expand All @@ -1478,16 +1478,16 @@ Key aspects relevant to the Equinox programming model:
that json (presented as UTF-8 byte arrays) is a good default for reasons of
interoperability (the projections facility also strongly implies json)

### Azure CosmosDb concerns
### Azure CosmosDB concerns

TL;DR caching can optimize RU consumption significantly. Due to the intrinsic
ability to mutate easily, the potential to integrate rolling snapshots into
core storage is clear. Providing ways to cache and snapshot matter a lot on
CosmosDb, as lowest-common-denominator queries loading lots of events cost in
CosmosDB, as lowest-common-denominator queries loading lots of events cost in
performance and cash. The specifics of how you use the changefeed matters more
than one might thing from the CosmosDb high level docs.
than one might thing from the CosmosDB high level docs.

Overview: CosmosDb has been in production for >5 years and is a mature Document
Overview: CosmosDB has been in production for >5 years and is a mature Document
database. The initial DocumentDb offering is at this point a mere projected
programming model atop a generic Document data store. Its changefeed mechanism
affords a base upon which one can manage projections, but there is no directly
Expand All @@ -1497,7 +1497,7 @@ consumer offsets in the store itself).

Key aspects relevant to the Equinox programming model:

- CosmosDb has pervasive optimization feedback per call in the form of a
- CosmosDB has pervasive optimization feedback per call in the form of a
Request Charge attached to each and every action. Working to optimize one's
request charges per scenario is critical both in terms of the effect it has
on the amount of Request Units/s one you need to pre-provision (which
Expand All @@ -1510,7 +1510,7 @@ Key aspects relevant to the Equinox programming model:
plus a price per KB and are optimal. Queries, even ones returning that same
single document, have significant overhead and hence are to be avoided

- One key mechanism CosmosDb provides to allow one to work efficiently is that
- One key mechanism CosmosDB provides to allow one to work efficiently is that
any point-read request where one supplies a valid `etag` is charged at 1 RU,
regardless of the size one would be transferring in the case of a cache miss
(the other key benefit of using this is that it avoids unnecessarily clogging
Expand All @@ -1519,12 +1519,12 @@ Key aspects relevant to the Equinox programming model:
- Indexing things surfaces in terms of increased request charges; at scale,
each indexing hence needs to be justified

- Similarly to EventStore, the default ARS encoding CosmosDb provides, together
- Similarly to EventStore, the default ARS encoding CosmosDB provides, together
with interoperability concerns, means that straight json makes sense as an
encoding form for events (UTF-8 arrays)

- Collectively, the above implies (arguably counter-intuitively) that using the
powerful generic querying facility that CosmosDb provides should actually be
powerful generic querying facility that CosmosDB provides should actually be
a last resort.

- See [Cosmos Storage Model](#cosmos-storage-model) for further information on
Expand Down Expand Up @@ -1575,11 +1575,11 @@ Events are stored in immutable batches consisting of:
- `n`extIndex: `int64` // base index ('i') position value of the next record in
the stream - NB this _always_ corresponds to `i`+`e.length` (in the case of
the `Tip` record, there won't actually be such a record yet)
- `id`: `string` // same as `i` (CosmosDb forces every item (document) to have one[, and it must be a `string`])
- `id`: `string` // same as `i` (CosmosDB forces every item (document) to have one[, and it must be a `string`])
- `e`vents: `Event[]` // (see next section) typically there is one item in the
array (can be many if events are small, for RU and performance/efficiency
reasons; RU charges are per 1024 byte block)
- `ts` // CosmosDb-intrinsic last updated date for this record (changes when
- `ts` // CosmosDB-intrinsic last updated date for this record (changes when
replicated etc, hence see `t` below)

## Events
Expand All @@ -1588,7 +1588,7 @@ Per `Event`, we have the following:

- `c`ase - the case of this union in the Discriminated Union of Events this
stream bears (aka Event Type)
- `d`ata - json data (CosmosDb maintains it as actual json; you are free to
- `d`ata - json data (CosmosDB maintains it as actual json; you are free to
index it and/or query based on that if desired)
- `m`etadata - carries ancillary information for an event; also json
- `t` - creation timestamp
Expand All @@ -1601,7 +1601,7 @@ Batch (`Tip` *isa* `Batch`), adding the following:

- `id`: always `-1` so one can reference it in a point-read GET request and not
pay the cost and latency associated with a full indexed query
- `_etag`: CosmosDb-managed field updated per-touch (facilitates `NotModified`
- `_etag`: CosmosDB-managed field updated per-touch (facilitates `NotModified`
result, see below)
- `u`: Array of _unfold_ed events based on a point-in-time _state_ (see _State,
Snapshots, Events and Unfolds_, _Unfolded Events_ and `unfold` in the
Expand Down Expand Up @@ -1754,36 +1754,34 @@ stream). The request includes the following elements:
- `expectedVersion`: the position the requester has based their [proposed]
events on (no,
[providing an `etag` to save on Request Charges is not possible in the Stored Proc](https://stackoverflow.com/questions/53355886/azure-cosmosdb-stored-procedure-ifmatch-predicate))
- the `expectedEtag` enables competing writers to
maintain and update `u`nfold data in a consistent fashion (backing off and
retrying in the case of conflict, _without any events being written per state
change_) (See `AccessStrategy.RollingState`, `AccessStrategy.Custom`)
- `e`: array of Events (see Event, above) to append if, and only if, the
expectedVersion check is fulfilled
- `u`: array of `unfold`ed events (aka snapshots) that supersede items with
equivalent `c`ase values
- `maxEvents`: the maximum number of events in an individual batch prior to
starting a new one. For example:
- `maxEvents`: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the `maxStringifyLen` rule). For example:

- if `e` contains 2 events, the _tip_ document's `e` has 2 events and the
`maxEvents` is `5`, the events get appended onto the tip
- if the total length including the new `e`vents would exceed `maxEvents`,
the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a
batch, and the new events go into the new Tip-Batch, the _tip_ gets frozen
as a `Batch`, and the new request becomes the _tip_ (as an atomic
transaction on the server side)
`maxEvents` is `5`, the events get appended onto the tip's `e`vents
- if the total length including the new `e`vents would exceed `maxEvents`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a
batch (with the new `e`vents included in that calved Batch), and the new Tip
has a zero-length `e`vents array, and a set of `u`nfolds
(as an atomic transaction on the server side)

- (PROPOSAL/FUTURE) `thirdPartyUnfoldRetention`: how many events to keep before
the base (`i`) of the batch if required by lagging `u`nfolds which would
otherwise fall out of scope as a result of the appends in this batch (this
will default to `0`, so for example if a writer says maxEvents `10` and there
is an `u`nfold based on an event more than `10` old it will be removed as
part of the appending process)
- (PROPOSAL/FUTURE): adding an `expectedEtag` would enable competing writers to
maintain and update `u`nfold data in a consistent fashion (backing off and
retrying in the case of conflict, _without any events being written per state
change_)

## Equinox.CosmosStore.Core.Events

The `Equinox.CosmosStore.Core` namespace provides a lower level API that can be used
to manipulate events stored within a Azure CosmosDb using optimized native
to manipulate events stored within a Azure CosmosDB using optimized native
access patterns.

The higher level APIs (i.e. not `Core`), as demonstrated by the `dotnet new`
Expand Down Expand Up @@ -1816,7 +1814,7 @@ type EventData with
static member FromT eventType value =
EventData.FromUtf8Bytes(eventType, Json.toBytes value)
// Load connection sring from your Key Vault (example here is the CosmosDb
// Load connection sring from your Key Vault (example here is the CosmosDB
// simulator's well known key)
let connectionString : string =
"AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"
Expand All @@ -1832,9 +1830,8 @@ let gatewayLog =
let factory : Equinox.CosmosStore.CosmosStoreClientFactory =
CosmosStoreClientFactory(
requestTimeout = TimeSpan.FromSeconds 5.,
maxRetryAttemptsOnThrottledRequests = 1,
maxRetryWaitTimeInSeconds = 3,
log = gatewayLog)
maxRetryAttemptsOnRateLimitedRequests = 1,
maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)
let client =
factory.Create("Application.CommandProcessor", Discovery.FromConnectionString connectionString)
|> Async.RunSynchronously
Expand Down Expand Up @@ -2262,7 +2259,7 @@ rich relative to the need of consumers to date. Some things remain though:
- performance, efficiency and concurrency improvements based on
[`tip-isa-batch`](https://github.com/jet/equinox/tree/tip-isa-batch) schema
generalization [#109](https://github.com/jet/equinox/issues/109)
- performance improvements in loading logic
- low level performance improvements in loading logic (reducing allocations etc)

## Wouldn't it be nice - `Equinox.DynamoDb`

Expand Down
Loading

0 comments on commit 539e7ef

Please sign in to comment.