1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_FAM_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_QUA_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_FAM_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_QUA_NAME;
8   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
9   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_FAM_NAME;
10  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_QUA_NAME;
11  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
12  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_FAM_NAME;
13  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_QUA_NAME;
14  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
15  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_FAM_NAME;
16  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_QUA_NAME;
17  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
18  
19  import java.io.IOException;
20  import java.util.Map;
21  import java.util.Set;
22  
23  import javax.annotation.Nullable;
24  
25  import org.openrdf.model.URI;
26  import org.openrdf.model.vocabulary.RDF;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  import eu.fbk.knowledgestore.data.Data;
31  import eu.fbk.knowledgestore.data.Record;
32  import eu.fbk.knowledgestore.data.Stream;
33  import eu.fbk.knowledgestore.data.XPath;
34  import eu.fbk.knowledgestore.datastore.DataStore;
35  import eu.fbk.knowledgestore.datastore.DataTransaction;
36  import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
37  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
38  import eu.fbk.knowledgestore.vocabulary.KS;
39  // import java.io.PrintWriter;
40  
41  /**
42   * Class HBaseDataTransaction to perform operations on top of
43   * HBase, using specific operations to create transactions.
44   */
45  public class HBaseDataTransaction implements DataTransaction {
46  
47      /** Logger object used inside HBaseFileStore. */
48      private static Logger logger = LoggerFactory.getLogger(HBaseDataTransaction.class);
49  
50      /** Represents the transactional layer to be used. */
51      private AbstractHBaseUtils hbaseUtils;
52  
53      /**
54       * Constructor.
55       * @param pHbaseUtils represents the transactional layer to be used.
56       */
57      HBaseDataTransaction (AbstractHBaseUtils pHbaseUtils) {
58          this.setHbaseUtils(pHbaseUtils);
59          this.getHbaseUtils().begin();
60      }
61      
62      @Nullable
63      static URI getRecordType(Record record) {
64          for (URI type : record.get(RDF.TYPE, URI.class)) {
65              if (DataStore.SUPPORTED_TYPES.contains(type)) {
66                  return type;
67              }
68          }
69          return null;
70      }
71  
72      @Override
73      public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
74              final @Nullable Set<? extends URI> properties) throws DataCorruptedException,
75              IOException, IllegalArgumentException, IllegalStateException {
76          // Record.type, set of ids, a set of properties to be obtained
77          // for the objects retrieved select all needed properties
78          // create cursor on top of that list
79          HBaseIterator iterator = null;
80          if (KS.RESOURCE.equals(type)) {
81              iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME, 
82  					 ids, properties);
83          } else if (KS.MENTION.equals(type)) {
84              iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME,
85  					 ids, properties);
86          } else if (KS.ENTITY.equals(type)) {
87              iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME,
88  					 ids, properties);
89          } else if (KS.CONTEXT.equals(type)) {
90              iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME,
91  					 ids, properties);
92          } else if (KS.USER.equals(type)) {
93              iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME,
94  					 ids, properties);
95          } else {
96              throw new IllegalArgumentException("Unsupported record type "
97                      + Data.toString(type, Data.getNamespaceMap()));
98          }
99          return Stream.create(iterator);
100     }
101 
102     @Override
103     public Stream<Record> retrieve(final URI type, @Nullable final XPath condition,
104             @Nullable final Set<? extends URI> properties) throws DataCorruptedException,
105             IOException, IllegalArgumentException, IllegalStateException {
106 
107         String tableName;
108         String familyName;
109 
110         if (KS.RESOURCE.equals(type)) {
111             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME;
112             familyName = DEFAULT_RES_FAM_NAME;
113         } else if (KS.MENTION.equals(type)) {
114             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME;
115             familyName = DEFAULT_MEN_FAM_NAME;
116         } else if (KS.ENTITY.equals(type)) {
117             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME;
118             familyName = DEFAULT_ENT_FAM_NAME;
119         } else if (KS.CONTEXT.equals(type)) {
120             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME;
121             familyName = DEFAULT_CON_FAM_NAME;
122         } else if (KS.USER.equals(type)) {
123             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME;
124             familyName = DEFAULT_USR_FAM_NAME;
125         } else {
126             throw new IllegalArgumentException("Unsupported record type "
127                     + Data.toString(type, Data.getNamespaceMap()));
128         }
129 
130         return Stream.create(new HBaseScanIterator(hbaseUtils, tableName, familyName,
131                 condition, properties, hbaseUtils.getServerFilterFlag()));
132     }
133 
134     @Override
135     public long count(URI type, XPath condition)
136             throws DataCorruptedException, IOException,
137 		   IllegalArgumentException, IllegalStateException {
138 
139         String tableName = null;
140         String familyName = null;
141 
142         if (KS.RESOURCE.equals(type)) {
143             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME;
144             familyName = DEFAULT_RES_FAM_NAME;
145         } else if (KS.MENTION.equals(type)) {
146             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME;
147             familyName = DEFAULT_MEN_FAM_NAME;
148         } else if (KS.ENTITY.equals(type)) {
149             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME;
150             familyName = DEFAULT_ENT_FAM_NAME;
151         } else if (KS.CONTEXT.equals(type)) {
152             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME;
153             familyName = DEFAULT_CON_FAM_NAME;
154         } else if (KS.USER.equals(type)) {
155             tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME;
156             familyName = DEFAULT_USR_FAM_NAME;
157         } else {
158             throw new IllegalArgumentException("Unsupported record type "
159                     + Data.toString(type, Data.getNamespaceMap()));
160         }
161 
162         return getHbaseUtils().count(tableName, familyName, condition);
163     }
164 
165     @Override
166     public Stream<Record> match(final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
167             final Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
168         throw new UnsupportedOperationException();
169     }
170 
171     /**
172      * {@inheritDoc} Puts a record inside the HBase store.
173      */
174     @Override
175     public void store(final URI type, final Record record) {
176         if (KS.RESOURCE.equals(type)) {
177             getHbaseUtils().processPut(record,
178                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME,
179                     DEFAULT_RES_FAM_NAME,
180                     DEFAULT_RES_QUA_NAME);
181         } else if (KS.MENTION.equals(type)) {
182             getHbaseUtils().processPut(record,
183                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME,
184                     DEFAULT_MEN_FAM_NAME,
185                     DEFAULT_MEN_QUA_NAME);
186         } else if (KS.ENTITY.equals(type)) {
187             getHbaseUtils().processPut(record,
188                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME,
189                     DEFAULT_ENT_FAM_NAME,
190                     DEFAULT_ENT_QUA_NAME);
191         } else if (KS.CONTEXT.equals(type)) {
192             getHbaseUtils().processPut(record,
193                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME,
194                     DEFAULT_CON_FAM_NAME,
195                     DEFAULT_CON_QUA_NAME);
196         } else if (KS.USER.equals(type)) {
197             getHbaseUtils().processPut(record,
198                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME,
199                     DEFAULT_USR_FAM_NAME,
200                     DEFAULT_USR_QUA_NAME);
201         } else {
202             throw new IllegalArgumentException("Unsupported record:\n"
203                     + record.toString(Data.getNamespaceMap(), true));
204         }
205     }
206 
207     /**
208      * {@inheritDoc}
209      */
210     @Override
211     public void delete(final URI type, final URI id)  {
212         if (KS.RESOURCE.equals(type)) {
213             getHbaseUtils().processDelete(id,
214                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME,
215                     DEFAULT_RES_FAM_NAME,
216                     DEFAULT_RES_QUA_NAME);
217         } else if (KS.MENTION.equals(type)) {
218             getHbaseUtils().processDelete(id,
219                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME,
220                     DEFAULT_MEN_FAM_NAME,
221                     DEFAULT_MEN_QUA_NAME);
222         } else if (KS.ENTITY.equals(type)) {
223             getHbaseUtils().processDelete(id,
224                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME,
225                     DEFAULT_ENT_FAM_NAME,
226                     DEFAULT_ENT_QUA_NAME);
227         } else if (KS.CONTEXT.equals(type)) {
228             getHbaseUtils().processDelete(id,
229                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME,
230                     DEFAULT_CON_FAM_NAME,
231                     DEFAULT_CON_QUA_NAME);
232         } else if (KS.USER.equals(type)) {
233             getHbaseUtils().processDelete(id,
234                     hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME,
235                     DEFAULT_USR_FAM_NAME,
236                     DEFAULT_USR_QUA_NAME);
237         } else {
238             throw new IllegalArgumentException("Unsupported record type:\n" + type);
239         }
240     }
241 
242     @Override
243     public void end(boolean commit) throws DataCorruptedException, IOException,
244             IllegalStateException {
245         if (commit)
246             getHbaseUtils().commit();
247         else
248             getHbaseUtils().rollback();
249     }
250 
251     /**
252      * @return the logger
253      */
254     public static Logger getLogger() {
255         return logger;
256     }
257 
258     /**
259      * @param logger the logger to set
260      */
261     public static void setLogger(Logger logger) {
262         HBaseDataTransaction.logger = logger;
263     }
264 
265     /**
266      * @return the hbaseUtils
267      */
268     public AbstractHBaseUtils getHbaseUtils() {
269         return hbaseUtils;
270     }
271 
272     /**
273      * @param hbaseUtils the hbaseUtils to set
274      */
275     public void setHbaseUtils(AbstractHBaseUtils hbaseUtils) {
276         this.hbaseUtils = hbaseUtils;
277     }
278 }