Commit 85311783 authored by lihao's avatar lihao

add callback for record with exception

parent 5c6e8115
...@@ -2,6 +2,7 @@ package com.mushiny.task; ...@@ -2,6 +2,7 @@ package com.mushiny.task;
import java.util.Collection; import java.util.Collection;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer;
/** /**
* @author lihao * @author lihao
...@@ -18,6 +19,8 @@ public class TaskDefinition<T, U, R> { ...@@ -18,6 +19,8 @@ public class TaskDefinition<T, U, R> {
*/ */
private BiFunction<T, U, R> handler; private BiFunction<T, U, R> handler;
private Consumer<T> exceptionHandler;
/** /**
* 共有变量,推荐只用于查询,不推荐更新数据 * 共有变量,推荐只用于查询,不推荐更新数据
*/ */
...@@ -29,6 +32,13 @@ public class TaskDefinition<T, U, R> { ...@@ -29,6 +32,13 @@ public class TaskDefinition<T, U, R> {
this.context = context; this.context = context;
} }
public TaskDefinition(Collection<T> data, BiFunction<T, U, R> handler, U context, Consumer<T> exceptionHandler) {
this.data = data;
this.handler = handler;
this.exceptionHandler = exceptionHandler;
this.context = context;
}
public Collection<T> getData() { public Collection<T> getData() {
return data; return data;
} }
...@@ -48,4 +58,12 @@ public class TaskDefinition<T, U, R> { ...@@ -48,4 +58,12 @@ public class TaskDefinition<T, U, R> {
public U getContext() { public U getContext() {
return context; return context;
} }
public Consumer<T> getExceptionHandler() {
return exceptionHandler;
}
public void setExceptionHandler(Consumer<T> exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
} }
package com.mushiny.task; package com.mushiny.task;
import java.util.Collection; import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer;
/** /**
* @author lihao * @author lihao
...@@ -22,9 +24,9 @@ public class TaskExecutor<T, U, R> { ...@@ -22,9 +24,9 @@ public class TaskExecutor<T, U, R> {
private CountDownLatch countDownLatch; private CountDownLatch countDownLatch;
private Long countDownTimeOut; private final Long countDownTimeOut;
private TimeUnit timeUnit; private final TimeUnit timeUnit;
public TaskExecutor(ThreadPoolExecutor pool, Long countDownTimeOut, TimeUnit timeUnit) { public TaskExecutor(ThreadPoolExecutor pool, Long countDownTimeOut, TimeUnit timeUnit) {
this.pool = pool; this.pool = pool;
...@@ -44,6 +46,10 @@ public class TaskExecutor<T, U, R> { ...@@ -44,6 +46,10 @@ public class TaskExecutor<T, U, R> {
init(new TaskDefinition<>(data, handler, context)); init(new TaskDefinition<>(data, handler, context));
} }
public void init(BiFunction<T, U, R> handler, Collection<T> data, U context, Consumer<T> exceptionHandler) {
init(new TaskDefinition<>(data, handler, context, exceptionHandler));
}
/** /**
* 执行函数 * 执行函数
*/ */
...@@ -56,6 +62,9 @@ public class TaskExecutor<T, U, R> { ...@@ -56,6 +62,9 @@ public class TaskExecutor<T, U, R> {
try { try {
Object apply = t.getHandler().apply(d, t.getContext()); Object apply = t.getHandler().apply(d, t.getContext());
} catch (Exception ex) { } catch (Exception ex) {
if (Objects.nonNull(t.getExceptionHandler())) {
t.getExceptionHandler().accept(d);
}
failCount.addAndGet(1); failCount.addAndGet(1);
} finally { } finally {
countDownLatch.countDown(); countDownLatch.countDown();
......
...@@ -98,4 +98,28 @@ public class TaskTests { ...@@ -98,4 +98,28 @@ public class TaskTests {
boolean result = executor.execute(); boolean result = executor.execute();
Assert.assertFalse(result); Assert.assertFalse(result);
} }
@Test
public void errorRecordCallbackTest() {
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor();
List<User> errors = new ArrayList<>();
executor.init((data, context) -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
context.getAge();
return "success";
}, list, null, errors::add);
boolean result = executor.execute();
System.out.println("========= start ===========");
errors.forEach(System.out::println);
System.out.println("========= end ===========");
Assert.assertFalse(result);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment