
[SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2 #50508

Open · wants to merge 9 commits into base: master
Conversation

@siying (Contributor) commented Apr 3, 2025:

Why are the changes needed?

The new state store checkpoint format needs failure tolerance tests to make sure the implementation is correct and delivers the behavior we would like.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The change is test code itself.

Was this patch authored or co-authored using generative AI tooling?

No.

@siying force-pushed the ingest_failure9 branch from 28fb3d8 to a4656e8 on Apr 3, 2025 18:48
@@ -135,7 +135,23 @@ class RocksDB(
private val nativeStats = rocksDbOptions.statistics()

private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),

protected def CreateFileManager(
Contributor: nit: createFileManager

@@ -132,8 +132,12 @@ class RocksDBFileManager(

import RocksDBImmutableFile._

protected def GetFileSystem(myDfsRootDir: String, myHadoopConf: Configuration) : FileSystem = {
Contributor: nit: getFileSystem

@@ -524,13 +524,33 @@ private[sql] class RocksDBStateStoreProvider
@volatile private var stateStoreEncoding: String = _
@volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _

protected def CreateRocksDB(
Contributor: Could we move all these functions to start with lower case?

import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager

/** A wrapper file output stream that will throw exception in close() and put the underlying
Contributor: nit: let's follow the comment style?
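The doc comment above describes a stream wrapper that throws from close(). Stripped of Hadoop's CancellableFSDataOutputStream and reduced to plain java.io types, the idea looks roughly like this (a sketch of the pattern, not the PR's actual class):

```scala
import java.io.{ByteArrayOutputStream, IOException, OutputStream}

// Sketch: forwards writes to the wrapped stream, but simulates a task that
// dies before finishing its commit by throwing from close().
class ThrowingOnCloseStream(underlying: OutputStream) extends OutputStream {
  override def write(b: Int): Unit = underlying.write(b)
  override def flush(): Unit = underlying.flush()
  override def close(): Unit = {
    underlying.flush() // data may already be visible to readers...
    throw new IOException("injected close() failure") // ...but the commit "fails"
  }
}
```

This lets a test exercise the window where bytes were written but close() never succeeded, which is exactly the failure class the PR targets.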

stopMaintenanceTaskNoLock()
}

/** Only used for unit tests. The function doesn't hold lockloadedProviders. Calling
Contributor: nit: doesn't hold the loadedProviders lock


/** Only used for unit tests. The function doesn't hold lockloadedProviders. Calling
* it can work-around a deadlock condition where a maintenance task is waiting for the lock */
private[streaming] def stopMaintenanceTaskNoLock(): Unit = {
Contributor: nit: stopMaintenanceTaskWithoutLock

override def close(): Unit = {
if (!closed) {
closed = true
FailureInjectionFileSystem.delayedStreams =
Contributor: we are adding to a singleton here?

Contributor Author: Yes, it is a singleton.

Contributor: Updating singleton fields within this class feels a bit odd. Do you think we can refactor a bit?

Contributor Author (@siying, Apr 3, 2025): @anishshri-db it's very hard to access any of these objects from the test code, as they are created deep in the state store code while we operate at the DB level. Static variables are the best way I can think of to communicate between a unit test and a file system wrapper nested that deep. What's the main concern with using a singleton here?

Contributor: Hmm, ok. I wonder if there is some way to keep this test/class local, but if not, it's fine. Also cc @HeartSaVioR, in case he has seen other patterns for similar cases before.
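The trade-off debated in this thread shows up clearly in a stand-alone sketch: a singleton object acts as a side channel between the test and a wrapper created deep inside the system under test. All names here are illustrative, not Spark's actual classes.

```scala
// Illustrative sketch of the singleton side-channel pattern under discussion.
// The test cannot reach the wrapper directly, so both talk through a global.
object FaultInjectionState {
  // @volatile so flags set on the test thread are visible to worker threads.
  @volatile var failOnClose: Boolean = false
  @volatile var closedStreams: List[String] = Nil
}

// Created deep inside the system under test; the test never holds a reference.
class InstrumentedStream(name: String) {
  def close(): Unit = {
    FaultInjectionState.closedStreams = name :: FaultInjectionState.closedStreams
    if (FaultInjectionState.failOnClose) {
      throw new java.io.IOException(s"injected failure closing $name")
    }
  }
}
```

The cost the reviewer is pointing at is visible here too: the state is global, so tests that flip these flags cannot safely run in parallel and must reset the singleton between cases.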

}
}

/** Contains a list of variables for failure ingestion conditions */
Contributor: nit: could we add some more details here?

override def open(f: Path, bufferSize: Int): FSDataInputStream = innerFs.open(f, bufferSize)

override def create(
f: Path,
Contributor: nit: indent seems off?

override def delete(f: Path, recursive: Boolean): Boolean = innerFs.delete(f, recursive)

override def listStatus(f: Path): Array[FileStatus] = {
innerFs.listStatus(f)
Contributor: could move to the same line to be consistent?

/** A wrapper RocksDB State Store Provider that replaces the FileSystem used in RocksDBFileManager
 * with FailureInjectionFileSystem. */
class FailureInjectionRocksDBStateStoreProvider extends RocksDBStateStoreProvider {
override def CreateRocksDB(
Contributor: nit: createRocksDB

/** RocksDBFileManager is created by the RocksDB class, where it creates a default FileSystem.
 * We make RocksDB create a RocksDBFileManager with a different FileSystem here. */
def createRocksDBWithFaultInjection(
dfsRootDir: String,
Contributor: nit: indent seems off?
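The refactor described above relies on an overridable factory method: RocksDB asks a protected hook for its FileSystem, and a test subclass swaps in a faulty one. The template-method shape, with illustrative names rather than the PR's real classes:

```scala
// Sketch of the template-method factory pattern: the base class calls a
// protected factory, and a test subclass overrides it to inject a fake.
trait Fs { def name: String }
class RealFs extends Fs { val name = "real" }
class FaultyFs extends Fs { val name = "faulty" }

class Db {
  protected def createFileSystem(): Fs = new RealFs
  // Production code only ever talks to fs; it never knows which one it got.
  val fs: Fs = createFileSystem()
}

class DbWithFaultInjection extends Db {
  override protected def createFileSystem(): Fs = new FaultyFs
}
```

This keeps the injection point out of production call sites: only the test-only subclass knows a different FileSystem exists.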

} else {
ret
}

Contributor: nit: extra newline?


override def renameTempFile(srcPath: Path, dstPath: Path,
overwriteIfPossible: Boolean): Unit = {
if (FailureInjectionFileSystem.allowOverwriteInRename || !fs.exists(dstPath)) {
Contributor: nit: indent seems off?
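The hunk above guards renameTempFile on a global allowOverwriteInRename switch. Reduced to java.nio types (a sketch of the control flow, not the PR's Hadoop-based code):

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Sketch: rename src over dst only when overwrite is allowed or dst is absent,
// mirroring the allowOverwriteInRename guard in the reviewed hunk.
object RenameHelper {
  @volatile var allowOverwrite: Boolean = false

  def renameTempFile(src: Path, dst: Path): Boolean = {
    if (allowOverwrite || !Files.exists(dst)) {
      Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING)
      true
    } else {
      false // skip, as if an earlier attempt had already committed dst
    }
  }
}
```

Flipping the switch from a test lets it simulate a zombie task overwriting a file that a retried task already committed.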

partitionId = partitionId
) {
override def createFileManager(
dfsRootDir: String,
Contributor: nit: indent

) {
override def getFileSystem(
myDfsRootDir: String,
myHadoopConf: Configuration): FileSystem = {
Contributor: nit: indent

object FailureInjectionRocksDBStateStoreProvider {
/**
* RocksDBFileManager is created by RocksDB class where it creates a default FileSystem.
* we made RocksDB create a RocksDBFileManager but a different FileSystem here.
Contributor: nit: We make RocksDB create a RocksDBFileManager that uses a different FileSystem here


@SlowSQLTest
/**
* Test suite to ingest some failures in RocksDB checkpoint */
Contributor: nit: inject some failures?

* be killed or write zip file. Only after the later one is successfully committed, it comes back
* and write the zip file.
* */
test("Zip File Overwritten by Previous Task Checkpoint V2") {
Contributor: Is this largely the same as the previous test, only for checkpoint v2? Should we parameterize into one test?
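Parameterizing as the reviewer suggests usually means driving one test body from a list of checkpoint format versions instead of duplicating near-identical tests. A minimal framework-free sketch (a ScalaTest registration loop would look similar; names and version values are illustrative):

```scala
// Sketch: run the same test body once per checkpoint format version,
// collecting (version, passed) pairs instead of copy-pasting test cases.
object ParameterizedCheck {
  def runForVersions(versions: Seq[Int])(body: Int => Unit): Seq[(Int, Boolean)] =
    versions.map { v =>
      val ok = try { body(v); true } catch { case _: Throwable => false }
      (v, ok)
    }
}
```

In a real suite each iteration would register a named test (so failures report which version broke) rather than fold results into a list.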

* be killed or write changelog file. Only after the later one is successfully committed, it come
* back and write the changelog file.
* */
test("Changelog File Overwritten by Previous Task With Changelog Checkpoint V2") {
Contributor: Same here?

* 2. The batch eventually failed
* 3. Query is retried and moved forward
* 4. The snapshot checkpoint succeeded
* In checkpoint V2, this snapshot shouldn't take effective. Otherwise, it will break the strong
Contributor: nit: shouldn't take effect

}
}

test("Basic RocksDB Zip File Upload Failure Handling") {
Contributor: Don't we need to run the test with both checkpoint v1 and v2?


/**
* This test is to simulate the case where a previous task had connectivity problem that couldn't
* be killed or write changelog file. Only after the later one is successfully committed, it come
Contributor: nit: comes back and writes the

db.put("foo", "bar")
checkpointId3 = commitAndGetCheckpointId(db)

db2.doMaintenance()
Contributor: Do we have a test with varying maintenance intervals, i.e. where maintenance is configured to run very often vs. very rarely? Maybe not for each case, but for some of the most common test cases here?

Contributor Author: We don't have it. Maybe do it as a follow-up when we have time later.

}

/**
* An integreated test to cover this scenario:
Contributor: nit: An integrated test

}

/**
* A wrapper checkpoint file manager that might inject functions in some function calls.
Contributor: nit: inject failures in some function calls?

* This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure
* injection behavior.
*
* @param path
Contributor (@anishshri-db, Apr 5, 2025): nit: you need to add comments for the args?

Contributor (@anishshri-db) left a review: lgtm pending comments being addressed. Thx

@siying (Contributor Author) commented Apr 6, 2025: Thanks @anishshri-db for the review and approval. @HeartSaVioR, can you help review and merge it?

2 participants