[SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2 #50508
Conversation
@@ -135,7 +135,23 @@ class RocksDB(
private val nativeStats = rocksDbOptions.statistics()

private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),

protected def CreateFileManager(

nit: createFileManager
@@ -132,8 +132,12 @@ class RocksDBFileManager(

import RocksDBImmutableFile._

protected def GetFileSystem(myDfsRootDir: String, myHadoopConf: Configuration) : FileSystem = {

nit: getFileSystem
@@ -524,13 +524,33 @@ private[sql] class RocksDBStateStoreProvider
@volatile private var stateStoreEncoding: String = _
@volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _

protected def CreateRocksDB(

Could we move all these functions to start with a lower-case letter?
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager

/** A wrapper file output stream that will throw exception in close() and put the underlying

nit: let's follow the comment style?
stopMaintenanceTaskNoLock()
}

/** Only used for unit tests. The function doesn't hold lockloadedProviders. Calling

nit: doesn't hold the loadedProviders lock
/** Only used for unit tests. The function doesn't hold lockloadedProviders. Calling
* it can work-around a deadlock condition where a maintenance task is waiting for the lock */
private[streaming] def stopMaintenanceTaskNoLock(): Unit = {

nit: stopMaintenanceTaskWithoutLock
override def close(): Unit = {
if (!closed) {
closed = true
FailureInjectionFileSystem.delayedStreams =

we are adding to a singleton here?

Yes, it is a singleton.

Updating singleton fields within this class feels a bit odd. Do you think we can refactor a bit?

@anishshri-db it's very hard to access any of these objects from the test code, as they are created deep in the state store code while we are operating at the DB level. Static variables are the best way I can think of to communicate between a unit test and a file system wrapper at that depth. What's the main concern with using a singleton here?

Hmm ok - wonder if there is some way to keep this test/class local, but if not, it's fine. Also cc @HeartSaVioR - in case he has seen other patterns for similar cases before.
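To make the pattern under discussion concrete: the test can only reach a wrapper created deep inside library code through a shared singleton (a Scala `object`). The sketch below is illustrative only; `FailureInjectionRegistry`, `InjectableStream`, and `libraryWrite` are hypothetical names, not the PR's actual classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Singleton holding failure-injection knobs and state observed by the test.
object FailureInjectionRegistry {
  @volatile var failOnClose: Boolean = false
  val closedStreams: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// A wrapper the test never instantiates directly; library code creates it.
class InjectableStream(name: String) {
  def close(): Unit = {
    // Record into the singleton so the test can observe what happened.
    FailureInjectionRegistry.closedStreams += name
    if (FailureInjectionRegistry.failOnClose) {
      throw new java.io.IOException(s"injected failure closing $name")
    }
  }
}

// Stand-in for library code that creates and closes the stream internally.
def libraryWrite(name: String): Unit = new InjectableStream(name).close()
```

The test flips `failOnClose` before driving the library, then inspects `closedStreams` afterwards; this is the "static variables as a channel" approach the author describes, with the known drawback that the singleton's mutable state must be reset between tests.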
}
}

/** Contains a list of variables for failure ingestion conditions */

nit: could we add some more details here?
override def open(f: Path, bufferSize: Int): FSDataInputStream = innerFs.open(f, bufferSize)

override def create(
f: Path,

nit: indent seems off?
override def delete(f: Path, recursive: Boolean): Boolean = innerFs.delete(f, recursive)

override def listStatus(f: Path): Array[FileStatus] = {
innerFs.listStatus(f)

could move to same line to be consistent?
/** A rapper RocksDB State Store Provider that replaces FileSystem used in RocksDBFileManager
* to FailureInjectionFileSystem. */
class FailureInjectionRocksDBStateStoreProvider extends RocksDBStateStoreProvider {
override def CreateRocksDB(

nit: createRocksDB
/** RocksDBFieManager is created by RocksDB class where it creates a default FileSystem.
* we made RocksDB create a RocksDBFileManager but a different FileSystem here. */
def createRocksDBWithFaultInjection(
dfsRootDir: String,

nit: indent seems off?
} else {
ret
}

nit: extra newline?
override def renameTempFile(srcPath: Path, dstPath: Path,
overwriteIfPossible: Boolean): Unit = {
if (FailureInjectionFileSystem.allowOverwriteInRename || !fs.exists(dstPath)) {

nit: indent seems off?
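The guard in the diff above (only overwrite the destination when explicitly allowed) can be sketched in isolation with plain `java.nio.file` instead of Hadoop's `FileSystem`. This is a hedged, simplified illustration, not the PR's code; `renameTempFile` here is a standalone function with an invented signature.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Rename a temp file to its final name. If the destination already exists
// (e.g. a previous task attempt committed it), refuse to overwrite unless
// explicitly allowed, and discard the temp file instead.
def renameTempFile(src: Path, dst: Path, allowOverwrite: Boolean): Unit = {
  if (allowOverwrite || !Files.exists(dst)) {
    Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING)
  } else {
    // Keep the already-committed destination; drop the orphaned temp file.
    Files.delete(src)
  }
}
```

With `allowOverwrite = false` this models the normal commit path where a zombie task must not clobber a committed file; the failure-injection tests then flip the flag to simulate file systems where rename does overwrite.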
partitionId = partitionId
) {
override def createFileManager(
dfsRootDir: String,

nit: indent
) {
override def getFileSystem(
myDfsRootDir: String,
myHadoopConf: Configuration): FileSystem = {

nit: indent
object FailureInjectionRocksDBStateStoreProvider {
/**
* RocksDBFieManager is created by RocksDB class where it creates a default FileSystem.
* we made RocksDB create a RocksDBFileManager but a different FileSystem here.

nit: We make RocksDB create a RocksDBFileManager that uses a different FileSystem here
@SlowSQLTest
/**
* Test suite to ingest some failures in RocksDB checkpoint */

nit: inject some failures?
* be killed or write zip file. Only after the later one is successfully committed, it comes back
* and write the zip file.
* */
test("Zip File Overwritten by Previous Task Checkpoint V2") {

Is this largely the same as the previous test, only for checkpoint v2? Should we parameterize into one test?
* be killed or write changelog file. Only after the later one is successfully committed, it come
* back and write the changelog file.
* */
test("Changelog File Overwritten by Previous Task With Changelog Checkpoint V2") {

Same here?
* 2. The batch eventually failed
* 3. Query is retried and moved forward
* 4. The snapshot checkpoint succeeded
* In checkpoint V2, this snapshot shouldn't take effective. Otherwise, it will break the strong

nit: shouldn't take effect
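The invariant the scenario above tests can be sketched abstractly: under checkpoint format V2, a state version is identified by a (version, uniqueId) pair, so a snapshot uploaded late by an abandoned attempt carries an id the retried query's lineage never references and therefore must not become visible. The names below (`CheckpointId`, `LineageTracker`, `isEffective`) are invented for illustration and are not Spark's API.

```scala
// Illustrative model: a version is identified by (version, uniqueId).
case class CheckpointId(version: Long, uniqueId: String)

// Tracks which (version, uniqueId) pairs the query actually committed.
class LineageTracker {
  private var lineage: List[CheckpointId] = Nil
  def commit(id: CheckpointId): Unit = lineage = id :: lineage
  // A snapshot only "takes effect" if its exact id is in the committed lineage;
  // a stale upload for the same version but a different uniqueId is ignored.
  def isEffective(id: CheckpointId): Boolean = lineage.contains(id)
}
```

This is why the failed batch's late snapshot is harmless in V2: it shares a version number with the retry but not a uniqueId, so lookups along the committed lineage never resolve to it.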
}
}

test("Basic RocksDB Zip File Upload Failure Handling") {

Don't we need to run the test with both checkpoint v1 and v2?
/**
* This test is to simulate the case where a previous task had connectivity problem that couldn't
* be killed or write changelog file. Only after the later one is successfully committed, it come

nit: comes back and writes the
db.put("foo", "bar")
checkpointId3 = commitAndGetCheckpointId(db)

db2.doMaintenance()

Do we have a test with varying maintenance intervals? i.e. where maintenance is configured to run very often vs. very rarely? Maybe not for each case, but for some of the most common test cases here?

We don't have it. Maybe do it as a follow-up when we have time later.
}

/**
* An integreated test to cover this scenario:

nit: An integrated test
}

/**
* A wrapper checkpoint file manager that might inject functions in some function calls.

nit: inject failures in some function calls?
* This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure
* injection behavior.
*
* @param path

nit: you need to add comments for the args?

lgtm pending comments being addressed. Thx
Thanks @anishshri-db for review and approval. @HeartSaVioR can you help review and merge it?
Why are the changes needed?
The new state store checkpoint format needs failure tolerance tests to make sure the implementation is correct and delivers the behavior we would like.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
The change is test code itself.
Was this patch authored or co-authored using generative AI tooling?
No.