1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import java.io.DataInput;
4   import java.io.DataOutput;
5   import java.io.IOException;
6   
7   import com.google.common.base.Preconditions;
8   
9   import org.apache.hadoop.hbase.KeyValue;
10  import org.apache.hadoop.hbase.filter.FilterBase;
11  import org.openrdf.model.URI;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import eu.fbk.knowledgestore.data.Dictionary;
16  import eu.fbk.knowledgestore.data.Record;
17  import eu.fbk.knowledgestore.data.XPath;
18  
19  public class HBaseFilter extends FilterBase {
20  
21      static final Logger LOGGER = LoggerFactory.getLogger(HBaseFilter.class);
22  
23      /** The condition to be applied to filter out records. */
24      private XPath condition;
25  
26      /** The serializer to be used to decode stored byte arrays into records. */
27      private AvroSerializer serializer;
28  
29      /**
30       * Default constructor for exclusive use by HBase. This constructor is used for
31       * deserialization of an {@code HBaseFilter} on a region server; initialization of the object
32       * is done by {@link #readFields(DataInput)}.
33       */
34      public HBaseFilter() {
35      }
36  
37      /**
38       * Constructor for normal use.
39       * 
40       * @param condition
41       *            the condition to be evaluated for each Node.
42       * @param serializer
43       *            the serializer for decoding stored byte arrays into records
44       */
45      public HBaseFilter(final XPath condition, final AvroSerializer serializer) {
46          Preconditions.checkNotNull(condition);
47          Preconditions.checkNotNull(serializer);
48          this.condition = condition;
49          this.serializer = serializer;
50      }
51  
52      @Override
53      public ReturnCode filterKeyValue(final KeyValue keyValue) {
54          final Record record = (Record) this.serializer.fromBytes(keyValue.getValue());
55          return this.condition.evalBoolean(record) ? ReturnCode.INCLUDE : ReturnCode.NEXT_ROW;
56      }
57  
58      @Override
59      public void readFields(final DataInput dataIn) throws IOException {
60          final String conditionString = dataIn.readUTF();
61          final String dictionaryURL = dataIn.readUTF();
62          this.condition = XPath.parse(conditionString);
63          this.serializer = new AvroSerializer(Dictionary.createHadoopDictionary(URI.class, dictionaryURL));
64      }
65  
66      @Override
67      public void write(final DataOutput dataOut) throws IOException {
68          dataOut.writeUTF(this.condition.toString());
69          dataOut.writeUTF(this.serializer.getDictionary().getDictionaryURL());
70      }
71  
72  }