diff --git a/plugins/out_azure_kusto/CMakeLists.txt b/plugins/out_azure_kusto/CMakeLists.txt index 6803bee09c2..14e0e8a6403 100644 --- a/plugins/out_azure_kusto/CMakeLists.txt +++ b/plugins/out_azure_kusto/CMakeLists.txt @@ -1,7 +1,8 @@ set(src - azure_kusto.c - azure_kusto_conf.c - azure_kusto_ingest.c - ) + azure_kusto.c + azure_kusto_conf.c + azure_kusto_ingest.c + azure_kusto_store.c +) -FLB_PLUGIN(out_azure_kusto "${src}" "") +FLB_PLUGIN(out_azure_kusto "${src}" "") \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 4b79d80c4bb..c8b87117c21 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -18,61 +18,266 @@ */ #include -#include #include #include #include #include +#include #include +#include +#include +#include +#include +#include #include #include "azure_kusto.h" #include "azure_kusto_conf.h" #include "azure_kusto_ingest.h" +#include "azure_kusto_store.h" -/* Create a new oauth2 context and get a oauth2 token */ -static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) + +/** + * flb_azure_imds_get_token - Retrieve an Azure IMDS token for authentication. + * + * This function constructs an HTTP GET request to the Azure Instance Metadata Service (IMDS) + * to obtain an authentication token. The token is used for accessing Azure services. + * + * Parameters: + * - ctx: A pointer to the flb_azure_kusto context, which contains configuration and state + * information necessary for the operation. + * - client_id: A string representing the client ID for which the token is requested. + * + * Returns: + * - A dynamically allocated string (flb_sds_t) containing the token if the request is successful. + * - NULL if the request fails or if any error occurs during the process. + * + * The function performs the following steps: + * 1. Constructs the request URL using the Azure IMDS endpoint, API version, and resource. + * 2. Establishes a connection to the IMDS service using the upstream connection from the context. + * 3. Sends an HTTP GET request with the necessary headers, including Metadata and client_id. + * 4. Checks the HTTP response status and retrieves the token from the response payload if successful. + * 5. Cleans up resources, including the HTTP client, connection, and URL string, before returning. + * + * Note: + * - The caller is responsible for freeing the returned token string. + * - The function logs debug information about the HTTP request and response. + */ +flb_sds_t flb_azure_imds_get_token(struct flb_azure_kusto *ctx, flb_sds_t client_id) { int ret; - char *token; - - /* Clear any previous oauth2 payload content */ - flb_oauth2_payload_clear(ctx->o); + flb_sds_t url; + flb_sds_t response = NULL; + struct flb_http_client *client; + struct flb_connection *u_conn; + size_t b_sent; - ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; + url = flb_sds_create_size(256); + if (!url) { + flb_errno(); + return NULL; } - ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; - } + flb_sds_printf(&url, "%s?api-version=%s&resource=%s", + FLB_AZURE_IMDS_ENDPOINT, + FLB_AZURE_IMDS_API_VERSION, + FLB_AZURE_IMDS_RESOURCE); - ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; + u_conn = flb_upstream_conn_get(ctx->imds_upstream); + if (!u_conn) { + flb_sds_destroy(url); + return NULL; } - ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; + flb_plg_debug(ctx->ins, "[flb_azure_imds_get_token] client_id : %s", client_id); + + client = flb_http_client(u_conn, FLB_HTTP_GET, url, NULL, 0, "169.254.169.254", 80, NULL, 0); + flb_http_add_header(client, "Metadata", 8, "true", 4); + flb_http_add_header(client, "client_id", 8, client_id, 36); + + ret = flb_http_do(client, &b_sent); + if (ret != 0 || client->resp.status != 200) { + flb_plg_debug(ctx->ins, "[flb_azure_imds_get_token] HTTP response status %d & payload: %.*s",client->resp.status, (int)client->resp.payload_size, client->resp.payload); + flb_http_client_destroy(client); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(url); + return NULL; } - /* Retrieve access token */ - token = flb_oauth2_token_get(ctx->o); - if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); - return -1; + flb_plg_debug(ctx->ins, "[flb_azure_imds_get_token] HTTP response status: %d & payload: %.*s", client->resp.status, (int)client->resp.payload_size, client->resp.payload); + + response = flb_sds_create_len(client->resp.payload, client->resp.payload_size); + flb_http_client_destroy(client); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(url); + + return response; +} + + +/** + * azure_kusto_get_oauth2_token - Retrieve an OAuth2 token for Azure Kusto + * + * This function is responsible for obtaining an OAuth2 access token required + * for authenticating requests to Azure Kusto. It supports two methods of + * token retrieval: using the Azure Instance Metadata Service (IMDS) or + * through a client secret flow. + * + * Parameters: + * - ctx: A pointer to the flb_azure_kusto context structure, which contains + * configuration and state information necessary for token retrieval. + * + * Returns: + * - 0 on success, indicating that the token was successfully retrieved and + * stored in the context. + * - -1 on failure, indicating an error occurred during the token retrieval + * process. + * + * The function performs the following steps: + * 1. Logs the start of the token retrieval process. + * 2. Checks if IMDS-based token retrieval is enabled: + * - If enabled, it attempts to retrieve a token from the Azure IMDS. + * - Parses the JSON response to extract the access token. + * - Logs and handles errors if token extraction fails. + * 3. If IMDS is not used, it falls back to the client secret flow: + * - Clears any existing OAuth2 payload data. + * - Appends necessary parameters to the OAuth2 payload, including + * grant type, scope, client ID, and client secret. + * - Requests an OAuth2 token using the constructed payload. + * - Logs and handles errors if token retrieval fails. + * 4. Logs the successful completion of the token retrieval process. + * + * Note: + * - The function assumes that the context (ctx) is properly initialized + * and contains valid configuration data. + * - The extracted access token is stored in the context's OAuth2 structure + * for subsequent use in authenticated requests. + */ + +static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) +{ + char *token = NULL; + int ret; + flb_sds_t response = NULL; + char *access_token = NULL; + + flb_plg_debug(ctx->ins, "Starting token retrieval process"); + + /* Check if IMDS-based token retrieval is needed */ + if (ctx->use_imds == FLB_TRUE) { + flb_plg_debug(ctx->ins, "Using IMDS for token retrieval"); + + response = flb_azure_imds_get_token(ctx, ctx->client_id); + if (!response) { + flb_plg_error(ctx->ins, "Failed to retrieve token from Azure IMDS"); + return -1; + } + + // Parse the JSON response to extract the access token + size_t access_token_len = 0; + char *p = strstr(response, "\"access_token\":\""); + if (p) { + p += strlen("\"access_token\":\""); + char *end = strstr(p, "\""); + if (end) { + access_token_len = end - p; + access_token = flb_malloc(access_token_len + 1); + if (access_token) { + strncpy(access_token, p, access_token_len); + access_token[access_token_len] = '\0'; + } + } + } + + flb_plg_debug(ctx->ins, "Access token extracted is %s", access_token); + + if (!access_token) { + flb_plg_error(ctx->ins, "Error extracting access token from IMDS response"); + flb_sds_destroy(response); + return -1; + } + + ctx->o->access_token = flb_sds_create(access_token); + ctx->o->token_type = flb_sds_create("Bearer"); + flb_free(access_token); + + flb_sds_destroy(response); + + } else { + /* Use client secret flow */ + flb_plg_debug(ctx->ins, "Using client secret flow for token retrieval"); + + flb_oauth2_payload_clear(ctx->o); + + ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18); + if (ret == -1) { + flb_plg_error(ctx->ins, "Error appending oauth2 params: grant_type"); + return -1; + } + + ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39); + if (ret == -1) { + flb_plg_error(ctx->ins, "Error appending oauth2 params: scope"); + return -1; + } + + ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1); + if (ret == -1) { + flb_plg_error(ctx->ins, "Error appending oauth2 params: client_id"); + return -1; + } + + ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1); + if (ret == -1) { + flb_plg_error(ctx->ins, "Error appending oauth2 params: client_secret"); + return -1; + } + + flb_plg_debug(ctx->ins, "Requesting oauth2 token"); + token = flb_oauth2_token_get(ctx->o); + if (!token) { + flb_plg_error(ctx->ins, "Error retrieving oauth2 access token"); + return -1; + } } + flb_plg_debug(ctx->ins, "OAuth2 token retrieval process completed successfully"); return 0; } +/** + * get_azure_kusto_token - Retrieve the OAuth2 token for Azure Kusto + * + * This function is responsible for obtaining a valid OAuth2 token for + * authenticating requests to Azure Kusto. It ensures thread safety by + * using a mutex to lock the token retrieval process, preventing race + * conditions when accessing or modifying the token. + * + * The function first attempts to lock the mutex associated with the + * token context. If the lock is successful, it checks if the current + * token has expired. If the token is expired, it calls + * `azure_kusto_get_oauth2_token` to refresh the token. + * + * After ensuring the token is valid, the function logs the token type + * and access token for debugging purposes. It then creates a new + * string buffer to store the token type and access token, ensuring + * that the original token strings are not modified or freed during + * this process. + * + * Finally, the function unlocks the mutex and returns the newly + * created token string. If any errors occur during the process, + * appropriate error messages are logged, and the function returns + * NULL. + * + * Parameters: + * ctx - A pointer to the `flb_azure_kusto` context structure, which + * contains the token information and mutex. + * + * Returns: + * A newly allocated `flb_sds_t` string containing the token type and + * access token, or NULL if an error occurs. + */ + flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) { int ret = 0; @@ -87,6 +292,9 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) ret = azure_kusto_get_oauth2_token(ctx); } + flb_plg_debug(ctx->ins, "oauth token type is %s", ctx->o->token_type); + flb_plg_debug(ctx->ins, "oauth token is %s", ctx->o->access_token); + /* Copy string to prevent race conditions (get_oauth2 can free the string) */ if (ret == 0) { output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) + @@ -132,16 +340,20 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); - flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); + flb_plg_debug(ctx->ins, "load_time: %llu", ctx->resources->load_time); - ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; + ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; + if (ctx->buffering_enabled == FLB_TRUE){ + ctx->u->base.flags &= ~(FLB_IO_ASYNC); + } + flb_plg_debug(ctx->ins, "execute_ingest_csl_command -- async flag is %d", flb_stream_is_async(&ctx->u->base)); /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (u_conn) { token = get_azure_kusto_token(ctx); - flb_plg_debug(ctx->ins, "after get azure kusto token"); + flb_plg_debug(ctx->ins, "after get azure kusto token"); if (token) { /* Compose request body */ @@ -171,26 +383,25 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs /* Send HTTP request */ ret = flb_http_do(c, &b_sent); flb_plg_debug( - ctx->ins, - "Kusto ingestion command request http_do=%i, HTTP Status: %i", - ret, c->resp.status); + ctx->ins, + "Kusto ingestion command request http_do=%i, HTTP Status: %i", + ret, c->resp.status); + flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP request payload: %.*s", (int)c->resp.payload_size, c->resp.payload); if (ret == 0) { if (c->resp.status == 200) { - /* Copy payload response to the response param */ - resp = - flb_sds_create_len(c->resp.payload, c->resp.payload_size); - } - else if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, "Request failed and returned: \n%s", - c->resp.payload); + // Copy payload response to the response param + resp = flb_sds_create_len(c->resp.payload, c->resp.payload_size); } else { - flb_plg_debug(ctx->ins, "Request failed"); + flb_plg_error(ctx->ins, "Kusto Ingestion Resources Request failed with HTTP Status: %i", c->resp.status); + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "Kusto Ingestion Resources Response payload: \n%s", c->resp.payload); + } } } else { - flb_plg_error(ctx->ins, "cannot send HTTP request"); + flb_plg_error(ctx->ins, "Kusto Ingestion Resources :: cannot send HTTP request"); } flb_http_client_destroy(c); @@ -220,12 +431,600 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs return resp; } +/** + * construct_request_buffer - Constructs a request buffer for Azure Kusto ingestion. + * + * This function is responsible for preparing a data buffer that will be used + * to send data to Azure Kusto. It handles both new incoming data and data + * that has been previously buffered in a file. The function performs the + * following tasks: + * + * 1. Validates Input: Checks if both `new_data` and `upload_file` are NULL, + * which would indicate an error since there is no data to process. + * + * 2. Reads Buffered Data: If an `upload_file` is provided, it reads the + * locally buffered data from the file and locks the file to prevent + * concurrent modifications. + * + * 3. Appends New Data: If `new_data` is provided, it appends this data to + * the buffered data, reallocating memory as necessary to accommodate the + * combined data size. + * + * 4. Outputs the Result: Sets the output parameters `out_buf` and `out_size` + * to point to the constructed buffer and its size, respectively. + * + * The function ensures that the buffer is correctly terminated if compression + * is not enabled, and it handles memory allocation and error checking + * throughout the process. + * + * Parameters: + * @ctx: The context containing configuration and state information. + * @new_data: The new data to be appended to the buffer, if any. + * @upload_file: The file containing previously buffered data, if any. + * @out_buf: Pointer to the output buffer that will be constructed. + * @out_size: Pointer to the size of the constructed buffer. + * + * Returns: + * 0 on success, or -1 on failure with an appropriate error message logged. + */ +static int construct_request_buffer(struct flb_azure_kusto *ctx, flb_sds_t new_data, + struct azure_kusto_file *upload_file, + char **out_buf, size_t *out_size) +{ + char *body; + char *tmp; + size_t body_size = 0; + char *buffered_data = NULL; + size_t buffer_size = 0; + int ret; + + if (new_data == NULL && upload_file == NULL) { + flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong" + " both chunk and new_data are NULL"); + return -1; + } + + if (upload_file) { + ret = azure_kusto_store_file_upload_read(ctx, upload_file->fsf, &buffered_data, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not read locally buffered data %s", + upload_file->fsf->name); + return -1; + } + + /* + * lock the upload_file from buffer list + */ + azure_kusto_store_file_lock(upload_file); + body = buffered_data; + body_size = buffer_size; + } + + flb_plg_debug(ctx->ins, "[construct_request_buffer] size of buffer file read %zu", buffer_size); + + /* + * If new data is arriving, increase the original 'buffered_data' size + * to append the new one. + */ + if (new_data) { + body_size += flb_sds_len(new_data); + flb_plg_debug(ctx->ins, "[construct_request_buffer] size of new_data %zu", body_size); + + tmp = flb_realloc(buffered_data, body_size + 1); + if (!tmp) { + flb_errno(); + flb_free(buffered_data); + if (upload_file) { + azure_kusto_store_file_unlock(upload_file); + } + return -1; + } + body = buffered_data = tmp; + memcpy(body + buffer_size, new_data, flb_sds_len(new_data)); + if (ctx->compression_enabled == FLB_FALSE){ + body[body_size] = '\0'; + } + } + + flb_plg_debug(ctx->ins, "[construct_request_buffer] final increased %zu", body_size); + + *out_buf = body; + *out_size = body_size; + + return 0; +} + +/** + * Ingest all data chunks from the file storage streams into Azure Kusto. + * + * This function iterates over all file storage streams associated with the + * given Azure Kusto context. For each + * file in the stream, it checks if the file (chunk) is locked or has exceeded + * the maximum number of retry attempts. If the chunk is eligible for processing, + * it constructs a request buffer from the chunk data, optionally compresses + * the payload, and attempts to ingest it into Azure Kusto. + * + * The function performs the following steps: + * 1. Iterate over each file storage stream in the context. + * 2. For each file in the stream, check if it is locked or has exceeded + * the maximum retry attempts. If so, skip processing. + * 3. Construct a request buffer from the chunk data. + * 4. Create a payload from the buffer and optionally compress it if + * compression is enabled. + * 5. Load the necessary ingestion resources for Azure Kusto. + * 6. Attempt to ingest the payload into Azure Kusto using queued ingestion. + * 7. If ingestion is successful, clean up the local buffer file. + * 8. Handle errors by unlocking the chunk, incrementing failure counts, + * and logging appropriate error messages. + * + * @param ctx Pointer to the Azure Kusto context containing configuration + * and state information. + * @param config Pointer to the Fluent Bit configuration structure. + * + * @return 0 on success, or -1 on failure. + */ +static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *config) +{ + struct azure_kusto_file *chunk; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_file *fsf; + struct flb_fstore_stream *fs_stream; + flb_sds_t payload = NULL; + void *final_payload = NULL; + size_t final_payload_size = 0; + char *buffer = NULL; + size_t buffer_size; + int ret; + int is_compressed = FLB_FALSE; + flb_sds_t tag_sds; + + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + chunk = fsf->data; + + /* Locked chunks are being processed, skip */ + if (chunk->locked == FLB_TRUE) { + continue; + } + + if (chunk->failures >= ctx->scheduler_max_retries) { + flb_plg_warn(ctx->ins, + "ingest_all_old_buffer_files :: Chunk for tag %s failed to send %i times, " + "will not retry", + (char *) fsf->meta_buf, ctx->scheduler_max_retries); + if (ctx->delete_on_max_upload_error){ + azure_kusto_store_file_delete(ctx, chunk); + }else{ + azure_kusto_store_file_inactive(ctx, chunk); + } + continue; + } + + ret = construct_request_buffer(ctx, NULL, chunk, + &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "ingest_all_old_buffer_files :: Could not construct request buffer for %s", + chunk->file_path); + return -1; + } + + payload = flb_sds_create_len(buffer, buffer_size); + tag_sds = flb_sds_create(fsf->meta_buf); + flb_free(buffer); + + /* Compress the JSON payload */ + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "ingest_all_old_buffer_files :: cannot gzip payload"); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + return -1; + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "ingest_all_old_buffer_files :: enabled payload gzip compression"); + } + } else { + final_payload = payload; + final_payload_size = flb_sds_len(payload); + } + + ret = azure_kusto_load_ingestion_resources(ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: cannot load ingestion resources"); + return -1; + } + + // Call azure_kusto_queued_ingestion to ingest the payload + ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, chunk); + if (ret != 0) { + flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Failed to ingest data to Azure Kusto"); + if (chunk){ + azure_kusto_store_file_unlock(chunk); + chunk->failures += 1; + } + flb_sds_destroy(tag_sds); + flb_sds_destroy(payload); + if (is_compressed) { + flb_free(final_payload); + } + return -1; + } + + flb_sds_destroy(tag_sds); + flb_sds_destroy(payload); + if (is_compressed) { + flb_free(final_payload); + } + + /* data was sent successfully- delete the local buffer */ + azure_kusto_store_file_cleanup(ctx, chunk); + } + } + + return 0; +} + +/** + * cb_azure_kusto_ingest - Callback function for ingesting data to Azure Kusto. + * + * Parameters: + * @config: Pointer to the Fluent Bit configuration context. + * @data: Pointer to the Kusto plugin context, which contains configuration and + * state information for the ingestion process. + * + * The function performs the following steps: + * 1. Initializes a random seed for staggered refresh intervals. + * 2. Logs the start of the upload timer callback. + * 3. Iterates over all files in the active stream. + * 4. Checks if each file has timed out and skips those that haven't. + * 5. Skips files that are currently locked. + * 6. For each eligible file, enters a retry loop to handle ingestion attempts: + * a. Constructs the request buffer for the file. + * b. Compresses the payload if compression is enabled. + * c. Loads necessary ingestion resources. + * d. Performs the queued ingestion to Azure Kusto. + * e. Deletes the file upon successful ingestion. + * 7. Implements exponential backoff with jitter for retries. + * 8. Logs errors and warnings for failed operations and retries. + * 9. If the maximum number of retries is reached, logs an error and either + * deletes or marks the file as inactive based on configuration. + * 10. Logs the end of the upload timer callback. + */ +static void cb_azure_kusto_ingest(struct flb_config *config, void *data) +{ + struct flb_azure_kusto *ctx = data; + struct azure_kusto_file *file = NULL; + struct flb_fstore_file *fsf; + char *buffer = NULL; + size_t buffer_size = 0; + void *final_payload = NULL; + size_t final_payload_size = 0; + struct mk_list *tmp; + struct mk_list *head; + int ret; + time_t now; + flb_sds_t payload; + flb_sds_t tag_sds; + int is_compressed = FLB_FALSE; + int retry_count; + int backoff_time; + int max_backoff_time = 64; // Maximum backoff time in seconds + + // Initialize random seed for staggered refresh intervals + srand(time(NULL)); + + // Log the start of the upload timer callback + flb_plg_debug(ctx->ins, "Running upload timer callback (scheduler_kusto_ingest).."); + now = time(NULL); + + // Iterate over all files in the active stream + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + file = fsf->data; + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Iterating files inside upload timer callback (cb_azure_kusto_ingest).. %s", file->fsf->name); + + // Check if the file has timed out + if (now < (file->create_time + ctx->upload_timeout + ctx->retry_time)) { + continue; // Skip files that haven't timed out + } + + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Before file locked check %s", file->fsf->name); + + // Skip locked files + if (file->locked == FLB_TRUE) { + continue; + } + + retry_count = 0; + backoff_time = 2; // Initial backoff time in seconds + + // Retry loop for handling retries + while (retry_count < ctx->scheduler_max_retries) { + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Before construct_request_buffer %s", file->fsf->name); + + // Construct the request buffer + ret = construct_request_buffer(ctx, NULL, file, &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Could not construct request buffer for %s", file->fsf->name); + retry_count++; + // Add jitter: random value between 0 and backoff_time + int jitter = rand() % backoff_time; + sleep(backoff_time + jitter); // Exponential backoff with jitter + backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; // Double the backoff time, but cap it + continue; // Retry on failure + } + + payload = flb_sds_create_len(buffer, buffer_size); + tag_sds = flb_sds_create(fsf->meta_buf); + + // Compress the JSON payload if compression is enabled + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: cannot gzip payload"); + flb_free(buffer); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + retry_count++; + if (file){ + azure_kusto_store_file_unlock(file); + file->failures += 1; + } + // Add jitter: random value between 0 and backoff_time + int jitter = rand() % backoff_time; + flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: failed while compressing payload :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s", + backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name); + sleep(backoff_time + jitter); // Exponential backoff with jitter + backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; // Double the backoff time, but cap it + continue; // Retry on failure + } else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: enabled payload gzip compression"); + } + } else { + final_payload = payload; + final_payload_size = flb_sds_len(payload); + } + + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest ::: tag of the file %s", tag_sds); + + // Load ingestion resources + ret = azure_kusto_load_ingestion_resources(ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: cannot load ingestion resources"); + + // Free allocated resources + flb_free(buffer); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + if (is_compressed) { + flb_free(final_payload); + } + + retry_count++; + if (file){ + azure_kusto_store_file_unlock(file); + file->failures += 1; + } + // Add jitter: random value between 0 and backoff_time + int jitter = rand() % backoff_time; + flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: error loading ingestion resources :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s", + backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name); + sleep(backoff_time + jitter); // Exponential backoff with jitter + backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; // Double the backoff time, but cap it + continue; // Retry on failure + } + + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest ::: before starting kusto queued ingestion %s", file->fsf->name); + + // Perform the queued ingestion + ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, NULL); + if (ret != 0) { + flb_plg_error(ctx->ins, "scheduler_kusto_ingest: Failed to ingest data to kusto"); + + // Free allocated resources + flb_free(buffer); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + if (is_compressed) { + flb_free(final_payload); + } + + retry_count++; + if (file){ + azure_kusto_store_file_unlock(file); + file->failures += 1; + } + // Add jitter: random value between 0 and backoff_time + int jitter = rand() % backoff_time; + flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: error while ingesting to kusto :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s", + backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name); + sleep(backoff_time + jitter); // Exponential backoff with jitter + backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; // Double the backoff time, but cap it + continue; // Retry on failure + } + + // Delete the file after successful ingestion + ret = azure_kusto_store_file_delete(ctx, file); + if (ret == 0) { + flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: deleted successfully ingested file %s", fsf->name); + } else { + flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: failed to delete ingested file %s", fsf->name); + if (file){ + azure_kusto_store_file_unlock(file); + file->failures += 1; + } + } + + // Free allocated resources + flb_free(buffer); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + if (is_compressed) { + flb_free(final_payload); + } + + // If all operations succeed, break out of the retry loop + break; + } + + // If the maximum number of retries is reached, log an error and move to the next file + if (retry_count >= ctx->scheduler_max_retries) { + flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Max retries reached for file %s", file->fsf->name); + //azure_kusto_store_file_unlock(file); // Unlock the file for retry in the next iteration + if (ctx->delete_on_max_upload_error){ + azure_kusto_store_file_delete(ctx, file); + }else{ + azure_kusto_store_file_inactive(ctx, file); + } + } + } + // Log the end of the upload timer callback + flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_kusto_ingest).."); +} + + +/** + * Ingest data to Azure Kusto + * + * This function is responsible for preparing and sending data to Azure Kusto for ingestion. + * It constructs a request buffer from the provided data, optionally compresses the payload, + * and then sends it to Azure Kusto using a queued ingestion method. + * + * Parameters: + * - out_context: A pointer to the output context, which is expected to be of type `struct flb_azure_kusto`. + * - new_data: The new data to be ingested, represented as a flexible string descriptor (flb_sds_t). + * - upload_file: A pointer to an `azure_kusto_file` structure that contains information about the file to be uploaded. + * - tag: A constant character pointer representing the tag associated with the data. + * - tag_len: An integer representing the length of the tag. + * + * Returns: + * - 0 on successful ingestion. + * - -1 if an error occurs during buffer construction, compression, or ingestion. + * + * The function performs the following steps: + * 1. Constructs a request buffer from the provided data and upload file information. + * 2. Creates a payload from the buffer and frees the buffer memory. + * 3. Optionally compresses the payload using gzip if compression is enabled in the context. + * 4. Calls the `azure_kusto_queued_ingestion` function to send the payload to Azure Kusto. + * 5. Cleans up allocated resources, including destroying the payload and tag strings, and freeing the compressed payload if applicable. + */ +static int ingest_to_kusto(void *out_context, flb_sds_t new_data, + struct azure_kusto_file *upload_file, + const char *tag, int tag_len) +{ + int ret; + char *buffer = NULL; + size_t buffer_size; + struct flb_azure_kusto *ctx = out_context; + flb_sds_t payload = NULL; + void *final_payload = NULL; + size_t final_payload_size = 0; + int is_compressed = FLB_FALSE; + flb_sds_t tag_sds = flb_sds_create_len(tag, tag_len); + + /* Create buffer */ + ret = construct_request_buffer(ctx, new_data, upload_file, &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not construct request buffer for %s", + upload_file->fsf->name); + return -1; + } + payload = flb_sds_create_len(buffer, buffer_size); + flb_free(buffer); + + /* Compress the JSON payload */ + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "cannot gzip payload"); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + //pthread_mutex_unlock(&ctx->buffer_mutex); + return -1; + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "enabled payload gzip compression"); + /* JSON buffer will be cleared at cleanup: */ + } + } else { + final_payload = payload; + final_payload_size = flb_sds_len(payload); + } + + // Call azure_kusto_queued_ingestion to ingest the payload + ret = azure_kusto_queued_ingestion(ctx, tag_sds, tag_len, final_payload, final_payload_size, upload_file); + if (ret != 0) { + flb_plg_error(ctx->ins, "Failed to ingest data to Azure Kusto"); + flb_sds_destroy(tag_sds); + flb_sds_destroy(payload); + if (is_compressed) { + flb_free(final_payload); + } + return -1; + } + + flb_sds_destroy(tag_sds); + flb_sds_destroy(payload); + if (is_compressed) { + flb_free(final_payload); + } + + return 0; +} + +/** + * Initializes the Azure Kusto output plugin. + * + * This function sets up the necessary configurations and resources for the Azure Kusto + * output plugin to function correctly. It performs the following tasks: + * + * 1. Creates a configuration context for the plugin using the provided instance and config. + * 2. Initializes local storage if buffering is enabled, ensuring that the storage directory + * is set up and any existing buffered data is accounted for. + * 3. Validates the configured file size for uploads, ensuring it meets the minimum and + * maximum constraints. + * 4. Sets up network configurations, including enabling IPv6 if specified. + * 5. Initializes mutexes for thread-safe operations related to OAuth tokens and resource + * management. + * 6. Creates an upstream context for connecting to the Kusto Ingestion endpoint, configuring + * it for synchronous or asynchronous operation based on buffering settings. + * 7. If IMDS (Instance Metadata Service) is used, creates an upstream context for it. + * 8. Establishes an OAuth2 context for handling authentication with Azure services. + * 9. Associates the upstream context with the output instance for data transmission. + * + * The function returns 0 on successful initialization or -1 if any step fails. + * + * @param ins The output instance to initialize. + * @param config The configuration context for Fluent Bit. + * @param data Additional data passed to the initialization function. + * + * @return 0 on success, -1 on failure. + */ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { int io_flags = FLB_IO_TLS; struct flb_azure_kusto *ctx; + flb_plg_debug(ins, "inside azure kusto init"); + /* Create config context */ ctx = flb_azure_kusto_conf_create(ins, config); if (!ctx) { @@ -233,6 +1032,38 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } + if (ctx->buffering_enabled == FLB_TRUE) { + ctx->ins = ins; + ctx->retry_time = 0; + + /* Initialize local storage */ + int ret = azure_kusto_store_init(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s", + ctx->store_dir); + return -1; + } + ctx->has_old_buffers = azure_kusto_store_has_data(ctx); + + /* validate 'total_file_size' */ + if (ctx->file_size <= 0) { + flb_plg_error(ctx->ins, "Failed to parse upload_file_size"); + return -1; + } + if (ctx->file_size < 1000000) { + flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB"); + return -1; + } + if (ctx->file_size > MAX_FILE_SIZE) { + flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE); + return -1; + } + + ctx->timer_created = FLB_FALSE; + ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; + flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size); + } + flb_output_set_context(ins, ctx); /* Network mode IPv6 */ @@ -251,14 +1082,32 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi * Create upstream context for Kusto Ingestion endpoint */ ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); + if (ctx->buffering_enabled == FLB_TRUE){ + flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC); + ctx->u->base.net.io_timeout = ctx->io_timeout; + ctx->has_old_buffers = azure_kusto_store_has_data(ctx); + } if (!ctx->u) { flb_plg_error(ctx->ins, "upstream creation failed"); return -1; } + flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); + /* Create oauth2 context */ + if(ctx->use_imds == FLB_TRUE){ + ctx->imds_upstream = + flb_upstream_create(config, "169.254.169.254", 80, FLB_IO_TCP, NULL); + if (!ctx->imds_upstream) { + flb_plg_error(ctx->ins, "cannot create imds upstream"); + return -1; + } + if (ctx->buffering_enabled == FLB_TRUE){ + flb_stream_disable_flags(&ctx->imds_upstream->base, FLB_IO_ASYNC); + } + } ctx->o = - flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); + flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); if (!ctx->o) { flb_plg_error(ctx->ins, "cannot create oauth2 context"); return -1; @@ -270,6 +1119,24 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return 0; } + +/** + * This function formats log data for Azure Kusto ingestion. + * It processes a batch of log records, encodes them in a specific format, + * and outputs the formatted data. + * + * Parameters: + * - ctx: Context containing configuration and state for Azure Kusto. + * - tag: A string tag associated with the log data. + * - tag_len: Length of the tag string. + * - data: Pointer to the raw log data in msgpack format. + * - bytes: Size of the raw log data. + * - out_data: Pointer to store the formatted output data. + * - out_size: Pointer to store the size of the formatted output data. + * + * Returns: + * - 0 on success, or -1 on error. + */ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len, const void *data, size_t bytes, void **out_data, size_t *out_size) @@ -277,16 +1144,13 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int int records = 0; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; - /* for sub msgpack objs */ - int map_size; struct tm tms; char time_formatted[32]; size_t s; int len; struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - int ret; - /* output buffer */ + struct flb_log_event log_event; + int ret; flb_sds_t out_buf; /* Create array for all records */ @@ -297,11 +1161,16 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int } ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); + flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); + return -1; + } + /* Initialize the output buffer */ + out_buf = flb_sds_create_size(1024); + if (!out_buf) { + flb_plg_error(ctx->ins, "error creating output buffer"); + flb_log_event_decoder_destroy(&log_decoder); return -1; } @@ -309,16 +1178,13 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int msgpack_sbuffer_init(&mp_sbuf); msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - msgpack_pack_array(&mp_pck, records); + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + msgpack_sbuffer_clear(&mp_sbuf); - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - map_size = 1; + int map_size = 1; if (ctx->include_time_key == FLB_TRUE) { map_size++; } - if (ctx->include_tag_key == FLB_TRUE) { map_size++; } @@ -330,14 +1196,10 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key)); msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); - /* Append the time value as ISO 8601 */ gmtime_r(&log_event.timestamp.tm.tv_sec, &tms); - s = strftime(time_formatted, sizeof(time_formatted) - 1, - FLB_PACK_JSON_DATE_ISO8601_FMT, &tms); - - len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, - ".%03" PRIu64 "Z", - (uint64_t)log_event.timestamp.tm.tv_nsec / 1000000); + s = strftime(time_formatted, sizeof(time_formatted) - 1, FLB_PACK_JSON_DATE_ISO8601_FMT, &tms); + len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z", + (uint64_t) log_event.timestamp.tm.tv_nsec / 1000000); s += len; msgpack_pack_str(&mp_pck, s); msgpack_pack_str_body(&mp_pck, time_formatted, s); @@ -353,38 +1215,157 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key)); msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key)); - msgpack_pack_object(&mp_pck, *log_event.body); - } - /* Convert from msgpack to JSON */ - out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (log_event.body != NULL) { + msgpack_pack_object(&mp_pck, *log_event.body); + } else { + msgpack_pack_str(&mp_pck, 20); + msgpack_pack_str_body(&mp_pck, "log_attribute_missing", 20); + } - /* Cleanup */ - flb_log_event_decoder_destroy(&log_decoder); - msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_t json_record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (!json_record) { + flb_plg_error(ctx->ins, "error converting msgpack to JSON"); + flb_sds_destroy(out_buf); + msgpack_sbuffer_destroy(&mp_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + return -1; + } - if (!out_buf) { - flb_plg_error(ctx->ins, "error formatting JSON payload"); - return -1; + /* Concatenate the JSON record to the output buffer */ + out_buf = flb_sds_cat(out_buf, json_record, flb_sds_len(json_record)); + out_buf = flb_sds_cat(out_buf, "\n", 1); + + flb_sds_destroy(json_record); } + msgpack_sbuffer_destroy(&mp_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + *out_data = out_buf; *out_size = flb_sds_len(out_buf); return 0; } +static int buffer_chunk(void *out_context, struct azure_kusto_file *upload_file, + flb_sds_t chunk, int chunk_size, + flb_sds_t tag, size_t tag_len) +{ + int ret; + struct flb_azure_kusto *ctx = out_context; + + flb_plg_trace(ctx->ins, "Buffering chunk %d", chunk_size); + + ret = azure_kusto_store_buffer_put(ctx, upload_file, tag, + tag_len, chunk, chunk_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not buffer chunk. "); + return -1; + } + return 0; +} + +/** + * @brief Initialize the flush process for Azure Kusto output plugin. + * + * This function is responsible for setting up the initial conditions required + * for flushing data to Azure Kusto. It performs the following tasks: + * + * 1. **Old Buffer Cleanup**: Checks if there are any old buffers from previous + * executions that need to be sent to Azure Kusto. If such buffers exist, it + * attempts to ingest all chunks of data. If the ingestion fails, it logs an + * error and marks the buffers to be retried later. + * + * 2. **Upload Timer Setup**: If not already created, it sets up a periodic timer + * that checks for uploads ready for completion. This timer is crucial for + * ensuring that data is uploaded at regular intervals. + * + * @param out_context Pointer to the output context, specifically the Azure Kusto context. + * @param config Pointer to the Fluent Bit configuration structure. + */ +static void flush_init(void *out_context, struct flb_config *config) +{ + int ret; + struct flb_azure_kusto *ctx = out_context; + struct flb_sched *sched; + + flb_plg_debug(ctx->ins, + "inside flush_init with old_buffers as %d", + ctx->has_old_buffers); + + /* clean up any old buffers found on startup */ + if (ctx->has_old_buffers == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Sending locally buffered data from previous " + "executions to kusto; buffer=%s", + ctx->fs->root_path); + ctx->has_old_buffers = FLB_FALSE; + ret = ingest_all_chunks(ctx, config); + if (ret < 0) { + ctx->has_old_buffers = FLB_TRUE; + flb_plg_error(ctx->ins, + "Failed to send locally buffered data left over " + "from previous executions; will retry. Buffer=%s", + ctx->fs->root_path); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + }else{ + flb_plg_debug(ctx->ins, + "Did not find any local buffered data from previous " + "executions to kusto; buffer=%s", + ctx->fs->root_path); + } + + /* + * create a timer that will run periodically and check if uploads + * are ready for completion + * this is created once on the first flush + */ + if (ctx->timer_created == FLB_FALSE) { + flb_plg_debug(ctx->ins, + "Creating upload timer with frequency %ds", + ctx->timer_ms / 1000); + + sched = flb_sched_ctx_get(); + + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, cb_azure_kusto_ingest, ctx, NULL); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to create upload timer"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + ctx->timer_created = FLB_TRUE; + } +} + +/** + * This function handles the flushing of event data to Azure Kusto. + * It manages both buffered and non-buffered modes, handles JSON formatting, + * compression, and manages file uploads based on conditions like timeout and file size. + * + * @param event_chunk The event chunk containing the data to be flushed. + * @param out_flush The output flush context. + * @param i_ins The input instance (unused). + * @param out_context The output context, specifically for Azure Kusto. + * @param config The configuration context (unused). + */ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, void *out_context, struct flb_config *config) { int ret; - flb_sds_t json; + flb_sds_t json = NULL; size_t json_size; size_t tag_len; struct flb_azure_kusto *ctx = out_context; int is_compressed = FLB_FALSE; + struct azure_kusto_file *upload_file = NULL; + int upload_timeout_check = FLB_FALSE; + int total_file_size_check = FLB_FALSE; + flb_sds_t tag_name = NULL; + size_t tag_name_len; (void)i_ins; (void)config; @@ -392,77 +1373,270 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, void *final_payload = NULL; size_t final_payload_size = 0; - flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); + flb_plg_debug(ctx->ins, "flushing bytes for event tag %s and size %zu", event_chunk->tag ,event_chunk->size); + /* Get the length of the event tag */ tag_len = flb_sds_len(event_chunk->tag); - /* Load or refresh ingestion resources */ - ret = azure_kusto_load_ingestion_resources(ctx, config); - flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot load ingestion resources"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } + if (ctx->buffering_enabled == FLB_TRUE) { + /* Determine the tag name based on the unify_tag setting */ + if (ctx->unify_tag == FLB_TRUE){ + tag_name = flb_sds_create("fluentbit-buffer-file-unify-tag.log"); + }else { + tag_name = event_chunk->tag; + } + tag_name_len = flb_sds_len(tag_name); + /* Initialize the flush process */ + flush_init(ctx,config); - /* Reformat msgpack to JSON payload */ - ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, - event_chunk->size, (void **)&json, &json_size); - flb_plg_trace(ctx->ins, "format: ret=%d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot reformat data into json"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } + /* Reformat msgpack to JSON payload */ + ret = azure_kusto_format(ctx, tag_name, tag_name_len, event_chunk->data, + event_chunk->size, (void **)&json, &json_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot reformat data into json"); + ret = FLB_RETRY; + goto error; + } - /* Map buffer */ - final_payload = json; - final_payload_size = json_size; - if (ctx->compression_enabled == FLB_TRUE) { - ret = flb_gzip_compress((void *) json, json_size, - &final_payload, &final_payload_size); + /* Get a file candidate matching the given 'tag' */ + upload_file = azure_kusto_store_file_get(ctx, + tag_name, + tag_name_len); + + /* Check if the file has failed to upload too many times */ + if (upload_file != NULL && upload_file->failures >= ctx->scheduler_max_retries) { + flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not " + "retry", event_chunk->tag, ctx->scheduler_max_retries); + if (ctx->delete_on_max_upload_error){ + azure_kusto_store_file_delete(ctx, upload_file); + }else{ + azure_kusto_store_file_inactive(ctx, upload_file); + } + upload_file = NULL; + } + + /* Check if the upload timeout has elapsed */ + if (upload_file != NULL && time(NULL) > + (upload_file->create_time + ctx->upload_timeout)) { + upload_timeout_check = FLB_TRUE; + flb_plg_trace(ctx->ins, "upload_timeout reached for %s", + event_chunk->tag); + } + + /* Check if the total file size has been exceeded */ + if (upload_file && upload_file->size + json_size > ctx->file_size) { + flb_plg_trace(ctx->ins, "total_file_size exceeded %s", + event_chunk->tag); + total_file_size_check = FLB_TRUE; + } + + /* If the file is ready for upload */ + if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { + flb_plg_debug(ctx->ins, "uploading file %s with size %zu", upload_file->fsf->name, upload_file->size); + /* Load or refresh ingestion resources */ + ret = azure_kusto_load_ingestion_resources(ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot load ingestion resources"); + ret = FLB_RETRY; + goto error; + } + + /* Ingest data to kusto */ + ret = ingest_to_kusto(ctx, json, upload_file, + tag_name, + tag_name_len); + + if (ret == 0){ + if (ctx->buffering_enabled == FLB_TRUE && ctx->buffer_file_delete_early == FLB_TRUE){ + flb_plg_debug(ctx->ins, "buffer file already deleted after blob creation"); + ret = FLB_OK; + goto cleanup; + }else{ + ret = azure_kusto_store_file_delete(ctx, upload_file); + if (ret != 0){ + /* File couldn't be deleted */ + ret = FLB_RETRY; + if (upload_file){ + azure_kusto_store_file_unlock(upload_file); + upload_file->failures += 1; + } + goto error; + } else{ + /* File deleted successfully */ + ret = FLB_OK; + goto cleanup; + } + } + }else{ + flb_plg_error(ctx->ins, "azure_kusto:: unable to ingest data into kusto : retrying"); + ret = FLB_RETRY; + if (upload_file){ + azure_kusto_store_file_unlock(upload_file); + upload_file->failures += 1; + } + goto cleanup; + } + } + + /* Buffer the current chunk in the filesystem */ + ret = buffer_chunk(ctx, upload_file, json, json_size, + tag_name, tag_name_len); + + if (ret == 0) { + flb_plg_debug(ctx->ins, "buffered chunk %s", event_chunk->tag); + ret = FLB_OK; + } else { + flb_plg_error(ctx->ins, "failed to buffer chunk %s", event_chunk->tag); + ret = FLB_RETRY; + } + goto cleanup; + + } else { + /* Buffering mode is disabled, proceed with regular flush */ + + /* Reformat msgpack data to JSON payload */ + ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, + event_chunk->size, (void **)&json, &json_size); if (ret != 0) { - flb_plg_error(ctx->ins, - "cannot gzip payload"); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); + flb_plg_error(ctx->ins, "cannot reformat data into json"); + ret = FLB_RETRY; + goto error; } - else { - is_compressed = FLB_TRUE; - /* JSON buffer will be cleared at cleanup: */ + + flb_plg_debug(ctx->ins, "payload size before compression %zu", json_size); + /* Map buffer */ + final_payload = json; + final_payload_size = json_size; + /* Check if compression is enabled */ + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) json, json_size, + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "cannot gzip payload"); + ret = FLB_ERROR; + goto error; + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "enabled payload gzip compression"); + /* JSON buffer will be cleared at cleanup: */ + } + } + flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); + + /* Load or refresh ingestion resources */ + ret = azure_kusto_load_ingestion_resources(ctx, config); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot load ingestion resources"); + ret = FLB_RETRY; + goto error; + } + + /* Perform queued ingestion to Kusto */ + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); + flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot perform queued ingestion"); + ret = FLB_RETRY; + goto error; } + + ret = FLB_OK; + goto cleanup; } - flb_plg_trace(ctx->ins, "payload size before compression %zu & after compression %zu ", json_size ,final_payload_size); - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); - flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot perform queued ingestion"); + + cleanup: + /* Cleanup resources */ + if (json) { flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); } + if (is_compressed && final_payload) { + flb_free(final_payload); + } + if (tag_name) { + flb_sds_destroy(tag_name); + } + FLB_OUTPUT_RETURN(ret); - /* Cleanup */ - flb_sds_destroy(json); - - /* release compressed payload */ - if (is_compressed == FLB_TRUE) { + error: + /* Error handling and cleanup */ + if (json) { + flb_sds_destroy(json); + } + if (is_compressed && final_payload) { flb_free(final_payload); } - /* Done */ - FLB_OUTPUT_RETURN(FLB_OK); + if (tag_name) { + flb_sds_destroy(tag_name); + } + FLB_OUTPUT_RETURN(ret); } +/** + * cb_azure_kusto_exit - Clean up and finalize the Azure Kusto plugin context. + * + * This function is responsible for performing cleanup operations when the + * Azure Kusto plugin is exiting. It ensures that all resources are properly + * released and any remaining data is sent to Azure Kusto before the plugin + * shuts down. + * + * Functionality: + * - Checks if the plugin context (`ctx`) is valid. If not, it returns an error. + * - If there is locally buffered data, it attempts to send all chunks to Azure + * Kusto using the `ingest_all_chunks` function. Logs an error if the operation + * fails. + * - Destroys any active upstream connections (`ctx->u` and `ctx->imds_upstream`) + * to free network resources. + * - Destroys mutexes (`resources_mutex`, `token_mutex`, `blob_mutex`) to ensure + * proper synchronization cleanup. + * - Calls `azure_kusto_store_exit` to perform any additional storage-related + * cleanup operations. + * - Finally, it calls `flb_azure_kusto_conf_destroy` to free the plugin context + * and its associated resources. + * + * Parameters: + * - data: A pointer to the plugin context (`struct flb_azure_kusto`). + * - config: A pointer to the Fluent Bit configuration (`struct flb_config`). + * + * Returns: + * - 0 on successful cleanup. + * - -1 if the context is invalid or if an error occurs during cleanup. + */ static int cb_azure_kusto_exit(void *data, struct flb_config *config) { struct flb_azure_kusto *ctx = data; + int ret = -1; if (!ctx) { return -1; } + + if (ctx->buffering_enabled == FLB_TRUE){ + if (azure_kusto_store_has_data(ctx) == FLB_TRUE) { + flb_plg_info(ctx->ins, "Sending all locally buffered data to Kusto"); + ret = ingest_all_chunks(ctx, config); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not send all chunks on exit"); + } + } + azure_kusto_store_exit(ctx); + } + if (ctx->u) { flb_upstream_destroy(ctx->u); ctx->u = NULL; } + if (ctx->imds_upstream) { + flb_upstream_destroy(ctx->imds_upstream); + ctx->imds_upstream = NULL; + } + + + // Destroy the mutexes pthread_mutex_destroy(&ctx->resources_mutex); pthread_mutex_destroy(&ctx->token_mutex); pthread_mutex_destroy(&ctx->blob_mutex); @@ -473,67 +1647,117 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) } static struct flb_config_map config_map[] = { - {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, tenant_id), - "Set the tenant ID of the AAD application used for authentication"}, - {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, client_id), - "Set the client ID (Application ID) of the AAD application used for authentication"}, - {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, client_secret), - "Set the client secret (Application Password) of the AAD application used for " - "authentication"}, - {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_endpoint), - "Set the Kusto cluster's ingestion endpoint URL (e.g. " - "https://ingest-mycluster.eastus.kusto.windows.net)"}, - {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, database_name), "Set the database name"}, - {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, table_name), "Set the table name"}, - {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_mapping_reference), - "Set the ingestion mapping reference"}, - {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"}, - {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, include_tag_key), - "If enabled, tag is appended to output. " - "The key name is used 'tag_key' property."}, - {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, tag_key), - "The key name of tag. If 'include_tag_key' is false, " - "This property is ignored"}, - {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, include_time_key), - "If enabled, time is appended to output. " - "The key name is used 'time_key' property."}, - {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, time_key), - "The key name of the time. If 'include_time_key' is false, " - "This property is ignored"}, - {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), - "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds." - "The default is 60 seconds."}, - {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, compression_enabled), - "Enable HTTP payload compression (gzip)." - "The default is true."}, - {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), - "Set the azure kusto ingestion resources refresh interval" - "The default is 3600 seconds."}, + {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, tenant_id), + "Set the tenant ID of the AAD application used for authentication"}, + {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, client_id), + "Set the client ID (Application ID) of the AAD application used for authentication"}, + {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, client_secret), + "Set the client secret (Application Password) of the AAD application used for " + "authentication"}, + {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_endpoint), + "Set the Kusto cluster's ingestion endpoint URL (e.g. " + "https://ingest-mycluster.eastus.kusto.windows.net)"}, + {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, database_name), "Set the database name"}, + {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, table_name), "Set the table name"}, + {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_mapping_reference), + "Set the ingestion mapping reference"}, + {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"}, + {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, include_tag_key), + "If enabled, tag is appended to output. " + "The key name is used 'tag_key' property."}, + {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, tag_key), + "The key name of tag. If 'include_tag_key' is false, " + "This property is ignored"}, + {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, include_time_key), + "If enabled, time is appended to output. " + "The key name is used 'time_key' property."}, + {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, time_key), + "The key name of the time. If 'include_time_key' is false, " + "This property is ignored"}, + {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), + "Set the ingestion endpoint connection timeout in seconds"}, + {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." + }, + {FLB_CONFIG_MAP_BOOL, "buffering_enabled", "false", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, buffering_enabled), "Enable buffering into disk before ingesting into Azure Kusto." + }, + {FLB_CONFIG_MAP_STR, "buffer_dir", "/tmp/fluent-bit/azure-kusto/", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, buffer_dir), "Specifies the location of directory where the buffered data will be stored." + }, + {FLB_CONFIG_MAP_TIME, "upload_timeout", "30m", + 0, FLB_TRUE, offsetof(struct flb_azure_kusto, upload_timeout), + "Optionally specify a timeout for uploads. " + "Fluent Bit will start ingesting buffer files which have been created more than x minutes and haven't reached upload_file_size limit yet. " + " Default is 30m." + }, + {FLB_CONFIG_MAP_SIZE, "upload_file_size", "200M", + 0, FLB_TRUE, offsetof(struct flb_azure_kusto, file_size), + "Specifies the size of files to be uploaded in MBs. Default is 200MB" + }, + {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), + "Set the azure kusto ingestion resources refresh interval" + }, + {FLB_CONFIG_MAP_STR, "azure_kusto_buffer_key", "key",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, azure_kusto_buffer_key), + "Set the azure kusto buffer key which needs to be specified when using multiple instances of azure kusto output plugin and buffering is enabled" + }, + {FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", FLB_AZURE_KUSTO_BUFFER_DIR_MAX_SIZE,0, FLB_TRUE, + offsetof(struct flb_azure_kusto, store_dir_limit_size), + "Set the max size of the buffer directory. Default is 8GB" + }, + {FLB_CONFIG_MAP_BOOL, "buffer_file_delete_early", "false",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, buffer_file_delete_early), + "Whether to delete the buffered file early after successful blob creation. Default is false" + }, + {FLB_CONFIG_MAP_BOOL, "unify_tag", "false",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, unify_tag), + "This creates a single buffer file when the buffering mode is ON. Default is false" + }, + {FLB_CONFIG_MAP_INT, "blob_uri_length", "64",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, blob_uri_length), + "Set the length of generated blob uri before ingesting to kusto. Default is 64" + }, + {FLB_CONFIG_MAP_INT, "scheduler_max_retries", "3",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, scheduler_max_retries), + "Set the maximum number of retries for ingestion using the scheduler. Default is 3" + }, + {FLB_CONFIG_MAP_BOOL, "use_imds", "false",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, use_imds), + "Whether to use IMDS to retrieve oauth token. Default is false" + }, + {FLB_CONFIG_MAP_BOOL, "delete_on_max_upload_error", "false",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, delete_on_max_upload_error), + "Whether to delete the buffer file on maximum upload errors. Default is false" + }, + {FLB_CONFIG_MAP_TIME, "io_timeout", "60s",0, FLB_TRUE, + offsetof(struct flb_azure_kusto, io_timeout), + "HTTP IO timeout. Default is 60s" + }, /* EOF */ - {0}}; + {0}}; struct flb_output_plugin out_azure_kusto_plugin = { - .name = "azure_kusto", - .description = "Send events to Kusto (Azure Data Explorer)", - .cb_init = cb_azure_kusto_init, - .cb_flush = cb_azure_kusto_flush, - .cb_exit = cb_azure_kusto_exit, - .config_map = config_map, - /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; + .name = "azure_kusto", + .description = "Send events to Kusto (Azure Data Explorer)", + .cb_init = cb_azure_kusto_init, + .cb_flush = cb_azure_kusto_flush, + .cb_exit = cb_azure_kusto_exit, + .config_map = config_map, + /* Plugin flags */ + .flags = FLB_OUTPUT_NET | FLB_IO_TLS, +}; \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 9e3eb7b3182..ff80c94e818 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -26,6 +26,13 @@ #include #include +#include +#include +#include +#include +#include +#include + /* refresh token every 50 minutes */ #define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000 @@ -53,6 +60,15 @@ #define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60" +#define FLB_AZURE_KUSTO_BUFFER_DIR_MAX_SIZE "8G" // 8GB buffer directory size +#define UPLOAD_TIMER_MAX_WAIT 180000 +#define UPLOAD_TIMER_MIN_WAIT 18000 +#define MAX_FILE_SIZE 4000000000 // 4GB + +#define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token" +#define FLB_AZURE_IMDS_API_VERSION "2018-02-01" +#define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/" + struct flb_azure_kusto_resources { struct flb_upstream_ha *blob_ha; @@ -60,7 +76,7 @@ struct flb_azure_kusto_resources { flb_sds_t identity_token; /* used to reload resouces after some time */ - time_t load_time; + uint64_t load_time; }; struct flb_azure_kusto { @@ -74,6 +90,7 @@ struct flb_azure_kusto { flb_sds_t ingestion_mapping_reference; int ingestion_endpoint_connect_timeout; + int io_timeout; /* compress payload */ int compression_enabled; @@ -87,14 +104,19 @@ struct flb_azure_kusto { int include_time_key; flb_sds_t time_key; - /* --- internal data --- */ + flb_sds_t azure_kusto_buffer_key; - flb_sds_t ingestion_mgmt_endpoint; + /* --- internal data --- */ /* oauth2 context */ flb_sds_t oauth_url; + //flb_sds_t imds_url; + int use_imds; struct flb_oauth2 *o; + int timer_created; + int timer_ms; + /* mutex for acquiring oauth tokens */ pthread_mutex_t token_mutex; @@ -106,9 +128,36 @@ struct flb_azure_kusto { pthread_mutex_t blob_mutex; + pthread_mutex_t buffer_mutex; + + int buffering_enabled; + + size_t file_size; + time_t upload_timeout; + time_t retry_time; + + int buffer_file_delete_early; + int unify_tag; + int blob_uri_length; + int scheduler_max_retries; + int delete_on_max_upload_error; + + int has_old_buffers; + size_t store_dir_limit_size; + /* track the total amount of buffered data */ + size_t current_buffer_size; + flb_sds_t buffer_dir; + char *store_dir; + struct flb_fstore *fs; + struct flb_fstore_stream *stream_active; /* default active stream */ + struct flb_fstore_stream *stream_upload; + + /* Upstream connection to the backend server */ struct flb_upstream *u; + struct flb_upstream *imds_upstream; + /* Fluent Bit context */ struct flb_config *config; @@ -119,4 +168,4 @@ struct flb_azure_kusto { flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); -#endif +#endif \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index d25f11b15c3..c1384c52c46 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -17,20 +17,27 @@ * limitations under the License. */ -#include #include #include #include #include #include -#include -#include #include #include +#include +#include +#include #include "azure_kusto.h" #include "azure_kusto_conf.h" +/* Constants for PCG random number generator */ +#define PCG_DEFAULT_MULTIPLIER_64 6364136223846793005ULL +#define PCG_DEFAULT_INCREMENT_64 1442695040888963407ULL + +/* PCG random number generator state */ +typedef struct { uint64_t state; uint64_t inc; } pcg32_random_t; + static struct flb_upstream_node *flb_upstream_node_create_url(struct flb_azure_kusto *ctx, struct flb_config *config, const char *url) @@ -160,7 +167,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi struct flb_upstream_ha *queue_ha) { jsmn_parser parser; - jsmntok_t *t; + jsmntok_t *t = NULL; jsmntok_t *tokens = NULL; int ret = -1; int i; @@ -171,7 +178,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi int resource_type; struct flb_upstream_node *node; struct flb_upstream_ha *ha; - flb_sds_t resource_uri; + flb_sds_t resource_uri = NULL; /* Response is a json in the form of * { @@ -195,7 +202,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi resource_uri = flb_sds_create(NULL); if (!resource_uri) { flb_plg_error(ctx->ins, "error allocating resource uri buffer"); - return -1; + goto cleanup; } jsmn_init(&parser); @@ -204,9 +211,8 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); if (!tokens) { - flb_errno(); /* Log the error using flb_errno() */ - flb_plg_error(ctx->ins, "failed to allocate memory for tokens"); - return -1; + flb_plg_error(ctx->ins, "error allocating tokens"); + goto cleanup; } if (tokens) { @@ -215,7 +221,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi if (ret > 0) { /* skip all tokens until we reach "Rows" */ for (i = 0; i < ret - 1; i++) { - t = &tokens[i]; + t = &tokens[i]; if (t->type != JSMN_STRING) { continue; @@ -238,7 +244,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi * values, the first value containing the resource type, and the second value * containing the resource uri */ for (; i < ret; i++) { - t = &tokens[i]; + t = &tokens[i]; /** * each token should be an array with 2 strings: @@ -269,8 +275,8 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi strncmp(token_str, "SecuredReadyForAggregationQueue", 31) == 0) { resource_type = AZURE_KUSTO_RESOURCE_QUEUE; } - /* we don't care about other resources so we just skip the next token and - move on to the next pair */ + /* we don't care about other resources so we just skip the next token and + move on to the next pair */ else { i++; continue; @@ -288,6 +294,11 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi token_str_len = (t->end - t->start); resource_uri = flb_sds_copy(resource_uri, token_str, token_str_len); + if (!resource_uri) { + flb_plg_error(ctx->ins, "error copying resource URI"); + ret = -1; + goto cleanup; + } if (resource_type == AZURE_KUSTO_RESOURCE_QUEUE) { ha = queue_ha; queue_count++; @@ -300,7 +311,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi if (!ha) { flb_plg_error(ctx->ins, "error creating HA upstream"); ret = -1; - break; + goto cleanup; } node = flb_upstream_node_create_url(ctx, config, resource_uri); @@ -308,7 +319,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi if (!node) { flb_plg_error(ctx->ins, "error creating HA upstream node"); ret = -1; - break; + goto cleanup; } flb_upstream_ha_node_add(ha, node); @@ -324,21 +335,32 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi else { flb_plg_error(ctx->ins, "error parsing resources: missing resources"); ret = -1; + goto cleanup; } } } else { flb_plg_error(ctx->ins, "error parsing JSON response: %s", response); ret = -1; + goto cleanup; } } else { flb_plg_error(ctx->ins, "error allocating tokens"); ret = -1; + goto cleanup; } - flb_sds_destroy(resource_uri); - flb_free(tokens); + cleanup: + if (resource_uri) { + flb_sds_destroy(resource_uri); + } + if (t){ + flb_free(t); + } + if (tokens) { + flb_free(tokens); + } return ret; } @@ -404,8 +426,7 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, identity_token = flb_sds_create_len(token_str, token_str_len); if (identity_token) { - flb_plg_debug(ctx->ins, "parsed kusto identity token: '%s'", - identity_token); + flb_plg_debug(ctx->ins, "parsed kusto identity token "); } else { flb_plg_error(ctx->ins, "error parsing kusto identity token"); @@ -420,28 +441,95 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, } flb_free(tokens); + flb_free(t); return identity_token; } - +/* PCG random number generator function */ +static uint32_t pcg32_random_r(pcg32_random_t* rng) { + uint64_t oldstate = rng->state; + rng->state = oldstate * PCG_DEFAULT_MULTIPLIER_64 + rng->inc; + uint32_t xorshifted = ((oldstate >> 18u) ^ oldstate) >> 27u; + uint32_t rot = oldstate >> 59u; + return (xorshifted >> rot) | (xorshifted << ((-rot) & 31)); +} /** - * This method returns random integers from range -600 to +600 which needs to be added - * to the kusto ingestion resources refresh interval to even out the spikes - * in kusto DM for .get ingestion resources upon expiry - * */ + * Generates a random integer within a specified range to adjust the refresh interval + * for Azure Kusto ingestion resources. This helps in distributing the load evenly + * by adding variability to the refresh timing, thus preventing spikes in demand. + * + * The method combines various sources of entropy including environment variables, + * current time, and additional random bytes to generate a robust random number. + * + * Inputs: + * - Environment variables: HOSTNAME and CLUSTER_NAME, which are used to identify + * the pod and cluster respectively. Defaults are used if these are not set. + * - Current time with high precision is obtained to ensure uniqueness. + * + * Outputs: + * - Returns a random integer in the range of -600,000 to +3,600,000. + * - In case of failure in generating random bytes, the method returns -1. + * + * The method utilizes SHA256 hashing and additional entropy from OpenSSL's + * RAND_bytes to ensure randomness. The PCG (Permuted Congruential Generator) + * algorithm is used for generating the final random number. + */ int azure_kusto_generate_random_integer() { - /* Seed the random number generator */ - int pid = getpid(); - unsigned long address = (unsigned long)&address; - unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0); - srand(seed); - /* Generate a random integer in the range [-600, 600] */ - int random_integer = rand() % 1201 - 600; + int i; + /* Get environment variables or use default values */ + const char *pod_id = getenv("HOSTNAME"); + const char *cluster_name = getenv("CLUSTER_NAME"); + pod_id = pod_id ? pod_id : "default_pod_id"; + cluster_name = cluster_name ? cluster_name : "default_cluster_name"; + + /* Get current time with high precision */ + struct flb_time tm_now; + flb_time_get(&tm_now); + uint64_t current_time = flb_time_to_nanosec(&tm_now); + + /* Generate additional random entropy */ + unsigned char entropy[32]; + if (RAND_bytes(entropy, sizeof(entropy)) != 1) { + fprintf(stderr, "Error generating random bytes\n"); + return -1; + } + + /* Combine all sources of entropy into a single string */ + char combined[1024]; + snprintf(combined, sizeof(combined), "%s%s%llu%p", + pod_id, cluster_name, current_time, (void *)&combined); + + /* Hash the combined data using SHA256 */ + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256((unsigned char *)combined, strlen(combined), hash); + + /* XOR the hash with the additional entropy */ + for (i = 0; i < SHA256_DIGEST_LENGTH; i++) { + hash[i] ^= entropy[i]; + } + + /* Generate an additional 64-bit random number */ + uint64_t additional_random; + if (RAND_bytes((unsigned char *)&additional_random, sizeof(additional_random)) != 1) { + fprintf(stderr, "Error generating additional random bytes\n"); + return -1; + } + + /* Initialize PCG random number generator */ + pcg32_random_t rng; + rng.state = *(uint64_t *)hash ^ additional_random; // XOR with additional random number + rng.inc = *(uint64_t *)(hash + 8); + + /* Generate random value and scale it to desired range */ + uint32_t random_value = pcg32_random_r(&rng); + int random_integer = (random_value % 4200001) - 600000; + return random_integer; } + int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_config *config) { @@ -450,22 +538,28 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, flb_sds_t identity_token = NULL; struct flb_upstream_ha *blob_ha = NULL; struct flb_upstream_ha *queue_ha = NULL; - time_t now; + struct flb_time tm_now; + uint64_t now; int generated_random_integer = azure_kusto_generate_random_integer(); - flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer); + flb_plg_debug(ctx->ins, "generated random integer %d", generated_random_integer); - now = time(NULL); + flb_time_get(&tm_now); + now = flb_time_to_millisec(&tm_now); + flb_plg_debug(ctx->ins, "current time %llu", now); + flb_plg_debug(ctx->ins, "load_time is %llu", ctx->resources->load_time); + flb_plg_debug(ctx->ins, "difference is %llu", now - ctx->resources->load_time); + flb_plg_debug(ctx->ins, "effective ingestion resource interval is %d", ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer); /* check if we have all resources and they are not stale */ if (ctx->resources->blob_ha && ctx->resources->queue_ha && ctx->resources->identity_token && - now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) { + now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer) { flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); ret = 0; } else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer); + flb_plg_info(ctx->ins, "loading kusto ingestion resources and refresh interval is %d", ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); if (response) { @@ -478,14 +572,16 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, if (pthread_mutex_lock(&ctx->resources_mutex)) { flb_plg_error(ctx->ins, "error locking mutex"); - return -1; + ret = -1; + goto cleanup; } ret = - parse_storage_resources(ctx, config, response, blob_ha, queue_ha); + parse_storage_resources(ctx, config, response, blob_ha, queue_ha); if (pthread_mutex_unlock(&ctx->resources_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + ret = -1; + goto cleanup; } if (ret == 0) { @@ -493,60 +589,70 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, response = NULL; response = - execute_ingest_csl_command(ctx, ".get kusto identity token"); + execute_ingest_csl_command(ctx, ".get kusto identity token"); if (response) { if (pthread_mutex_lock(&ctx->resources_mutex)) { flb_plg_error(ctx->ins, "error locking mutex"); - return -1; + ret = -1; + goto cleanup; } identity_token = - parse_ingestion_identity_token(ctx, response); + parse_ingestion_identity_token(ctx, response); if (identity_token) { - ctx->resources->blob_ha = blob_ha; - ctx->resources->queue_ha = queue_ha; - ctx->resources->identity_token = identity_token; - ctx->resources->load_time = now; + ctx->resources->blob_ha = blob_ha; + ctx->resources->queue_ha = queue_ha; + ctx->resources->identity_token = identity_token; + ctx->resources->load_time = now; - ret = 0; + ret = 0; } else { flb_plg_error(ctx->ins, "error parsing ingestion identity token"); ret = -1; + goto cleanup; } if (pthread_mutex_unlock(&ctx->resources_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + ret = -1; + goto cleanup; } } else { flb_plg_error(ctx->ins, "error getting kusto identity token"); ret = -1; + goto cleanup; } } else { flb_plg_error(ctx->ins, "error parsing ingestion storage resources"); ret = -1; + goto cleanup; } if (ret == -1) { flb_upstream_ha_destroy(blob_ha); + blob_ha = NULL; } } else { flb_plg_error(ctx->ins, "error creating storage resources upstreams"); ret = -1; + goto cleanup; } if (ret == -1) { flb_upstream_ha_destroy(queue_ha); + queue_ha = NULL; } } else { flb_plg_error(ctx->ins, "error creating storage resources upstreams"); + ret = -1; + goto cleanup; } if (response) { @@ -555,6 +661,24 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, } if (!response) { flb_plg_error(ctx->ins, "error getting ingestion storage resources"); + ret = -1; + goto cleanup; + } + } + + cleanup: + if (ret == -1) { + if (queue_ha) { + flb_upstream_ha_destroy(queue_ha); + } + if (blob_ha) { + flb_upstream_ha_destroy(blob_ha); + } + if (response) { + flb_sds_destroy(response); + } + if (identity_token) { + flb_sds_destroy(identity_token); } } @@ -616,7 +740,7 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * } /* config: 'client_secret' */ - if (ctx->client_secret == NULL) { + if (ctx->use_imds == FLB_FALSE && ctx->client_secret == NULL) { flb_plg_error(ctx->ins, "property 'client_secret' is not defined"); flb_azure_kusto_conf_destroy(ctx); return NULL; @@ -654,6 +778,7 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); + ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources)); if (!ctx->resources) { flb_errno(); @@ -673,6 +798,8 @@ int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx) return -1; } + flb_plg_info(ctx->ins, "before exiting the plugin kusto conf destroy called"); + if (ctx->oauth_url) { flb_sds_destroy(ctx->oauth_url); ctx->oauth_url = NULL; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.h b/plugins/out_azure_kusto/azure_kusto_conf.h index 7d38259f8c6..a2139bd6c28 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.h +++ b/plugins/out_azure_kusto/azure_kusto_conf.h @@ -28,4 +28,4 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * struct flb_config *config); int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx); -#endif +#endif \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 495aa94cb83..f81825b7a8d 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -29,6 +29,7 @@ #include #include "azure_kusto_ingest.h" +#include "azure_kusto_store.h" /* not really uuid but a random string in the form 00000000-0000-0000-0000-000000000000 */ static char *generate_uuid() @@ -103,14 +104,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, - (void **)&blob_uri, &blob_uri_size); + (void **)&blob_uri, &blob_uri_size); if (ret == -1) { flb_plg_error(ctx->ins, "error getting blob uri"); return NULL; } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, - (void **)&blob_sas, &blob_sas_size); + (void **)&blob_sas, &blob_sas_size); if (ret == -1) { flb_plg_error(ctx->ins, "error getting blob sas token"); return NULL; @@ -138,8 +139,9 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t int ret = -1; flb_sds_t uri = NULL; struct flb_upstream_node *u_node; - struct flb_connection *u_conn; - struct flb_http_client *c; + struct flb_forward_config *fc = NULL; + struct flb_connection *u_conn = NULL; + struct flb_http_client *c = NULL; size_t resp_size; time_t now; struct tm tm; @@ -158,27 +160,41 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t return NULL; } + /* Get forward_config stored in node opaque data */ + fc = flb_upstream_node_get_data(u_node); flb_plg_debug(ctx->ins,"inside blob after upstream ha node get"); u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; + if (ctx->buffering_enabled == FLB_TRUE){ + u_node->u->base.flags &= ~(FLB_IO_ASYNC); + u_node->u->base.net.io_timeout = ctx->io_timeout; + } + flb_plg_debug(ctx->ins, "azure_kusto_create_blob -- async flag is %d", flb_stream_is_async(&ctx->u->base)); + flb_plg_debug(ctx->ins,"inside blob after upstream ha node get :: setting ingestion timeout"); + if (!u_node->u) { + flb_plg_error(ctx->ins, "upstream data is null"); + return NULL; + } u_conn = flb_upstream_conn_get(u_node->u); + flb_plg_debug(ctx->ins,"inside blob after upstream ha node get :: after getting connection"); if (u_conn) { if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error locking blob mutex"); - return NULL; + flb_plg_error(ctx->ins, "error unlocking mutex"); + goto cleanup; } flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking blob mutex"); - return NULL; + flb_plg_error(ctx->ins, "error unlocking mutex"); + goto cleanup; } if (uri) { + flb_plg_info(ctx->ins, "azure_kusto: before calling azure storage api :: value of set io_timeout is %d", u_conn->net->io_timeout); flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, NULL, 0); @@ -205,19 +221,20 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t ret = -1; if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, "Request failed and returned: \n%s", + flb_plg_error(ctx->ins, "create blob Request failed and returned: \n%s", c->resp.payload); } else { - flb_plg_debug(ctx->ins, "Request failed"); + flb_plg_error(ctx->ins, "create blob Request failed"); } } } else { - flb_plg_error(ctx->ins, "cannot send HTTP request"); + flb_plg_error(ctx->ins, "create blob cannot send HTTP request"); } flb_http_client_destroy(c); + //c = NULL; } else { flb_plg_error(ctx->ins, @@ -240,6 +257,21 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } return uri; + + cleanup: + if (c) { + flb_http_client_destroy(c); + c = NULL; + } + if (u_conn) { + flb_upstream_conn_release(u_conn); + u_conn = NULL; + } + if (uri) { + flb_sds_destroy(uri); + uri = NULL; + } + return NULL; } static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t blob_uri, @@ -252,9 +284,9 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t size_t b64_len; size_t message_len; - + if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error locking blob mutex"); + flb_plg_error(ctx->ins, "error unlocking mutex"); return NULL; } @@ -263,10 +295,10 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t message = flb_sds_create(NULL); flb_plg_debug(ctx->ins,"uuid :: %s",uuid); - flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); - flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); - flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); - flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); + flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); + flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); + flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); + flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); if (message) { message_len = @@ -290,9 +322,9 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (message_b64) { ret = flb_sds_snprintf( - &message, flb_sds_alloc(message), - "%s%c", - message_b64, 0); + &message, flb_sds_alloc(message), + "%s%c", + message_b64, 0); if (ret == -1) { flb_plg_error(ctx->ins, "error creating ingestion queue message"); @@ -326,7 +358,7 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking blob mutex"); + flb_plg_error(ctx->ins, "error unlocking mutex"); return NULL; } @@ -344,14 +376,14 @@ static flb_sds_t azure_kusto_create_queue_uri(struct flb_azure_kusto *ctx, size_t queue_sas_size; ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, - (void **)&queue_uri, &queue_uri_size); + (void **)&queue_uri, &queue_uri_size); if (ret == -1) { flb_plg_error(ctx->ins, "error getting queue uri"); return NULL; } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, - (void **)&queue_sas, &queue_sas_size); + (void **)&queue_sas, &queue_sas_size); if (ret == -1) { flb_plg_error(ctx->ins, "error getting queue sas token"); return NULL; @@ -396,8 +428,12 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting queue upstream"); return -1; } - + u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; + if (ctx->buffering_enabled == FLB_TRUE){ + u_node->u->base.flags &= ~(FLB_IO_ASYNC); + u_node->u->base.net.io_timeout = ctx->io_timeout; + } u_conn = flb_upstream_conn_get(u_node->u); @@ -441,17 +477,17 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t ret = -1; if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, - "Request failed and returned: \n%s", + flb_plg_error(ctx->ins, + "kusto queue Request failed and returned: %s", c->resp.payload); } else { - flb_plg_debug(ctx->ins, "Request failed"); + flb_plg_error(ctx->ins, "kusto queue Request failed"); } } } else { - flb_plg_error(ctx->ins, "cannot send HTTP request"); + flb_plg_error(ctx->ins, "kusto queue cannot send HTTP request"); } flb_http_client_destroy(c); @@ -482,48 +518,91 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t return ret; } +/* Function to generate a random alphanumeric string */ +void generate_random_string(char *str, size_t length) +{ + const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + const size_t charset_size = sizeof(charset) - 1; + + /* Seed the random number generator with multiple sources of entropy */ + unsigned int seed = (unsigned int)(time(NULL) ^ clock() ^ getpid()); + srand(seed); + + size_t i; + for (i = 0; i < length; ++i) { + size_t index = (size_t)rand() % charset_size; + str[i] = charset[index]; + } + + str[length] = '\0'; +} + static flb_sds_t azure_kusto_create_blob_id(struct flb_azure_kusto *ctx, flb_sds_t tag, size_t tag_len) { flb_sds_t blob_id = NULL; struct flb_time tm; uint64_t ms; - char *b64tag; - size_t b64_len; + char *b64tag = NULL; + size_t b64_len = 0; + char *uuid = NULL; + char timestamp[20]; /* Buffer for timestamp */ + char generated_random_string[ctx->blob_uri_length + 1]; flb_time_get(&tm); ms = ((tm.tm.tv_sec * 1000) + (tm.tm.tv_nsec / 1000000)); - b64tag = base64_encode(tag, tag_len, &b64_len); - - if (b64tag) { - /* remove trailing '=' */ - while (b64_len && b64tag[b64_len - 1] == '=') { - b64tag[b64_len - 1] = '\0'; - b64_len--; + if (!ctx->unify_tag) { + b64tag = base64_encode(tag, tag_len, &b64_len); + if (b64tag) { + /* remove trailing '=' */ + while (b64_len && b64tag[b64_len - 1] == '=') { + b64tag[b64_len - 1] = '\0'; + b64_len--; + } + } else { + flb_plg_error(ctx->ins, "error encoding tag '%s' to base64", tag); + return NULL; } + } else { + generate_random_string(generated_random_string, ctx->blob_uri_length); // Generate the random string + b64tag = generated_random_string; + b64_len = strlen(generated_random_string); + } - blob_id = flb_sds_create_size(flb_sds_len(ctx->database_name) + - flb_sds_len(ctx->table_name) + b64_len + 24); - if (blob_id) { - flb_sds_snprintf(&blob_id, flb_sds_alloc(blob_id), "flb__%s__%s__%s__%" PRIu64, - ctx->database_name, ctx->table_name, b64tag, ms); - } - else { - flb_plg_error(ctx->ins, "cannot create blob id buffer"); + /* Get the current timestamp */ + time_t now = time(NULL); + struct tm *tm_info = gmtime(&now); + strftime(timestamp, sizeof(timestamp), "%Y%m%d%H%M%S", tm_info); + + /* Generate a UUID */ + uuid = generate_uuid(); + if (!uuid) { + flb_plg_error(ctx->ins, "error generating UUID"); + if (!ctx->unify_tag && b64tag) { + flb_free(b64tag); } + return NULL; + } - flb_free(b64tag); + blob_id = flb_sds_create_size(1024); /* Ensure the size is restricted to 1024 characters */ + if (blob_id) { + flb_sds_snprintf(&blob_id, 1024, "flb__%s__%s__%s__%llu__%s__%s", + ctx->database_name, ctx->table_name, b64tag, ms, timestamp, uuid); + } else { + flb_plg_error(ctx->ins, "cannot create blob id buffer"); } - else { - flb_plg_error(ctx->ins, "error encoding tag '%s' to base64", tag); + + if (!ctx->unify_tag && b64tag) { + flb_free(b64tag); } + flb_free(uuid); return blob_id; } int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, - size_t tag_len, flb_sds_t payload, size_t payload_size) + size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file ) { int ret = -1; flb_sds_t blob_id; @@ -532,22 +611,28 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_lock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + return -1; } /* flb________ */ blob_id = azure_kusto_create_blob_id(ctx, tag, tag_len); - + if (pthread_mutex_unlock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + return -1; } if (blob_id) { blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size); if (blob_uri) { + if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) { + flb_plg_debug(ctx->ins, "buffering enabled, ingest to blob successfully done and now deleting the buffer file %s", blob_id); + if (azure_kusto_store_file_delete(ctx, upload_file) != 0) { + flb_plg_error(ctx->ins, "blob creation successful but error deleting buffer file %s", blob_id); + } + } ret = azure_kusto_enqueue_ingestion(ctx, blob_uri, payload_size); if (ret != 0) { @@ -568,4 +653,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, } return ret; -} +} \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.h b/plugins/out_azure_kusto/azure_kusto_ingest.h index 1fedc717b62..b60796f4fd1 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.h +++ b/plugins/out_azure_kusto/azure_kusto_ingest.h @@ -21,8 +21,9 @@ #define FLB_OUT_AZURE_KUSTO_INGEST_H #include "azure_kusto.h" +#include "azure_kusto_store.h" int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, - size_t tag_len, flb_sds_t payload, size_t payload_size); + size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file); #endif \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_store.c b/plugins/out_azure_kusto/azure_kusto_store.c new file mode 100644 index 00000000000..24806b11128 --- /dev/null +++ b/plugins/out_azure_kusto/azure_kusto_store.c @@ -0,0 +1,769 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "azure_kusto_store.h" +#include +#include +#include + +/** + * Generates a unique store filename based on a given tag and the current time. + * + * This function creates a unique filename by computing a hash from the provided + * tag and combining it with a hash derived from the current time. The resulting + * filename is intended to be used for storing data in a way that ensures + * uniqueness and avoids collisions. + * + * Parameters: + * - tag: A constant character pointer representing the tag for which the + * filename is being generated. This tag is used as part of the hash + * computation to ensure that filenames are unique to each tag. + * + * Returns: + * - A dynamically allocated `flb_sds_t` string containing the generated + * filename. The caller is responsible for freeing this string using + * `flb_sds_destroy` when it is no longer needed. + * - Returns `NULL` if memory allocation fails during the process. + * + * Behavior: + * - The function first retrieves the current time using `flb_time_get`. + * - It then computes a hash from the input tag using the DJB2 algorithm. + * - A secondary hash is computed using the current time's seconds and + * nanoseconds to further ensure uniqueness. + * - The function formats these hashes into a string using `flb_sds_printf`, + * ensuring that the resulting filename is unique for each tag and time + * combination. + * - If any memory allocation fails, the function logs an error using + * `flb_errno` and returns `NULL`. + */ + +static flb_sds_t gen_store_filename(const char *tag) +{ + int c; + unsigned long hash = 5381; + unsigned long hash2 = 5381; + flb_sds_t hash_str; + flb_sds_t tmp; + struct flb_time tm; + + /* get current time */ + flb_time_get(&tm); + + /* compose hash */ + while ((c = *tag++)) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec; + + /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */ + hash_str = flb_sds_create_size(64); + if (!hash_str) { + flb_errno(); + return NULL; + } + tmp = flb_sds_printf(&hash_str, "%lu-%lu", hash, hash2); + if (!tmp) { + flb_errno(); + flb_sds_destroy(hash_str); + return NULL; + } + hash_str = tmp; + + return hash_str; +} + + +/** + * Retrieve a candidate buffer file using the tag. + * + * This function searches through the list of active files in the current + * Azure Kusto plugin context to find a file that matches the specified tag. The + * tag is used as a lookup pattern to identify the appropriate file for + * storing incoming data. + * + * The function iterates over the list of files associated with the active + * stream in the context. For each file, it performs the following checks: + * + * 1. **Null Data Check**: If a file's data reference is NULL, it logs a + * warning and attempts to delete the file, as it indicates a partially + * initialized chunk. + * + * 2. **Meta Size Check**: Compares the size of the file's metadata with + * the length of the provided tag. If they do not match, the file is + * skipped. + * + * 3. **Locked File Check**: If the file is locked, it logs a debug message + * and skips the file, as locked files are not eligible for selection. + * + * 4. **Tag Comparison**: Compares the file's metadata buffer with the + * provided tag. If they match, it logs a debug message indicating a + * successful match and breaks out of the loop. + * + * If a matching file is found, the function returns a pointer to the + * `azure_kusto_file` structure associated with the file. If no match is + * found, it returns NULL. + * + * @param ctx Pointer to the Azure Kusto plugin context containing the active + * stream and file list. + * @param tag The tag used as a lookup pattern to find the matching file. + * @param tag_len The length of the tag. + * + * @return A pointer to the `azure_kusto_file` structure if a matching file + * is found; otherwise, NULL. + */ +struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, const char *tag, + int tag_len) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_fstore_file *fsf = NULL; + struct azure_kusto_file *azure_kusto_file; + int found = 0; + + /* + * Based in the current ctx->stream_name, locate a candidate file to + * store the incoming data using as a lookup pattern the content Tag. + */ + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + + /* skip and warn on partially initialized chunks */ + if (fsf->data == NULL) { + flb_plg_warn(ctx->ins, "BAD: found flb_fstore_file with NULL data reference, tag=%s, file=%s, will try to delete", tag, fsf->name); + flb_fstore_file_delete(ctx->fs, fsf); + } + + if (fsf->meta_size != tag_len) { + fsf = NULL; + continue; + } + + /* skip locked chunks */ + azure_kusto_file = fsf->data; + if (azure_kusto_file->locked == FLB_TRUE) { + flb_plg_debug(ctx->ins, "File '%s' is locked, skipping", fsf->name); + fsf = NULL; + continue; + } + + + /* compare meta and tag */ + if (strncmp((char *) fsf->meta_buf, tag, tag_len) == 0 ) { + flb_plg_debug(ctx->ins, "Found matching file '%s' for tag '%.*s'", fsf->name, tag_len, tag); + found = 1; + break; + } + } + + if (!found) { + return NULL; + } else { + return fsf->data; + } +} + +/** + * Append data to a new or existing fstore file. + * + * This function is responsible for appending data to a file in the Azure Kusto + * buffer storage system. It handles both the creation of new files and the appending + * of data to existing files. The function ensures that the buffer does not + * exceed the specified storage limit and manages file metadata and context. + * + * Parameters: + * - ctx: A pointer to the flb_azure_kusto context, which contains configuration + * and state information for the Azure Kusto storage system. + * - azure_kusto_file: A pointer to an existing azure_kusto_file structure. If + * NULL, a new file will be created. + * - tag: A string representing the tag associated with the data. This is used + * for metadata purposes. + * - tag_len: The length of the tag string. + * - data: The data to be appended to the file. + * - bytes: The size of the data in bytes. + * + * Returns: + * - 0 on success, indicating that the data was successfully appended to the + * file. + * - -1 on failure, indicating an error occurred during the process, such as + * exceeding the buffer limit, file creation failure, or metadata writing + * failure. + * + * The function performs the following steps: + * 1. Checks if adding the new data would exceed the storage directory limit. + * 2. If no target file is provided, it generates a new file name and creates + * a new file in the storage system. + * 3. Writes the tag as metadata to the newly created file. + * 4. Allocates a new buffer file context and associates it with the file. + * 5. Appends the data to the target file. + * 6. Updates the file and buffer sizes. + * 7. Warns the user if the buffer is nearing its capacity. + */ +int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file, + flb_sds_t tag, size_t tag_len, + flb_sds_t data, size_t bytes) { + int ret; + flb_sds_t name; + struct flb_fstore_file *fsf; + size_t space_remaining; + + if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) { + flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes", + ctx->current_buffer_size, bytes, ctx->store_dir_limit_size); + return -1; + } + + /* If no target file was found, create a new one */ + if (azure_kusto_file == NULL) { + name = gen_store_filename(tag); + if (!name) { + flb_plg_error(ctx->ins, "could not generate chunk file name"); + return -1; + } + + flb_plg_debug(ctx->ins, "[azure_kusto] new buffer file: %s", name); + + /* Create the file */ + fsf = flb_fstore_file_create(ctx->fs, ctx->stream_active, name, bytes); + if (!fsf) { + flb_plg_error(ctx->ins, "could not create the file '%s' in the store", + name); + flb_sds_destroy(name); + return -1; + } + + /* Write tag as metadata */ + ret = flb_fstore_file_meta_set(ctx->fs, fsf, (char *) tag, tag_len); + if (ret == -1) { + flb_plg_warn(ctx->ins, "Deleting buffer file because metadata could not be written"); + flb_fstore_file_delete(ctx->fs, fsf); + return -1; + } + + /* Allocate local context */ + azure_kusto_file = flb_calloc(1, sizeof(struct azure_kusto_file)); + if (!azure_kusto_file) { + flb_errno(); + flb_plg_warn(ctx->ins, "Deleting buffer file because azure_kusto context creation failed"); + flb_fstore_file_delete(ctx->fs, fsf); + return -1; + } + azure_kusto_file->fsf = fsf; + azure_kusto_file->create_time = time(NULL); + azure_kusto_file->size = 0; // Initialize size to 0 + + /* Use fstore opaque 'data' reference to keep our context */ + fsf->data = azure_kusto_file; + flb_sds_destroy(name); + + }else { + fsf = azure_kusto_file->fsf; + } + + /* Append data to the target file */ + ret = flb_fstore_file_append(azure_kusto_file->fsf, data, bytes); + + if (ret != 0) { + flb_plg_error(ctx->ins, "error writing data to local azure_kusto file"); + return -1; + } + + azure_kusto_file->size += bytes; + ctx->current_buffer_size += bytes; + + flb_plg_debug(ctx->ins, "[azure_kusto] new file size: %zu", azure_kusto_file->size); + flb_plg_debug(ctx->ins, "[azure_kusto] current_buffer_size: %zu", ctx->current_buffer_size); + + /* if buffer is 95% full, warn user */ + if (ctx->store_dir_limit_size > 0) { + space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size; + if ((space_remaining * 20) < ctx->store_dir_limit_size) { + flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes", + ctx->current_buffer_size, ctx->store_dir_limit_size); + } + } + return 0; +} + +/** + * Set Files in Azure Kusto Plugin Buffer Context + * + * This function iterates over the file streams associated with the context, + * excluding the currently active stream and the multi-upload stream. For each file in + * these streams, it checks if the file's data context is uninitialized. If so, it allocates + * a new `azure_kusto_file` structure to serve as the file's context. + * + * The function performs the following steps: + * 1. Iterate over each file stream in the context's file store, skipping the active and + * multi-upload streams. + * 2. For each file in the stream, check if the file's data context is already set. + * - If the data context is set, continue to the next file. + * 3. Allocate memory for a new `azure_kusto_file` structure to serve as the file's context. + * - If memory allocation fails, log an error and continue to the next file. + * 4. Initialize the `azure_kusto_file` structure with the current file and the current time. + * 5. Assign the newly created `azure_kusto_file` structure to the file's data context. + * + * This function ensures that each file in the relevant streams has an associated context + * for further processing, which is crucial for managing file operations within the Azure + * Kusto environment. + * + * Parameters: + * - ctx: A pointer to the `flb_azure_kusto` structure, which contains the file store and + * other relevant context information. + * + * Returns: + * - Always returns 0, indicating successful execution. + */ +static int set_files_context(struct flb_azure_kusto *ctx) +{ + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + struct azure_kusto_file *azure_kusto_file; + + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + + /* skip current stream since it's new */ + if (fs_stream == ctx->stream_active) { + continue; + } + + /* skip multi-upload */ + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + if (fsf->data) { + continue; + } + + /* Allocate local context */ + azure_kusto_file = flb_calloc(1, sizeof(struct azure_kusto_file)); + if (!azure_kusto_file) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot allocate azure_kusto file context"); + continue; + } + azure_kusto_file->fsf = fsf; + azure_kusto_file->create_time = time(NULL); + + /* Use fstore opaque 'data' reference to keep our context */ + fsf->data = azure_kusto_file; + } + } + + return 0; +} + +/** + * Initialize the filesystem storage for the Azure Kusto plugin. + * + * This function is responsible for setting up the storage context and creating + * a new stream for data storage. It ensures that the Azure Kusto plugin can + * differentiate between new data generated during the current process run and + * backlog data from previous runs. + * + * Key Steps: + * 1. **Set Storage Type**: The storage type is set to `FLB_FSTORE_FS`, indicating + * that the storage will be filesystem-based. + * + * 2. **Initialize Storage Context**: + * - Constructs a path for the storage context using the `buffer_dir` and + * `azure_kusto_buffer_key` from the context (`ctx`). + * - Creates the storage context using `flb_fstore_create`. If this fails, + * the function returns an error. + * + * 3. **Stream Creation**: + * - On each start, a new stream is created. This stream is a directory named + * with the current date and time (formatted as `YYYY-MM-DDTHH:MM:SS`), which + * stores all new data generated during the current process run. + * - The plugin differentiates between new and older buffered data, with older + * data considered as backlog. + * + * 4. **Platform-Specific Considerations**: + * - On Windows, the function replaces colons (`:`) with hyphens (`-`) in the + * stream name because colons are not allowed in directory names on Windows. + * + * 5. **Error Handling**: + * - If the stream creation fails, the function logs an error, destroys the + * storage context, and returns an error code. + * + * 6. **Finalization**: + * - If successful, the function sets the active stream in the context and + * calls `set_files_context` to finalize the setup. + * + * @param ctx A pointer to the `flb_azure_kusto` structure containing the plugin + * context and configuration. + * + * @return Returns 0 on success, or -1 on failure. + */ +int azure_kusto_store_init(struct flb_azure_kusto *ctx) +{ + int type; + time_t now; + char tmp[64]; + struct tm *tm; + struct flb_fstore *fs; + struct flb_fstore_stream *fs_stream; + + /* Set the storage type */ + type = FLB_FSTORE_FS; + + /* Initialize the storage context */ + if (ctx->buffer_dir[strlen(ctx->buffer_dir) - 1] == '/') { + snprintf(tmp, sizeof(tmp), "%s%s", ctx->buffer_dir, ctx->azure_kusto_buffer_key); + } + else { + snprintf(tmp, sizeof(tmp), "%s/%s", ctx->buffer_dir, ctx->azure_kusto_buffer_key); + } + + /* Initialize the storage context */ + fs = flb_fstore_create(tmp, type); + if (!fs) { + return -1; + } + ctx->fs = fs; + + /* + * On every start we create a new stream, this stream in the file system + * is directory with the name using the date like '2020-10-03T13:00:02'. So + * all the 'new' data that is generated on this process is stored there. + * + * Note that previous data in similar directories from previous runs is + * considered backlog data, in the azure_kusto plugin we need to differentiate the + * new v/s the older buffered data. + * + * Compose a stream name... + */ + now = time(NULL); + tm = localtime(&now); + +#ifdef FLB_SYSTEM_WINDOWS + /* Windows does not allow ':' in directory names */ + strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H-%M-%S", tm); +#else + strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H:%M:%S", tm); +#endif + + /* Create the stream */ + fs_stream = flb_fstore_stream_create(ctx->fs, tmp); + if (!fs_stream) { + /* Upon exception abort */ + flb_plg_error(ctx->ins, "could not initialize active stream: %s", tmp); + flb_fstore_destroy(fs); + ctx->fs = NULL; + return -1; + } + ctx->stream_active = fs_stream; + + set_files_context(ctx); + return 0; +} + +/** + * azure_kusto_store_exit - Cleans up and releases resources associated with + * the Kusto plugin storage context. + * + * This function is responsible for releasing any local resources associated + * with the Kusto plugin storage context (`ctx`). It iterates over the file + * streams in the context's file store (`ctx->fs`) and frees any allocated + * memory for non-multi upload files. Finally, it destroys the file store + * if it exists. + * + * Parameters: + * ctx - A pointer to the `flb_azure_kusto` structure representing the + * Kusto plugin storage context. + * + * Returns: + * An integer value, always returns 0 indicating successful cleanup. + */ +int azure_kusto_store_exit(struct flb_azure_kusto *ctx) +{ + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + struct azure_kusto_file *azure_kusto_file; + + if (!ctx->fs) { + return 0; + } + + /* release local context on non-multi upload files */ + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + if (fsf->data != NULL) { + azure_kusto_file = fsf->data; + flb_free(azure_kusto_file); + } + } + } + + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + return 0; +} + +/** + * azure_kusto_store_has_data - Check if there is any data in the Azure Kusto store. + * @ctx: Pointer to the Kusto plugin context structure. + * + * This function checks whether there is any data stored in the file storage + * associated with the Kusto plugin context. It iterates over each stream in the + * file storage, excluding the stream used for uploads, and checks if there are + * any files present. If files are found, it logs their names and returns true. + * If no files are found in any stream, it logs this information and returns false. + * + * Returns: + * FLB_TRUE if there is data in any stream other than the upload stream, + * FLB_FALSE otherwise. + */ +int azure_kusto_store_has_data(struct flb_azure_kusto *ctx) +{ + struct mk_list *head; + struct flb_fstore_stream *fs_stream; + + /* Check if the file storage context is initialized */ + if (!ctx->fs) { + flb_plg_debug(ctx->ins, "File storage context is not initialized"); + return FLB_FALSE; + } + + /* Iterate over each stream in the file storage */ + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + + /* Log the name of the current stream being processed */ + flb_plg_debug(ctx->ins, "Processing stream: '%s'", fs_stream->name); + + /* Check if the current stream is the one used for uploads */ + if (fs_stream == ctx->stream_upload) { + flb_plg_debug(ctx->ins, "Skipping upload stream: '%s'", fs_stream->name); + continue; + } + + /* Log the number of files in the current stream */ + int file_count = mk_list_size(&fs_stream->files); + flb_plg_debug(ctx->ins, "Stream '%s' has %d files", fs_stream->name, file_count); + + /* If there are files, log their names and return true */ + if (file_count > 0) { + struct mk_list *file_head; + struct flb_fstore_file *fs_file; + + mk_list_foreach(file_head, &fs_stream->files) { + fs_file = mk_list_entry(file_head, struct flb_fstore_file, _head); + flb_plg_debug(ctx->ins, "File in stream '%s': '%s'", fs_stream->name, fs_file->name); + } + + return FLB_TRUE; + } + } + + /* Log if no data was found in any stream */ + flb_plg_debug(ctx->ins, "No data found in any stream"); + return FLB_FALSE; +} + +/** + * Checks if there are any files in the upload stream of the Kusto plugin context. + * + * This function verifies whether the given Kusto plugin context has any files + * queued for upload. It performs the following checks: + * + * 1. Validates that the context and the upload stream are initialized. + * 2. Checks the number of files in the upload stream. + * 3. Returns true if there are files present, otherwise returns false. + * + * @param ctx A pointer to the Kusto plugin context structure. + * @return FLB_TRUE if there are files in the upload stream, FLB_FALSE otherwise. + */ +int azure_kusto_store_has_uploads(struct flb_azure_kusto *ctx) +{ + if (!ctx || !ctx->stream_upload) { + return FLB_FALSE; + } + + if (mk_list_size(&ctx->stream_upload->files) > 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/** + * azure_kusto_store_file_inactive - Marks a file as inactive in the Kusto plugin storage context. + * @ctx: Pointer to the Kusto plugin context structure. + * @azure_kusto_file: Pointer to the Azure Kusto file structure to be marked inactive. + * + * This function is responsible for marking a specific file as inactive within the + * Kusto plugin storage context. It first retrieves the file store file structure + * associated with the buffer file, then frees the memory allocated for the + * file structure. Finally, it calls the function to mark the file + * as inactive in the file store and returns the result of this operation. + * + * Returns: + * The return value of the flb_fstore_file_inactive function, indicating success + * or failure of marking the file as inactive. + */ +int azure_kusto_store_file_inactive(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file) +{ + int ret; + struct flb_fstore_file *fsf; + + fsf = azure_kusto_file->fsf; + + flb_free(azure_kusto_file); + ret = flb_fstore_file_inactive(ctx->fs, fsf); + + return ret; +} + +/** + * azure_kusto_store_file_cleanup - Cleans up and permanently deletes a file from Kusto plugin buffer storage. + * @ctx: Pointer to the Kusto plugin context structure. + * @azure_kusto_file: Pointer to the Kusto plugin buffer file structure to be cleaned up. + * + * This function retrieves the file store structure from the given buffer file, + * performs a permanent deletion of the file from the file store, and then frees the memory + * allocated for the file structure. It returns 0 upon successful completion. + * + * Returns: + * 0 on successful cleanup and deletion of the file. + */ +int azure_kusto_store_file_cleanup(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file) +{ + struct flb_fstore_file *fsf; + + fsf = azure_kusto_file->fsf; + + /* permanent deletion */ + flb_fstore_file_delete(ctx->fs, fsf); + flb_free(azure_kusto_file); + + return 0; +} + +/** + * azure_kusto_store_file_delete - Deletes a file from Kusto plugin byffer storage. + * @ctx: Pointer to the Kusto plugin context structure. + * @azure_kusto_file: Pointer to the Kusto plugin file structure to be deleted. + * + * This function performs the permanent deletion of a file from the Kusto plugin buffer + * storage. It first retrieves the file store structure associated with the + * file. Then, it updates the current buffer size in the context by + * subtracting the size of the file being deleted. Finally, it deletes the file + * from the file store and frees the memory allocated for the buffer file + * structure. + * + * Returns 0 on successful deletion. + */ +int azure_kusto_store_file_delete(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file) +{ + struct flb_fstore_file *fsf; + + fsf = azure_kusto_file->fsf; + ctx->current_buffer_size -= azure_kusto_file->size; + + /* permanent deletion */ + flb_fstore_file_delete(ctx->fs, fsf); + flb_free(azure_kusto_file); + + return 0; +} + +/** + * azure_kusto_store_file_upload_read - Reads the content of a file from the Azure Kusto store. + * @ctx: Pointer to the Kusto plugin context structure. + * @fsf: Pointer to the file store structure representing the file to be read. + * @out_buf: Pointer to a buffer where the file content will be stored. + * @out_size: Pointer to a variable where the size of the file content will be stored. + * + * This function copies the content of the specified file from the Azure Kusto store + * into a buffer. The buffer and its size are returned through the out parameters. + * + * Returns: + * 0 on success, or a negative error code on failure. + */ +int azure_kusto_store_file_upload_read(struct flb_azure_kusto *ctx, struct flb_fstore_file *fsf, + char **out_buf, size_t *out_size) +{ + int ret; + + ret = flb_fstore_file_content_copy(ctx->fs, fsf, + (void **) out_buf, out_size); + return ret; +} + +/** + * Retrieves metadata for a specified file in the Kusto plugin buffer context. + * + * This function is a wrapper around the `flb_fstore_file_meta_get` function, + * which fetches metadata for a given file within the file storage system. + * + * @param ctx A pointer to the `flb_azure_kusto` structure, which contains + * the context for Azure Kusto operations, including the file storage system. + * @param fsf A pointer to the `flb_fstore_file` structure representing the file + * for which metadata is to be retrieved. + * + * @return The result of the `flb_fstore_file_meta_get` function call, which + * typically indicates success or failure of the metadata retrieval operation. + */ +int azure_kusto_store_file_meta_get(struct flb_azure_kusto *ctx, struct flb_fstore_file *fsf) +{ + return flb_fstore_file_meta_get(ctx->fs, fsf); +} + +/** + * Locks the specified buffer file. + * + * This function sets the `locked` attribute of the given `azure_kusto_file` + * structure to `FLB_TRUE`, indicating that the file is currently locked. + * + * @param azure_kusto_file A pointer to the `azure_kusto_file` structure + * representing the file to be locked. + */ +void azure_kusto_store_file_lock(struct azure_kusto_file *azure_kusto_file) +{ + azure_kusto_file->locked = FLB_TRUE; +} + +/** + * Unlocks the specified buffer file. + * + * This function sets the `locked` attribute of the given `azure_kusto_file` + * structure to `FLB_FALSE`, indicating that the file is currently unlocked. + * + * @param azure_kusto_file A pointer to the `azure_kusto_file` structure + * representing the file to be unlocked. + */ +void azure_kusto_store_file_unlock(struct azure_kusto_file *azure_kusto_file) +{ + azure_kusto_file->locked = FLB_FALSE; +} \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_store.h b/plugins/out_azure_kusto/azure_kusto_store.h new file mode 100644 index 00000000000..770670071fe --- /dev/null +++ b/plugins/out_azure_kusto/azure_kusto_store.h @@ -0,0 +1,61 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_AZURE_KUSTO_STORE_H +#define FLB_OUT_AZURE_KUSTO_STORE_H + + +#include +#include +#include "azure_kusto.h" + +struct azure_kusto_file { + int locked; /* locked chunk is busy, cannot write to it */ + int failures; /* delivery failures */ + size_t size; /* file size */ + time_t create_time; /* creation time */ + flb_sds_t file_path; /* file path */ + int lock_fd; // File descriptor for locking + struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */ +}; + +int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file, + flb_sds_t tag, size_t tag_len, + flb_sds_t data, size_t bytes); + +int azure_kusto_store_init(struct flb_azure_kusto *ctx); +int azure_kusto_store_exit(struct flb_azure_kusto *ctx); + +int azure_kusto_store_has_data(struct flb_azure_kusto *ctx); +int azure_kusto_store_has_uploads(struct flb_azure_kusto *ctx); + +int azure_kusto_store_file_inactive(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file); +struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, const char *tag, + int tag_len); +int azure_kusto_store_file_cleanup(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file); +int azure_kusto_store_file_delete(struct flb_azure_kusto *ctx, struct azure_kusto_file *azure_kusto_file); +int azure_kusto_store_file_upload_read(struct flb_azure_kusto *ctx, struct flb_fstore_file *fsf, + char **out_buf, size_t *out_size); + +int azure_kusto_store_file_meta_get(struct flb_azure_kusto *ctx, struct flb_fstore_file *fsf); + +void azure_kusto_store_file_lock(struct azure_kusto_file *azure_kusto_file); +void azure_kusto_store_file_unlock(struct azure_kusto_file *azure_kusto_file); + +#endif