Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: request body compression #1358

Merged
merged 18 commits into from
Oct 22, 2024
187 changes: 187 additions & 0 deletions packages/http/fetch/src/middlewares/compressionHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/**
* -------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All Rights Reserved. Licensed under the MIT License.
* See License in the project root for license information.
* -------------------------------------------------------------------------------------------
*/

import type { RequestOption } from "@microsoft/kiota-abstractions";
import { Span, trace } from "@opentelemetry/api";

import { getObservabilityOptionsFromRequest } from "../observabilityOptions";
import type { Middleware } from "./middleware";
import { CompressionHandlerOptions, CompressionHandlerOptionsKey } from "./options/compressionHandlerOptions";

/**
* Compress the url content.
*/
export class CompressionHandler implements Middleware {
next: Middleware | undefined;

/**
* @private
* @static
* A member holding the name of content range header
*/
private static CONTENT_RANGE_HEADER = "Content-Range";

/**
* @private
* @static
* A member holding the name of content encoding header
*/
private static CONTENT_ENCODING_HEADER = "Content-Encoding";

/**
* @public
* @constructor
* Creates a new instance of the CompressionHandler class
* @param {CompressionHandlerOptions} handlerOptions The options for the compression handler.
* @returns An instance of the CompressionHandler class
*/
public constructor(private readonly handlerOptions: CompressionHandlerOptions = new CompressionHandlerOptions()) {
if (!handlerOptions) {
throw new Error("handlerOptions cannot be undefined");
}
}

/**
* @inheritdoc
*/
public execute(url: string, requestInit: RequestInit, requestOptions?: Record<string, RequestOption> | undefined): Promise<Response> {
let currentOptions = this.handlerOptions;
if (requestOptions && requestOptions[CompressionHandlerOptionsKey]) {
currentOptions = requestOptions[CompressionHandlerOptionsKey] as CompressionHandlerOptions;
}
const obsOptions = getObservabilityOptionsFromRequest(requestOptions);
if (obsOptions) {
return trace.getTracer(obsOptions.getTracerInstrumentationName()).startActiveSpan("compressionHandler - execute", (span) => {
try {
span.setAttribute("com.microsoft.kiota.handler.compression.enable", currentOptions.ShouldCompress);
return this.executeInternal(currentOptions, url, requestInit, requestOptions, span);
} finally {
span.end();
}
});
}
return this.executeInternal(currentOptions, url, requestInit, requestOptions);
}

private async executeInternal(options: CompressionHandlerOptions, url: string, requestInit: RequestInit, requestOptions?: Record<string, RequestOption> | undefined, span?: Span): Promise<Response> {
if (!options.ShouldCompress || this.contentRangeBytesIsPresent(requestInit.headers) || this.contentEncodingIsPresent(requestInit.headers) || requestInit.body === null || requestInit.body === undefined) {
return this.next?.execute(url, requestInit, requestOptions) ?? Promise.reject(new Error("Response is undefined"));
}

span?.setAttribute("http.request.body.compressed", true);

const unCompressedBody = await this.readBodyAsBytes(requestInit.body);
const unCompressedBodySize = unCompressedBody.length;

// compress the request body
const compressedBody = await this.compressReqBody(unCompressedBody);

// add Content-Encoding to request header
requestInit.headers = new Headers(requestInit.headers);
requestInit.headers.set(CompressionHandler.CONTENT_ENCODING_HEADER, "gzip");
requestInit.body = compressedBody.body;

span?.setAttribute("http.request.body.size", compressedBody.size);

// execute the next middleware and check if the response code is 415

const response = await this.next?.execute(url, requestInit, requestOptions);
if (response?.status === 415) {
// remove the Content-Encoding header
requestInit.headers.delete(CompressionHandler.CONTENT_ENCODING_HEADER);
requestInit.body = unCompressedBody.buffer;
span?.setAttribute("http.request.body.compressed", false);
span?.setAttribute("http.request.body.size", unCompressedBodySize);

return this.next?.execute(url, requestInit, requestOptions) ?? Promise.reject(new Error("Response is undefined"));
}
return response != null ? Promise.resolve(response) : Promise.reject(new Error("Response is undefined"));
}

private contentRangeBytesIsPresent(header: HeadersInit | undefined): boolean {
if (!header) {
return false;
}
if (header instanceof Headers) {
const contentRange = header.get(CompressionHandler.CONTENT_RANGE_HEADER);
if (contentRange) {
return contentRange.toLowerCase().includes("bytes");
}
} else if (typeof header === "object") {
const contentRange = (header as Record<string, string>)[CompressionHandler.CONTENT_RANGE_HEADER];
if (contentRange) {
return contentRange.toLowerCase().includes("bytes");
}
}
return false;
}

private contentEncodingIsPresent(header: HeadersInit | undefined): boolean {
if (!header) {
return false;
}
if (header instanceof Headers) {
return header.has(CompressionHandler.CONTENT_ENCODING_HEADER);
} else if (typeof header === "object") {
return CompressionHandler.CONTENT_ENCODING_HEADER in header;
}
return false;
}

private async readBodyAsBytes(body: BodyInit | null | undefined): Promise<Uint8Array> {
if (!body) {
return new Uint8Array();
}
if (typeof body === "string") {
return new TextEncoder().encode(body);
}
if (body instanceof Blob) {
return new Uint8Array(await body.arrayBuffer());
}
if (body instanceof ArrayBuffer) {
return new Uint8Array(body);
}
if (ArrayBuffer.isView(body)) {
return new Uint8Array(body.buffer);
}
throw new Error("Unsupported body type");
}

private compressReqBody(reqBody: Uint8Array): Promise<{ body: ArrayBuffer; size: number }> {
return new Promise((resolve, reject) => {
const buffer = new ArrayBuffer(reqBody.length);
const gzipWriter = new CompressionStream("gzip");
const writer = gzipWriter.writable.getWriter();

writer
.write(reqBody)
.then(() => {
writer
.close()
.then(() => {
const compressedStream = gzipWriter.readable;
const reader = compressedStream.getReader();
let size = 0;

reader
.read()
.then(function process({ done, value }) {
if (done) {
resolve({ body: buffer, size });
return;
}
size += value.length;
reader.read().then(process);
})
.catch(reject);
})
.catch(reject);
})
.catch(reject);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* -------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All Rights Reserved. Licensed under the MIT License.
* See License in the project root for license information.
* -------------------------------------------------------------------------------------------
*/

/**
* @module RetryHandlerOptions
*/

import type { RequestOption } from "@microsoft/kiota-abstractions";

/**
* Key for the compression replace handler options.
*/
export const CompressionHandlerOptionsKey = "CompressionHandlerOptionsKey";

/**
* Options for the compression handler.
*/
export class CompressionHandlerOptions implements RequestOption {
private readonly _enableCompression: boolean;
/**
* Create a new instance of the CompressionHandlerOptions class
* @param config the configuration to apply to the compression handler options.
*/
public constructor(config?: Partial<CompressionHandlerOptionsParams>) {
this._enableCompression = config?.enableCompression ?? true;
}
/**
* @inheritdoc
*/
public getKey(): string {
return CompressionHandlerOptionsKey;
}
/**
* Returns whether the compression handler is enabled or not.
* @returns whether the compression handler is enabled or not.
*/
public get ShouldCompress(): boolean {
return this._enableCompression;
}
}

/**
* Parameters for the CompressionHandlerOptionsParams class constructor
*/
export interface CompressionHandlerOptionsParams {
enableCompression: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { DummyFetchHandler } from "./dummyFetchHandler";
import { CompressionHandlerOptions } from "../../../src/middlewares/options/compressionHandlerOptions";
import { CompressionHandler } from "../../../src/middlewares/compressionHandler";

const defaultOptions = new CompressionHandlerOptions();

import { assert, describe, it, expect, beforeEach, vi } from "vitest";

describe("CompressionHandler", () => {
let compressionHandler: CompressionHandler;
let nextMiddleware: DummyFetchHandler;

beforeEach(() => {
nextMiddleware = new DummyFetchHandler();
compressionHandler = new CompressionHandler();
compressionHandler.next = nextMiddleware;
});

describe("constructor", () => {
it("Should create an instance with given options", () => {
const handler = new CompressionHandler(defaultOptions);
assert.isDefined(handler["handlerOptions"]);
});

it("Should create an instance with default set of options", () => {
const handler = new CompressionHandler();
assert.isDefined(handler["handlerOptions"]);
});
});

it("should throw an error if handlerOptions is undefined", () => {
expect(() => new CompressionHandler(null as any)).toThrow("handlerOptions cannot be undefined");
});

it("should not compress if ShouldCompress is false", async () => {
const options = new CompressionHandlerOptions({ enableCompression: false });
compressionHandler = new CompressionHandler(options);

compressionHandler.next = nextMiddleware;
vi.spyOn(nextMiddleware, "execute");
nextMiddleware.setResponses([new Response("ok", { status: 200 })]);

const requestInit = { headers: new Headers(), body: "test" };
const response = await compressionHandler.execute("http://example.com", requestInit);

expect(requestInit.headers.has("Content-Encoding")).toBe(false);
expect(nextMiddleware.execute).toHaveBeenCalled();
expect(response).toBeInstanceOf(Response);
});

it("should compress the request body if ShouldCompress is true", async () => {
const options = new CompressionHandlerOptions({ enableCompression: true });
compressionHandler = new CompressionHandler(options);

compressionHandler.next = nextMiddleware;
nextMiddleware.setResponses([new Response("ok", { status: 200 })]);

const requestInit = { headers: new Headers(), body: "test" };
await compressionHandler.execute("http://example.com", requestInit);

expect(requestInit.headers.get("Content-Encoding")).toBe("gzip");
});

it("should handle 415 response and retry without compression", async () => {
const options = new CompressionHandlerOptions({ enableCompression: true });
compressionHandler = new CompressionHandler(options);

compressionHandler.next = nextMiddleware;
nextMiddleware.setResponses([new Response("nope", { status: 415 }), new Response("ok", { status: 200 })]);

const requestInit = { headers: new Headers(), body: "test" };
const response = await compressionHandler.execute("http://example.com", requestInit);

expect(requestInit.headers.has("Content-Encoding")).toBe(false);
expect(response).toBeInstanceOf(Response);
});
});
Loading