1   package eu.fbk.knowledgestore.data;
2   
3   import java.io.Closeable;
4   import java.io.IOException;
5   import java.lang.reflect.Array;
6   import java.lang.reflect.Method;
7   import java.util.Arrays;
8   import java.util.Collection;
9   import java.util.Collections;
10  import java.util.Comparator;
11  import java.util.Enumeration;
12  import java.util.Iterator;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.NoSuchElementException;
16  import java.util.Set;
17  import java.util.SortedSet;
18  import java.util.concurrent.ArrayBlockingQueue;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.Future;
22  import java.util.concurrent.ScheduledFuture;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicLong;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import javax.annotation.Nullable;
29  
30  import com.google.common.base.Function;
31  import com.google.common.base.Functions;
32  import com.google.common.base.Preconditions;
33  import com.google.common.base.Predicate;
34  import com.google.common.base.Throwables;
35  import com.google.common.collect.ArrayListMultimap;
36  import com.google.common.collect.ImmutableCollection;
37  import com.google.common.collect.ImmutableList;
38  import com.google.common.collect.ImmutableListMultimap;
39  import com.google.common.collect.ImmutableMap;
40  import com.google.common.collect.ImmutableSet;
41  import com.google.common.collect.ImmutableSortedSet;
42  import com.google.common.collect.Iterables;
43  import com.google.common.collect.Iterators;
44  import com.google.common.collect.ListMultimap;
45  import com.google.common.collect.Lists;
46  import com.google.common.collect.Maps;
47  import com.google.common.collect.Multimap;
48  import com.google.common.collect.Ordering;
49  import com.google.common.collect.Sets;
50  import com.google.common.collect.UnmodifiableIterator;
51  import com.google.common.util.concurrent.Futures;
52  
53  import org.openrdf.model.URI;
54  import org.openrdf.query.BindingSet;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  
58  import info.aduna.iteration.CloseableIteration;
59  import info.aduna.iteration.Iteration;
60  
61  import eu.fbk.knowledgestore.internal.Util;
62  
63  // Metadata can be made available by some Stream implementations or can be attached to a
64  // Stream via
65  
66  /**
67   * A stream of typed elements that can be consumed at most once.
68   * <p>
 * A Stream returns a sequence of elements (e.g., the results of a retrieval API operation) one at
 * a time, after which it is no longer usable, allowing manipulation and consumption of its
 * elements in a number of ways.
72   * </p>
73   * <p>
74   * A Stream can be created:
75   * </p>
76   * <ul>
77   * <li>using one of the static {@code create()} factory methods starting from an
78   * {@link #create(Object...) array}, an {@link #create(Iterable) Iterable}, an
79   * {@link #create(Iterator) Iterator}, a Sesame {@link #create(Iteration) Iteration} or an
80   * {@link #create(Enumeration) Enumeration} of elements;</li>
81   * <li>concatenating zero or more Streams or Iterables, via {@link #concat(Iterable)} or
82   * {@link #concat(Iterable...)};</li>
83   * <li>subclassing the Stream class and overriding at least one of protected methods
84   * {@link #doIterator()} and {@link #doToHandler(Handler)} (the default implementation of each of
85   * them is based on the implementation of the other one).</li>
86   * </ul>
87   * <p>
88   * While the first method supports the usual <i>external iteration</i> / pull-like pattern based
89   * on {@code Iterator}s, the second supports an <i>internal iteration</i> / push-like pattern
90   * where iterated elements are forwarded to an {@code Handler} and control of the iteration is
91   * maintained by the {@code Stream}. Although implementing external iteration should be preferred
92   * (as more flexible and immediately convertible to internal iteration), there are cases where
93   * only internal iteration is feasible (e.g., because more simple to implement, or because the
94   * ultimate source of the iterated elements is a third party library that supports only an
95   * internal iteration / push like approach, such as Sesame parsers). In these cases, conversion
96   * from internal iteration to external iteration is automatically performed by the {@code Stream}
97   * using a background thread and an element queue.
98   * </p>
99   * <p>
100  * Two main families of operations are offered: <i>intermediate operations</i> and <i>terminal
101  * operations</i> (as in JDK 8 streams).
102  * </p>
103  * <p>
 * <i>Intermediate operations</i> return new Stream wrappers that lazily manipulate elements of a
105  * source Stream while they are traversed, or otherwise enrich the source Stream, and can be
106  * chained in a fluent style. These operations are:
107  * <ul>
108  * <li>element filtering, via {@link #filter(Predicate) sequential} or
109  * {@link #filter(Predicate, int) parallel} {@code filter()} methods;</li>
110  * <li>element transformation, via {@link #transform(Function) sequential} or
111  * {@link #transform(Function, int) parallel} {@code transform()} methods;</li>
112  * <li>duplicate removal, via {@link #distinct()};</li>
113  * <li>element slicing, via {@link #slice(long, long)};</li>
114  * <li>element chunking, via {@link #chunk(int)};</li>
115  * <li>iteration timeout, via {@link #timeout(long)}.</li>
116  * </ul>
117  * </p>
118  * <p>
 * Terminal operations consume the elements of the stream. At most one terminal operation can be
120  * invoked on a Stream, reflecting the fact that elements can be accessed only once. Before
121  * invoking it, the Stream is {@link #isAvailable() available}; after invoking it, the Stream is
122  * <i>consumed</i> and no other intermediate or terminal operation can be called on it. Terminal
123  * operations are:
124  * <ul>
125  * <li>element counting, via {@link #count()};</li>
126  * <li>external, pull-style iteration, via {@link #iterator()};</li>
127  * <li>internal, push-style iteration, via {@link #toHandler(Handler)};</li>
128  * <li>array construction, via {@link #toArray(Class)};</li>
129  * <li>immutable List construction, via {@link #toList()};</li>
130  * <li>immutable Set construction, via {@link #toSet()};</li>
131  * <li>immutable sorted List construction, via {@link #toSortedList(Comparator)};</li>
132  * <li>immutable SortedSet construction, via {@link #toSortedSet(Comparator)};</li>
133  * <li>Map {@link #toMap(Function, Function, Map) population} or immutable Map
134  * {@link #toMap(Function, Function) construction}, via {@code toMap()} methods;</li>
135  * <li>Multimap {@link #toMultimap(Function, Function, Multimap) population} or immutable Multimap
136  * {@link #toMultimap(Function, Function) construction}, via {@code toMultimap()} methods;</li>
137  * <li>unique element extraction, via {@link #getUnique()} and {@link #getUnique(Object)};</li>
138  * </ul>
139  * </p>
140  * <p>
141  * Streams are thread safe. They implement the {@link Iterable} interface, hence can be used in
142  * enhanced {@code for} loops, and the {@code Closeable} interface, as they may wrap underlying
143  * resources (e.g., network connections) that need to be closed. Closing a Stream causes the call
144  * of method {@link #doClose()}, which can be overridden by subclasses to perform additional
145  * cleanup. Note that closing a Stream will cause any pending terminal operation to be interrupted
146  * (on a best effort basis), resulting in an exception being thrown by that operation. Also,
147  * completion of a terminal operation automatically causes the Stream to be closed (as it cannot
148  * be used in any other way).
149  * </p>
150  *
151  * @param <T>
152  *            the type of element returned by the Stream
153  */
154 public abstract class Stream<T> implements Iterable<T>, Closeable {
155 
    /** Logger of the {@code Stream} class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Stream.class);

    // Sentinel object; its uses lie outside this part of the file (presumably an
    // end-of-sequence marker -- TODO confirm against the code that references it).
    private static final Object EOF = new Object();

    // State object of this Stream: it is the monitor of every synchronized block in this class
    // and holds the 'available', 'closed', 'toHandlerThread', 'activeIterator' and
    // 'closeObjects' members accessed below. It is injected via the package-private
    // constructor, so it may be shared among Streams (NOTE(review): confirm with wrappers).
    final State state;
161 
    /**
     * Constructor for use by non-delegating sub-classes. It allocates a new {@code State} object
     * and delegates to the package-private constructor accepting it.
     */
    protected Stream() {
        this(new State());
    }
168 
    /**
     * Package-private constructor binding this Stream to the supplied state object. The Stream
     * being created adds itself to the {@code closeObjects} of that state.
     *
     * @param state
     *            the state object this Stream operates on, not null
     */
    Stream(final State state) {
        this.state = state;
        state.closeObjects.add(this);
    }
173 
174     /**
175      * Creates a new Stream for the supplied elements. Use this method without arguments to
176      * produce an <i>empty</i> Stream, or with a single argument to produce a <i>singleton</i>
177      * Stream.
178      *
179      * @param elements
180      *            the elements to be returned by the Stream, not null
181      * @param <T>
182      *            the type of elements
183      * @return a Stream over the supplied elements
184      */
185     public static <T> Stream<T> create(@SuppressWarnings("unchecked") final T... elements) {
186         if (elements.length == 0) {
187             return new EmptyStream<T>();
188         } else if (elements.length == 1) {
189             return new SingletonStream<T>(elements[0]);
190         } else {
191             return new IteratorStream<T>(Iterators.forArray(elements));
192         }
193     }
194 
195     /**
196      * Creates a new Stream over the elements of the supplied {@code Iterable}. If the supplied
197      * {@code Iterable} is a {@code Stream}, it will be returned unchanged. If the supplied
198      * {@code Iterable} implements {@code Closeable}, method {@link Closeable#close()} will be
199      * called when the {@code Stream} is closed.
200      *
201      * @param iterable
202      *            an {@code Iterable} of non-null elements, possibly empty but not null
203      * @param <T>
204      *            the type of element
205      * @return a {@code Stream} over the elements in the {@code Iterable}
206      */
207     @SuppressWarnings("unchecked")
208     public static <T> Stream<T> create(final Iterable<? extends T> iterable) {
209         if (iterable instanceof Stream) {
210             return (Stream<T>) iterable;
211         } else if (iterable instanceof ImmutableCollection<?>
212                 && ((ImmutableCollection<? extends T>) iterable).isEmpty()) {
213             return new EmptyStream<T>();
214         } else {
215             return new IterableStream<T>(iterable);
216         }
217     }
218 
219     /**
220      * Creates a new Stream over the elements returned by the supplied Iterator.
221      *
222      * @param iterator
223      *            an Iterator returning non-null elements
224      * @param <T>
225      *            the type of elements
226      * @return a Stream over the elements returned by the supplied Iterator
227      */
228     public static <T> Stream<T> create(final Iterator<? extends T> iterator) {
229         if (iterator.hasNext()) {
230             return new IteratorStream<T>(iterator);
231         } else {
232             return new EmptyStream<T>();
233         }
234     }
235 
    /**
     * Creates a new Stream over the elements returned by the supplied Sesame Iteration. The
     * Iteration is wrapped as is in an {@code IterationStream}; differently from the factory
     * methods for {@code Iterator}s and {@code Enumeration}s, it is not inspected by this method.
     *
     * @param iteration
     *            the Iteration
     * @param <T>
     *            the type of elements
     * @return a Stream over the elements returned by the supplied Iteration
     */
    public static <T> Stream<T> create(final Iteration<? extends T, ?> iteration) {
        return new IterationStream<T>(iteration);
    }
248 
249     /**
250      * Creates a new Stream over the elements returned by the supplied Enumeration.
251      *
252      * @param enumeration
253      *            an Enumeration of non-null elements
254      * @param <T>
255      *            the type of elements
256      * @return a Stream over the elements returned by the supplied Enumeration
257      */
258     public static <T> Stream<T> create(final Enumeration<? extends T> enumeration) {
259         if (enumeration.hasMoreElements()) {
260             return new IteratorStream<T>(Iterators.forEnumeration(enumeration));
261         } else {
262             return new EmptyStream<T>();
263         }
264     }
265 
    /**
     * Returns a Stream concatenating zero or more Iterables. If an input Iterable is a Stream, it
     * is closed as soon as exhausted or as iteration completes. The supplied Iterable of
     * Iterables is first turned into a Stream via {@link #create(Iterable)}, which is then
     * wrapped in a {@code ConcatStream}.
     *
     * @param iterables
     *            an Iterable with the Iterables or Streams to concatenate
     * @param <T>
     *            the type of elements
     * @return the resulting concatenated Stream
     */
    public static <T> Stream<T> concat(final Iterable<? extends Iterable<? extends T>> iterables) {
        return new ConcatStream<Iterable<? extends T>, T>(create(iterables));
    }
279 
    /**
     * Returns a Stream concatenating zero or more Iterables. If an input Iterable is a Stream, it
     * is closed as soon as exhausted or as iteration completes. The vararg array is first turned
     * into a Stream of Iterables via {@code create()}, which is then wrapped in a
     * {@code ConcatStream}.
     *
     * @param iterables
     *            a vararg array with the Iterables or Streams to concatenate
     * @param <T>
     *            the type of elements
     * @return the resulting concatenated Stream
     */
    @SafeVarargs
    public static <T> Stream<T> concat(final Iterable<? extends T>... iterables) {
        return new ConcatStream<Iterable<? extends T>, T>(create(iterables));
    }
294 
295     /**
296      * Intermediate operation returning a Stream with only the elements of this Stream that
297      * satisfy the specified predicate. If {@code parallelism > 1}, up to {@code parallelism}
298      * evaluations of the predicate are simultaneously performed in background threads for greater
299      * throughput.
300      *
301      * @param predicate
302      *            the predicate, never called with a null input
303      * @param parallelism
304      *            the parallelism degree, i.e., the maximum number of predicate evaluations that
305      *            can be performed in parallel (if <= 1 no parallel evaluation will be performed)
306      * @return a Stream over the elements satisfying the predicate
307      */
308     public final Stream<T> filter(final Predicate<? super T> predicate, final int parallelism) {
309         synchronized (this.state) {
310             checkState();
311             return new FilterStream<T>(this, parallelism, predicate);
312         }
313     }
314 
315     /**
316      * Intermediate operation returning a Stream with the elements obtained from the ones of this
317      * Stream by applying the specified transformation function. Method
318      * {@link Function#apply(Object)} is called to transform each input element into an output
319      * element. If {@code parallelism > 1}, up to {@code parallelism} evaluations of the function
320      * are simultaneously performed in background threads for greater throughput. Note that the
321      * function is never called with a null input; in case it returns a null output, it is ignored
322      * and iteration moves to the next element (this feature can be used to combine filtering with
323      * transformation).
324      *
325      * @param function
326      *            the function, not null
327      * @param parallelism
328      *            the parallelism degree, i.e., the maximum number of function evaluations that
329      *            can be performed in parallel (if <= 1 no parallel evaluation will be performed)
330      * @param <R>
331      *            the type of transformed elements
332      * @return a Stream over the transformed elements
333      */
334     public final <R> Stream<R> transform(final Function<? super T, ? extends R> function,
335             final int parallelism) {
336         synchronized (this.state) {
337             checkState();
338             return new TransformElementStream<T, R>(this, parallelism, function);
339         }
340     }
341 
342     /**
343      * Intermediate operation returning a Stream with the elements obtained by applying the
344      * supplied transformation functions to the {@code Iterator} and {@code Handler} returned /
345      * accepted by this Stream. At least a function must be supplied. If both of them are
346      * supplied, make sure that they perform the same transformation.
347      *
348      * @param iteratorFunction
349      *            the transformation function responsible to adapt the {@code Iterator} returned
350      *            by this Stream
351      * @param handlerFunction
352      *            the transformation function responsible to adapt the {@code Handler} accepted by
353      *            this Stream
354      * @param <R>
355      *            the type of transformed elements
356      * @return a Stream over the transformed sequence of elements
357      */
358     public final <R> Stream<R> transform(
359             @Nullable final Function<Iterator<T>, Iterator<R>> iteratorFunction,
360             @Nullable final Function<Handler<R>, Handler<T>> handlerFunction) {
361         synchronized (this.state) {
362             checkState();
363             return new TransformSequenceStream<T, R>(this, iteratorFunction, handlerFunction);
364         }
365     }
366 
367     /**
368      * Intermediate operation returning a Stream with the elements obtained by applying an
369      * optional <i>navigation path</i> and conversion to a certain type to the elements of this
370      * Stream. The path is a sequence of keys ({@code String}s, {@code URI}s, generic objects)
371      * that are applied to {@code Record}, {@code BindingSet}, {@code Map} and {@code Multimap}
372      * elements to extract child elements in a recursive fashion. Starting from an element
373      * returned by this stream, the result of this navigation process is a list of (sub-)child
374      * elements that are converted to the requested type (via {@link Data#convert(Object, Class)})
375      * and concatenated in the resulting stream; {@code Iterable}s, {@code Iterator}s and arrays
376      * found during the navigation are exploded and their elements individually considered. The
377      * {@code lenient} parameters controls whether conversion errors should be ignored or result
378      * in an exception being thrown by the returned Stream.
379      *
380      * @param type
381      *            the class resulting elements should be converted to
382      * @param lenient
383      *            true if conversion errors should be ignored
384      * @param path
385      *            a vararg array of zero or more keys that recursively select the elements to
386      *            return
387      * @param <R>
388      *            the type of resulting elements
389      * @return a Stream over the elements obtained applying the navigation path and the conversion
390      *         specified
391      */
392     public final <R> Stream<R> transform(final Class<R> type, final boolean lenient,
393             final Object... path) {
394         synchronized (this.state) {
395             checkState();
396             return concat(new TransformPathStream<T, R>(this, type, lenient, path));
397         }
398     }
399 
400     /**
401      * Intermediate operation returning a Stream with only the distinct elements of this Stream.
402      * Duplicates are removed lazily during the iteration. Note that duplicate removal requires to
403      * keep track of the elements seen, so an amount of memory proportional to the number of
404      * elements in the Stream is required.
405      *
406      * @return a Stream over de-duplicated elements
407      */
408     public final Stream<T> distinct() {
409         synchronized (this.state) {
410             checkState();
411             return this instanceof DistinctStream<?> ? this : new DistinctStream<T>(this);
412         }
413     }
414 
415     /**
416      * Intermediate operation returning a Stream with max {@code limit} elements with index
417      * starting at {@code offset} taken from this Stream. After those elements are returned, the
418      * wrapped Stream is automatically closed.
419      *
420      * @param offset
421      *            the offset where to start returning elements from, not negative
422      * @param limit
423      *            the maximum number of elements to return (starting from offset), not negative
424      * @return a Stream wrapping this Stream and limiting the number of returned elements
425      */
426     public final Stream<T> slice(final long offset, final long limit) {
427         synchronized (this.state) {
428             checkState();
429             return new SliceStream<T>(this, offset, limit);
430         }
431     }
432 
433     /**
434      * Intermediate operation returning a Stream of elements chunks of the specified size obtained
435      * from this Stream (the last chunk may be smaller).
436      *
437      * @param chunkSize
438      *            the chunk size, positive
439      * @return a Stream wrapping this Stream and returning chunks of elements
440      */
441     public final Stream<List<T>> chunk(final int chunkSize) {
442         synchronized (this.state) {
443             checkState();
444             return new ChunkStream<T>(this, chunkSize);
445         }
446     }
447 
448     /**
449      * Intermediate operation returning a Stream that returns the elements of this {@code Stream}
450      * and tracks the number of elements returned so far. As tracking the number elements has a
451      * small cost (due to {@code Iterator} and {@code Handler} wrapping, this feature must be
452      * explicitly requested and is not offered as part of the default feature set of
453      * {@code Stream}.
454      *
455      * @param counter
456      *            the variable where to hold the number of returned elements, possibly null
457      * @param eof
458      *            the variable where to store whether end of sequence has been reached, possibly
459      *            null
460      * @return a {@code Stream} tracking the number of returned elements
461      */
462     public final Stream<T> track(@Nullable final AtomicLong counter,
463             @Nullable final AtomicBoolean eof) {
464         synchronized (this.state) {
465             checkState();
466             return new TrackStream<T>(this, counter, eof);
467         }
468     }
469 
470     /**
471      * Terminal operation returning the number of elements in this Stream. Note that only few
472      * elements are materialized at any time, so it is safe to use this method with arbitrarily
473      * large Streams.
474      *
475      * @return the number of elements in this Stream
476      */
477     public final long count() {
478         final AtomicLong result = new AtomicLong();
479         toHandler(new Handler<T>() {
480 
481             private long count;
482 
483             @Override
484             public void handle(final T element) {
485                 if (element != null) {
486                     ++this.count;
487                 } else {
488                     result.set(this.count);
489                 }
490 
491             }
492 
493         });
494         return result.get();
495     }
496 
497     /**
498      * Terminal operation returning an Iterator over the elements of this Stream. {@inheritDoc}
499      */
500     @Override
501     public final Iterator<T> iterator() {
502         synchronized (this.state) {
503             checkState();
504             this.state.available = false;
505             final Iterator<T> iterator;
506             try {
507                 iterator = new CheckedIterator<T>(doIterator(), this);
508             } catch (final Throwable ex) {
509                 throw Throwables.propagate(ex);
510             }
511             this.state.activeIterator = iterator;
512             return iterator;
513         }
514     }
515 
516     /**
517      * Terminal operations that forwards all the elements of this Stream to the Handler specified.
518      * No explicit mechanism is provided for interrupting the Iteration, although many Streams may
519      * react to the standard {@link Thread#interrupt()} signal to stop iteration.
520      *
521      * @param handler
522      *            the Handler where to forward elements
523      */
524     public final void toHandler(final Handler<? super T> handler) {
525         Preconditions.checkNotNull(handler);
526         synchronized (this.state) {
527             checkState();
528             this.state.available = false;
529             this.state.toHandlerThread = Thread.currentThread();
530         }
531         try {
532             try {
533                 doToHandler(handler);
534             } catch (final Throwable ex) {
535                 Throwables.propagate(ex);
536             }
537         } finally {
538             synchronized (this.state) {
539                 if (this.state.closed) {
540                     checkState(); // fail in case stream was closed while iteration was active
541                 }
542                 this.state.toHandlerThread = null; // to avoid interruption
543                 Thread.interrupted(); // clear interruption status
544                 close(); // if not closed, will close the stream now
545             }
546         }
547     }
548 
549     /**
550      * Terminal operation returning an array of the specified type with all the elements of this
551      * Stream. Call this method only if there is enough memory to hold the resulting array.
552      *
553      * @param elementClass
554      *            the type of element to be stored in the array (necessary for the array creation)
555      * @return the resulting array
556      */
557     public final T[] toArray(final Class<T> elementClass) {
558         return Iterables.toArray(toCollection(Lists.<T>newArrayListWithCapacity(256)),
559                 elementClass);
560     }
561 
562     /**
563      * Terminal operation returning an immutable List with all the elements of this Stream. Call
564      * this method only if there is enough memory to hold the resulting List.
565      *
566      * @return the resulting immutable List
567      */
568     public final List<T> toList() {
569         return ImmutableList.copyOf(toCollection(Lists.<T>newArrayListWithCapacity(256)));
570     }
571 
572     /**
573      * Terminal operation returning an immutable Set with all the elements of this Stream. Call
574      * this method only if there is enough memory to hold the resulting Set. Note that duplicate
575      * elements are automatically removed from the resulting Set.
576      *
577      * @return the resulting immutable Set
578      */
579     public final Set<T> toSet() {
580         return ImmutableSet.copyOf(toCollection(Lists.<T>newArrayListWithCapacity(256)));
581     }
582 
583     /**
584      * Terminal operation returning an immutable List with all the elements of this Stream, sorted
585      * using the supplied Comparator. Use {@link Ordering#natural()} to sort Comparable elements
586      * based on {@link Comparable#compareTo(Object)} order. Call this method only if there is
587      * enough memory to hold the resulting List.
588      *
589      * @param comparator
590      *            the Comparator to sort elements, not null
591      * @return the resulting immutable sorted list
592      */
593     public final List<T> toSortedList(final Comparator<? super T> comparator) {
594         return Ordering.from(comparator).immutableSortedCopy(
595                 toCollection(Lists.<T>newArrayListWithCapacity(256)));
596     }
597 
598     /**
599      * Terminal operation returning an immutable List with all the elements of this Stream, sorted
600      * based on a sort key obtained by applying an optional <i>navigation path</i> and conversion
601      * to a specified type. The path is a sequence of keys ({@code String}s, {@code URI}s, generic
602      * objects) that are applied to {@code Record}, {@code BindingSet}, {@code Map} and
603      * {@code Multimap} elements to extract child elements in a recursive fashion. Starting from
604      * an element returned by this stream, the result of this navigation process is either null or
605      * a {@code Comparable} key object that is converted to the requested type (via
606      * {@link Data#convert(Object, Class)}) and used for comparing the element with other element.
607      * The {@code lenient} parameters controls whether conversion errors should be ignored or
608      * result in an exception being thrown.
609      *
610      * @param type
611      *            the class of the sort key
612      * @param lenient
613      *            true if conversion errors should be ignored
614      * @param path
615      *            a vararg array of zero or more keys that recursively select the sort key
616      * @return the resulting immutable sorted list
617      */
618     public final List<T> toSortedList(final Class<? extends Comparable<?>> type,
619             final boolean lenient, final Object... path) {
620         return Ordering.from(new PathComparator(type, lenient, path)).immutableSortedCopy(
621                 toCollection(Lists.<T>newArrayListWithCapacity(256)));
622     }
623 
624     /**
625      * Terminal operation returning an immutable SortedSet with all the elements of this Stream,
626      * sorted using the supplied {@code Comparator}. Use {@link Ordering#natural()} to sort
627      * Comparable elements based on {@link Comparable#compareTo(Object)} order. Call this method
628      * only if there is enough memory to hold the resulting SortedSet.
629      *
630      * @param comparator
631      *            the {@code Comparator} to sort elements, not null
632      * @return the resulting immutable SortedSet
633      */
634     public final SortedSet<T> toSortedSet(final Comparator<? super T> comparator) {
635         return ImmutableSortedSet.copyOf(comparator,
636                 toCollection(Lists.<T>newArrayListWithCapacity(256)));
637     }
638 
639     /**
640      * Terminal operation storing all the elements of this Stream in the supplied Collection. Call
641      * this method only if the target Collection can hold all the remaining elements.
642      *
643      * @param collection
644      *            the Collection where to store elements, not null
645      * @param <C>
646      *            the type of Collection
647      * @return the supplied Collection
648      */
649     public final <C extends Collection<? super T>> C toCollection(final C collection) {
650         Preconditions.checkNotNull(collection);
651         toHandler(new Handler<T>() {
652 
653             @Override
654             public void handle(final T element) {
655                 if (element != null) {
656                     collection.add(element);
657                 }
658             }
659         });
660         return collection;
661     }
662 
663     /**
664      * Terminal operation returning an immutable Map indexing the elements of this Stream as
665      * {@code <key, value>} pairs computed using the supplied Functions. The supplied key and
666      * value Functions are called for each element, producing the keys and values to store in the
667      * Map. If a null key or value are produced, the element is discarded. If multiple values are
668      * mapped to the same key, only the most recently computed one will be stored. Use
669      * {@link Functions#identity()} in case no transformation is required to extract the key
670      * and/or the value.
671      *
672      * @param keyFunction
673      *            the key function
674      * @param valueFunction
675      *            the value function
676      * @param <K>
677      *            the type of key
678      * @param <V>
679      *            the type of value
680      * @return an immutable Map with the computed {@code <key, value>} pairs
681      */
682     public final <K, V> Map<K, V> toMap(final Function<? super T, ? extends K> keyFunction,
683             final Function<? super T, ? extends V> valueFunction) {
684         return ImmutableMap
685                 .copyOf(toMap(keyFunction, valueFunction, Maps.<K, V>newLinkedHashMap()));
686     }
687 
688     /**
689      * Terminal operation storing the elements of this Stream in the supplied map as
690      * {@code <key, value>} pairs computed using the supplied Functions. The supplied key and
691      * value Functions are called for each element, producing the keys and values to store in the
692      * Map. If a null key or value are produced, the element is discarded. If multiple values are
693      * mapped to the same key, only the most recently computed one will be stored. Use
694      * {@link Functions#identity()} in case no transformation is required to extract the key
695      * and/or the value.
696      *
697      * @param keyFunction
698      *            the key function
699      * @param valueFunction
700      *            the value function
701      * @param map
702      *            the Map where to store the extracted {@code <key, value>} pairs
703      * @param <K>
704      *            the type of key
705      * @param <V>
706      *            the type of value
707      * @param <M>
708      *            the type of Map
709      * @return the supplied Map
710      */
711     public final <K, V, M extends Map<K, V>> M toMap(
712             final Function<? super T, ? extends K> keyFunction,
713             final Function<? super T, ? extends V> valueFunction, final M map) {
714         Preconditions.checkNotNull(keyFunction);
715         Preconditions.checkNotNull(valueFunction);
716         Preconditions.checkNotNull(map);
717         toHandler(new Handler<T>() {
718 
719             @Override
720             public void handle(final T element) {
721                 if (element != null) {
722                     final K key = keyFunction.apply(element);
723                     final V value = valueFunction.apply(element);
724                     if (key != null && value != null) {
725                         map.put(key, value);
726                     }
727                 }
728             }
729 
730         });
731         return map;
732     }
733 
734     /**
735      * Terminal operation returning an immutable ListMultimap indexing the elements of this Stream
736      * as {@code <key, value>} pairs computed using the supplied Functions. The supplied key and
737      * value Functions are called for each element, producing the keys and values to store. If a
738      * null key or value are produced, the element is discarded. Use {@link Functions#identity()}
739      * in case no transformation is required to extract the key and/or the value.
740      *
741      * @param keyFunction
742      *            the key function
743      * @param valueFunction
744      *            the value function
745      * @param <K>
746      *            the type of key
747      * @param <V>
748      *            the type of value
749      * @return an immutable ListMultimap
750      */
751     public final <K, V> ListMultimap<K, V> toMultimap(
752             final Function<? super T, ? extends K> keyFunction,
753             final Function<? super T, ? extends V> valueFunction) {
754         return ImmutableListMultimap.copyOf(toMultimap(keyFunction, valueFunction,
755                 ArrayListMultimap.<K, V>create()));
756     }
757 
758     /**
759      * Terminal operation storing the elements of this Stream in the supplied Multimap as
760      * {@code <key, value>} pairs computed using the supplied Functions. The supplied key and
761      * value functions are called for each element, producing the keys and values to store. If a
762      * null key or value are produced, the element is discarded. Use {@link Functions#identity()}
763      * in case no transformation is required to extract the key and/or the value.
764      *
765      * @param keyFunction
766      *            the key function
767      * @param valueFunction
768      *            the value function
769      * @param multimap
770      *            the Multimap where to store the extracted {@code <key, value>} pairs
771      * @param <K>
772      *            the type of key
773      * @param <V>
774      *            the type of value
775      * @param <M>
776      *            the type of Multimap
777      * @return the supplied Multimap
778      */
779     public final <K, V, M extends Multimap<K, V>> M toMultimap(
780             final Function<? super T, ? extends K> keyFunction,
781             final Function<? super T, ? extends V> valueFunction, final M multimap) {
782         Preconditions.checkNotNull(keyFunction);
783         Preconditions.checkNotNull(valueFunction);
784         Preconditions.checkNotNull(multimap);
785         toHandler(new Handler<T>() {
786 
787             @Override
788             public void handle(final T element) {
789                 if (element != null) {
790                     final K key = keyFunction.apply(element);
791                     final V value = valueFunction.apply(element);
792                     if (key != null && value != null) {
793                         multimap.put(key, value);
794                     }
795                 }
796             }
797 
798         });
799         return multimap;
800     }
801 
802     /**
803      * Terminal operation returning the only element in this Stream, or the default value
804      * specified if there are no elements, multiple elements or an Exception occurs.
805      *
806      * @param defaultValue
807      *            the default value to return if a unique value cannot be extracted
808      * @return the only element in this Stream, or the default value in case that element does not
809      *         exist, there are multiple elements or an Exception occurs
810      */
811     public final T getUnique(final T defaultValue) {
812         try {
813             final T result = getUnique();
814             if (result != null) {
815                 return result;
816             }
817         } catch (final Throwable ex) {
818             // ignore
819         }
820         return defaultValue;
821     }
822 
823     /**
824      * Terminal operation returning the only element in this Stream, or <tt>null</tt> if there are
825      * no elements. An exception is thrown in case there is more than one element in the Stream.
826      * This method is designed for being used with Streams that are expected to return exactly one
827      * element, for which it embeds the check on the number of elements.
828      *
829      * @return the only element in the Stream
830      * @throws IllegalStateException
831      *             in case there are multiple elements in the Stream
832      */
833     @SuppressWarnings("unchecked")
834     @Nullable
835     public final T getUnique() throws IllegalStateException {
836         final AtomicReference<Object> holder = new AtomicReference<Object>();
837         toHandler(new Handler<T>() {
838 
839             @Override
840             public void handle(final T element) {
841                 if (element != null) {
842                     if (holder.get() == null) {
843                         holder.set(element);
844                     } else {
845                         holder.set(EOF);
846                         Thread.currentThread().interrupt(); // attempt interupting iteration
847                     }
848                 }
849             }
850 
851         });
852         final Object result = holder.get();
853         if (result != EOF) {
854             return (T) result;
855         }
856         throw new IllegalStateException("Stream " + this + " returned more than one element");
857     }
858 
859     /**
860      * Returns a metadata property about the stream. Note that {@code Stream} wrappers obtained
861      * through intermediate operations don't have their own properties, but instead access the
862      * metadata properties of the source {@code Stream}.
863      *
864      * @param name
865      *            the name of the property
866      * @param type
867      *            the type of the property value (conversion will be attempted if available value
868      *            has a different type)
869      * @param <V>
870      *            the type of value
871      * @return the value of the property, or null if the property is undefined
872      */
873     public final <V> V getProperty(final String name, final Class<V> type) {
874         Preconditions.checkNotNull(name);
875         try {
876             Object value = null;
877             synchronized (this.state) {
878                 if (this.state.properties != null) {
879                     value = this.state.properties.get(name);
880                 }
881             }
882             return Data.convert(value, type);
883         } catch (final Throwable ex) {
884             throw Throwables.propagate(ex);
885         }
886     }
887 
888     /**
889      * Sets a metadata property about the stream. Note that {@code Stream} wrappers obtained
890      * through intermediate operations don't have their own properties, but instead access the
891      * metadata properties of the source {@code Stream}.
892      *
893      * @param name
894      *            the name of the property
895      * @param value
896      *            the value of the property, null to clear it
897      * @return this {@code Stream}, for call chaining
898      */
899     public final Stream<T> setProperty(final String name, @Nullable final Object value) {
900         Preconditions.checkNotNull(name);
901         synchronized (this.state) {
902             if (this.state.properties != null) {
903                 this.state.properties.put(name, value);
904             } else if (value != null) {
905                 this.state.properties = Maps.newHashMap();
906                 this.state.properties.put(name, value);
907             }
908         }
909         return this;
910     }
911 
912     /**
913      * Gets a timeout possibly set on the {@code Stream} and represented by the absolute
914      * milliseconds timestamp when the {@code Stream} will be closed.
915      *
916      * @return the milliseconds timestamp when this {@code Stream} will be closed, or null if no
917      *         timeout has been set
918      */
919     public final Long getTimeout() {
920         synchronized (this.state) {
921             return this.state.timeoutFuture == null ? null : this.state.timeoutFuture
922                     .getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis();
923         }
924     }
925 
926     /**
927      * Sets a timeout by supplying the absolute milliseconds timestamp when the {@code Stream}
928      * will be forcedly closed. If the supplied value is null, any previously set timeout is
     * removed. In case the {@code Stream} has already been closed, or has just timed out due to a
930      * previously set timeout, calling this method has no effect.
931      *
932      * @param timestamp
933      *            the milliseconds timestamp when the {@code Stream} will be closed
934      * @return this {@code Stream}, for call chaining
935      */
936     public final Stream<T> setTimeout(@Nullable final Long timestamp) {
937         Preconditions.checkArgument(timestamp == null || timestamp > System.currentTimeMillis());
938         synchronized (this.state) {
939             if (this.state.closed) {
940                 return this; // NOP, already closed
941             }
942             if (this.state.timeoutFuture != null) {
943                 if (!this.state.timeoutFuture.cancel(false)) {
944                     return this; // NOP, timeout already occurred
945                 }
946             }
947             if (timestamp != null) {
948                 this.state.timeoutFuture = Data.getExecutor().schedule(new Runnable() {
949 
950                     @Override
951                     public void run() {
952                         close();
953                     }
954 
955                 }, Math.max(0, timestamp - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
956             }
957             return this;
958         }
959     }
960 
961     /**
962      * Checks whether this Stream is available, i.e., intermediate and terminal operations can be
963      * called. Invoking a terminal operation or closing the Stream will make it non-available.
964      *
965      * @return true, if the Stream is available
966      */
967     public final boolean isAvailable() {
968         synchronized (this.state) {
969             return this.state.available;
970         }
971     }
972 
973     /**
974      * Checks whether this Stream has been closed. Note that a Stream is automatically closed when
975      * consumption of its elements by a terminal operation completes.
976      *
977      * @return true, if the Stream has been closed
978      */
979     public final boolean isClosed() {
980         synchronized (this.state) {
981             return this.state.closed;
982         }
983     }
984 
985     /**
986      * Register zero or more objects for activation when this {@code Stream} will be closed. Each
987      * supplied object can be a {@code Closeable}, in which case method {@link Closeable#close()}
988      * will be called, a {@code Runnable}, in which case method {@link Runnable#run()} will be
     * called, or a {@code Callable}, in which case method {@link Callable#call()} will be
990      * called; any other type of object will be rejected resulting in an exception. In case the
991      * {@code Stream} has already been closed, activation of supplied objects will be done
992      * immediately.
993      *
994      * @param objects
995      *            the objects to activate when the {@code Stream} will be closed
996      * @return this {@code Stream}, for call chaining.
997      */
998     public final Stream<T> onClose(final Object... objects) {
999         synchronized (this.state) {
1000             for (final Object object : objects) {
1001                 if (!(object instanceof Closeable) && !(object instanceof Runnable)
1002                         && !(object instanceof Callable)) {
1003                     throw new IllegalArgumentException("Illegal object: " + object);
1004                 } else if (this.state.closed) {
1005                     closeAction(object);
1006                 } else {
1007                     boolean alreadyContained = false;
1008                     for (final Object o : this.state.closeObjects) {
1009                         if (o == object) {
1010                             alreadyContained = true;
1011                             break;
1012                         }
1013                     }
1014                     if (!alreadyContained) {
1015                         this.state.closeObjects.add(object);
1016                     }
1017                 }
1018             }
1019         }
1020         return this;
1021     }
1022 
1023     /**
1024      * Closes this {@code Stream} and releases any resource associated to it. The operation causes
1025      * any {@code Stream} wrapping or wrapped by this {@code Stream} to be closed. If element
1026      * iteration through a terminal operation is in progress, it is interrupted resulting in an
1027      * exception being thrown. If this {@code Stream} has already been closed, then calling this
1028      * method has no effect.
1029      */
1030     @Override
1031     public final void close() {
1032         synchronized (this.state) {
1033             if (this.state.closed) {
1034                 return;
1035             }
1036             if (this.state.activeIterator instanceof Closeable) {
1037                 Util.closeQuietly(this.state.activeIterator);
1038             }
1039             if (this.state.toHandlerThread != null) {
1040                 this.state.toHandlerThread.interrupt();
1041             }
1042             for (final Object object : this.state.closeObjects) {
1043                 closeAction(object);
1044             }
1045             this.state.activeIterator = null;
1046             this.state.toHandlerThread = null;
1047             this.state.available = false;
1048             this.state.closed = true;
1049         }
1050     }
1051 
1052     /**
1053      * Returns a string representation of this Stream. The resulting string depends on the actual
1054      * Stream class. For wrapper Streams, it shows the wrapper parameters and the wrapping
1055      * hierarchy.
1056      *
1057      * @return a string representation of this Stream
1058      */
1059     @Override
1060     public final String toString() {
1061         final StringBuilder builder = new StringBuilder();
1062         toStringHelper(builder);
1063         return builder.toString();
1064     }
1065 
1066     void toStringHelper(final StringBuilder builder) {
1067         String name = getClass().getSimpleName();
1068         if (name == null) {
1069             final Method method = getClass().getEnclosingMethod();
1070             if (method != null) {
1071                 name = method.getDeclaringClass().getSimpleName() + "." + method.getName()
1072                         + "-Stream";
1073             } else {
1074                 name = "anon-Stream";
1075             }
1076         }
1077         builder.append(name);
1078         final String args = doToString();
1079         if (args != null) {
1080             builder.append("<").append(args).append(">");
1081         }
1082     }
1083 
1084     final void checkState() {
1085         synchronized (this.state) {
1086             if (this.state.closed) {
1087                 throw new IllegalStateException("Stream already closed: " + this);
1088             } else if (!this.state.available) {
1089                 throw new IllegalStateException("Stream already being iterated: " + this);
1090             }
1091         }
1092     }
1093 
1094     final void closeAction(final Object object) {
1095         try {
1096             if (object instanceof Stream<?>) {
1097                 ((Stream<?>) object).doClose();
1098             } else if (object instanceof Closeable) {
1099                 ((Closeable) object).close();
1100             } else if (object instanceof Runnable) {
1101                 ((Runnable) object).run();
1102             } else if (object instanceof Callable<?>) {
1103                 ((Callable<?>) object).call();
1104             }
1105         } catch (final Throwable ex) {
1106             LOGGER.error("Error performing close action on " + object, ex);
1107         }
1108     }
1109 
1110     /**
1111      * Implementation method responsible of producing an Iterator over the elements of the Stream.
1112      * This method is called by {@link #iterator()} with the guarantee that it is called at most
1113      * once and with the Stream in the <i>available</i> state. If the returned Iterator implements
1114      * the {@link Closeable} interface, it will be automatically closed when the Stream is closed.
1115      *
1116      * @return an Iterator over the elements of the Stream
1117      * @throws Throwable
1118      *             in case of failure
1119      */
1120     protected Iterator<T> doIterator() throws Throwable {
1121         final ToHandlerIterator<T> iterator = new ToHandlerIterator<T>(this);
1122         iterator.submit();
1123         return iterator;
1124     }
1125 
1126     /**
     * Implementation method responsible for forwarding all the elements of the Stream to the
1128      * Handler specified. This method is called by {@link #toHandler(Handler)} to perform internal
1129      * iteration, with the guarantee that it is called at most once and with the Stream in the
1130      * <i>available</i> state. As a best practice, the method should intercept
1131      * {@link Thread#interrupt()} requests and stop iteration, if possible; also remember to call
1132      * {@link Handler#handle(Object)} with a null argument after the last element is reached, in
1133      * order to signal the end of the sequence.
1134      *
1135      * @param handler
1136      *            the {@code Handler} where to forward elements
1137      * @throws Throwable
1138      *             in case of failure
1139      */
1140     protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1141         final Iterator<T> iterator = doIterator();
1142         while (iterator.hasNext()) {
1143             if (Thread.interrupted()) {
1144                 return;
1145             }
1146             handler.handle(iterator.next());
1147         }
1148         handler.handle(null);
1149     }
1150 
1151     /**
1152      * Implementation method supporting the generation of a string representation of this Stream.
1153      * The method should return any parameter / state characterizing this {@code Stream}, which is
1154      * then included within angular brackets in the String denoting the {@code Stream} structure.
1155      *
1156      * @return an optional string with the arguments / state characterizing this {@code Stream},
1157      *         possibly null
1158      */
    @Nullable
    protected String doToString() {
        // Default: no arguments to show; toStringHelper() then omits the "<...>" suffix.
        return null;
    }
1163 
1164     /**
1165      * Implementation method responsible of closing optional resources associated to this Stream.
1166      * The default implementation does nothing.
1167      *
1168      * @throws Throwable
1169      *             in case of failure
1170      */
    protected void doClose() throws Throwable {
        // Intentionally empty: subclasses override this to release their own resources.
        // closeAction() invokes it for registered Stream objects and logs anything thrown.
    }
1173 
    // Safety net: closes the Stream when the object is finalized. close() returns
    // immediately if the Stream has already been closed.
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        } finally {
            // always chain to the superclass finalizer, even if close() fails
            super.finalize();
        }
    }
1182 
    /**
     * Mutable state of a Stream. The Stream methods visible in this file access these fields
     * while synchronizing on the State instance itself.
     */
    private static final class State {

        // Objects activated by close(); registered through onClose().
        @Nullable
        List<Object> closeObjects = Lists.newArrayList();

        // Scheduled task closing the Stream on timeout; assigned by setTimeout(), null if no
        // timeout has been set.
        @Nullable
        ScheduledFuture<?> timeoutFuture;

        // Metadata properties; created lazily by setProperty(), null until then.
        @Nullable
        Map<String, Object> properties;

        // Iterator closed by close() if it implements Closeable. NOTE(review): presumably the
        // iterator handed out by iterator(); it is assigned outside this section - confirm.
        @Nullable
        Iterator<?> activeIterator;

        // Thread interrupted by close(). NOTE(review): presumably the thread running
        // toHandler(); it is assigned outside this section - confirm.
        @Nullable
        Thread toHandlerThread;

        // Value reported by isAvailable(); set to false by close().
        boolean available = true;

        // Value reported by isClosed(); set to true by close().
        boolean closed = false;

    }
1205 
1206     private abstract static class AbstractIterator<T> extends UnmodifiableIterator<T> implements
1207             Closeable {
1208 
1209         @Nullable
1210         private T next;
1211 
1212         @Override
1213         public final boolean hasNext() {
1214             if (this.next == null) {
1215                 this.next = advance();
1216             }
1217             return this.next != null;
1218         }
1219 
1220         @Override
1221         public final T next() {
1222             if (this.next == null) {
1223                 final T result = advance();
1224                 if (result != null) {
1225                     return result;
1226                 }
1227                 throw new NoSuchElementException();
1228             } else {
1229                 final T result = this.next;
1230                 this.next = null;
1231                 return result;
1232             }
1233         }
1234 
1235         @Override
1236         public void close() throws IOException {
1237         }
1238 
1239         protected abstract T advance();
1240 
1241     }
1242 
1243     private static final class CheckedIterator<T> extends UnmodifiableIterator<T> {
1244 
1245         private final Iterator<T> iterator;
1246 
1247         private final Stream<T> stream;
1248 
1249         private final State state;
1250 
1251         private boolean exhausted;
1252 
1253         CheckedIterator(final Iterator<T> iterator, final Stream<T> stream) {
1254             this.iterator = iterator;
1255             this.stream = stream;
1256             this.state = stream.state;
1257             this.exhausted = false;
1258         }
1259 
1260         @Override
1261         public boolean hasNext() {
1262             boolean result = false;
1263             if (!this.exhausted) {
1264                 checkState();
1265                 try {
1266                     result = this.iterator.hasNext();
1267                 } finally {
1268                     if (!result) {
1269                         this.stream.close();
1270                         this.exhausted = true;
1271                     }
1272                 }
1273             }
1274             return result;
1275         }
1276 
1277         @Override
1278         public T next() {
1279             checkState();
1280             try {
1281                 return this.iterator.next();
1282             } catch (final Throwable ex) {
1283                 this.stream.close();
1284                 throw Throwables.propagate(ex);
1285             }
1286         }
1287 
1288         private void checkState() {
1289             boolean closed;
1290             synchronized (this.state) {
1291                 closed = this.state.closed;
1292             }
1293             Preconditions.checkState(!closed, "Stream has been closed");
1294         }
1295 
1296     }
1297 
1298     private static final class ToHandlerIterator<T> extends AbstractIterator<T> implements
1299             Handler<T>, Runnable {
1300 
1301         private final Stream<T> stream;
1302 
1303         private final BlockingQueue<Object> queue;
1304 
1305         private Future<?> future;
1306 
1307         ToHandlerIterator(final Stream<T> stream) {
1308             this.stream = stream;
1309             this.queue = new ArrayBlockingQueue<Object>(1024);
1310             this.future = null;
1311         }
1312 
1313         public void submit() {
1314             this.future = Data.getExecutor().submit(this);
1315         }
1316 
1317         @Override
1318         public void run() {
1319             try {
1320                 this.stream.doToHandler(this);
1321             } catch (final Throwable ex) {
1322                 putUninterruptibly(ex);
1323                 putUninterruptibly(EOF);
1324             }
1325         }
1326 
1327         @Override
1328         public void handle(final T element) {
1329             try {
1330                 this.queue.put(element == null ? EOF : element);
1331             } catch (final InterruptedException ex) {
1332                 putUninterruptibly(ex);
1333                 putUninterruptibly(EOF);
1334                 Thread.currentThread().interrupt(); // restore interruption status
1335             }
1336         }
1337 
1338         @SuppressWarnings("unchecked")
1339         @Override
1340         protected T advance() {
1341             try {
1342                 final Object element = this.queue.take();
1343                 if (element == EOF) {
1344                     return null;
1345                 } else if (element instanceof Throwable) {
1346                     throw Throwables.propagate((Throwable) element);
1347                 } else {
1348                     return (T) element;
1349                 }
1350             } catch (final InterruptedException ex) {
1351                 Thread.currentThread().interrupt(); // restore interruption status
1352                 throw new RuntimeException("Interrupted while waiting for next element", ex);
1353             }
1354         }
1355 
1356         @Override
1357         public void close() {
1358             if (this.future != null) {
1359                 this.future.cancel(true);
1360             }
1361         }
1362 
1363         private void putUninterruptibly(final Object element) {
1364             while (true) {
1365                 try {
1366                     this.queue.put(element);
1367                     return;
1368                 } catch (final InterruptedException ex) {
1369                     // ignore
1370                 }
1371             }
1372         }
1373 
1374     }
1375 
1376     // STREAM IMPLEMENTATIONS
1377 
1378     private static final class EmptyStream<T> extends Stream<T> {
1379 
1380         @Override
1381         protected Iterator<T> doIterator() {
1382             return Collections.emptyIterator();
1383         }
1384 
1385         @Override
1386         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1387             handler.handle(null);
1388         }
1389 
1390     }
1391 
1392     private static final class SingletonStream<T> extends Stream<T> {
1393 
1394         private T element;
1395 
1396         SingletonStream(final T element) {
1397             this.element = Preconditions.checkNotNull(element);
1398         }
1399 
1400         @Override
1401         protected Iterator<T> doIterator() {
1402             return Iterators.singletonIterator(this.element);
1403         }
1404 
1405         @Override
1406         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1407             handler.handle(this.element);
1408             handler.handle(null);
1409         }
1410 
1411         @Override
1412         protected void doClose() throws Throwable {
1413             this.element = null;
1414         }
1415     }
1416 
1417     private static final class IterableStream<T> extends Stream<T> {
1418 
1419         private Iterable<? extends T> iterable;
1420 
1421         IterableStream(final Iterable<? extends T> iterable) {
1422             this.iterable = Preconditions.checkNotNull(iterable);
1423         }
1424 
1425         @SuppressWarnings("unchecked")
1426         @Override
1427         protected Iterator<T> doIterator() throws Throwable {
1428             return ((Iterable<T>) this.iterable).iterator();
1429         }
1430 
1431         @Override
1432         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1433             for (final T element : this.iterable) {
1434                 handler.handle(element);
1435             }
1436             handler.handle(null);
1437         }
1438 
1439         @Override
1440         protected void doClose() throws Throwable {
1441             if (this.iterable instanceof Closeable) {
1442                 ((Closeable) this.iterable).close();
1443             }
1444             this.iterable = null;
1445         }
1446 
1447     }
1448 
1449     private static final class IteratorStream<T> extends Stream<T> {
1450 
1451         private Iterator<? extends T> iterator;
1452 
1453         IteratorStream(final Iterator<? extends T> iterator) {
1454             this.iterator = Preconditions.checkNotNull(iterator);
1455         }
1456 
1457         @SuppressWarnings("unchecked")
1458         @Override
1459         protected Iterator<T> doIterator() {
1460             return (Iterator<T>) this.iterator;
1461         }
1462 
1463         @Override
1464         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1465             while (this.iterator.hasNext()) {
1466                 if (Thread.interrupted()) {
1467                     return;
1468                 }
1469                 final T element = this.iterator.next();
1470                 handler.handle(element);
1471             }
1472             handler.handle(null);
1473         }
1474 
1475         @Override
1476         protected void doClose() throws Throwable {
1477             if (this.iterator instanceof Closeable) {
1478                 ((Closeable) this.iterator).close();
1479             }
1480             this.iterator = null;
1481         }
1482 
1483     }
1484 
1485     private static final class IterationStream<T> extends Stream<T> {
1486 
1487         private Iteration<? extends T, ?> iteration;
1488 
1489         IterationStream(final Iteration<? extends T, ?> iteration) {
1490             this.iteration = Preconditions.checkNotNull(iteration);
1491         }
1492 
1493         @Override
1494         protected Iterator<T> doIterator() {
1495             return new AbstractIterator<T>() {
1496 
1497                 @Override
1498                 protected T advance() {
1499                     try {
1500                         if (IterationStream.this.iteration.hasNext()) {
1501                             return IterationStream.this.iteration.next();
1502                         } else {
1503                             return null;
1504                         }
1505                     } catch (final Throwable ex) {
1506                         throw Throwables.propagate(ex);
1507                     }
1508                 }
1509 
1510             };
1511         }
1512 
1513         @Override
1514         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1515             while (this.iteration.hasNext()) {
1516                 if (Thread.interrupted()) {
1517                     return;
1518                 }
1519                 final T element = this.iteration.next();
1520                 handler.handle(element);
1521             }
1522             handler.handle(null);
1523         }
1524 
1525         @Override
1526         protected void doClose() throws Throwable {
1527             if (this.iteration instanceof CloseableIteration<?, ?>) {
1528                 ((CloseableIteration<? extends T, ?>) this.iteration).close();
1529             }
1530             this.iteration = null;
1531         }
1532 
1533     }
1534 
1535     private abstract static class DelegatingStream<I, O> extends Stream<O> {
1536 
1537         final Stream<I> delegate;
1538 
1539         DelegatingStream(final Stream<I> delegate) {
1540             super(delegate.state);
1541             this.delegate = delegate;
1542         }
1543 
1544         @Override
1545         void toStringHelper(final StringBuilder builder) {
1546             super.toStringHelper(builder);
1547             builder.append(" (");
1548             this.delegate.toStringHelper(builder);
1549             builder.append(")");
1550         }
1551 
1552     }
1553 
1554     private static final class ConcatStream<I extends Iterable<? extends O>, O> extends
1555             DelegatingStream<I, O> {
1556 
1557         ConcatStream(final Stream<I> delegate) {
1558             super(delegate);
1559         }
1560 
1561         @Override
1562         protected Iterator<O> doIterator() throws Throwable {
1563             final Iterator<I> streamIterator = this.delegate.doIterator();
1564             final Iterator<O> elementIterator = new AbstractIterator<O>() {
1565 
1566                 private Stream<? extends O> stream;
1567 
1568                 private Iterator<? extends O> iterator;
1569 
1570                 @Override
1571                 protected O advance() {
1572                     while (this.iterator == null || !this.iterator.hasNext()) {
1573                         if (this.stream != null) {
1574                             this.stream.close();
1575                         }
1576                         if (!streamIterator.hasNext()) {
1577                             return null;
1578                         }
1579                         this.stream = create(streamIterator.next());
1580                         this.iterator = this.stream.iterator();
1581                     }
1582                     return this.iterator.next();
1583                 }
1584 
1585                 @Override
1586                 public void close() {
1587                     if (this.stream != null) {
1588                         this.stream.close();
1589                     }
1590                 }
1591 
1592             };
1593             onClose(elementIterator);
1594             return elementIterator;
1595         }
1596 
1597         @Override
1598         protected void doToHandler(final Handler<? super O> handler) throws Throwable {
1599             this.delegate.doToHandler(new Handler<I>() {
1600 
1601                 @Override
1602                 public void handle(final I iterable) throws Throwable {
1603                     if (iterable == null) {
1604                         handler.handle(null);
1605                     } else {
1606                         final AtomicBoolean eof = new AtomicBoolean(false);
1607                         create(iterable).toHandler(new Handler<O>() {
1608 
1609                             @Override
1610                             public void handle(final O element) throws Throwable {
1611                                 if (element != null) {
1612                                     handler.handle(element);
1613                                 } else {
1614                                     eof.set(true);
1615                                 }
1616                             }
1617 
1618                         });
1619                         if (!eof.get()) { // halt streamStream.toHandler if interrupted
1620                             Thread.currentThread().interrupt();
1621                         }
1622                     }
1623                 }
1624 
1625             });
1626         }
1627 
1628     }
1629 
    /**
     * Base class of streams that produce their output by applying {@link #process(Object)} to
     * each element of a delegate stream.
     * <p>
     * The value returned by {@code process} drives the outcome: the {@code EOF} constant ends
     * the output sequence, {@code null} drops the input element, any other object is emitted
     * (after an unchecked cast to {@code O}). When {@code parallelism} is at most 1 elements are
     * processed in the calling thread; otherwise {@code process} calls are submitted to the
     * executor returned by {@code Data.getExecutor()}, keeping at most {@code parallelism}
     * pending tasks whose results are consumed in submission order.
     * </p>
     */
    private abstract static class ProcessingStream<I, O> extends DelegatingStream<I, O> {

        /** Max number of pending {@code process} tasks; values <= 1 select sequential mode. */
        final int parallelism;

        ProcessingStream(final Stream<I> delegate, final int parallelism) {
            super(delegate);
            this.parallelism = parallelism;
        }

        /**
         * Selects the sequential or parallel iterator implementation based on
         * {@code parallelism}.
         */
        @Override
        protected final Iterator<O> doIterator() throws Throwable {
            if (this.parallelism <= 1) {
                return doIteratorSequential();
            } else {
                return doIteratorParallel();
            }
        }

        /**
         * Returns an iterator that processes the delegate elements one at a time in the thread
         * calling the iterator.
         */
        private Iterator<O> doIteratorSequential() throws Throwable {
            final Iterator<I> iterator = this.delegate.doIterator();
            return new AbstractIterator<O>() {

                @SuppressWarnings("unchecked")
                @Override
                protected O advance() {
                    // skip dropped elements (null results) until an output or EOF is found
                    while (iterator.hasNext()) {
                        final I element = iterator.next();
                        final Object transformed = process(element);
                        if (transformed == EOF) {
                            return null;
                        } else if (transformed != null) {
                            return (O) transformed;
                        }
                    }
                    return null;
                }

            };
        }

        /**
         * Returns an iterator that keeps up to {@code parallelism} pending {@code process} tasks
         * and emits their results in submission order. Closing the iterator cancels the tasks
         * still pending; the iterator is passed to {@code onClose(..)}, presumably so that this
         * happens when the stream is closed.
         */
        private Iterator<O> doIteratorParallel() throws Throwable {
            final Iterator<I> iterator = this.delegate.doIterator();
            final List<Future<Object>> queue = Lists.newLinkedList();
            final Iterator<O> result = new AbstractIterator<O>() {

                @SuppressWarnings("unchecked")
                @Override
                protected O advance() {
                    while (true) {
                        // refill the queue before waiting for the oldest result
                        while (queue.size() < ProcessingStream.this.parallelism
                                && iterator.hasNext()) {
                            offer(queue, iterator.next());
                        }
                        if (queue.isEmpty()) {
                            return null;
                        }
                        final Object output = take(queue);
                        if (output == EOF) {
                            return null;
                        } else if (output != null) {
                            return (O) output;
                        }
                    }
                }

                @Override
                public void close() {
                    // best-effort cancellation of the tasks not consumed yet
                    for (final Future<Object> future : queue) {
                        try {
                            future.cancel(true);
                        } catch (final Exception ex) {
                            // ignore
                        }
                    }
                }

            };
            onClose(result);
            return result;
        }

        /**
         * Selects the sequential or parallel push implementation based on {@code parallelism}.
         */
        @Override
        protected final void doToHandler(final Handler<? super O> handler) throws Throwable {
            if (this.parallelism <= 1) {
                doToHandlerSequential(handler);
            } else {
                doToHandlerParallel(handler);
            }
        }

        /**
         * Pushes to the handler the result of processing each delegate element in the thread
         * delivering it. After the end of sequence has been notified (either received from the
         * delegate or caused by an {@code EOF} result) further elements are ignored.
         */
        private void doToHandlerSequential(final Handler<? super O> handler) throws Throwable {
            this.delegate.doToHandler(new Handler<I>() {

                // set once the end of sequence has been notified downstream
                private boolean done = false;

                @SuppressWarnings("unchecked")
                @Override
                public void handle(final I element) throws Throwable {
                    if (!this.done) {
                        if (element == null) {
                            handler.handle(null);
                            this.done = true;
                        } else {
                            final Object transformed = process(element);
                            if (transformed == EOF) {
                                handler.handle(null);
                                // presumably asks the upstream doToHandler loop to stop: the
                                // iterator/iteration-backed streams poll Thread.interrupted()
                                Thread.currentThread().interrupt();
                                this.done = true;
                            } else if (transformed != null) {
                                handler.handle((O) transformed);
                            }
                        }
                    }
                }

            });
        }

        /**
         * Pushes to the handler the results of {@code process} tasks run via the executor,
         * preserving submission order. A result is consumed only when the queue already holds
         * {@code parallelism} tasks or when the delegate notifies its end of sequence; tasks left
         * in the queue when the delegate returns or fails are cancelled.
         */
        private void doToHandlerParallel(final Handler<? super O> handler) throws Throwable {
            final List<Future<Object>> queue = Lists.newLinkedList();
            try {
                this.delegate.doToHandler(new Handler<I>() {

                    // set once the end of sequence has been notified downstream
                    private boolean done = false;

                    @SuppressWarnings("unchecked")
                    @Override
                    public void handle(final I element) throws Throwable {
                        if (!this.done) {
                            if (element == null) {
                                // input exhausted: drain pending results, stopping at EOF
                                while (!this.done && !queue.isEmpty()) {
                                    final Object output = take(queue);
                                    if (output == EOF) {
                                        break;
                                    } else if (output != null) {
                                        handler.handle((O) output);
                                    }
                                }
                                handler.handle(null);
                                this.done = true;
                            } else {
                                // queue full: consume the oldest result to make room
                                if (queue.size() == ProcessingStream.this.parallelism) {
                                    final Object output = take(queue);
                                    if (output == EOF) {
                                        handler.handle(null);
                                        // presumably asks the upstream doToHandler to stop
                                        Thread.currentThread().interrupt();
                                        this.done = true;
                                    } else if (output != null) {
                                        handler.handle((O) output);
                                    }
                                }
                                if (!this.done) {
                                    offer(queue, element);
                                }
                            }
                        }
                    }

                });
            } finally {
                // best-effort cancellation of the tasks not consumed
                for (final Future<Object> future : queue) {
                    try {
                        future.cancel(true);
                    } catch (final Exception ex) {
                        // ignore
                    }
                }
            }
        }

        /**
         * Removes the oldest future from the queue and returns its result, obtained through
         * {@code Futures.get} with {@code RuntimeException} as the requested exception type.
         */
        private Object take(final List<Future<Object>> queue) {
            return Futures.get(queue.remove(0), RuntimeException.class);
        }

        /**
         * Submits to the executor returned by {@code Data.getExecutor()} a task calling
         * {@code process} on the element, appending the resulting future to the queue.
         */
        private void offer(final List<Future<Object>> queue, final I element) {
            queue.add(Data.getExecutor().submit(new Callable<Object>() {

                @Override
                public Object call() {
                    return process(element);
                }

            }));
        }

        /**
         * Processes an input element.
         *
         * @param element
         *            the input element
         * @return the output element to emit, {@code null} to drop the input element, or the
         *         {@code EOF} constant to end the output sequence
         */
        protected abstract Object process(I element);

    }
1818 
1819     private static final class FilterStream<T> extends ProcessingStream<T, T> {
1820 
1821         private final Predicate<? super T> predicate;
1822 
1823         FilterStream(final Stream<T> delegate, final int parallelism,
1824                 final Predicate<? super T> predicate) {
1825             super(delegate, parallelism);
1826             this.predicate = Preconditions.checkNotNull(predicate);
1827         }
1828 
1829         @Override
1830         protected Object process(final T element) {
1831             return this.predicate.apply(element) ? element : null;
1832         }
1833 
1834         @Override
1835         protected String doToString() {
1836             return this.predicate + ", " + this.parallelism;
1837         }
1838 
1839     }
1840 
1841     private static final class TransformElementStream<I, O> extends ProcessingStream<I, O> {
1842 
1843         private final Function<? super I, ? extends O> function;
1844 
1845         TransformElementStream(final Stream<I> delegate, final int parallelism,
1846                 final Function<? super I, ? extends O> function) {
1847             super(delegate, parallelism);
1848             this.function = Preconditions.checkNotNull(function);
1849         }
1850 
1851         @Override
1852         protected Object process(final I element) {
1853             return this.function.apply(element);
1854         }
1855 
1856         @Override
1857         protected String doToString() {
1858             return this.function + ", " + this.parallelism;
1859         }
1860 
1861     }
1862 
1863     private static final class TransformSequenceStream<I, O> extends DelegatingStream<I, O> {
1864 
1865         private final Function<Iterator<I>, Iterator<O>> iteratorFunction;
1866 
1867         private final Function<Handler<O>, Handler<I>> handlerFunction;
1868 
1869         TransformSequenceStream(final Stream<I> delegate,
1870                 @Nullable final Function<Iterator<I>, Iterator<O>> iteratorFunction,
1871                 @Nullable final Function<Handler<O>, Handler<I>> handlerFunction) {
1872             super(delegate);
1873             Preconditions.checkArgument(iteratorFunction != null || handlerFunction != null,
1874                     "At least one function must be supplied");
1875             this.iteratorFunction = iteratorFunction;
1876             this.handlerFunction = handlerFunction;
1877         }
1878 
1879         @Override
1880         protected Iterator<O> doIterator() throws Throwable {
1881             if (this.iteratorFunction != null) {
1882                 return this.iteratorFunction.apply(this.delegate.doIterator());
1883             } else {
1884                 return super.doIterator(); // delegates to doToHandler
1885             }
1886         }
1887 
1888         @SuppressWarnings("unchecked")
1889         @Override
1890         protected void doToHandler(final Handler<? super O> handler) throws Throwable {
1891             if (this.handlerFunction != null) {
1892                 this.delegate.doToHandler(this.handlerFunction.apply((Handler<O>) handler));
1893             } else {
1894                 super.doToHandler(handler); // delegates to doIterator
1895             }
1896         }
1897 
1898         @Override
1899         protected String doToString() {
1900             return this.iteratorFunction == null ? this.handlerFunction.toString()
1901                     : this.handlerFunction == null ? this.iteratorFunction.toString()
1902                             : this.iteratorFunction.toString() + ", "
1903                                     + this.handlerFunction.toString();
1904         }
1905 
1906     }
1907 
1908     private static final class TransformPathStream<I, O> extends ProcessingStream<I, List<O>> {
1909 
1910         private final Class<O> type;
1911 
1912         private final boolean lenient;
1913 
1914         private final Object[] path;
1915 
1916         TransformPathStream(final Stream<I> delegate, final Class<O> type, final boolean lenient,
1917                 final Object[] path) {
1918             super(delegate, 0);
1919             this.type = Preconditions.checkNotNull(type);
1920             this.lenient = lenient;
1921             this.path = Preconditions.checkNotNull(path);
1922         }
1923 
1924         @Override
1925         protected Object process(final I element) {
1926             final List<O> elements = Lists.newArrayList();
1927             path(element, 0, elements);
1928             return elements;
1929         }
1930 
1931         @SuppressWarnings({ "rawtypes", "unchecked" })
1932         private void path(final Object object, final int index, final List<O> result) {
1933             if (object == null) {
1934                 return;
1935             } else if (object instanceof Iterable && !(object instanceof BindingSet)) {
1936                 for (final Object element : (Iterable) object) {
1937                     path(element, index, result);
1938                 }
1939             } else if (object instanceof Iterator) {
1940                 final Iterator<?> iterator = (Iterator<?>) object;
1941                 while (iterator.hasNext()) {
1942                     path(iterator.next(), index, result);
1943                 }
1944             } else if (object.getClass().isArray()) {
1945                 final int length = Array.getLength(object);
1946                 for (int i = 0; i < length; ++i) {
1947                     path(Array.get(object, i), index, result);
1948                 }
1949             } else if (index == this.path.length) {
1950                 final O element = this.lenient ? Data.convert(object, this.type, null) : //
1951                         Data.convert(object, this.type);
1952                 if (element != null) {
1953                     result.add(element);
1954                 }
1955             } else {
1956                 final Object key = this.path[index];
1957                 if (object instanceof Record) {
1958                     if (key instanceof URI) {
1959                         path(((Record) object).get((URI) key), index + 1, result);
1960                     }
1961                 } else if (object instanceof BindingSet) {
1962                     if (key instanceof String) {
1963                         path(((BindingSet) object).getValue((String) key), index + 1, result);
1964                     }
1965                 } else if (object instanceof Map) {
1966                     path(((Map<Object, Object>) object).get(key), index + 1, result);
1967                 } else if (object instanceof Multimap) {
1968                     path(((Multimap<Object, Object>) object).get(this.path), index + 1, result);
1969                 }
1970             }
1971         }
1972 
1973     }
1974 
1975     private static final class DistinctStream<T> extends ProcessingStream<T, T> {
1976 
1977         private final Set<T> seen;
1978 
1979         DistinctStream(final Stream<T> delegate) {
1980             super(delegate, 0); // pure sequential processing
1981             this.seen = Sets.newHashSet();
1982         }
1983 
1984         @Override
1985         protected Object process(final T element) {
1986             return this.seen.add(element) ? element : null; // could be improved
1987         }
1988 
1989     }
1990 
1991     private static final class SliceStream<T> extends ProcessingStream<T, T> {
1992 
1993         private final long startIndex;
1994 
1995         private final long endIndex;
1996 
1997         private long index;
1998 
1999         SliceStream(final Stream<T> delegate, final long offset, final long limit) {
2000             super(delegate, 0); // no parallel evaluation
2001             Preconditions.checkArgument(offset >= 0, "Negative offset: {}", limit);
2002             Preconditions.checkArgument(limit >= 0, "Negative limit: {}", limit);
2003             this.startIndex = offset;
2004             this.endIndex = offset + limit;
2005             this.index = 0;
2006         }
2007 
2008         @Override
2009         protected Object process(final T element) {
2010             Object result = null;
2011             if (this.index >= this.endIndex) {
2012                 result = EOF;
2013             } else if (this.index >= this.startIndex) {
2014                 result = element;
2015             }
2016             ++this.index;
2017             return result;
2018         }
2019 
2020     }
2021 
2022     private static final class ChunkStream<T> extends DelegatingStream<T, List<T>> {
2023 
2024         private final int chunkSize;
2025 
2026         ChunkStream(final Stream<T> delegate, final int chunkSize) {
2027             super(delegate);
2028             Preconditions.checkArgument(chunkSize > 0, "Invalid chunk size: %d", chunkSize);
2029             this.chunkSize = chunkSize;
2030         }
2031 
2032         @Override
2033         protected Iterator<List<T>> doIterator() throws Throwable {
2034             final Iterator<T> iterator = this.delegate.doIterator();
2035             return new AbstractIterator<List<T>>() {
2036 
2037                 private final Object[] chunk = new Object[ChunkStream.this.chunkSize];
2038 
2039                 @SuppressWarnings("unchecked")
2040                 @Override
2041                 protected List<T> advance() {
2042                     int index = 0;
2043                     for (; index < ChunkStream.this.chunkSize && iterator.hasNext(); ++index) {
2044                         this.chunk[index] = iterator.next();
2045                     }
2046                     if (index == 0) {
2047                         return null;
2048                     } else if (index == ChunkStream.this.chunkSize) {
2049                         return (List<T>) ImmutableList.copyOf(this.chunk);
2050                     } else {
2051                         return (List<T>) ImmutableList.copyOf(Arrays.asList(this.chunk).subList(0,
2052                                 index));
2053                     }
2054                 }
2055 
2056             };
2057         }
2058 
2059         @Override
2060         protected void doToHandler(final Handler<? super List<T>> handler) throws Throwable {
2061             this.delegate.doToHandler(new Handler<T>() {
2062 
2063                 private final List<T> chunk = Lists.newArrayList();
2064 
2065                 @Override
2066                 public void handle(final T element) throws Throwable {
2067                     if (element == null) {
2068                         if (!this.chunk.isEmpty()) {
2069                             handler.handle(ImmutableList.copyOf(this.chunk));
2070                         }
2071                         handler.handle(null);
2072                     } else {
2073                         this.chunk.add(element);
2074                         if (this.chunk.size() == ChunkStream.this.chunkSize) {
2075                             handler.handle(ImmutableList.copyOf(this.chunk));
2076                             this.chunk.clear();
2077                         }
2078                     }
2079                 }
2080 
2081             });
2082         }
2083 
2084         @Override
2085         protected String doToString() {
2086             return Integer.toString(this.chunkSize);
2087         }
2088 
2089     }
2090 
2091     private static final class TrackStream<T> extends DelegatingStream<T, T> {
2092 
2093         private final AtomicLong counter;
2094 
2095         private final AtomicBoolean eof;
2096 
2097         public TrackStream(final Stream<T> delegate, @Nullable final AtomicLong counter,
2098                 @Nullable final AtomicBoolean eof) {
2099             super(delegate);
2100             this.counter = counter != null ? counter : new AtomicLong();
2101             this.eof = eof != null ? eof : new AtomicBoolean();
2102             this.counter.set(0L);
2103             this.eof.set(false);
2104         }
2105 
2106         @Override
2107         protected Iterator<T> doIterator() throws Throwable {
2108             final Iterator<T> iterator = this.delegate.doIterator();
2109             return new UnmodifiableIterator<T>() {
2110 
2111                 private long count = 0L;
2112 
2113                 @Override
2114                 public boolean hasNext() {
2115                     final boolean result = iterator.hasNext();
2116                     TrackStream.this.eof.set(result);
2117                     return result;
2118                 }
2119 
2120                 @Override
2121                 public T next() {
2122                     final T next = iterator.next();
2123                     TrackStream.this.counter.set(++this.count);
2124                     return next;
2125                 }
2126 
2127             };
2128         }
2129 
2130         @Override
2131         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
2132             this.delegate.doToHandler(new Handler<T>() {
2133 
2134                 private long count = 0L;
2135 
2136                 @Override
2137                 public void handle(final T element) throws Throwable {
2138                     if (element != null) {
2139                         TrackStream.this.counter.set(++this.count);
2140                         handler.handle(element);
2141                     } else {
2142                         TrackStream.this.eof.set(true);
2143                         handler.handle(null);
2144                     }
2145                 }
2146 
2147             });
2148         }
2149 
2150     }
2151 
2152     private static final class PathComparator implements Comparator<Object> {
2153 
2154         private final Class<? extends Comparable<?>> type;
2155 
2156         private final Object[] path;
2157 
2158         private final boolean lenient;
2159 
2160         PathComparator(final Class<? extends Comparable<?>> type, final boolean lenient,
2161                 final Object... path) {
2162             this.type = Preconditions.checkNotNull(type);
2163             this.lenient = lenient;
2164             this.path = path.clone();
2165         }
2166 
2167         @SuppressWarnings({ "rawtypes", "unchecked" })
2168         @Override
2169         public int compare(final Object first, final Object second) {
2170             final Comparable firstKey = path(first, 0);
2171             final Comparable secondKey = path(second, 0);
2172             if (firstKey == null) {
2173                 return secondKey == null ? 0 : 1;
2174             } else {
2175                 return secondKey == null ? -1 : firstKey.compareTo(secondKey);
2176             }
2177         }
2178 
2179         @SuppressWarnings({ "unchecked" })
2180         private Comparable<?> path(final Object object, final int index) {
2181             if (object == null) {
2182                 return null;
2183             } else if (object instanceof Iterable && !(object instanceof BindingSet)) {
2184                 final Iterator<?> iterator = ((Iterable<?>) object).iterator();
2185                 return iterator.hasNext() ? path(iterator.next(), index) : null;
2186             } else if (object instanceof Iterator) {
2187                 final Iterator<?> iterator = (Iterator<?>) object;
2188                 return iterator.hasNext() ? path(iterator.next(), index) : null;
2189             } else if (object.getClass().isArray()) {
2190                 final int length = Array.getLength(object);
2191                 return length > 0 ? path(Array.get(object, 0), index) : null;
2192             } else if (index == this.path.length) {
2193                 return this.lenient ? Data.convert(object, this.type, null) : //
2194                         Data.convert(object, this.type);
2195             } else {
2196                 final Object key = this.path[index];
2197                 if (object instanceof Record) {
2198                     return key instanceof URI ? path(((Record) object).get((URI) key), index + 1)
2199                             : null;
2200                 } else if (object instanceof BindingSet) {
2201                     return key instanceof String ? path(
2202                             ((BindingSet) object).getValue((String) key), index + 1) : null;
2203                 } else if (object instanceof Map) {
2204                     return path(((Map<Object, Object>) object).get(key), index + 1);
2205                 } else if (object instanceof Multimap) {
2206                     return path(((Multimap<Object, Object>) object).get(this.path), index + 1);
2207                 }
2208                 return null;
2209             }
2210         }
2211 
2212     }
2213 
2214 }