1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.util.Bytes;
import org.openrdf.model.URI;

import eu.fbk.knowledgestore.data.Record;
import eu.fbk.knowledgestore.data.XPath;
27  
28  public class HBaseUtils extends AbstractHBaseUtils {
29  
30      /** The map tableName -> table handle */
31      private static Map<String, HTable> tableNameHandleMap = new HashMap<String, HTable>();
32  
33      public HBaseUtils(final Properties properties) {
34          super(properties);
35      }
36  
37      /**
38       * Gets a handle of a specific table.
39       * @param tableName of the table to be accessed.
40       * @return HTable of the table found.
41       */
42      @Override
43      public HTable getTable(String tableName) {
44  	logger.debug("NATIVE Begin of getTable for " + tableName);
45          HTable table = tableNameHandleMap.get(tableName);
46          if (table != null) {
47              logger.debug("NATIVE Found a cached handle for table " + tableName);
48              return table;
49          }
50          try {
51              logger.debug("NATIVE Looking for a handle of table: " + tableName);
52              HBaseAdmin admin = new HBaseAdmin(this.getHbcfg());
53              HTableDescriptor[] resources = admin.listTables(tableName);
54  	    Preconditions.checkElementIndex(0, resources.length, "no table " + tableName + " found");
55              admin.close();
56              table = new HTable(this.getHbcfg(), tableName);
57          } catch (IOException e) {
58              logger.error("NATIVE Error while trying to obtain table: " + tableName);
59              logger.error(e.getMessage());
60          };
61          tableNameHandleMap.put(tableName, table);
62          logger.debug("NATIVE Cached a handle of table: " + tableName);
63          return table;
64      }
65  
66      /**
67       * Commits work done.
68       */
69      @Override
70      public void commit() {
71      }
72  
73      /**
74       * Rollbacks work done.
75       */
76      @Override
77      public void rollback() {
78      }
79  
80      /**
81       * Gets a scanner for a specific table
82       * @param tableName to get the scanner from
83       * @param scan for the specific table
84       * @param conf object to get a hold of an HBase table
85       */
86      @Override
87      public ResultScanner getScanner(String tableName, Scan scan) {
88  	logger.debug("NATIVE Begin of getScanner(" + tableName + ", " + scan + ")");
89          HTable tab = (HTable) getTable(tableName);
90          ResultScanner resScanner = null;
91          try {
92              resScanner = tab.getScanner(scan);
93          } catch (IOException e) {
94              logger.error("Error while trying to obtain a ResultScanner: " + tableName);
95              logger.error(e.getMessage());
96          }
97          return resScanner;
98      }
99  
100     /**
101      * Process put operations on an HBase table.
102      */
103     @Override
104     public void processPut(Record record, String tabName,
105             String famName, String quaName) {
106 	logger.debug("NATIVE Begin processPut(" + record + ", " + tabName + ")");
107         HTable hTable = getTable(tabName);
108         try {
109             Put op = createPut(record, tabName, famName, quaName);
110             hTable.put(op);
111         } catch (IOException e) {
112             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
113             logger.error(e.getMessage());
114         }
115     }
116 
117     /**
118      * Process delete operations on an HBase table.
119      */
120     @Override
121     public void processDelete(URI id, String tabName,
122             String famName, String quaName) {
123 	logger.debug("NATIVE Begin processDelete(" + id + ", " + tabName + ")");
124         HTable hTable = getTable(tabName);
125         try {
126             Delete op = createDelete(id, tabName);
127             hTable.delete(op);
128         } catch (IOException e) {
129             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
130             logger.error(e.getMessage());
131         }
132     }
133 
134     /**
135      * Gets a Record based on information passed.
136      * @param tableName to do the get.
137      * @param id of the Record needed
138      * @throws IOException
139      */
140     @Override
141     public Record get(String tableName, URI id) throws IOException {
142 	logger.debug("NATIVE Begin of get(" + tableName + ", " + id + ")");
143         HTable selTable = getTable(tableName);
144         Record resGotten = null;
145         if (selTable != null) {
146            // Resource's Key
147            Get get = new Get(Bytes.toBytes(id.toString()));
148            Result rs = selTable.get(get);
149            logger.debug("Value obtained: " + new String(rs.value()));
150            final AvroSerializer serializer = getSerializer();
151            resGotten = (Record) serializer.fromBytes(rs.value());
152         }
153         return resGotten;
154     }
155  
156     @Override
157     public List<Record> get(String tableName,
158             List<URI> ids) throws IOException {
159 	logger.debug("NATIVE Begin of get(" + tableName + ", " + ids + ")");
160         HTable selTable = getTable(tableName);
161         List<Record> resGotten = new ArrayList<Record> ();
162         List<Get> gets = new ArrayList<Get> ();
163         AvroSerializer serializer = getSerializer();
164 
165         for (URI id : ids) {
166             gets.add(new Get(Bytes.toBytes(id.toString())));
167         }
168         Result[] results = selTable.get(gets);
169 
170         //TODO check if this is ok
171         for (Result res : results) {
172             final byte[] bytes = res.value();
173             if (bytes != null) {
174                 resGotten.add((Record) serializer.fromBytes(bytes));
175             }
176         }
177         return resGotten;
178     }
179  
180     /**
181      * Creates puts for HBase
182      * @param record
183      * @throws IOException
184      */
185     @Override
186     public Put createPut(Record record, String tableName,
187             String famName, String quaName) throws IOException {
188         HTable hTable = getTable(tableName);
189         Put put = null;
190         if (hTable != null) {
191             // Transforming data model record into an Avro record
192             AvroSerializer serializer = getSerializer();
193             final byte[] bytes = serializer.toBytes(record);
194             // Resource's Key
195             put = new Put(Bytes.toBytes(record.getID().toString()));
196             // Resource's Value
197             put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), bytes);
198         }
199         return put;
200     }
201 
202     /**
203      * Creates deletes for HBase tables
204      * @param record
205      * @param tableName
206      * @param conf
207      * @return
208      * @throws IOException
209      */
210     @Override
211     public Delete createDelete(URI id, String tableName) throws IOException {
212         Delete del = null;
213         HTable hTable = getTable(tableName);
214         if (hTable != null) {
215             del = new Delete(Bytes.toBytes(id.toString()));
216         }
217         return del;
218     }
219 
220     /**
221      * Checking for errors after operations have been processed.
222      * @param objs
223      * @return
224      */
225     @Override
226     public List<Object> checkForErrors(Object[] objs) {
227         List<Object> errors = new ArrayList<Object>();
228         if (objs != null) {
229             for (int cont = 0; cont < objs.length; cont ++) {
230                 if (objs[cont] == null) {
231                     logger.debug("A operation could not be performed.");
232                     errors.add(objs[cont]);
233                 }
234             }
235         }
236         return errors;
237     }
238 
239     /**
240      * Gets a number of record of tableName matching condition
241      * @param tableName the table name
242      * @param familyName the family
243      * @param condition to match
244      * @throws IOException
245      */
246     @Override
247     public long count(String tableName, String familyName, XPath condition) throws IOException {
248 	logger.debug("NATIVE Begin count");
249 	// clone the current conf
250 	org.apache.hadoop.conf.Configuration customConf = new org.apache.hadoop.conf.Configuration(super.getHbcfg());
251 	// Increase RPC timeout, in case of a slow computation
252         customConf.setLong("hbase.rpc.timeout", 600000);
253         // Default is 1, set to a higher value for faster scanner.next(..)
254         customConf.setLong("hbase.client.scanner.caching", 1000);
255 
256 	/*
257         System.out.println("HBaseUtils begin of |customConf|");
258 	Configuration.dumpConfiguration(customConf, new PrintWriter(System.out));
259 	System.out.println("\nHBaseUtils end of |customConf|");
260 	*/
261 
262         AggregationClient agClient = new AggregationClient(customConf);
263 	long rowCount = 0;
264 	byte[] tName = Bytes.toBytes(tableName);
265 	try {
266 	    Scan scan = getScan(tableName, familyName);
267 	    rowCount = agClient.rowCount(tName, null, scan);
268 	} catch (Throwable e) {
269             throw new IOException(e.toString());
270 	}
271         return rowCount;
272     }
273 
274     @Override
275     public void begin() {
276     }
277 }