网站首页 全球最实用的IT互联网站!

人工智能P2P分享Wind搜索发布信息网站地图标签大全

当前位置:诺佳网 > 软件工程 > 后端开发 > Java >

封装CompletionService的并发任务分发器(优化版)

时间:2025-06-09 11:31

人气:

作者:admin

标签:

导读:这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。 今天找deepseek做了一版优化,优化点: 整体的超时控制 超时、异常处...

这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:

  • 整体的超时控制
  • 超时、异常处理和封装
  • 取消未完成的任务

核心代码

public class TaskDispatcher<T> {

    private final CompletionService<T> completionService;

    /**
     * 待处理任务
     */
    private final Set<Future<T>> pending = Sets.newHashSet();

    /**
     * 超时时间, 单位: s
     */
    private long timeout = 10000;

    public TaskDispatcher(Executor executor, long timeout) {
        completionService = new ExecutorCompletionService<>(executor);
        if (timeout > 0) {
            this.timeout = timeout;
        }
    }

    public void submit(Callable<T> task) {
        Future<T> future = completionService.submit(task);
        pending.add(future);
    }

    /**
     * 仅获取执行的任务结果
     *
     * @param ignoreException 忽略执行时发生的异常
     * @return
     */
    public List<T> taskCompletedResult(boolean ignoreException) {
        List<TaskResult<T>> taskResultList = taskCompleted();
        List<T> res = Lists.newArrayList();
        if (CollectionUtils.isEmpty(taskResultList)) {
            return res;
        }
        boolean hasError = false;
        for (TaskResult<T> taskResult : taskResultList) {
            if (!taskResult.isTimeout() && taskResult.getError() == null) {
                res.add(taskResult.getValue());
            } else if (taskResult.isTimeout() && !ignoreException) {
                LoggerUtils.error("执行任务时超时");
                hasError = true;
            } else if (taskResult.getError() != null && !ignoreException) {
                LoggerUtils.error("执行任务时发生异常", taskResult.getError());
                hasError = true;
            }
        }
        if (hasError) {
            throw new ZHException("任务并发处理时发生异常");
        }
        return res;
    }

    /**
     * 获取执行的任务
     *
     * @return
     */
    public List<TaskResult<T>> taskCompleted() {
        long deadline = System.currentTimeMillis() + timeout;
        List<TaskResult<T>> results = Lists.newArrayList();
        int totalTasks = pending.size();

        try {
            for (int i = 0; i < totalTasks; i++) {
                long remaining = Math.max(0, deadline - System.currentTimeMillis());
                Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
                TaskResult<T> result = new TaskResult<>();
                if (future == null) {
                    result.setTimeout(true);
                } else {
                    pending.remove(future);
                    try {
                        result.setValue(future.get());
                    } catch (ExecutionException e) {
                        result.setError(e.getCause());
                    }
                }
                results.add(result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("任务结果收集中断", e);
        } finally {
            pending.forEach(f -> f.cancel(true));
            pending.clear();
        }
        return results;
    }

    @Data
    static class TaskResult<T> {
        private T value;
        private Throwable error;
        private boolean isTimeout;
    }
}

需要自己声明线程池bean,使用方式如下

        TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
        for (long index: indexList) {
            taskDispatcher.submit(() -> xxxService.count(index));
        }

为了便于在计数求和场景使用,进一步实现了一个子类

public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
    public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
        super(executor, timeout);
    }

    /**
     * 对所有结果求和
     *
     * @return
     */
    public int takeCompletedSum() {
        List<Integer> countResList = taskCompletedResult(true);
        int count = 0;
        for (Integer countSingle : countResList) {
            if (countSingle == null) {
                continue;
            }
            count += countSingle;
        }
        return count;
    }
}


作者:五岳
出处:http://www.cnblogs.com/wuyuegb2312
对于标题未标注为“转载”的文章均为原创,其版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

温馨提示:以上内容整理于网络,仅供参考,如果对您有帮助,留下您的阅读感言吧!
相关阅读
本类排行
相关标签
本类推荐

CPU | 内存 | 硬盘 | 显卡 | 显示器 | 主板 | 电源 | 键鼠 | 网站地图

Copyright © 2025-2035 诺佳网 版权所有 备案号:赣ICP备2025066733号
本站资料均来源互联网收集整理,作品版权归作者所有,如果侵犯了您的版权,请跟我们联系。

关注微信