Skip to content

Commit f54aee4

Browse files
committed
Fix default namespacing
1 parent 9ee160d commit f54aee4

File tree

8 files changed

+97
-37
lines changed

8 files changed

+97
-37
lines changed

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java

+33-12
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import java.util.Optional;
1111

1212
import org.apache.commons.lang3.StringUtils;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
1315

1416
import io.kubernetes.client.apimachinery.GroupVersion;
1517
import io.kubernetes.client.common.KubernetesListObject;
@@ -25,15 +27,18 @@
2527

2628
public class K8sContext {
2729

30+
private final static Logger LOG = LoggerFactory.getLogger(K8sContext.class);
31+
public static final String DEFAULT_NAMESPACE = "default";
2832
private static final String ENV_OVERRIDE_BASEPATH = "KUBECONFIG_BASEPATH";
2933
private static K8sContext currentContext = null;
3034

3135
private final String name;
3236
private final String namespace;
33-
private final ApiClient apiClient;
37+
private ApiClient apiClient;
3438
private final SharedInformerFactory informerFactory;
3539

3640
public K8sContext(String name, String namespace, ApiClient apiClient) {
41+
LOG.info("K8sContext created for namespace: {}", namespace);
3742
this.name = name;
3843
this.namespace = namespace;
3944
this.apiClient = apiClient;
@@ -44,6 +49,12 @@ public ApiClient apiClient() {
4449
return apiClient;
4550
}
4651

52+
// Assigning a new api client should only happen once right after context creation.
53+
// Re-assigning a new api client can have unexpected consequences.
54+
public void apiClient(ApiClient apiClient) {
55+
this.apiClient = apiClient;
56+
}
57+
4758
public String name() {
4859
return name;
4960
}
@@ -57,8 +68,8 @@ public SharedInformerFactory informerFactory() {
5768
}
5869

5970
public <T extends KubernetesObject, U extends KubernetesListObject> void registerInformer(
60-
K8sApiEndpoint<T, U> endpoint, Duration resyncPeriod) {
61-
informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis());
71+
K8sApiEndpoint<T, U> endpoint, Duration resyncPeriod, String watchNamespace) {
72+
informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis(), watchNamespace);
6273
}
6374

6475
public DynamicKubernetesApi dynamic(String apiVersion, String plural) {
@@ -105,6 +116,10 @@ public static void useContext(K8sContext context) {
105116
currentContext = context;
106117
}
107118

119+
120+
// If $HOME/.kube/config is defined, use that config file.
121+
// If POD_NAMESPACE_FILEPATH is defined, use that config file.
122+
// Use Config.defaultClient() and defaultNamespace if no config file is found.
108123
static K8sContext defaultContext() throws IOException {
109124
Path path = Paths.get(System.getProperty("user.home"), ".kube", "config");
110125
if (Files.exists(path)) {
@@ -113,19 +128,13 @@ static K8sContext defaultContext() throws IOException {
113128
KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r);
114129
kubeConfig.setFile(file);
115130
ApiClient apiClient = addEnvOverrides(kubeConfig).build();
116-
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default");
131+
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse(DEFAULT_NAMESPACE);
117132
return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient);
118133
}
119134
} else {
120135
ApiClient apiClient = Config.defaultClient();
121-
String filePath = System.getenv("POD_NAMESPACE_FILEPATH");
122-
String namespace;
123-
if (filePath == null) {
124-
namespace = "default";
125-
} else {
126-
namespace = new String(Files.readAllBytes(Paths.get(filePath)));
127-
}
128-
return new K8sContext("default", namespace, apiClient);
136+
String namespace = getNamespace();
137+
return new K8sContext(namespace, namespace, apiClient);
129138
}
130139
}
131140

@@ -139,4 +148,16 @@ private static ClientBuilder addEnvOverrides(KubeConfig kubeConfig) throws IOExc
139148

140149
return builder;
141150
}
151+
152+
private static String getNamespace() throws IOException {
153+
String filePath = System.getenv("POD_NAMESPACE_FILEPATH");
154+
if (filePath != null) {
155+
return new String(Files.readAllBytes(Paths.get(filePath)));
156+
}
157+
String namespace = System.getProperty("SELF_POD_NAMESPACE");
158+
if (namespace != null) {
159+
return namespace;
160+
}
161+
return DEFAULT_NAMESPACE;
162+
}
142163
}

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.hoptimator.operator;
22

3+
import java.time.Duration;
34
import java.util.ArrayList;
45
import java.util.List;
56
import java.util.Properties;
@@ -23,6 +24,7 @@
2324
import io.kubernetes.client.util.Config;
2425

2526
import com.linkedin.hoptimator.catalog.Resource;
27+
import com.linkedin.hoptimator.k8s.K8sApiEndpoints;
2628
import com.linkedin.hoptimator.k8s.K8sContext;
2729
import com.linkedin.hoptimator.models.V1alpha1Subscription;
2830
import com.linkedin.hoptimator.models.V1alpha1SubscriptionList;
@@ -35,17 +37,17 @@ public class HoptimatorOperatorApp {
3537
private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class);
3638

3739
final String url;
38-
final String namespace;
40+
final String watchNamespace;
3941
final ApiClient apiClient;
4042
final Predicate<V1alpha1Subscription> subscriptionFilter;
4143
final Properties properties;
4244
final Resource.Environment environment;
4345

4446
/** This constructor is likely to evolve and break. */
45-
public HoptimatorOperatorApp(String url, String namespace, ApiClient apiClient,
47+
public HoptimatorOperatorApp(String url, String watchNamespace, ApiClient apiClient,
4648
Predicate<V1alpha1Subscription> subscriptionFilter, Properties properties) {
4749
this.url = url;
48-
this.namespace = namespace;
50+
this.watchNamespace = watchNamespace;
4951
this.apiClient = apiClient;
5052
this.subscriptionFilter = subscriptionFilter;
5153
this.properties = properties;
@@ -59,9 +61,10 @@ public static void main(String[] args) throws Exception {
5961

6062
Options options = new Options();
6163

62-
Option namespace = new Option("n", "namespace", true, "specified namespace");
63-
namespace.setRequired(false);
64-
options.addOption(namespace);
64+
Option watchNamespace = new Option("w", "watch", true,
65+
"namespace to watch for resource operations, empty string indicates all namespaces");
66+
watchNamespace.setRequired(false);
67+
options.addOption(watchNamespace);
6568

6669
CommandLineParser parser = new DefaultParser();
6770
HelpFormatter formatter = new HelpFormatter();
@@ -78,9 +81,10 @@ public static void main(String[] args) throws Exception {
7881
}
7982

8083
String urlInput = cmd.getArgs()[0];
81-
String namespaceInput = cmd.getOptionValue("namespace", "default");
84+
String watchNamespaceInput = cmd.getOptionValue("watch", "");
8285

83-
new HoptimatorOperatorApp(urlInput, namespaceInput, Config.defaultClient(), null, new Properties()).run();
86+
new HoptimatorOperatorApp(urlInput, watchNamespaceInput,
87+
Config.defaultClient(), null, new Properties()).run();
8488
}
8589

8690
public void run() throws Exception {
@@ -91,15 +95,18 @@ public void run() throws Exception {
9195

9296
apiClient.setHttpClient(apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.SECONDS).build());
9397
SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient);
94-
Operator operator = new Operator(namespace, apiClient, informerFactory, properties);
98+
Operator operator = new Operator(watchNamespace, apiClient, informerFactory, properties);
9599
K8sContext context = K8sContext.currentContext();
100+
context.apiClient(apiClient);
96101

97102
operator.registerApi("Subscription", "subscription", "subscriptions", "hoptimator.linkedin.com", "v1alpha1",
98103
V1alpha1Subscription.class, V1alpha1SubscriptionList.class);
99104

100105
List<Controller> controllers = new ArrayList<>();
101106
controllers.addAll(ControllerService.controllers(operator));
102107
controllers.add(SubscriptionReconciler.controller(operator, plannerFactory, environment, subscriptionFilter));
108+
109+
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace);
103110
controllers.add(PipelineReconciler.controller(context));
104111

105112
ControllerManager controllerManager =

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java

+38-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44
import java.util.ArrayList;
55
import java.util.List;
66

7+
import org.apache.commons.cli.CommandLine;
8+
import org.apache.commons.cli.CommandLineParser;
9+
import org.apache.commons.cli.DefaultParser;
10+
import org.apache.commons.cli.HelpFormatter;
11+
import org.apache.commons.cli.Option;
12+
import org.apache.commons.cli.Options;
13+
import org.apache.commons.cli.ParseException;
714
import org.slf4j.Logger;
815
import org.slf4j.LoggerFactory;
916

@@ -18,15 +25,44 @@
1825
public class PipelineOperatorApp {
1926
private static final Logger log = LoggerFactory.getLogger(PipelineOperatorApp.class);
2027

28+
final String watchNamespace;
29+
30+
public PipelineOperatorApp(String watchNamespace) {
31+
this.watchNamespace = watchNamespace;
32+
}
33+
2134
public static void main(String[] args) throws Exception {
22-
new PipelineOperatorApp().run();
35+
Options options = new Options();
36+
37+
Option watchNamespace = new Option("w", "watch", true,
38+
"namespace to watch for resource operations, empty string indicates all namespaces");
39+
watchNamespace.setRequired(false);
40+
options.addOption(watchNamespace);
41+
42+
CommandLineParser parser = new DefaultParser();
43+
HelpFormatter formatter = new HelpFormatter();
44+
CommandLine cmd;
45+
46+
try {
47+
cmd = parser.parse(options, args);
48+
} catch (ParseException e) {
49+
System.out.println(e.getMessage());
50+
formatter.printHelp("pipeline-operator", options);
51+
52+
System.exit(1);
53+
return;
54+
}
55+
56+
String watchNamespaceInput = cmd.getOptionValue("watch", "");
57+
58+
new PipelineOperatorApp(watchNamespaceInput).run();
2359
}
2460

2561
public void run() throws Exception {
2662
K8sContext context = K8sContext.currentContext();
2763

2864
// register informers
29-
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5));
65+
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace);
3066

3167
List<Controller> controllers = new ArrayList<>();
3268
// TODO: add additional controllers from ControllerProvider SPI

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java

-4
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ protected Duration pendingRetryDuration() {
9797
}
9898

9999
public static Controller controller(K8sContext context) {
100-
// Duplicate call, only needed while still using HoptimatorOperatorApp,
101-
// when removed in favor of PipelineOperatorApp this call is redundant
102-
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5));
103-
104100
Reconciler reconciler = new PipelineReconciler(context);
105101
return ControllerBuilder.defaultBuilder(context.informerFactory())
106102
.withReconciler(reconciler)

hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
3333
RelDataType key = rel(keySchema, typeFactory);
3434
RelDataType value = rel(valueSchema, typeFactory);
3535
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
36-
builder.addAll(value.getFieldList());
3736
if (key.isStruct()) {
3837
for (RelDataTypeField field: key.getFieldList()) {
3938
builder.add(KEY_PREFIX + field.getName(), field.getType());
4039
}
4140
} else {
4241
builder.add("KEY", key);
4342
}
43+
builder.addAll(value.getFieldList());
4444
RelDataType combinedSchema = builder.build();
4545
return DataTypeUtils.flatten(combinedSchema, typeFactory);
4646
}

hoptimator-venice/src/test/resources/venice-ddl-insert-all.id

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ spec:
1313
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1414
args:
1515
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
16-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
16+
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
1717
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
18-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19-
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
18+
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19+
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
2020
jarURI: file:///opt/hoptimator-flink-runner.jar
2121
parallelism: 1
2222
upgradeMode: stateless

hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ spec:
1313
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1414
args:
1515
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
16-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
16+
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
1717
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
18-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19-
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
18+
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19+
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE-CLUSTER0`.`test-store`
2020
jarURI: file:///opt/hoptimator-flink-runner.jar
2121
parallelism: 1
2222
upgradeMode: stateless

hoptimator-venice/src/test/resources/venice-ddl-select.id

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ spec:
1313
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1414
args:
1515
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
16-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
16+
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
1717
- CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH ()
18-
- CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ()
19-
- INSERT INTO `PIPELINE`.`SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
18+
- CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ()
19+
- INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
2020
jarURI: file:///opt/hoptimator-flink-runner.jar
2121
parallelism: 1
2222
upgradeMode: stateless

0 commit comments

Comments
 (0)