Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acquire only one exclusive external worker job per process instance #3510

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ protected VariableScope resolveVariableScopeInternal(Job job) {
return null;
}

@Override
protected String resolveJobLockIdInternal(Job job) {
return job.getScopeId();
}

@Override
protected boolean handleJobInsertInternal(Job job) {
// Currently, nothing extra needed (but counting relationships can be added later here).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ protected VariableScope resolveVariableScopeInternal(Job job) {
return null;
}

@Override
protected String resolveJobLockIdInternal(Job job){
return job.getProcessInstanceId();
}

@Override
protected boolean handleJobInsertInternal(Job job) {
// add link to execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -937,6 +938,53 @@ void testSimpleWithBoundaryErrorAndVariables() {
);
}

@Test
@Deployment
void testSimpleParallel() {
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey("simpleParallelExternalWorker")
.start();

List<ExternalWorkerJob> externalWorkerJobs = managementService.createExternalWorkerJobQuery()
.list();
assertThat(externalWorkerJobs)
.extracting(ExternalWorkerJob::getElementId, ExternalWorkerJob::getJobHandlerConfiguration)
.containsExactlyInAnyOrder(
tuple("externalWorkerTask1", "simple"),
tuple("externalWorkerTask2", "simple")
);

List<AcquiredExternalWorkerJob> acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder()
.topic("simple", Duration.ofMinutes(30))
.acquireAndLock(2, "testWorker");

//Both external worker tasks have the exclusive flag set
//so only one job can be acquired because they cannot be executed concurrently.
assertThat(acquiredJobs).hasSize(1);

AcquiredExternalWorkerJob acquiredJob1 = acquiredJobs.get(0);

managementService.createExternalWorkerCompletionBuilder(acquiredJob1.getId(), "testWorker")
.complete();

//Acquire the second external worker job
acquiredJobs = managementService.createExternalWorkerJobAcquireBuilder()
.topic("simple", Duration.ofMinutes(30))
.acquireAndLock(2, "testWorker");

assertThat(acquiredJobs).hasSize(1);

AcquiredExternalWorkerJob acquiredJob2 = acquiredJobs.get(0);

managementService.createExternalWorkerCompletionBuilder(acquiredJob2.getId(), "testWorker")
.complete();

assertThat(Arrays.asList(acquiredJob1.getElementId(), acquiredJob2.getElementId()))
.containsExactlyInAnyOrder("externalWorkerTask1", "externalWorkerTask2");

waitForJobExecutorToProcessAllJobs(5000, 300);
}

@Test
void testAcquireWithInvalidArguments() {
assertThatThrownBy(() -> managementService.createExternalWorkerJobAcquireBuilder().acquireAndLock(10, "someWorker"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:flowable="http://flowable.org/bpmn"
targetNamespace="Examples"
>

<process id="simpleParallelExternalWorker">

<startEvent id="theStart"/>

<sequenceFlow sourceRef="theStart" targetRef="parallelGateway1"/>

<parallelGateway id="parallelGateway1"></parallelGateway>

<sequenceFlow sourceRef="parallelGateway1" targetRef="externalWorkerTask1"/>

<serviceTask id="externalWorkerTask1" flowable:type="external-worker" flowable:topic="simple"/>

<sequenceFlow sourceRef="parallelGateway1" targetRef="externalWorkerTask2"/>

<serviceTask id="externalWorkerTask2" flowable:type="external-worker" flowable:topic="simple"/>

<sequenceFlow sourceRef="externalWorkerTask1" targetRef="parallelGateway2"/>

<sequenceFlow sourceRef="externalWorkerTask2" targetRef="parallelGateway2"/>

<parallelGateway id="parallelGateway2"></parallelGateway>

<sequenceFlow sourceRef="parallelGateway2" targetRef="theEnd"/>

<endEvent id="theEnd"/>

</process>

</definitions>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface InternalJobManager {
void registerScopedInternalJobManager(String scopeType, InternalJobManager internalJobManager);

VariableScope resolveVariableScope(Job job);

String resolveJobLockId(Job job);

boolean handleJobInsert(Job job);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ public final VariableScope resolveVariableScope(Job job) {

protected abstract VariableScope resolveVariableScopeInternal(Job job);

@Override
public final String resolveJobLockId(Job job) {
InternalJobManager internalJobManager = findInternalJobManager(job);
if (internalJobManager == null) {
return resolveJobLockIdInternal(job);
}

return internalJobManager.resolveJobLockId(job);
}

protected abstract String resolveJobLockIdInternal(Job job);

@Override
public final boolean handleJobInsert(Job job) {
InternalJobManager internalJobManager = findInternalJobManager(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
Expand Down Expand Up @@ -73,27 +75,44 @@ public List<AcquiredExternalWorkerJob> execute(CommandContext commandContext) {

int lockTimeInMillis = (int) builder.getLockDuration().abs().toMillis();
List<AcquiredExternalWorkerJob> acquiredJobs = new ArrayList<>(jobs.size());
Set<String> jobLockIds = new HashSet<>();

for (ExternalWorkerJobEntity job : jobs) {
lockJob(commandContext, job, lockTimeInMillis);
Map<String, Object> variables = null;
if (internalJobManager != null) {
VariableScope variableScope = internalJobManager.resolveVariableScope(job);
if (variableScope != null) {
variables = variableScope.getVariables();
if (hasUnLockedJobScope(internalJobManager, job, jobLockIds)) {
lockJob(commandContext, job, lockTimeInMillis);
Map<String, Object> variables = null;
if (internalJobManager != null) {
VariableScope variableScope = internalJobManager.resolveVariableScope(job);
if (variableScope != null) {
variables = variableScope.getVariables();
}

if (job.isExclusive()) {
internalJobManager.lockJobScope(job);
String jobLockId = internalJobManager.resolveJobLockId(job);
if (jobLockId != null) {
jobLockIds.add(jobLockId);
}
}
}

if (job.isExclusive()) {
internalJobManager.lockJobScope(job);
}
acquiredJobs.add(new AcquiredExternalWorkerJobImpl(job, variables));
}

acquiredJobs.add(new AcquiredExternalWorkerJobImpl(job, variables));
}

return acquiredJobs;
}

protected boolean hasUnLockedJobScope(InternalJobManager internalJobManager, ExternalWorkerJobEntity job, Set<String> jobLockIds) {
if (internalJobManager != null && job.isExclusive()) {
String jobLockId = internalJobManager.resolveJobLockId(job);
if (jobLockId != null && jobLockIds.contains(jobLockId)) {
return false;
}
}
return true;
}

protected void lockJob(CommandContext commandContext, JobInfoEntity job, int lockTimeInMillis) {
GregorianCalendar gregorianCalendar = new GregorianCalendar();
gregorianCalendar.setTime(jobServiceConfiguration.getClock().getCurrentTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ protected <T extends Job> T mockCmmnJob(Class<T> jobClass, String mockName) {
private static class TestScopeAwareInternalJobManager extends ScopeAwareInternalJobManager {

protected Map<Job, VariableScope> variableScopeByJob = new HashMap<>();
protected Map<Job, String> lockIdByJob = new HashMap<>();
protected Map<Job, Boolean> insertJobInternalByJob = new HashMap<>();
protected Job jobDeleteInternal;
protected Job lockJobScopeInternal;
Expand All @@ -221,6 +222,12 @@ protected VariableScope resolveVariableScopeInternal(Job job) {
return variableScopeByJob.get(job);
}

@Override
protected String resolveJobLockIdInternal(Job job) {
invokedMethods.add("resolveJobLockIdInternal");
return lockIdByJob.get(job);
}

@Override
protected boolean handleJobInsertInternal(Job job) {
invokedMethods.add("handleJobInsertInternal");
Expand Down