Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
*/
package org.springframework.batch.core.repository.dao.mongodb;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.job.JobInstance;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.repository.dao.StepExecutionDao;
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
Expand All @@ -36,6 +34,7 @@

/**
* @author Mahmoud Ben Hassine
* @author Jinwoo Bae
* @since 5.2.0
*/
public class MongoStepExecutionDao implements StepExecutionDao {
Expand Down Expand Up @@ -100,34 +99,42 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
@Override
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
// TODO optimize the query
// get all step executions
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = new ArrayList<>();
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId()));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
.find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
stepExecutions.addAll(jobExecution.getStepExecutions());
}
// sort step executions by creation date then id (see contract) and return the
// first one
Optional<org.springframework.batch.core.repository.persistence.StepExecution> lastStepExecution = stepExecutions
.stream()
.filter(stepExecution -> stepExecution.getName().equals(stepName))
.min(Comparator
.comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime)
.thenComparing(org.springframework.batch.core.repository.persistence.StepExecution::getId));
if (lastStepExecution.isPresent()) {
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = lastStepExecution.get();
JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream()
.filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId()))
.findFirst()
.get(), jobInstance);
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution);

if (jobExecutions.isEmpty()) {
return null;
}
else {

List<Long> jobExecutionIds = jobExecutions.stream()
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
.toList();

Query stepExecutionQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds))
.with(Sort.by(Sort.Direction.DESC, "createTime", "stepExecutionId"))
.limit(1);

org.springframework.batch.core.repository.persistence.StepExecution stepExecution = this.mongoOperations
.findOne(stepExecutionQuery, org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);

if (stepExecution == null) {
return null;
}

org.springframework.batch.core.repository.persistence.JobExecution jobExecution = jobExecutions.stream()
.filter(execution -> execution.getJobExecutionId().equals(stepExecution.getJobExecutionId()))
.findFirst()
.orElse(null);

if (jobExecution != null) {
JobExecution jobExecutionDomain = this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance);
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecutionDomain);
}

return null;
}

@Override
Expand All @@ -144,22 +151,23 @@ public void addStepExecutions(JobExecution jobExecution) {

@Override
public long countStepExecutions(JobInstance jobInstance, String stepName) {
long count = 0;
// TODO optimize the count query
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = jobExecution
.getStepExecutions();
for (org.springframework.batch.core.repository.persistence.StepExecution stepExecution : stepExecutions) {
if (stepExecution.getName().equals(stepName)) {
count++;
}
}
Query jobExecutionQuery = query(where("jobInstanceId").is(jobInstance.getId()));
List<Long> jobExecutionIds = this.mongoOperations
.find(jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME)
.stream()
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
.toList();

if (jobExecutionIds.isEmpty()) {
return 0;
}
return count;

// Count step executions directly from BATCH_STEP_EXECUTION collection
Query stepQuery = query(where("name").is(stepName).and("jobExecutionId").in(jobExecutionIds));
return this.mongoOperations.count(stepQuery,
org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.dao.StepExecutionDao;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -56,6 +57,11 @@ public JobRepository jobRepository(MongoTemplate mongoTemplate, MongoTransaction
return jobRepositoryFactoryBean.getObject();
}

@Bean
public StepExecutionDao stepExecutionDao(MongoTemplate mongoTemplate) {
return new org.springframework.batch.core.repository.dao.mongodb.MongoStepExecutionDao(mongoTemplate);
}

@Bean
public MongoDatabaseFactory mongoDatabaseFactory(MongoDBContainer mongoDBContainer) {
return new SimpleMongoClientDatabaseFactory(mongoDBContainer.getConnectionString() + "/test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.batch.core.repository.support;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;

import com.mongodb.client.MongoCollection;
Expand All @@ -29,17 +30,26 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.testcontainers.junit.jupiter.Testcontainers;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.job.JobInstance;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.dao.StepExecutionDao;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

/**
* @author Mahmoud Ben Hassine
* @author Jinwoo Bae
* @author Yanming Zhou
*/
@DirtiesContext
Expand All @@ -53,18 +63,25 @@ public class MongoDBJobRepositoryIntegrationTests {
@SuppressWarnings("removal")
@BeforeEach
public void setUp() {
// collections
// Clear existing collections to ensure clean state
mongoTemplate.getCollection("BATCH_JOB_INSTANCE").drop();
mongoTemplate.getCollection("BATCH_JOB_EXECUTION").drop();
mongoTemplate.getCollection("BATCH_STEP_EXECUTION").drop();
mongoTemplate.getCollection("BATCH_SEQUENCES").drop();

// sequences
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
// sequences
mongoTemplate.createCollection("BATCH_SEQUENCES");

mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));

// indices
mongoTemplate.indexOps("BATCH_JOB_INSTANCE")
.ensureIndex(new Index().on("jobName", Sort.Direction.ASC).named("job_name_idx"));
Expand Down Expand Up @@ -112,6 +129,58 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th
dump(stepExecutionsCollection, "step execution = ");
}

/**
* Test for GitHub issue #4943: getLastStepExecution should work when JobExecution's
* embedded stepExecutions array is empty.
*
* <p>
* This can happen after abrupt shutdown when the embedded stepExecutions array is not
* synchronized, but BATCH_STEP_EXECUTION collection still contains the data.
*
*/
@Test
void testGetLastStepExecutionWithEmptyEmbeddedArray(@Autowired JobOperator jobOperator, @Autowired Job job,
@Autowired StepExecutionDao stepExecutionDao) throws Exception {
// Step 1: Run job normally
JobParameters jobParameters = new JobParametersBuilder().addString("name", "emptyArrayTest")
.addLocalDateTime("runtime", LocalDateTime.now())
.toJobParameters();

JobExecution jobExecution = jobOperator.start(job, jobParameters);
JobInstance jobInstance = jobExecution.getJobInstance();

// Verify job completed successfully
Assertions.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());

// Step 2: Simulate the core issue - clear embedded stepExecutions array
// while keeping BATCH_STEP_EXECUTION collection intact
Query jobQuery = new Query(Criteria.where("jobExecutionId").is(jobExecution.getId()));
Update jobUpdate = new Update().set("stepExecutions", Collections.emptyList());
mongoTemplate.updateFirst(jobQuery, jobUpdate, "BATCH_JOB_EXECUTION");

// Step 3: Verify embedded array is empty but collection still has data
MongoCollection<Document> jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION");
MongoCollection<Document> stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION");

Document jobDoc = jobExecutionsCollection.find(new Document("jobExecutionId", jobExecution.getId())).first();
Assertions.assertTrue(jobDoc.getList("stepExecutions", Document.class).isEmpty(),
"Embedded stepExecutions array should be empty");
Assertions.assertEquals(2, stepExecutionsCollection.countDocuments(),
"BATCH_STEP_EXECUTION collection should still contain data");

// Step 4: Test the fix - getLastStepExecution should work despite empty embedded
// array
StepExecution lastStepExecution = stepExecutionDao.getLastStepExecution(jobInstance, "step1");
Assertions.assertNotNull(lastStepExecution,
"getLastStepExecution should find step execution even with empty embedded array");
Assertions.assertEquals("step1", lastStepExecution.getStepName());
Assertions.assertEquals(BatchStatus.COMPLETED, lastStepExecution.getStatus());

// Step 5: Test countStepExecutions also works
long stepCount = stepExecutionDao.countStepExecutions(jobInstance, "step1");
Assertions.assertEquals(1L, stepCount, "countStepExecutions should work despite empty embedded array");
}

private static void dump(MongoCollection<Document> collection, String prefix) {
for (Document document : collection.find()) {
System.out.println(prefix + document.toJson());
Expand Down
Loading