Skip to content

Commit

Permalink
Add support for thread pool and asynchronous execution.(#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
dora4 committed Dec 1, 2024
1 parent e04287f commit c7ccf87
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 83 deletions.
2 changes: 1 addition & 1 deletion dcache/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ afterEvaluate {
from(components["release"])
groupId = "com.github.dora4"
artifactId = "dcache-android"
version = "3.0.6"
version = "3.0.7"
}
}
}
Expand Down
59 changes: 29 additions & 30 deletions dcache/src/main/java/dora/db/async/OrmExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,36 @@ import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

internal class OrmExecutor : Runnable, Handler.Callback {
internal class OrmExecutor<T : OrmTable> : Runnable, Handler.Callback {

private val queue: BlockingQueue<OrmTask<out OrmTable>> = LinkedBlockingQueue()
private val queue: BlockingQueue<OrmTask<T>> = LinkedBlockingQueue()

@Volatile
private var executorRunning = false

@Volatile
var maxOperationCountToMerge: Int = 50
var maxTaskCountToMerge: Int = 50

@Volatile
var listener: OrmTaskListener? = null
var listener: OrmTaskListener<T>? = null

@Volatile
var listenerMainThread: OrmTaskListener? = null
var listenerMainThread: OrmTaskListener<T>? = null

@Volatile
var waitForMergeMillis: Int = 50

private var countOperationsEnqueued = 0
private var countOperationsCompleted = 0
private var countTasksEnqueued = 0
private var countTasksCompleted = 0

private var handlerMainThread: Handler? = null
private var lastSequenceNumber = 0

fun <T : OrmTable> enqueue(task: OrmTask<T>) {
fun enqueue(task: OrmTask<T>) {
synchronized(this) {
task.sequenceNumber = ++lastSequenceNumber
queue.add(task)
countOperationsEnqueued++
countTasksEnqueued++
if (!executorRunning) {
executorRunning = true
executor.execute(this)
Expand All @@ -55,7 +55,7 @@ internal class OrmExecutor : Runnable, Handler.Callback {

@get:Synchronized
val isCompleted: Boolean
get() = countOperationsEnqueued == countOperationsCompleted
get() = countTasksEnqueued == countTasksCompleted

/**
* Waits until all enqueued operations are complete. If the thread gets interrupted, any
Expand All @@ -70,7 +70,7 @@ internal class OrmExecutor : Runnable, Handler.Callback {
try {
(this as Object).wait()
} catch (e: InterruptedException) {
throw OrmTaskException("Interrupted while waiting for all operations to complete.\n$e")
throw OrmTaskException("Interrupted while waiting for all tasks to complete.\n$e")
}
}
}
Expand All @@ -90,7 +90,7 @@ internal class OrmExecutor : Runnable, Handler.Callback {
try {
(this as Object).wait(maxMillis.toLong())
} catch (e: InterruptedException) {
throw OrmTaskException("Interrupted while waiting for all operations to complete.\n$e")
throw OrmTaskException("Interrupted while waiting for all tasks to complete.\n$e")
}
}
return isCompleted
Expand Down Expand Up @@ -140,8 +140,8 @@ internal class OrmExecutor : Runnable, Handler.Callback {
}

@Throws(OrmTaskException::class)
private fun mergeTxAndExecute(task1: OrmTask<out OrmTable>, task2: OrmTask<out OrmTable>) {
val mergedTasks = ArrayList<OrmTask<out OrmTable>>()
private fun mergeTxAndExecute(task1: OrmTask<T>, task2: OrmTask<T>) {
val mergedTasks = ArrayList<OrmTask<T>>()
mergedTasks.add(task1)
mergedTasks.add(task2)
var success = false
Expand All @@ -153,15 +153,14 @@ internal class OrmExecutor : Runnable, Handler.Callback {
break
}
if (i == mergedTasks.size - 1) {
val peekedOp = queue.peek()
if (i < maxOperationCountToMerge && task.isMergeableWith(peekedOp)) {
val removedOp = queue.remove()
if (removedOp !== peekedOp) {
throw OrmTaskException("Internal error: peeked op did not match removed op")
val peekedTask = queue.peek()
if (i < maxTaskCountToMerge && task.isMergeableWith(peekedTask)) {
val removedTask = queue.remove()
if (removedTask !== peekedTask) {
throw OrmTaskException("Internal error: peeked task did not match removed task")
}
mergedTasks.add(removedOp)
mergedTasks.add(removedTask)
} else {
db.setTransactionSuccessful()
success = true
break
}
Expand All @@ -176,7 +175,7 @@ internal class OrmExecutor : Runnable, Handler.Callback {
}
} else {
OrmLog.i(
"Reverted merged transaction because one of the operations failed. Executing operations one by " +
"Reverted merged transaction because one of the tasks failed. Executing tasks one by " +
"one instead..."
)
for (task in mergedTasks) {
Expand All @@ -186,7 +185,7 @@ internal class OrmExecutor : Runnable, Handler.Callback {
}
}

private fun handleTaskCompleted(task: OrmTask<*>) {
private fun handleTaskCompleted(task: OrmTask<T>) {
task.setCompleted()
val listenerToCall = listener
listenerToCall?.onCompleted(task)
Expand All @@ -198,19 +197,19 @@ internal class OrmExecutor : Runnable, Handler.Callback {
handlerMainThread!!.sendMessage(msg)
}
synchronized(this) {
countOperationsCompleted++
if (countOperationsCompleted == countOperationsEnqueued) {
countTasksCompleted++
if (countTasksCompleted == countTasksEnqueued) {
(this as Object).notifyAll()
}
}
}

private fun executeTaskAndPostCompleted(task: OrmTask<*>) {
private fun executeTaskAndPostCompleted(task: OrmTask<T>) {
executeTask(task)
handleTaskCompleted(task)
}

private fun <T : OrmTable> executeTask(task: OrmTask<T>) {
private fun executeTask(task: OrmTask<T>) {
task.timeStarted = System.currentTimeMillis()
try {
when (task.type) {
Expand Down Expand Up @@ -256,22 +255,22 @@ internal class OrmExecutor : Runnable, Handler.Callback {
task.timeCompleted = System.currentTimeMillis()
}

private fun executeTransactionRunnable(task: OrmTask<*>) {
private fun executeTransactionRunnable(task: OrmTask<T>) {
Transaction.execute {
(task.parameter as Runnable).run()
}
}

@Throws(Exception::class)
private fun executeTransactionCallable(task: OrmTask<*>) {
private fun executeTransactionCallable(task: OrmTask<T>) {
Transaction.execute {
task.result = (task.parameter as Callable<*>).call()
}
}

override fun handleMessage(msg: Message): Boolean {
val listenerToCall = listenerMainThread
listenerToCall?.onCompleted(msg.obj as OrmTask<*>)
listenerToCall?.onCompleted(msg.obj as OrmTask<T>)
return false
}

Expand Down
6 changes: 3 additions & 3 deletions dcache/src/main/java/dora/db/async/OrmTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ open class OrmTask<T : OrmTable> internal constructor(
*/
@Synchronized
@Throws(OrmTaskException::class)
fun result(): Any? {
fun result(): T? {
if (!isCompleted) {
waitForCompletion()
}
if (throwable != null) {
throw OrmTaskException(this, throwable!!)
}
return result
return result as T
}

val isMergeTx: Boolean
Expand All @@ -111,7 +111,7 @@ open class OrmTask<T : OrmTable> internal constructor(
* [.FLAG_MERGE_TX], and if the database instances match. 简体中文:判断此操作是否可以与指定的操作合并。
* 会检查 null、[.FLAG_MERGE_TX] 以及数据库实例是否匹配。
*/
fun isMergeableWith(other: OrmTask<*>?): Boolean {
fun isMergeableWith(other: OrmTask<out OrmTable>?): Boolean {
return other != null && isMergeTx && other.isMergeTx && getDatabase() == other.getDatabase()
}

Expand Down
6 changes: 3 additions & 3 deletions dcache/src/main/java/dora/db/async/OrmTaskListener.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package dora.db.async

import dora.db.table.OrmTable

interface OrmTaskListener {
interface OrmTaskListener<T : OrmTable> {

fun onCompleted(task: OrmTask<out OrmTable>)
fun onCompleted(task: OrmTask<T>)

fun onFailed(task: OrmTask<out OrmTable>, e: Exception)
fun onFailed(task: OrmTask<T>, e: Exception)
}
Loading

0 comments on commit c7ccf87

Please sign in to comment.