Java 8 streams API and parallelism

Leveraging intermediate and terminal operators to process large collections.

In the last post in this series, we learned about functional interfaces and lambdas. In particular, we looked at the ITrade functional interface, and filtered a collection of trades to generate a specific value. Before the features made available in Java 8, running bulk operations on collections in parallel was inefficient and often cumbersome. Because the iteration was written explicitly in client code, the library had no opportunity to parallelize it. There was no easy way to split a collection and apply a piece of functionality to individual elements across the cores of a multi-core machine. That changed with the addition of Java 8’s Streams API.

Let’s first take an example requirement of collecting the large trades from a collection, written in Java 7 style:

private List<Trade> preJava8LargeTrades(List<Trade> trades) {
    List<Trade> largeTrades = new ArrayList<Trade>();

    // loop through each and every trade
    for (Trade trade : trades) {
        // if it's a big trade, collect it
        if (trade.getQuantity() > MILLION) {
            largeTrades.add(trade);
        }
    }
    return largeTrades;
}

This code iterates over the collection and checks whether each trade’s quantity is greater than the threshold. Every trade that passes the check is added to another data structure. As we can see, this operation is sequential and requires intermediate storage. It is also tightly coupled: the selection logic is hard-wired into the loop, so any change to that logic means reworking the code. Should we wish to narrow the selection to big trades that are also new and from a specific issuer, we would need to tweak the code as follows:

for (Trade trade : trades) {
    // all three conditions must hold for the trade to be collected
    if (trade.getQuantity() > MILLION
            && trade.getIssuer().equals(IBM)
            && trade.getStatus().equals(NEW)) {
        largeTrades.add(trade);
    }
}

To counteract these drawbacks, let’s now re-model the same requirements (i.e., collect a list of Trades based on a few different criteria, such as large, new and issuer-specific) using Java 8’s new Streams API and lambdas. The following code snippet demonstrates the logic of collecting “filtered” trades into a list. The method takes a functional interface (Predicate) as a parameter so that callers can pass in a lambda, as shown below:

private List<Trade> filterTrades(List<Trade> trades, Predicate<Trade> filter) {
  return trades.stream()
    .filter(filter)
    .collect(Collectors.toList());
}

Now that we have all the pieces in play, a client can invoke the filterTrades method as follows:

Predicate<Trade> filterPredicate =
  (Trade t) -> (
    t.isBigTrade() &&
    t.isNew() &&
    t.getIssuer().equals(ISSUER)
  );

// client then calls the filterTrades method with this predicate
List<Trade> bigNewIssuerTrades = filterTrades(trades, filterPredicate);

Note that the client can change the second argument, filterPredicate, to suit varied conditions. The call to trades.stream() creates a pipeline through which a sequence of trade instances is passed. We then apply a filter that lets through only the trades that satisfy the predicate. Finally, we collect them all into a List by using the toList method of the Collectors utility class. Not only is this code short and succinct, it is also readable and fluent. The iteration is now implicit: it is part and parcel of the library, which can therefore run it in parallel. Clients say what to do (perhaps via a lambda expression), while the library decides how to perform the heavy lifting.
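
To make the client side concrete, here is a minimal, self-contained sketch. The Trade class, its isBigTrade and isNew definitions, and the sample data are assumptions made for illustration; only filterTrades mirrors the method shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterTradesDemo {

    // Hypothetical stand-in for the article's Trade class.
    static final class Trade {
        private final String issuer;
        private final int quantity;
        private final String status;

        Trade(String issuer, int quantity, String status) {
            this.issuer = issuer;
            this.quantity = quantity;
            this.status = status;
        }

        String getIssuer() { return issuer; }
        int getQuantity() { return quantity; }
        String getStatus() { return status; }

        // Assumed definitions of the helpers used in the predicate.
        boolean isBigTrade() { return quantity > 1_000_000; }
        boolean isNew() { return "NEW".equals(status); }
    }

    // Illustrative data only.
    static List<Trade> sampleTrades() {
        return Arrays.asList(
            new Trade("IBM", 2_000_000, "NEW"),
            new Trade("IBM", 500, "NEW"),
            new Trade("GOOG", 3_000_000, "NEW"),
            new Trade("IBM", 5_000_000, "PEND"));
    }

    // Same shape as the filterTrades method in the text.
    static List<Trade> filterTrades(List<Trade> trades, Predicate<Trade> filter) {
        return trades.stream()
            .filter(filter)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Trade> trades = sampleTrades();

        // Small predicates can be composed instead of rewriting a loop.
        Predicate<Trade> big = Trade::isBigTrade;
        Predicate<Trade> fromIbm = t -> t.getIssuer().equals("IBM");
        Predicate<Trade> isNew = Trade::isNew;

        System.out.println(filterTrades(trades, big).size());                         // 3
        System.out.println(filterTrades(trades, big.and(fromIbm).and(isNew)).size()); // 1
    }
}
```

Composing predicates with and keeps each criterion separately reusable, which is one way to avoid the tight coupling of the loop-based version.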

What are streams?

A stream is a free-flowing sequence of elements. Streams do not hold any storage, as that responsibility lies with data structures such as arrays, lists and sets. Every stream starts with a source of data, sets up a pipeline, processes the elements through that pipeline and finishes with a terminal operation. Streams allow us to parallelize heavy operations without having to write any explicit threading code. A new package, java.util.stream, was introduced in Java 8 to support this feature.

Streams adhere to the common Pipes and Filters software pattern. A pipeline is created with elements flowing through it, and various intermediate operations are applied to individual elements as they move through the pipeline. The pipeline ends with a terminal operation, after which the stream is consumed and cannot be reused. Please keep in mind that a pipeline is expected not to interfere with its data source: modifying the underlying collection while the pipeline is running may raise a ConcurrentModificationException.
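
The rule about leaving the source alone can be seen with a small sketch. It assumes an ArrayList source, whose stream detects the modification on a best-effort basis and throws java.util.ConcurrentModificationException; other sources may behave differently. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;

class SourceInterferenceDemo {

    // Anti-pattern: the lambda adds to the list that the stream is reading.
    // Returns true if the interference was detected.
    static boolean modifyingSourceIsDetected() {
        List<String> issuers = new ArrayList<>(Arrays.asList("IBM", "GOOG", "AAPL"));
        try {
            issuers.stream().forEach(issuer -> issuers.add(issuer.toLowerCase()));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Safer: leave the source untouched and collect the results into a new list.
    static List<String> lowerCased(List<String> issuers) {
        return issuers.stream()
            .map(String::toLowerCase)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("interference detected: " + modifyingSourceIsDetected());
        System.out.println(lowerCased(Arrays.asList("IBM", "GOOG", "AAPL")));
    }
}
```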

Stream operations

The operations in a pipeline are classified as intermediate or terminal. Intermediate operators apply logic to the incoming stream and produce yet another stream. A stream can have any number of intermediate operators attached to it, including operations such as filter and map. Intermediate operators set up the pipeline but do not begin to execute until a terminal operator, which sits at the end of the call chain, is attached to the stream. The terminal operation is the final operation and consumes the stream. Examples of terminal operators include forEach, reduce and collect.
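
The deferred execution of intermediate operators can be observed directly. The sketch below is only a demonstration: it records each step in a log from inside the lambdas, which is something production pipelines should generally avoid. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class LazyPipelineDemo {

    // Builds a pipeline, records each step in a log, and returns the log.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Stream<Integer> pipeline = Arrays.asList(5, 20, 30).stream()
            .filter(n -> { log.add("filter " + n); return n > 10; })
            .map(n -> { log.add("map " + n); return n * 2; });

        // Nothing has run yet: intermediate operations only describe the work.
        log.add("pipeline built");

        // The terminal operation pulls the elements through the pipeline.
        pipeline.forEach(n -> log.add("forEach " + n));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sequential run, "pipeline built" is the first log entry, and each element then travels through filter, map and forEach before the next element is considered.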

Let’s now go ahead and create a fully functioning stream. There are various ways to get started: a stream can be created from a collection such as a list or set, from another stream, or simply from a list of values. The following code shows a few of the many mechanisms available to get a stream up and running:

// Creating streams

// trades collection
List<Trade> trades = ...

// From a collection
Stream<Trade> tradesStream = trades.stream();

// From a set of objects
Stream<Object> objStream = Stream.of(new A(), new B(), ...);

// From an intermediate operator - they produce streams
Stream<Account> filteredStream = stream.filter(...);
Stream<Integer> transformedStream = stream.map(...);

// From a slice of a stream (the early-access substream method
// was replaced by skip and limit in the final Java 8 API)
Stream<Employee> subStream = stream.skip(...).limit(...);
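
For a version of these creation mechanisms that compiles as-is, here is a small sketch that uses plain strings and integers in place of the domain classes. The class name, method names and sample values are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CreatingStreamsDemo {

    // From a collection, then narrowed by an intermediate operator.
    static List<String> shortNames(List<String> names) {
        Stream<String> fromCollection = names.stream();
        Stream<String> filtered = fromCollection.filter(n -> n.length() <= 3);
        return filtered.collect(Collectors.toList());
    }

    // From explicit values, then transformed by an intermediate operator.
    static List<Integer> doubled() {
        Stream<Integer> fromValues = Stream.of(10, 20, 30);
        Stream<Integer> transformed = fromValues.map(n -> n * 2);
        return transformed.collect(Collectors.toList());
    }

    // A slice of a stream: skip the first `from` elements, keep at most `count`.
    static List<String> slice(List<String> names, long from, long count) {
        return names.stream()
            .skip(from)
            .limit(count)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> issuers = Arrays.asList("IBM", "GOOG", "AAPL", "MSFT");
        System.out.println(shortNames(issuers));  // [IBM]
        System.out.println(doubled());            // [20, 40, 60]
        System.out.println(slice(issuers, 1, 2)); // [GOOG, AAPL]
    }
}
```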

Filters

We have already seen an example of filters in action. Filters are intermediate operations that produce another stream of the same element type, keeping only the items that match a predicate. For example, let’s find all the big and pending trades issued by Google. To do so, we simply create a pipeline of filters, as shown below, using the new fluent-style API:

Stream<Trade> filteredTrades =
  trades.stream()
    .filter(t -> t.getIssuer().equals("GOOG"))
    .filter(t -> t.getQuantity() > 100000)
    .filter(t -> t.getStatus().equals("PEND"));

The filters are chained together so that each one performs a specific check on each of the trades. Setting the filters on the stream does not run anything by itself, however; execution begins only when we add a terminal operation such as findFirst (which is also short-circuiting), count, or forEach:

// forEach is terminal and returns void, so there is nothing to assign
trades.stream()
    ...
    .forEach(t -> System.out.println(t));
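
As a runnable illustration, the same chain of filters can be finished with different terminal operations. The Trade class and sample data below are hypothetical stand-ins; the filter conditions follow the ones used in the text.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FilterChainDemo {

    // Hypothetical stand-in for the article's Trade class.
    static final class Trade {
        private final String issuer;
        private final int quantity;
        private final String status;

        Trade(String issuer, int quantity, String status) {
            this.issuer = issuer;
            this.quantity = quantity;
            this.status = status;
        }

        String getIssuer() { return issuer; }
        int getQuantity() { return quantity; }
        String getStatus() { return status; }
    }

    // Illustrative data only.
    static List<Trade> sampleTrades() {
        return Arrays.asList(
            new Trade("GOOG", 200_000, "PEND"),
            new Trade("GOOG", 50_000, "PEND"),
            new Trade("IBM", 300_000, "PEND"),
            new Trade("GOOG", 400_000, "NEW"),
            new Trade("GOOG", 150_000, "PEND"));
    }

    // count() is a terminal operation: it runs the pipeline and returns a long.
    static long countBigPendingGoogleTrades(List<Trade> trades) {
        return trades.stream()
            .filter(t -> t.getIssuer().equals("GOOG"))
            .filter(t -> t.getQuantity() > 100_000)
            .filter(t -> t.getStatus().equals("PEND"))
            .count();
    }

    // findFirst() is terminal and short-circuiting: it stops at the first match.
    static Optional<Trade> firstBigTrade(List<Trade> trades) {
        return trades.stream()
            .filter(t -> t.getQuantity() > 100_000)
            .findFirst();
    }

    public static void main(String[] args) {
        List<Trade> trades = sampleTrades();
        System.out.println(countBigPendingGoogleTrades(trades)); // 2
        firstBigTrade(trades).ifPresent(t -> System.out.println(t.getQuantity())); // 200000
    }
}
```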

Maps

A stream of elements can be transformed into a stream of another type by using mapping functionality. The transformation is performed by invoking the map method on the stream, as seen in the code snippet below:

private void mapIDsToTrades(Integer[] ids) {
  Stream.of(ids)
    .map(id -> getTradeById(id))
    .forEach(System.out::println);
}

We create a stream from the IDs and then use the map method to fetch the trades. The map method expects a function, here a lambda that calls getTradeById for each ID picked up from the stream. Before this transformation is applied, the stream is a stream of integers; after the map step, it is a stream of Trade instances. The forEach operator then prints out each of the trades.
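
The text does not show getTradeById, so the following sketch assumes a simple in-memory lookup that returns null for unknown IDs. The Trade class, the store and the null-filtering step are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapIdsToTradesDemo {

    // Hypothetical stand-in for the article's Trade class.
    static final class Trade {
        private final int id;
        private final String issuer;

        Trade(int id, String issuer) {
            this.id = id;
            this.issuer = issuer;
        }

        String getIssuer() { return issuer; }

        @Override
        public String toString() { return "Trade#" + id + "(" + issuer + ")"; }
    }

    // Hypothetical in-memory store standing in for a real trade lookup.
    private static final Map<Integer, Trade> STORE = new HashMap<>();
    static {
        STORE.put(1, new Trade(1, "IBM"));
        STORE.put(2, new Trade(2, "GOOG"));
        STORE.put(3, new Trade(3, "AAPL"));
    }

    // Returns null for an unknown ID (an assumption of this sketch).
    static Trade getTradeById(Integer id) {
        return STORE.get(id);
    }

    static List<Trade> mapIdsToTrades(Integer[] ids) {
        return Stream.of(ids)                       // Stream<Integer>
            .map(MapIdsToTradesDemo::getTradeById)  // Stream<Trade>
            .filter(Objects::nonNull)               // drop unknown IDs
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        mapIdsToTrades(new Integer[] {1, 3, 99}).forEach(System.out::println);
    }
}
```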

Reduce

The reduce method is useful for boiling a stream down to a single accumulated value. For example, if our requirement is to fetch the aggregated quantity of a set of trades, we can use the reduce method as shown below:

BinaryOperator<Integer> adder = (a, b) -> a + b;

Integer aggQty = trades.stream()
  .map(t -> t.getQuantity())
  .reduce(0, adder);

The reduce method accepts a seed value (called an identity) and a BinaryOperator. A BinaryOperator is a predefined functional interface whose function takes two operands of the same type and produces a result of that same type. In the above example, the operator sums two integers. The reduce method starts by adding zero to the first trade’s quantity. The resulting sum is then added to the next trade’s quantity, and so on, so the aggregated quantity is built up by accumulating the individual quantities. The seed should be a true identity for the operator (zero for addition, one for multiplication); otherwise the result can be wrong, particularly when the stream runs in parallel. This approach is helpful because we never maintain a mutable running total ourselves, so we don’t have to worry about that variable’s state being corrupted.
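
The sketch below shows a few equivalent ways of reducing a list of quantities. It works on plain integers rather than trades, and the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class ReduceDemo {

    // reduce with an identity: always returns a value, 0 for an empty stream.
    static int total(List<Integer> quantities) {
        return quantities.stream().reduce(0, Integer::sum);
    }

    // The same sum on a primitive IntStream, which avoids boxing.
    static int totalPrimitive(List<Integer> quantities) {
        return quantities.stream().mapToInt(Integer::intValue).sum();
    }

    // reduce without an identity returns an Optional, empty for an empty stream.
    static Optional<Integer> largest(List<Integer> quantities) {
        return quantities.stream().reduce(Integer::max);
    }

    public static void main(String[] args) {
        List<Integer> quantities = Arrays.asList(100, 250, 650);
        System.out.println(total(quantities));            // 1000
        System.out.println(totalPrimitive(quantities));   // 1000
        System.out.println(largest(quantities).get());    // 650
        System.out.println(largest(Collections.<Integer>emptyList()).isPresent()); // false
    }
}
```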

Collect

We can also build a new data structure from the stream by using the collect method. For example, here’s what the code would look like if we needed the trade quantities gathered into a list:

import static java.util.stream.Collectors.toList;

List<Integer> list = trades.stream()
  .map(t -> t.getQuantity())
  // toList is statically imported above
  .collect(toList());
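
Beyond toList, the Collectors class offers other ready-made collectors. The sketch below, built on a hypothetical Trade class and sample data, shows toList alongside groupingBy with summingInt and joining.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectDemo {

    // Hypothetical stand-in for the article's Trade class.
    static final class Trade {
        private final String issuer;
        private final int quantity;

        Trade(String issuer, int quantity) {
            this.issuer = issuer;
            this.quantity = quantity;
        }

        String getIssuer() { return issuer; }
        int getQuantity() { return quantity; }
    }

    // Illustrative data only.
    static List<Trade> sampleTrades() {
        return Arrays.asList(
            new Trade("IBM", 100),
            new Trade("GOOG", 250),
            new Trade("IBM", 200));
    }

    // Quantities gathered into a list, as in the text.
    static List<Integer> quantities(List<Trade> trades) {
        return trades.stream()
            .map(Trade::getQuantity)
            .collect(Collectors.toList());
    }

    // Total quantity per issuer, gathered into a map.
    static Map<String, Integer> quantityByIssuer(List<Trade> trades) {
        return trades.stream()
            .collect(Collectors.groupingBy(
                Trade::getIssuer,
                Collectors.summingInt(Trade::getQuantity)));
    }

    // Distinct issuer names joined into a single string.
    static String issuers(List<Trade> trades) {
        return trades.stream()
            .map(Trade::getIssuer)
            .distinct()
            .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<Trade> trades = sampleTrades();
        System.out.println(quantities(trades));                  // [100, 250, 200]
        System.out.println(quantityByIssuer(trades).get("IBM")); // 300
        System.out.println(issuers(trades));                     // IBM, GOOG
    }
}
```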

Parallel streams

Performing parallel operations on collections by hand can lead to threading issues, because most collections are not thread-safe by nature. As a result, we may have to introduce safety nets such as locks and other synchronization techniques, which, in turn, may produce starvation and other thread-contention issues. Java 8’s parallel streams partition the stream’s data into smaller sub-tasks, distribute them across the cores of a multi-core processor, and then combine the partial results. Implementing parallelism on multiple cores is now just a matter of invoking the parallel method on the stream itself:

private void runParallel() {
  trades
    .stream()
    .parallel()
    .forEach(t -> doSomething(t));
}

By calling the parallel() method on the stream, we are asking the library to deal with the complexities of threading. The library takes charge of forking the work into tasks and running them at the same time on the available cores; by default, it does so on the common fork/join pool.

The ordering of the tasks is then left to the Java runtime. Therefore, when we run a parallel task over a set of trades, the trades will not necessarily be processed in order: they are split into chunks and distributed to different threads, and some chunks may finish earlier than others. If ordering matters, we can still use the forEachOrdered method, but we would give up much of the benefit of parallelism.
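
The difference between forEach and forEachOrdered on a parallel stream can be sketched as follows. The example uses plain numbers instead of trades and collects visited elements into a synchronized list; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.LongStream;

class ParallelStreamDemo {

    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same computation, partitioned across threads by the library.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // forEach on a parallel stream: every element is visited, in no fixed order.
    static List<Integer> visitUnordered(List<Integer> input) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream().forEach(seen::add);
        return seen;
    }

    // forEachOrdered: elements are visited in the order of the source list.
    static List<Integer> visitOrdered(List<Integer> input) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(sequentialSum(1_000_000) == parallelSum(1_000_000)); // true

        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(visitUnordered(ids)); // all eight IDs, order may vary
        System.out.println(visitOrdered(ids));   // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

The shared list is wrapped with Collections.synchronizedList because forEach may call the lambda from several threads at once; collecting with a Collector is usually the better choice when the goal is a result list.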

Summary

In this post, we delved deeper into the core functionality underlying Java 8’s new Streams API. We learned about the many different kinds of intermediate and terminal operations that make up a stream, such as filters, mapping, and collecting. We also discovered how easy it is to use parallel streams to create smaller sub-tasks within collections. Going forward, every Java developer should have a solid knowledge and understanding of streams — and lambdas more broadly — in order to keep their code neat and simple.


This post is the last of a three-part exploration of Java 8 features.
