1 Star 0 Fork 165

ElonChung / Java-Review

forked from flatfish / Java-Review 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
Java-多线程-线程池.md 11.86 KB
一键复制 编辑 原始数据 按行查看 历史
icanci 提交于 2020-09-07 23:09 . :fire:更新文件夹

Java - 多线程 - 线程池

在上一篇文章 [Java - 多线程详解] 我们学习了 Java操作多线程的基本操作,本篇来解析一下Java中的线程池。

线程池:三大方法、七大参数、4种拒绝策略

程序运行的本质:就是占用系统资源!为了优化资源的使用,就演变出来一种策略,就是池化技术!

如:线程池、对象池、数据库连接池、内存池等

线程池的好处

  • 降低资源的消耗
  • 提高响应的速度
  • 方便管理

线程池可以复用、可以控制最大并发数、管理线程

线程池的四大方法

这三大方法是JDK提供的,但是《阿里巴巴Java开发手册》不允许使用,因为我们需要根据具体的场景创建具体的线程池。

单个线程的线程池

ExecutorService service = Executors.newSingleThreadExecutor();

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 单个线程
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 100; i++) {
                service.execute(() -> {
                    System.out.println("ThreadPoolDemo.main" + Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}
固定线程池的大小

ExecutorService service = Executors.newFixedThreadPool(int nums);

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 指定线程大小
        ExecutorService service = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 100; i++) {
                service.execute(() -> {
                    System.out.println("ThreadPoolDemo.main" + Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}
弹性线程池 大小可以变化

ExecutorService service = Executors.newCachedThreadPool();

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 可以伸缩大小的线程池 最大线程为 Integer.MAX_VALUE
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 100; i++) {
                service.execute(() -> {
                    System.out.println("ThreadPoolDemo.main" + Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}
定时任务
public class SchPoolDemo {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        // 参数 任务线程 延时多少时间执行 执行之后多少次执行一次 时间单位是什么
        service.scheduleWithFixedDelay(() -> {
            System.out.println(Thread.currentThread().getName());
        }, 5, 1, TimeUnit.SECONDS);
    }
}

上述的四种在开发的时候都不适用

七大参数

上述三种线程池的源码:

// 单一线程
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// 固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// 可以伸缩的线程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// 所有的本质

// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程池的大小
  • maximumPoolSize:最大线程池的大小
  • keepAliveTime:超时时间,超过多久就会释放
  • unit:超时时间的单位
  • workQueue:阻塞队列
  • threadFactory:线程工厂
  • handler:拒绝策略、

手写一个线程池

public class MyThreadPool {
    public static void main(String[] args) {
        // 获取本机的线程数量
        int maxCore = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor service = new ThreadPoolExecutor(
            4,
            maxCore,
            5,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadPoolExecutor.AbortPolicy()
        );
        try {
            for (int i = 0; i < 100; i++) {
                service.execute(() -> {
                    System.out.println("MyThreadPool.main " + Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}

四种拒绝策略

1596440458523

  • AbrotPolicy
    • 满了,还有进来的,不做处理,就抛出异常
  • CallerRunsPolicy
    • 从哪儿来就回哪儿去
  • DiscardOldestPolicy
    • 队列满了,就和最新进来的进行竞争,竞争失败就没了,成功了就上位
  • DiscardPolicy
    • 队列满了,就丢掉任务,不抛出异常

池的最大线程如何定义

  • CPU密集型 几核 就定义为几

  • CPU密集型:

    **定义:**CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。 这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

    特点:

    01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用

    02:针对单台机器,最大线程数一般只需要设置为CPU核心数的线程个数就可以了

    03:这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。

  • // 获得 CPU 的核心数
    int num = Runtime.getRuntime().availableProcessors();
  • IO密集型

  • **定义:**IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。

    我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。

    01:磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。

    02:网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

    IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。

    既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待):

    CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU 空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率;线程 CPU 时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中。

1:高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换

2:并发不高、任务执行时间长的业务这就需要区分开看了:

  a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务

  b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,线程池中的线程数设置得少一些,减少线程上下文的切换

(其实从一二可以看出无论并发高不高,对于业务中是否是cpu密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)

3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,我们的项目使用的时redis作为缓存(这类非关系型数据库还是挺好的)。增加服务器是第二步(一般政府项目的首先,因为不用对项目技术做大改动,求一个稳,但前提是资金充足),至于线程池的设置,设置参考 2 。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件(任务时间过长的可以考虑拆分逻辑放入队列等操作)对任务进行拆分和解耦。

总结

  01:一个计算为主的程序(CPU密集型程序),多线程跑的时候,可以充分利用起所有的 CPU 核心数,比如说 8 个核心的CPU ,开8 个线程的时候,可以同时跑 8 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU 核心数量,反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。

  02:如果是一个磁盘或网络为主的程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候 CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起 CPU 来是很慢的。此时线程数等于CPU核心数的两倍是最佳的。

以上就是JAVA 自定义线程池的最大线程数设置方法的详细内容,更多关于JAVA 自定义线程池的资料请关注脚本之家其它相关文章!

1
https://gitee.com/elonchung/Java-Review.git
git@gitee.com:elonchung/Java-Review.git
elonchung
Java-Review
Java-Review
master

搜索帮助