Skip to content

Commit 757801c

Browse files
committed
switching to schema in progress
1 parent f201839 commit 757801c

File tree

56 files changed

+770
-358
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+770
-358
lines changed

api/src/main/proto/dataset_info.proto

-10
This file was deleted.

api/src/main/proto/dataset_service.proto

+29-17
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,48 @@ syntax = "proto3";
22

33
package sustain.synopsis.metadata;
44

5-
import "dataset_info.proto";
5+
import "google/protobuf/timestamp.proto";
66
import "health_check.proto";
77

8-
message RegisterDatasetRequest {
9-
DatasetInfo datasetInfo = 1;
10-
}
8+
service DatasetService {
9+
rpc checkHealth (HealthCheckRequest) returns (HealthCheckResponse);
10+
rpc registerDataset (RegisterDatasetRequest) returns (RegisterDatasetResponse);
11+
rpc createIngestSession (CreateIngestSessionRequest) returns (CreateIngestSessionResponse);
1112

12-
message RegisterDatasetResponse {
13+
rpc getDatasetSessions (GetDatasetSessionsRequest) returns (GetDatasetSessionsResponse);
14+
}
1315

16+
message Session {
17+
int64 sessionId = 1;
18+
int64 temporalBracketLength= 2;
19+
int32 geohashLength= 3;
20+
string binConfig = 4;
1421
}
1522

16-
message GetDatasetInfoRequest {
17-
string id = 1;
23+
message GetDatasetSessionsRequest {
24+
string datasetId = 1;
1825
}
1926

20-
message GetDatasetInfoResponse {
21-
DatasetInfo datasetInfo = 1;
27+
message GetDatasetSessionsResponse {
28+
repeated Session session = 1;
2229
}
2330

24-
message GetDatasetInfosRequest {
31+
message RegisterDatasetRequest {
32+
string datasetId = 1;
2533

2634
}
2735

28-
message GetDatasetInfosResponse {
29-
repeated DatasetInfo info = 1;
36+
message RegisterDatasetResponse {}
37+
38+
39+
message CreateIngestSessionRequest {
40+
string dataset_id = 1;
41+
int64 temporalBracketLength= 2;
42+
int32 geohashLength= 3;
43+
string binConfig = 4;
3044
}
3145

32-
service DatasetService {
33-
rpc checkHealth (HealthCheckRequest) returns (HealthCheckResponse);
34-
rpc registerDataset (RegisterDatasetRequest) returns (RegisterDatasetResponse);
35-
rpc getDatasetInfo (GetDatasetInfoRequest) returns (GetDatasetInfoResponse);
36-
rpc getDatasetInfos (GetDatasetInfosRequest) returns (GetDatasetInfosResponse);
46+
message CreateIngestSessionResponse {
47+
int64 session_id = 1;
48+
google.protobuf.Timestamp created_at = 2;
3749
}

api/src/main/proto/ingest_service.proto

-24
This file was deleted.
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
syntax = "proto3";
2+
3+
package sustain.synopsis.metadata;
4+
5+
import "health_check.proto";
6+
7+
message QueryRequest {
8+
string query = 1;
9+
}
10+
11+
message QueryResponse {
12+
string result = 1;
13+
}
14+
15+
service QueryService {
16+
rpc checkHealth (HealthCheckRequest) returns (HealthCheckResponse);
17+
rpc query (QueryRequest) returns (QueryResponse);
18+
}

api/src/main/proto/user_service.proto

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ syntax = "proto3";
22

33
package sustain.synopsis.metadata;
44

5-
import "dataset_info.proto";
65
import "health_check.proto";
76

87
message RegisterUserRequest {

common/src/main/java/sustain/synopsis/metadata/MySqlDataSource.java

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package sustain.synopsis.metadata;
22

3+
import sustain.synopsis.metadata.config.DatabaseConfig;
4+
35
import java.sql.Connection;
46
import java.sql.DriverManager;
57
import java.sql.SQLException;

common/src/main/java/sustain/synopsis/metadata/ServiceServerBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.io.IOException;
99

1010
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
11+
import sustain.synopsis.metadata.config.ClusterConfig;
1112

1213
public class ServiceServerBase {
1314

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package sustain.synopsis.metadata.config;
2+
3+
public class CassandraConfig {
4+
5+
private String keyspace;
6+
private String host;
7+
private int port;
8+
9+
public String getKeyspace() {
10+
return keyspace;
11+
}
12+
13+
public void setKeyspace(String keyspace) {
14+
this.keyspace = keyspace;
15+
}
16+
17+
public String getHost() {
18+
return host;
19+
}
20+
21+
public void setHost(String host) {
22+
this.host = host;
23+
}
24+
25+
public int getPort() {
26+
return port;
27+
}
28+
29+
public void setPort(int port) {
30+
this.port = port;
31+
}
32+
33+
}

common/src/main/java/sustain/synopsis/metadata/ClusterConfig.java common/src/main/java/sustain/synopsis/metadata/config/ClusterConfig.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
1-
package sustain.synopsis.metadata;
1+
package sustain.synopsis.metadata.config;
22

33

44
public class ClusterConfig {
55

6-
private ServerConfig ingest_server;
76
private ServerConfig dataset_server;
87
private ServerConfig user_server;
98
private DatabaseConfig database;
10-
11-
public ServerConfig getIngest_server() {
12-
return ingest_server;
13-
}
14-
15-
public void setIngest_server(ServerConfig ingest_server) {
16-
this.ingest_server = ingest_server;
17-
}
9+
private CassandraConfig cassandra;
1810

1911
public ServerConfig getDataset_server() {
2012
return dataset_server;
@@ -40,4 +32,12 @@ public void setDatabase(DatabaseConfig database) {
4032
this.database = database;
4133
}
4234

35+
public CassandraConfig getCassandra() {
36+
return cassandra;
37+
}
38+
39+
public void setCassandra(CassandraConfig cassandra) {
40+
this.cassandra = cassandra;
41+
}
42+
4343
}

common/src/main/java/sustain/synopsis/metadata/DatabaseConfig.java common/src/main/java/sustain/synopsis/metadata/config/DatabaseConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sustain.synopsis.metadata;
1+
package sustain.synopsis.metadata.config;
22

33
public class DatabaseConfig {
44

common/src/main/java/sustain/synopsis/metadata/ServerConfig.java common/src/main/java/sustain/synopsis/metadata/config/ServerConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sustain.synopsis.metadata;
1+
package sustain.synopsis.metadata.config;
22

33
public class ServerConfig {
44

common/src/main/java/sustain/synopsis/metadata/HashAlgorithm.java common/src/main/java/sustain/synopsis/metadata/util/HashAlgorithm.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sustain.synopsis.metadata;
1+
package sustain.synopsis.metadata.util;
22

33
import java.io.ByteArrayOutputStream;
44
import java.io.IOException;

common/src/main/java/sustain/synopsis/metadata/JwtTokenUtil.java common/src/main/java/sustain/synopsis/metadata/util/JwtTokenUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sustain.synopsis.metadata;
1+
package sustain.synopsis.metadata.util;
22

33
import com.auth0.jwt.JWT;
44
import com.auth0.jwt.JWTVerifier;

common/src/main/java/sustain/synopsis/metadata/TestUtils.java common/src/main/java/sustain/synopsis/metadata/util/TestUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sustain.synopsis.metadata;
1+
package sustain.synopsis.metadata.util;
22

33
import java.util.Random;
44

Original file line numberDiff line numberDiff line change
@@ -1,14 +1,64 @@
11
package sustain.synopsis.metadata.dataset;
22

3-
import sustain.synopsis.metadata.DatasetInfoOuterClass;
4-
import sustain.synopsis.metadata.DatasetInfoOuterClass.DatasetInfo;
3+
import sustain.synopsis.metadata.DatasetServiceOuterClass.Session;
4+
import sustain.synopsis.metadata.config.DatabaseConfig;
5+
import sustain.synopsis.metadata.MySqlDataSource;
56

7+
import java.sql.PreparedStatement;
8+
import java.sql.ResultSet;
9+
import java.sql.SQLException;
10+
import java.sql.Timestamp;
11+
import java.time.Instant;
12+
import java.util.ArrayList;
613
import java.util.List;
714

8-
public interface DatasetDataSource {
15+
public class DatasetDataSource extends MySqlDataSource {
916

10-
public void insertDataset(DatasetInfo datasetInfo) throws Exception;
11-
public DatasetInfo getDatasetInfo(String id) throws Exception;
12-
public List<DatasetInfo> getDatasetInfos() throws Exception;
17+
public DatasetDataSource(DatabaseConfig db) throws SQLException, ClassNotFoundException {
18+
super(db);
19+
}
20+
21+
public void insertDataset(String id) throws SQLException {
22+
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO dataset VALUE(?,NOW())");
23+
pstmt.setString(1, id);
24+
pstmt.execute();
25+
}
26+
27+
public int insertSession(String datasetId, long temporalBracketLength, int geohashLength, String binConfig) throws SQLException {
28+
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO session VALUE(?,?,?,?,?)");
29+
pstmt.setString(1, datasetId);
30+
pstmt.setLong(2, temporalBracketLength);
31+
pstmt.setInt(3,geohashLength);
32+
pstmt.setString(4, binConfig);
33+
pstmt.setTimestamp(5, Timestamp.from(Instant.now()));
34+
pstmt.execute();
35+
36+
PreparedStatement preparedStatement = conn.prepareStatement("SELECT MAX(session_id) from session where dataset_id=?");
37+
preparedStatement.setString(1,datasetId);
38+
ResultSet resultSet = preparedStatement.executeQuery();
39+
resultSet.next();
40+
return resultSet.getInt(0);
41+
}
42+
43+
private static Session readSessionFromResultSet(ResultSet rs) throws SQLException {
44+
return Session.newBuilder()
45+
.setSessionId(rs.getInt("session_id"))
46+
.setTemporalBracketLength(rs.getLong("temoporal_bracket_length"))
47+
.setGeohashLength(rs.getInt("geohash_length"))
48+
.setBinConfig(rs.getString("bin_config"))
49+
.build();
50+
}
51+
52+
public List<Session> getDatasetSessions(String datasetId) throws SQLException {
53+
PreparedStatement pstmt = conn.prepareStatement(
54+
"SELECT session_id, temporal_bracket_length, geohash_length, bin_config from session where dataset_id=?");
55+
ResultSet resultSet = pstmt.executeQuery();
56+
57+
ArrayList<Session> sessions = new ArrayList<>();
58+
while (resultSet.next()) {
59+
sessions.add(readSessionFromResultSet(resultSet));
60+
}
61+
return sessions;
62+
}
1363

1464
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package sustain.synopsis.metadata.dataset;
22

33
import io.grpc.stub.StreamObserver;
4-
import sustain.synopsis.metadata.ClusterConfig;
4+
import sustain.synopsis.metadata.HealthCheck;
5+
import sustain.synopsis.metadata.config.ClusterConfig;
56
import sustain.synopsis.metadata.DatasetServiceGrpc;
7+
68
import java.sql.SQLException;
79
import static sustain.synopsis.metadata.DatasetServiceOuterClass.*;
810

@@ -11,13 +13,13 @@ public class DatasetService extends DatasetServiceGrpc.DatasetServiceImplBase {
1113
private final DatasetDataSource dataSource;
1214

1315
public DatasetService(ClusterConfig config) throws SQLException, ClassNotFoundException {
14-
this.dataSource = new MySqlDatasetDataSource(config.getDatabase());
16+
this.dataSource = new DatasetDataSource(config.getDatabase());
1517
}
1618

1719
@Override
1820
public void registerDataset(RegisterDatasetRequest request, StreamObserver<RegisterDatasetResponse> responseObserver) {
1921
try {
20-
dataSource.insertDataset(request.getDatasetInfo());
22+
dataSource.insertDataset(request.getDatasetId());
2123
RegisterDatasetResponse resp = RegisterDatasetResponse.newBuilder().build();
2224
responseObserver.onNext(resp);
2325
responseObserver.onCompleted();
@@ -29,13 +31,13 @@ public void registerDataset(RegisterDatasetRequest request, StreamObserver<Regis
2931
}
3032

3133
@Override
32-
public void getDatasetInfo(GetDatasetInfoRequest request, StreamObserver<GetDatasetInfoResponse> responseObserver) {
34+
public void createIngestSession(CreateIngestSessionRequest request, StreamObserver<CreateIngestSessionResponse> responseObserver) {
3335
try {
34-
GetDatasetInfoResponse resp = GetDatasetInfoResponse.newBuilder()
35-
.setDatasetInfo(dataSource.getDatasetInfo(request.getId()))
36-
.build();
37-
responseObserver.onNext(resp);
38-
responseObserver.onCompleted();
36+
dataSource.insertSession(
37+
request.getDatasetId(),
38+
request.getTemporalBracketLength(),
39+
request.getGeohashLength(),
40+
request.getBinConfig());
3941

4042
} catch (Exception e) {
4143
e.printStackTrace();
@@ -44,18 +46,24 @@ public void getDatasetInfo(GetDatasetInfoRequest request, StreamObserver<GetData
4446
}
4547

4648
@Override
47-
public void getDatasetInfos(GetDatasetInfosRequest request, StreamObserver<GetDatasetInfosResponse> responseObserver) {
49+
public void getDatasetSessions(GetDatasetSessionsRequest request, StreamObserver<GetDatasetSessionsResponse> responseObserver) {
4850
try {
49-
GetDatasetInfosResponse resp = GetDatasetInfosResponse.newBuilder()
50-
.addAllInfo(dataSource.getDatasetInfos())
51+
GetDatasetSessionsResponse response = GetDatasetSessionsResponse.newBuilder()
52+
.addAllSession(dataSource.getDatasetSessions(request.getDatasetId()))
5153
.build();
5254

53-
responseObserver.onNext(resp);
55+
responseObserver.onNext(response);
5456
responseObserver.onCompleted();
5557

5658
} catch (Exception e) {
5759
e.printStackTrace();
5860
responseObserver.onError(e);
5961
}
6062
}
63+
64+
@Override
65+
public void checkHealth(HealthCheck.HealthCheckRequest request, StreamObserver<HealthCheck.HealthCheckResponse> responseObserver) {
66+
super.checkHealth(request, responseObserver);
67+
}
68+
6169
}

0 commit comments

Comments
 (0)