1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import java.io.IOException;
4   import java.util.Collections;
5   import java.util.Iterator;
6   import java.util.List;
7   import java.util.Set;
8   
9   import javax.annotation.Nullable;
10  
11  import com.google.common.base.Preconditions;
12  import com.google.common.base.Throwables;
13  import com.google.common.collect.AbstractIterator;
14  import com.google.common.collect.ImmutableList;
15  import com.google.common.collect.Iterables;
16  import com.google.common.collect.Lists;
17  
18  import org.openrdf.model.URI;
19  
20  import eu.fbk.knowledgestore.data.Record;
21  import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
22  
23  /**
24   * Performs batch lookup of records by ID from a given HBase table, optionally returning only a
25   * subset of record properties.
26   */
27  public class HBaseIterator extends AbstractIterator<Record> {
28  
29      /** The batch size. */
30      private static final int BATCH_SIZE = 100;
31  
32      /** Object referencing transactional layer. */
33      private final AbstractHBaseUtils hbaseUtils;
34  
35      /** HBase table name to be used */
36      private final String tableName;
37  
38      /** Properties to be looked up. */
39      private final URI[] properties;
40  
41      /** An iterator over the IDs of the records to lookup. */
42      private final Iterator<URI> idIterator;
43  
44      /** An iterator over records buffered in the last batch looked up from HBase. */
45      private Iterator<Record> recordIterator;
46  
47      /**
48       * Creates a new {@code HBaseStream} for the parameters supplied.
49       * 
50       * @param hbaseUtils
51       *            the {@code AbstractHBaseUtils} object for accessing HBase, not null
52       * @param tableName
53       *            the name of the HBase table to access, not null
54       * @param ids
55       *            the IDs of records to fetch from HBase, not null
56       * @param properties
57       *            the properties of records to return, null if all properties should be returned
58       */
59      @SuppressWarnings("unchecked")
60      public HBaseIterator(final AbstractHBaseUtils hbaseUtils, final String tableName,
61              final Set<? extends URI> ids, @Nullable final Set<? extends URI> properties) {
62  
63          Preconditions.checkNotNull(hbaseUtils);
64          Preconditions.checkNotNull(tableName);
65          Preconditions.checkNotNull(ids);
66  
67          this.hbaseUtils = hbaseUtils;
68          this.tableName = tableName;
69          this.properties = properties == null ? null : Iterables.toArray(properties, URI.class);
70          this.idIterator = (Iterator<URI>) ImmutableList.copyOf(ids).iterator();
71          this.recordIterator = Collections.emptyIterator();
72      }
73  
74      @Override
75      protected Record computeNext() {
76          while (true) {
77              // Return a record previously buffered, if available
78              if (this.recordIterator.hasNext()) {
79                  return this.recordIterator.next();
80              }
81  
82              // Otherwise, retrieve next batch of IDs
83              final List<URI> ids = Lists.newArrayListWithCapacity(BATCH_SIZE);
84              while (this.idIterator.hasNext() && ids.size() < BATCH_SIZE) {
85                  ids.add(this.idIterator.next());
86              }
87  
88              // EOF reached if there are no more IDs to retrieve
89              if (ids.isEmpty()) {
90                  return endOfData();
91              }
92  
93              // Retrieve next batch of records corresponding to IDs batch
94              final List<Record> records;
95              try {
96                  records = this.hbaseUtils.get(this.tableName, ids);
97              } catch (final IOException ex) {
98                  throw Throwables.propagate(ex);
99              }
100 
101             // Perform client-side projection, if requested
102             if (this.properties != null) {
103                 for (int i = 0; i < records.size(); ++i) {
104                     records.set(i, records.get(i).retain(this.properties));
105                 }
106             }
107 
108             // Store fetched record in record iterator and return first one
109             this.recordIterator = records.iterator();
110         }
111     }
112 
113 }