Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: request body compression #1358

Merged
merged 18 commits into from
Oct 22, 2024
38 changes: 38 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug Current Test File",
"autoAttachChildProcesses": true,
"skipFiles": ["<node_internals>/**", "**/node_modules/**"],
"program": "${workspaceRoot}/node_modules/vitest/vitest.mjs",
"args": ["run", "${relativeFile}"],
"smartStep": true,
"console": "integratedTerminal"
},
{
"type": "node",
"request": "launch",
"name": "Run Vitest Browser",
"program": "${workspaceRoot}/node_modules/vitest/vitest.mjs",
"console": "integratedTerminal",
"args": ["--inspect-brk", "--browser", "--no-file-parallelism"]
},
{
"type": "chrome",
"request": "attach",
"name": "Attach to Vitest Browser",
"port": 9229
}
],
"compounds": [
{
"name": "Debug Vitest Browser",
"configurations": ["Attach to Vitest Browser", "Run Vitest Browser"],
"stopAll": true
}
]
}
11 changes: 6 additions & 5 deletions packages/abstractions/src/utils/enumUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
* See License in the project root for license information.
* -------------------------------------------------------------------------------------------
*/
function reverseRecord(input: Record<PropertyKey, PropertyKey>): Record<PropertyKey, PropertyKey> {
const reverseRecord = (input: Record<PropertyKey, PropertyKey>): Record<PropertyKey, PropertyKey> => {
const entries = Object.entries(input).map(([key, value]) => [value, key]);
return Object.fromEntries(entries) as Record<PropertyKey, PropertyKey>;
}
};

/**
* Factory to create an UntypedString from a string.
* @param stringValue The string value to lookup the enum value from.
* @param originalType The type definition of the enum.
* @return The enu value.
* @typeparam T The type of the enum.
* @return The enum value.
*/
export function getEnumValueFromStringValue<T>(stringValue: string, originalType: Record<PropertyKey, PropertyKey>): T | undefined {
export const getEnumValueFromStringValue = <T>(stringValue: string, originalType: Record<PropertyKey, PropertyKey>): T | undefined => {
const reversed: Record<PropertyKey, PropertyKey> = reverseRecord(originalType);
return originalType[reversed[stringValue]] as T;
}
};
9 changes: 6 additions & 3 deletions packages/http/fetch/src/fetchRequestAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ export class FetchRequestAdapter implements RequestAdapter {
if (response) {
const responseContentLength = response.headers.get("Content-Length");
if (responseContentLength) {
spanForAttributes.setAttribute("http.response.body.size", parseInt(responseContentLength));
spanForAttributes.setAttribute("http.response.body.size", parseInt(responseContentLength, 10));
}
const responseContentType = response.headers.get("Content-Type");
if (responseContentType) {
Expand Down Expand Up @@ -527,13 +527,16 @@ export class FetchRequestAdapter implements RequestAdapter {
}
const requestContentLength = requestInfo.headers.tryGetValue("Content-Length");
if (requestContentLength) {
spanForAttributes.setAttribute("http.response.body.size", parseInt(requestContentLength[0]));
spanForAttributes.setAttribute("http.response.body.size", parseInt(requestContentLength[0], 10));
}
const requestContentType = requestInfo.headers.tryGetValue("Content-Type");
if (requestContentType) {
spanForAttributes.setAttribute("http.request.header.content-type", requestContentType);
}
const headers: [string, string][] | undefined = requestInfo.headers ? Array.from(requestInfo.headers.keys()).map((key) => [key.toString().toLocaleLowerCase(), this.foldHeaderValue(requestInfo.headers.tryGetValue(key))]) : undefined;
const headers: Record<string, string> | undefined = {};
requestInfo.headers?.forEach((_, key) => {
headers[key.toString().toLocaleLowerCase()] = this.foldHeaderValue(requestInfo.headers.tryGetValue(key));
});
const request = {
method,
headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Middleware } from "../middleware";
import { ParametersNameDecodingHandler } from "../parametersNameDecodingHandler";
import { RetryHandler } from "../retryHandler";
import { UserAgentHandler } from "../userAgentHandler";
import { CompressionHandler } from "../compressionHandler";

/**
* @class
Expand All @@ -25,6 +26,6 @@ export class MiddlewareFactory {
*/
public static getDefaultMiddlewares(customFetch: (request: string, init: RequestInit) => Promise<Response> = (...args) => fetch(...args) as any): Middleware[] {
// Browsers handles redirection automatically and do not require the redirectionHandler
return [new RetryHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
return [new RetryHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new CompressionHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
}
}
239 changes: 239 additions & 0 deletions packages/http/fetch/src/middlewares/compressionHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/**
* -------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All Rights Reserved. Licensed under the MIT License.
* See License in the project root for license information.
* -------------------------------------------------------------------------------------------
*/

import { type RequestOption, inNodeEnv } from "@microsoft/kiota-abstractions";
import { Span, trace } from "@opentelemetry/api";

import { getObservabilityOptionsFromRequest } from "../observabilityOptions";
import type { Middleware } from "./middleware";
import { CompressionHandlerOptions, CompressionHandlerOptionsKey } from "./options/compressionHandlerOptions";
import type { FetchHeadersInit, FetchRequestInit } from "../utils/fetchDefinitions";
import { deleteRequestHeader, getRequestHeader, setRequestHeader } from "../utils/headersUtil";

/**
* Compress the url content.
*/
export class CompressionHandler implements Middleware {
next: Middleware | undefined;

/**
* @private
* @static
* A member holding the name of content range header
*/
private static readonly CONTENT_RANGE_HEADER = "Content-Range";

/**
* @private
* @static
* A member holding the name of content encoding header
*/
private static readonly CONTENT_ENCODING_HEADER = "Content-Encoding";

/**
* @public
* @constructor
* Creates a new instance of the CompressionHandler class
* @param {CompressionHandlerOptions} handlerOptions The options for the compression handler.
* @returns An instance of the CompressionHandler class
*/
public constructor(private readonly handlerOptions: CompressionHandlerOptions = new CompressionHandlerOptions()) {
if (!handlerOptions) {
throw new Error("handlerOptions cannot be undefined");
}
}

/**
* @inheritdoc
*/
public execute(url: string, requestInit: RequestInit, requestOptions?: Record<string, RequestOption> | undefined): Promise<Response> {
let currentOptions = this.handlerOptions;
if (requestOptions?.[CompressionHandlerOptionsKey]) {
currentOptions = requestOptions[CompressionHandlerOptionsKey] as CompressionHandlerOptions;
}
const obsOptions = getObservabilityOptionsFromRequest(requestOptions);
if (obsOptions) {
return trace.getTracer(obsOptions.getTracerInstrumentationName()).startActiveSpan("compressionHandler - execute", (span) => {
try {
span.setAttribute("com.microsoft.kiota.handler.compression.enable", currentOptions.ShouldCompress);
return this.executeInternal(currentOptions, url, requestInit as FetchRequestInit, requestOptions, span);
} finally {
span.end();
}
});
}
return this.executeInternal(currentOptions, url, requestInit as FetchRequestInit, requestOptions);
}

private async executeInternal(options: CompressionHandlerOptions, url: string, requestInit: FetchRequestInit, requestOptions?: Record<string, RequestOption> | undefined, span?: Span): Promise<Response> {
if (!options.ShouldCompress || this.contentRangeBytesIsPresent(requestInit.headers) || this.contentEncodingIsPresent(requestInit.headers) || requestInit.body === null || requestInit.body === undefined) {
return this.next?.execute(url, requestInit as RequestInit, requestOptions) ?? Promise.reject(new Error("Response is undefined"));
}

span?.setAttribute("http.request.body.compressed", true);

const unCompressedBody = requestInit.body;
const unCompressedBodySize = this.getRequestBodySize(unCompressedBody);

// compress the request body
const compressedBody = await this.compressRequestBody(unCompressedBody);

// add Content-Encoding to request header
setRequestHeader(requestInit, CompressionHandler.CONTENT_ENCODING_HEADER, "gzip");
requestInit.body = compressedBody.compressedBody;

span?.setAttribute("http.request.body.size", compressedBody.size);

// execute the next middleware and check if the response code is 415
let response = await this.next?.execute(url, requestInit as RequestInit, requestOptions);
if (!response) {
throw new Error("Response is undefined");
}
if (response.status === 415) {
// remove the Content-Encoding header
deleteRequestHeader(requestInit, CompressionHandler.CONTENT_ENCODING_HEADER);
requestInit.body = unCompressedBody;
span?.setAttribute("http.request.body.compressed", false);
span?.setAttribute("http.request.body.size", unCompressedBodySize);

response = await this.next?.execute(url, requestInit as RequestInit, requestOptions);
}
return response !== undefined && response !== null ? Promise.resolve(response) : Promise.reject(new Error("Response is undefined"));
}

private contentRangeBytesIsPresent(header: FetchHeadersInit | undefined): boolean {
if (!header) {
return false;
}
const contentRange = getRequestHeader(header, CompressionHandler.CONTENT_RANGE_HEADER);
return contentRange?.toLowerCase().includes("bytes") ?? false;
}

private contentEncodingIsPresent(header: FetchHeadersInit | undefined): boolean {
if (!header) {
return false;
}
return getRequestHeader(header, CompressionHandler.CONTENT_ENCODING_HEADER) !== undefined;
}

private getRequestBodySize(body: unknown): number {
if (!body) {
return 0;
}
if (typeof body === "string") {
return body.length;
}
if (body instanceof Blob) {
return body.size;
}
if (body instanceof ArrayBuffer) {
return body.byteLength;
}
if (ArrayBuffer.isView(body)) {
return body.byteLength;
}
if (inNodeEnv() && Buffer.isBuffer(body)) {
return body.byteLength;
}
throw new Error("Unsupported body type");
}

private readBodyAsBytes(body: unknown): { stream: ReadableStream<Uint8Array>; size: number } {
if (!body) {
return { stream: new ReadableStream<Uint8Array>(), size: 0 };
}

const uint8ArrayToStream = (uint8Array: Uint8Array): ReadableStream<Uint8Array> => {
return new ReadableStream({
start(controller) {
controller.enqueue(uint8Array);
controller.close();
},
});
};

if (typeof body === "string") {
return { stream: uint8ArrayToStream(new TextEncoder().encode(body)), size: body.length };
}
if (body instanceof Blob) {
return { stream: body.stream(), size: body.size };
}
if (body instanceof ArrayBuffer) {
return { stream: uint8ArrayToStream(new Uint8Array(body)), size: body.byteLength };
}
if (ArrayBuffer.isView(body)) {
return { stream: uint8ArrayToStream(new Uint8Array(body.buffer, body.byteOffset, body.byteLength)), size: body.byteLength };
}
throw new Error("Unsupported body type");
}

private async compressRequestBody(body: unknown): Promise<{
compressedBody: ArrayBuffer | Buffer;
size: number;
}> {
if (!inNodeEnv()) {
// in browser
const compressionData = this.readBodyAsBytes(body);
const compressedBody = await this.compressUsingCompressionStream(compressionData.stream);
return {
compressedBody: compressedBody.body,
size: compressedBody.size,
};
} else {
// In Node.js
const compressedBody = await this.compressUsingZlib(body);
return {
compressedBody,
size: compressedBody.length,
};
}
}

private async compressUsingZlib(body: unknown): Promise<Buffer> {
// @ts-ignore
const zlib = await import("zlib");
return new Promise((resolve, reject) => {
zlib.gzip(body as string | ArrayBuffer | NodeJS.ArrayBufferView, (err, compressed) => {
if (err) {
reject(err);
} else {
resolve(compressed);
}
});
});
}

private async compressUsingCompressionStream(uint8ArrayStream: ReadableStream<Uint8Array>): Promise<{ body: ArrayBuffer; size: number }> {
const compressionStream = new CompressionStream("gzip");

const compressedStream = uint8ArrayStream.pipeThrough<Uint8Array>(compressionStream);

const reader = compressedStream.getReader();
const compressedChunks: Uint8Array[] = [];
let totalLength = 0;

let result = await reader.read();
while (!result.done) {
const chunk = result.value;
compressedChunks.push(chunk);
totalLength += chunk.length;
result = await reader.read();
}

const compressedArray = new Uint8Array(totalLength);
let offset = 0;
for (const chunk of compressedChunks) {
compressedArray.set(chunk, offset);
offset += chunk.length;
}

return {
body: compressedArray.buffer,
size: compressedArray.length,
};
}
}
8 changes: 2 additions & 6 deletions packages/http/fetch/src/middlewares/customFetchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ export class CustomFetchHandler implements Middleware {
*/
next: Middleware | undefined;

constructor(private customFetch: (input: string, init: RequestInit) => Promise<Response>) {}
constructor(private readonly customFetch: (input: string, init: RequestInit) => Promise<Response>) {}

/**
* @public
* @async
* To execute the current middleware
* @param {Context} context - The request context object
* @returns A promise that resolves to nothing
* @inheritdoc
*/
public async execute(url: string, requestInit: RequestInit): Promise<Response> {
return await this.customFetch(url, requestInit);
Expand Down
2 changes: 2 additions & 0 deletions packages/http/fetch/src/middlewares/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export interface Middleware {
next: Middleware | undefined;

/**
* @public
* @async
* Main method of the middleware.
* @param requestInit The Fetch RequestInit object.
* @param url The URL of the request.
Expand Down
3 changes: 2 additions & 1 deletion packages/http/fetch/src/middlewares/middlewareFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { ParametersNameDecodingHandler } from "./parametersNameDecodingHandler";
import { RedirectHandler } from "./redirectHandler";
import { RetryHandler } from "./retryHandler";
import { UserAgentHandler } from "./userAgentHandler";
import { CompressionHandler } from "./compressionHandler";

/**
* @class
Expand All @@ -25,6 +26,6 @@ export class MiddlewareFactory {
* @returns an array of the middleware handlers of the default middleware chain
*/
public static getDefaultMiddlewares(customFetch: (request: string, init: RequestInit) => Promise<Response> = (...args) => fetch(...args) as any): Middleware[] {
return [new RetryHandler(), new RedirectHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
return [new RetryHandler(), new RedirectHandler(), new ParametersNameDecodingHandler(), new UserAgentHandler(), new CompressionHandler(), new HeadersInspectionHandler(), new CustomFetchHandler(customFetch)];
}
}
Loading
Loading