A lightweight, lock-free serial/parallel combined scheduling framework for tasks. The goal is to maximize parallelism in order to minimize the overall execution time.
From imperative to monadic: lite.Task —— a simplified and type-safe task assembly toolset
Tasks declare their I/O types explicitly, so the compiler checks the assembly and mismatch errors are visible at a glance.
Examples: see here.
From single execution to streaming: Pulse —— a step-streaming data processor
A stream of data fed into a large assembled set of tasks always enters in the order of arrival, "queued" (FIFO, though an ingenious scheduling strategy avoids any actual queue) into each task. During processing, each task can also retain a mark deliberately left behind by the previous datum, for use when processing the next. This holds in any subtask at any depth, and it does not matter if an earlier datum lingers in a subtask far longer than a later one.
Examples: see here.
"The maximum improvement in system performance is determined by the parts that cannot be parallelized." —— Amdahl's law
In a large system there is often a great deal of business logic and control logic, embodied in hundreds of "jobs". The logic is intertwined and, taken as a whole, often intricate. Moreover, if this mass of jobs is not organized reasonably and effectively, the overall execution time and performance will suffer. So how do we abstract and simplify such a complex problem?
Programs = Algorithms + Data Structures
Algorithm = Logic + Control
Inspired by this idea, we can separate the business logic from the control logic, abstract the control logic into a framework, and package each piece of business logic as a Task. The relationships between tasks then fall into two kinds, dependent and non-dependent, i.e. serial and parallel: dependent tasks must execute sequentially, while non-dependent tasks can execute in parallel. The application programmer only needs to focus on writing tasks one by one and leave the rest to the framework. Reflow is designed around the control logic that orchestrates these tasks.
Reflow was developed to simplify the coding complexity of data flow and event processing across the many tasks of complex business logic. Through explicitly defined task I/O, intelligent dependency management based on keyword and value-type analysis, and a unified interface for scheduling, event feedback, and error handling, it achieves its stated goal: combined serial/parallel task scheduling. Data is the electricity; each task is a component. Beyond reducing coding complexity, the framework standardizes what would otherwise be chaotic, tangled code, makes coding errors easy to detect, and thereby greatly improves the readability, robustness, and scalability of a program.
In Reflow's basic logic, a complex business flow is first broken down into a set of single-function, non-blocking, single-threaded tasks, each packaged in a Trait that explicitly defines the task's attributes. Dependencies among them are then built and committed via Dependency, finally yielding a runnable Reflow object; once started, the task flow executes.
// Asynchronous by default, with or without `Pulse`:
reflow.start(input, feedback)
// To switch to synchronous, simply append `sync()`:
reflow.start(input, feedback).sync()
// e.g. `Progress.Strategy.Xxx`; strategies can be chained:
implicit lazy val strategy: Strategy = Strategy.Depth(3) -> Strategy.Fluent -> Strategy.Interval(600)
// Feedback.scala
def onFailed(trat: Trait, e: Exception): Unit = {
e match {
// There are two categories:
// 1. A `RuntimeException` caused by client-code quality issues (such as `NullPointerException`)
//    is wrapped in a `CodeException`; the underlying exception object can be
//    retrieved via the `CodeException#getCause()` method.
case c: CodeException => c.getCause
// 2. A custom `Exception` explicitly passed to the `Task#failed(Exception)` method parameter;
//    it may be `null` (although Scala considers `null` bad practice).
case e: Exception => e.printStackTrace()
}
}
Reflow.GlobalTrack.registerObserver(new GlobalTrackObserver {
override def onUpdate(current: GlobalTrack, items: All): Unit = {
if (!current.isSubReflow && current.scheduler.getState == State.EXECUTING) {
println(s"++++++++++[[[current.state:${current.scheduler.getState}")
items().foreach(println)
println("----------]]]")
}
}
})(Strategy.Interval(600), null)
Threads grow to the configured maximum first; only then are tasks enqueued, and idle threads are released down to the core size. There is no `future.get()`-style blocking code anywhere. This requires that user-defined tasks never block (if a task would block, split it into multiple dependent but non-blocking tasks; network requests are the exception). These features satisfy the practical requirements of a wide range of projects.
The main features of this framework are similar to those of Facebook Bolts and RxJava; it can be seen as a fine-grained extension of their task-combination capabilities, while being more intuitive to use, more rigorous, higher-spec, and more closely aligned with real project needs.
This framework is implemented on top of the thread pool (java.util.concurrent.ThreadPoolExecutor) rather than the Fork-Join framework (java.util.concurrent.ForkJoinPool), with the former improved (see Worker) to follow this basic logic: grow the number of threads to the maximum first, otherwise enqueue, and release threads when idle. The latter suits computationally intensive workloads, but matches neither the design objectives of this framework nor resource-constrained devices (e.g. mobile phones). The framework is written entirely in Scala; parameters support shorthand and are converted automatically as needed (via implicits), and it runs on any JVM-like platform (e.g. Android Runtime).
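The "grow to the maximum first, then enqueue" policy can be sketched with the JDK alone. This is an illustrative sketch, not Reflow's actual `Worker` code: `GrowFirstQueue` is a hypothetical name, and the trick shown (a queue whose `offer` refuses tasks while the pool can still add threads) is just one common way to invert `ThreadPoolExecutor`'s default enqueue-before-grow behavior.

```scala
import java.util.concurrent._

// A plain ThreadPoolExecutor enqueues as soon as `corePoolSize` threads exist,
// and only grows toward `maximumPoolSize` once the queue is full. Refusing the
// `offer` while the pool can still grow forces the executor to spawn a thread.
class GrowFirstQueue(capacity: Int) extends LinkedBlockingQueue[Runnable](capacity) {
  @volatile var pool: ThreadPoolExecutor = _
  override def offer(r: Runnable): Boolean =
    if (pool != null && pool.getPoolSize < pool.getMaximumPoolSize) false // force a new thread
    else super.offer(r) // already at maximum: fall back to queueing
}

val queue = new GrowFirstQueue(128)
val pool = new ThreadPoolExecutor(
  2, 8, 30, TimeUnit.SECONDS, queue,
  new RejectedExecutionHandler { // raced to the maximum while offering: just enqueue
    def rejectedExecution(r: Runnable, e: ThreadPoolExecutor): Unit = queue.put(r)
  })
queue.pool = pool
pool.allowCoreThreadTimeOut(true) // idle threads are released
```

With core size 2 and maximum 8, a third concurrent task spawns a third thread instead of sitting in the queue, which is the behavior the paragraph above describes.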
This framework is built on Snatcher, a special anti-blocking thread-synchronization tool; see the code documentation for details.
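The idea behind such a tool can be sketched roughly as follows. This is a simplification under my own naming (`SnatcherSketch`), not Reflow's actual `Snatcher` implementation: threads compete for an execution flag with CAS instead of a lock; losers merely record that new work arrived and return immediately, never blocking, while the winner loops until no signal remains.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of an anti-blocking "snatcher": callers never wait.
class SnatcherSketch {
  private val occupied = new AtomicBoolean(false) // is some thread already executing?
  private val signaled = new AtomicBoolean(false) // did new work arrive meanwhile?

  def tryOn(work: () => Unit): Unit = {
    signaled.set(true)
    // Outer loop re-checks after releasing `occupied`, in case a signal
    // slipped in after our drain but before another thread could snatch.
    while (signaled.get && occupied.compareAndSet(false, true)) {
      try while (signaled.getAndSet(false)) work()
      finally occupied.set(false)
    }
  }
}
```

The point of the pattern is that a losing thread returns at once instead of parking on a monitor, which is what "anti-blocking" refers to here.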
This framework deliberately avoids java.util.concurrent.Future<V>-style tools for handling parallel tasks: they are built on a thread-blocking model, which conflicts with one of this framework's design goals: lock-free. See below, or LiteSpec and ReflowSpec.
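The contrast can be seen with nothing but the JDK (illustrative only, not Reflow code): `Future#get()` parks the calling thread until the result exists, whereas a callback, in the spirit of Reflow's `Feedback`, has the worker push the result so no caller ever waits.

```scala
import java.util.concurrent._

val pool = Executors.newSingleThreadExecutor()

// Blocking model: the caller's thread parks inside `get()` until done.
val f: Future[Integer] = pool.submit(new Callable[Integer] { def call(): Integer = 42 })
val blocking: Int = f.get()

// Callback model: the result is pushed to us; `compute` returns immediately.
@volatile var pushed = 0
val delivered = new CountDownLatch(1)
def compute(onDone: Int => Unit): Unit = pool.execute(new Runnable {
  def run(): Unit = { onDone(42); delivered.countDown() }
})
compute(r => pushed = r)
delivered.await() // only so this demo can observe the result before exiting
pool.shutdown()
```

Both paths compute the same value, but in the callback version the submitting thread is free to do other work the whole time.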
class App extends AbsApp {
override def onCreate(): Unit = {
App.reflow.init()
super.onCreate()
}
}
object App {
object implicits {
// Feedback strategy for task-flow execution progress
implicit lazy val strategy: Strategy = Strategy.Depth(3) -> Strategy.Fluent -> Strategy.Interval(600)
implicit lazy val poster: Poster = new Poster {
// Post feedback to the UI thread
override def post(runner: Runnable): Unit = getApp.mainHandler.post(runner)
}
}
object reflow {
private[App] def init(): Unit = {
Reflow.setThreadResetor(new ThreadResetor {
override def reset(thread: Thread, runOnCurrentThread: Boolean): Unit = {
if (runOnCurrentThread) Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)
}
})
}
}
}
// ...
Scenario("`Serial/Parallel` Tasks mixed assembly") {
val pars =
(
c2d
+>>
c2abc.inPar("name#c2abc", "`Serial` mix in `Parallel`")
+>>
(c2b >>> b2c >>> c2a >>> a2b) //.inPar("xxx") is optional.
+>>
c2a
) **> { (d, c, b, a, ctx) =>
info(a.toString)
info(b.toString)
info(c.toString)
info(d.toString)
d
}
Input(new Aaa) >>> a2b >>> b2c >>> pars run() sync()
assert(true)
}
// `Pulse`, more complex:
Scenario("Multi-level nested assembly test of `Pulse`") {
val pars = {
def p = (c2d +>> c2b) **> { (d, _, ctx) => ctx.progress(1, 2); d } +|- { (_, d) => d }
@tailrec
def loop(s: () => Serial[Ccc, Ddd], times: Int = 0, limit: Int = 10): Serial[Ccc, Ddd] =
if (times >= limit) s()
else {
def p =
(
s() +|- { (_, d) => d }
+>>
s()
+>>
s()
) **> { (d, _, _, ctx) => ctx.progress(1, 2); d }
loop(() => p, times + 1, limit)
}
loop(() => p, limit = 3)
}
@volatile var callableOut: Int = 0
val future = new FutureTask[Int](() => callableOut)
val repeatCount = 1000 // Int.MaxValue
val pulse = (Input[Aaa] >>> a2b >>> b2c >>> pars) pulse (new reflow.Pulse.Feedback.Lite[Ddd] {
override def onStart(serialNum: Long): Unit = {}
// ...
})
// ...