作者: whooyun发表于: 2025-07-08 18:44
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Service @RequiredArgsConstructor public class OrderService { private final OrderMapper orderMapper; // 使用Java 21虚拟线程池 private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); public List<Order> batchConcurrentQuery(List<String> orderNumberList) { // 1. 拆分订单号列表(每50个一组) List<List<String>> chunks = partitionList(orderNumberList, 50); List<Order> allResults = new ArrayList<>(); int batchSize = 3; // 每批并发数 for (int i = 0; i < chunks.size(); i += batchSize) { // 2. 获取当前批次(最多3个查询) int end = Math.min(i + batchSize, chunks.size()); List<List<String>> currentBatch = chunks.subList(i, end); // 3. 并发执行当前批次的查询 List<CompletableFuture<List<Order>>> futures = new ArrayList<>(); for (List<String> chunk : currentBatch) { futures.add(CompletableFuture.supplyAsync( () -> queryDatabase(chunk), executor )); } // 4. 等待当前批次所有查询完成 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // 5. 收集结果 for (CompletableFuture<List<Order>> future : futures) { allResults.addAll(future.join()); } // 6. 批次间隔处理(最后一组不等待) if (end < chunks.size()) { try { TimeUnit.SECONDS.sleep(1); // 等待1秒 System.out.println("批次完成,等待1秒后继续..."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } return allResults; } // 数据库查询方法 private List<Order> queryDatabase(List<String> orderNumbers) { System.out.println("执行查询,数量: " + orderNumbers.size() + " | 线程: " + Thread.currentThread()); if (orderNumbers.isEmpty()) { return Collections.emptyList(); } QueryWrapper<Order> queryWrapper = new QueryWrapper<>(); queryWrapper.in("order_number", orderNumbers); return orderMapper.selectList(queryWrapper); } // 手动实现列表拆分(避免Guava依赖) private <T> List<List<T>> partitionList(List<T> list, int size) { List<List<T>> partitions = new ArrayList<>(); for (int i = 0; i < list.size(); i += size) { partitions.add(list.subList(i, Math.min(i + size, list.size()))); } return partitions; } }=======================
@Service @RequiredArgsConstructor public class OrderService { private final OrderMapper orderMapper; // 使用虚拟线程池(Java 21特性) private final ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor(); public ListconcurrentQuery(ListorderNumberList) { // 1. 拆分订单号列表(每50个一组) Listchunks = Lists.partition(orderNumberList, 50); // 2. 创建信号量控制并发度(每秒3个查询) Semaphore rateLimiter = new Semaphore(3); // 3. 提交查询任务 List