Skip to content

[FixBug] To support YARN HA, a config field (JSON string) has been added to the dinky_cluster table #4369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ public class ClusterInstanceDTO {
@ApiModelProperty(value = "Enabled", required = true, dataType = "Boolean", example = "true")
private Boolean enabled;

// Example mirrors the shape the heartbeat check reads: an "applicationId" key and a
// comma-separated "resourceManager" string (not a JSON array).
@ApiModelProperty(
        value = "config",
        required = true,
        dataType = "String",
        example =
                "{\"applicationId\":\"application_1745048813572_0007\",\"resourceManager\":\"127.0.0.1:8032,127.0.0.2:8032\"}",
        notes =
                "A JSON string that contains configuration information such as the ResourceManager and ApplicationID")
private String config;

public ClusterInstance toBean() {
ClusterInstance clusterInstance = new ClusterInstance();
BeanUtil.copyProperties(this, clusterInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public class ClusterInstance extends SuperEntity<ClusterInstance> {
notes = "job manager host")
private String jobManagerHost;

@ApiModelProperty(
value = "config",
required = true,
dataType = "String",
example =
"{\"applicationId\":\"application_1745048813572_0007\",\"resourceManager\":\"127.0.0.1:8032,127.0.0.2:8032\"}",
notes =
"A JSON string that contains configuration information such as the ResourceManager and ApplicationID")
private String config;

@ApiModelProperty(
value = "version",
required = true,
Expand All @@ -95,7 +105,7 @@ public class ClusterInstance extends SuperEntity<ClusterInstance> {
required = true,
dataType = "Boolean",
example = "test",
notes = "is auto registers, if this record from projob/application mode , it will be true")
notes = "is auto registers, if this record from per-job/application mode , it will be true")
private boolean autoRegisters;

@ApiModelProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public class ClusterInstanceMapping {
notes = "job manager host")
private String jobManagerHost;

// Example mirrors the shape the heartbeat check reads: an "applicationId" key and a
// comma-separated "resourceManager" string (not a JSON array).
@ApiModelProperty(
        value = "config",
        required = true,
        dataType = "String",
        example =
                "{\"applicationId\":\"application_1745048813572_0007\",\"resourceManager\":\"127.0.0.1:8032,127.0.0.2:8032\"}",
        notes =
                "A JSON string that contains configuration information such as the ResourceManager and ApplicationID")
private String config;

@ApiModelProperty(
value = "version",
required = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public boolean success() {
.taskId(taskId)
.autoRegisters(true)
.enabled(true)
.config(job.getConfig())
.build());

if (Asserts.isNotNull(clusterInstance)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,18 @@ private static void checkAndRefreshCluster(JobInfoDetail jobInfoDetail) {
return;
}

FlinkClusterInfo flinkClusterInfo = clusterInstanceService.checkHeartBeat(
jobInfoDetail.getClusterInstance().getHosts(),
jobInfoDetail.getClusterInstance().getJobManagerHost());
FlinkClusterInfo flinkClusterInfo = clusterInstanceService.checkHeartBeat(jobInfoDetail.getClusterInstance());
if (flinkClusterInfo.isEffective()
&& Asserts.isNotNull(jobInfoDetail.getClusterInstance().getJobManagerHost())
&& !jobInfoDetail
.getClusterInstance()
.getJobManagerHost()
.equals(flinkClusterInfo.getJobManagerAddress())) {
jobInfoDetail.getClusterInstance().setJobManagerHost(flinkClusterInfo.getJobManagerAddress());
jobInfoDetail.getHistory().setJobManagerAddress(flinkClusterInfo.getJobManagerAddress());
clusterInstanceService.updateById(jobInfoDetail.getClusterInstance());
historyService.updateById(jobInfoDetail.getHistory());
}
if (!flinkClusterInfo.isEffective()) {
ClusterConfigurationDTO clusterCfg = jobInfoDetail.getClusterConfiguration();
ClusterInstance clusterInstance = jobInfoDetail.getClusterInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ public interface ClusterInstanceService extends ISuperService<ClusterInstance> {
/**
* check cluster heartbeat status
*
* @param hosts {@link String} eg: host1:8081,host2:8082,host3:8083,host4:8084
* @param host {@link String} eg: host1
* @param clusterInstance {@link ClusterInstance}
* @return {@link FlinkClusterInfo}
*/
FlinkClusterInfo checkHeartBeat(String hosts, String host);
FlinkClusterInfo checkHeartBeat(ClusterInstance clusterInstance);

/**
* get job manager address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.assertion.DinkyAssert;
import org.dinky.cluster.FlinkCluster;
import org.dinky.cluster.FlinkClusterInfo;
import org.dinky.cluster.FlinkClusterInstanceConfig;
import org.dinky.data.dto.ClusterInstanceDTO;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.enums.Status;
Expand Down Expand Up @@ -60,6 +61,8 @@
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.thread.ThreadUtil;
Expand All @@ -82,16 +85,26 @@ public class ClusterInstanceServiceImpl extends SuperServiceImpl<ClusterInstance
@Lazy
private TaskService taskService;

private static final ObjectMapper mapper = new ObjectMapper();

@Override
public FlinkClusterInfo checkHeartBeat(String hosts, String host) {
return FlinkCluster.testFlinkJobManagerIP(hosts, host);
public FlinkClusterInfo checkHeartBeat(ClusterInstance clusterInstance) {
    try {
        // The config JSON is optional: the column defaults to NULL and some gateway
        // results never provide one. ObjectMapper.readTree rejects a null argument, which
        // would otherwise land in the catch below and mark a healthy cluster as
        // ineffective before any host is probed. Parse only when there is content and
        // pass a null node otherwise; the prober skips its config branch for null.
        String rawConfig = clusterInstance.getConfig();
        JsonNode configNode = null;
        if (rawConfig != null && !rawConfig.trim().isEmpty()) {
            configNode = mapper.readTree(rawConfig);
        }
        return FlinkCluster.testFlinkJobManagerIP(FlinkClusterInstanceConfig.build(
                GatewayType.get(clusterInstance.getType()),
                clusterInstance.getHosts(),
                clusterInstance.getJobManagerHost(),
                configNode));
    } catch (Exception e) {
        // Best effort: malformed config JSON or any probing failure is reported as an
        // ineffective cluster rather than propagated to the caller.
        return FlinkClusterInfo.INEFFECTIVE;
    }
}

@Override
public String getJobManagerAddress(ClusterInstance clusterInstance) {
DinkyAssert.check(clusterInstance);
FlinkClusterInfo info =
FlinkCluster.testFlinkJobManagerIP(clusterInstance.getHosts(), clusterInstance.getJobManagerHost());
FlinkClusterInfo info = checkHeartBeat(clusterInstance);
String host = null;
if (info.isEffective()) {
host = info.getJobManagerAddress();
Expand Down Expand Up @@ -305,7 +318,7 @@ public Long heartbeat() {
}

private boolean checkHealth(ClusterInstance clusterInstance) {
FlinkClusterInfo info = checkHeartBeat(clusterInstance.getHosts(), clusterInstance.getJobManagerHost());
FlinkClusterInfo info = checkHeartBeat(clusterInstance);
if (!info.isEffective()) {
clusterInstance.setJobManagerHost("");
clusterInstance.setStatus(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Drops the config column from dinky_cluster (the inverse of the migration that adds it).
SET NAMES utf8mb4;
-- Foreign-key checks are disabled for the DDL below and re-enabled at the end of the script.
SET FOREIGN_KEY_CHECKS = 0;

ALTER TABLE dinky_cluster DROP config;

SET FOREIGN_KEY_CHECKS = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Adds the nullable config column (varchar(255), default NULL) to dinky_cluster,
-- positioned directly after job_manager_host.
SET NAMES utf8mb4;
-- Foreign-key checks are disabled for the DDL below and re-enabled at the end of the script.
SET FOREIGN_KEY_CHECKS = 0;

ALTER TABLE dinky_cluster ADD COLUMN config varchar(255) DEFAULT NULL comment 'A JSON string that contains configuration information such as the ResourceManager and ApplicationID' AFTER job_manager_host;

SET FOREIGN_KEY_CHECKS = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Drops the config column from dinky_cluster (the inverse of the migration that adds it).
SET NAMES utf8mb4;
-- Foreign-key checks are disabled for the DDL below and re-enabled at the end of the script.
SET FOREIGN_KEY_CHECKS = 0;

ALTER TABLE dinky_cluster DROP config;

SET FOREIGN_KEY_CHECKS = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Adds the config column to dinky_cluster through the add_column_if_not_exists procedure.
-- NOTE(review): the procedure is defined outside this script; the arguments appear to be
-- (table, column, type, default, comment, column to place it after) - confirm against its definition.
SET NAMES utf8mb4;
-- Foreign-key checks are disabled for the call below and re-enabled at the end of the script.
SET FOREIGN_KEY_CHECKS = 0;

CALL add_column_if_not_exists('dinky_cluster', 'config', 'varchar(255)', 'NULL', 'A JSON string that contains configuration information such as the ResourceManager and ApplicationID' , 'job_manager_host');

SET FOREIGN_KEY_CHECKS = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Drops the config column from public.dinky_cluster (the inverse of the migration that adds it).
ALTER TABLE public.dinky_cluster DROP config;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds the config column to public.dinky_cluster through the add_column_if_not_exists function.
-- NOTE(review): the function is defined outside this script; the arguments appear to be
-- (schema, table, column, type, default, comment) - confirm against its definition.
SELECT add_column_if_not_exists('public','dinky_cluster', 'config', 'varchar(255)', 'null', 'A JSON string that contains configuration information such as the ResourceManager and ApplicationID');
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<result column="type" property="type" />
<result column="hosts" property="hosts" />
<result column="job_manager_host" property="jobManagerHost" />
<result column="config" property="config" />
<result column="version" property="version" />
<result column="status" property="status" />
<result column="note" property="note" />
Expand All @@ -23,7 +24,7 @@

<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias,type,hosts,job_manager_host,version, status,note,auto_registers,cluster_configuration_id,task_id, enabled, create_time, update_time
id, name, alias,type,hosts,job_manager_host,config,version, status,note,auto_registers,cluster_configuration_id,task_id, enabled, create_time, update_time
</sql>


Expand Down
30 changes: 26 additions & 4 deletions dinky-core/src/main/java/org/dinky/cluster/FlinkCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;

import cn.hutool.core.io.IORuntimeException;

/**
Expand All @@ -36,14 +38,34 @@ public class FlinkCluster {

private static Logger logger = LoggerFactory.getLogger(FlinkCluster.class);

public static FlinkClusterInfo testFlinkJobManagerIP(String hosts, String host) {
if (Asserts.isNotNullString(host)) {
FlinkClusterInfo info = executeSocketTest(host);
public static FlinkClusterInfo testFlinkJobManagerIP(FlinkClusterInstanceConfig config) {
if (config.getClusterType().isDeployYarnCluster()) {
JsonNode configNode = config.getConfig();
if (configNode != null && configNode.has("applicationId") && configNode.has("resourceManager")) {
String applicationId = configNode.get("applicationId").asText();
String resourceManagerStr = configNode.get("resourceManager").asText();
if (Asserts.isNotNullString(resourceManagerStr)) {
String[] resourceManagers = resourceManagerStr.split(",");
for (String resourceManager : resourceManagers) {
if (Asserts.isNotNullString(resourceManager.trim())) {
String yarnUrl = "http://" + resourceManager.trim() + "/proxy/" + applicationId + "/";
FlinkClusterInfo info = executeSocketTest(yarnUrl);
if (info.isEffective()) {
return info;
}
}
}
}
}
}

if (Asserts.isNotNullString(config.getHost())) {
FlinkClusterInfo info = executeSocketTest(config.getHost());
if (info.isEffective()) {
return info;
}
}
String[] servers = hosts.split(",");
String[] servers = config.getHosts().split(",");
for (String server : servers) {
FlinkClusterInfo info = executeSocketTest(server);
if (info.isEffective()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.cluster;

import org.dinky.data.enums.GatewayType;

import com.fasterxml.jackson.databind.JsonNode;

/**
 * Bundles the inputs of a Flink cluster connectivity check: the gateway type, the candidate
 * host list, the last known host and the optional parsed instance config.
 *
 * <p>The class exposes getters only; values are supplied through the constructor or
 * {@link #build(GatewayType, String, String, JsonNode)}.
 */
public class FlinkClusterInstanceConfig {

    /** Gateway type of the cluster instance. */
    private GatewayType clusterType;

    /** Host list as a single string; the cluster check splits it on commas. */
    private String hosts;

    /** Single host address that the cluster check tries before the host list. */
    private String host;

    /**
     * Parsed instance config, for example:
     * {@code {"applicationId":"application_1745048813572_0007","resourceManager":"127.0.0.1:8032,127.0.0.2:8032"}}
     */
    private JsonNode config;

    /**
     * Static factory equivalent to the four-argument constructor.
     *
     * @param clusterType gateway type of the cluster instance
     * @param hosts host list string
     * @param host single host address
     * @param config parsed instance config
     * @return a new instance holding the given values
     */
    public static FlinkClusterInstanceConfig build(
            GatewayType clusterType, String hosts, String host, JsonNode config) {
        return new FlinkClusterInstanceConfig(clusterType, hosts, host, config);
    }

    /** Creates an instance with every value left as {@code null}. */
    public FlinkClusterInstanceConfig() {
        this(null, null, null, null);
    }

    /**
     * Creates an instance holding the given values as-is.
     *
     * @param clusterType gateway type of the cluster instance
     * @param hosts host list string
     * @param host single host address
     * @param config parsed instance config
     */
    public FlinkClusterInstanceConfig(GatewayType clusterType, String hosts, String host, JsonNode config) {
        this.config = config;
        this.host = host;
        this.hosts = hosts;
        this.clusterType = clusterType;
    }

    /** @return the gateway type of the cluster instance */
    public GatewayType getClusterType() {
        return this.clusterType;
    }

    /** @return the host list string */
    public String getHosts() {
        return this.hosts;
    }

    /** @return the single host address */
    public String getHost() {
        return this.host;
    }

    /** @return the parsed instance config */
    public JsonNode getConfig() {
        return this.config;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static ExecutorConfig build(
String host = null;
Integer port = null;
String hostPort = address;
if (Asserts.isNotNullString(address)) {
if (Asserts.isNotNullString(address) && !address.contains("proxy")) {
if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) {
hostPort = address.replace(NetConstant.HTTP, "").replace(NetConstant.HTTPS, "");
}
Expand Down
1 change: 1 addition & 0 deletions dinky-core/src/main/java/org/dinky/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Job {
private Integer jobInstanceId;
private JobConfig jobConfig;
private String jobManagerAddress;
private String config;
private JobStatus status;
private GatewayType type;
private String statement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ private void setJobResultFromGatewayResult(GatewayResult gatewayResult) {
jobManager.getJob().setJobId(gatewayResult.getId());
jobManager.getJob().setJids(gatewayResult.getJids());
jobManager.getJob().setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL()));
jobManager.getJob().setConfig(gatewayResult.getConfig());
jobManager.getJob().setStatus(gatewayResult.isSuccess() ? Job.JobStatus.SUCCESS : Job.JobStatus.FAILED);
if (!gatewayResult.isSuccess()) {
jobManager.getJob().setError(gatewayResult.getError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface GatewayResult {

String getWebURL();

String getConfig();

List<String> getJids();

String getError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public KubernetesResult setId(String id) {
return this;
}

/**
 * Returns the config string of this result.
 *
 * @return always {@code null}; this implementation provides no config value
 */
@Override
public String getConfig() {
    return null;
}

public KubernetesResult setWebURL(String webURL) {
this.webURL = webURL;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public String getWebURL() {
return null;
}

/**
 * Returns the config string of this result.
 *
 * @return always {@code null}; this implementation provides no config value
 */
@Override
public String getConfig() {
    return null;
}

@Override
public List<String> getJids() {
return null;
Expand Down
Loading
Loading