From 0188425f8ad31fc0b35a5c7260e770e90fd7a72f Mon Sep 17 00:00:00 2001 From: Mike Moser Date: Tue, 22 Apr 2025 19:39:58 +0000 Subject: [PATCH 1/2] NIFI-14478 Removed ability for a version controlled Process Group to be compared against anything but what's in the versioned FlowRegistry --- ...tandardVersionedComponentSynchronizer.java | 2 +- .../nifi/groups/StandardProcessGroup.java | 1 - .../groups/FlowSynchronizationOptions.java | 22 ------------------- .../StandardStatelessGroupNodeFactory.java | 1 - .../VersionedFlowSynchronizer.java | 1 - 5 files changed, 1 insertion(+), 26 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 3cc325862f5e..1e8314537868 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -517,7 +517,7 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p .storageLocation(storageLocation) .flowName(flowId) .version(version) - .flowSnapshot(syncOptions.isUpdateGroupVersionControlSnapshot() ? proposed : null) + .flowSnapshot(null) .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription())) .build(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 786b8313f72d..81d7eb4a6db0 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3836,7 +3836,6 @@ public void updateFlow(final VersionedExternalFlow proposedSnapshot, final Strin .ignoreLocalModifications(!verifyNotDirty) .updateDescendantVersionedFlows(updateDescendantVersionedFlows) .updateGroupSettings(updateSettings) - .updateGroupVersionControlSnapshot(true) .updateRpgUrls(false) .propertyDecryptor(value -> null) .build(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java index f609a8b88b7a..b857f07f053d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java @@ -30,7 +30,6 @@ public class FlowSynchronizationOptions { private final boolean ignoreLocalModifications; private final boolean updateSettings; private final boolean updateDescendantVersionedFlows; - private final boolean updateGroupVersionControlSnapshot; private final boolean updateRpgUrls; private final Duration componentStopTimeout; private final ComponentStopTimeoutAction timeoutAction; @@ -45,7 +44,6 @@ private FlowSynchronizationOptions(final Builder builder) { this.ignoreLocalModifications = builder.ignoreLocalModifications; this.updateSettings = builder.updateSettings; this.updateDescendantVersionedFlows = builder.updateDescendantVersionedFlows; - this.updateGroupVersionControlSnapshot = builder.updateGroupVersionControlSnapshot; this.updateRpgUrls = builder.updateRpgUrls; this.componentStopTimeout = builder.componentStopTimeout; this.timeoutAction = builder.timeoutAction; @@ -77,10 +75,6 @@ public boolean isUpdateDescendantVersionedFlows() { return updateDescendantVersionedFlows; } - public boolean isUpdateGroupVersionControlSnapshot() { - return updateGroupVersionControlSnapshot; - } - public boolean isUpdateRpgUrls() { return updateRpgUrls; } @@ -112,7 +106,6 @@ public static class Builder { private boolean ignoreLocalModifications = false; private boolean updateSettings = true; private boolean updateDescendantVersionedFlows = true; - private boolean updateGroupVersionControlSnapshot = true; private boolean updateRpgUrls = false; private ScheduledStateChangeListener scheduledStateChangeListener; private PropertyDecryptor propertyDecryptor = value -> value; @@ -183,20 +176,6 @@ public Builder updateDescendantVersionedFlows(final boolean updateDescendantVers return this; } - /** - * When a Process Group is version controlled, it tracks whether or not there are any local modifications by comparing the current dataflow - * to a snapshot of what the Versioned Flow looks like. If this value is set to true, when the Process Group is synchronized - * with a VersionedProcessGroup, that VersionedProcessGroup will become the snapshot of what the Versioned Flow looks like. If false, - * the snapshot is not updated. - * - * @param updateGroupVersionControlSnapshot true to update the snapshot, false otherwise - * @return the builder - */ - public Builder updateGroupVersionControlSnapshot(final boolean updateGroupVersionControlSnapshot) { - this.updateGroupVersionControlSnapshot = updateGroupVersionControlSnapshot; - return this; - } - /** * Specifies whether or not the URLs / "Target URIs" of a Remote Process Group that exists in both the proposed flow and the current flow * should be updated to match that of the proposed flow @@ -285,7 +264,6 @@ public static Builder from(final FlowSynchronizationOptions options) { builder.ignoreLocalModifications = options.isIgnoreLocalModifications(); builder.updateSettings = options.isUpdateSettings(); builder.updateDescendantVersionedFlows = options.isUpdateDescendantVersionedFlows(); - builder.updateGroupVersionControlSnapshot = options.isUpdateGroupVersionControlSnapshot(); builder.updateRpgUrls = options.isUpdateRpgUrls(); builder.propertyDecryptor = options.getPropertyDecryptor(); builder.componentStopTimeout = options.getComponentStopTimeout(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java index 1d9aae134403..568165457546 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java @@ -288,7 +288,6 @@ public Future> fetch(final Set bundleCoordinates, .topLevelGroupId(group.getIdentifier()) .updateDescendantVersionedFlows(true) .updateGroupSettings(true) - .updateGroupVersionControlSnapshot(false) .updateRpgUrls(true) .ignoreLocalModifications(true) .build(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index 76b71636b277..4cf90739c7ae 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -429,7 +429,6 @@ private void synchronizeFlow(final FlowController controller, final DataFlow exi .ignoreLocalModifications(true) .updateGroupSettings(true) .updateDescendantVersionedFlows(true) - .updateGroupVersionControlSnapshot(false) .updateRpgUrls(true) .propertyDecryptor(encryptor::decrypt) .build(); From 70a9e4e253a902172fcf5312fa225e2643ef574a Mon Sep 17 00:00:00 2001 From: Mike Moser Date: Thu, 8 May 2025 17:53:52 +0000 Subject: [PATCH 2/2] NIFI-14478 add system test for copy/paste a versioned PG with local modifications --- .../nifi/tests/system/pg/CopyPasteIT.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/CopyPasteIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/CopyPasteIT.java index c86b4f619c04..f66c800695e9 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/CopyPasteIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/CopyPasteIT.java @@ -402,4 +402,31 @@ public void testCopyPasteProcessGroupUnderVersionControlMaintainsVersionedCompon assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId()); assertEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); } + + @Test + public void testCopyPasteVersionControlledProcessGroupWithLocalChanges() throws NiFiClientException, IOException, InterruptedException { + // Create a top-level PG and version it with nothing in it. + final FlowRegistryClientEntity clientEntity = registerClient(); + final ProcessGroupEntity topLevel1 = getClientUtil().createProcessGroup("Top Level 1", "root"); + + // Create a lower level PG and add a Processor. Commit as Version 1 of the PG. + final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId()); + ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId()); + VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); + assertEquals("1", vciEntity.getVersionControlInformation().getVersion()); + + // Modify the processor and wait for the group to show it has local changes + getClientUtil().updateProcessorSchedulingPeriod(terminate1, "2 mins"); + waitFor(() -> VersionControlInformationDTO.LOCALLY_MODIFIED.equals(getClientUtil().getVersionControlState(innerGroup.getId()))); + + // Copy the versioned and modified process group and paste it to a new PG. + final ProcessGroupEntity topLevel2 = getClientUtil().createProcessGroup("Top Level 2", "root"); + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessGroups(Set.of(innerGroup.getId())); + final FlowDTO flowDto = getClientUtil().copyAndPaste(topLevel1.getId(), copyRequestEntity, topLevel2.getRevision(), topLevel2.getId()); + + // The new PG should also show as being locally modified from Version 1 of the PG + final String pastedGroupId = flowDto.getProcessGroups().iterator().next().getId(); + waitFor(() -> VersionControlInformationDTO.LOCALLY_MODIFIED.equals(getClientUtil().getVersionControlState(pastedGroupId))); + } }