Commit eb545c9c authored by lihao's avatar lihao

Merge branch 'feature/task-executor' into develop

parents 7bd17f04 462f90e9
...@@ -5,36 +5,37 @@ ...@@ -5,36 +5,37 @@
1. 引入依赖 1. 引入依赖
```xml ```xml
<groupId>com.mushiny</groupId> <groupId>com.mushiny</groupId>
<artifactId>task-executor-kit</artifactId> <artifactId>task-executor-kit</artifactId>
<version>1.0.0-RELEASE</version> <version>1.0.0-RELEASE</version>
``` ```
2. 获取任务执行实例 2. 获取任务执行实例
```java ```java
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(); TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
``` ```
3. 初始化执行器 3. 初始化执行器
```java ```java
executor.init( executor.init(
(data, context) -> { (data, context) -> {
data.setName(data.getName() + "handle"); data.setName(data.getName() + "handle");
try { try {
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
return "success"; return "success";
}, // 需要处理的业务逻辑 }, // 需要处理的业务逻辑
list, // 需要处理的业务数据列表 list, // 需要处理的业务数据列表
new User(30, "leehyoo") // 其他公用查询参数 new User(30, "leehyoo") // 其他公用查询参数
); );
``` ```
4. 获取查询结果 4. 获取查询结果
```java ```java
boolean result = executor.execute(); boolean result = executor.execute();
``` ```
# 注意事项 # 注意事项
......
...@@ -2,26 +2,30 @@ ...@@ -2,26 +2,30 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>pubTools</artifactId>
<groupId>com.mushiny</groupId>
<version>1.0.0-RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.mushiny</groupId>
<artifactId>task-executor-kit</artifactId> <artifactId>task-executor-kit</artifactId>
<version>1.0.0-RELEASE</version>
<packaging>pom</packaging>
<modules>
<module>task-executor-core</module>
<module>task-executor-spring-boot-starter</module>
</modules>
<properties> <properties>
<maven.compiler.source>8</maven.compiler.source> <maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target> <maven.compiler.target>8</maven.compiler.target>
</properties> </properties>
<dependencies> <dependencyManagement>
<dependency> <dependencies>
<groupId>junit</groupId> <dependency>
<artifactId>junit</artifactId> <groupId>junit</groupId>
<version>4.13.2</version> <artifactId>junit</artifactId>
<scope>test</scope> <version>4.13.2</version>
</dependency> <scope>test</scope>
</dependencies> </dependency>
</dependencies>
</dependencyManagement>
</project> </project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>task-executor-kit</artifactId>
<groupId>com.mushiny</groupId>
<version>1.0.0-RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>task-executor-core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.mushiny.task; package com.mushiny.task.executor.core;
import java.util.Collection; import java.util.Collection;
import java.util.function.BiFunction; import java.util.function.BiFunction;
......
package com.mushiny.task; package com.mushiny.task.executor.core;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
...@@ -62,6 +64,7 @@ public class TaskExecutor<T, U, R> { ...@@ -62,6 +64,7 @@ public class TaskExecutor<T, U, R> {
} }
countDownLatch = new CountDownLatch(data.size()); countDownLatch = new CountDownLatch(data.size());
AtomicInteger failCount = new AtomicInteger(); AtomicInteger failCount = new AtomicInteger();
data.forEach(d -> pool.execute(() -> { data.forEach(d -> pool.execute(() -> {
try { try {
Object apply = t.getHandler().apply(d, t.getContext()); Object apply = t.getHandler().apply(d, t.getContext());
......
package com.mushiny.task; package com.mushiny.task.executor.core;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -8,22 +8,22 @@ import java.util.concurrent.TimeUnit; ...@@ -8,22 +8,22 @@ import java.util.concurrent.TimeUnit;
* @author lihao * @author lihao
*/ */
public class TaskPoolFactory { public class TaskPoolFactory {
TaskPoolFactory() { public TaskPoolFactory() {
// ignore // ignore
} }
public static <T, U, R> TaskExecutor<T, U, R> createExecutor() { public <T, U, R> TaskExecutor<T, U, R> createExecutor() {
int core = Runtime.getRuntime().availableProcessors(); int core = Runtime.getRuntime().availableProcessors();
int poolSize = Math.min(core / 2, 4); int poolSize = Math.min(core / 2, 4);
return createExecutor(new ThreadPoolExecutor(poolSize, poolSize + 1, 0L, TimeUnit.MILLISECONDS, return createExecutor(new ThreadPoolExecutor(poolSize, poolSize + 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()), 15L, TimeUnit.SECONDS); new LinkedBlockingQueue<>()), 15L, TimeUnit.SECONDS);
} }
public static <T, U, R> TaskExecutor<T, U, R> createExecutor(ThreadPoolExecutor pool) { public <T, U, R> TaskExecutor<T, U, R> createExecutor(ThreadPoolExecutor pool) {
return createExecutor(pool, 15L, TimeUnit.SECONDS); return createExecutor(pool, 15L, TimeUnit.SECONDS);
} }
public static <T, U, R> TaskExecutor<T, U, R> createExecutor(ThreadPoolExecutor pool, Long timeOut, TimeUnit timeUnit) { public <T, U, R> TaskExecutor<T, U, R> createExecutor(ThreadPoolExecutor pool, Long timeOut, TimeUnit timeUnit) {
return new TaskExecutor<>(pool, timeOut, timeUnit); return new TaskExecutor<>(pool, timeOut, timeUnit);
} }
} }
package com.mushiny.task; package com.mushiny.task.executor.core;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -12,21 +13,22 @@ import java.util.concurrent.TimeUnit; ...@@ -12,21 +13,22 @@ import java.util.concurrent.TimeUnit;
public class TaskTests { public class TaskTests {
private int SLEEP_TIME = 1000; private int SLEEP_TIME = 100;
private List<User> list; private List<User> list;
@Before @Before
public void init() { public void init() {
list = new ArrayList<>(); list = new ArrayList<>();
for (int i = 0; i < 20; i++) { for (int i = 0; i < 200; i++) {
list.add(new User(i, "leehyoo" + i)); list.add(new User(i, "leehyoo" + i));
} }
} }
@Test @Test
public void multiTaskLoopNoArgsTest() { public void multiTaskLoopNoArgsTest() {
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(); TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
executor.init((data, context) -> { executor.init((data, context) -> {
data.setName(data.getName() + "handle"); data.setName(data.getName() + "handle");
try { try {
...@@ -42,26 +44,12 @@ public class TaskTests { ...@@ -42,26 +44,12 @@ public class TaskTests {
} }
@Test
public void normalLoopTest() {
User context = new User(30, "leehyoo");
list.forEach(data -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Test @Test
public void multiTaskLoopWithArgsTest() { public void multiTaskLoopWithArgsTest() {
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(new ThreadPoolExecutor(5, 6, TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())); TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor(new ThreadPoolExecutor(8, 9,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()), 60L, TimeUnit.SECONDS);
executor.init((data, context) -> { executor.init((data, context) -> {
data.setName(data.getName() + "handle"); data.setName(data.getName() + "handle");
...@@ -82,7 +70,8 @@ public class TaskTests { ...@@ -82,7 +70,8 @@ public class TaskTests {
@Test @Test
public void failCaseTest() { public void failCaseTest() {
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(); TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
executor.init((data, context) -> { executor.init((data, context) -> {
data.setName(data.getName() + "handle"); data.setName(data.getName() + "handle");
...@@ -102,8 +91,9 @@ public class TaskTests { ...@@ -102,8 +91,9 @@ public class TaskTests {
@Test @Test
public void errorRecordCallbackTest() { public void errorRecordCallbackTest() {
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(); TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
List<User> errors = new ArrayList<>(); TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
List<User> errors = Collections.synchronizedList(new ArrayList<>());
executor.init((data, context) -> { executor.init((data, context) -> {
data.setName(data.getName() + "handle"); data.setName(data.getName() + "handle");
try { try {
...@@ -122,4 +112,18 @@ public class TaskTests { ...@@ -122,4 +112,18 @@ public class TaskTests {
Assert.assertFalse(result); Assert.assertFalse(result);
} }
@Test
public void normalLoopTest() {
User context = new User(30, "leehyoo");
list.forEach(data -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} }
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>task-executor-kit</artifactId>
<groupId>com.mushiny</groupId>
<version>1.0.0-RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>task-executor-spring-boot-starter</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.7.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring-boot.version}</version>
<scope>provided</scope>
</dependency>
<!-- 自动依赖配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>${spring-boot.version}</version>
<scope>provided</scope>
</dependency>
<!-- 将被@ConfigurationProperties注解的类的属性注入到元属性 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>${spring-boot.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.mushiny</groupId>
<artifactId>task-executor-core</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.mushiny.task.executor.autoconfig;
import com.mushiny.task.executor.core.TaskPoolFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(value = TaskExecutorProperties.class)
public class TaskExecutorAutoConfiguration {
@Bean("listLoopThreadPool")
@ConditionalOnMissingBean(name = { "listLoopThreadPool" } )
public ThreadPoolExecutor listLoopThreadPool(TaskExecutorProperties taskExecutorProperties) {
TaskPoolConfig pool = taskExecutorProperties.getPool();
return new ThreadPoolExecutor(pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getKeepAliveTime(), pool.getTimeUnit(), new LinkedBlockingQueue<>());
}
@Bean
@ConditionalOnBean(name = { "listLoopThreadPool" })
public TaskPoolFactory taskPoolFactory () {
return new TaskPoolFactory();
}
}
package com.mushiny.task.executor.autoconfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
@ConfigurationProperties(prefix = "com.mushiny.tools.task.executor")
public class TaskExecutorProperties {
private Boolean enabled;
@NestedConfigurationProperty
private TaskPoolConfig pool;
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public TaskPoolConfig getPool() {
return pool;
}
public void setPool(TaskPoolConfig pool) {
this.pool = pool;
}
}
package com.mushiny.task.executor.autoconfig;
import java.util.concurrent.TimeUnit;
public class TaskPoolConfig {
private Integer corePoolSize;
private Integer maximumPoolSize;
private Long keepAliveTime;
private TimeUnit timeUnit;
public Integer getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}
public Integer getMaximumPoolSize() {
return maximumPoolSize;
}
public void setMaximumPoolSize(Integer maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
public Long getKeepAliveTime() {
return keepAliveTime;
}
public void setKeepAliveTime(Long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.mushiny.task.executor.autoconfig.TaskExecutorAutoConfiguration
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment