1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import java.io.Closeable;
4   import java.io.IOException;
5   import java.util.Iterator;
6   
7   import javax.annotation.Nullable;
8   
9   import com.google.common.base.Preconditions;
10  import com.google.common.collect.AbstractIterator;
11  import com.google.common.collect.Iterables;
12  
13  import org.apache.hadoop.hbase.client.Result;
14  import org.apache.hadoop.hbase.client.ResultScanner;
15  import org.apache.hadoop.hbase.client.Scan;
16  import org.openrdf.model.URI;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import eu.fbk.knowledgestore.data.Record;
21  import eu.fbk.knowledgestore.data.XPath;
22  import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
23  import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
24  import eu.fbk.knowledgestore.datastore.hbase.utils.HBaseFilter;
25  
26  /**
27   * Performs an HBase scanner-based retrieval of the records matching an optional condition from a
28   * certain HBase table/column family.
29   */
30  public class HBaseScanIterator extends AbstractIterator<Record> implements Closeable {
31  
32      /** Logger object used inside HdfsFileStore. */
33      private static final Logger LOGGER = LoggerFactory.getLogger(HBaseScanIterator.class);
34  
35      /** Optional conditions to be applied locally to match records. */
36      @Nullable
37  	private final XPath condition;
38  
39      /** The properties to return, null if all properties are requested. */
40      @Nullable
41  	private final URI[] properties;
42  
43      /** Created scanner, kept in order to close it after iteration. */
44      @Nullable
45  	private final ResultScanner scanner;
46  
47      /** The iterator returned by the scanner. */
48      private final Iterator<Result> hbaseIterator;
49  
50      /** The {@code AvroSerializer} used to deserialize rows into records. */
51      private final AvroSerializer serializer;
52  
53      /**
54       * Creates a new {@code HBaseScanStream} based on the parameters supplied.
55       * 
56       * @param hbaseUtils
57       *            the {@code AbstractHBaseUtils} object for accessing HBase, not null
58       * @param tableName
59       *            the name of the HBase table to access, not null
60       * @param familyName
61       *            the name of the HBase column family to access, not null
62       * @param condition
63       *            optional condition to be satisfied by matching records, possibly null
64       * @param properties
65       *            properties to return, null if all properties are requested
66       * @param localFiltering
67       *            true if filtering should be performed locally to the HBase client
68       * @throws IOException
69       *             on failure
70       */
71      public HBaseScanIterator(final AbstractHBaseUtils hbaseUtils, final String tableName,
72  			     final String familyName, @Nullable final XPath condition,
73  			     @Nullable final Iterable<? extends URI> properties, final boolean localFiltering)
74  	throws IOException {
75  
76          // Check parameters
77          Preconditions.checkNotNull(hbaseUtils);
78          Preconditions.checkNotNull(tableName);
79          Preconditions.checkNotNull(familyName);
80  
81          // Configure Scan operation, differentiating between local or remote filtering
82          final Scan scan = hbaseUtils.getScan(tableName, familyName);
83          if (condition != null && !localFiltering) {
84              scan.setFilter(new HBaseFilter(condition, hbaseUtils.getSerializer()));
85          }
86  
87          // Open a result scanner and keep track of it, so that it can be closed at the end
88          final ResultScanner scanner = hbaseUtils.getScanner(tableName, scan);
89  
90          // Initialize state
91          this.condition = localFiltering ? condition : null; // unset on server-side filtering
92          this.properties = properties == null ? null : Iterables.toArray(properties, URI.class);
93          this.serializer = hbaseUtils.getSerializer();
94          this.scanner = scanner;
95          this.hbaseIterator = this.scanner.iterator();
96      }
97  
98      @Override
99  	protected Record computeNext() {
100 
101 	try {
102 	    // Iterate until a matching record is found or EOF is reached
103 	    while (this.hbaseIterator.hasNext()) {
104 
105 		// Retrieve next binary result from HBase
106 		final byte[] bytes = this.hbaseIterator.next().value();
107 
108 		// Attempt deserialization. Log and skip result on failure
109 		Record record;
110 		try {
111 		    record = (Record) this.serializer.fromBytes(bytes);
112 		} catch (final Throwable ex) {
113 		    LOGGER.error("discarded record with avroBytes \"" + bytes 
114 				 + ", " + ex.toString());
115 		    continue;
116 		}
117 
118 		// Evaluate condition locally, if required
119 		if (this.condition != null) {
120 		    final boolean matches = this.condition.evalBoolean(record);
121 		    if (!matches) {
122 			continue;
123 		    }
124 		}
125 
126 		// Perform client-side projection, if requested
127 		if (this.properties != null) {
128 		    record = record.retain(this.properties);
129 		}
130 
131 		// Return the record
132 		return record;
133 	    }
134 
135 	    // Signal EOF
136 	    return endOfData();
137 
138 	} catch (Exception e) {
139 	    LOGGER.warn("ignored Exception |" + e.toString() + "| and returned");
140 	    return null;
141 	}
142     }
143 
144     /**
145      * {@inheritDoc} Closes the HBase scanner, if previousy created.
146      */
147     @Override
148     public void close() {
149         if (this.scanner != null) {
150             LOGGER.debug("Closing HBaseScanIterator");
151             this.scanner.close();
152         }
153     }
154 
155 }