Skip to content

Commit c76604a

Browse files
authored
Add ownership references (#114)
1 parent c58a259 commit c76604a

File tree

51 files changed

+586
-327
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+586
-327
lines changed

Makefile

+5-5
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ deploy-kafka: deploy deploy-flink
6969
kubectl apply -f ./deploy/samples/kafkadb.yaml
7070

7171
undeploy-kafka:
72-
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
72+
kubectl delete kafkatopic.kafka.strimzi.io --all || echo "skipping"
7373
kubectl delete strimzi -n kafka --all || echo "skipping"
7474
kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping"
7575
kubectl delete -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka || echo "skipping"
@@ -103,8 +103,8 @@ undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy
103103
# Integration test setup intended to be run locally
104104
integration-tests: deploy-dev-environment
105105
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
106-
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
107-
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
106+
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
107+
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
108108
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
109109
kubectl port-forward -n flink svc/flink-sql-gateway 8083 & echo $$! > port-forward-2.pid
110110
kubectl port-forward -n flink svc/basic-session-deployment-rest 8081 & echo $$! > port-forward-3.pid
@@ -116,8 +116,8 @@ integration-tests: deploy-dev-environment
116116
# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
117117
integration-tests-kind: deploy-dev-environment
118118
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
119-
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
120-
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
119+
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
120+
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
121121
./gradlew intTest -i
122122

123123
generate-models:

deploy/dev/kafka.yaml

+37-1
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,41 @@ spec:
6565
storage:
6666
type: ephemeral
6767
entityOperator:
68-
topicOperator: {}
68+
topicOperator:
69+
watchedNamespace: default
70+
71+
---
72+
73+
apiVersion: rbac.authorization.k8s.io/v1
74+
kind: ClusterRole
75+
metadata:
76+
namespace: kafka
77+
name: kafka-operator
78+
rules:
79+
- apiGroups: ["kafka.strimzi.io"]
80+
resources: ["*"]
81+
verbs: ["*"]
82+
- apiGroups: ["rbac.authorization.k8s.io"]
83+
resources: ["roles", "rolebindings"]
84+
verbs: ["*"]
85+
86+
87+
---
88+
89+
apiVersion: rbac.authorization.k8s.io/v1
90+
kind: ClusterRoleBinding
91+
metadata:
92+
name: kafka-operator
93+
namespace: kafka
94+
subjects:
95+
- kind: ServiceAccount
96+
name: one-entity-operator
97+
namespace: kafka
98+
- kind: ServiceAccount
99+
name: strimzi-cluster-operator
100+
namespace: kafka
101+
roleRef:
102+
kind: ClusterRole
103+
name: kafka-operator
104+
apiGroup: rbac.authorization.k8s.io
69105

deploy/samples/flink-template.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ spec:
1010
kind: FlinkSessionJob
1111
metadata:
1212
name: {{name}}
13-
namespace: flink
1413
spec:
1514
deploymentName: basic-session-deployment
1615
job:

deploy/samples/kafkadb.yaml

+4-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ spec:
2121
kind: KafkaTopic
2222
metadata:
2323
name: {{name}}
24-
namespace: kafka
2524
labels:
2625
strimzi.io/cluster: one
2726
spec:
@@ -46,11 +45,11 @@ spec:
4645
apiVersion: kafka.strimzi.io/v1beta2
4746
kind: KafkaTopic
4847
metadata:
49-
name: existing-topic-1
50-
namespace: kafka
48+
name: kafka-database-existing-topic-1
5149
labels:
5250
strimzi.io/cluster: one
5351
spec:
52+
topicName: existing-topic-1
5453
partitions: 1
5554
replicas: 1
5655
config:
@@ -62,11 +61,11 @@ spec:
6261
apiVersion: kafka.strimzi.io/v1beta2
6362
kind: KafkaTopic
6463
metadata:
65-
name: existing-topic-2
66-
namespace: kafka
64+
name: kafka-database-existing-topic-2
6765
labels:
6866
strimzi.io/cluster: one
6967
spec:
68+
topicName: existing-topic-2
7069
partitions: 1
7170
replicas: 1
7271
config:

hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.util.Map;
55

66

7-
public interface Connector<T> {
7+
public interface Connector {
88

9-
Map<String, String> configure(T t) throws SQLException;
9+
Map<String, String> configure() throws SQLException;
1010
}

hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66

77
public interface ConnectorProvider {
88

9-
<T> Collection<Connector<T>> connectors(Class<T> clazz, Properties connectionProperties);
9+
/** Find connectors capable of configuring data plane connectors for the obj. */
10+
<T> Collection<Connector> connectors(T obj, Properties connectionProperties);
1011
}
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,6 @@
11
package com.linkedin.hoptimator;
22

3-
import java.sql.SQLException;
4-
import java.util.List;
5-
63

74
public interface Deployable {
85

9-
void create() throws SQLException;
10-
11-
void delete() throws SQLException;
12-
13-
void update() throws SQLException;
14-
15-
/** Render a list of specs, usually YAML. */
16-
List<String> specify() throws SQLException;
176
}

hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployer.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import java.util.List;
55

66

7-
public interface Deployer<T> {
7+
/** Deploys something. */
8+
public interface Deployer {
89

9-
void create(T t) throws SQLException;
10+
void create() throws SQLException;
1011

11-
void update(T t) throws SQLException;
12+
void delete() throws SQLException;
1213

13-
void delete(T t) throws SQLException;
14+
void update() throws SQLException;
1415

15-
List<String> specify(T t) throws SQLException;
16+
/** Render a list of specs, usually YAML. */
17+
List<String> specify() throws SQLException;
1618
}

hoptimator-api/src/main/java/com/linkedin/hoptimator/DeployerProvider.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66

77
public interface DeployerProvider {
88

9-
<T> Collection<Deployer<T>> deployers(Class<T> clazz, Properties connectionProperties);
9+
/** Find deployers capable of deploying the obj. */
10+
<T extends Deployable> Collection<Deployer> deployers(T obj, Properties connectionProperties);
1011
}

hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import java.util.function.Function;
44

55

6-
public class Job {
6+
public class Job implements Deployable {
77

88
private final String name;
99
private final Sink sink;

hoptimator-api/src/main/java/com/linkedin/hoptimator/MaterializedView.java

+2-26
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,20 @@
55
import java.util.stream.Collectors;
66

77

8-
public class MaterializedView {
8+
public class MaterializedView extends View implements Deployable {
99

1010
private final String database;
11-
private final List<String> path;
12-
private final String viewSql;
1311
private final Function<SqlDialect, String> pipelineSql;
1412
private final Pipeline pipeline;
1513

1614
public MaterializedView(String database, List<String> path, String viewSql, Function<SqlDialect, String> pipelineSql,
1715
Pipeline pipeline) {
16+
super(path, viewSql);
1817
this.database = database;
19-
this.path = path;
20-
this.viewSql = viewSql;
2118
this.pipelineSql = pipelineSql;
2219
this.pipeline = pipeline;
2320
}
2421

25-
/** SQL query which defines this view, e.g. SELECT ... FROM ... */
26-
public String viewSql() {
27-
return viewSql;
28-
}
29-
3022
public Pipeline pipeline() {
3123
return pipeline;
3224
}
@@ -40,22 +32,6 @@ public String database() {
4032
return database;
4133
}
4234

43-
public String table() {
44-
return path.get(path.size() - 1);
45-
}
46-
47-
public String schema() {
48-
return path.get(path.size() - 2);
49-
}
50-
51-
public List<String> path() {
52-
return path;
53-
}
54-
55-
protected String pathString() {
56-
return path.stream().collect(Collectors.joining("."));
57-
}
58-
5935
@Override
6036
public String toString() {
6137
return "MaterializedView[" + pathString() + "]";
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,33 @@
11
package com.linkedin.hoptimator;
22

33
import java.sql.SQLException;
4-
import java.util.ArrayList;
5-
import java.util.List;
4+
import java.util.Collection;
65

76

87
/**
9-
* A set of Deployable objects that work together to deliver data.
8+
* A job, along with its sources and sink.
109
*/
11-
public class Pipeline implements Deployable {
10+
public class Pipeline {
1211

13-
private List<Deployable> deployables;
12+
private Collection<Source> sources;
13+
private Sink sink;
14+
private Job job;
1415

15-
public Pipeline(List<Deployable> deployables) {
16-
this.deployables = deployables;
16+
public Pipeline(Collection<Source> sources, Sink sink, Job job) {
17+
this.sources = sources;
18+
this.sink = sink;
19+
this.job = job;
1720
}
1821

19-
@Override
20-
public void create() throws SQLException {
21-
for (Deployable deployable : deployables) {
22-
deployable.create();
23-
}
22+
public Collection<Source> sources() {
23+
return sources;
2424
}
2525

26-
@Override
27-
public void delete() throws SQLException {
28-
for (Deployable deployable : deployables) {
29-
deployable.delete();
30-
}
26+
public Sink sink() {
27+
return sink;
3128
}
3229

33-
@Override
34-
public void update() throws SQLException {
35-
for (Deployable deployable : deployables) {
36-
deployable.update();
37-
}
38-
}
39-
40-
@Override
41-
public List<String> specify() throws SQLException {
42-
List<String> specs = new ArrayList<>();
43-
for (Deployable deployable : deployables) {
44-
specs.addAll(deployable.specify());
45-
}
46-
return specs;
30+
public Job job() {
31+
return job;
4732
}
4833
}

hoptimator-api/src/main/java/com/linkedin/hoptimator/Sink.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import java.util.Map;
55

66

7-
public class Sink extends Source {
7+
// Unclear if Sink will always extend Source
8+
public class Sink extends Source implements Deployable {
89

910
public Sink(String database, List<String> path, Map<String, String> options) {
1011
super(database, path, options);

hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.util.stream.Collectors;
66

77

8-
public class Source {
8+
public class Source implements Deployable {
99

1010
private final String database;
1111
private final List<String> path;

hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import java.util.function.Function;
1212

1313

14-
public interface Validator<T> {
15-
16-
void validate(T t, Issues issues);
14+
public interface Validator extends Validated {
1715

1816
static void validateSubdomainName(String s, Issues issues) {
1917
// N.B. we don't aim to be efficient here; rather, as verbose as possible.
@@ -44,15 +42,18 @@ static void notYetImplemented(Issues issues) {
4442
issues.warn("Validation not implemented for this object");
4543
}
4644

47-
/** Validator that invokes `Validated.validate()`. */
48-
class DefaultValidator<T> implements Validator<T> {
45+
/** Validator that invokes `validate()` on the target object. */
46+
class DefaultValidator<T extends Validated> implements Validator {
47+
48+
private final T t;
49+
50+
public DefaultValidator(T t) {
51+
this.t = t;
52+
}
4953

5054
@Override
51-
public void validate(T t, Issues issues) {
52-
if (t instanceof Validated) {
53-
Validated v = (Validated) t;
54-
v.validate(issues.child(t.getClass().getSimpleName()));
55-
}
55+
public void validate(Issues issues) {
56+
t.validate(issues.child(t.getClass().getSimpleName()));
5657
}
5758
}
5859

hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66
public interface ValidatorProvider {
77

8-
<T> Collection<Validator<T>> validators(Class<T> clazz);
8+
<T> Collection<Validator> validators(T obj);
99
}

0 commit comments

Comments (0)