Статьи

Мой пользовательский пул потоков в Java

ThreadPoolExecutor — это функция, добавленная java concurrent api для эффективного обслуживания и повторного использования потоков, так что нашим программам не нужно беспокоиться о создании и уничтожении потоков и сосредоточиться на основной функциональности. Я создал пользовательский пул потоков, чтобы лучше понять, как будет работать пул потоков.

Функциональность:

  • Он поддерживает фиксированный пул потоков и создает потоки и запускает потоки, даже если задача не отправляется, тогда как ThreadPoolExecutor создает потоки по требованию, т. Е. Всякий раз, когда исполняемый файл передается в пул, а число потоков меньше размера основного пула.
  • В ThreadPoolExecutor мы предоставляем очередь ожидания, где новая запускаемая задача ожидает, когда все потоки заняты выполнением существующей задачи. После заполнения очереди будут созданы новые потоки с максимальным размером пула. В MyThreadPool я храню исполняемый файл в связанном списке, поэтому каждая задача будет ждать в списке, и он не ограничен, поэтому в этом случае не используется maxPoolSize.
  • В ThreadPoolExecutor мы используем Future Objects для получения результата от задачи, метод future.get () будет блокироваться, если результат недоступен, или мы используем CompletionService. В MyThreadPoolExecutor я создал простой интерфейс под названием ResultListener, пользователь должен предоставить реализацию того, как он хочет, чтобы вывод обрабатывался. После выполнения каждой задачи ResultListener получит обратный вызов с выводом задачи, или в случае любого исключения будет вызван метод ошибки.
  • При вызове метода завершения MyThreadPoolExecutor перестанет принимать новые задачи и выполнит оставшиеся задачи.
  • Я предоставил очень простую функциональность по сравнению с ThreadPoolExecutor, я использовал простой механизм потока, как wait (), notify (), notifyAll () и join ().
  • По производительности он похож на ThreadPoolExecutor, в некоторых случаях лучше в некоторых случаях. Дайте мне знать, если вы найдете какие-либо интересные результаты или способы улучшить его.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.util;
 
import java.util.concurrent.Callable;
 
/**
 * Run submitted task of {@link MyThreadPool} After running the task , It calls
 * on {@link ResultListener}object with {@link Output}which contains returned
 * result of {@link Callable}task. Waits if the pool is empty.
 *
 * @author abhishek
 *
 * @param
 
 */
 
import java.util.concurrent.Callable;
/**
* Run submitted task of {@link MyThreadPool} After running the task , It calls
* on {@link ResultListener}object with {@link Output}which contains returned
* result of {@link Callable}task. Waits if the pool is empty.
*
* @author abhishek
*
* @param <V>
*/
public class MyThread<V> extends Thread {
    /**
    * MyThreadPool object, from which the task to be run
    */
    private MyThreadPool<V> pool;
    private boolean active = true;
    public boolean isActive() {
        return active;
    }
    public void setPool(MyThreadPool<V> p) {
        pool = p;
    }
    /**
    * Checks if there are any unfinished tasks left. if there are , then runs
    * the task and call back with output on resultListner Waits if there are no
    * tasks available to run If shutDown is called on MyThreadPool, all waiting
    * threads will exit and all running threads will exit after finishing the
    * task
    */
    public void run() {
        ResultListener<V> result = pool.getResultListener();
        Callable<V> task;
        while (true)
        {
            task = pool.removeFromQueue();
            if (task != null)
            {
                try
                {
                    V output = task.call();
                    result.finish(output);
                } catch (Exception e)
                {
                    result.error(e);
                }
            } else
            {
                if (!isActive())
                break;
                else
                {
                    synchronized (pool.getWaitLock())
                    {
                        try
                        {
                            pool.getWaitLock().wait();
                        } catch (InterruptedException e)
                        {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    void shutdown() {
        active = false;
    }
}
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.util;
import java.util.LinkedList;
import java.util.concurrent.Callable;
/**
* This class is used to execute submitted {@link Callable} tasks. this class
* creates and manages fixed number of threads User will provide a
* {@link ResultListener}object in order to get the Result of submitted task
*
* @author abhishek
*
*
*/
public class MyThreadPool<V> {
    private Object waitLock = new Object();
    public Object getWaitLock() {
        return waitLock;
    }
    /**
    * list of threads for completing submitted tasks
    */
    private final LinkedList<MyThread<V>> threads;
    /**
    * submitted task will be kept in this list untill they run by one of
    * threads in pool
    */
    private final LinkedList<Callable<V>> tasks;
    /**
    * shutDown flag to shut Down service
    */
    private volatile boolean shutDown;
    /**
    * ResultListener to get back the result of submitted tasks
    */
    private ResultListener<V> resultListener;
    /**
    * initializes the threadPool by starting the threads threads will wait till
    * tasks are not submitted
    *
    * @param size
    * Number of threads to be created and maintained in pool
    * @param myResultListener
    * ResultListener to get back result
    */
    public MyThreadPool(int size, ResultListener<V> myResultListener) {
        tasks = new LinkedList<Callable<V>>();
        threads = new LinkedList<MyThread<V>>();
        shutDown = false;
        resultListener = myResultListener;
        for (int i = 0; i < size; i++) {
            MyThread<V> myThread = new MyThread<V>();
            myThread.setPool(this);
            threads.add(myThread);
            myThread.start();
        }
    }
    public ResultListener<V> getResultListener() {
        return resultListener;
    }
    public void setResultListener(ResultListener<V> resultListener) {
        this.resultListener = resultListener;
    }
    public boolean isShutDown() {
        return shutDown;
    }
    public int getThreadPoolSize() {
        return threads.size();
    }
    public synchronized Callable<V> removeFromQueue() {
        return tasks.poll();
    }
    public synchronized void addToTasks(Callable<V> callable) {
        tasks.add(callable);
    }
    /**
    * submits the task to threadPool. will not accept any new task if shutDown
    * is called Adds the task to the list and notify any waiting threads
    *
    * @param callable
    */
    public void submit(Callable<V> callable) {
        if (!shutDown) {
            addToTasks(callable);
            synchronized (this.waitLock) {
                waitLock.notify();
            }
            } else {
            System.out.println('task is rejected.. Pool shutDown executed');
        }
    }
    /**
    * Initiates a shutdown in which previously submitted tasks are executed,
    * but no new tasks will be accepted. Waits if there are unfinished tasks
    * remaining
    *
    */
    public void stop() {
        for (MyThread<V> mythread : threads) {
            mythread.shutdown();
        }
        synchronized (this.waitLock) {
            waitLock.notifyAll();
        }
        for (MyThread<V> mythread : threads) {
            try {
                mythread.join();
                } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
package com.util;
 
/**
 * This interface imposes finish method
 * which is used to get the {@link Output} object
 * of finished task
 * @author abhishek
 *
 * @param
 
 */
 
public interface ResultListener
 
                        {
 
 public void finish(T obj);
 public void error(Exception ex);
 
}

Вы можете реализовать этот класс так, как хотите, чтобы получить обратно и обработать результат, возвращаемый задачами.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
package com.util;
 
public class DefaultResultListener implements ResultListener{
 
 @Override
 public void finish(Object obj) {
 
 }
 
 @Override
 public void error(Exception ex) {
  ex.printStackTrace();
 }
 
}

Например, этот класс добавит число, возвращаемое задачами.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.util;
 
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * ResultListener class to keep track of total matched count
 * @author abhishek
 *
 * @param
 
 */
public class MatchedCountResultListener
 
                        implements ResultListener
 
                         {
 
    /**
     * matchedCount to keep track of the number of matches returned by submitted
     * task
     */
    AtomicInteger matchedCount = new AtomicInteger();
 
    /**
     * this method is called by ThreadPool to give back the result of callable
     * task. if the task completed successfully then increment the matchedCount by
     * result count
     */
    @Override
    public void finish(V obj) {
        //System.out.println('count is '+obj);
        matchedCount.addAndGet((Integer)obj);
    }
 
    /**
     * print exception thrown in running the task
     */
    @Override
    public void error(Exception ex) {
        ex.printStackTrace();
    }
 
    /**
     * returns the final matched count of all the finished tasks
     *
     * @return
     */
    public int getFinalCount() {
        return matchedCount.get();
    }
}

Это тестовый класс, который запускается просто для цикла с использованием CompletionService и MyThreadPoolExecutor

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package test;
 
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
import com.util.DefaultResultListener;
import com.util.MyThreadPool;
 
public class TestClass {
 
    public static void main(String[] args) throws InterruptedException {
        CompletionService
 
                       threadService;
        ExecutorService service = Executors.newFixedThreadPool(2);
        threadService = new ExecutorCompletionService
 
                       (service);
 
        long b = System.currentTimeMillis();
        for(int i =0;i<50000;i++){
            threadService.submit(new MyRunable (i));
        }
 
        service.shutdown();
        System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b));
 
        DefaultResultListener result = new DefaultResultListener();
        MyThreadPool
 
                         newPool = new MyThreadPool
 
                         (2,result);
        long a = System.currentTimeMillis();
 
        int cc =0;
        for(int i =0;i<50000;i++)
        {
            cc = cc+i;
        }
        System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a));
        a= System.currentTimeMillis();
 
        for(int i =0;i<5000;i++){
            newPool.submit(new MyRunable (i));
        }
 
        newPool.stop();
        System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a));
    }
 
}
 
class MyRunable implements Callable
 
{
    int index = -1;
    public MyRunable(int index)
    {
        this.index = index;
    }
    @Override
    public Integer call() throws Exception {
        return index;
    }
 
}

Ссылка: Мой пользовательский пул потоков в Java от нашего партнера JCG Абхишека Сомани из блога Java, J2EE, Server .