Commit 05f0f0ca authored by lihao's avatar lihao

提交多线程任务执行

parent a0119f45
......@@ -4,6 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<modules>
<!-- <module>redisson-lock</module>-->
<module>task-executor-kit</module>
</modules>
<groupId>com.mushiny</groupId>
<artifactId>pubTools</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>pubTools</artifactId>
<groupId>com.mushiny</groupId>
<version>1.0.0-RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>task-executor-kit</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.mushiny.task;
import java.util.Collection;
import java.util.function.BiFunction;
/**
* @author lihao
*/
public class TaskDefinition<T, U, R> {
/**
* 处理的业务数据
*/
private Collection<T> data;
/**
* 业务逻辑
*/
private BiFunction<T, U, R> handler;
/**
* 共有变量,推荐只用于查询,不推荐更新数据
*/
private final U context;
public TaskDefinition(Collection<T> data, BiFunction<T, U, R> handler, U context) {
this.data = data;
this.handler = handler;
this.context = context;
}
public Collection<T> getData() {
return data;
}
public void setData(Collection<T> data) {
this.data = data;
}
public BiFunction<T, U, R> getHandler() {
return handler;
}
public void setHandler(BiFunction<T, U, R> handler) {
this.handler = handler;
}
public U getContext() {
return context;
}
}
package com.mushiny.task;
import java.util.Collection;
import java.util.concurrent.*;
import java.util.function.BiFunction;
/**
* @author lihao
*/
public class TaskExecutor<T, U, R> {
/**
* 线程池
*/
private ExecutorService pool;
/**
* 执行的任务数据
*/
private TaskDefinition<T, U, R> task;
private CountDownLatch countDownLatch;
public TaskExecutor() {
int i = Runtime.getRuntime().availableProcessors();
int core = Math.min(i / 2, 8);
this.pool = new ThreadPoolExecutor(core, core + 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public TaskExecutor(ThreadPoolExecutor pool) {
this.pool = pool;
}
/**
* 初始化函数
*/
public void init(TaskDefinition<T, U, R> task) {
this.task = task;
}
public void init(BiFunction<T, U, R> handler, Collection<T> data, U context) {
init(new TaskDefinition<>(data, handler, context));
}
/**
* 执行函数
*/
public void execute() {
TaskDefinition<T, U, R> t = this.task;
Collection<T> data = t.getData();
countDownLatch = new CountDownLatch(data.size());
data.forEach(d -> pool.execute(() -> {
try {
Object apply = t.getHandler().apply(d, t.getContext());
System.out.println("bi func result: " + apply);
} catch (Exception ex) {
} finally {
countDownLatch.countDown();
}
}));
try {
countDownLatch.await();
} catch (InterruptedException e) {
} finally {
this.destroy();
}
}
/**
* 销毁函数
*/
public void destroy() {
this.task = null;
this.countDownLatch = null;
this.pool = null;
}
}
package com.mushiny.task;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author lihao
*/
public class TaskPoolFactory {
TaskPoolFactory() {
// ignore
}
public static <T, U, R> TaskExecutor<T, U, R> createExecutor() {
return new TaskExecutor<>();
}
public static <T, U, R> TaskExecutor<T, U, R> createExecutor(ThreadPoolExecutor pool) {
return new TaskExecutor<>(pool);
}
}
package com.mushiny.task;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TaskTests {
private List<User> list;
@Before
public void init() {
list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
list.add(new User(i, "leehyoo" + i));
}
}
@Test
public void multiTaskLoopNoArgsTest() {
long startTime = System.currentTimeMillis();
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor();
TaskDefinition<User, User, String> taskDefinition = new TaskDefinition<>(list, (data, context) -> {
System.out.println("enter bi func, current Thread:" + Thread.currentThread());
System.out.println(data);
data.setName(data.getName() + "handle");
System.out.println(context);
System.out.println("exit bi func");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, new User(30, "leehyoo"));
executor.init((data, context) -> {
System.out.println("enter bi func, current Thread:" + Thread.currentThread());
System.out.println(data);
data.setName(data.getName() + "handle");
System.out.println(context);
System.out.println("exit bi func");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, list, new User(30, "leehyoo"));
executor.execute();
System.out.println("finally over. cost: " + (System.currentTimeMillis() - startTime));
}
@Test
public void normalLoopTest() {
long startTime = System.currentTimeMillis();
User context = new User(30, "leehyoo");
list.forEach(data -> {
System.out.println("enter bi func, current Thread:" + Thread.currentThread());
System.out.println(data);
data.setName(data.getName() + "handle");
System.out.println(context);
System.out.println("exit bi func");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("finally over. cost: " + (System.currentTimeMillis() - startTime));
}
@Test
public void multiTaskLoopWithArgsTest() {
long startTime = System.currentTimeMillis();
TaskExecutor<User, User, String> executor = TaskPoolFactory.createExecutor(new ThreadPoolExecutor(4, 4,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()));
TaskDefinition<User, User, String> taskDefinition = new TaskDefinition<>(list, (data, context) -> {
System.out.println("enter bi func, current Thread:" + Thread.currentThread());
System.out.println(data);
data.setName(data.getName() + "handle");
System.out.println(context);
System.out.println("exit bi func");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, new User(30, "leehyoo"));
executor.init((data, context) -> {
System.out.println("enter bi func, current Thread:" + Thread.currentThread());
System.out.println(data);
data.setName(data.getName() + "handle");
System.out.println(context);
System.out.println("exit bi func");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, list, new User(30, "leehyoo"));
executor.execute();
System.out.println("finally over. cost: " + (System.currentTimeMillis() - startTime));
}
}
package com.mushiny.task;
public class User {
private int age;
private String name;
public int getAge() {
return age;
}
public User(int age, String name) {
this.age = age;
this.name = name;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"age=" + age +
", name='" + name + '\'' +
'}';
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment