Skip to content

Commit b3bd2cb

Browse files
committed
Initial commit
0 parents  commit b3bd2cb

File tree

95 files changed

+5092
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+5092
-0
lines changed

.gitignore

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Created by .ignore support plugin (hsz.mobi)
2+
### JetBrains template
3+
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion
4+
5+
*.iml
6+
7+
## Directory-based project format:
8+
.idea/
9+
# if you remove the above rule, at least ignore the following:
10+
11+
# User-specific stuff:
12+
# .idea/workspace.xml
13+
# .idea/tasks.xml
14+
# .idea/dictionaries
15+
16+
# Sensitive or high-churn files:
17+
# .idea/dataSources.ids
18+
# .idea/dataSources.xml
19+
# .idea/sqlDataSources.xml
20+
# .idea/dynamic.xml
21+
# .idea/uiDesigner.xml
22+
23+
# Gradle:
24+
# .idea/gradle.xml
25+
# .idea/libraries
26+
27+
# Mongo Explorer plugin:
28+
# .idea/mongoSettings.xml
29+
30+
## File-based project format:
31+
*.ipr
32+
*.iws
33+
34+
## Plugin-specific files:
35+
36+
# IntelliJ
37+
/build/
38+
/out/
39+
40+
# mpeltonen/sbt-idea plugin
41+
.idea_modules/
42+
43+
# JIRA plugin
44+
atlassian-ide-plugin.xml
45+
46+
# Crashlytics plugin (for Android Studio and IntelliJ)
47+
com_crashlytics_export_strings.xml
48+
crashlytics.properties
49+
crashlytics-build.properties
50+
51+
52+
53+
### Eclipse template
54+
*.pydevproject
55+
.metadata
56+
.gradle
57+
bin/
58+
tmp/
59+
*.tmp
60+
*.bak
61+
*.swp
62+
*~.nib
63+
local.properties
64+
.settings/
65+
.loadpath
66+
67+
# Eclipse Core
68+
.project
69+
70+
# External tool builders
71+
.externalToolBuilders/
72+
73+
# Locally stored "Eclipse launch configurations"
74+
*.launch
75+
76+
# CDT-specific
77+
.cproject
78+
79+
# JDT-specific (Eclipse Java Development Tools)
80+
.classpath
81+
82+
# Java annotation processor (APT)
83+
.factorypath
84+
85+
# PDT-specific
86+
.buildpath
87+
88+
# sbteclipse plugin
89+
.target
90+
91+
# TeXlipse plugin
92+
.texlipse
93+
94+
.DS_Store

.travis.yml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
language: java
2+
jdk:
3+
- oraclejdk8
4+
5+
sudo: required
6+
dist: trusty
7+
8+
# cache gradle dependencies
9+
# https://docs.travis-ci.com/user/languages/java#Caching
10+
before_cache:
11+
- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
12+
cache:
13+
directories:
14+
- $HOME/.gradle/caches/
15+
- $HOME/.gradle/wrapper/

README.md

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# quasar-flow
2+
## Handle data flows using quasar fibers and reactive-streams.
3+
 [![Build Status](https://travis-ci.org/fulmicotone/com.fulmicotone.qio.svg?branch=master)](https://travis-ci.org/fulmicotone/com.fulmicotone.qio)  [![](https://jitpack.io/v/fulmicotone/com.fulmicotone.qio.svg)](https://jitpack.io/#fulmicotone/com.fulmicotone.qio)
4+
5+
6+
### Goal - Build a concurrency system that
7+
8+
- It's easy to use.
9+
- It's simply to read.
10+
- It's simply to code.
11+
- Uses reactive-streams channels / processors / subscriber with quasar fibers under the hood.
12+
- Uses FanIn/FanOut concurrency patterns.
13+
- Have common methods for size/byte batching with flushing timeouts.
14+
15+
16+
### Main entities:
17+
18+
- Emitter: Entity that emit flows of objects on a channel.
19+
- An Emitter could be a broadcast emitter.
20+
- An Emitter could be a routed emitter, every processor should subscribe on a particular data object property.
21+
- An Emitter could have 1-N subscribers.
22+
23+
24+
- Processor: Entity that receives an Emitter's data flow.
25+
- A Processor could process an Emitter data-flow with 1-N fiber/s.
26+
- A Processor could transform the emitter data-flow with a transformation function.
27+
- A Processor could process an Emitter data-flow with N fibers and return 1 result Emitter using FanIn pattern.
28+
- A Processor could process an Emitter data-flow with N fibers and return N result Emitter using FanOut pattern.
29+
- A Processor could process an Emitter data-flow batching results grouping them by size.
30+
- A Processor could process an Emitter data-flow batching results grouping them by a custom user-defined accumulator.
31+
32+
33+
- Consumer: Entity that could receives both Emitter or Processors data-flow
34+
- A Consumer could process an Emitter/Processor data-flow with 1-N fiber/s.
35+
- A Consumer could transform an Emitter/Processor data-flow with a transformation function.
36+
- A Consumer could process an Emitter/Processor data-flow with N fibers and apply an user-defined task to the result.
37+
- A Consumer could process an Emitter/Processor data-flow batching results grouping them by size and apply an user-defined task to the result.
38+
- A Consumer could process an Emitter/Processor data-flow batching results grouping them by a custom user-defined accumulator and apply an user-defined task to the result.
39+
40+
41+
42+
### Examples:
43+
44+
45+
#### Emitter -> Processor -> Consumer
46+
```java
47+
public class Main {
48+
49+
public static void main(String[] args) {
50+
51+
// Emit Task that emits 10 Strings
52+
IEmitterTask<String> stringEmitterTask = publisherChannel -> {
53+
for(int i = 0; i<10; i++){ publisherChannel.send("String"+i); }
54+
};
55+
56+
57+
// LINEAR LAYOUT
58+
QuasarFlow.newFlow()
59+
.broadcastEmitter(stringEmitterTask) // BUILD A BROADCAST EMITTER FROM TASK
60+
.addProcessor() // ADD A PROCESSOR
61+
.process() // PROCESS
62+
.addConsumer() // ADD A CONSUMER
63+
.consume(str -> System.out.println(str)) // CONSUME WITH CONSUMER TASK
64+
.start();
65+
66+
// NESTED LAYOUT
67+
QuasarFlow.newFlow()
68+
.broadcastEmitter(null) // BUILD A BROADCAST EMITTER FROM TASK
69+
.addProcessor(p -> { // ADD A PROCESSOR
70+
p.process() // PROCESS
71+
.addConsumer(c -> // ADD A CONSUMER
72+
c.consume(str -> System.out.println(str))); // CONSUME WITH CONSUMER TASK
73+
})
74+
.start();
75+
}
76+
77+
}
78+
79+
80+

build.gradle

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
group 'it.enryold'
2+
3+
version '0.0.1'
4+
5+
apply plugin: 'java'
6+
apply plugin: 'idea'
7+
8+
9+
sourceCompatibility = 1.8
10+
11+
12+
test {
13+
useJUnitPlatform()
14+
maxHeapSize = "1024m"
15+
jvmArgs '-javaagent:'+file('.')+"/quasar-core-0.7.10.jar"
16+
}
17+
18+
19+
repositories {
20+
mavenCentral()
21+
maven { url "http://repo.opensourceagility.com/release" }
22+
maven { url "http://repo.spring.io/libs-milestone" }
23+
maven { url "http://repo.spring.io/libs-snapshot" }
24+
}
25+
26+
dependencies {
27+
implementation group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.+'
28+
implementation group: 'com.amazonaws', name: 'amazon-kinesis-producer', version: '0.12.+'
29+
implementation group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.+'
30+
implementation group: 'co.paralleluniverse', name: 'quasar-core', version: '0.7.10'
31+
implementation group: 'co.paralleluniverse', name: 'quasar-reactive-streams', version: '0.7.10'
32+
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
33+
implementation group: 'com.google.guava', name: 'guava', version: '23.0'
34+
implementation "com.evanlennick:retry4j:0.15.0"
35+
36+
testImplementation('org.junit.jupiter:junit-jupiter-api:5.4.2')
37+
testRuntime('org.junit.jupiter:junit-jupiter-engine:5.4.2')
38+
}

quasar-core-0.7.10.jar

1.46 MB
Binary file not shown.

settings.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rootProject.name = 'quasar-flow'
2+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package it.enryold.quasarflow;
2+
3+
import it.enryold.quasarflow.interfaces.IEmitter;
4+
import it.enryold.quasarflow.interfaces.IEmitterTask;
5+
import it.enryold.quasarflow.interfaces.IRoutingKeyExtractor;
6+
import it.enryold.quasarflow.models.QEmitter;
7+
import it.enryold.quasarflow.models.QFlow;
8+
9+
public class QuasarFlow {
10+
11+
12+
private QFlow qFlow;
13+
14+
private QuasarFlow(){
15+
qFlow = new QFlow();
16+
}
17+
18+
public static QuasarFlow newFlow(){
19+
return new QuasarFlow();
20+
}
21+
22+
public <T, E extends IEmitter<T>> E broadcastEmitter(IEmitterTask<T> task){
23+
return new QEmitter<T>(qFlow)
24+
.broadcastEmitter(task);
25+
}
26+
27+
public <T, E extends IEmitter<T>> E broadcastEmitter(IEmitterTask<T> task, String name){
28+
return new QEmitter<T>(qFlow, name)
29+
.broadcastEmitter(task);
30+
}
31+
32+
public <T, E extends IEmitter<T>> E routedEmitter(IEmitterTask<T> task, IRoutingKeyExtractor<T> routingKeyExtractor){
33+
return new QEmitter<T>(qFlow)
34+
.routedEmitter(task, routingKeyExtractor);
35+
}
36+
37+
public <T, E extends IEmitter<T>> E routedEmitter(IEmitterTask<T> task, String name, IRoutingKeyExtractor<T> routingKeyExtractor){
38+
return new QEmitter<T>(qFlow, name)
39+
.routedEmitter(task, routingKeyExtractor);
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package it.enryold.quasarflow.abstracts;
2+
3+
import it.enryold.quasarflow.components.IAccumulator;
4+
import it.enryold.quasarflow.components.IAccumulatorLengthFunction;
5+
import com.google.common.util.concurrent.AtomicDouble;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
10+
public abstract class AbstractAccumulator<E, T> implements IAccumulator<E, T> {
11+
12+
private double byteSizeLimit;
13+
protected List<E> accumulator = new ArrayList<>();
14+
private AtomicDouble accumulatorSize = new AtomicDouble(0);
15+
protected IAccumulatorLengthFunction<E> accumulatorLengthFunction;
16+
17+
public AbstractAccumulator(double byteSizeLimit){
18+
this.byteSizeLimit = byteSizeLimit;
19+
this.accumulatorLengthFunction = accumulatorLengthFunction();
20+
}
21+
22+
23+
@Override
24+
public boolean add(E obj) {
25+
26+
double size = objectSize(obj);
27+
28+
if(shouldBecomeFull(size)){
29+
return false;
30+
}
31+
32+
accumulator.add(obj);
33+
accumulatorSize.addAndGet(size);
34+
return true;
35+
}
36+
37+
38+
private double objectSize(E obj)
39+
{
40+
return accumulatorLengthFunction.apply(obj);
41+
}
42+
43+
private boolean shouldBecomeFull(double size){
44+
return (accumulatorSize.get() + size > byteSizeLimit);
45+
}
46+
}

0 commit comments

Comments
 (0)