Commit 672c751f authored by liuxingyu's avatar liuxingyu

分布式锁代码提交

parent f54c4339
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<!-- <module>redisson-lock</module>-->
<module>task-executor-kit</module>
</modules>
<groupId>com.mushiny</groupId>
<artifactId>pubTools</artifactId>
<version>1.0.0-RELEASE</version>
<packaging>pom</packaging>
<properties>
<java.version>8</java.version>
</properties>
<distributionManagement>
<snapshotRepository>
<id>maven-snapshots</id>
<name>maven-snapshots</name>
<url>http://192.168.3.160:8081/repository/maven-snapshots/</url>
</snapshotRepository>
<repository>
<id>maven-release</id>
<name>maven-release</name>
<url>http://192.168.3.160:8081/repository/maven-releases/</url>
</repository>
</distributionManagement>
</project>
package com.mushiny.redisson.config;
import com.mushiny.redisson.utils.RedissonLockUtil;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ObjectUtils;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(value = RedissonProperties.class)
public class RedissonAutoConfiguration {
@Bean
@ConditionalOnMissingBean(RedissonClient.class)
public RedissonClient redissonClient(RedissonProperties redissonProperties) {
Config config = new Config();
//调用 useSingleServer 方法,选择单机模式,并指定 Redis 服务器的地址
config.useSingleServer().setAddress("redis://" + redissonProperties.getHost() + ":" + redissonProperties.getPort()).setPassword(redissonProperties.getPassword());
//调用 setConnectionMinimumIdleSize 方法,设置连接池最小空闲连接数为 10。
config.useSingleServer().setConnectionMinimumIdleSize(10);
//调用 Redisson.create 方法,创建一个 RedissonClient 对象
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
@Bean
public RedissonLockUtil redissonLockUtil(RedissonClient client) {
if (ObjectUtils.isEmpty(client)) {
throw new RuntimeException();
}
return new RedissonLockUtil(client);
}
}
package com.mushiny.redisson.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
@Data
@ConfigurationProperties(prefix = "spring.redis")
public class RedissonProperties {
private String host;
private String password;
private String port;
}
...@@ -20,22 +20,25 @@ import java.util.concurrent.TimeUnit; ...@@ -20,22 +20,25 @@ import java.util.concurrent.TimeUnit;
* @since 2023/8/7 13:27 * @since 2023/8/7 13:27
*/ */
@Slf4j @Slf4j
@Component
public class RedissonLockUtil { public class RedissonLockUtil {
public static final Long WAITTIME = 10000L; public static final Long WAITTIME = 10000L;
public static final Long RELEASETIME = -1L; public static final Long RELEASETIME = -1L;
@Autowired
RedissonClient redissonClient; public static RedissonClient client;
public RedissonLockUtil(RedissonClient redissonClient) {
client = redissonClient;
}
/** /**
* @param key * @param key
* @param lockType REENTRANT_LOCK:可重入锁 FAIR_LOCK:公平锁 READ_LOCK:红锁 WRITE_LOCK:读写锁 * @param lockType REENTRANT_LOCK:可重入锁 FAIR_LOCK:公平锁 READ_LOCK:红锁 WRITE_LOCK:读写锁
* @return * @return
*/ */
public RLock getLockByKey(String key, int lockType) { public RLock getLockByKey(String key, int lockType) {
if (ObjectUtils.isEmpty(key)) { if (ObjectUtils.isEmpty(key)) {
key = Thread.currentThread().getName() + ":" + Thread.currentThread().getId(); key = Thread.currentThread().getName() + ":" + Thread.currentThread().getId();
} }
...@@ -44,13 +47,13 @@ public class RedissonLockUtil { ...@@ -44,13 +47,13 @@ public class RedissonLockUtil {
} }
switch (lockType) { switch (lockType) {
case 1: case 1:
return redissonClient.getLock(key); return client.getLock(key);
case 2: case 2:
return redissonClient.getFairLock(key); return client.getFairLock(key);
case 3: case 3:
return redissonClient.getReadWriteLock(key).readLock(); return client.getReadWriteLock(key).readLock();
case 4: case 4:
return redissonClient.getReadWriteLock(key).writeLock(); return client.getReadWriteLock(key).writeLock();
default: default:
throw new RuntimeException("do not support lock type:" + lockType); throw new RuntimeException("do not support lock type:" + lockType);
} }
...@@ -61,7 +64,7 @@ public class RedissonLockUtil { ...@@ -61,7 +64,7 @@ public class RedissonLockUtil {
* *
* @param rLock * @param rLock
*/ */
public void lock(RLock rLock) { public void lock(RLock rLock) {
rLock.lock(); rLock.lock();
} }
...@@ -70,7 +73,7 @@ public class RedissonLockUtil { ...@@ -70,7 +73,7 @@ public class RedissonLockUtil {
* *
* @param rLock * @param rLock
*/ */
public void unlock(RLock rLock) { public void unlock(RLock rLock) {
rLock.unlock(); rLock.unlock();
} }
...@@ -85,7 +88,7 @@ public class RedissonLockUtil { ...@@ -85,7 +88,7 @@ public class RedissonLockUtil {
* @param type * @param type
* @return * @return
*/ */
public Tuple2<RLock, Boolean> tryLock(Long waitTime, Long releaseTime, String key, int type) { public Tuple2<RLock, Boolean> tryLock(Long waitTime, Long releaseTime, String key, int type) {
boolean lock = false; boolean lock = false;
RLock rLock = null; RLock rLock = null;
if (ObjectUtils.isEmpty(waitTime)) { if (ObjectUtils.isEmpty(waitTime)) {
...@@ -110,7 +113,7 @@ public class RedissonLockUtil { ...@@ -110,7 +113,7 @@ public class RedissonLockUtil {
* @param rLock * @param rLock
* @return * @return
*/ */
public boolean releaseLock(RLock rLock) { public boolean releaseLock(RLock rLock) {
String threadName = Thread.currentThread().getName() + ":" + Thread.currentThread().getId(); String threadName = Thread.currentThread().getName() + ":" + Thread.currentThread().getId();
try { try {
rLock.unlock(); rLock.unlock();
......
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.mushiny.task.executor.autoconfig.TaskExecutorAutoConfiguration com.mushiny.redisson.config.RedissonAutoConfiguration
\ No newline at end of file \ No newline at end of file
# 项目介绍
多线程处理列表逻辑
# 使用方式
1. 引入依赖
```xml
<groupId>com.mushiny</groupId>
<artifactId>task-executor-kit</artifactId>
<version>1.0.0-RELEASE</version>
```
2. 获取任务执行实例
```java
TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
```
3. 初始化执行器
```java
executor.init(
(data, context) -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, // 需要处理的业务逻辑
list, // 需要处理的业务数据列表
new User(30, "leehyoo") // 其他公用查询参数
);
```
4. 获取查询结果
```java
boolean result = executor.execute();
```
# 注意事项
1. 没有事务处理
2. 尽量避免在循环中查询数据库
3. context内容建议不要修改,只作为逻辑判断、查询等使用
4. 尽量使用自定义的线程池
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mushiny</groupId>
<artifactId>task-executor-kit</artifactId>
<version>1.0.0-RELEASE</version>
<packaging>pom</packaging>
<modules>
<module>task-executor-core</module>
<module>task-executor-spring-boot-starter</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.27</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>task-executor-kit</artifactId>
<groupId>com.mushiny</groupId>
<version>1.0.0-RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>task-executor-core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.mushiny.task.executor.core;
import java.util.Collection;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
* @author lihao
*/
public class TaskDefinition<T, U, R> {
/**
* 处理的业务数据
*/
private Collection<T> data;
/**
* 业务逻辑
*/
private BiFunction<T, U, R> handler;
private Consumer<T> exceptionHandler;
/**
* 共有变量,推荐只用于查询,不推荐更新数据
*/
private final U context;
public TaskDefinition(Collection<T> data, BiFunction<T, U, R> handler, U context) {
this.data = data;
this.handler = handler;
this.context = context;
}
public TaskDefinition(Collection<T> data, BiFunction<T, U, R> handler, U context, Consumer<T> exceptionHandler) {
this.data = data;
this.handler = handler;
this.exceptionHandler = exceptionHandler;
this.context = context;
}
public Collection<T> getData() {
return data;
}
public void setData(Collection<T> data) {
this.data = data;
}
public BiFunction<T, U, R> getHandler() {
return handler;
}
public void setHandler(BiFunction<T, U, R> handler) {
this.handler = handler;
}
public U getContext() {
return context;
}
public Consumer<T> getExceptionHandler() {
return exceptionHandler;
}
public void setExceptionHandler(Consumer<T> exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
}
package com.mushiny.task.executor.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
* @author lihao
*/
public class TaskExecutor<T, U, R> {
private static final Logger log = LoggerFactory.getLogger(TaskExecutor.class);
/**
* 线程池
*/
private ExecutorService pool;
/**
* 执行的任务数据
*/
private TaskDefinition<T, U, R> task;
private CountDownLatch countDownLatch;
private final Long countDownTimeOut;
private final TimeUnit timeUnit;
public TaskExecutor(ThreadPoolExecutor pool, Long countDownTimeOut, TimeUnit timeUnit) {
this.pool = pool;
this.countDownTimeOut = countDownTimeOut;
this.timeUnit = timeUnit;
}
/**
* 初始化函数
*/
public void init(TaskDefinition<T, U, R> task) {
this.task = task;
}
public void init(BiFunction<T, U, R> handler, Collection<T> data, U context) {
init(new TaskDefinition<>(data, handler, context));
}
public void init(BiFunction<T, U, R> handler, Collection<T> data, U context, Consumer<T> exceptionHandler) {
init(new TaskDefinition<>(data, handler, context, exceptionHandler));
}
/**
* 执行函数
*/
public boolean execute() {
if (log.isInfoEnabled()) {
log.info("task start ======");
}
TaskDefinition<T, U, R> t = this.task;
Collection<T> data = t.getData();
if (null == data || data.isEmpty()) {
log.warn("data is empty");
return false;
}
countDownLatch = new CountDownLatch(data.size());
AtomicInteger failCount = new AtomicInteger();
data.forEach(d -> pool.execute(() -> {
try {
Object apply = t.getHandler().apply(d, t.getContext());
if (log.isInfoEnabled()) {
log.info("sub task finish ======");
}
} catch (Exception ex) {
if (Objects.nonNull(t.getExceptionHandler())) {
t.getExceptionHandler().accept(d);
}
failCount.addAndGet(1);
} finally {
// await超时会导致,loop里面继续执行,但是已经执行destroy方法将latch对象销毁
if (Objects.nonNull(countDownLatch)) {
countDownLatch.countDown();
}
}
}));
try {
return countDownLatch.await(countDownTimeOut, timeUnit)
&& failCount.get() <= 0;
} catch (InterruptedException e) {
return false;
} finally {
this.destroy();
}
}
/**
* 销毁函数
*/
public void destroy() {
this.task = null;
this.countDownLatch = null;
this.pool = null;
}
}
package com.mushiny.task.executor.core;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author lihao
*/
public class TaskPoolFactory {
private ThreadPoolExecutor threadPoolExecutor;
public TaskPoolFactory() {
int core = Runtime.getRuntime().availableProcessors();
int poolSize = Math.min(core / 2, 4);
this.threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize + 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
}
public TaskPoolFactory(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}
public <T, U, R> TaskExecutor<T, U, R> createExecutor() {
return createExecutor(15L, TimeUnit.SECONDS);
}
public <T, U, R> TaskExecutor<T, U, R> createExecutor(Long timeOut, TimeUnit timeUnit) {
return new TaskExecutor<>(this.threadPoolExecutor, timeOut, timeUnit);
}
}
package com.mushiny.task.executor.core;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class TaskTests {
private int SLEEP_TIME = 100;
private List<User> list;
@Before
public void init() {
list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
list.add(new User(i, "leehyoo" + i));
}
}
@Test
public void multiTaskLoopNoArgsTest() {
TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
executor.init((data, context) -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, list, new User(30, "leehyoo"));
boolean result = executor.execute();
Assert.assertTrue(result);
}
@Test
public void multiTaskLoopWithArgsTest() {
TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor(60L, TimeUnit.SECONDS);
executor.init((data, context) -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
}, list, new User(30, "leehyoo"));
boolean result = executor.execute();
Assert.assertTrue(result);
}
@Test
public void failCaseTest() {
TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
executor.init((data, context) -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
context.getAge();
return "success";
}, list, null);
boolean result = executor.execute();
Assert.assertFalse(result);
}
@Test
public void errorRecordCallbackTest() {
TaskPoolFactory taskPoolFactory = new TaskPoolFactory();
TaskExecutor<User, User, String> executor = taskPoolFactory.createExecutor();
List<User> errors = Collections.synchronizedList(new ArrayList<>());
executor.init((data, context) -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
context.getAge();
return "success";
}, list, null, errors::add);
boolean result = executor.execute();
System.out.println("========= start ===========");
errors.forEach(System.out::println);
System.out.println("========= end ===========");
Assert.assertFalse(result);
}
@Test
public void normalLoopTest() {
User context = new User(30, "leehyoo");
list.forEach(data -> {
data.setName(data.getName() + "handle");
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
package com.mushiny.task.executor.core;
public class User {
private int age;
private String name;
public int getAge() {
return age;
}
public User(int age, String name) {
this.age = age;
this.name = name;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"age=" + age +
", name='" + name + '\'' +
'}';
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>task-executor-kit</artifactId>
<groupId>com.mushiny</groupId>
<version>1.0.0-RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>task-executor-spring-boot-starter</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.7.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring-boot.version}</version>
<scope>provided</scope>
</dependency>
<!-- 自动依赖配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>${spring-boot.version}</version>
<scope>provided</scope>
</dependency>
<!-- 将被@ConfigurationProperties注解的类的属性注入到元属性 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>${spring-boot.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.mushiny</groupId>
<artifactId>task-executor-core</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.mushiny.task.executor.autoconfig;
import com.mushiny.task.executor.core.TaskPoolFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(value = TaskExecutorProperties.class)
public class TaskExecutorAutoConfiguration {
@Bean
public TaskPoolFactory taskPoolFactory (TaskExecutorProperties taskExecutorProperties) {
TaskPoolConfig pool = taskExecutorProperties.getPool();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getKeepAliveTime(), pool.getTimeUnit(), new LinkedBlockingQueue<>());
return new TaskPoolFactory(threadPoolExecutor);
}
}
package com.mushiny.task.executor.autoconfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
@ConfigurationProperties(prefix = "com.mushiny.tools.task.executor")
public class TaskExecutorProperties {
private Boolean enabled;
@NestedConfigurationProperty
private TaskPoolConfig pool;
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public TaskPoolConfig getPool() {
return pool;
}
public void setPool(TaskPoolConfig pool) {
this.pool = pool;
}
}
package com.mushiny.task.executor.autoconfig;
import java.util.concurrent.TimeUnit;
public class TaskPoolConfig {
private Integer corePoolSize;
private Integer maximumPoolSize;
private Long keepAliveTime;
private TimeUnit timeUnit;
public Integer getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}
public Integer getMaximumPoolSize() {
return maximumPoolSize;
}
public void setMaximumPoolSize(Integer maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
public Long getKeepAliveTime() {
return keepAliveTime;
}
public void setKeepAliveTime(Long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment