Многопоточность. java.util.concurrent. Future. CompletableFuture

Многопоточность. java.util.concurrent. Future. CompletableFuture

Дорогу осилит идущий

В рамках текущей статьи мы поближе познакомимся с инструментом Future, который затронули в рамках разбора thread pool’ов. Итак, еще раз, что это и для чего нужно?

Future – интерфейс, реализации которого предоставляют механизм взаимодействия с результатом асинхронной (выполняемой в другом потоке без точной информации о времени завершения) операции (задачи), а также возможность отменить саму операцию, если это возможно.

Некоторые реализации не ограничиваются указанной функциональностью, но об этом позже.

Итак, какие методы предоставляют описанные возможности?

 

Future. Методы

Интерфейс Future является весьма лаконичным.


V get()

Данный метод блокирует поток, в котором был вызван, до тех пор, пока не будет выполнена операция, с которой связан объект Future. При корректном завершении – вернет результат этой операции (если таковой существует). Может также завершиться с исключениями:

·      InterruptedException – как и всегда, если текущий (заблокированный) поток был прерван;

·      ExecutionException – если в процессе выполнения асинхронной операции возникло исключение;

·      CancellationException – если данная операция была отменена через Future или иным образом – например, при вызове shutdown() у пула потоков;

Важно понимать, что вызов get() – операция дорогая и, зачастую, непредсказуемая – неизвестно, как долго потоку придется ждать выполнения связанной с Future задачи. Когда-то это бывает оправдано, когда-то – нет. Поэтому существует альтернатива в виде перегруженного метода.

Перегруженный метод get() принимает параметрами период времени (long timeout, TimeUnit unit), в течении которого пытается получить результат операции. Если по истечении срока задача все еще не выполнена – завершается с TimeoutException. В остальном ничем не отличается от get() без параметров.

 

boolean cancel(boolean mayInterruptIfRunning)

Метод, отменяющий (пытающийся отменить) связанную операцию.

Данный метод вернет false, если задача уже была отменена или завершена. Если нет – попытается отменить задачу, которая еще не начала свое исполнение.

В случае же, если mayInterruptIfRunning == true – попытается, также, отменить и задачу, которая уже начала выполняться, посредством прерывания потока, который ее выполняет. Последнее, очевидно, не является гарантией того, что задача будет отменена – как потому что сама задача может не проверять (явно или неявно) наличие прерывания, так и потому что не всегда известно, каким потоком выполнения взята задача, а значит, непонятно, какой поток нужно прервать.

Независимо от описанного поведения, данный метод, скорее всего, вернет true, если задача не была завершена/отменена на момент его вызова. Т.е. true не гарантирует того, что задача действительно была отменена. Об этом чуть ниже.

 

Другие методы Future

Кроме описанных методов существуют еще два метода, позволяющих узнать состояние задачи в данный момент:

1.    isDone(). Возвращает true, если обработка данной задачи завершена. В результате успешного выполнения, исключения или отмены – не важно, все эти результаты вернут true. В противном случае (задача в очереди на выполнение или выполняется прямо сейчас) – false;

2.    isCancelled(). Возвращает true, если задача была отменена. Именно этому методу можно верить в данном вопросе (в отличии от calcel()). В противном случае (задача не отменялась, отменить не удалось, завершена и т.д.) – false.

 

Как видите в методах Future нет ничего сложного. Нас, в целом, мало интересуют реализации данного интерфейса (в чистом виде), поэтому просто озвучим основные из них:

·      FutureTask – класс, реализующий RunnableFuture – наследника Future, который объединяет функциональность двух интерфейсов – Future и Runnable. Используется в ThreadPoolExecutor;

·      ForkJoinTask – абстрактный класс, основной тип, которым оперирует ForkJoinPool. Кроме методов Future предоставляет массу собственных на разные случаи жизни – от методов, предоставляющих экземпляры данного типа (adapt()) до управляющих жизненным циклом задачи в рамках пула – fork(), join(), invoke() и пр.

По сути, именно ForkJoinTask раскрывает весь потенциал, который заложен в ForkJoinPool. Важно отметить, что, как и FutureTask, конечные реализации ForkJoinTask являются также и Runnable через имплементацию RunnableFuture.

Но на этом мы не заканчиваем – сегодня мы также рассмотрим несколько наследников Future, которые развивают его функциональность для различных сценариев использования.

 

ScheduledFuture

На этом этапе стоит, вероятно, отметить, что механизм Future мало кому нужен сам по себе. Наверно, практически все конечные реализации Future в рамках java.util.concurrent являются имплементацией двух и более  интерфейсов (не считая маркерных, которые не предоставляют новой функциональности). Так, рассмотренные выше реализации в том или ином виде сводились к имплементации RunnableFuture – объединению интерфейсов Runnable и Future.

В рамках данного пункта мы рассмотрим интерфейс, с которым работает ScheduledExecutorService в рамках собственных методов (не наследуемых от ExecutorService).

Эти методы, так или иначе, сводятся к отложенному выполнению задачи (одиночному или множественному).

Возвращаемый тип этих методов – ScheduledFuture, плод жаркой любви Future и Delayed – интерфейса, предоставляющего метод получения длины задержки, который удобно ложится в концепцию отложенных задач.

ScheduledFuture не имеет публичных реализаций, но, все же, мы пройдем по цепочке наследования, чтобы понимать, что именно возвращают методы какого-нибудь ScheduledThreadPoolExecutor при постановке отложенной задачи.

Единственным наследником ScheduledFuture в java.util.concurrent является интерфейс RunnableScheduledFuture – дитя отношений между ScheduledFuture и RunnableFuture. Таким образом, это объединение уже трех интерфейсов – Future, Delayed и Runnable. А также обладатель собственного метода isPeriodic() (возвращает true, если задача повторяемая и false – если должна исполниться лишь 1 раз).

И уже данный интерфейс имеет единственную непубличную реализацию – ScheduledThreadPoolExecutor.ScheduledFutureTask, которую и возвращают scheduleAtFixedRate(), scheduleWithFixedDelay() и вариации schedule() с задержкой.

Предсказуемым, но важным нюансом ScheduledFuture является то, что его cancel() отменяет все вызовы связанной таски, а не какой-то один. Строго говоря, сам интерфейс (или его наследники) это не регламентирует, но это декларируется в ScheduledExecutorService.

 

CompletableFuture

CompletableFuture – класс, который наследует Future и новый для нас интерфейс CompletionStage, предоставляющий массу методов которые позволяют применить какие-либо действия к результату предыдущей операции, объединить результаты асинхронных операций и другие возможности по взаимодействию с асинхронными задачами.

CompletableFuture примечателен по нескольким причинам:

·      Он привносит в обработку Future функциональный стиль. Так, мы можем описать в виде цепочки вызовов, что требуется сделать с результатом операции. По сути, CompletableFuture является монадой (или чем-то очень на нее похожим). В этом плане обработка результата в CompletableFuture мало чем отличается от работы с условными Optional или Stream – только операции (функции высшего порядка) другие;

·      CompletableFuture позволяет объединять обработку асинхронных операций, оставаясь в рамках одной цепочки вызовов: так, запуск следующей партии асинхронных операций можно сделать на базе результата предыдущей операции, существуют методы, позволяющие объединить ожидание результатов разных асинхронных операций и т.д. Обобщая, CompletableFuture позволяет лаконично описывать способ выполнения как параллельных, так и последовательных асинхронных (или синхронных) операций;

·      CompletableFuture не привязан к конкретным реализациям пулов потоков или чему-то еще (как, например, ForkJoinTask тесно связан с ForkJoinPool) и является полностью самостоятельным инструментом, в отличии от большинства своих собратьев.

Мы не будем разбирать данный класс досконально – на это пришлось бы выделить несколько полноценных уроков. Число публичных методов CompletableFuture превышает 70. Поэтому ограничимся классификацией операций, парой примеров использования и задачами в практике.

Стоит отметить, что, как и любая монада в Java, CompletableFuture имеет три типа операции:

1.    Порождающие. В первую очередь:

      ·      completedFuture(). Аналог Optional.of(). Оборачивает переданное параметром значение в объект монады;

      ·      supplyAsync(). Некий аналог Stream.generate() с асинхронно выполняемым Supplier;

      ·      runAsync() – асинхронное выполнение задачи, результат которой будет представлен в виде CompletableFuture.

2.    Промежуточные. Данных операций больше всего, некоторые из них похожи между собой – как, вероятно, и в любой монаде. Примеры: thenApply(), thenCombine(), whenComplete() и другие. Для большинства промежуточных операций есть аналог с постфиксом Async, указывающий, что операция(-ии) должны быть применены асинхронно;

3.    Терминальные. Эти операции, по аналогии с другими монадами, выделаются тем, что не возвращают CompletableFuture: join(), complete() и другие.

Стоит отметить, что асинхронное выполнение в рамках CompletableFuture строится на базе ForkJoinPool.commonPool() или посредством запуска нового Thread’а на каждую асинхронную операцию – в зависимости от уровня параллелизма, доступного JVM или commonPool, если его настройки описаны на уровне JVM – так тоже можно:)

 

Рассмотрим несколько базовых примеров применения CompletableFuture.

Пример 1. Не несет особой смысловой нагрузки, описание операций в комментариях к коду.

      // Асинхронно получаем текущее (на момент выполнения) время
CompletableFuture.supplyAsync(LocalDateTime::now) 
      // Если выполнилось успешно – выводим в консоль. Некий аналог
      // Stream#peek(), тоже позволяет сделать действие и передать
      // результат дальше (обернув в новый CompletableFuture). 
      // Но позволяет обработать и сценарий, когда предыдущая
      // асинхронная операция завершилась с исключением
  .whenCompleteAsync((time, throwable) -> System.out.println(time))
      // Если выполнилось успешно – выводим в консоль.
      // Отдаленно напоминает Optional#ifPresent() – тоже производит
      // действие, при наличии результата и не позволяет дальше
      // обрабатывать этот результат. Но в отличии от
      // Optional#ifPresent() не является терминальной операцией – 
      // цепочку вызовов можно продолжить, например, для нового 
      // асинхронного вызова. Или для другой операции, которая должна 
      // выполниться только после завершения вышестоящих асинхронных, 
      // но не связана напрямую с их результатом
  .thenAccept(System.out::println) 
      // Операция та же, что и выше. Но значением принимает уже Void
  .thenAccept(v -> System.out.println("Sth final print"))
      // Указываем, что текущий поток должен дождаться выполнения
      // цепочки, прежде чем продолжить/завершить свое выполнение
  .join();


Пример 2. Запускаем асинхронную задачу, затем запускаем другую асинхронную задачу (выполняется одновременно с первой) и перемножаем результаты, когда обе задачи будут выполнены. Полученный результат приводим к строке.

Если выполнение затягивается (не выполнено до вызова терминальной операции) – возвращаем значение по умолчанию. Получившийся результат выводим в консоль.

var result = CompletableFuture.supplyAsync(() -> 2)
  .thenCombine(
      // Это тоже CompletableFuture,
      // т.е. здесь может быть вложенная цепочка, если необходимо
    CompletableFuture.supplyAsync(() -> 3),  
    (i1, i2) -> i1 * i2)
      // Аналог Stream#map(). Работает синхронно. 
      // Есть асинхронная альтернатива handleAsync() 
  .handle((r, throwable) -> r.toString())
  .getNow("0");

System.out.println(result);

В таком виде результат будет "6". Чтобы увидеть результат по умолчанию – в любую из асинхронных задач добавьте Thread.sleep().

 

Пример 3. Обработка ошибки без прерывания цепочки вызовов.

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Sth exception");
  })
  .exceptionally(throwable -> {
    System.out.println("Ex. occurred: " + throwable.getMessage());
    return 0; // возвращаем значение по умолчанию
  })
  .thenAccept(System.out::println) //0
  .join();


Пример 4. Выполнение нескольких асинхронных операций (альтернатива императивному созданию потоков и вызову Thread#join() для них). Для параметров CompletableFuture.allOf() опустим CompletableFuture перед supplyAsync() для большей лаконичности кода:

    // CompletableFuture<Void>. Цепочка начнет обработку,
    // когда все вложенные CompletableFuture завершатся
CompletableFuture.allOf(
      supplyAsync(() ->1).thenAccept(System.out::println),
      supplyAsync(() ->2).thenAccept(System.out::println),
      supplyAsync(() ->3).thenAccept(System.out::println),
      supplyAsync(() ->4).thenAccept(System.out::println))
    // Side effect'а. Не использует результат и не влияет на него.
    // Альтернатива thenAccept()
  .thenRun(() -> System.out.println("All tasks completed!"))
  .join();

Полагаю, даже достаточно простые примеры выше позволяют оценить, насколько удобным и лаконичным может быть использование многопоточности с помощью CompletableFuture. А ведь это только самые базовые из его возможностей:)

 

С теорией на сегодня все!

Переходим к практике:

Задача 1

Реализуйте Задачу 2 урока 73. Не используйте барьер.

 

Задача 2

Реализуйте параллельный поиск максимального значения в массиве чисел.

 

Задача 3

Реализуйте цепочку асинхронных операций: получите сообщение с клавиатуры, «разверните» его, затем выведите в консоль. Для имитации «тяжелых» операций сопровождайте каждую из них усыплением потока на заданный промежуток времени.

Посмотрите, в каком потоке будет выполнена каждая из операций.


Если что-то непонятно или не получается – welcome в комменты к посту или в лс:)

Канал: https://t.me/ViamSupervadetVadens

Мой тг: https://t.me/ironicMotherfucker

 

Дорогу осилит идущий!

Report Page