Stream API. Промежуточные операции

Stream API. Промежуточные операции


В прошлом уроке мы познакомились с классификацией операций в Stream API. В рамках текущего – разберем конкретные промежуточные операции, которые предлагает Stream.

Stateless операции

Сюда, как мы помним, относятся все промежуточные операции, которым не требуется для работы доступ к более чем одному элементу одновременно. Но начнем мы с тех stateless операций, которым вообще не обязательно знать об элементах Stream’а. Эти операции декларируются в базовом интерфейсе Stream API – BaseStream. Данный интерфейс является предком для Stream, IntStream и других интерфейсов стримов (и их реализаций).

sequential()

Данный метод возвращает «однопоточный» Stream. Если данный стрим до этого был «параллельным» - предполагалось, что его допустимо обрабатывать несколькими потоками выполнения одновременно, то данная операция регламентирует, что все дальнейшие операции с эти стримом должны происходить в рамках одного потока выполнения. Это уменьшает скорость обработки стрима, но может быть необходимо, исходя из соображений потокобезопасности (об этом – в рамках раздела «Многопоточность») или других причин.
Если стрим до вызова операции не был параллельным – вернет текущий стрим без изменений.

parallel()

Операция, обратная sequential(). Декларирует, что дальнейшие операции над элементами стрима можно совершать параллельно в нескольких потоках. Такой подход призван ускорить обработку данных, но имеет ряд проблем и недостатков, которые зачастую могут быть критичнее предполагаемых преимуществ. Подробнее будем разбираться в теме «Многопоточность».
По аналогии, если Stream уже был параллельным – вернет текущий стрим без изменений.

unordered()

Данный метод декларирует, что дальнейшие операции над элементами стрима могут производиться без соблюдения порядка элементов. Это может быть как порядок, заданный источником данных (скажем, List’ом), так и порядок, определенный в результате операции сортировки элементов стрима – sorted().
Опять же, поддержание порядка обработки может требовать лишних ресурсов или накладывать определенные ограничения. Таким образом, если порядок обработки элементов для вас не критичен и вы предполагаете, что его сохранение замедляет работу (обычно актуально для параллельных стримов) – этот метод для вас.

onClose()

В общем-то, с этим методом мы познакомились в рамках прошлого урока. Принимает параметром лямбду – Runnable. Все переданные таким образом лямбда-выражения будут выполнены при вызове терминальной операции close().
Данные лямбда-выражения будут обработаны в порядке их передачи в стрим. Более того – все они будут выполнены, даже если какое-то из них завершится с исключением.
Вы также можете самостоятельно разобраться, как именно исключения, возникшие при обработках этих лямбда-выражений, будут обработаны (когда выброшены и каким образом оформлены). Данный механизм достаточно грациозен. В рамках статьи не вижу смысла уделять ему время, в силу крайней узости области применения onClose() на практике. Однако рекомендую почитать документацию для общего развития. Там немного.

 

Теперь перейдем к «классическим» промежуточным stateless операциям. Их можно кратко описать как

Описание громоздкое, но под ним кроются уже привычные нам map(), flatMap() и пр.

filter()

Принимает параметром лямбду-предикат. Общее назначение совпадает с таковым у Optional: отфильтровывает элементы, которые будут обрабатываться в дальнейшем. Соответственно, операции, расположенные после операции filter(), будут применяться только к элементам стрима, для которых выполнение предиката вернуло true.

Пример:

Stream.of(1, 2, 3, 4)
  .filter(i -> i % 2 == 0) //true только для четных чисел
  .forEach(System.out::print) //24

map()

Еще одна операция, назначение которой знакомо нам по Optional. Заменяет каждый элемент стрима другим на основании переданного параметром лямбда-выражения. Условно разберем, как выглядят элементы стрима в результате каждой из операций:

Stream.of(1, 2, 3, 4) //1, 2, 3, 4
  .map(i -> i * 2) //2, 4, 6, 8
  .map(Object::toString) //"2", "4", "6", "8"
  …

Важной особенностью операции map() является то, что она всегда сопоставит одному элементу «на вход» ровно один элемент «на выход». Таким образом, число элементов стрима не изменяется в результате вызова map().

mapToInt(), mapToLong(), mapToDouble()

Методы, по назначению схожие с map(). Но возвращают не Stream, а IntStream, LongStream и DoubleStream соответственно. Как следствие, возвращаемый тип лямбда выражения для данных операций должен быть int, long и double соответственно.
На самом деле, каждый из этих методов принимает параметром свой функциональный интерфейс. Но внешне он не отличается (игнорируя строго заданный возвращаемый тип) от Function – типа параметра map().

flatMap()

Операция с таким названием нам тоже знакома по Optional, но для Stream она имеет серьезные отличия в плане результирующего значения, оставаясь идентичной в плане общей концепции.
Итак, если лямбда, передаваемая в Optional.flatMap() должна возвращать новый Optional, то лямбда, передаваемая в Stream.flatMap() – возвращает новый Stream. Это логично.

Первая особенность: новый стрим, возвращаемый из flatMap() может иметь различное число значений – от нуля (пустой стрим), до бесконечного потока. Это является ярким отличием от map(), где одному элементу «на вход» всегда соответствует один элемент «на выход».
Вторая особенность: в отличии от Optional, Stream изначально может содержать различное число элементов – от нуля до бесконечности. Соответственно, «результирующий» стрим будет выглядеть как полный набор элементов, возвращенных всеми flatMap().

!NB: Напомню, что не стоит относиться к стриму как к коллекции, особенно на этапе промежуточных операций, но подобные аналогии могут дать общее представление преобразований элементов.

Рассмотрим пример:

//4 элемента типа List
Stream.of(List.of(), List.of(1), List.of(1, 2), List.of(1, 2, 3))
  .flatMap(List::stream) //1, 1, 2, 1, 2, 3
  …

Данный пример показателен, но очевиден. Как сложноизвращенный сценарий, попытайтесь представить, как будет выглядеть дальнейшая цепочка обработки стрима, если результат каждой из flatMap() – бесконечный стрим (например, обертка над потоками ввода). И, главное, что будет при ограничении такого стрима промежуточными short-circuiting операциями.

flatMapToInt(), flatMapToLong(), flatMapToDouble()

Полагаю, вы уже догадались:)

Аналоги flatMap(), возвращающие соответствующие стримы примитивов. Если возвращаемое значение лямбды в flatMap()Stream, то здесь – IntStream, LongStream и DoubleStream соответственно.

mapMulti()

Достаточно сложная для понимания обертка над flatMap(). Появилась в Java 16. Параметр данного метода представляет собой функциональный интерфейс, первым аргументом которого является текущий элемент, а вторым – Consumer – другой функциональный интерфейс.

Consumer – функциональный интерфейс, являющийся типом параметра, например, forEach().

По задумке разработчиков языка, данная операция должна была покрыть сценарии преобразования элемента в стрим, которые не мог покрыть обычный flatMap() (по крайней мере, покрыть лаконично). Кроме того, в ряде случаев эта операция намного более оптимальна как по времени выполнения, так и по памяти, нежели flatMap().

К сценариями использования можно отнести рекурсивное получение элементов на базе текущего. Например, каждый элемент стрима – корень дерева, результат mapMulti() – стрим из всех элементов всех деревьев. Пример взят из головы, не исключаю, что реализация с помощью mapMulti() будет не самой лаконичной.

Второй сценарий использования – это классический сценарий использования flatMap(). Все те ситуации, когда результат map() – стрим контейнеров.
Контейнером в данном случае может быть Optional или другая монада. Или коллекция (тут есть нюансы).
Если мы хотим в конечном итоге получить стрим элементов, хранящихся в контейнере – mapMulti() сделает это лаконично и, зачастую, оптимальнее, чем flatMap().

Рассмотрим такой пример. Допустим, нам необходимо обработать стрим Optional’ов. Ситуация неприятная, но рядовая. Результатом работы стрима будем считать вывод элементов, хранящихся в Optional'ах, в консоль.

Классическое решение для Java 8 выглядело так:

Stream.of(Optional.of(1), Optional.of(2), Optional.of(3))
  .filter(Optional::isPresent)
  .map(Optional::get)
  .forEach(System.out::println);

В Java 9 Optional’у добавили метод stream(), возвращающий Stream из одного элемента, если Optional не пуст, в противном случае – Stream.empty(). Решение стало лаконичнее:

Stream.of(Optional.of(1), Optional.of(2), Optional.of(3))
  .flatMap(Optional::stream)
  .forEach(System.out::println);

Но это решение приводит к созданию объекта стрима на каждый Optional, даже пустой. Ведь Stream.empty() – не константа. На большом объеме данных это неприятно.

В свою очередь, решение с помощью mapMulti() выглядит так:

Stream.of(Optional.of(1), Optional.of(2), Optional.of(3))
  .<Integer>mapMulti(Optional::ifPresent)
  .forEach(System.out::println);

Во-первых, обратите внимание: мы можем указать тип элементов нового стрима в <>. В целом, это не что-то новое, просто раньше такая функциональность нам не требовалась.

Чтобы с ней разобраться, рассмотрим простой пример:

ublic class Main {
  private <R> R getCastedNull() {
    return null;
  }

  public static void main(String[] args) {
    String s1 = Main.<String>getCastedNull(); //избыточно
    var s2 = Main.<String>getCastedNull();// тип, на который будет
                                          // заменен var зависит от 
                                          // класса внутри <>
  }
}

В целом, такая форма записи доступна любому параметризованному методу. Но почти никогда не востребована.

Вернемся к mapMulti(). Форма использования выше все еще непонятна:

mapMulti(Optional::ifPresent)

Выглядит, будто мы лямбдой передаем метод, который возвращает void. Откуда же берутся новые значения Stream’а?
Дело в том, что тип параметра mapMulti(), выглядит так:

BiConsumer<? super T, ? super Consumer<R>>

Если мы наложим вайлдкарды на метод функционального интерфейса BiConsumer, получится следующее:

void accept(<? super T> t, <? super Consumer<R>> u)

t – это объект любого типа, в случае с mapMulti() – элемент стрима.

u – наследник функционального интерфейса Consumer. В случае с mapMulti(), является целым публичным классом, реализующим Consumer, который «складывает» элементы, полученные в результате обработки лямбды BiConsumer внутри себя.

R в данном случае – тип результата. То, что мы укажем в <> перед mapMulti().

Таким образом, получается, что наша задача – написать лямбда-выражение, которое будет указывать, какие элементы добавлять в «хранилище» новых элементов. На основании этих новых элементов и сформируется новый стрим.

В нашем примере,

mapMulti(Optional::ifPresent)

идентично

mapMulti((optional, sink) -> optional.ifPresent(sink::accept))

Здесь optional – элемент стрима, sink – объект того самого наследника Consumer, который хранит новые элементы.

Данный метод является достаточно красивым примером применения функций высшего порядка в Java. К сожалению, он тяжел для восприятия и не популярен, вероятно, потому что не является интуитивно-понятным. Надеюсь, мои объяснения позволили вам разобраться в том, что лежит «под капотом» данной операции.

mapMultiToInt(), mapMultiToLong(), mapMultiToDouble()

Аналоги mapMulti(), возвращающие стримы примитивов.
Если mapMulti() – обертка для flatMap(), то  mapMultiToInt() – обертка для flatMapToInt() и т.д.
К слову, когда я говорю «обертка» - это действительно обертка. mapMulti() и подобные вызывают внутри себя flatMap() (или его аналоги для стримов-примитивов).

peek()

Последняя из stateless операций. Принимаем параметром лямбда-выражение, которое производит какие-либо действия на основе элемента стрима, не изменяя состав элементов. Как я говорил в предыдущем уроке, данную операцию можно рассматривать как промежуточный forEach().
Может использоваться для логирования, изменения переменных вне стрима (нежелательно), для изменения полей элемента стрима (тоже не очень хорошо) или другой логики, в зависимости от конкретной задачи.

 

Stateful операции

Как мы помним из предыдущего урока, операциями с сохранением состояния считаются те, которым для работы требуется знать больше информации, чем значение обрабатываемого элемента стрима.
Состав информации зависит от конкретной операции, может сохраняться с использованием различных буферов и др. инструментов. Но подобный уровень погружения выходит за пределы текущего курса.
Пока же рассмотрим, какие промежуточные stateful-операции существуют для Stream.

distinct()

Возвращает Stream, в котором остаются только уникальные элементы. Пример:

Stream.of(4, 3, 2, 1, 2, 3, 4, 1, 2, 3, 4)
  .distinct()
  .forEach(System.out::print); //4321

Уникальность элемента определяется по equals(). Соответственно, данная операция вряд ли имеет смысл, если у элементов вашего стрима данный метод не переопределен.

sorted()

Определяет порядок элементов стрима на основании сортировки. Имеет две реализации: с параметром (типа Comparator) и без.
Реализация без параметра будет сортировать элементы в естественном порядке (на основании Comparable.compareTo()). Если элементы не имплементируют Comparable – такой стрим выбросит ClassCastException в месте вызова терминальной операции у данного стрима. До вызова терминальной операции sorted(), как и другие промежуточные операции, не будут выполняться, соответственно, об ошибке не будет известно.
Реализация с параметром мало чем отличается по своем логике от List.sort(), например. Отсортирует элементы стрима в соответствии с переданным компаратором.

skip()

Принимает параметром число типа long. Возвращает стрим без первых N элементов, где N – число, переданное параметром. Для неупорядоченных стримов, например, после вызова unordered(), или не имевших строго порядка элементов изначально, может вести себя непредсказуемо, особенно при параллельной обработке.
Также является весьма дорогой операцией для параллельных стримов, с заданным порядком обработки (ordered или sorted – не имеет значения).

Пример:

Stream.of(1, 2, 3, 4)
  .skip(2)
  .forEach(System.out::print); //34

Если число пропускаемых элементов больше, чем число элементов в стриме – будет возвращен пустой стрим.

dropWhile()

Операция, «отбрасывающая» элементы стрима до тех пор, пока не найдется элемент, не подходящий под условие предиката, переданного параметром.

Рассмотрим два примера:

Пример 1. Отбрасываем элементы до тех пор, пока не попадется НЕчетный:

Stream.of(1, 2, 3, 4)
  .dropWhile(i -> i % 2 == 0)
  .forEach(System.out::print); // 1234

Первый же элемент оказывается нечетным, никакие элементы не отбрасываются.

Пример 2. Отбрасываем элементы до тех пор, пока не попадется четный:

Stream.of(1, 2, 3, 4)
  .dropWhile(i -> i % 2 != 0)
  .forEach(System.out::print); // 234

Первый элемент нечетный, отбрасывается. Второй элемент четный.

dropWhile() напоминает filter(), но далеко не идентичен ему. Его можно представить как filter()-наоборот (удаляет элементы, подходящие под условие), который работает до первого false (первого элемента, не подошедшего под условие).

Поведение данной операции при применении к unordered стриму не прогнозируемо. Стоит использовать только если порядок обработки определен (ordered или sorted). Либо если в рамках вашей задачи результат вызова этого метода не критичен, но это достаточно тяжело представить.

 

 На этом разбор stateful операций завершен. Переходим к последней группе промежуточных операций

 

Short-circuiting операции

Операции короткого замыкания в рамках промежуточных – те, которые могут из бесконечного стрима сделать конечный.

Некоторые ошибочно относят к этой группе dropWhile(), но, полагаю, очевидно, что он не может ограничить бесконечный стрим – лишь отсечь часть элементов в его начале.

Промежуточных short-circuiting операции всего две.

limit()

Принимает параметром число типа long. Возвращает стрим из первых N элементов исходного стрима, либо весь исходный стрим, если число его элементов меньше N. В данном случае, N – число, переданное параметром.

Полагаю, очевидно, что поведение для unordered стрима не гарантированно – при нескольких выполнениях на одних и тех же данных в результирующий стрим каждый раз может попасть разный набор элементов. В зависимости от задачи, это может быть некритично.

takeWhile()

Операций, вызывающая ассоциацию с dropWhile().

Принимает параметром предикат. Возвращает стрим из элементов, которые были обработаны до первого элемента, не выполнившего условие предиката.
Рассмотрим несколько примеров:

Пример 1.1. Выводим на экран элементы до тех пор, пока они четные (== пока не встретим нечетный):

Stream.of(1, 2, 3, 4)
  .takeWhile(i -> i % 2 == 0)
  .forEach(System.out::print); // *nothing*

Первый же элемент оказался нечетным, ни один элемент не будет пропущен дальше.

Пример 1.2. Те же условия, но другой набор данных:

Stream.of(2, 4, 2, 6, 3, 4)
  .takeWhile(i -> i % 2 == 0)
  .forEach(System.out::print); // 2426

Выводили элементы на экран, пока не встретили 3 – нечетное число.

Пример 2. Выводим на экран элементы до тех пор, пока они нечетные (== пока не встретим четный):

ream.of(1, 2, 3, 4)
  .takeWhile(i -> i % 2 != 0)
  .forEach(System.out::print); // 1

Первый элемент – 1 – подходит под условие и выводится на экран. Следующий – 2 – не подходит, все остальные элементы будут проигнорированы.

Результат этого метода не прогнозируем для unordered-стрима.

 

Итог

Сегодня мы рассмотрели все промежуточные операции, доступные интерфейсу Stream. Зачастую, при обработке Stream’ом, наиболее сложная задача – правильно реализовать терминальную операцию. В конце концов, именно она дает результат, ради которого стрим был использован.

Но именно владение промежуточными операциями позволяет из исходного набора данных получить тот, который нужен для получения конечного результата. И именно из промежуточных операций состоят функциональные цепочки.

Поэтому, несмотря на то что впереди остаются важные и, местами, сложные темы по Stream API, большая часть материала уже изучена. И именно в рамках изученной части происходит большая часть логики обработки стримов.

 

С теорией на сегодня все!

Урок теоретический. Но рекомендую продолжать нарешивать задачи по Stream API. Не стесняйтесь делиться ссылками на практику или единичными задачами в комментариях. Чем больше задач будет решено сейчас, тем легче будет влиться в более-менее современный проект потом.

Если что-то непонятно или не получается – welcome в комменты к посту или в лс:)

Канал: https://t.me/+relA0-qlUYAxZjI6

Мой тг: https://t.me/ironicMotherfucker

 

Дорогу осилит идущий!

Report Page