1   /*
2   * Copyright 2015 FBK-irst.
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  package eu.fbk.knowledgestore.elastic;
17  
18  import com.google.common.collect.BiMap;
19  import com.google.common.collect.HashBiMap;
20  import com.google.common.collect.Maps;
21  import org.elasticsearch.action.ShardOperationFailedException;
22  import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
23  import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
24  import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
25  import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
26  import org.elasticsearch.action.admin.indices.flush.FlushResponse;
27  import org.elasticsearch.action.index.IndexRequest;
28  import org.elasticsearch.action.index.IndexResponse;
29  import org.elasticsearch.action.search.SearchResponse;
30  import org.elasticsearch.client.Client;
31  import org.elasticsearch.common.settings.ImmutableSettings;
32  import org.elasticsearch.common.unit.TimeValue;
33  import org.elasticsearch.common.xcontent.XContentBuilder;
34  import org.elasticsearch.index.query.FilterBuilders;
35  import org.elasticsearch.index.query.QueryBuilders;
36  import org.elasticsearch.search.SearchHit;
37  import org.openrdf.model.URI;
38  import org.openrdf.model.impl.URIImpl;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import java.io.IOException;
43  import java.util.HashSet;
44  import java.util.Map;
45  import java.util.Set;
46  import java.util.concurrent.atomic.AtomicInteger;
47  
48  import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
49  
50  /**
51   *
52   * @author enrico
53   */
54  public class URIHandler {
55      private static final Logger LOGGER = LoggerFactory.getLogger(URIHandler.class);
56      private static final String indexName = "urimappingindex";
57      private static final String extendedURIPropName = "ex";
58      private static final TimeValue scrollTimeOut = TimeValue.timeValueMinutes(1);
59      private final HashSet<URI> strongCompress;
60      private static final char URI_ESCAPE = (char)1;
61      private final String[] types;
62      private final Client client;
63      private final AtomicInteger counter;
64      private final BiMap<String, URI> mapper; //<compressed, extended>, compressed is a String rapresentation of a Integer.
65        
66      public URIHandler(Set<URI> weakCompression, HashSet<URI> strongCompression, Client client) throws IOException{
67          
68          this.strongCompress = strongCompression;
69          this.client = client;
70          counter = new AtomicInteger(0);
71          types = new String[2];
72          types[0] = "s"; //strongCompressed URIs.
73          types[1] = "w"; //weakCompressed URIs.
74          BiMap<String, URI> map = HashBiMap.create();
75          mapper = Maps.synchronizedBiMap(map);
76          initMap(weakCompression);
77          client.admin().indices().prepareOptimize(indexName).setMaxNumSegments(1).setFlush(true).execute().actionGet();
78      }
79      
80      
81  //methods called only from the contructor
82      /**
83       * check if the index exists if not the index is created. 
84       * The content of the index is loaded into the mapper.
85       * The set of weakCompression nameSpaces is added.
86       * @param nameSpaces
87       * @throws IOException 
88       */
89      private void initMap(Set<URI> nameSpaces) throws IOException{
90          if(isIndexExists()){ //if the index exists load the content.
91              LOGGER.debug("index: " + indexName + " already exists");
92              loadMappingFromIndex();            
93          }else{ //create it.
94              createIndex();
95              LOGGER.debug("index: " + indexName + " created");
96          }
97          addWeakCompressions(nameSpaces);
98      }
99      
100     /**
101      * loads the content of the index in the Map.
102      */
103     private void loadMappingFromIndex(){
104         SearchResponse response = client.prepareSearch(indexName).setTypes(types)
105                 .setQuery(QueryBuilders.constantScoreQuery(FilterBuilders.matchAllFilter()))
106                 .setScroll(scrollTimeOut).execute().actionGet();
107         while(true){ //will stop when the scroll doesn't return any hits.
108             for(SearchHit hit : response.getHits().getHits()){
109                 if(hit.isSourceEmpty())
110                     throw new UnsupportedOperationException("deserialization of projected object not supported, URIHandler");
111                 //else
112                 String compressedURI = hit.getId();
113                 Map<String, Object> document = hit.getSource();
114                 URI extendedURI = new URIImpl((String)document.get(extendedURIPropName));
115                 int uriInteger = Integer.parseInt(compressedURI);
116                 if(uriInteger > counter.get()) counter.set(uriInteger);
117                 mapper.put(compressedURI, extendedURI); //add the couple to the map.
118             }
119             response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeOut).execute().actionGet();
120             if (response.getHits().getHits().length == 0){
121                 break;
122             }
123         }
124         counter.incrementAndGet(); //counter is the next id available.
125         
126         LOGGER.debug("compression: " + mapper.toString());
127     }
128     
129     /**
130      * creates the index with name = indexName, the types with the names types and the mapping.
131      * @throws IOException 
132      */
133     private void createIndex() throws IOException{
134         CreateIndexRequestBuilder createRequest = client.admin().indices().prepareCreate(indexName)
135                 .setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1).put().build());
136         //set the mapping for the types.
137         for(String typeName : types){
138             XContentBuilder mapping = jsonBuilder().startObject()
139                     .field(typeName).startObject()
140                     .field("dynamic", "strict")
141                     .field("properties").startObject()
142                     .field("_timestamp").startObject().field("enabled", false).endObject()
143                 .field("_all").startObject().field("enabled", false).endObject()
144                     .field("norms").startObject().field("enabled", false).endObject()
145                     
146                     .field(extendedURIPropName).startObject()
147                     .field("type", "string")
148                     .field("index", "no").endObject()
149                     
150                     .endObject()
151                     .endObject()
152                     .endObject();
153            
154             createRequest.addMapping(typeName, mapping);
155             LOGGER.trace("uriHandler index: " + indexName + "." + typeName + " mapping: " + mapping.string());
156         }
157         CreateIndexResponse createResponse = createRequest.execute().actionGet();
158         if(!createResponse.isAcknowledged()){
159              throw new RuntimeException("can not create the index: " + indexName); 
160         }
161     }    
162     
163     /**
164      * adds to the index the missing weakCompressions.
165      * @param nameSpaces
166      * @throws IOException 
167      */
168     private void addWeakCompressions(Set<URI> nameSpaces) throws IOException{
169         for(URI uri : nameSpaces){
170             if(!mapper.containsValue(uri)){ //if the value is not already in the map (and in the index)
171                 store(uri, types[1]); //store the mapping as weak compression. 
172             }
173         }
174     }
175     
176     
177     
178 //methods called NOT only from constructor.
179     
180     /**
181      * adds a mapping of the specified type (strong or weak) in the index and in the map.
182      * the called must care about syncronization.
183      * @param extendedUri the uri to compress.
184      * @param type must be in the array types.
185      * @return the id of the new mapping. (compressed string that maps the extendedUri)
186      * @throws IOException 
187      */
188     private String store(URI extendedUri, String type) throws IOException{
189         String extended = extendedUri.toString();
190         XContentBuilder source = jsonBuilder().startObject().field(extendedURIPropName, extended).endObject();
191         String id;
192         synchronized(this){
193             id = Integer.toString(counter.get());
194             
195             IndexRequest indexReq = new IndexRequest(indexName, type, id).source(source);
196             this.waitYellowStatus();
197             IndexResponse indexResponse = client.index(indexReq).actionGet();
198             
199             if(!indexResponse.isCreated())
200                 throw new IllegalStateException("failed to add the weakCompression of " + source.string() + " in: " + indexName + "." + type);
201             this.waitYellowStatus();
202             FlushResponse flushResponse = client.admin().indices().prepareFlush(indexName).execute().actionGet();
203             
204             if(flushResponse.getFailedShards() != 0){ //if there are failures.
205                 String errorMessage = "flush of the idex: " + indexName + " failed:\n";
206                 for(ShardOperationFailedException failure : flushResponse.getShardFailures()){
207                     errorMessage += "\t shardId: " + failure.shardId() + " ; reason: " + failure.reason() + "\n";
208                 }
209                 throw new IllegalStateException(errorMessage); //explode
210             }
211             
212             mapper.put(Integer.toString(counter.get()), extendedUri); //add to the map
213             counter.incrementAndGet(); //increment the id.
214         }
215         return id;
216     }
217     
218     /**
219      * 
220      * @param extended the uri to check if can be strong compressed
221      * @return the strong compressed string (escaped) or null if the string must not be strong compressed.
222      * @throws IOException 
223      */
224     private synchronized String strongCompression(URI extended) throws IOException{
225        // if(extended == null) return null;
226         String compressedNotEscaped = mapper.inverse().get(extended);
227         if(compressedNotEscaped != null)  //if there is already a strong compression in the map.
228             return String.valueOf(URI_ESCAPE) + compressedNotEscaped;
229 
230         URI namespace = new URIImpl(extended.getNamespace());
231         if(!strongCompress.contains(namespace)) //if this uri has not to be strong compressed.
232             return null;
233         
234         //the string must be strong compressed and is not already in the map (and in the index)
235         
236         //add the strong compression
237 
238         compressedNotEscaped =  store(extended, types[0]);
239         LOGGER.debug(extended.toString() + " --S--> " + compressedNotEscaped);
240 
241         return String.valueOf(URI_ESCAPE) + compressedNotEscaped;
242     }
243     
244     /**
245      * encodes the specified uri using the mapper. (opposite of decode)
246      * @param uri
247      * @return the encoded rapresentation of the uri. 
248      * @throws IOException
249      */
250     public String encode(URI uri) throws IOException{
251 //        LOGGER.debug("encoding of: " + uri);
252         //check if the uri can be strong compressed
253         String compressed = strongCompression(uri);
254         if(compressed != null){ //if the string has been strongCompressed and escaped.
255            // LOGGER.debug("compressed as: " + compressed);
256             return compressed; 
257         }
258         
259         
260         //if in the map there is no strong compression for the uri, check for weak compression
261         URI nameSpace = new URIImpl(uri.getNamespace());
262         
263         String encodedNotEscape = mapper.inverse().get(nameSpace);
264         
265         if(encodedNotEscape != null){ //if there is a weak compression
266             compressed = String.valueOf(URI_ESCAPE).concat(encodedNotEscape.concat(String.valueOf(URI_ESCAPE))
267                     .concat(uri.getLocalName()));
268 //            LOGGER.debug("compressed as: " + compressed);
269             return compressed;
270         }
271         //if no compression can be done.
272         compressed = String.valueOf(URI_ESCAPE).concat(uri.toString());
273 //        LOGGER.debug("compressed as: " + compressed);
274         return compressed;
275     }
276     
277     /**
278      * return the extended uri rapresented by the compressed string. (opposite of encode)
279      * @param compressed
280      * @return 
281      */
282     public URI decode(String compressed){
283         LOGGER.trace("decompression of: " + compressed);
284            //if the string is not a URI
285         if(compressed.length() < 2 || !(compressed.charAt(0) == URI_ESCAPE)){
286             throw new IllegalArgumentException(compressed + " is not a valid URI for decoding. Expected <$escape><number> or <$escape><number><$escape><String> or <$escape><URI>");
287         }
288         compressed = compressed.substring(1); //remove the first escape character.
289         //check if it's strong or weak compressed
290        
291         if(Character.isDigit(compressed.charAt(0))){
292             int pos = compressed.indexOf(Character.toString(URI_ESCAPE), 1);
293             if(pos == -1){ //strong compressed
294                 URI res = mapper.get(compressed);
295                 LOGGER.trace("decompressed as: " + res);
296                 return res;
297             }
298             //it's weak compressed
299             String number = compressed.substring(0, pos);
300             String nameSpaceStr = mapper.get(number).toString();
301             String localNameStr = compressed.substring(pos+1); //discard the escape char.
302             URI res = new URIImpl(nameSpaceStr + localNameStr);
303             LOGGER.trace("decompressed as: " + res);
304             return res;
305             }
306         //if it's not compressed
307         URI res = new URIImpl(compressed);
308         LOGGER.trace("decompressed as: " + res);
309         return res;
310     }
311     
312     public boolean isEncodedUri(String str){
313         return str!= null && str.length()>1 && str.charAt(0) == URI_ESCAPE;
314     }
315      
316     private void waitYellowStatus(){
317         client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
318     }
319     private boolean isIndexExists(){
320         this.waitYellowStatus();
321         IndicesExistsRequest request = new IndicesExistsRequest(indexName);
322         IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
323         return response.isExists();
324     }
325     
326     public void printMapping(){
327         LOGGER.debug("uri mapping: " + mapper.toString());
328     }
329 }