1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_DEFAULT_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_TRAN_LAYER;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_CLIENT_PORT;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_QUORUM;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TRAN_LAYER_OPT;
8   import static java.lang.Integer.MAX_VALUE;
9   
10  import java.io.IOException;
11  import java.nio.ByteBuffer;
12  import java.util.List;
13  import java.util.Properties;
14  
15  import org.apache.hadoop.hbase.HBaseConfiguration;
16  import org.apache.hadoop.hbase.HColumnDescriptor;
17  import org.apache.hadoop.hbase.HTableDescriptor;
18  import org.apache.hadoop.hbase.MasterNotRunningException;
19  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
20  import org.apache.hadoop.hbase.client.Delete;
21  import org.apache.hadoop.hbase.client.HBaseAdmin;
22  import org.apache.hadoop.hbase.client.Put;
23  import org.apache.hadoop.hbase.client.ResultScanner;
24  import org.apache.hadoop.hbase.client.Scan;
25  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
26  import org.apache.hadoop.hbase.filter.FilterList;
27  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
28  import org.apache.hadoop.hbase.util.Bytes;
29  import org.openrdf.model.URI;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import eu.fbk.knowledgestore.data.Record;
34  import eu.fbk.knowledgestore.data.XPath;
35  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
36  
37  /**
38   * Class defining all HBase operations.
39   */
40  public abstract class AbstractHBaseUtils {
41  
42      /** Logger object used inside HdfsFileStore. */
43      public static Logger logger = LoggerFactory.getLogger(AbstractHBaseUtils.class);
44  
45      private org.apache.hadoop.conf.Configuration hbcfg;
46      
47      private AvroSerializer serializer;
48      
49      private String hbaseTableNamePrefix;
50  
51      private boolean serverFilterFlag;
52  
53      /**
54       * Constructor.
55       * @param xmlConf holds all configuration properties.
56       */
57      public AbstractHBaseUtils(Properties properties) {
58          createConfiguration(properties);
59      }
60  
61      public static AbstractHBaseUtils factoryHBaseUtils(Properties properties) {
62          AbstractHBaseUtils hbaseUtils = null;
63          if (properties.getProperty(HBASE_TRAN_LAYER, OMID_TRAN_LAYER_OPT).equalsIgnoreCase(
64                  OMID_TRAN_LAYER_OPT)) {
65              logger.info("Using OMID HbaseUtils");
66              hbaseUtils = new OmidHBaseUtils(properties);
67          } else {
68              logger.info("Using Native HbaseUtils");
69              hbaseUtils = new HBaseUtils(properties);
70          }
71          return hbaseUtils;
72      }
73  
74      /**
75       * Begins a transaction.
76       */
77      public abstract void begin();
78  
79      /**
80       * Commits operations done.
81       * @throws DataCorruptedException in case a rollback has not been successful.
82       * @throws IOException in case a commit has not been successful.
83       */
84      public abstract void commit() throws DataCorruptedException, IOException;
85  
86      /**
87       * Rollbacks operations done.
88       * @throws DataCorruptedException in case a rollback has not been successful.
89       * @throws IOException in case a commit has not been successful.
90       */
91      public abstract void rollback() throws DataCorruptedException, IOException;
92  
93      /**
94       * Gets a handle of a specific table.
95       * @param tableName of the table to be accessed.
96       * @return Object of the table found.
97       */
98      public abstract Object getTable(String tableName);
99  
100     /**
101      * Process operations on the Resource table.
102      * @param record to be put.
103      * @param tableName where operations will be performed.
104      * @param ops to be performed into tableName.
105      * @param conf object to connect to HBase.
106      * @param isPut to determine if the operation is put or a delete.
107      */
108     public abstract void processPut(Record record, String tabName,
109             String famName, String quaName);
110 
111     /**
112      * Process operations on the Resource table.
113      * @param record to be deleted.
114      * @param tabName where operations will be performed.
115      * @param famName where operation will be performed.
116      * @param quaName where operation will be performed.
117      * @param conf object to connect to HBase.
118      */
119     public abstract void processDelete(URI id, String tabName,
120             String famName, String quaName);
121 
122     /**
123      * Gets a Record based on information passed.
124      * @param tableName to do the get.
125      * @param id of the Record needed
126      * @throws IOException
127      */
128     public abstract Record get(String tableName, URI id)
129             throws IOException;
130 
131     /**
132      * Gets a scanner based on the Scan object
133      * @param scan to retrieve an HBase scanner
134      * @return
135      */
136     public abstract ResultScanner getScanner(String tableName, Scan scan);
137 
138     /**
139      * Gets a resource based on information passed.
140      * @param tableName table name to get data from
141      * @param ids to be retrieved
142      * @param conf
143      * @return
144      * @throws IOException
145      */
146     public abstract List<Record> get(String tableName, List<URI> ids)
147             throws IOException;
148 
149     /**
150      * Checking for errors after operations have been processed.
151      * @param objs
152      * @return
153      */
154     public abstract List<Object> checkForErrors(Object[] objs);
155 
156     /**
157      * Counts the records having the type and matching the optional condition specified. This
158      * method returns the number of matching instances instead of retrieving the corresponding {@code Record}
159      * objects.
160      * 
161      * @param type
162      *            the URI of the type of records to return
163      * @param condition
164      *            an optional condition to be satisfied by matching records; if null, no condition
165      *            must be checked
166      * @return the number of records matching the optional condition and type specified
167      * @throws IOException
168      *             in case some IO error occurs
169      */
170     public abstract long count(String tableName, String familyName, XPath condition)
171             throws IOException;
172 
173     /**
174      * Creates an HBase configuration object.
175      * 
176      * @param properties the configuration properties
177      */
178     public void createConfiguration(final Properties properties) {
179 
180         setHbcfg(HBaseConfiguration.create());
181 
182         getHbcfg().set(HBASE_ZOOKEEPER_QUORUM,
183                 properties.getProperty(HBASE_ZOOKEEPER_QUORUM, "hlt-services4"));
184 
185         getHbcfg().set(HBASE_ZOOKEEPER_CLIENT_PORT,
186                 properties.getProperty(HBASE_ZOOKEEPER_CLIENT_PORT, "2181"));
187 
188         getHbcfg().set(HADOOP_FS_DEFAULT_NAME,
189                 properties.getProperty(HADOOP_FS_DEFAULT_NAME, "hdfs://hlt-services4:9000"));
190 
191         // getHbcfg().set("hbase.client.retries.number", "1");
192     }
193 
194     /** 
195      * Gets filter based on the condition to be performed
196      * @param condition To be applied server sided
197      * @param passAll boolean if all elements have to pass the test
198      * @param famNames to be checked
199      * @param qualNames to be checked
200      * @param params that could be needed
201      * @return FilterList containing all filters needed
202      */
203     public FilterList getFilter(XPath condition, boolean passAll,
204             String []famNames, String []qualNames, String []params) {
205         FilterList list = new FilterList((passAll)?FilterList.Operator.MUST_PASS_ALL:
206         FilterList.Operator.MUST_PASS_ONE);
207         for (int iCont = 0; iCont < famNames.length; iCont ++) {
208             SingleColumnValueFilter filterTmp = new SingleColumnValueFilter(
209                 Bytes.toBytes(famNames[iCont]),
210                 Bytes.toBytes(qualNames[iCont]),
211                 CompareOp.EQUAL,
212                 Bytes.toBytes(params[iCont])
213                 );
214             list.addFilter(filterTmp);
215         }
216         return list;
217     }
218  
219     /**
220      * Creates a scan
221      * @param tableName to be scan
222      * @param famName to be checked
223      * @param startKey to query the table
224      * @param endKey to query the table
225      * @param conf 
226      * @return Scan inside the table
227      * @throws IOException
228      */
229      public Scan getResultScan(String tableName, String famName,
230              ByteBuffer startKey, ByteBuffer endKey) throws IOException {
231 	 logger.debug("AbstractHBaseUtils Begin of getResultScan(" + tableName + ", " + famName + ")");
232          Scan scan = new Scan();
233          scan.addFamily(Bytes.toBytes(famName));
234          // For a range scan, set start / stop id or just start.
235          if (startKey != null)
236              scan.setStartRow(Bytes.toBytes(startKey));
237          if (endKey != null)
238              scan.setStopRow(Bytes.toBytes(endKey));
239          return scan;
240      }
241 
242     /**
243      * Creates a result scanner
244      * @param tableName
245      * @param famName
246      * @param conf
247      * @return
248      * @throws IOException
249      */
250      public Scan getScan(String tableName,
251              String famName) throws IOException {
252          return getResultScan(tableName, famName, null, null);
253      }
254 
255     /**
256      * Creates puts for HBase
257      * @param record
258      * @throws IOException
259      */
260     public Put createPut(Record record, String tableName,
261             String famName, String quaName) throws IOException {
262         Object tTable = getTable(tableName);
263         Put put = null;
264         if (tTable != null) {
265             // Transforming data model record into an Avro record
266             AvroSerializer serializer = getSerializer();
267             final byte[] bytes = serializer.toBytes(record);
268             // Resource's Key
269             put = new Put(Bytes.toBytes(record.getID().toString()));
270             // Resource's Value
271             put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), bytes);
272         }
273         return put;
274     }
275 
276     /**
277      * Creates deletes for HBase tables
278      * @param id
279      * @param tableName
280      * @param conf
281      * @return
282      * @throws IOException
283      */
284     public Delete createDelete(URI id, String tableName) throws IOException {
285         Delete del = null;
286         Object tTable = getTable(tableName);
287         if (tTable != null) {
288             del = new Delete(Bytes.toBytes(id.toString()));
289         }
290         return del;
291     }
292 
293     /**
294      * @return the logger
295      */
296     public static Logger getLogger() {
297         return logger;
298     }
299 
300     /**
301      * @param logger the logger to set
302      */
303     public void setLogger(Logger pLogger) {
304         logger = pLogger;
305     }
306 
307     /**
308      * Checks and/or create table with specific column family
309      * @param tabName 
310      * @param colFamName
311      * @throws IOException 
312      */
313     public void checkAndCreateTable(String tabName, String colFamName) throws IOException {
314         HBaseAdmin hba;
315         try {
316             hba = new HBaseAdmin(this.getHbcfg());
317             if (hba.tableExists(tabName) == false) {
318                 logger.debug("creating table " + tabName);
319                 final HTableDescriptor tableDescriptor = new HTableDescriptor(tabName);
320                 final HColumnDescriptor columnDescriptor = new HColumnDescriptor(colFamName);
321 		columnDescriptor.setMaxVersions(MAX_VALUE);
322 		tableDescriptor.addFamily(columnDescriptor);
323                 hba.createTable(tableDescriptor);
324             } else {
325                 logger.debug("already existent table " + tabName);
326             }
327             hba.close();
328         } catch (MasterNotRunningException e) {
329             throw new IOException(e);
330         } catch (ZooKeeperConnectionException e) {
331             throw new IOException(e);
332         } catch (IOException e) {
333             throw new IOException(e);
334         }
335     }
336     /**
337      * @return the hbcfg
338      */
339     public org.apache.hadoop.conf.Configuration getHbcfg() {
340         return hbcfg;
341     }
342     /**
343      * @param hbcfg the hbcfg to set
344      */
345     public void setHbcfg(org.apache.hadoop.conf.Configuration hbcfg) {
346         this.hbcfg = hbcfg;
347     }
348 
349     public void initServerFilterFlag(boolean serverFilterFlag) {
350         this.serverFilterFlag = serverFilterFlag;
351     }
352    
353     /**
354      * @return the server filter flag
355      */
356     public boolean getServerFilterFlag() {
357         return serverFilterFlag;
358     }
359 
360     public void initSerializer(AvroSerializer serializer) {
361         this.serializer = serializer;
362     }
363    
364     /**
365      * @return the serializer
366      */
367     public AvroSerializer getSerializer() {
368         return serializer;
369     }
370     
371     public void initHbaseTableNamePrefix(String hbaseTableNamePrefix) {
372         this.hbaseTableNamePrefix = hbaseTableNamePrefix;
373     }
374    
375     /**
376      * @return the hbase table prefix
377      */
378     public String getHbaseTableNamePrefix() {
379         return hbaseTableNamePrefix;
380     }
381 
382 }