1 package eu.fbk.knowledgestore.datastore.hbase.utils;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6
7 import com.google.common.base.Preconditions;
8
9 import org.apache.hadoop.hbase.KeyValue;
10 import org.apache.hadoop.hbase.filter.FilterBase;
11 import org.openrdf.model.URI;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 import eu.fbk.knowledgestore.data.Dictionary;
16 import eu.fbk.knowledgestore.data.Record;
17 import eu.fbk.knowledgestore.data.XPath;
18
19 public class HBaseFilter extends FilterBase {
20
21 static final Logger LOGGER = LoggerFactory.getLogger(HBaseFilter.class);
22
23
24 private XPath condition;
25
26
27 private AvroSerializer serializer;
28
29
30
31
32
33
34 public HBaseFilter() {
35 }
36
37
38
39
40
41
42
43
44
45 public HBaseFilter(final XPath condition, final AvroSerializer serializer) {
46 Preconditions.checkNotNull(condition);
47 Preconditions.checkNotNull(serializer);
48 this.condition = condition;
49 this.serializer = serializer;
50 }
51
52 @Override
53 public ReturnCode filterKeyValue(final KeyValue keyValue) {
54 final Record record = (Record) this.serializer.fromBytes(keyValue.getValue());
55 return this.condition.evalBoolean(record) ? ReturnCode.INCLUDE : ReturnCode.NEXT_ROW;
56 }
57
58 @Override
59 public void readFields(final DataInput dataIn) throws IOException {
60 final String conditionString = dataIn.readUTF();
61 final String dictionaryURL = dataIn.readUTF();
62 this.condition = XPath.parse(conditionString);
63 this.serializer = new AvroSerializer(Dictionary.createHadoopDictionary(URI.class, dictionaryURL));
64 }
65
66 @Override
67 public void write(final DataOutput dataOut) throws IOException {
68 dataOut.writeUTF(this.condition.toString());
69 dataOut.writeUTF(this.serializer.getDictionary().getDictionaryURL());
70 }
71
72 }