Многопоточность. java.util.concurrent. Реализации Thread Pool и при чем тут Stream API

Многопоточность. java.util.concurrent. Реализации Thread Pool и при чем тут Stream API

Дорогу осилит идущий

В прошлом уроке мы познакомились с концепцией пула потоков и разобрали интерфейсы, общие для всех реализаций пулов.

Теперь пришло время разобраться с тем, какие реализации пулов существуют в Java.

 

ThreadPoolExecutor

Наиболее близкая к канонической реализация, подходящая для большинства задач. Связано это с двумя причинами:

1.    Данный пул не имеет выраженной специфики. Ниже мы познакомимся с более узконаправленными пулами и данный тезис станет понятнее;

2.    У ThreadPoolExecutor достаточно гибкая настройка (как на этапе создания, так и в рамках использования пула), что позволяет сконфигурировать пул на любой вкус, в зависимости от потребностей в рамках конкретной задачи.

Учитывая второй пункт, настройка пула вручную – дело нечастое. Слишком много нюансов, которые нужно учитывать – некоторые параметры связаны между собой. В рамках текущего пункта мы разберем основные настройки, а в конце статьи – познакомимся с классом, который позволяет упростить конфигурацию пулов и предоставляет более простой интерфейс по созданию пулов.

Кроме того, в ThreadPoolExecutor есть ряд публичных методов, которые отсутствуют в ExecutorService. Часть из них мы опустим – они являются геттерами/сетерами полей ThreadPoolExecutor или иным образом относятся к конфигурации пула.

Но пару интересных методов мы, все-таки, рассмотрим. После чего познакомимся с основными параметрами конфигурации пула.

 

Методы

Нас интересует лишь два метода, оба связаны с очисткой очереди задач (да, держим в голове, что пул потоков работает на базе блокирующей очереди Runnable’ов (не только их, но не суть)).

1.    purge(). Данный метод удаляет из очереди задач те, которые были отменены (мы уже упоминали, что механизм Future имеет соответствующую функциональность);

Безусловно, если этот метод не использовать, обычно ничего страшного не происходит. Просто поток, дошедший до выполнения отмененной задачи, выкинет ее из очереди и перейдет к следующей.
Но, как вы можете помнить, классические блокирующие очереди ограничены по размеру. А значит, могут заполниться и попытки добавления задач будут блокировать потоки, пытающиеся это сделать.
По сути, мы получим голодание потоков из-за того, что очередь забита задачами, которые уже отменены и не будут выполнены – лишь занимают место.

2.    remove(Runnable task). По сути, метод обратный execute(). Пытается удалить из очереди Runnable, переданный параметром. Может работать некорректно с submit() – там конечный Runnable не всегда является тем, который был передан параметром (тем более, параметром может быть вообще Callable). Но в случае с execute() предлагает альтернативу механизму отмены задачи, который для submit() предоставляет Future.

Как было сказано выше, остальные методы, специфические для ThreadPoolExecutor направлены на его конфигурацию и/или являются геттерами и сеттерами для его полей. Часть из этих полей рассмотрим в следующем пункте.

 

Поля. И основа конфигурации

Поля предлагаю рассмотреть на базе наиболее «жирного» конструктора данного класса. Именно они позволяют понять, какими параметрами пула можно управлять. И дают общее понимание, какие характеристики пула потоков имеют значение.

·      int corePoolSize. Или базовый размер пула. Это то число потоков, которое будет существовать в пуле, даже если пул не будет иметь задач для всех. У последней формулировки есть две оговорки, но они не критичны в данном контексте.

·      int maximumPoolSize. Максимально допустимо число потоков в пуле. Потоки после превышения corePoolSize могут добавляться, если очередь задач занята – т.е. уже существующие потоки не справляются. При простое (отсутствии задач) эти потоки удаляются.

В целом, можно сравнить с турбо-частотой у процессора. Если надо, можно разогнать, но на постоянной основе поддерживать избыточно. Если maximumPoolSize == corePoolSize – у пула фиксированный размер и он не может расшириться;

·      long keepAliveTime, TimeUnit unit. Период времени, в течении которого будут поддерживаться дополнительные потоки (сверх corePoolSize) при отсутствии задач. После истечения заданного периода они будут удалены;

·      BlockingQueue<Runnable> workQueue. Та самая пресловутая очередь задач. При создании экземпляра ThreadPoolExecutor через конструктор мы вынуждены передавать ее явно;

·      ThreadFactory threadFactory. Фабрика потоков. С паттерном «фабрика» и его разновидностях мы еще познакомимся. В контексте данного случая – интерфейс, предлагающий метод, создающий поток. Именно на основе этой штуки будут создаваться потоки-исполнители пула. А значит, благодаря кастомной реализации фабрики можно кастомизировать и конкретные потоки под ваши нужды. Обычно это не требуется, но такая возможность существует;

·      RejectedExecutionHandler handler. Обработчик, который может быть вызван, если пул не может быть расширен (maximumPoolSize достигнут) и очередь потоков заполнена.

По сути, это механизм, который позволяет избежать блокировки потока при попытке добавить задачу в пул.
Есть 4 базовые стратегии (реализации RejectedExecutionHandler), который предлагает сам пул. Теоретически, можно создать свои реализации и использовать их.
Мы не будем рассматривать все 4 стандартные стратегии – их легко найти и ничего сложного в них нет. Лишь отметим, что при стратегии по умолчанию добавление задачи сверх лимита будет приводить к исключению (RejectedExecutionException).

Подводя итог, в рамках конфигурации ThreadPoolExecutor мы может влиять на допустимое число потоков в нем, время простоя (до удаления) для дополнительных потоков (для основных – тоже, но через отдельный метод), указывать очередь с задачами для пула и стратегию борьбы с переполнением пула. Кроме того, мы можем задать собственный механизм создания конкретных потоков-исполнителей для заданного пула через реализацию кастомной фабрики потоков.

По сути, описанное позволяет сконфигурировать пул почти под любую задачу, которая может возникнуть.

 

ScheduledExecutorService и ScheduledThreadPoolExecutor

В прошлом уроке мы не разобрали один специфический интерфейс-наследник ExecutorService ScheduledExecutorService.

Как можно догадаться из названия, он предлагает механизм для запуска задач по расписанию – как единичных с заданной отсрочкой, так и периодических – с заданным промежутком между срабатываниями.

Область применения данного механизма практически безгранична. Это основа всевозможных таймеров, периодических задач на любой вкус – от строго внутренних, вроде синхронизаций между разными системами/частями системы, до видимых пользователю – скажем, рассылок писем/уведомлений/напоминаний. Наверно, проще перечислить системы, в которых данный механизм не нужен, чем те, в которых необходим.

Интерфейс ScheduledExecutorService имеет всего 3 новых метода (один из них перегружен):

1.    schedule(). Принимает Runnable (в альтернативной реализации - Callable) – задачу, а также параметры для указания периода (число единиц и единицы измерения). Предназначен для назначения единичной задачи на запуск через указанный (параметрами периода) промежуток времени, считая от «сейчас»;

2.    scheduleAtFixedRate(). Принимает задачу в виде Runnable, задержку от «сейчас» до первого исполнения и задержку между исполнениями (запуском предыдущего исполнения и запуском следующего). А также параметр единиц измерения (TimeUnit), относящийся сразу к двум предыдущим параметрам;

3.    scheduleWithFixedDelay().Принимает те же параметры, что и предыдущий метод. Но если в scheduleAtFixedRate() задержка означала период между запусками задач, то в данном методе задержка считается от завершения предыдущего запуска до начала следующего. Для задач, выполнение которых происходит не мгновенно, это может иметь значение.


Все указанные методы возвращают объект типа ScheduledFuture – наследника знакомого по последним урокам Future и знакомого по блокирующим очередям Delayed. Вы же помните DelayQueue? Так вот, она здесь не используется:)
Зато используется похожая на нее непубличная реализация.

В теории, периодические задачи, заданные через один из двух подходящих методов ScheduledExecutorService, будут запускаться до бесконечности. На практике же этот процесс может быть прерван через отмену задачи с помощью Future (в данном случае, ScheduledFuture). Кроме того, может быть завершена работа самого пула, обслуживающего данную периодическую задачу. И, наконец, завершение одного из запусков с исключением приведет к тому, что задача перестанет запускаться.

 

Перейдем к реализации интерфейса – ScheduledThreadPoolExecutor. Она всего одна и наследует уже разобранный выше ThreadPoolExecutor.

В целом, чего-то специфического в ней нет, лишь отметим, что ScheduledThreadPoolExecutor не так гибок в настройке, как его предок. Мы можем указать лишь:

·      базовое число потоков (corePoolSize). Сверх него пул может расширяться до Integer.MAX_VALUE. В контексте пула потоков это, по сути, расширение до бесконечности;

·      Фабрику потоков (threadFactory);

·      Стратегия на случай переполнения (handler). Стратегия по умолчанию не отличается от ThreadPoolExecutor – исключение на добавление в переполненную очередь. Впрочем, с учетом условно-бесконечного пула, переполнение очереди – не самый вероятный сценарий.

Таким образом, с использованием двух изученных реализаций мы имеем пул с гибкой настроек для разовых задач, а также отдельный пул для задач, отложенных во времени или повторяющихся с заданной периодичностью. Что еще может понадобиться Java-разработчику?

 

ForkJoinPool

Кроме описанных выше, в Java есть специфический вид пула – ForkJoinPool, хорошо подходящий для двух сценариев:

·      Простые асинхронные задачи. Которые выполняются быстро и не предполагают блокировки потока внутри пула;

·      Асинхронные/параллельные задачи, которые порождают другие асинхронные/параллельные задачи. Отсюда и название: fork – ответвление (вторичной задачи в отдельный поток от текущей) и join – ожидание выполнения такой вторичной задачи в первичной.

Отдельный интересный момент заключается в механизме work-stealing – кражи (?) работы. Не погружаясь в детали, освободившийся поток в ForkJoinPool будет пытаться найти себе другую задачу – в т.ч. «забрать» вторичную задачу у другого потока в пуле. Таким образом обеспечивается более равномерная загрузка потоков пула и, тем самым, общая эффективность пула.

К слову, именно ForkJoinPool используется для параллельных Stream’ов в Stream API. Механизм «ветвящихся» параллельных задач хорошо ложится в решение параллельной обработки стрима.
В таком подходе и, как следствие, в использовании параллельных стримов, есть свои сложности и ограничения, из-за которых не рекомендуется добиваться параллелизма через parallelStream, но для простых операций такой подход имеет право на жизнь.

Как бы там ни было, в рамках реальных проектов рекомендую советоваться с более опытными разработчиками при желании использовать параллельные стримы. А так же проверять с помощью логов и других доступных инструментов их эффективность в каждом конкретном случае.
Возникающие проблемы могут начинаться от загрузчика классов (для гугла: ClassLoader java) до неэффективного определения необходимого уровня параллелизма при выделении пула для конкретного стрима.

Возможно, есть и более неприятные вещи, но автор с ними еще не столкнулся:)

Возвращаясь к функциональности ForkJoinPool, хочется рассмотреть возможности его конфигурации – они несколько отличаются от того, что есть в ThreadPoolExecutor, даже для одинаковых по названиям параметров. Но поскольку мы не слишком глубоко погражаемся в реализацию пула, а полноценная конфигурация ForkJoinPool – крайне редкая задача, остановимся лишь на паре моментов, на которые стоит обратить внимание.

·      int parallelism. ForkJoinPool, в отличии от других пулов потоков, больше ориентирован на то, сколько потоков на самом деле может работать одновременно, а не сколько объектов Thread может лежать в пуле. Поэтому можно задать значение параллелизма явно, если имеется исчерпывающее представление о возможностях процессора и лимитах JVM в среде, где будет использована программа. Или делегировать это самой JVM – документация предлагает использовать Runtime#availableProcessors() для получения информации об уровне параллелизма (условно, число ядер) в рамках среды выполнения;

·      boolean asyncMode. Если задачи пула предполагаются как асинхронные (нужно сделать, но основной поток не блокируется ожиданием результата) и не предполагают активного «ветвления» – данный флаг стоит установить в true.

В таком случае пул будет работать в режиме, который чем-то напоминает fair-доступ у локов и других инструментов – задачи, поступающие в пул будут обрабатываться в порядке поступления в пул.

Значение по умолчанию – false. Оно больше подходит для параллельных задач, нежели асинхронных.

 

Также предлагаю разобрать ключевые методы ForkJoinPool, которых нет в уже рассмотренных пулах.

 

static commonPool()

Возвращает ForkJoinPool, который система завела по умолчанию. Он имеет некоторые некритичные в работе отличия (читай, оптимизации) относительно ForkJoinPool, который мы можем создать руками. И, зачастую, работу с созданием ForkJoinPool стоит ограничить данным методом. Он отлично подходит для не трудоемких задач.

Основная опасность при использовании пула из этого метода – чрезмерное и/или бездумное использование. При заваливании пула трудоемкими задачи могут возникнуть общие проблемы с производительностью системы – могут возникать неожиданные провисания в выполнении за счет того, что задачи ждут своей очереди. А пул, в свое время, занят перевариванием трудоемких задач, которые были переданы в него из другой части системы. Подобные проблемы тяжело идентифицировать и бороться с ними весьма неприятно.

Основная мораль: не грузите ForkJoinPool.commonPool() тяжелыми задачами. Особенно задачами с блокирующими операциями: обращение к внешним ресурсам, включая файлы и БД, длительное ожиданием разделяемого ресурса и т.д. Он для этого не предназначен.

При необходимости выполнять эти операции параллельно или асинхронно стоит использовать другие пулы. В идеале – вообще не ForkJoinPool.


invoke(ForkJoinTask<T> task)

invoke() – метод, который напоминает рассмотренные в предыдущем уроке методы invokeAll() и invokeAny(). Его основное отличие в том, что принимает он параметром одну задачу (вместо коллекции), ожидает ее выполнения и возвращает результат.

Второе отличие заключается в том, что мы не работаем с Callable.

Callable и Runnable вообще не в чести у ForkJoinPool. Как и любой пул, он умеет с ними работать, но предпочитает использование ForkJoinTask.

Строго говоря, ForkJoinTask не является задачей. Этот абстрактный класс – одна из имплементаций Future, актуальная для ForkJoinPool. Зато он имеет ряд статических методов adapt() по набору параметров идентичных параметрам у различных реализаций ExecutorService#submit().

И как раз adapt()-методы возвращают наследников ForkJoinTask, которые также имплементируют Runnable или хранят его в виде поля (с Callable немного хитрее, но суть та же). Так или иначе, объекты этих реализации представляют из себя задачу, имеющую также функциональность, направленную на эффективную обработку в ForkJoinPool – методы

·      ForkJoinTask#fork() – асинхронный запуск задачи, запустивший поток не блокируется;

·      ForkJoinTask#invoke() – синхронный запуск задачи, запустивший поток блокируется до получения результата.

Их рекомендуется использовать для запуска вторичных задач – тех, которые запускаются из задачи, которая уже обрабатывается в ForkJoinPool’е. Это предпочтительнее, чем использовать в таких случаях методы самого ForkJoinPool.

Данная концепция немного громоздка, особенно на первый взгляд, но относительно проста в использовании.

 

execute(ForkJoinTask<?> task)

Здесь ничего особенного. Принимает параметром таск, который необходимо выполнить в рамках пула. Является перегрузкой Executor#execute(Runnable runnable), только в качестве параметра принимает более близкий данному пулу ForkJoinTask.

 

submit(ForkJoinTask<T> task)

Еще одна перегрузка. Как ExecutorService#submit() но с поправкой на реалии ForkJoinPool.


Отдельно стоит отметить, что методы submit() (как новый, так и известные по ExecutorService) в ForkJoinPool возвращают не просто Future, а ForkJoinTask, который имеет расширенную функциональность, в сравнении со своим предком.

 

Кроме рассмотренных методов, есть еще несколько, обеспечивающих вспомогательные функции при необходимости более тонкой работы с пулом. Их мы не будем рассматривать в силу узости их сферы применения. Боюсь, в контексте необходимости осознать связку ForkJoinPool ForkJoinTask данный урок и так получился нагруженным. Для тех, кому действительно интересно разобраться – рекомендую больше внимания уделить практике и документации/исходному коду. Ничего сложного здесь нет, но нужно потратить время на осознание концепции.

 

Утилитный класс Executors

Мы рассмотрели все основные реализации thread pool’ов в java.util.concurrent – их всего три.

Но, как и во многих других случаях, для сложных/гибких механик Java предлагает вспомогательный класс со статическими методами. В данном случае – Executors.

Он предлагает несколько методов по превращению Runnable (и не только) в Callable – см. callable(), метод с дефолтной реализацией фабрики потоков – defaultThreadFactory() и, самое главное – 14 методов для создания пулов потоков.

Чтобы не разбирать каждый метод по отдельности, отметим категории пулов, которые он предлагает:

·      С фиксированным числом потоков. corePoolSize == maximumPoolSize;

·      С использованием work-stealing механизма. Читай, сконфигуренные ForkJoinPool’ы;

·      Однопоточные пулы – corePoolSize == maximumPoolSize == 1;

·      Кэшированные пулы. corePoolSize == 0, но maximumPoolSize не ограничен. При появлении задач создаются потоки, когда задачи заканчиваются – живут в течении минуты (находятся в «кэше») и завершаются, если не дождутся новых задач. При поступлении новых задач после завершения кэшированных потоков – создаются новые;

·      Scheduled-пулы. Однопоточные и нет;

·      «Не сконфигуренные» пулы. Возвращают обертку над реализациями ExecutorService/ScheduledExecutorService (в зависимости от метода), которые принимают параметром. Все операции, доступные данным интерфейсам, делегируют пулу, который был передан параметром.

Удобны тем, что гарантируют недоступность кастомных методов пула даже при желании пользователя – используется объект-обертка, что исключает возможность обращения к кастомным методам через downcast.

Так, например, с помощью подобных методов можно ForkJoinPool ограничить в функциональности до ExecutorService. Главное понять, зачем это нужно:)

 

В качестве заключения

Пулы потоков – очень полезная тема при практическом использовании многопоточности. Независимо от остальных тем данного раздела, крайне рекомендую разобраться в ней хотя бы на том уровне, который предлагает данный урок.

В целом, в ней нет ничего сложного, если мы говорим об использовании пулов, а не их внутреннем устройстве. Даже ForkJoinPool достаточно прост, если потратить на него немного времени. Остальные пулы еще более прозрачны.

Надеюсь, данная статья поможет вам в осознании возможностей данных механик.

 

С теорией на сегодня все!

Переходим к практике:

Задача 1

Реализуйте программу, которая выводит актуальное время каждую минуту, начиная с текущего момента. Не используйте Thread.sleep() или циклы.

 

Задача 2

Реализуйте программу, которая сообщает о начале каждого часа. Не используйте Thread.sleep() или циклы.

 

Примечание: для Задач 1 и 2 в рамках отладки можно выбрать любые промежутки времени. Скажем, не каждую минуту, а каждую секунду. Или не «начало каждого часа», а «начало каждой минуты». Концептуально это не влияет на решение.

 

Задача 3

Реализуйте третий вариант Задачи 2 урока 60 с использованием пулов потоков.

 

Задача 4

Реализуйте программу, которая выводит числа от 0 до 100 в консоль. Выведения каждого десятка должно быть вынесено в отдельный поток, в котором будут запущены потоки на выведение каждого конкретного числа.

Каждый конечный поток обязан спать в течении 500мс после того, как выведет число в консоль.


Если что-то непонятно или не получается – welcome в комменты к посту или в лс:)

Канал: https://t.me/ViamSupervadetVadens

Мой тг: https://t.me/ironicMotherfucker

 

Дорогу осилит идущий!

Report Page