1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_REGION_MEMSTORE_FLUSH_SIZE;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_REGION_NRESERVATION_BLOCKS;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_REGION_MEMSTORE_FLUSH_SIZE_OPT;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_REGION_NRESERVATION_BLOCKS_OPT;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_DEFAULT_HOST_OPT;
8   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_DEFAULT_PORT_OPT;
9   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_HOST;
10  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_PORT;
11  
12  import java.io.IOException;
13  import java.util.ArrayList;
14  import java.util.HashMap;
15  import java.util.List;
16  import java.util.Map;
17  import java.util.Properties;
18  
19  import javax.annotation.Nullable;
20  
21  import com.yahoo.omid.transaction.RollbackException;
22  import com.yahoo.omid.transaction.TTable;
23  import com.yahoo.omid.transaction.Transaction;
24  import com.yahoo.omid.transaction.TransactionException;
25  import com.yahoo.omid.transaction.TransactionManager;
26  
27  import org.apache.hadoop.hbase.client.Delete;
28  import org.apache.hadoop.hbase.client.Get;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.openrdf.model.URI;
35  
36  import eu.fbk.knowledgestore.data.Record;
37  import eu.fbk.knowledgestore.data.Stream;
38  import eu.fbk.knowledgestore.data.XPath;
39  import eu.fbk.knowledgestore.datastore.hbase.HBaseScanIterator;
40  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
41  
42  /**
43   * Implements HBase operations using Yahoo!'s Omid.
44   */
45  public class OmidHBaseUtils extends AbstractHBaseUtils {
46  
47      /** Transaction Manager used inside Yahoo!'s OMID. */
48      private static TransactionManager tranManager;
49  
50      /** Transaction to be started. */
51      private Transaction t1;
52  
53      /** The map tableName -> table handle */
54      private static Map<String, TTable> tableNameHandleMap = new HashMap<String, TTable>();
55  
56      /**
57       * Constructor.
58       * 
59       * @param properties
60       *            the configuration properties
61       */
62      public OmidHBaseUtils(final Properties properties) {
63          // setting basic configuration inside parent class.
64          super(properties);
65  
66          getHbcfg().setInt(
67                  HBASE_REGION_MEMSTORE_FLUSH_SIZE,
68                  Integer.parseInt(properties.getProperty(HBASE_REGION_MEMSTORE_FLUSH_SIZE, ""
69                          + OMID_REGION_MEMSTORE_FLUSH_SIZE_OPT)));
70  
71          getHbcfg().setInt(
72                  HBASE_REGION_NRESERVATION_BLOCKS,
73                  Integer.parseInt(properties.getProperty(HBASE_REGION_NRESERVATION_BLOCKS, ""
74                          + OMID_REGION_NRESERVATION_BLOCKS_OPT)));
75  
76          getHbcfg().set(OMID_TSO_HOST,
77                  properties.getProperty(OMID_TSO_HOST, OMID_TSO_DEFAULT_HOST_OPT));
78  
79          getHbcfg().setInt(
80                  OMID_TSO_PORT,
81                  Integer.parseInt(properties.getProperty(OMID_TSO_PORT, ""
82                          + OMID_TSO_DEFAULT_PORT_OPT)));
83  
84          // Creating transaction manager
85          try {
86              tranManager = new TransactionManager(this.getHbcfg());
87          } catch (IOException e) {
88              logger.error("Error trying to create a TransactionManager of OMID.");
89              logger.error(e.getMessage());
90          }
91      }
92  
93      /**
94       * Commits work done.
95       */
96      @Override
97      public void commit() throws DataCorruptedException, IOException, IllegalStateException{
98          try {
99              tranManager.commit(t1);
100         } catch (RollbackException e) {
101             rollback();
102             throw new IOException("Error trying to commit transaction.", e);
103         } catch (TransactionException e) {
104             rollback();
105             throw new IOException("Error trying to commit transaction.", e);
106         }
107     }
108 
109     /**
110      * Rollbacks work done.
111      */
112     @Override
113     public void rollback() throws DataCorruptedException, IOException, IllegalStateException{
114         try {
115             tranManager.rollback(t1);
116         } catch (Exception e) {
117             throw new DataCorruptedException("Error trying to rollback a Transaction.", e);
118         }
119     }
120 
121     /**
122      * Gets a handle of a specific table.
123      * @param tableName of the table to be accessed.
124      * @return HTable of the table found.
125      */
126     @Override
127     public Object getTable(String tableName) {
128 	logger.debug("OMID Begin of getTable for " + tableName);
129         TTable table = tableNameHandleMap.get(tableName);
130         if (table != null) {
131             logger.debug("OMIDE Found a cached handle for table " + tableName);
132             return table;
133         }
134         try {
135             table = new TTable(this.getHbcfg(), tableName);
136         } catch (IOException e) {
137             logger.error("OMID Error trying to get a TransactionTable of OMID.");
138             logger.error(e.getMessage());
139         }
140         tableNameHandleMap.put(tableName, table);
141         logger.debug("OMID Cached a handle of table: " + tableName);
142         return table;
143     }
144 
145     @Override
146     public void processPut(Record record, String tabName,
147             String famName, String quaName) {
148 	logger.debug("OMID Begin processPut(" + record + ", " + tabName + ")");
149         TTable tTable = (TTable) getTable(tabName);
150         Put op = null;
151         try {
152             op = createPut(record, tabName, famName, quaName);
153             tTable.put(t1, op);
154             //TODO rollback is there is an exception
155         } catch (IllegalArgumentException e) {
156             //TODO propagate exceptions
157             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
158             logger.error(e.getMessage());
159         } catch (IOException e) {
160             //TODO propagate exceptions
161             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
162             logger.error(e.getMessage());
163         }
164     }
165 
166     @Override
167     public void begin() {
168         try {
169             t1 = tranManager.begin();
170         } catch (TransactionException e) {
171             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
172             logger.error(e.getMessage());
173         }
174     }
175 
176     @Override
177     public void processDelete(URI id, String tabName,
178             String famName, String quaName) {
179 	logger.debug("OMID Begin processDelete(" + id + ", " + tabName + ")");
180         TTable tTable = (TTable) getTable(tabName);
181         Delete op = null;
182         try {
183             op = createDelete(id, tabName);
184             tTable.delete(t1, op);
185         } catch (IllegalArgumentException e) {
186             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
187             logger.error(e.getMessage());
188         } catch (IOException e) {
189             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
190             logger.error(e.getMessage());
191         }
192     }
193 
194     /**
195      * Gets a Record based on information passed.
196      * @param tableName to do the get.
197      * @param id of the Record needed
198      * @throws IOException
199      */
200     @Override
201     public Record get(String tableName, URI id) throws IOException {
202 	logger.debug("OMID Begin of get(" + tableName + ", " + id + ")");
203         TTable tTable = (TTable) getTable(tableName);
204         Record resGotten = null;
205         if (tTable != null) {
206 	    // Resource's Key
207 	    Get get = new Get(Bytes.toBytes(id.toString())).setMaxVersions(1);
208 	    Result rs = tTable.get(t1, get);
209 	    logger.debug("Value obtained: " + new String(rs.value()));
210 	    final AvroSerializer serializer = getSerializer();
211 	    resGotten = (Record) serializer.fromBytes(rs.value());
212         }
213         return resGotten;
214     }
215 
216     @Override
217     public List<Record> get(String tableName,
218 			    List<URI> ids) throws IOException {
219 	logger.debug("OMID Begin of get(" + tableName + ", " + ids + ")");
220         TTable tTable = (TTable)getTable(tableName);
221 	List<Record> resGotten = new ArrayList<Record> ();
222 	List<Get> gets = new ArrayList<Get> ();
223 	AvroSerializer serializer = getSerializer();
224 
225 	for (URI id : ids) {
226 	    gets.add(new Get(Bytes.toBytes(id.toString())));
227 	}
228 	// OMID does support the usage of a list of gets
229 	Result[] results = tTable.get(t1, gets);
230 
231 	for (Result res : results) {
232 	    final byte[] bytes = res.value();
233 	    if (bytes != null) {
234 		resGotten.add((Record) serializer.fromBytes(bytes));
235 	    }
236 	}
237 	return resGotten;
238     }
239 
240     @Override
241     public List<Object> checkForErrors(Object[] objs) {
242         return new ArrayList<Object>();
243     }
244 
245     /**
246      * Gets a number of record of tableName matching condition
247      * 
248      * @param tableName
249      *            to scan
250      * @param condition
251      *            to match
252      * @param scan
253      *            to scan
254      * @throws IOException
255      */
256     @Override
257     public long count(final String tableName, final String familyName,
258             @Nullable final XPath condition) throws IOException {
259         logger.debug("OMID Begin count");
260         // VERY INEFFICIENT: to be improved!
261         final Stream<Record> cur = Stream.create(new HBaseScanIterator(this, tableName,
262                 familyName, condition, null, getServerFilterFlag()));
263         try {
264             return cur.count();
265         } finally {
266             cur.close();
267         }
268     }
269 
270     /**
271      * Gets a scanner for a specific table
272      * @param tableName to get the scanner from
273      * @param scan for the specific table
274      * @param conf object to get a hold of an HBase table
275      */
276     @Override
277     public ResultScanner getScanner(String tableName, Scan scan) {
278 	logger.debug("OMID Begin of getScanner(" + tableName + ", " + scan + ")");
279         TTable tTable = (TTable)getTable(tableName);
280         ResultScanner resScanner = null;
281         try {
282             resScanner = tTable.getScanner(t1, scan);
283         } catch (IOException e) {
284             logger.error("Error while trying to obtain a ResultScanner: " + tableName);
285             logger.error(e.getMessage());
286         }
287         return resScanner;
288     }
289 }