Многопоточность. java.util.concurrent. ExecutorService и Thread Pool
Дорогу осилит идущийСегодня мы начнем знакомство с Thread Pool («трэд пул», пул потоков) – последней темой, которая будет освещена в рамках многопоточности, но растянется на несколько уроков.
Thread Pool – набор (пул) потоков, предназначенный для выполнения определенных задач (или определенного типа задач). По сути, является инструментом, дающим упрощенный API по взаимодействию с потоками в рамках определенной задачи, что позволяет абстрагироваться от ручного создания и управления потоками. А также позволяет переиспользовать объекты потоков, а не создавать новый поток под каждую задачу.
Базовый сценарий использования заключается в том, чтобы сообщить пулу, какую задачу (обычно, представленную в виде лямбда-выражения*) требуется выполнить. Пул выделает поток для переданной задачи. Или ставит ее в очередь, или планирует ее выполнение – в зависимости от реализации пула. После выполнения задачи, ответственный за нее поток возвращается в пул и может быть использован снова.
* Обычно, но не всегда. Инструкции таких задач располагаются внутри реализации соответствующих (Runnable/Callable) функциональных интерфейсов. Но могут быть описаны как лямбда-выражением, так и полноценным классом-реализацией. Это не имеет особого значения в контексте выполнения таких инструкций. Зато может сильно помочь при описании этих инструкций.
Скажем, Runnable не имеет входных параметров. Зато класс, имплементирующий Runnable вполне может иметь поля, а эти поля можно использовать в переопределенном run():)
С точки зрения применения многопоточности в реальных проектах, вы, вероятно, почти всегда будете работать с пулами потоков. Создание объекта Thread в ручном режиме – что-то из ряда вон выходящее.
В рамках текущего урока мы познакомимся с основами иерархии пулов потоков и посмотрим, какие существуют методы для управления пулом.
!NB: Мы уже встречали класс, отвечающий за объединение потоков в группы – ThreadGroup. В отличии от пулов потоков, он отвечает за ручное управление потоками, объединенных в группу. Thread pool же не предоставляет ручного управления в таком императивном виде, как ThreadGroup. Пул является абстракцией более высокого уровня. За пределами операций создания и «закрытия» пула, он самостоятельно управляет своей жизнедеятельностью и своими потоками.
Интерфейс Executor
Данный интерфейс содержит всего один метод: execute(Runnable command).
Иными словами, данный интерфейс предоставляет контракт на выполнение переданного в него Runnable. Он бы нас мало интересовал, если бы не одно но: он является «дедушкой» всех thread pool’ов.
Основной плюс этого интерфейса в том, что он предоставляет контракт на выполнение Runnable, но не ограничивает способ этого выполнения: в текущем потоке или в другом, сейчас или с задержкой по времени, или в неопределенный момент времени и т.д.
Таким образом, используя execute(), мы можем быть уверены что наша задача (описанная в Runnable или его наследнике) будет выполнена. При этом способ ее выполнения будет зависеть от имплементации, которую мы выберем.
Интерфейс ExecutorService
Наследник Executor и «отец» thread pool’ов. Предоставляет общий для всех имплементаций пулов потоков интерфейс. Его методы мы разберем подробнее.
Кроме метода Executor#execute() есть еще несколько способов отправить задачу на выполнение пулом:
submit()
По назначению схож с execute() – позволяет передать задачу на выполнение в пул, но имеет возвращаемое значение и умеет работать не только с Runnable.
Все варианты submit() возвращают Future. С этим интерфейсом и его реализациями мы познакомимся в отдельном уроке. Главное, что стоит узнать о нем на данном этапе – то, что данный механизм представляет собой инструмент для получения результата задачи, выполняемой асинхронно.
Т.е. мы, допустим, хотим получить что-то в результате выполнения задачи в другом потоке. Но нельзя требовать от потока объект – мы не знаем, когда поток выполнит переданные инструкции, а значит получение такого объекта будет блокирующей операцией.
Зато мы можем получить Future – объект, который позволит получить результат, когда асинхронная задача будет выполнена. Он предоставляет метод, позволяющий получить результат (та самая блокирующая операция), узнать статус – доступен ли уже результат, а также отменить саму задачу (с рядом оговорок). Подробнее разберемся в рамках отдельного урока.
Итак, все перегрузки submit() возвращают Future, внутрь которого можно положить результат выполнения задачи, переданной в параметрах метода. Собственно, а что с параметрами?
1. Callable. Мы уже сталкивались с этим функциональным интерфейсом. Его основное отличие от Runnable в том, что он имеет возвращаемое значение. В случае с использованием через submit(), возвращаемое значение и будет внутри Future. Дополнительный бонус этого интерфейса – его метод обозначен как throws Exception, а значит мы можем не использовать внутри лямбды try-catch или даже явно выбрасывать checked-exception. Безусловно, этой возможностью надо пользоваться с умом, но иногда она облегчает жизнь;
2. Runnable + T result. С Runnable все понятно. Объект обобщенного типа – результат – будет возвращен внутри Future. По сути, предназначается для ситуаций, вроде описанных в примере:
var result = new HashMap<String, Object>();
Runnable task = () -> {
result.put("success", true);
};
var futureResult = Executors.newFixedThreadPool(1) // Создание пула.
// Разберем позже
.submit(task, result);
Само собой, можно использовать не только мапу, а любой объект. Таким образом, основное отличие от Callable сводится к тому, что объект результата создан вне лямбды. Его же наполнение может происходить уже внутри «task».
3. Runnable. По сути, возвращаемый результат отсутствует – контракт говорит, что в случае успешного выполнения полезной нагрузкой Future будет null. От execute() отличается тем, что позволяет определить статус выполнения в момент времени, используя Future. А также отменить задачу через него же.
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
Метод, принимающий коллекцию Callable и возвращающий список выполненных Future. Таким образом, данный метод запускает (обычно параллельно) переданный ему набор задач, ожидает их завершения (не важно, успешного или посредством выброса исключения), после чего возвращает результаты, обернутые во Future. В данном случае Future нужен не как механизм для получения результата, а как обертка над отсутствием возвращаемого значения (если задача завершилась с исключением).
Также существует перегруженная версия этого метода, принимающая дополнительными параметрами период времени. Отличие от первой реализации в том, что все задачи, которые не успели выполниться за переданный период времени, будут отменены. Таким образом, часть списка Future, возвращенных из метода, могут быть не завершенными, а отмененными.
T invokeAny(Collection<? extends Callable<T>> tasks)
Метод по набору параметров идентичен предыдущему. Отличие в том, что invokeAny() дождется первого успешного выполнения задачи из списка, а не выполнения всех. И вернет полученный результат. Все остальные задачи будут отменены.
Также существует перегрузка с указанием периода времени. Если метод укладывается в заданный период – результат не будет отличаться от первой реализации. Если не уложится, то будет выброшено исключение – TimeoutException.
Кроме методов выполнения задач, ExecutorService предоставляет ряд методов для управления пулом и отслеживания его состояния:
· shutdown(). Завершение работы пула, его «закрытие». Все задачи, уже переданные в него будут завершены, новые задачи пул откажется принимать. После завершения всех задач пул завершит свою работу;
· shutdownNow(). Метод для экстренного завершения работы пула. После вызова этого метода пул перестанет принимать новые задачи, задачи, ожидающие выполнения вернутся в виде списка Runnable, задачи, которые находятся в стадии выполнения будут, по возможности, остановлены.
Обычно для остановки используется Thread#interrupt(). Соответственно, если задача не обрабатывает прерывание потока – она может оказаться никогда не завершенной, т.е. получение ее результата из Future заблокирует поток навсегда.
Полагаю, очевидно, что с данным методом стоит работать крайне осторожно;
· awaitTermination(). Метод, принимающий параметрами период времени. Блокирует текущий поток, пока все задачи в пуле не будут выполнены или пока не истечет указанный период времени. Используется после shutdown() или shutdownNow(). Если задачи успели завершиться в срок – вернет true, иначе – false;
· isShutdown(). Вернет true, если пул был закрыт. Иначе – false;
· isTerminated(). Возвращает true, если после завершения работы пула все активные задачи были выполнены. Иначе (пул еще не закрыт, или не все задачи были выполнены при закрытии) – false.
Вместо заключения
Как вы могли заметить при разборе методов, пул потоков не гарантирует ни мгновенного выполнения переданной задачи, ни даже ее мгновенного запуска. Дело в том, что пулы потоков ограничены по размеру. Т.е. число потоков либо задано в явном виде, либо определено иным образом, но не является бесконечным.
Но проблема в том, что задачи, в отличии от числа потоков в пуле, вполне могут поступать бесконечно. Поэтому работа пула, по сути, заключается в приеме поступающих задач (обычно они направляются в очередь – один из классических примеров использования блокирующих очередей) и их выполнении по мере появления свободных потоков. Т.е. каждый конечный поток в пуле (он же worker) по сути представляет из себя бесконечный цикл, берущий задачи из очереди (если они там есть) и выполняющий их.
Это общее и очень упрощенное описание работы некого абстрактного пула потоков. Стоит понимать, что существуют реализации, сильно отличающиеся от описанной выше. Например, пул, запускающий задачи по таймеру – мы рассмотрим его в следующем уроке, среди прочих реализаций.
С теорией на сегодня все!
В следующем уроке мы рассмотрим конкретные реализации пулов потоков, а также способы их создания. И уже с учетом новых знаний поработаем над практикой.

Если что-то непонятно или не получается – welcome в комменты к посту или в лс:)
Канал: https://t.me/ViamSupervadetVadens
Мой тг: https://t.me/ironicMotherfucker
Дорогу осилит идущий!