ThreadPoolExecutor — это функция, добавленная java concurrent api для эффективного обслуживания и повторного использования потоков, так что нашим программам не нужно беспокоиться о создании и уничтожении потоков и сосредоточиться на основной функциональности. Я создал пользовательский пул потоков, чтобы лучше понять, как будет работать пул потоков.
Функциональность:
- Он поддерживает фиксированный пул потоков и создает потоки и запускает потоки, даже если задача не отправляется, тогда как ThreadPoolExecutor создает потоки по требованию, т. Е. Всякий раз, когда исполняемый файл передается в пул, а число потоков меньше размера основного пула.
- В ThreadPoolExecutor мы предоставляем очередь ожидания, где новая запускаемая задача ожидает, когда все потоки заняты выполнением существующей задачи. После заполнения очереди будут созданы новые потоки с максимальным размером пула. В MyThreadPool я храню исполняемый файл в связанном списке, поэтому каждая задача будет ждать в списке, и он не ограничен, поэтому в этом случае не используется maxPoolSize.
- В ThreadPoolExecutor мы используем Future Objects для получения результата от задачи, метод future.get () будет блокироваться, если результат недоступен, или мы используем CompletionService. В MyThreadPoolExecutor я создал простой интерфейс под названием ResultListener, пользователь должен предоставить реализацию того, как он хочет, чтобы вывод обрабатывался. После выполнения каждой задачи ResultListener получит обратный вызов с выводом задачи, или в случае любого исключения будет вызван метод ошибки.
- При вызове метода завершения MyThreadPoolExecutor перестанет принимать новые задачи и выполнит оставшиеся задачи.
- Я предоставил очень простую функциональность по сравнению с ThreadPoolExecutor, я использовал простой механизм потока, как wait (), notify (), notifyAll () и join ().
- По производительности он похож на ThreadPoolExecutor, в некоторых случаях лучше в некоторых случаях. Дайте мне знать, если вы найдете какие-либо интересные результаты или способы улучшить его.
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | packagecom.util;importjava.util.concurrent.Callable;/** * Run submitted task of {@link MyThreadPool} After running the task , It calls * on {@link ResultListener}object with {@link Output}which contains returned * result of {@link Callable}task. Waits if the pool is empty. *  * @author abhishek *  * @param  */importjava.util.concurrent.Callable;/*** Run submitted task of {@link MyThreadPool} After running the task , It calls* on {@link ResultListener}object with {@link Output}which contains returned* result of {@link Callable}task. Waits if the pool is empty.** @author abhishek** @param <V>*/publicclassMyThread<V> extendsThread {    /**    * MyThreadPool object, from which the task to be run    */    privateMyThreadPool<V> pool;    privatebooleanactive = true;    publicbooleanisActive() {        returnactive;    }    publicvoidsetPool(MyThreadPool<V> p) {        pool = p;    }    /**    * Checks if there are any unfinished tasks left. if there are , then runs    * the task and call back with output on resultListner Waits if there are no    * tasks available to run If shutDown is called on MyThreadPool, all waiting    * threads will exit and all running threads will exit after finishing the    * task    */    publicvoidrun() {        ResultListener<V> result = pool.getResultListener();        Callable<V> task;        while(true)        {            task = pool.removeFromQueue();            if(task != null)            {                try                {                    V output = task.call();                    result.finish(output);                } catch(Exception e)                {                    result.error(e);                }            } else            {                if(!isActive())                break;                else                {                    synchronized(pool.getWaitLock())                    {                        try                        {                            pool.getWaitLock().wait();                        } catch(InterruptedException e)                        {                            // TODO Auto-generated catch block                            e.printStackTrace();                        }                    }                }            }        }    }    voidshutdown() {        active = false;    }} | 
| 001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 | packagecom.util;importjava.util.LinkedList;importjava.util.concurrent.Callable;/*** This class is used to execute submitted {@link Callable} tasks. this class* creates and manages fixed number of threads User will provide a* {@link ResultListener}object in order to get the Result of submitted task** @author abhishek***/publicclassMyThreadPool<V> {    privateObject waitLock = newObject();    publicObject getWaitLock() {        returnwaitLock;    }    /**    * list of threads for completing submitted tasks    */    privatefinalLinkedList<MyThread<V>> threads;    /**    * submitted task will be kept in this list untill they run by one of    * threads in pool    */    privatefinalLinkedList<Callable<V>> tasks;    /**    * shutDown flag to shut Down service    */    privatevolatilebooleanshutDown;    /**    * ResultListener to get back the result of submitted tasks    */    privateResultListener<V> resultListener;    /**    * initializes the threadPool by starting the threads threads will wait till    * tasks are not submitted    *    * @param size    * Number of threads to be created and maintained in pool    * @param myResultListener    * ResultListener to get back result    */    publicMyThreadPool(intsize, ResultListener<V> myResultListener) {        tasks = newLinkedList<Callable<V>>();        threads = newLinkedList<MyThread<V>>();        shutDown = false;        resultListener = myResultListener;        for(inti = 0; i < size; i++) {            MyThread<V> myThread = newMyThread<V>();            myThread.setPool(this);            threads.add(myThread);            myThread.start();        }    }    publicResultListener<V> getResultListener() {        returnresultListener;    }    publicvoidsetResultListener(ResultListener<V> resultListener) {        this.resultListener = resultListener;    }    publicbooleanisShutDown() {        returnshutDown;    }    publicintgetThreadPoolSize() {        returnthreads.size();    }    publicsynchronizedCallable<V> removeFromQueue() {        returntasks.poll();    }    publicsynchronizedvoidaddToTasks(Callable<V> callable) {        tasks.add(callable);    }    /**    * submits the task to threadPool. will not accept any new task if shutDown    * is called Adds the task to the list and notify any waiting threads    *    * @param callable    */    publicvoidsubmit(Callable<V> callable) {        if(!shutDown) {            addToTasks(callable);            synchronized(this.waitLock) {                waitLock.notify();            }            } else{            System.out.println('task is rejected.. Pool shutDown executed');        }    }    /**    * Initiates a shutdown in which previously submitted tasks are executed,    * but no new tasks will be accepted. Waits if there are unfinished tasks    * remaining    *    */    publicvoidstop() {        for(MyThread<V> mythread : threads) {            mythread.shutdown();        }        synchronized(this.waitLock) {            waitLock.notifyAll();        }        for(MyThread<V> mythread : threads) {            try{                mythread.join();                } catch(InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }    }} | 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 | packagecom.util;/** * This interface imposes finish method  * which is used to get the {@link Output} object  * of finished task * @author abhishek * * @param  */publicinterfaceResultListener                         { publicvoidfinish(T obj); publicvoiderror(Exception ex);} | 
Вы можете реализовать этот класс так, как хотите, чтобы получить обратно и обработать результат, возвращаемый задачами.
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 | packagecom.util;publicclassDefaultResultListener implementsResultListener{ @Override publicvoidfinish(Object obj) { } @Override publicvoiderror(Exception ex) {  ex.printStackTrace(); }} | 
Например, этот класс добавит число, возвращаемое задачами.
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | packagecom.util;importjava.util.concurrent.atomic.AtomicInteger;/** * ResultListener class to keep track of total matched count * @author abhishek *  * @param  */publicclassMatchedCountResultListener                        implementsResultListener                         {    /**     * matchedCount to keep track of the number of matches returned by submitted     * task     */    AtomicInteger matchedCount = newAtomicInteger();    /**     * this method is called by ThreadPool to give back the result of callable     * task. if the task completed successfully then increment the matchedCount by     * result count     */    @Override    publicvoidfinish(V obj) {        //System.out.println('count is '+obj);        matchedCount.addAndGet((Integer)obj);    }    /**     * print exception thrown in running the task     */    @Override    publicvoiderror(Exception ex) {        ex.printStackTrace();    }    /**     * returns the final matched count of all the finished tasks     *      * @return     */    publicintgetFinalCount() {        returnmatchedCount.get();    }} | 
Это тестовый класс, который запускается просто для цикла с использованием CompletionService и MyThreadPoolExecutor
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | packagetest;importjava.util.concurrent.Callable;importjava.util.concurrent.CompletionService;importjava.util.concurrent.ExecutorCompletionService;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Future;importcom.util.DefaultResultListener;importcom.util.MyThreadPool;publicclassTestClass {    publicstaticvoidmain(String[] args) throwsInterruptedException {        CompletionService                       threadService;        ExecutorService service = Executors.newFixedThreadPool(2);        threadService = newExecutorCompletionService                       (service);        longb = System.currentTimeMillis();        for(inti =0;i<50000;i++){            threadService.submit(newMyRunable (i));        }        service.shutdown();        System.out.println('time taken by Completion Service '+ (System.currentTimeMillis()-b));        DefaultResultListener result = newDefaultResultListener();        MyThreadPool                         newPool = newMyThreadPool                         (2,result);        longa = System.currentTimeMillis();        intcc =0;        for(inti =0;i<50000;i++)        {            cc = cc+i;        }        System.out.println('time taken without any pool '+ (System.currentTimeMillis()-a));        a= System.currentTimeMillis();        for(inti =0;i<5000;i++){            newPool.submit(newMyRunable (i));        }        newPool.stop();        System.out.println('time taken by myThreadPool '+ (System.currentTimeMillis()-a));    }}classMyRunable implementsCallable{    intindex = -1;    publicMyRunable(intindex)    {        this.index = index;    }    @Override    publicInteger call() throwsException {        returnindex;    }} | 
Ссылка: Мой пользовательский пул потоков в Java от нашего партнера JCG Абхишека Сомани из блога Java, J2EE, Server .