Commit c3c78695 authored by liuxingyu's avatar liuxingyu

Merge remote-tracking branch 'origin/master'

parents 2ab276cd 7bd17f04
...@@ -2,13 +2,21 @@ ...@@ -2,13 +2,21 @@
多线程处理列表逻辑 多线程处理列表逻辑
# 使用方式 # 使用方式
1. 获取任务执行实例 1. 引入依赖
```xml
<groupId>com.mushiny</groupId>
<artifactId>task-executor-kit</artifactId>
<version>1.0.0-RELEASE</version>
```
2. 获取任务执行实例
```java ```java
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(); TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor();
``` ```
2. 初始化执行器 3. 初始化执行器
```java ```java
executor.init( executor.init(
(data, context) -> { (data, context) -> {
...@@ -24,7 +32,7 @@ ...@@ -24,7 +32,7 @@
new User(30, "leehyoo") // 其他公用查询参数 new User(30, "leehyoo") // 其他公用查询参数
); );
``` ```
3. 获取查询结果 4. 获取查询结果
```java ```java
boolean result = executor.execute(); boolean result = executor.execute();
``` ```
......
package com.mushiny.task; package com.mushiny.task;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
...@@ -56,6 +57,9 @@ public class TaskExecutor<T, U, R> { ...@@ -56,6 +57,9 @@ public class TaskExecutor<T, U, R> {
public boolean execute() { public boolean execute() {
TaskDefinition<T, U, R> t = this.task; TaskDefinition<T, U, R> t = this.task;
Collection<T> data = t.getData(); Collection<T> data = t.getData();
if (null == data || data.isEmpty()) {
return false;
}
countDownLatch = new CountDownLatch(data.size()); countDownLatch = new CountDownLatch(data.size());
AtomicInteger failCount = new AtomicInteger(); AtomicInteger failCount = new AtomicInteger();
data.forEach(d -> pool.execute(() -> { data.forEach(d -> pool.execute(() -> {
...@@ -67,11 +71,15 @@ public class TaskExecutor<T, U, R> { ...@@ -67,11 +71,15 @@ public class TaskExecutor<T, U, R> {
} }
failCount.addAndGet(1); failCount.addAndGet(1);
} finally { } finally {
countDownLatch.countDown(); // await超时会导致,loop里面继续执行,但是已经执行destroy方法将latch对象销毁
if (Objects.nonNull(countDownLatch)) {
countDownLatch.countDown();
}
} }
})); }));
try { try {
return countDownLatch.await(countDownTimeOut, timeUnit) && failCount.get() <= 0; return countDownLatch.await(countDownTimeOut, timeUnit)
&& failCount.get() <= 0;
} catch (InterruptedException e) { } catch (InterruptedException e) {
return false; return false;
} finally { } finally {
......
...@@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit; ...@@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit;
public class TaskTests { public class TaskTests {
private int SLEEP_TIME = 100; private int SLEEP_TIME = 1000;
private List<User> list; private List<User> list;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment