复制代码

为懒人提供无限可能,生命不息,code不止

人类感性的情绪,让我们知难行难
我思故我在
日拱一卒,功不唐捐
  • 首页
  • 前端
  • 后台
  • 数据库
  • 运维
  • 资源下载
  • 实用工具
  • 接口文档工具
  • 登录
  • 注册

性能优化

【原创】多个从库并发查询优化查询性能(虚拟线程并发)

作者: whooyun发表于: 2025-07-08 18:44

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderMapper orderMapper;
    
    // 使用Java 21虚拟线程池
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    public List<Order> batchConcurrentQuery(List<String> orderNumberList) {
        // 1. 拆分订单号列表(每50个一组)
        List<List<String>> chunks = partitionList(orderNumberList, 50);
        
        List<Order> allResults = new ArrayList<>();
        int batchSize = 3; // 每批并发数
        
        for (int i = 0; i < chunks.size(); i += batchSize) {
            // 2. 获取当前批次(最多3个查询)
            int end = Math.min(i + batchSize, chunks.size());
            List<List<String>> currentBatch = chunks.subList(i, end);
            
            // 3. 并发执行当前批次的查询
            List<CompletableFuture<List<Order>>> futures = new ArrayList<>();
            for (List<String> chunk : currentBatch) {
                futures.add(CompletableFuture.supplyAsync(
                    () -> queryDatabase(chunk), executor
                ));
            }
            
            // 4. 等待当前批次所有查询完成
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            
            // 5. 收集结果
            for (CompletableFuture<List<Order>> future : futures) {
                allResults.addAll(future.join());
            }
            
            // 6. 批次间隔处理(最后一组不等待)
            if (end < chunks.size()) {
                try {
                    TimeUnit.SECONDS.sleep(1); // 等待1秒
                    System.out.println("批次完成,等待1秒后继续...");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        
        return allResults;
    }

    // 数据库查询方法
    private List<Order> queryDatabase(List<String> orderNumbers) {
        System.out.println("执行查询,数量: " + orderNumbers.size() + 
                          " | 线程: " + Thread.currentThread());
        
        if (orderNumbers.isEmpty()) {
            return Collections.emptyList();
        }
        
        QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
        queryWrapper.in("order_number", orderNumbers);
        return orderMapper.selectList(queryWrapper);
    }

    // 手动实现列表拆分(避免Guava依赖)
    private <T> List<List<T>> partitionList(List<T> list, int size) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }
}
=======================
@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderMapper orderMapper;
    
    // 使用虚拟线程池(Java 21特性)
    private final ExecutorService virtualThreadExecutor = 
        Executors.newVirtualThreadPerTaskExecutor();

    public ListconcurrentQuery(ListorderNumberList) {
        // 1. 拆分订单号列表(每50个一组)
        Listchunks = Lists.partition(orderNumberList, 50);
        
        // 2. 创建信号量控制并发度(每秒3个查询)
        Semaphore rateLimiter = new Semaphore(3);
        
        // 3. 提交查询任务
        List