Skip to content

Commit

Permalink
RS-543: Support conditional query (#97)
Browse files Browse the repository at this point in the history
* intregrate when condition to Bucket.removeQuery

* suport when condition in Bucket.query method

* update CHANGELOG
  • Loading branch information
atimin authored Dec 4, 2024
1 parent a04d5d2 commit c80fb66
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 50 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added:

- RS-543: Support conditional query, [PR-97](https://github.com/reductstore/reduct-js/pull/97)

## [1.12.0] - 2024-10-04

### Added:
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data stored in ReductStore.
## Features

- Promise-based API for easy asynchronous programming
- Support for [ReductStore HTTP API v1.12](https://www.reduct.store/docs/http-api)
- Support for [ReductStore HTTP API v1.13](https://www.reduct.store/docs/http-api)
- Token-based authentication for secure access to the database
- Labeling for read-write operations and querying
- Batch operations for efficient data processing
Expand Down Expand Up @@ -63,4 +63,4 @@ for await (const record of bucket.query(
}
```

For more examples, see the [Guides](https://reduct.store/docs/guides) section in the ReductStore documentation.
For more examples, see the [Guides](https://www.reduct.store/docs/guides) section in the ReductStore documentation.
84 changes: 51 additions & 33 deletions src/Bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,7 @@ import Stream, { Readable } from "stream";
import { Buffer } from "buffer";
import { Batch, BatchType } from "./Batch";
import { isCompatibale } from "./Client";

/**
* Options for querying records
*/
export interface QueryOptions {
ttl?: number; // Time to live in seconds
include?: LabelMap; // include only record which have all these labels with the same value
exclude?: LabelMap; // exclude record which have all these labels with the same value
eachS?: number; // return only one record per S second
eachN?: number; // return each N-th record
limit?: number; // limit number of records
continuous?: boolean; // await for new records
pollInterval?: number; // interval for polling new records (only for continue=true)
head?: boolean; // return only head of the record
}
import { QueryOptions, QueryType } from "./messages/QueryEntry";

/**
* Options for writing records
Expand Down Expand Up @@ -142,20 +128,28 @@ export class Bucket {
* @param entry {string} name of the entry
* @param start {BigInt} start point of the time period, if undefined, the query starts from the first record
* @param stop {BigInt} stop point of the time period. If undefined, the query stops at the last record
* @param QueryOptions {QueryOptions} options for query. You can use only include, exclude, eachS, eachN other options are ignored
* @param options {QueryOptions} options for query. You can use only include, exclude, eachS, eachN other options are ignored
*/
async removeQuery(
entry: string,
start?: bigint,
stop?: bigint,
QueryOptions?: QueryOptions,
options?: QueryOptions,
): Promise<void> {
const ret = this.parse_query_params(start, stop, QueryOptions);
if (options !== undefined && options.when !== undefined) {
const { data } = await this.httpClient.post(
`/b/${this.name}/${entry}/q`,
QueryOptions.serialize(QueryType.REMOVE, options),
);
return Promise.resolve(data["removed_records"]);
} else {
const ret = this.parse_query_params(start, stop, options);

const { data } = await this.httpClient.delete(
`/b/${this.name}/${entry}/q?${ret.query}`,
);
return Promise.resolve(data["removed_records"]);
const { data } = await this.httpClient.delete(
`/b/${this.name}/${entry}/q?${ret.query}`,
);
return Promise.resolve(data["removed_records"]);
}
}

/**
Expand Down Expand Up @@ -294,27 +288,51 @@ export class Bucket {
stop?: bigint,
options?: number | QueryOptions,
): AsyncGenerator<ReadableRecord> {
const ret = this.parse_query_params(start, stop, options);
let id;
let header_api_version;
let continuous = false;
let pollInterval = 1;
let head = false;
if (
options !== undefined &&
typeof options === "object" &&
"when" in options
) {
const { data, headers } = await this.httpClient.post(
`/b/${this.name}/${entry}/q`,
QueryOptions.serialize(QueryType.QUERY, options),
);
({ id } = data);
header_api_version = headers["x-reduct-api"];
continuous = options.continuous ?? false;
pollInterval = options.pollInterval ?? 1;
head = options.head ?? false;
} else {
// TODO: remove this block after 1.xx
const ret = this.parse_query_params(start, stop, options);

const url = `/b/${this.name}/${entry}/q?` + ret.query;
const { data, headers } = await this.httpClient.get(url);
({ id } = data);
header_api_version = headers["x-reduct-api"];
({ continuous, pollInterval, head } = ret);
}

const url = `/b/${this.name}/${entry}/q?` + ret.query;
const { data, headers } = await this.httpClient.get(url);
const { id } = data;
const header_api_version = headers["x-reduct-api"];
if (isCompatibale("1.5", header_api_version) && !this.isBrowser) {
yield* this.fetchAndParseBatchedRecords(
entry,
id,
ret.continuous,
ret.pollInterval,
ret.head,
continuous,
pollInterval,
head,
);
} else {
yield* this.fetchAndParseSingleRecord(
entry,
id,
ret.continuous,
ret.pollInterval,
ret.head,
continuous,
pollInterval,
head,
);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Client } from "./Client";
import { Bucket, QueryOptions, WriteOptions } from "./Bucket";
import { Bucket, WriteOptions } from "./Bucket";
import { APIError } from "./APIError";
import { ServerInfo, LicenseInfo } from "./messages/ServerInfo";
import { BucketSettings, QuotaType } from "./messages/BucketSettings";
Expand All @@ -10,6 +10,7 @@ import { ReplicationInfo } from "./messages/ReplicationInfo";
import { ReplicationSettings } from "./messages/ReplicationSettings";
import { FullReplicationInfo } from "./messages/ReplicationInfo";
import { Batch } from "./Batch";
import { QueryOptions } from "./messages/QueryEntry";

export {
Client,
Expand Down
88 changes: 88 additions & 0 deletions src/messages/QueryEntry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { LabelMap } from "../Record";

export enum QueryType {
QUERY,
REMOVE,
}

export interface QueryEntry {
query_type: string;

/** Start query from (Unix timestamp in microseconds) */
start?: number;
/** Stop query at (Unix timestamp in microseconds) */
stop?: number;

/** Include records with label */
include?: Record<string, string>;
/** Exclude records with label */
exclude?: Record<string, string>;
/** Return a record every S seconds */
each_s?: number;
/** Return a record every N records */
each_n?: number;
/** Limit the number of records returned */
limit?: number;

/** TTL of query in seconds */
ttl?: number;
/** Retrieve only metadata */
only_metadata?: boolean;
/** Continuous query, it doesn't stop until the TTL is reached */
continuous?: boolean;

/** Conditional query */
when?: any;
/** Strict conditional query
* If true, the query returns an error if any condition cannot be evaluated
*/
strict?: boolean;
}

/**
* Options for querying records
*/
export class QueryOptions {
/** Time to live in seconds */
ttl?: number;
/** Include records with label
* @deprecated: use when instead
* */
include?: LabelMap;
/** Exclude records with label
* @deprecated: use when instead
* */
exclude?: LabelMap;
/** Return only one record per S second */
eachS?: number;
/** Return only one record per N records */
eachN?: number;
/** Limit number of records */
limit?: number;
/** Don't stop query until TTL is reached */
continuous?: boolean;
/** Poll interval for new records only for continue=true */
pollInterval?: number;
/** Return only metadata */
head?: boolean;
/** Conditional query */
when?: Record<string, any>;
/** strict conditional query */
strict?: boolean;

static serialize(queryType: QueryType, data: QueryOptions): QueryEntry {
return {
query_type: QueryType[queryType],
ttl: data.ttl,
include: data.include as Record<string, string>,
exclude: data.exclude as Record<string, string>,
each_s: data.eachS,
each_n: data.eachN,
limit: data.limit,
continuous: data.continuous,
when: data.when,
strict: data.strict,
only_metadata: data.head,
};
}
}
74 changes: 60 additions & 14 deletions test/Bucket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,34 +355,48 @@ describe("Bucket", () => {
expect(records[0].time).toEqual(2_000_000n);
});

it("should query records with labels", async () => {
it_api("1.13")("should query records with condition", async () => {
const bucket: Bucket = await client.getBucket("bucket");

let record = await bucket.beginWrite("entry-labels", {
labels: { label1: "value1", label2: "value2" },
labels: { score: 10, class: "cat" },
});
await record.write("somedata1");
record = await bucket.beginWrite("entry-labels", {
labels: { label1: "value1", label2: "value3" },
labels: { score: 20, class: "dog" },
});
await record.write("somedata1");

let records: ReadableRecord[] = await all(
bucket.query("entry-labels", undefined, undefined, {
include: { label1: "value1", label2: "value2" },
}),
);
expect(records.length).toEqual(1);
expect(records[0].labels).toEqual({ label1: "value1", label2: "value2" });

records = await all(
const records: ReadableRecord[] = await all(
bucket.query("entry-labels", undefined, undefined, {
exclude: { label1: "value1", label2: "value2" },
when: { "&score": { $gt: 10 } },
}),
);
expect(records.length).toEqual(1);
expect(records[0].labels).toEqual({ label1: "value1", label2: "value3" });
expect(records[0].labels).toEqual({ score: "20", class: "dog" });
});

it_api("1.13")(
"should query records with strict or non-strict condition",
async () => {
const bucket: Bucket = await client.getBucket("bucket");
await expect(
all(
bucket.query("entry-1", undefined, undefined, {
when: { "&NOT_EXIST": { $gt: 10 } },
strict: true,
}),
),
).rejects.toMatchObject({ status: 404 });

const records: ReadableRecord[] = await all(
bucket.query("entry-1", undefined, undefined, {
when: { "&NOT_EXIST": { $gt: 10 } },
}),
);
expect(records.length).toEqual(0);
},
);
});

describe("remove", () => {
Expand Down Expand Up @@ -438,6 +452,38 @@ describe("Bucket", () => {
expect(records[0].time).toEqual(3000000n);
expect(records[1].time).toEqual(4000000n);
});

it_api("1.13")("should remove records by condition", async () => {
const bucket: Bucket = await client.getBucket("bucket");
const removed = await bucket.removeQuery(
"entry-1",
1_000_000n,
3_000_000n,
{ when: { "&label3": { $eq: true } } },
);
expect(removed).toEqual(1);
});

it_api("1.13")(
"should remove records by strict and non-strict condition",
async () => {
const bucket: Bucket = await client.getBucket("bucket");
const removed = await bucket.removeQuery(
"entry-1",
1_000_000n,
3_000_000n,
{ when: { "&NOT_EXIST": { $eq: true } } },
);
expect(removed).toEqual(0);

await expect(
bucket.removeQuery("entry-1", 1_000_000n, 3_000_000n, {
when: { "&NOT_EXIST": { $eq: true } },
strict: true,
}),
).rejects.toMatchObject({ status: 404 });
},
);
});

describe("update", () => {
Expand Down

0 comments on commit c80fb66

Please sign in to comment.