1.创建线程池:
ThreadPoolConfig
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ThreadPoolConfig {
/** 核心线程数(默认线程数) */
private static final int corePoolSize = 5;
/** 最大线程数 */
private static final int maxPoolSize = 8;
/** 允许线程空闲时间(单位:默认为秒) */
private static final int keepAliveTime = 60;
/** 缓冲队列大小 */
private static final int queueCapacity = 100;
/** 线程池名前缀 */
private static final String threadNamePrefix = "DirectSearchEsService-";
@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}
2.启动类开启异步:
启动类
@SpringBootApplication
@EnableTransactionManagement
@EnableAsync
public class WalletMaintainApplication {
public static void main(String[] args) {
SpringApplication.run(WalletMaintainApplication.class, args);
}
}
3.编写异步方法:
blockChainUtil
@Service
public class blockChainUtil {
//线程池bean名称,可不填。
@Async("taskExecutor")
public Future<ArrayList> getBatchBalance(ArrayList accountList, Integer index){
ArrayList<Object> balanceList = new ArrayList<>();
//此处省略部分业务处理过程
//多线程异步执行返回值必须为Future类型。
return new AsyncResult<>(balanceList);
}
}
4.调用异步方法:
getAllUser
@DS("Local")
@Async("taskExecutor")
@Override
public Map getAllUser() {
//获取所有用户数据
List<UserAccountInfoBean> userAccountInfoBeans = baseMapper.selectList(null);
System.out.println("一共有" + userAccountInfoBeans.size() + "个用户!");
//根据总用户数量分割,30000用户一组。
List<List<String>> lists = Lists.partition(userAccountInfoBeans, 30000);
//执行多线程任务
BlockingQueue<Future<ArrayList>> queue = new LinkedBlockingQueue();
Future<ArrayList> future;
//循环调用查询
for (int i = 0; i < lists.size(); i++) {
List<String> itemList = castList(lists.get(i), String.class);
Integer index = i + 1;
future = blockChainUtil.getBatchBalance((ArrayList) itemList, index);
queue.add(future);
}
//拼装结果
for (int i = 0; i < queue.size(); i++) {
ArrayList batchBalance = null;
try {
batchBalance = queue.take().get();
} catch (InterruptedException e) {
System.out.println("服务中断异常!");
} catch (ExecutionException e) {
System.out.println("服务执行异常!");
}
//取实际返回的有效数据
for (int j = 0; j < batchBalance.size(); j++) {
String str = batchBalance.get(j).toString();
//此处省略部分业务处理过程
}
}
return null;
}