package eu.fbk.knowledgestore.elastic;

import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import eu.fbk.knowledgestore.data.Record;
import eu.fbk.knowledgestore.data.Stream;
import eu.fbk.knowledgestore.data.XPath;
import eu.fbk.knowledgestore.datastore.DataTransaction;
import eu.fbk.knowledgestore.runtime.DataCorruptedException;
import eu.fbk.knowledgestore.vocabulary.KS;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.get.MultiGetRequest.Item;
import org.elasticsearch.action.get.MultiGetResponse.Failure;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexMissingException;
import org.openrdf.model.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Implements the DataTransaction interface using the ElasticSearch API.
 * Note that it does not provide real transactional semantics.
 */
public final class DataTransactionElastic implements DataTransaction{
    private static final Logger LOGGER = LoggerFactory.getLogger(DataTransactionElastic.class);
    private final Client client; //client of the cluster
    
    private BulkProcessor bulk; //for bulk operations
    
    private final ElasticConfigurations configs; //configuration parameters.
    private final MappingHandler mapper; //mapping of the types.
    private final URIHandler uriHandler; //for URI compression.
    private final AtomicReference<RuntimeException> bulkException;
    
    /**
     * @param configs the configuration parameters.
     * @param client the ElasticSearch client to which queries etc. are sent.
     * @param mapper the mapping of the types.
     * @param uriHandler the handler used for URI compression.
     */
    public DataTransactionElastic(ElasticConfigurations configs, Client client, MappingHandler mapper, URIHandler uriHandler) {
        this.configs = configs;
        this.client = client;
        this.mapper = mapper;
        this.uriHandler = uriHandler;
        bulkException = new AtomicReference<>();
        bulk = null;
    }
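    
    /*
     * Usage sketch (hypothetical, not prescribed by this class): instances are normally
     * created by the surrounding ElasticSearch DataStore, which provides the configs,
     * the client, the mapper and the uriHandler. Assuming an ES 1.x TransportClient:
     *
     *   Client client = new TransportClient(settings)
     *           .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
     *   DataTransaction tx = new DataTransactionElastic(configs, client, mapper, uriHandler);
     *   tx.store(KS.RESOURCE, record);
     *   tx.end(true); //flushes the pending bulk operations
     */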
    
    /**
     * Maps the URI type to a String that can be used as a type name in the ES index.
     * @param type the record type (KS.MENTION or KS.RESOURCE)
     * @return the corresponding ES type name
     */
    private String getTypeAsString(URI type){
        if(type.equals(KS.MENTION)) return "mention";
        if(type.equals(KS.RESOURCE)) return "resource";
        throw new IllegalArgumentException("unknown type: " + type.toString());
    }
    
    
    /**
     * Initialization of the bulk processor.
     * Uses the default values only if bulkSize, flushInterval and concurrentRequests have never been set;
     * otherwise uses the configured values.
     */
    private void initBulk(){
        LOGGER.trace("starting a new bulk");
        BulkProcessor.Builder builder = BulkProcessor.builder(client, new BulkProcessor.Listener(){
            @Override
            public void beforeBulk(long l, BulkRequest br) {
                client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
                LOGGER.trace("starting bulk operation with {} requests", br.numberOfActions());
            }
            
            @Override
            public void afterBulk(long l, BulkRequest br, BulkResponse br1) {
                LOGGER.trace("finished bulk operation with {} requests", br.numberOfActions());
                if(br1.hasFailures()){
                    String log = "errors in bulk execution:";
                    for(BulkItemResponse item : br1){
                        if(item.isFailed())
                            log += "\n\t id: " + item.getId() + " ; message: " + item.getFailureMessage();
                    }
                    LOGGER.error(log);
                    //bulkException.set(new RuntimeException("failure in response -:" + br1.buildFailureMessage()));
                }
            }
            
            @Override
            public void afterBulk(long l, BulkRequest br, Throwable thrwbl){
                LOGGER.trace("finished bulk operation with {} requests and errors", br.numberOfActions());
                bulkException.set(new RuntimeException("Caught exception in bulk: " + br + ", failure: " + thrwbl, thrwbl));
            }
        });
        
        if(configs.getBulkSize() != null) builder.setBulkSize(configs.getBulkSize());
        //  if(configs.getFlushInterval() != null) builder.setFlushInterval(configs.getFlushInterval());
        if(configs.getConcurrentRequests() != -1) builder.setConcurrentRequests(configs.getConcurrentRequests());
        
        bulk = builder.build();
    }
    
    private void checkNotFailed(){
        RuntimeException exception = bulkException.get();
        if(exception != null){
            if(!(exception instanceof IllegalStateException))
                bulkException.set(new IllegalStateException("previous bulk operation failed"));
            throw exception;
        }
    }
    
    /**
     * Flushes the pending bulk operations and waits for their completion,
     * then refreshes the index so that the documents become searchable.
     * @param time max time that the bulk execution can take.
     * @throws IllegalStateException if the bulk execution is interrupted or a previous bulk operation failed.
     */
    private void flushBulk(TimeValue time) throws IllegalStateException{
        if(bulk != null){
            LOGGER.debug("flushing bulk");
            try{
                bulk.flush();
                if(!bulk.awaitClose(time.getMillis(), TimeUnit.MILLISECONDS))
                    throw new RuntimeException("bulk request did not complete in " + time.getMillis() + " " + TimeUnit.MILLISECONDS);
                
                bulk = null; //signal that there is no bulk running.
                //refresh the index to make the documents available for search.
                RefreshResponse fr = client.admin().indices().refresh(new RefreshRequest(configs.getIndexName())).actionGet();
                LOGGER.debug(String.format("Flush: %s failed, %s successful, %s total", fr.getFailedShards(), fr.getSuccessfulShards(), fr.getTotalShards()));
            }catch(InterruptedException ex){
                throw new IllegalStateException("bulk execution interrupted", ex);
            }
        }
        checkNotFailed();
    }
    
    /**
     * Waits until the ElasticSearch cluster reaches at least yellow status.
     */
    private void waitYellowStatus(){
        client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
    }
    
    /**
     * Searches for the specified ids in the database.
     * @param type mention or resource
     * @param ids identifiers of the documents to search.
     * @param properties properties to keep (null for all).
     * @return the documents found.
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws IllegalStateException
     */
    @Override
    public Stream<Record> lookup(URI type, Set<? extends URI> ids, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
        LOGGER.debug("lookup, type: " + type + " ; ids: " + ids + " ; properties: " + properties);
        checkNotFailed();
        if(ids.isEmpty()){
            return Stream.create();
        }
        //flush the operations buffered in the bulk.
        this.flushBulk(configs.getBulkTime());
        
        MultiGetRequestBuilder request = client.prepareMultiGet();
        //set the type name where to search the data.
        String typeName = getTypeAsString(type);
        
        for(URI id : ids){ //the projection is done after the get.
            Item item = new MultiGetRequest.Item(configs.getIndexName(), typeName, uriHandler.encode(id));
            request.add(item);
        }
        LOGGER.debug("multiGetRequest: " + request);
        this.waitYellowStatus();
        
        //convert from MultiGetItemResponse[] to Stream
        MultiGetResponse response = request.execute().actionGet();
        ArrayList<Record> records = new ArrayList<>();
        //add records to results.
        for(MultiGetItemResponse item : response) {
            if(item.isFailed()){
                Failure failure = item.getFailure();
                LOGGER.error("failed MultiGet: id: " + failure.getId() + " message: " + failure.getMessage());
            }else{
                GetResponse tmp = item.getResponse();
                Record recToAdd = Utility.deserialize(tmp, mapper, uriHandler);
                if(recToAdd != null){
                    if(properties != null)
                        recToAdd = recToAdd.retain(Iterables.toArray(properties, URI.class));
                    records.add(recToAdd);
                }
            }
        }
        return Stream.create(records);
    }
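    
    /*
     * Lookup usage sketch (the ids and the property URI are hypothetical). Fetches two
     * mentions by id and keeps only one property; passing null instead of the property
     * set would keep all properties.
     *
     *   Set<URI> ids = new HashSet<>(Arrays.asList(mentionURI1, mentionURI2));
     *   Set<URI> props = Collections.singleton(someProperty);
     *   Stream<Record> mentions = tx.lookup(KS.MENTION, ids, props);
     */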
    
    
    /**
     * @param condition the condition used to filter the Records.
     * @param partialAccept whether a partial filter is accepted.
     * @return the filter; null if no filter could be built.
     */
    private FilterBuilder buildFilter(XPath condition, boolean partialAccept) throws IOException{
        LOGGER.debug("buildFilter condition: " + condition + " ; partialAccept: " + partialAccept);
        Map<URI, Set<Object>> map = new HashMap<>();
        XPath remain = condition.decompose(map); //splits the XPath
        
        if((!partialAccept && remain != null) || map.isEmpty()){
            LOGGER.debug("entrySet number: " + map.size());
            return null;
        }
        List<FilterBuilder> filters = new ArrayList<>();
        
        for(Map.Entry<URI, Set<Object>> el : map.entrySet()){
            LOGGER.debug("analyze key: " + el.getKey());
            String elKey = null;
            if(el.getKey() instanceof URI)
                elKey = uriHandler.encode(el.getKey());
            else{
                //something wrong: the key is not a URI
                throw new IllegalArgumentException("found a key in the XPath map that is not a URI");
            }
            
            if(mapper.getValueClass(elKey).equals(byte[].class)){ //byte[] values are not pre-filtered, only post-filtered.
                LOGGER.debug("byte[], ignore");
                if(!partialAccept) return null;
                //else ignore.
            }else{
                LOGGER.debug("analyzing key: " + elKey);
                ArrayList<FilterBuilder> tmpFilters = new ArrayList<>();
                LOGGER.debug("number of orFilters: " + el.getValue().size());
                for(Object item : el.getValue()){
                    FilterBuilder tmpFilter = null;
                    LOGGER.debug("analyzing value: " + item.toString() + " ; class: " + item.getClass());
                    if(item instanceof Range){
                        tmpFilter = Utility.buildRangeFilter(elKey, (Range)item, mapper, uriHandler);
                    }else{
                        LOGGER.debug("term filter: " + item.getClass());
                        tmpFilter = Utility.buildTermFilter(elKey, item, mapper, uriHandler);
                    }
                    if(tmpFilter != null){ //the filter was built successfully.
                        tmpFilters.add(tmpFilter);
                    }else{ //the creation of the filter failed.
                        LOGGER.debug("creation of the filter on {} failed", condition);
                        if(!partialAccept) return null;
                    }
                }
                filters.add(FilterBuilders.orFilter(Iterables.toArray(tmpFilters, FilterBuilder.class))); //a document passing at least one of tmpFilters is kept.
            }
        }
        FilterBuilder mainFilter = null;
        if(!filters.isEmpty())
            mainFilter = FilterBuilders.andFilter(Iterables.toArray(filters, FilterBuilder.class)); //a document has to pass all the filters.
        
        return mainFilter;
    }
    
    /**
     * Returns all the documents in the database that match the condition.
     * @param type mention or resource
     * @param condition condition to match
     * @param properties properties of the documents to keep
     * @return the documents found.
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws IllegalStateException
     */
    @Override
    public Stream<Record> retrieve(URI type, @Nullable XPath condition, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
        checkNotFailed();
        LOGGER.debug("retrieve type: " + type + " ; condition: " + condition + " ; properties: " + properties);
        
        this.flushBulk(configs.getBulkTime());
        
        String typeName = getTypeAsString(type);
        
        SearchRequestBuilder responseBuilder;
        QueryBuilder mainQuery;
        //if there are no conditions, take all the documents.
        if(condition == null){ //matchAll query.
            LOGGER.debug("match all query");
            mainQuery = QueryBuilders.constantScoreQuery(FilterBuilders.matchAllFilter());
            
        }else{ //if there are some conditions, take only the documents that satisfy all of them (pre-checking).
            LOGGER.debug("try to build a filter of condition: " + condition);
            FilterBuilder filter = buildFilter(condition, true);
            if(filter == null){ //if the build of the filter fails, match all and post-filter.
                LOGGER.debug("prefilter failed");
                filter = FilterBuilders.matchAllFilter();
            }
            mainQuery = QueryBuilders.constantScoreQuery(filter);
        }
        
        LOGGER.debug("Query: " + mainQuery);
        responseBuilder = client.prepareSearch(configs.getIndexName()).setTypes(typeName)
                .setQuery(mainQuery).setScroll(configs.getTimeout());
        
        this.waitYellowStatus();
        
        SearchResponse response = responseBuilder.execute().actionGet();
        Stream<Record> res = new SearchResponseStream(response, client, configs.getTimeout(), mapper, uriHandler);
        
        if(condition != null){
            LOGGER.debug("post-filtering: " + condition);
            res = res.filter(condition.asPredicate(), 0); //post-checking
        }
        
        if(properties != null){ //projection
            final URI[] propURIs = Iterables.toArray(properties, URI.class);
            res = res.transform((Record r) -> r.retain(propURIs), 0);
        }
        return res;
    }
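    
    /*
     * Retrieve usage sketch (the condition and property URI are hypothetical, and the
     * XPath factory method is assumed). The condition is pre-filtered in ElasticSearch
     * when buildFilter succeeds and is always post-checked on the returned stream.
     *
     *   XPath condition = XPath.parse("...some condition...");
     *   Set<URI> props = Collections.singleton(someProperty);
     *   Stream<Record> resources = tx.retrieve(KS.RESOURCE, condition, props);
     *   long matches = resources.count();
     */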
    
    /**
     * Counts the documents matching the condition.
     * @param type mention or resource
     * @param condition condition to match
     * @return the number of documents matching the condition
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws IllegalStateException
     */
    @Override
    public long count(URI type, @Nullable XPath condition) throws IOException, IllegalArgumentException, IllegalStateException {
        checkNotFailed();
        LOGGER.debug("count type: " + type + " ; condition: " + condition);
        
        //flush the bulk.
        this.flushBulk(configs.getBulkTime());
        
        
        String typeName = getTypeAsString(type);
        
        CountRequestBuilder responseBuilder = null;
        //if there are no conditions, count all the documents.
        if(condition == null){ //matchAll query.
            responseBuilder = client.prepareCount(configs.getIndexName()).setTypes(typeName)
                    .setQuery(QueryBuilders.matchAllQuery());
            
        }else{ //if there are some conditions, count only the documents that satisfy all of them (pre-checking).
            FilterBuilder mainFilter = buildFilter(condition, false);
            if(mainFilter != null){
                QueryBuilder mainQuery = QueryBuilders.constantScoreQuery(mainFilter);
                responseBuilder = client.prepareCount(configs.getIndexName()).setTypes(typeName)
                        .setQuery(mainQuery);
            }else{ //if we can't pre-filter everything, do a retrieve with 0 properties and count the stream.
                this.waitYellowStatus();
                LOGGER.debug("can not prefilter all, have to do a match-all and post-filter");
                Stream<Record> stream = this.retrieve(type, condition, new HashSet<>());
                return stream.count();
            }
        }
        this.waitYellowStatus();
        CountResponse response = responseBuilder.execute().actionGet();
        return response.getCount();
    }
    
    @Override
    public Stream<Record> match(Map<URI, XPath> conditions, Map<URI, Set<URI>> ids, Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
        throw new UnsupportedOperationException("Not supported yet.");
    }
    
    /**
     * Stores the specified record in the database.
     * @param type mention or resource
     * @param record record to store
     * @throws IOException
     * @throws IllegalStateException
     */
    @Override
    public void store(URI type, Record record) throws IOException, IllegalStateException {
        checkNotFailed();
        if(bulk == null) //start a new bulk if it's not already started.
            this.initBulk();
        
        String typeName = getTypeAsString(type);

        XContentBuilder source = Utility.serialize(record, uriHandler);
        LOGGER.trace("storing\n\trecord: " + record.toString(null, true) + "\n\t-> serialized: " + source.string());
        IndexRequest indexRequest = new IndexRequest(configs.getIndexName(), typeName, uriHandler.encode(record.getID())).source(source);
        //update the document if it already exists, otherwise index the full source (upsert).
        UpdateRequest updateRequest = new UpdateRequest(configs.getIndexName(), typeName, uriHandler.encode(record.getID())).doc(source).upsert(indexRequest);
        bulk.add(updateRequest);
    }
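    
    /*
     * Note on visibility (sketch): store() and delete() only buffer requests in the
     * BulkProcessor; the documents become searchable after flushBulk() runs, i.e. when
     * lookup/retrieve/count are called or when the transaction is ended with commit:
     *
     *   tx.store(KS.RESOURCE, record);
     *   tx.delete(KS.MENTION, mentionId); //mentionId is a hypothetical URI
     *   tx.end(true); //flushes the bulk and refreshes the index
     */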
    
    /**
     * Removes a document from the database given its id.
     * @param type mention or resource
     * @param id identifier of the document to delete
     * @throws IOException
     * @throws IllegalStateException
     */
    @Override
    public void delete(URI type, URI id) throws IOException, IllegalStateException {
        checkNotFailed();
        LOGGER.trace("delete document type: " + type + " ; id: " + id);
        
        String typeName = getTypeAsString(type);
        String indexName = configs.getIndexName();
        String idStr = uriHandler.encode(id);
        DeleteRequest deleteRequest = new DeleteRequest(indexName, typeName, idStr);
        
        if(bulk == null) //start a new bulk if it's not already started.
            this.initBulk();
        bulk.add(deleteRequest);
    }
    
    /**
     * Ends the "transaction".
     * @param commit true to flush the pending bulk operations.
     * @throws DataCorruptedException
     * @throws IOException
     * @throws IllegalStateException
     */
    @Override
    public void end(boolean commit) throws DataCorruptedException, IOException, IllegalStateException {
        checkNotFailed();
        LOGGER.debug("end");
        if(commit){
            LOGGER.debug("flushing pending operations");
            this.flushBulk(configs.getBulkTime());
            LOGGER.debug("done");
        }
    }
    
    /**
     * Merges the segments of the index down to the specified number of segments.
     * @param segNumber the maximum number of segments, at least 1.
     */
    public void optimizeIndex(int segNumber){
        client.admin().indices().prepareOptimize(configs.getIndexName()).setMaxNumSegments(segNumber).setFlush(true).execute().actionGet();
    }
    
    public void deleteAllIndexContent(){
        this.waitYellowStatus();
        if(!client.admin().indices().prepareDelete("*").execute().actionGet().isAcknowledged())
            throw new RuntimeException("can not delete the index (" + configs.getIndexName() + ") content");
    }
    
    public void deleteAll(){
        this.waitYellowStatus();
        DeleteIndexResponse response = null;
        try{
            response = client.admin().indices().delete(new DeleteIndexRequest(configs.getIndexName())).actionGet();
        }catch(IndexMissingException ex){
            return;
        }
        if(!response.isAcknowledged())
            throw new RuntimeException("can not delete the index: " + configs.getIndexName());
    }
}