1   /*
2   * Copyright 2015 FBK-irst.
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  package eu.fbk.knowledgestore.elastic;
17  
18  import eu.fbk.knowledgestore.data.Handler;
19  import eu.fbk.knowledgestore.data.Record;
20  import eu.fbk.knowledgestore.data.Stream;
21  import org.elasticsearch.action.search.SearchResponse;
22  import org.elasticsearch.client.Client;
23  import org.elasticsearch.common.unit.TimeValue;
24  import org.elasticsearch.search.SearchHit;
25  import org.openrdf.model.URI;
26  
27  /**
28   *
29   * @author enrico
30   */
31  public class SearchResponseStream extends Stream<Record>{
32      SearchResponse response;
33      Client client;
34      TimeValue timeout;
35      MappingHandler mapper;
36      URIHandler uriHandler;
37      URI[] properties;
38      
39      /**
40       * 
41       * @param res the SearchResponse has to be created from a SearchRequest with a setScroll.
42       * @param client client where to execute
43       * @param timeout duration of the scrollId
44       */
45      SearchResponseStream(SearchResponse res, Client client, TimeValue timeout,  MappingHandler mapper, URIHandler handler){
46          response = res;
47          this.client = client;
48          this.timeout = timeout;
49          this.mapper = mapper;
50          this.uriHandler = handler;
51      }
52      
53      @Override
54      protected void doToHandler(final Handler<? super Record> handler) throws Throwable {
55          while(true){            
56              for(SearchHit hit : response.getHits().getHits()){
57                  Record rec;
58                  rec = Utility.deserialize(hit, mapper, uriHandler);
59                  if(Thread.interrupted()){
60                      handler.handle(null);
61                      return;
62                  }
63                  handler.handle(rec);
64              }
65              response = client.prepareSearchScroll(response.getScrollId()).setScroll(timeout).execute().actionGet();
66              //Break condition: No hits are returned
67              if (response.getHits().getHits().length == 0){
68                  handler.handle(null);
69                  break;
70              }
71          }
72      }
73  }