From c80fb66dc1ec27cb719dcd8f82a8504921c2ac82 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Wed, 4 Dec 2024 10:22:58 +0100 Subject: [PATCH] RS-543: Support conditional query (#97) * intregrate when condition to Bucket.removeQuery * suport when condition in Bucket.query method * update CHANGELOG --- CHANGELOG.md | 4 ++ README.md | 4 +- src/Bucket.ts | 84 ++++++++++++++++++++++-------------- src/index.ts | 3 +- src/messages/QueryEntry.ts | 88 ++++++++++++++++++++++++++++++++++++++ test/Bucket.test.ts | 74 ++++++++++++++++++++++++++------ 6 files changed, 207 insertions(+), 50 deletions(-) create mode 100644 src/messages/QueryEntry.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index debf070..9aeba71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added: + +- RS-543: Support conditional query, [PR-97](https://github.com/reductstore/reduct-js/pull/97) + ## [1.12.0] - 2024-10-04 ### Added: diff --git a/README.md b/README.md index 7d527cf..2a58637 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ data stored in ReductStore. ## Features - Promise-based API for easy asynchronous programming -- Support for [ReductStore HTTP API v1.12](https://www.reduct.store/docs/http-api) +- Support for [ReductStore HTTP API v1.13](https://www.reduct.store/docs/http-api) - Token-based authentication for secure access to the database - Labeling for read-write operations and querying - Batch operations for efficient data processing @@ -63,4 +63,4 @@ for await (const record of bucket.query( } ``` -For more examples, see the [Guides](https://reduct.store/docs/guides) section in the ReductStore documentation. +For more examples, see the [Guides](https://www.reduct.store/docs/guides) section in the ReductStore documentation. diff --git a/src/Bucket.ts b/src/Bucket.ts index 99941b1..ac2082f 100644 --- a/src/Bucket.ts +++ b/src/Bucket.ts @@ -9,21 +9,7 @@ import Stream, { Readable } from "stream"; import { Buffer } from "buffer"; import { Batch, BatchType } from "./Batch"; import { isCompatibale } from "./Client"; - -/** - * Options for querying records - */ -export interface QueryOptions { - ttl?: number; // Time to live in seconds - include?: LabelMap; // include only record which have all these labels with the same value - exclude?: LabelMap; // exclude record which have all these labels with the same value - eachS?: number; // return only one record per S second - eachN?: number; // return each N-th record - limit?: number; // limit number of records - continuous?: boolean; // await for new records - pollInterval?: number; // interval for polling new records (only for continue=true) - head?: boolean; // return only head of the record -} +import { QueryOptions, QueryType } from "./messages/QueryEntry"; /** * Options for writing records @@ -142,20 +128,28 @@ export class Bucket { * @param entry {string} name of the entry * @param start {BigInt} start point of the time period, if undefined, the query starts from the first record * @param stop {BigInt} stop point of the time period. If undefined, the query stops at the last record - * @param QueryOptions {QueryOptions} options for query. You can use only include, exclude, eachS, eachN other options are ignored + * @param options {QueryOptions} options for query. You can use only include, exclude, eachS, eachN other options are ignored */ async removeQuery( entry: string, start?: bigint, stop?: bigint, - QueryOptions?: QueryOptions, + options?: QueryOptions, ): Promise { - const ret = this.parse_query_params(start, stop, QueryOptions); + if (options !== undefined && options.when !== undefined) { + const { data } = await this.httpClient.post( + `/b/${this.name}/${entry}/q`, + QueryOptions.serialize(QueryType.REMOVE, options), + ); + return Promise.resolve(data["removed_records"]); + } else { + const ret = this.parse_query_params(start, stop, options); - const { data } = await this.httpClient.delete( - `/b/${this.name}/${entry}/q?${ret.query}`, - ); - return Promise.resolve(data["removed_records"]); + const { data } = await this.httpClient.delete( + `/b/${this.name}/${entry}/q?${ret.query}`, + ); + return Promise.resolve(data["removed_records"]); + } } /** @@ -294,27 +288,51 @@ export class Bucket { stop?: bigint, options?: number | QueryOptions, ): AsyncGenerator { - const ret = this.parse_query_params(start, stop, options); + let id; + let header_api_version; + let continuous = false; + let pollInterval = 1; + let head = false; + if ( + options !== undefined && + typeof options === "object" && + "when" in options + ) { + const { data, headers } = await this.httpClient.post( + `/b/${this.name}/${entry}/q`, + QueryOptions.serialize(QueryType.QUERY, options), + ); + ({ id } = data); + header_api_version = headers["x-reduct-api"]; + continuous = options.continuous ?? false; + pollInterval = options.pollInterval ?? 1; + head = options.head ?? false; + } else { + // TODO: remove this block after 1.xx + const ret = this.parse_query_params(start, stop, options); + + const url = `/b/${this.name}/${entry}/q?` + ret.query; + const { data, headers } = await this.httpClient.get(url); + ({ id } = data); + header_api_version = headers["x-reduct-api"]; + ({ continuous, pollInterval, head } = ret); + } - const url = `/b/${this.name}/${entry}/q?` + ret.query; - const { data, headers } = await this.httpClient.get(url); - const { id } = data; - const header_api_version = headers["x-reduct-api"]; if (isCompatibale("1.5", header_api_version) && !this.isBrowser) { yield* this.fetchAndParseBatchedRecords( entry, id, - ret.continuous, - ret.pollInterval, - ret.head, + continuous, + pollInterval, + head, ); } else { yield* this.fetchAndParseSingleRecord( entry, id, - ret.continuous, - ret.pollInterval, - ret.head, + continuous, + pollInterval, + head, ); } } diff --git a/src/index.ts b/src/index.ts index 08a1542..68d832f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ import { Client } from "./Client"; -import { Bucket, QueryOptions, WriteOptions } from "./Bucket"; +import { Bucket, WriteOptions } from "./Bucket"; import { APIError } from "./APIError"; import { ServerInfo, LicenseInfo } from "./messages/ServerInfo"; import { BucketSettings, QuotaType } from "./messages/BucketSettings"; @@ -10,6 +10,7 @@ import { ReplicationInfo } from "./messages/ReplicationInfo"; import { ReplicationSettings } from "./messages/ReplicationSettings"; import { FullReplicationInfo } from "./messages/ReplicationInfo"; import { Batch } from "./Batch"; +import { QueryOptions } from "./messages/QueryEntry"; export { Client, diff --git a/src/messages/QueryEntry.ts b/src/messages/QueryEntry.ts new file mode 100644 index 0000000..8f91192 --- /dev/null +++ b/src/messages/QueryEntry.ts @@ -0,0 +1,88 @@ +import { LabelMap } from "../Record"; + +export enum QueryType { + QUERY, + REMOVE, +} + +export interface QueryEntry { + query_type: string; + + /** Start query from (Unix timestamp in microseconds) */ + start?: number; + /** Stop query at (Unix timestamp in microseconds) */ + stop?: number; + + /** Include records with label */ + include?: Record; + /** Exclude records with label */ + exclude?: Record; + /** Return a record every S seconds */ + each_s?: number; + /** Return a record every N records */ + each_n?: number; + /** Limit the number of records returned */ + limit?: number; + + /** TTL of query in seconds */ + ttl?: number; + /** Retrieve only metadata */ + only_metadata?: boolean; + /** Continuous query, it doesn't stop until the TTL is reached */ + continuous?: boolean; + + /** Conditional query */ + when?: any; + /** Strict conditional query + * If true, the query returns an error if any condition cannot be evaluated + */ + strict?: boolean; +} + +/** + * Options for querying records + */ +export class QueryOptions { + /** Time to live in seconds */ + ttl?: number; + /** Include records with label + * @deprecated: use when instead + * */ + include?: LabelMap; + /** Exclude records with label + * @deprecated: use when instead + * */ + exclude?: LabelMap; + /** Return only one record per S second */ + eachS?: number; + /** Return only one record per N records */ + eachN?: number; + /** Limit number of records */ + limit?: number; + /** Don't stop query until TTL is reached */ + continuous?: boolean; + /** Poll interval for new records only for continue=true */ + pollInterval?: number; + /** Return only metadata */ + head?: boolean; + /** Conditional query */ + when?: Record; + /** strict conditional query */ + strict?: boolean; + + static serialize(queryType: QueryType, data: QueryOptions): QueryEntry { + return { + query_type: QueryType[queryType], + ttl: data.ttl, + include: data.include as Record, + exclude: data.exclude as Record, + each_s: data.eachS, + each_n: data.eachN, + limit: data.limit, + continuous: data.continuous, + when: data.when, + strict: data.strict, + only_metadata: data.head, + }; + } +} diff --git a/test/Bucket.test.ts b/test/Bucket.test.ts index 79813c1..1d956e6 100644 --- a/test/Bucket.test.ts +++ b/test/Bucket.test.ts @@ -355,34 +355,48 @@ describe("Bucket", () => { expect(records[0].time).toEqual(2_000_000n); }); - it("should query records with labels", async () => { + it_api("1.13")("should query records with condition", async () => { const bucket: Bucket = await client.getBucket("bucket"); let record = await bucket.beginWrite("entry-labels", { - labels: { label1: "value1", label2: "value2" }, + labels: { score: 10, class: "cat" }, }); await record.write("somedata1"); record = await bucket.beginWrite("entry-labels", { - labels: { label1: "value1", label2: "value3" }, + labels: { score: 20, class: "dog" }, }); await record.write("somedata1"); - let records: ReadableRecord[] = await all( - bucket.query("entry-labels", undefined, undefined, { - include: { label1: "value1", label2: "value2" }, - }), - ); - expect(records.length).toEqual(1); - expect(records[0].labels).toEqual({ label1: "value1", label2: "value2" }); - - records = await all( + const records: ReadableRecord[] = await all( bucket.query("entry-labels", undefined, undefined, { - exclude: { label1: "value1", label2: "value2" }, + when: { "&score": { $gt: 10 } }, }), ); expect(records.length).toEqual(1); - expect(records[0].labels).toEqual({ label1: "value1", label2: "value3" }); + expect(records[0].labels).toEqual({ score: "20", class: "dog" }); }); + + it_api("1.13")( + "should query records with strict or non-strict condition", + async () => { + const bucket: Bucket = await client.getBucket("bucket"); + await expect( + all( + bucket.query("entry-1", undefined, undefined, { + when: { "&NOT_EXIST": { $gt: 10 } }, + strict: true, + }), + ), + ).rejects.toMatchObject({ status: 404 }); + + const records: ReadableRecord[] = await all( + bucket.query("entry-1", undefined, undefined, { + when: { "&NOT_EXIST": { $gt: 10 } }, + }), + ); + expect(records.length).toEqual(0); + }, + ); }); describe("remove", () => { @@ -438,6 +452,38 @@ describe("Bucket", () => { expect(records[0].time).toEqual(3000000n); expect(records[1].time).toEqual(4000000n); }); + + it_api("1.13")("should remove records by condition", async () => { + const bucket: Bucket = await client.getBucket("bucket"); + const removed = await bucket.removeQuery( + "entry-1", + 1_000_000n, + 3_000_000n, + { when: { "&label3": { $eq: true } } }, + ); + expect(removed).toEqual(1); + }); + + it_api("1.13")( + "should remove records by strict and non-strict condition", + async () => { + const bucket: Bucket = await client.getBucket("bucket"); + const removed = await bucket.removeQuery( + "entry-1", + 1_000_000n, + 3_000_000n, + { when: { "&NOT_EXIST": { $eq: true } } }, + ); + expect(removed).toEqual(0); + + await expect( + bucket.removeQuery("entry-1", 1_000_000n, 3_000_000n, { + when: { "&NOT_EXIST": { $eq: true } }, + strict: true, + }), + ).rejects.toMatchObject({ status: 404 }); + }, + ); }); describe("update", () => {