[SUPPORT] Problem when using the Java write client to write into Hudi #12485

Status: Open
notAprogrammer-0 opened this issue Dec 13, 2024 · 10 comments
Labels: legacy-release, priority:critical (production down; pipelines stalled; need help ASAP), write-client-java

@notAprogrammer-0

Tips before filing an issue

  • Have you gone through our FAQs? Yes

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

I want to use the Hudi Java client to read data from HBase and write it into Hudi. The code worked perfectly on the first write, but the next write attempt failed with the exception listed below.

3926293 [pool-14-thread-2] ERROR org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor  - Error upserting bucketType UPDATE for partition :0
org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:435)
	at org.apache.hudi.table.action.commit.JavaMergeHelper.runMerge(JavaMergeHelper.java:120)
	at org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.handleUpdateInternal(BaseJavaCommitActionExecutor.java:290)
	at org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.handleUpdate(BaseJavaCommitActionExecutor.java:281)
	at org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.handleUpsertPartition(BaseJavaCommitActionExecutor.java:254)
	at org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.lambda$execute$0(BaseJavaCommitActionExecutor.java:126)
	at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
	at org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:124)
	at org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:74)
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
	at org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor.execute(JavaUpsertCommitActionExecutor.java:53)
	at org.apache.hudi.table.HoodieJavaCopyOnWriteTable.upsert(HoodieJavaCopyOnWriteTable.java:109)
	at org.apache.hudi.table.HoodieJavaCopyOnWriteTable.upsert(HoodieJavaCopyOnWriteTable.java:88)
	at org.apache.hudi.client.HoodieJavaWriteClient.upsert(HoodieJavaWriteClient.java:113)
	at com.baosteel.bsee.etl.service.impl.DataFlowServiceImpl.writeIntoHudi(DataFlowServiceImpl.java:353)
	at com.baosteel.bsee.etl.service.impl.DataFlowServiceImpl.processHBaseData(DataFlowServiceImpl.java:268)
	at com.baosteel.bsee.etl.service.impl.DataFlowServiceImpl.lambda$writeDataFromHBase2Hoodie$1(DataFlowServiceImpl.java:212)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:156)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:106)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.lambda$write$0(SizeAwareFSDataOutputStream.java:65)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:112)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.java:130)
	at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.write(SizeAwareFSDataOutputStream.java:62)
	at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
	at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.lambda$write$1(SizeAwareFSDataOutputStream.java:75)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:112)
	at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.java:130)
	at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.write(SizeAwareFSDataOutputStream.java:72)
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.write(HadoopPositionOutputStream.java:45)
	at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
	at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:811)
	at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:757)
	at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
	at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
	at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
	at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:90)
	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:410)
	... 21 more

To Reproduce

Steps to reproduce the behavior:

  1. The code that creates the table:
 private void createHoodieTableIfNotExist(HoodieOperateVO entity) {
        String tablePath = entity.getTablePath();
        String tableName = entity.getTableName();
        Configuration hadoopConf = new Configuration();
        Path path = new Path(entity.getTablePath());
        try {
            FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
            log.info("Resolved filesystem for path {}: {}", tablePath, fs.getUri());
            if (!fs.exists(path)) {
                log.info("Path does not exist: {}. Creating...", tablePath);
                HoodieTableMetaClient.withPropertyBuilder()
                        .setTableType(tableType)
                        .setTableName(tableName)
                        .setPayloadClassName(HoodieAvroPayload.class.getName())
                        .initTable(hadoopConf, tablePath);
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error(e.getMessage());
        }
    }
  2. The code that writes into Hudi:
private void writeIntoHudi(HoodieOperateVO entity, List<HoodieRecord<HoodieAvroPayload>> records) {
        String tablePath = entity.getTablePath();
        String tableName = entity.getTableName();
        Configuration hadoopConf = new Configuration();
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                .withSchema(entity.getSchema().toString()).withParallelism(10, 10)
                .withDeleteParallelism(10).forTable(tableName)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(40, 50).build()).build();
        HoodieJavaWriteClient<HoodieAvroPayload> hoodieWriteClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);

        try {
            String newCommitTime = hoodieWriteClient.startCommit();
            List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
            List<HoodieRecord<HoodieAvroPayload>> writeRecords =
                    recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
            hoodieWriteClient.upsert(writeRecords, newCommitTime);
        } catch (Exception e) {
            log.error(e.getMessage());
            e.printStackTrace();
        } finally {
            hoodieWriteClient.close();
        }
    }
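
Note: writeIntoHudi creates a fresh Configuration and HoodieJavaWriteClient on every call, and the stack trace shows the writes running inside a thread pool (pool-14-thread-2). Hadoop caches FileSystem instances per URI and user, so closing one client in one thread can, in some setups, close the cached DistributedFileSystem that another in-flight write is still using, which would surface as a ClosedChannelException. This is only a hypothesis, not a confirmed diagnosis; a minimal sketch that rules it out by disabling the HDFS FileSystem cache is shown below (cfg is the HoodieWriteConfig built above).

        // Hedged sketch, not a confirmed fix: disable the shared HDFS FileSystem cache so that
        // closing one write client cannot invalidate streams still open in another thread.
        Configuration hadoopConf = new Configuration();
        hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true);
        HoodieJavaWriteClient<HoodieAvroPayload> hoodieWriteClient =
                new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);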

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

  • Hudi version : 0.11.0

  • Spark version :

  • Hive version :

  • Hadoop version : 3.3.1

  • Storage (HDFS/S3/GCS..) :

  • Running on Docker? (yes/no) :

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

@danny0405
Contributor

What storage is the Hudi table backed by, HDFS or an object store?

@notAprogrammer-0
Author

What storage is the Hudi table backed by, HDFS or an object store?

On HDFS.

@danny0405
Contributor

Are there any Parquet-related errors? I'm not sure why the output stream had been closed.

@rangareddy

Hi @notAprogrammer-0

Could you please share the full source code on GitHub? We will try to reproduce the issue internally.

@notAprogrammer-0
Author

Hi @notAprogrammer-0

Could you please share the full source code on GitHub? We will try to reproduce the issue internally.

Sorry, for some reason I can't share all of the source code, but I have put the main code on my GitHub; the link is: https://github.com/notAprogrammer-0/HBaseToHudi. Thank you so much for trying to help solve this problem.

@notAprogrammer-0
Author

Are there any Parquet-related errors? I'm not sure why the output stream had been closed.

I looked through all the error logs and there were no Parquet-related error messages in the log files, but there were many rollback commits under '.hoodie', the metadata folder of the Hudi table on HDFS. I have put my log on my GitHub; the link is: https://github.com/notAprogrammer-0/HBaseToHudi/blob/main/nohup.log. Thank you for trying to help me with this.
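
For reference, a minimal sketch of how the rollback instants under '.hoodie' can be inspected programmatically; the method names come from the public Hudi 0.11 timeline API and should be verified against the exact version in use:

        // Hedged sketch: count the rollback instants that have accumulated on the table timeline.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath(tablePath)
                .build();
        HoodieTimeline rollbacks = metaClient.getActiveTimeline().getRollbackTimeline();
        log.info("Rollback instants on the timeline: {}", rollbacks.countInstants());
        if (rollbacks.lastInstant().isPresent()) {
            log.info("Most recent rollback instant: {}", rollbacks.lastInstant().get());
        }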

@rangareddy

Hi @notAprogrammer-0

I am sharing the sample code. Please use it to try to reproduce the issue.

package com.ranga;

import org.apache.avro.generic.GenericRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.*;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;

public class Test12485 {

    public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01";
    public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02";
    public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03";

    public static final String[] DEFAULT_PARTITION_PATHS =
            {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};

    public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
            + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
            + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
            + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
            + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
            + "{\"name\":\"fare\",\"type\": \"double\"}]}";

    public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
    private static final Random RAND = new Random(46474747);
    private static String tableType = HoodieTableType.COPY_ON_WRITE.name();

    public static void main(String[] args) throws Exception {
        List<HoodieRecord<HoodieAvroPayload>> records = processHBaseData();
        writeIntoHudi(records);
    }

    private static void writeIntoHudi(List<HoodieRecord<HoodieAvroPayload>> records) throws Exception {
        String tablePath = "file:///tmp/my_test/";
        String tableName = "test_table";
        Configuration hadoopConf = new Configuration();
        Path path = new Path(tablePath);
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
        if (!fs.exists(path)) {
            HoodieTableMetaClient.withPropertyBuilder()
                    .setTableType(tableType)
                    .setTableName(tableName)
                    .setPayloadClassName(HoodieAvroPayload.class.getName())
                    .initTable(hadoopConf, tablePath);
        }

        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                .withSchema(avroSchema.toString()).withParallelism(10, 10)
                .withDeleteParallelism(10).forTable(tableName)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(40, 50).build()).build();

        HoodieJavaWriteClient<HoodieAvroPayload> hoodieWriteClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);

        try {
            String newCommitTime = hoodieWriteClient.startCommit();
            List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
            List<HoodieRecord<HoodieAvroPayload>> writeRecords =
                    recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
            hoodieWriteClient.upsert(writeRecords, newCommitTime);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            hoodieWriteClient.close();
        }
    }

    private static List<HoodieRecord<HoodieAvroPayload>> processHBaseData() {
        List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();
        for (String partitionPath : DEFAULT_PARTITION_PATHS) {
            HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
            String commitTime = HoodieActiveTimeline.createNewInstantTime();
            HoodieRecord<HoodieAvroPayload> record = generateUpdateRecord(key, commitTime);
            records.add(record);
        }
        return records;
    }

    public static HoodieRecord<HoodieAvroPayload> generateUpdateRecord(HoodieKey key, String commitTime) {
        return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
    }

    public static HoodieAvroPayload generateRandomValue(HoodieKey key, String commitTime) {
        GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
        return new HoodieAvroPayload(Option.of(rec));
    }

    public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
                                               long timestamp) {
        GenericRecord rec = new GenericData.Record(avroSchema);
        rec.put("uuid", rowKey);
        rec.put("ts", timestamp);
        rec.put("rider", riderName);
        rec.put("driver", driverName);
        rec.put("begin_lat", RAND.nextDouble());
        rec.put("begin_lon", RAND.nextDouble());
        rec.put("end_lat", RAND.nextDouble());
        rec.put("end_lon", RAND.nextDouble());
        rec.put("fare", RAND.nextDouble() * 100);
        return rec;
    }
}

@notAprogrammer-0
Author

Hi @notAprogrammer-0

I am sharing the sample code. Please use it to try to reproduce the issue.


I changed the table type to MERGE_ON_READ ("mor") and used a while (true) loop to reproduce the error. It went wrong after running for about 1.5 hours. I have put a file called error.log, which records the error message, in my GitHub repository, along with the modified program file (same name as the file you gave me). The GitHub link is: https://github.com/notAprogrammer-0/HBaseToHudi.
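
For reference, a rough sketch of the looped reproduction described above (MERGE_ON_READ table type plus repeated writes); the exact loop body in the uploaded file may differ, and the one-second pause is an assumption:

    // Hedged sketch of the reproduction loop: switch the table type to MERGE_ON_READ and
    // upsert repeatedly until the failure appears.
    private static String tableType = HoodieTableType.MERGE_ON_READ.name();

    public static void main(String[] args) throws Exception {
        while (true) {
            List<HoodieRecord<HoodieAvroPayload>> records = processHBaseData();
            writeIntoHudi(records);
            Thread.sleep(1000L); // brief pause between write attempts (assumption)
        }
    }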

@rangareddy

Hi @notAprogrammer-0

The issue is not reproducible with the local file system, which leads me to suspect an HDFS-related problem rather than a Hudi-specific one. I suggest examining the HDFS logs for client disconnections or other relevant errors.

Application start time: 2025-01-02 14:56:51
Application Killed time: 2025-01-02 20:16:44

2025-01-02 14:56:51 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 2025-01-02 14:56:51 INFO  EmbeddedTimelineService:67 - Starting Timeline service !!
 2025-01-02 14:56:51 WARN  EmbeddedTimelineService:104 - Unable to find driver bind address from spark config
 2025-01-02 14:56:51 INFO  FileSystemViewManager:232 - Creating View Manager with storage type :MEMORY
 2025-01-02 14:56:51 INFO  FileSystemViewManager:244 - Creating in-memory based Table View
 2025-01-02 14:56:51 INFO  log:193 - Logging initialized @1368ms to org.apache.hudi.org.apache.jetty.util.log.Slf4jLog

..................

 2025-01-02 20:16:43 INFO  HoodieActiveTimeline:556 - Checking for file exists ?file:/tmp/my_test/.hoodie/20250102201639581.clean.requested
 2025-01-02 20:16:43 INFO  HoodieActiveTimeline:564 - Create new file for toInstant ?file:/tmp/my_test/.hoodie/20250102201639581.clean.inflight
 2025-01-02 20:16:43 INFO  CleanActionExecutor:133 - Using cleanerParallelism: 3
 2025-01-02 20:16:44 INFO  HoodieActiveTimeline:129 - Loaded instants upto : Option{val=[==>20250102201639581__clean__INFLIGHT]}
 2025-01-02 20:16:44 INFO  HoodieActiveTimeline:556 - Checking for file exists ?file:/tmp/my_test/.hoodie/20250102201639581.clean.inflight
 2025-01-02 20:16:44 INFO  HoodieActiveTimeline:564 - Create new file for toInstant ?file:/tmp/my_test/.hoodie/20250102201639581.clean
 2025-01-02 20:16:44 INFO  CleanActionExecutor:226 - Marked clean started on 20250102201639581 as complete

@notAprogrammer-0
Author

Hi @notAprogrammer-0

The issue is not reproducible with the local file system, which leads me to suspect an HDFS-related problem rather than a Hudi-specific one. I suggest examining the HDFS logs for client disconnections or other relevant errors.

OK, I'll check my HDFS logs. Thanks for your help.
