时间:2025-10-02 21:52
人气:
作者:admin
可以讲在拼团中怎么用的。
为什么不直接填满到maximyumPoolSize:
- 内存占用大、上下文切换频繁消耗CPU、同步开销大。
- 动态扩展的方案只在高峰期临时创建更多线程,是一种“按需分配”的原则:先用少量核心线程处理常规负载,队列缓冲短期突发请求,实在处理不过来才会额外创建线程,空闲时回收多余线程。是为了在响应速度和资源效率之间取得最佳平衡。
- ArrayBlockingQueue:有界队列是一个用数组实现的有界阻塞队列,按FIFO排序量。
- LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话就是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQueue;newFixedThreadPool线程池用了这个队列。
- DelayQueue:延迟队列是一个任务定时周期的执行的队列。单纯的DealyDueue不保证相同延迟任务的FIFO,但ScheduledThreadPoolExecutor通过添加序列号的方式,确保了FIFO顺序。(任务1还有2秒执行,任务2还有3秒执行,那么任务1排在任务2前面)
getDelay()返回距离触发还有多久- 当
getDelay() <= 0时,任务才可被取出- 队列按触发时间排序,最早该执行的任务在队首
- priorityBlockingQueue:是具有优先级的无界阻塞队列。
- SynchronousQueue:是一个不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入操作一直处于阻塞,吞吐量通常高于 LinkedBlockingQuene(因为是生产者直接交给消费者,无缓冲,适合突发性、短时任务),newCachedThreadPool线程池使⽤了这个队列。
// 1. 默认工厂(抽象但可用)
ThreadFactory defaultFactory = Executors.defaultThreadFactory();
// 2. 自定义工厂 - 可以完全控制线程创建过程
ThreadFactory customFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("my-pool-worker-" + threadNumber.getAndIncrement());
t.setDaemon(false); // 设置为非守护线程
t.setPriority(Thread.NORM_PRIORITY); // 设置优先级
return t;
}
};
// 3. 在线程池中使用
ThreadPoolExecutor pool = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
customFactory, // 这里传入自定义工厂
new ThreadPoolExecutor.CallerRunsPolicy()
);
- AbortPolicy:直接抛出异常,默认使用此策略
- CallerFunsPolicy:用调用者所在的线程执行任务
- DIscartOldestPolicy:丢弃阻塞队列里最老的任务,也就是队列里最靠前的任务
- DiscardPolicy:当前任务直接丢弃
- 自定义:实现RejectExecutionHandler接口即可。
用法:void execute(Runnable command)
特点:
只能提交 Runnable 任务;
没有返回值;
如果任务抛出异常,异常会直接抛到线程中,最终由线程的 UncaughtExceptionHandler 或 afterExecute 捕获
(1) execute() 用法:void execute(Runnable command) 特点: 只能提交 Runnable 任务; 没有返回值; 如果任务抛出异常,异常会直接抛到线程中,最终由线程的 UncaughtExceptionHandler 或 afterExecute 捕获。
用法:
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
特点:
可以提交 Runnable 或 Callable 任务;
返回 Future 对象,可以获取结果、取消任务、捕获异常;
如果任务抛出异常,异常会被封装在 Future 中,只有调用 get() 时才会抛出 ExecutionException。
(2) submit() 用法: Future<?> submit(Runnable task) <T> Future<T> submit(Callable<T> task) 特点: 可以提交 Runnable 或 Callable 任务; 返回 Future 对象,可以获取结果、取消任务、捕获异常; 如果任务抛出异常,异常会被封装在 Future 中,只有调用 get() 时才会抛出 ExecutionException。
(1) shutdown()
行为:停止接收新任务;已提交的任务(正在执行的 + 队列中等待的)会执行完。
特点:平滑关闭,常用方式。
注意:调用后线程池状态会变为 SHUTDOWN,不会立刻终止。
(2) shutdownNow()
行为:停止接收新任务,清空队列中的等待任务,并尝试中断正在执行的任务。
返回值:未执行的任务列表。
特点:强制关闭,风险较大(可能造成数据不一致)。
注意:线程是否真正中断,取决于任务代码是否响应中断。
(3) awaitTermination(timeout, unit)
用法:
pool.shutdown(); if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); }
作用:在调用 shutdown() 后等待一段时间,如果线程池没有在指定时间内终止,再调用 shutdownNow() 强制关闭。
特点:常用于“优雅关闭”+“兜底强制关闭”组合。
(4) isShutdown() / isTerminated()
isShutdown():调用 shutdown() 或 shutdownNow() 后返回 true。
isTerminated():所有任务执行结束,线程池完全终止时返回 true。
可配合轮询检测线程池状态,做收尾逻辑。
(1) 计算密集型任务
特点:大部分时间消耗在 CPU 运算上。
配置:CPU核数 + 1。
理由:核心数保证最大并行度,额外的 +1 用来应对偶尔的线程挂起(页缺失、GC、上下文切换)。
(2) IO 密集型任务
特点:大部分时间等待 IO(数据库、网络、磁盘等)。
配置:CPU核数 * 2(甚至更高,取决于 IO 等待比例)。
理由:IO 期间线程空闲,更多线程可提升 CPU 利用率。
(3) 混合型任务
特点:既包含计算,又包含 IO。
配置方法:
比例分解法:根据任务中 IO 和计算占比,估算最优线程数。
分池法:将任务拆分为计算型和 IO 型,分别放入不同线程池,避免资源竞争。
(4) 经验公式(IO 密集型可参考)
线程数=CPU核心数×(1+IO时间计算时间)线程数 = CPU核心数 \times (1 + \frac{IO时间}{计算时间})线程数=CPU核心数×(1+计算时间IO时间)
如果 IO 占比高,线程数可比 CPU 数大得多。
(1) newFixedThreadPool
核心线程数 = 最大线程数 = 固定值。
keepAliveTime = 0,线程不会被回收。
阻塞队列 = LinkedBlockingQueue(无界队列)。
特点:线程数固定,任务多了就排队。
适用场景:适合 CPU 密集型任务,线程数可设置为 CPU 核心数 或略大。
(2) newCachedThreadPool
核心线程数 = 0;最大线程数 = Integer.MAX_VALUE(相当于无限大)。
keepAliveTime = 60s,空闲线程超过时间会回收。
阻塞队列 = SynchronousQueue(没有容量,任务直接交给线程执行)。
特点:任务多就创建新线程,空闲一段时间自动回收。
适用场景:适合 大量并发的短期小任务,但要注意可能导致线程数过多,风险较大。
(3) newSingleThreadExecutor
核心线程数 = 最大线程数 = 1。
keepAliveTime = 0。
阻塞队列 = LinkedBlockingQueue(无界队列)。
特点:单线程串行执行,保证任务按照提交顺序(FIFO)执行。
适用场景:适合 需要顺序执行任务、或全局只需要一个后台线程的场景。
(4) newScheduledThreadPool
核心线程数 = 指定值;最大线程数 = Integer.MAX_VALUE。
keepAliveTime = 0。
阻塞队列 = DelayQueue。
特点:支持定时和周期性任务调度。
适用场景:适合定时执行任务(如心跳检测、周期任务)。
(5) newWorkStealingPool(JDK 1.8+ 新增)
使用 ForkJoinPool 实现。
核心线程数 = CPU 核心数(Runtime.getRuntime().availableProcessors())。
特点:基于“工作窃取算法”,空闲线程可以从其他线程队列里偷任务执行,负载均衡。
适用场景:适合 大批量、小任务并行计算(如分治计算)。
在使用线程池时,经常会遇到这样的情况:线程池本身没问题,但任务(Runnable/Callable)运行过程中抛出了异常。
如果没有额外处理,这些异常可能会被“吞掉”,导致我们难以及时发现问题。
本文总结了常见的几种线程池任务异常处理方式。
pool.execute(() -> { try { int a = 1 / 0; } catch (Exception e) { System.err.println("任务异常: " + e.getMessage()); } });
优点:简单直观,能捕获异常。
缺点:需要每个任务都写 try/catch,容易遗漏,不够统一。
submit() + Future.get()Future<?> future = pool.submit(() -> { int a = 1 / 0; }); try { future.get(); // 会抛 ExecutionException } catch (Exception e) { System.err.println("捕获到异常: " + e.getCause()); }
特点:
使用 submit() 提交任务会返回 Future;
如果调用 get(),会抛出 ExecutionException,异常可以被捕获;
但如果 不调用 get(),异常就会被“吞掉”。
适合需要拿任务返回值的场景。
UncaughtExceptionHandlerThreadFactory factory = r -> { Thread t = new Thread(r); t.setUncaughtExceptionHandler((thread, e) -> System.err.println(thread.getName() + " 出错了: " + e)); return t; }; ExecutorService pool = new ThreadPoolExecutor( 2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory ); pool.execute(() -> { int a = 1 / 0; }); //输出示例: pool-1-thread-1 出错了: java.lang.ArithmeticException: / by zero
特点:
给线程绑定一个全局的异常处理器。
任务里不用写 try/catch,异常会被统一捕获和处理。
适用于 execute() 提交的任务。
但对 submit() 提交的异常无效(因为被封装到 Future 里)。
afterExecute()ExecutorService pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { System.err.println("execute 出错: " + t); } if (r instanceof Future<?>) { try { ((Future<?>) r).get(); } catch (Exception e) { System.err.println("submit 出错: " + e.getCause()); } } } }; // execute 提交的异常 pool.execute(() -> { int a = 1 / 0; }); // submit 提交的异常 pool.submit(() -> { return 1 / 0; }); //输出 execute 出错: java.lang.ArithmeticException: / by zero submit 出错: java.lang.ArithmeticException: / by zero
特点:
t 参数能捕获 execute() 的异常;Future.get() 能捕获 submit() 的异常;execute 和 submit 的异常;UncaughtExceptionHandler 更全面。(1) RUNNING
默认初始状态,可以接受新任务,也可以处理阻塞队列中的任务。
调用 shutdown() → 转为 SHUTDOWN;调用 shutdownNow() → 转为 STOP。
(2) SHUTDOWN
不再接受新任务,但会继续处理阻塞队列中的任务和正在执行的任务。
当队列为空且活动线程数为 0 → 转为 TIDYING。
(3) STOP
不再接受新任务,也不处理阻塞队列中的任务。
会中断正在运行的任务(通过调用线程的 interrupt())。
一般由 shutdownNow() 触发。
(4) TIDYING
所有任务都结束,活动线程数为 0。
会执行 terminated() 钩子方法,通常用于清理资源。
是进入终止前的过渡状态。
(5) TERMINATED
线程池彻底终止,生命周期完全结束。
┌───────────┐ shutdown() ┌───────────┐
│ RUNNING │ ──────────────▶ │ SHUTDOWN │
└─────┬─────┘ └─────┬─────┘
shutdownNow()│ │
▼ ▼
┌───────┐ 任务清空 ┌───────────┐
│ STOP │ ─────────────────▶ │ TIDYING │
└───────┘ └─────┬─────┘
│ terminated()
▼
┌───────────┐
│TERMINATED │
└───────────┘
(1) ThreadPoolExecutor 提供的 setter 方法
setCorePoolSize(int corePoolSize)
setMaximumPoolSize(int maximumPoolSize)
setKeepAliveTime(long time, TimeUnit unit)
setThreadFactory(ThreadFactory threadFactory)
setRejectedExecutionHandler(RejectedExecutionHandler handler)
???? 这些方法可在运行时动态修改参数,但线程池本身不会主动感知变化。
(2) 配置中心动态下发
使用 Nacos / Apollo / Spring Cloud Config 等配置中心。
配置变更后应用自动收到通知,调用 setXxx() 方法完成热更新。
适合分布式环境,可全局统一调整。
(3) 自定义监听方案
没有配置中心时,可以定时读取数据库/文件/Redis 配置。
如果参数发生变化,就调用 setXxx() 更新。
简单易实现,但有一定延迟。
(4) 结合监控系统
可通过队列长度、任务堆积、CPU 使用率等指标触发动态调整。
常见实践:监控 + 自动调节,或人工结合监控后台改配置。
(1) 阻塞队列持久化
默认的内存队列断电即丢失,可替换为消息队列(Kafka、RocketMQ、RabbitMQ)。
确保任务在宕机后依然存在,重启可继续消费。
(2) 正在处理任务的事务控制
对数据库操作使用事务,保证原子性。
避免“库存减了但订单没落库”这类不一致。
(3) 断电任务的回滚与日志恢复
在任务执行前记录操作日志,异常中断后可根据日志恢复。
常见于支付、退款等高一致性业务。
(4) 服务器重启后的任务恢复
将待执行任务元信息存入数据库/文件,重启时加载并重新提交线程池。
常用于定时清算、对账任务。
Java7 引入的并行计算框架,位于 java.util.concurrent 包。
设计目标:充分利用多核 CPU,支持大任务拆分(Fork)+ 小任务合并(Join)。
基础类:
ForkJoinPool → 线程池,负责任务调度;
ForkJoinTask → 抽象任务类,常用子类:
RecursiveTask<V>:有返回值;
RecursiveAction:无返回值。
分而治之:将大任务拆分成小任务。
并行执行:小任务由 ForkJoinPool 中的工作线程执行。
结果合并:子任务完成后通过 join() 返回结果,逐级汇总。
工作窃取 (Work Stealing):
每个工作线程维护一个双端队列;
当某个线程空闲,会从别的线程队列“偷”任务执行,避免线程闲置。
求和 1+2+...+100,设置阈值 THRESHOLD=16:
public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 16; private int start, end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); leftTask.fork(); // 提交左任务 int rightResult = rightTask.compute(); // 当前线程直接算右任务 int leftResult = leftTask.join(); // 等待左任务结果 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); CountTask task = new CountTask(1, 100); Future<Integer> result = pool.submit(task); System.out.println("结果: " + result.get()); } }
输出:结果: 5050
不要无限制拆分任务
设置合理的 THRESHOLD,否则递归太深,性能反而差。
fork/join 的调用顺序
推荐:leftTask.fork(); rightTask.compute(); leftTask.join();
避免所有子任务都 fork,导致线程间竞争严重。
异常处理
子任务异常会传播到 join() 时抛出,要注意捕获。
线程数
默认 = CPU 核心数,通常够用;也可指定 new ForkJoinPool(n)。
和普通线程池区别
普通线程池 → 任务彼此独立,适合 IO 密集;
Fork/Join → 大任务拆小任务,适合 CPU 密集型计算。
Fork/Join 是 并行递归计算的利器,适合大规模、可拆分的任务(如矩阵计算、归并排序、递归求和)。
核心思想:任务拆分 + 线程池执行 + 结果合并 + 工作窃取负载均衡。
编写任务时要重点关注:
合理拆分任务;
避免过多的小任务;
注意异常处理和结果合并逻辑。
[1] 沉默王二公众号
Hutool 的 `TimedCache` 到期会自动清理吗?