作者: whooyun发表于: 2023-04-11 17:44
/**
* 当需要调整品态的SKU比较多时,使用线程池提升处理效率
*
* @param acvGdsAdjustStateList
*/
private void orgGdsAdjustState(List<AcvGdsAdjustState> acvGdsAdjustStateList) {
if (!CollectionUtils.isEmpty(acvGdsAdjustStateList)) {
ExecutorService executor = new ThreadPoolExecutor(5, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
log.info("品态调整,线程池创建成功:{}", executor);
acvGdsAdjustStateList.forEach(item -> {
AsyncAcvGdsAdjustStateGoodsTask task = new AsyncAcvGdsAdjustStateGoodsTask();
task.setAcvGdsAdjustState(item);
executor.execute(task);
});
executor.shutdown();
try {
if (!executor.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
// 超时后强制关闭所有线程
executor.shutdownNow();
log.info("==>品态调整,线程池已关闭");
}
} catch (InterruptedException e) {
log.info("==>品态调整,线程池关闭异常:{}", e.getMessage());
e.printStackTrace();
}
}
}
==============================================
//创建的对象无法被自动注入,只能通过spring上线文去拿
@Slf4j
public class AsyncAcvGdsAdjustStateGoodsExecutor implements Runnable{
//处理state的原子性和可见性问题
private final AtomicReference<AcvGdsAdjustState> stateRef = new AtomicReference<>();;
public void setAcvGdsAdjustState(AcvGdsAdjustState state) {
stateRef.set(state);
}
//业务无线程安全需求,所以run内的方法无需处理
@Override
public void run() {
AcvGdsAdjustStateMapper acvGdsAdjustStateMapper= ApplicationContextProvider.getBeanByClass(AcvGdsAdjustStateMapper.class);
AcvGdsAdjustStateGoodsMapper acvGdsAdjustStateGoodsMapper= ApplicationContextProvider.getBeanByClass(AcvGdsAdjustStateGoodsMapper.class);
AcvOrgGroupMapper acvOrgGroupMapper= ApplicationContextProvider.getBeanByClass(AcvOrgGroupMapper.class);
//log.debug("==> 需要调整品态的机构单据:{}", item);
AcvGdsAdjustState item = stateRef.get();
log.info("当前线程:"+Thread.currentThread().getName()+",拿到的参数:"+item);
log.info("---------------------------------");
QueryWrapper orgGroupQuery = new QueryWrapper();
orgGroupQuery.eq("biz_uid", item.getBizUid());
orgGroupQuery.eq("corp_id", item.getCorpId());
List<AcvOrgGroup> acvOrgGroupList = acvOrgGroupMapper.selectList(orgGroupQuery);
log.info("需要更新的机构数量:{}",acvOrgGroupList);
List<Integer> orgIdList = new ArrayList<>();
acvOrgGroupList.forEach(i -> {
orgIdList.add(i.getGroupOrgId());
});
QueryWrapper stateGoodsQuery = new QueryWrapper();
stateGoodsQuery.eq("bill_id", item.getId());
stateGoodsQuery.eq("corp_id", item.getCorpId());
List<AcvGdsAdjustStateGoods> stateGoodsList = acvGdsAdjustStateGoodsMapper.selectList(stateGoodsQuery);
log.info("需要更新的商品数量获取完成");
stateGoodsList.forEach(i -> {
this.updateOrgGoodsState(Long.valueOf(i.getNewStatusDic()), i.getGoodsId(), orgIdList);
});
log.info("更新商品状态完成");
AcvGdsAdjustState acvGdsAdjustState = new AcvGdsAdjustState();
acvGdsAdjustState.setId(item.getId());
////状态,代码类型:1,代码值:1-草稿,2-审核中,3-待执行,4-已生效,5-已作废,6-已终止
acvGdsAdjustState.setBillStatusDic(CommonConstant.MATH_BYTE_4);
acvGdsAdjustState.setModifiedDtm(LocalDateTime.now());
UpdateWrapper updateWrapper= new UpdateWrapper();
updateWrapper.eq("id", acvGdsAdjustState.getId());
updateWrapper.eq("corp_id", item.getCorpId());
acvGdsAdjustStateMapper.update(acvGdsAdjustState, updateWrapper);
log.info("==>品态调整单:{},处理完毕", acvGdsAdjustState);
log.info("当前线程结束:"+Thread.currentThread().getName()+",拿到的参数:"+item);
}
/**
* 修改机构商品状态
*
* @param stateId
* @param goodsId
* @param idList
*/
private void updateOrgGoodsState(final long stateId, final long goodsId, final List idList) {
AcvGdsStateMapper acvGdsStateMapper= ApplicationContextProvider.getBeanByClass(AcvGdsStateMapper.class);
AcvOrgGoodsMapper acvOrgGoodsMapper= ApplicationContextProvider.getBeanByClass(AcvOrgGoodsMapper.class);
AcvGdsState state = acvGdsStateMapper.selectById(stateId);
log.info("==>获取品态业务数据:{}", state);
UpdateWrapper stateUpdateWrapper = new UpdateWrapper();
/* stateUpdateWrapper.set("state_no", state.getStateNo());
stateUpdateWrapper.set("life_dic", state.getLifeDic());
stateUpdateWrapper.set("modified_dtm",LocalDateTime.now());*/
stateUpdateWrapper.eq("goods_id", goodsId);
stateUpdateWrapper.eq("is_deleted", CommonConstant.MATH_1);
stateUpdateWrapper.in("goods_org_id", idList);
stateUpdateWrapper.eq("corp_id", state.getCorpId());
AcvOrgGoods acvOrgGoods = new AcvOrgGoods();
acvOrgGoods.setStateNo(state.getStateNo());
acvOrgGoods.setLifeDic(state.getLifeDic().byteValue());
acvOrgGoods.setModifiedDtm(LocalDateTime.now());
log.info("==>更新机构商品品态:{}", acvOrgGoods);
int resultCount = acvOrgGoodsMapper.update(acvOrgGoods, stateUpdateWrapper);
log.info("==>更新机构商品:{},品态完成,修改条数:{}",goodsId,resultCount);
}
}
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
private static ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
public static ApplicationContext getApplicationContext() {
if (context == null) {
throw new IllegalStateException("ApplicationContext not initialized yet.");
}
return context;
}
public static <T> T getBean(Class<T> beanClass) {
return context.getBean(beanClass);
}
public static Object getBean(String beanName) {
return context.getBean(beanName);
}
}
=====================生产环境出现了多个线程池,且线程池内存无法被GVM回收的情况
原因 (一)AsyncAcvGdsAdjustStateGoodsExecutor 在执行过程中抛出未捕获的异常,线程可能不会正常结束,从而阻止线程池关闭 (二)orgGdsAdjustState方法被多个线程同时调用,会导致同时创建多个线程池实例
想法初衷: 1、因为是被xxl-job调用,且是串行,所以默认以为线程池只会有一个 2、线程池代码缺陷,知识层面不够,代码缺陷
==========================优化方案===============================
使用单例线程池:创建一个全局的线程池实例,在应用程序启动时初始化,并在应用程序关闭时统一关闭。这样可以避免每次方法调用时创建和销毁线程池的开销。 正确处理异常:确保AsyncAcvGdsAdjustStateGoodsExecutor任务能够正确处理异常,并且在异常发生时能够优雅地结束。 恢复中断状态:在捕获InterruptedException后,调用Thread.currentThread().interrupt()来恢复中断状态。 增加超时时间:根据任务的实际执行时间,适当增加awaitTermination的超时时间,以确保大多数情况下线程池可以正常关闭
import java.util.concurrent.*;
public class ThreadPoolManager {
// 使用volatile关键字确保可见性和禁止指令重排
private volatile static ExecutorService executor;
private ThreadPoolManager() {
// 私有构造函数防止外部实例化
}
public static ExecutorService getExecutor() {
// 第一次检查,如果已经初始化则直接返回
if (executor == null) {
synchronized (ThreadPoolManager.class) {
// 第二次检查,确保只有一个线程能够进入这里初始化线程池
if (executor == null) {
executor = new ThreadPoolExecutor(
5, 10, 5000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
}
return executor;
}
}
// 如果使用Spring框架,可以将ThreadPoolManager注册为Bean
@Configuration
public class AppConfig {
@Bean(destroyMethod = "shutdown")
public ExecutorService threadPoolExecutor() {
return new ThreadPoolExecutor(
5, 10, 5000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}