深入理解Java之线程池

老方法开头,看看线程池(引自百度百科)

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

上面说了线程池的定义,相信大家也清楚了,但是这个里面有个概念,大家一定要知道,知道后再聊“线程池”,分解下三个字变成两部分:

一、线程

大家应该都知道进程,学过系统的等等都理解,知道一个进程启动(如Java进程”java -jar *** “),但是什么是线程呢?

在操作系统中有这么个定义:

线程是指进程中的一个执行流程,一个进程中可以运行多个线程。比如java.exe进程中可以运行很多线程。线程总是属于某个进程,进程中的多个线程共享进程的内存。

也就是说进程中包含多个线程,而且可以并行干活。从下面图可以看到,runnable的线程是多个。

如Java里面的线程:
在Java中,“线程”指两件不同的事情:

  1. java.lang.Thread类的一个实例;
  2. 线程的执行。
使用java.lang.Thread类或者java.lang.Runnable接口编写代码来定义、实例化和启动新线程。 一个Thread类实例只是一个对象,像Java中的任何其他对象一样,具有变量和方法,生死于堆上。 Java中,每个线程都有一个调用栈,即使不在程序中创建任何新的线程,线程也在后台运行着。 一个Java应用总是从main()方法开始运行,mian()方法运行在一个线程内,它被称为主线程。 一旦创建一个新的线程,就产生一个新的调用栈。
线程总体分两类:用户线程和守候线程。

当所有用户线程执行完毕的时候,JVM自动关闭。但是守候线程却不独立于JVM,守候线程一般是由操作系统或者用户自己创建的。

看了这么多,带来了另一个问题,一个进程中多少线程合适呢?

看过一个文章:稍后补上吧,嘿嘿。

二、对象池(POOL)

单例模式是限制了一个类只能有一个实例,对象池模式则是限制一个类实例的个数。对象池类就像是一个对象管理员,它以Static列表(也就是装对象的池子)的形式存存储某个实例数受限的类的实例,每一个实例还要加一个标记,标记该实例是否被占用。当类初始化的时候,这个对象池就被初始化了,实例就被创建出来。然后,用户可以向这个类索取实例,如果池中所有的实例都已经被占用了,那么抛出异常。用户用完以后,还要把实例“还”回来,即释放占用。对象池类的成员应该都是静态的。用户也不应该能访问池子里装着的对象的构造函数,以防用户绕开对象池创建实例。书上说这个模式会用在数据库连接的管理上。比如,每个用户的连接数是有限的,这样每个连接就是一个池子里的一个对象,“连接池”类就可以控制连接数了。

对象池技术基本原理的核心有两点:缓存和共享,即对于那些被频繁使用的对象,在使用完后,不立即将它们释放,而是将它们缓存起来,以供后续的应用程序重复使用,从而减少创建对象和释放对象的次数,进而改善应用程序的性能。事实上,由于对象池技术将对象限制在一定的数量,也有效地减少了应用程序内存上的开销。

将用过的对象保存起来,等下一次需要这种对象的时候,再拿出来重复使用,从而在一定程度上减少频繁创建对象所造成的开销。 并非所有对象都适合拿来池化――因为维护对象池也要造成一定开销。对生成时开销不大的对象进行池化,反而可能会出现“维护对象池的开销”大于“生成新对象的开销”,从而使性能降低的情况。但是对于生成时开销可观的对象,池化技术就是提高性能的有效策略了。下面是构建对象池的一个例子:

相当简单写了个例子,说明对象池的。嘿嘿。

    /**
     * @author 犊子@duzhi.me
     * @email ashang.peng@aliyun.com
     * @date 二月 14, 2017
     */
    public  class ObjectUnit<T> {
        private Class<T> type;
        private List<T> items = new ArrayList<T>();
        private List<Boolean> checkedOut = new ArrayList<Boolean>();
        private int semaphore;

        public ObjectUnit(Class<T> type) {
            this.type = type;
        }

        public synchronized T addItem() {
            T obj;
            try {
                obj = type.newInstance();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            items.add(obj);
            checkedOut.add(false);
            return obj;
        }

        public synchronized T checkOut() {
            if (semaphore < items.size()) {
                semaphore++;
                return getItem();
            } else
                return addItem();
        }

        public synchronized void checkIn(T x) {
            if (releaseItem(x))
                semaphore--;
        }

        private synchronized T getItem() {
            for (int index = 0; index < checkedOut.size(); index++)
                if (!checkedOut.get(index)) {
                    checkedOut.set(index, true);
                    return items.get(index);
                }
            return null;
        }

        private synchronized boolean releaseItem(T item) {
            int index = items.indexOf(item);
            if (index == -1)
                return false; // Not in the list
            if (checkedOut.get(index)) {
                checkedOut.set(index, false);
                return true;
            }
            return false;
        }
    }

 简单的注释下,大家看的清楚点:

使用Map<Class<?>, ObjectUnit<?>>来保存当前对象池中类型目录,并把它设计为线程安全的ConcurrentHashMap。

这里的getObj方法和renObj方法不用加锁,因为它调用的对象单元类是线程安全的,并且Map是线程安全的。

此外,这里在处理泛型的时候,会有warning产生,因为之前定义Map中使用<?>,而后面的两个泛型方法指定<T>。还没有想到更好的解决办法。

/**
     * @author 犊子@duzhi.me
     * @email ashang.peng@aliyun.com
     * @date 二月 14, 2017
     */
    public  class Provider {
        private Map<Class<?>, ObjectUnit<?>> providers = new ConcurrentHashMap<Class<?>, ObjectUnit<?>>();
        private static Provider instance = new Provider();

        private Provider() {
        }

        public static Provider getInstance() {
            return instance;
        }

        @SuppressWarnings("unchecked")
        public <T> T getObj(Class<T> key) {
            ObjectUnit value = providers.get(key);
            if (value != null) {
                return (T) value.checkOut();
            } else {
                value = new ObjectUnit<T>(key);
                providers.put(key, value);
                return (T) value.addItem();
            }
        }

        @SuppressWarnings("unchecked")
        public <T> void renObj(T x) {
            if (providers.containsKey(x.getClass())) {
                ObjectUnit value = providers.get(x.getClass());
                value.checkIn(x);
            }
        }
    }

三 回到正题(线程池)

老方法,定义这种事情我不想写,还是来百科

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

经过上面说了,线程,对象池,线程池的定义,大家不难知道线程池的作用以及实现方案了,前段时间,我也正好在看tomcat 的源码,以及netty的异步处理机制,正好对这个做个总结,当然也主要针对Jdk提供的线程池做个总结,嘿嘿。

Executor

Executor 框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。

Executor 接口中之定义了一个方法 execute(Runnable command),该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。ExecutorService 接口继承自 Executor 接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService 提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用 ExecutorService 的 shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。

ExecutorService 的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了 shutdown()方法时,便进入关闭状态,此时意味着 ExecutorService 不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的任务执行完后,便到达终止状态。如果不调用 shutdown()方法,ExecutorService 会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

下面分别看看这四个方法:


newCachedThreadPool()

  • 缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse 如果没有,就建一个新的线程加入池中
  • 缓存型池子通常用于执行一些生存期很短的异步型任务 因此在一些面向连接的 daemon 型 SERVER 中用得不多。但对于生存期短的异步任务,它是 Executor 的首选。
  • 能 reuse 的线程,必须是 timeout IDLE 内的池中线程,缺省 timeout 是 60s,超过这个 IDLE 时长,线程实例将被终止及移出池。

  • 一般情况下,放入 CachedThreadPool 的线程不必担心其结束,超过 TIMEOUT 不活动,其会自动被终止

newFixedThreadPool(int)

  • newFixedThreadPool 与 cacheThreadPool 差不多,也是能 reuse 就用,但不能随时建新的线程。
  • 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。
  • 和 cacheThreadPool 不同,FixedThreadPool 没有 IDLE 机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的 TCP 或 UDP IDLE 机制之类的),所以 FixedThreadPool 多数针对一些很稳定很固定的正规并发线程,多用于服务器。
  • 从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
    • fixed 池线程数固定,并且是0秒IDLE(无IDLE)。
    • cache 池线程数支持 0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60 秒 IDLE 。

newScheduledThreadPool(int)

  • 调度型线程池
  • 这个池子里的线程可以按 schedule 依次 delay 执行,或周期执行

SingleThreadExecutor()

  • 单例线程,任意时间池中只能有一个线程
  • 用的是和 cache 池和 fixed 池相同的底层池,但线程数目是 1-1,0 秒 IDLE(无 IDLE)

一般来说,CachedTheadPool 在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的 Executor 的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用 FixedThreadPool。

最后给个例子,大家参考参考使用:

 class ExecutorTest {
    public static void main(String[] args) {

        Runnable hello = () -> {
            for (int i = 0; i < 100; i++) {
                System.out.println(i + " hello");
            }
        };
        Runnable bye = () -> {
            for (int i = 0; i < 100; i++) {
                System.out.println(i + " bye");
            }
        };

        Executor executor = Executors.newCachedThreadPool();

        executor.execute(hello);
        executor.execute(bye);

    }
}

 

除特别注明外,本站所有文章均为duzhi原创,转载请注明出处来自https://www.duzhi.me/article/1183.html

联系我们

******

在线咨询:点击这里给我发消息

邮件:ashang.peng#aliyun.com

QR code