Skip to content

Commit 0ca1e9b

Browse files
committed
NIFI-14547 MiNiFi - Resolve asset references in flows coming from C2 server
1 parent 32b9a99 commit 0ca1e9b

File tree

16 files changed

+361
-57
lines changed

16 files changed

+361
-57
lines changed

minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ public void handleChange(InputStream flowConfigInputStream) throws Configuration
8888
backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
8989

9090
byte[] rawFlow = toByteArray(flowConfigInputStream);
91-
VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow);
92-
VersionedDataflow enrichedFlow = flowEnrichService.enrichFlow(rawDataFlow);
93-
byte[] serializedEnrichedFlow = flowSerDeService.serialize(enrichedFlow);
91+
VersionedDataflow dataFlow = flowSerDeService.deserialize(rawFlow);
92+
flowEnrichService.enrichFlow(dataFlow);
93+
byte[] serializedEnrichedFlow = flowSerDeService.serialize(dataFlow);
9494
persist(serializedEnrichedFlow, currentFlowConfigFile, true);
9595
restartInstance();
9696
persist(rawFlow, currentRawFlowConfigFile, false);

minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public interface FlowEnrichService {
2929
* Responsible for enriching a VersionedDataflow instance
3030
*
3131
* @param versionedDataflow a VersionedDataflow instance
32-
* @return VersionedDataflow the enriched flow instance
3332
*/
34-
VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow);
33+
void enrichFlow(VersionedDataflow versionedDataflow);
3534
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.minifi.commons.service;
19+
20+
import org.apache.nifi.controller.flow.VersionedDataflow;
21+
22+
public interface FlowPropertyAssetReferenceResolver {
23+
/**
24+
* Responsible for resolving asset reference properties in a VersionedDataflow instance
25+
*
26+
* @param flow a VersionedDataflow instance to resolve its asset reference properties
27+
*/
28+
void resolveAssetReferenceProperties(VersionedDataflow flow);
29+
}

minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public interface FlowPropertyEncryptor {
2929
* Responsible for encrypting sensitive properties in a VersionedDataflow instance
3030
*
3131
* @param flow a VersionedDataflow instance to encrypt its sensitive properties
32-
* @return VersionedDataflow the flow instance with encrypted sensitive properties
3332
*/
34-
VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow);
33+
void encryptSensitiveProperties(VersionedDataflow flow);
3534
}

minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public StandardFlowEnrichService(ReadableProperties minifiProperties) {
9797
}
9898

9999
@Override
100-
public VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow) {
100+
public void enrichFlow(VersionedDataflow versionedDataflow) {
101101
versionedDataflow.setReportingTasks(ofNullable(versionedDataflow.getReportingTasks()).orElseGet(ArrayList::new));
102102
versionedDataflow.setRegistries(ofNullable(versionedDataflow.getRegistries()).orElseGet(ArrayList::new));
103103
versionedDataflow.setControllerServices(ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new));
@@ -147,8 +147,6 @@ public VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow) {
147147
Map<String, String> idToInstanceIdMap = createIdToInstanceIdMap(rootGroup);
148148
setConnectableComponentsInstanceId(rootGroup, idToInstanceIdMap);
149149
}
150-
151-
return versionedDataflow;
152150
}
153151

154152
private void createDefaultParameterContext(VersionedDataflow versionedDataflow) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.minifi.commons.service;
19+
20+
import org.apache.nifi.controller.flow.VersionedDataflow;
21+
import org.apache.nifi.flow.VersionedConfigurableExtension;
22+
import org.apache.nifi.flow.VersionedProcessGroup;
23+
24+
import java.nio.file.Path;
25+
import java.util.List;
26+
import java.util.Optional;
27+
import java.util.Set;
28+
import java.util.function.Function;
29+
import java.util.stream.Stream;
30+
31+
import static java.util.Optional.ofNullable;
32+
import static java.util.stream.Stream.concat;
33+
34+
public class StandardFlowPropertyAssetReferenceResolverService implements FlowPropertyAssetReferenceResolver {
35+
36+
private static final String ASSET_REFERENCE_PREFIX = "@{asset-id:";
37+
private static final String ASSET_REFERENCE_SUFFIX = "}";
38+
private static final String EMPTY_STRING = "";
39+
40+
private final Function<String, Optional<Path>> assetPathResolver;
41+
42+
public StandardFlowPropertyAssetReferenceResolverService(Function<String, Optional<Path>> assetPathResolver) {
43+
this.assetPathResolver = assetPathResolver;
44+
}
45+
46+
@Override
47+
public void resolveAssetReferenceProperties(VersionedDataflow flow) {
48+
fetchFlowComponents(flow).forEach(component -> {
49+
component.getProperties().entrySet().stream()
50+
.filter(e -> isAssetReference(e.getValue()))
51+
.forEach(entry -> entry.setValue(getAssetAbsolutePathOrThrowIllegalStateException(entry.getValue())));
52+
});
53+
}
54+
55+
private boolean isAssetReference(String value) {
56+
return value != null
57+
&& value.startsWith(ASSET_REFERENCE_PREFIX)
58+
&& value.endsWith(ASSET_REFERENCE_SUFFIX);
59+
}
60+
61+
private Stream<? extends VersionedConfigurableExtension> fetchFlowComponents(VersionedDataflow flow) {
62+
return concat(
63+
ofNullable(flow.getControllerServices()).orElse(List.of()).stream(),
64+
fetchComponentsRecursively(flow.getRootGroup())
65+
);
66+
}
67+
68+
private Stream<? extends VersionedConfigurableExtension> fetchComponentsRecursively(VersionedProcessGroup processGroup) {
69+
return concat(
70+
Stream.of(
71+
ofNullable(processGroup.getProcessors()).orElse(Set.of()),
72+
ofNullable(processGroup.getControllerServices()).orElse(Set.of())
73+
)
74+
.flatMap(Set::stream),
75+
ofNullable(processGroup.getProcessGroups()).orElse(Set.of()).stream()
76+
.flatMap(this::fetchComponentsRecursively)
77+
);
78+
}
79+
80+
private String getAssetAbsolutePathOrThrowIllegalStateException(String assetReference) {
81+
String resourceId = assetReference.replace(ASSET_REFERENCE_PREFIX, EMPTY_STRING)
82+
.replace(ASSET_REFERENCE_SUFFIX, EMPTY_STRING);
83+
return assetPathResolver.apply(resourceId)
84+
.map(Path::toString)
85+
.orElseThrow(() -> new IllegalStateException("Resource '" + resourceId + "' not found"));
86+
}
87+
}

minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,14 @@ public StandardFlowPropertyEncryptor(PropertyEncryptor propertyEncryptor, Runtim
5555
}
5656

5757
@Override
58-
public VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow) {
58+
public void encryptSensitiveProperties(VersionedDataflow flow) {
5959
encryptParameterContextsProperties(flow);
6060

6161
Map<String, Set<String>> sensitivePropertiesByComponentType = Optional.of(flowProvidedSensitiveProperties(flow))
6262
.filter(not(Map::isEmpty))
6363
.orElseGet(this::runtimeManifestSensitiveProperties);
6464

6565
encryptFlowComponentsProperties(flow, sensitivePropertiesByComponentType);
66-
67-
return flow;
6866
}
6967

7068
private void encryptParameterContextsProperties(VersionedDataflow flow) {

minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ public void testFlowIsLeftIntactIfEnrichingIsNotNecessary() {
6161
VersionedDataflow testFlow = loadDefaultFlow();
6262

6363
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
64-
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
64+
testFlowEnrichService.enrichFlow(testFlow);
6565

6666
byte[] testFlowBytes = flowToString(testFlow).getBytes(UTF_8);
67-
byte[] enrichedFlowBytes = flowToString(enrichedFlow).getBytes(UTF_8);
67+
byte[] enrichedFlowBytes = flowToString(testFlow).getBytes(UTF_8);
6868
assertArrayEquals(testFlowBytes, enrichedFlowBytes);
6969
}
7070

@@ -80,10 +80,10 @@ public void testMissingRootGroupIdsAreFilledIn() {
8080
uuid.when(UUID::randomUUID).thenReturn(expectedIdentifier);
8181

8282
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
83-
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
83+
testFlowEnrichService.enrichFlow(testFlow);
8484

85-
assertEquals(expectedIdentifier.toString(), enrichedFlow.getRootGroup().getIdentifier());
86-
assertEquals(expectedIdentifier.toString(), enrichedFlow.getRootGroup().getInstanceIdentifier());
85+
assertEquals(expectedIdentifier.toString(), testFlow.getRootGroup().getIdentifier());
86+
assertEquals(expectedIdentifier.toString(), testFlow.getRootGroup().getInstanceIdentifier());
8787
}
8888
}
8989

@@ -100,13 +100,13 @@ public void testCommonSslControllerServiceIsAddedWithBundleVersionAndProcessorCo
100100
));
101101

102102
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
103-
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
103+
testFlowEnrichService.enrichFlow(testFlow);
104104

105-
assertEquals(1, enrichedFlow.getRootGroup().getControllerServices().size());
106-
VersionedControllerService sslControllerService = enrichedFlow.getRootGroup().getControllerServices().iterator().next();
105+
assertEquals(1, testFlow.getRootGroup().getControllerServices().size());
106+
VersionedControllerService sslControllerService = testFlow.getRootGroup().getControllerServices().iterator().next();
107107
assertEquals(PARENT_SSL_CONTEXT_SERVICE_NAME, sslControllerService.getName());
108108
assertEquals(StringUtils.EMPTY, sslControllerService.getBundle().getVersion());
109-
Set<VersionedProcessor> processors = enrichedFlow.getRootGroup().getProcessors();
109+
Set<VersionedProcessor> processors = testFlow.getRootGroup().getProcessors();
110110
assertEquals(2, processors.size());
111111
assertTrue(
112112
processors.stream()
@@ -132,9 +132,9 @@ public void testProvenanceReportingTaskIsAdded() {
132132
VersionedDataflow testFlow = loadDefaultFlow();
133133

134134
FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties));
135-
VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow);
135+
testFlowEnrichService.enrichFlow(testFlow);
136136

137-
List<VersionedReportingTask> reportingTasks = enrichedFlow.getReportingTasks();
137+
List<VersionedReportingTask> reportingTasks = testFlow.getReportingTasks();
138138
assertEquals(1, reportingTasks.size());
139139
VersionedReportingTask provenanceReportingTask = reportingTasks.get(0);
140140
assertEquals(SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME, provenanceReportingTask.getName());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.minifi.commons.service;
19+
20+
import org.apache.nifi.controller.flow.VersionedDataflow;
21+
import org.apache.nifi.flow.VersionedControllerService;
22+
import org.apache.nifi.flow.VersionedProcessGroup;
23+
import org.apache.nifi.flow.VersionedProcessor;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.mockito.InjectMocks;
27+
import org.mockito.Mock;
28+
import org.mockito.junit.jupiter.MockitoExtension;
29+
30+
import java.nio.file.Path;
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
import java.util.Optional;
34+
import java.util.Set;
35+
import java.util.function.Function;
36+
37+
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.junit.jupiter.api.Assertions.assertThrows;
39+
import static org.mockito.Mockito.when;
40+
41+
@ExtendWith(MockitoExtension.class)
42+
public class StandardFlowPropertyAssetReferenceResolverServiceTest {
43+
44+
private Map<String, String> processorProperties;
45+
private Map<String, String> controllerServiceProperties;
46+
47+
@Mock
48+
private Function<String, Optional<Path>> assetPathResolver;
49+
50+
@InjectMocks
51+
private StandardFlowPropertyAssetReferenceResolverService victim;
52+
53+
@Test
54+
public void testResolveAssetReferenceProperties() {
55+
initProperties();
56+
VersionedDataflow dataFlow = aVersionedDataflow();
57+
58+
when(assetPathResolver.apply("asset1")).thenReturn(Optional.of(Path.of("asset1Path")));
59+
when(assetPathResolver.apply("asset2")).thenReturn(Optional.of(Path.of("asset2Path")));
60+
61+
victim.resolveAssetReferenceProperties(dataFlow);
62+
63+
verifyProperties();
64+
}
65+
66+
@Test
67+
public void testResolveNestedAssetReferenceProperties() {
68+
initProperties();
69+
VersionedDataflow dataFlow = aVersionedDataflowWithNestedProcessGroup();
70+
71+
when(assetPathResolver.apply("asset1")).thenReturn(Optional.of(Path.of("asset1Path")));
72+
when(assetPathResolver.apply("asset2")).thenReturn(Optional.of(Path.of("asset2Path")));
73+
74+
victim.resolveAssetReferenceProperties(dataFlow);
75+
76+
verifyProperties();
77+
}
78+
79+
@Test
80+
public void testResolveAssetReferencePropertiesThrowIllegalStateException() {
81+
initProperties();
82+
VersionedDataflow dataFlow = aVersionedDataflow();
83+
84+
when(assetPathResolver.apply("asset1")).thenReturn(Optional.empty());
85+
86+
assertThrows(IllegalStateException.class, () -> {
87+
victim.resolveAssetReferenceProperties(dataFlow);
88+
});
89+
}
90+
91+
private void initProperties() {
92+
processorProperties = new HashMap<>();
93+
processorProperties.put("assetReferenceProperty", "@{asset-id:asset1}");
94+
processorProperties.put("notAssetReferenceProperty", "some value1");
95+
96+
controllerServiceProperties = new HashMap<>();
97+
controllerServiceProperties.put("assetReferenceProperty", "@{asset-id:asset2}");
98+
controllerServiceProperties.put("notAssetReferenceProperty", "some value2");
99+
}
100+
101+
private void verifyProperties() {
102+
assertEquals(processorProperties.get("assetReferenceProperty"), "asset1Path");
103+
assertEquals(processorProperties.get("notAssetReferenceProperty"), "some value1");
104+
assertEquals(controllerServiceProperties.get("assetReferenceProperty"), "asset2Path");
105+
assertEquals(controllerServiceProperties.get("notAssetReferenceProperty"), "some value2");
106+
}
107+
108+
private VersionedDataflow aVersionedDataflow() {
109+
VersionedDataflow versionedDataflow = new VersionedDataflow();
110+
versionedDataflow.setRootGroup(aVersionedProcessGroup());
111+
return versionedDataflow;
112+
}
113+
114+
private VersionedDataflow aVersionedDataflowWithNestedProcessGroup() {
115+
VersionedDataflow versionedDataflow = new VersionedDataflow();
116+
VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup();
117+
versionedProcessGroup.setProcessGroups(Set.of(aVersionedProcessGroup()));
118+
versionedDataflow.setRootGroup(versionedProcessGroup);
119+
120+
return versionedDataflow;
121+
}
122+
123+
private VersionedProcessGroup aVersionedProcessGroup() {
124+
VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup();
125+
VersionedProcessor versionedProcessor = new VersionedProcessor();
126+
VersionedControllerService versionedControllerService = new VersionedControllerService();
127+
128+
versionedControllerService.setProperties(controllerServiceProperties);
129+
versionedProcessor.setProperties(processorProperties);
130+
131+
versionedProcessGroup.setProcessors(Set.of(versionedProcessor));
132+
versionedProcessGroup.setControllerServices(Set.of(versionedControllerService));
133+
134+
return versionedProcessGroup;
135+
}
136+
}

0 commit comments

Comments
 (0)