作者: whooyun发表于: 2023-04-11 17:44
/** * 当需要调整品态的SKU比较多时,使用线程池提升处理效率 * * @param acvGdsAdjustStateList */ private void orgGdsAdjustState(List<AcvGdsAdjustState> acvGdsAdjustStateList) { if (!CollectionUtils.isEmpty(acvGdsAdjustStateList)) { ExecutorService executor = new ThreadPoolExecutor(5, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy()); log.info("品态调整,线程池创建成功:{}", executor); acvGdsAdjustStateList.forEach(item -> { AsyncAcvGdsAdjustStateGoodsTask task = new AsyncAcvGdsAdjustStateGoodsTask(); task.setAcvGdsAdjustState(item); executor.execute(task); }); executor.shutdown(); try { if (!executor.awaitTermination(10000, TimeUnit.MILLISECONDS)) { // 超时后强制关闭所有线程 executor.shutdownNow(); log.info("==>品态调整,线程池已关闭"); } } catch (InterruptedException e) { log.info("==>品态调整,线程池关闭异常:{}", e.getMessage()); e.printStackTrace(); } } }
==============================================
//创建的对象无法被自动注入,只能通过spring上线文去拿 @Slf4j public class AsyncAcvGdsAdjustStateGoodsExecutor implements Runnable{ //处理state的原子性和可见性问题 private final AtomicReference<AcvGdsAdjustState> stateRef = new AtomicReference<>();; public void setAcvGdsAdjustState(AcvGdsAdjustState state) { stateRef.set(state); } //业务无线程安全需求,所以run内的方法无需处理 @Override public void run() { AcvGdsAdjustStateMapper acvGdsAdjustStateMapper= ApplicationContextProvider.getBeanByClass(AcvGdsAdjustStateMapper.class); AcvGdsAdjustStateGoodsMapper acvGdsAdjustStateGoodsMapper= ApplicationContextProvider.getBeanByClass(AcvGdsAdjustStateGoodsMapper.class); AcvOrgGroupMapper acvOrgGroupMapper= ApplicationContextProvider.getBeanByClass(AcvOrgGroupMapper.class); //log.debug("==> 需要调整品态的机构单据:{}", item); AcvGdsAdjustState item = stateRef.get(); log.info("当前线程:"+Thread.currentThread().getName()+",拿到的参数:"+item); log.info("---------------------------------"); QueryWrapper orgGroupQuery = new QueryWrapper(); orgGroupQuery.eq("biz_uid", item.getBizUid()); orgGroupQuery.eq("corp_id", item.getCorpId()); List<AcvOrgGroup> acvOrgGroupList = acvOrgGroupMapper.selectList(orgGroupQuery); log.info("需要更新的机构数量:{}",acvOrgGroupList); List<Integer> orgIdList = new ArrayList<>(); acvOrgGroupList.forEach(i -> { orgIdList.add(i.getGroupOrgId()); }); QueryWrapper stateGoodsQuery = new QueryWrapper(); stateGoodsQuery.eq("bill_id", item.getId()); stateGoodsQuery.eq("corp_id", item.getCorpId()); List<AcvGdsAdjustStateGoods> stateGoodsList = acvGdsAdjustStateGoodsMapper.selectList(stateGoodsQuery); log.info("需要更新的商品数量获取完成"); stateGoodsList.forEach(i -> { this.updateOrgGoodsState(Long.valueOf(i.getNewStatusDic()), i.getGoodsId(), orgIdList); }); log.info("更新商品状态完成"); AcvGdsAdjustState acvGdsAdjustState = new AcvGdsAdjustState(); acvGdsAdjustState.setId(item.getId()); ////状态,代码类型:1,代码值:1-草稿,2-审核中,3-待执行,4-已生效,5-已作废,6-已终止 acvGdsAdjustState.setBillStatusDic(CommonConstant.MATH_BYTE_4); acvGdsAdjustState.setModifiedDtm(LocalDateTime.now()); UpdateWrapper updateWrapper= new UpdateWrapper(); updateWrapper.eq("id", acvGdsAdjustState.getId()); updateWrapper.eq("corp_id", item.getCorpId()); acvGdsAdjustStateMapper.update(acvGdsAdjustState, updateWrapper); log.info("==>品态调整单:{},处理完毕", acvGdsAdjustState); log.info("当前线程结束:"+Thread.currentThread().getName()+",拿到的参数:"+item); } /** * 修改机构商品状态 * * @param stateId * @param goodsId * @param idList */ private void updateOrgGoodsState(final long stateId, final long goodsId, final List idList) { AcvGdsStateMapper acvGdsStateMapper= ApplicationContextProvider.getBeanByClass(AcvGdsStateMapper.class); AcvOrgGoodsMapper acvOrgGoodsMapper= ApplicationContextProvider.getBeanByClass(AcvOrgGoodsMapper.class); AcvGdsState state = acvGdsStateMapper.selectById(stateId); log.info("==>获取品态业务数据:{}", state); UpdateWrapper stateUpdateWrapper = new UpdateWrapper(); /* stateUpdateWrapper.set("state_no", state.getStateNo()); stateUpdateWrapper.set("life_dic", state.getLifeDic()); stateUpdateWrapper.set("modified_dtm",LocalDateTime.now());*/ stateUpdateWrapper.eq("goods_id", goodsId); stateUpdateWrapper.eq("is_deleted", CommonConstant.MATH_1); stateUpdateWrapper.in("goods_org_id", idList); stateUpdateWrapper.eq("corp_id", state.getCorpId()); AcvOrgGoods acvOrgGoods = new AcvOrgGoods(); acvOrgGoods.setStateNo(state.getStateNo()); acvOrgGoods.setLifeDic(state.getLifeDic().byteValue()); acvOrgGoods.setModifiedDtm(LocalDateTime.now()); log.info("==>更新机构商品品态:{}", acvOrgGoods); int resultCount = acvOrgGoodsMapper.update(acvOrgGoods, stateUpdateWrapper); log.info("==>更新机构商品:{},品态完成,修改条数:{}",goodsId,resultCount); } }
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @Component public class ApplicationContextProvider implements ApplicationContextAware { private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { context = applicationContext; } public static ApplicationContext getApplicationContext() { if (context == null) { throw new IllegalStateException("ApplicationContext not initialized yet."); } return context; } public static <T> T getBean(Class<T> beanClass) { return context.getBean(beanClass); } public static Object getBean(String beanName) { return context.getBean(beanName); } }
=====================生产环境出现了多个线程池,且线程池内存无法被GVM回收的情况
原因 (一)AsyncAcvGdsAdjustStateGoodsExecutor 在执行过程中抛出未捕获的异常,线程可能不会正常结束,从而阻止线程池关闭 (二)orgGdsAdjustState方法被多个线程同时调用,会导致同时创建多个线程池实例
想法初衷: 1、因为是被xxl-job调用,且是串行,所以默认以为线程池只会有一个 2、线程池代码缺陷,知识层面不够,代码缺陷
==========================优化方案===============================
使用单例线程池:创建一个全局的线程池实例,在应用程序启动时初始化,并在应用程序关闭时统一关闭。这样可以避免每次方法调用时创建和销毁线程池的开销。 正确处理异常:确保AsyncAcvGdsAdjustStateGoodsExecutor任务能够正确处理异常,并且在异常发生时能够优雅地结束。 恢复中断状态:在捕获InterruptedException后,调用Thread.currentThread().interrupt()来恢复中断状态。 增加超时时间:根据任务的实际执行时间,适当增加awaitTermination的超时时间,以确保大多数情况下线程池可以正常关闭
import java.util.concurrent.*; public class ThreadPoolManager { // 使用volatile关键字确保可见性和禁止指令重排 private volatile static ExecutorService executor; private ThreadPoolManager() { // 私有构造函数防止外部实例化 } public static ExecutorService getExecutor() { // 第一次检查,如果已经初始化则直接返回 if (executor == null) { synchronized (ThreadPoolManager.class) { // 第二次检查,确保只有一个线程能够进入这里初始化线程池 if (executor == null) { executor = new ThreadPoolExecutor( 5, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy() ); } } } return executor; } } // 如果使用Spring框架,可以将ThreadPoolManager注册为Bean @Configuration public class AppConfig { @Bean(destroyMethod = "shutdown") public ExecutorService threadPoolExecutor() { return new ThreadPoolExecutor( 5, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy() ); } }