Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ public class StartWorkflowRequest {

private IdempotencyStrategy idempotencyStrategy;

/**
* Optional runtime overrides for task-level rate limits.
*
* <p>Key : task reference name OR task definition name Value : {@link TaskRateLimitOverride}
* containing per-frequency overrides.
*/
@ProtoField(id = 10)
private Map<String, TaskRateLimitOverride> taskRateLimitOverrides = new HashMap<>();

public String getIdempotencyKey() {
return idempotencyKey;
}
Expand All @@ -77,6 +86,134 @@ public void setIdempotencyStrategy(IdempotencyStrategy idempotencyStrategy) {
this.idempotencyStrategy = idempotencyStrategy;
}

/* -------------------------------------------------------
* Dynamic rate-limit override accessors
* ------------------------------------------------------- */

public Map<String, TaskRateLimitOverride> getTaskRateLimitOverrides() {
return taskRateLimitOverrides;
}

public void setTaskRateLimitOverrides(
Map<String, TaskRateLimitOverride> taskRateLimitOverrides) {
if (taskRateLimitOverrides == null) {
taskRateLimitOverrides = new HashMap<>();
}
this.taskRateLimitOverrides = taskRateLimitOverrides;
}

public StartWorkflowRequest withTaskRateLimitOverrides(
Map<String, TaskRateLimitOverride> taskRateLimitOverrides) {
setTaskRateLimitOverrides(taskRateLimitOverrides);
return this;
}

/** Holder for per-task dynamic rate-limit configuration. */
@ProtoMessage
public static class TaskRateLimitOverride {
/** Default value indicating no specific rate limit is set */
public static final int NOT_SET = -1;

@ProtoField(id = 1)
private int rateLimitPerFrequency = NOT_SET;

@ProtoField(id = 2)
private int rateLimitFrequencyInSeconds = NOT_SET;

private boolean rateLimitPerFrequencySet = false;
private boolean rateLimitFrequencyInSecondsSet = false;

/**
* Get the rate limit per frequency value.
*
* @return the rate limit per frequency or null if not explicitly set
*/
public Integer getRateLimitPerFrequency() {
return rateLimitPerFrequencySet ? rateLimitPerFrequency : null;
}

/**
* Set the rate limit per frequency value.
*
* @param rateLimitPerFrequency the rate limit per frequency (must be non-negative)
* @throws IllegalArgumentException if value is negative
*/
public void setRateLimitPerFrequency(Integer rateLimitPerFrequency) {
if (rateLimitPerFrequency == null) {
this.rateLimitPerFrequencySet = false;
this.rateLimitPerFrequency = NOT_SET;
return;
}

if (rateLimitPerFrequency < 0) {
throw new IllegalArgumentException(
"Rate limit per frequency cannot be negative: " + rateLimitPerFrequency);
}

this.rateLimitPerFrequency = rateLimitPerFrequency;
this.rateLimitPerFrequencySet = true;
}

/**
* Fluent API for setting rate limit per frequency.
*
* @param rateLimitPerFrequency the rate limit per frequency (must be non-negative)
* @return this instance for method chaining
* @throws IllegalArgumentException if value is negative
*/
public TaskRateLimitOverride withRateLimitPerFrequency(Integer rateLimitPerFrequency) {
setRateLimitPerFrequency(rateLimitPerFrequency);
return this;
}

/**
* Get the rate limit frequency in seconds value.
*
* @return the rate limit frequency in seconds or null if not explicitly set
*/
public Integer getRateLimitFrequencyInSeconds() {
return rateLimitFrequencyInSecondsSet ? rateLimitFrequencyInSeconds : null;
}

/**
* Set the rate limit frequency in seconds value.
*
* @param rateLimitFrequencyInSeconds the rate limit frequency in seconds (must be
* non-negative)
* @throws IllegalArgumentException if value is negative
*/
public void setRateLimitFrequencyInSeconds(Integer rateLimitFrequencyInSeconds) {
if (rateLimitFrequencyInSeconds == null) {
this.rateLimitFrequencyInSecondsSet = false;
this.rateLimitFrequencyInSeconds = NOT_SET;
return;
}

if (rateLimitFrequencyInSeconds < 0) {
throw new IllegalArgumentException(
"Rate limit frequency in seconds cannot be negative: "
+ rateLimitFrequencyInSeconds);
}

this.rateLimitFrequencyInSeconds = rateLimitFrequencyInSeconds;
this.rateLimitFrequencyInSecondsSet = true;
}

/**
* Fluent API for setting rate limit frequency in seconds.
*
* @param rateLimitFrequencyInSeconds the rate limit frequency in seconds (must be
* non-negative)
* @return this instance for method chaining
* @throws IllegalArgumentException if value is negative
*/
public TaskRateLimitOverride withRateLimitFrequencyInSeconds(
Integer rateLimitFrequencyInSeconds) {
setRateLimitFrequencyInSeconds(rateLimitFrequencyInSeconds);
return this;
}
}

public String getName() {
return name;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2025 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata.workflow;

import org.junit.Test;

import static org.junit.Assert.*;

public class TaskRateLimitOverrideTest {

@Test
public void testDefaultState() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// By default, both values should be null (not set)
assertNull(
"Default rateLimitPerFrequency should be null",
override.getRateLimitPerFrequency());
assertNull(
"Default rateLimitFrequencyInSeconds should be null",
override.getRateLimitFrequencyInSeconds());
}

@Test
public void testPositiveValues() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// Set positive values
override.setRateLimitPerFrequency(100);
override.setRateLimitFrequencyInSeconds(60);

// Verify values are set correctly
assertEquals(Integer.valueOf(100), override.getRateLimitPerFrequency());
assertEquals(Integer.valueOf(60), override.getRateLimitFrequencyInSeconds());
}

@Test
public void testZeroValues() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// Zero is a valid value (no rate limiting)
override.setRateLimitPerFrequency(0);
override.setRateLimitFrequencyInSeconds(0);

// Verify values are set correctly
assertEquals(Integer.valueOf(0), override.getRateLimitPerFrequency());
assertEquals(Integer.valueOf(0), override.getRateLimitFrequencyInSeconds());
}

@Test(expected = IllegalArgumentException.class)
public void testNegativeRateLimitPerFrequency() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// Should throw IllegalArgumentException
override.setRateLimitPerFrequency(-10);
}

@Test(expected = IllegalArgumentException.class)
public void testNegativeRateLimitFrequencyInSeconds() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// Should throw IllegalArgumentException
override.setRateLimitFrequencyInSeconds(-5);
}

@Test
public void testNullHandling() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// Set values first
override.setRateLimitPerFrequency(100);
override.setRateLimitFrequencyInSeconds(60);

// Then set to null (unset)
override.setRateLimitPerFrequency(null);
override.setRateLimitFrequencyInSeconds(null);

// Verify values are null again
assertNull(
"rateLimitPerFrequency should be null after setting to null",
override.getRateLimitPerFrequency());
assertNull(
"rateLimitFrequencyInSeconds should be null after setting to null",
override.getRateLimitFrequencyInSeconds());
}

@Test
public void testFluentAPI() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride()
.withRateLimitPerFrequency(200)
.withRateLimitFrequencyInSeconds(30);

// Verify fluent API set values correctly
assertEquals(Integer.valueOf(200), override.getRateLimitPerFrequency());
assertEquals(Integer.valueOf(30), override.getRateLimitFrequencyInSeconds());
}

@Test(expected = IllegalArgumentException.class)
public void testFluentAPIWithNegativeValue() {
// Should throw IllegalArgumentException
new StartWorkflowRequest.TaskRateLimitOverride().withRateLimitPerFrequency(-50);
}

@Test
public void testSetUnsetState() {
StartWorkflowRequest.TaskRateLimitOverride override =
new StartWorkflowRequest.TaskRateLimitOverride();

// Initially both should be null
assertNull(override.getRateLimitPerFrequency());
assertNull(override.getRateLimitFrequencyInSeconds());

// Set only one value
override.setRateLimitPerFrequency(100);

// One should be set, one should be null
assertEquals(Integer.valueOf(100), override.getRateLimitPerFrequency());
assertNull(override.getRateLimitFrequencyInSeconds());

// Set the other value
override.setRateLimitFrequencyInSeconds(60);

// Both should be set
assertEquals(Integer.valueOf(100), override.getRateLimitPerFrequency());
assertEquals(Integer.valueOf(60), override.getRateLimitFrequencyInSeconds());

// Unset one value
override.setRateLimitPerFrequency(null);

// One should be null, one should be set
assertNull(override.getRateLimitPerFrequency());
assertEquals(Integer.valueOf(60), override.getRateLimitFrequencyInSeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Objects;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest.TaskRateLimitOverride;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

public class StartWorkflowInput {
Expand All @@ -34,6 +35,9 @@ public class StartWorkflowInput {
private String workflowId;
private String triggeringWorkflowId;

/** Optional per‐task dynamic rate-limit overrides (copied verbatim from the request). */
private Map<String, TaskRateLimitOverride> taskRateLimitOverrides;

public StartWorkflowInput() {}

public StartWorkflowInput(StartWorkflowRequest startWorkflowRequest) {
Expand All @@ -46,6 +50,7 @@ public StartWorkflowInput(StartWorkflowRequest startWorkflowRequest) {
this.externalInputPayloadStoragePath =
startWorkflowRequest.getExternalInputPayloadStoragePath();
this.taskToDomain = startWorkflowRequest.getTaskToDomain();
this.taskRateLimitOverrides = startWorkflowRequest.getTaskRateLimitOverrides();
}

public String getName() {
Expand Down Expand Up @@ -152,6 +157,16 @@ public void setTriggeringWorkflowId(String triggeringWorkflowId) {
this.triggeringWorkflowId = triggeringWorkflowId;
}

/* -------- Dynamic rate-limit overrides -------- */
public Map<String, TaskRateLimitOverride> getTaskRateLimitOverrides() {
return taskRateLimitOverrides;
}

public void setTaskRateLimitOverrides(
Map<String, TaskRateLimitOverride> taskRateLimitOverrides) {
this.taskRateLimitOverrides = taskRateLimitOverrides;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -169,6 +184,7 @@ public boolean equals(Object o) {
&& Objects.equals(parentWorkflowTaskId, that.parentWorkflowTaskId)
&& Objects.equals(event, that.event)
&& Objects.equals(taskToDomain, that.taskToDomain)
&& Objects.equals(taskRateLimitOverrides, that.taskRateLimitOverrides)
&& Objects.equals(triggeringWorkflowId, that.triggeringWorkflowId)
&& Objects.equals(workflowId, that.workflowId);
}
Expand All @@ -187,6 +203,7 @@ public int hashCode() {
parentWorkflowTaskId,
event,
taskToDomain,
taskRateLimitOverrides,
triggeringWorkflowId,
workflowId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,9 @@ public String startWorkflow(StartWorkflowInput input) {
workflow.setUpdatedTime(null);
workflow.setEvent(input.getEvent());
workflow.setTaskToDomain(input.getTaskToDomain());
// copy dynamic task-level rate-limit overrides (may be empty, preserves backward
// compatibility)
workflow.setTaskRateLimitOverrides(input.getTaskRateLimitOverrides());
workflow.setVariables(workflowDefinition.getVariables());

if (workflowInput != null && !workflowInput.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
doWhileTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE);
doWhileTask.setStatus(TaskModel.Status.IN_PROGRESS);
doWhileTask.setStartTime(System.currentTimeMillis());
doWhileTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
doWhileTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
doWhileTask.setRetryCount(taskMapperContext.getRetryCount());

// Apply static defaults or dynamic per-workflow overrides for rate limits
TaskMapperUtils.applyRateLimits(workflowModel, workflowTask, taskDefinition, doWhileTask);

Map<String, Object> taskInput =
parametersUtils.getTaskInputV2(
workflowTask.getInputParameters(),
Expand Down
Loading
Loading