1   /*
2   * Copyright 2015 FBK-irst.
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  package eu.fbk.knowledgestore.elastic;
17  
18  import com.google.common.base.Charsets;
19  import com.google.common.io.Files;
20  import com.google.common.io.Resources;
21  import eu.fbk.knowledgestore.data.Data;
22  import eu.fbk.knowledgestore.data.ParseException;
23  import eu.fbk.knowledgestore.datastore.DataStore;
24  import eu.fbk.knowledgestore.datastore.DataTransaction;
25  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
26  import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
27  import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
28  import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
29  import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
30  import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
31  import org.elasticsearch.client.Client;
32  import org.elasticsearch.client.transport.TransportClient;
33  import org.elasticsearch.common.transport.TransportAddress;
34  import org.elasticsearch.common.xcontent.XContentFactory;
35  import org.elasticsearch.common.xcontent.XContentParser;
36  import org.elasticsearch.common.xcontent.XContentType;
37  import org.elasticsearch.indices.IndexAlreadyExistsException;
38  import org.elasticsearch.node.Node;
39  import org.openrdf.model.URI;
40  import org.openrdf.model.impl.URIImpl;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.io.File;
45  import java.io.IOException;
46  import java.net.URL;
47  import java.util.*;
48  
49  import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
50  
51  public class DataStoreElastic implements DataStore{
52      private static final Logger LOGGER = LoggerFactory.getLogger(DataStoreElastic.class);
53      private Node node; //this will be null if it's a TransportClient (-> not a local node, but connected to a remote node)
54      private Client client;
55      private final ElasticConfigurations configs;
56      private MappingHandler mapper;
57      private URIHandler uriHandler;
58      
59      public DataStoreElastic(String path){
60          configs = new ElasticConfigurations(path);
61          mapper = null;
62          node = null;
63          client = null;
64      }
65      
66      @Override
67      public void init() throws IOException, IllegalStateException {
68          LOGGER.debug("dataStore init");
69          TransportAddress[] addresses = configs.getAddresses();
70          if(addresses == null || addresses.length == 0){
71              node = nodeBuilder().settings(configs.getNodeSettings()).node();
72              
73              this.client = node.client();
74              LOGGER.info("starting a local node");
75          }else{
76              node = null;
77              this.client = new TransportClient(configs.getNodeSettings()).addTransportAddresses(addresses);
78              LOGGER.info("starting transportClient");
79          }
80          try{
81          mapper = createIndex(client);
82          uriHandler.printMapping();
83          LOGGER.debug("init done; node: " + node + " ; client: " + client + " ; mapper: " + mapper);
84          }catch(Exception ex){
85              LOGGER.error("errore nella createIndex: " + ex);
86          }
87      }
88      
89      @Override
90      public DataTransaction begin(boolean readOnly) throws DataCorruptedException, IOException, IllegalStateException {
91          LOGGER.debug("dataStore begin");
92          if(configs == null || client == null || mapper == null || uriHandler == null)
93              throw new IllegalStateException("can not start a transaction object with null values, have you called the init?\n" +
94                      "configs:" + configs + " ; client: " + client + " ; mapper: " + mapper + " ; uriHandler: " + uriHandler);
95          return new DataTransactionElastic(configs, client, mapper, uriHandler);
96      }
97      
98      /**
99       * merge all the segments in 1. If a segment becomes too large, it may cause problems.
100      */
101     public void optimize(){
102         client.admin().indices().prepareOptimize(configs.getIndexName()).setMaxNumSegments(1).setFlush(true).execute().actionGet();
103     }
104     
105     
106     @Override
107     public void close() {
108         if(node != null){
109             LOGGER.debug("close local node");
110             node.close();
111         }else{
112             LOGGER.debug("close transportClient");
113             ((TransportClient)client).close();
114         }
115         node = null;
116     }
117     
118     private void waitYellowStatus(){
119         LOGGER.debug("wait yellow status...");
120         client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
121         LOGGER.debug("done");
122     }
123     
124     private boolean isIndexExists(String indexName){
125         this.waitYellowStatus();
126         IndicesExistsRequest request = new IndicesExistsRequest(indexName);
127         IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
128         return response.isExists();
129     }
130     
131     /**
132      * creates an index with the name indexName, and sets the proper mapping for the different types.
133      * it takes the mappings from the specified files. return the mapping of the properties.
134      * @param client client of the elasticsearch node.
135      */
136     private synchronized MappingHandler createIndex(Client client) throws IOException{
137         String[] types = new String[2];
138         types[0] = "resource";
139         types[1] = "mention";
140         
141         HashSet<URI> setWeakCompr = new HashSet<>();
142         //add the user defined weak compressed namespaces.
143         String weakCompressionPath = configs.getWeakCompressionPath(); 
144         if(weakCompressionPath != null)
145             setWeakCompr = readNamespaceSetFromFile(configs.getWeakCompressionPath(), setWeakCompr);
146         HashSet<URI> setStrongCompr = new HashSet<>();
147         
148         if(!isIndexExists(configs.getIndexName())){
149             
150             String[] jsons = new String[2];
151             jsons[0] = readMappingFromFile(configs.getResourceMapping());
152             
153             jsons[1] = readMappingFromFile(configs.getMentionMapping());
154             
155             //compress the jsons.
156             XContentParser parser0 = XContentFactory.xContent(XContentType.JSON).createParser(jsons[0]);
157             Map<String, Object> map0 = parser0.map();
158             
159             XContentParser parser1 = XContentFactory.xContent(XContentType.JSON).createParser(jsons[1]);
160             Map<String, Object> map1 = parser1.map();
161             //read the properties name from the mapping. These URI are always strong compressed.
162             //properties of the sources
163             setStrongCompr = extractLeafProperties(map0, setStrongCompr);
164             //properties of the mentions
165             setStrongCompr = extractLeafProperties(map1, setStrongCompr);
166             //add the user defined namespaces to the set.
167             String strongCompressionPath = configs.getStrongCompressionPath();
168             if(strongCompressionPath != null)
169                 setStrongCompr = readNamespaceSetFromFile(strongCompressionPath, setStrongCompr);
170             LOGGER.debug("strongSet: " + setStrongCompr);
171                       
172 //init the uriHandler for uri management.
173             uriHandler = new URIHandler(setWeakCompr, setStrongCompr, client);
174 
175             //apply the uriHandler compression to the mapping properties.
176             LOGGER.debug("compressing mappings");
177             long startTime = System.currentTimeMillis();
178             map0 = compressMapping(map0);
179             map1 = compressMapping(map1);
180             LOGGER.debug("mapping compressed in: " + (System.currentTimeMillis()-startTime) + " ; result:\n\n");
181             LOGGER.debug("resource mapping map: " + map0);
182             LOGGER.debug("mention mapping map: " + map1 +"\n\n\n");
183             jsons[0] = XContentFactory.jsonBuilder().map(map0).string();
184             jsons[1] = XContentFactory.jsonBuilder().map(map1).string();
185             try{
186                 CreateIndexRequestBuilder request = client.admin().indices().prepareCreate(configs.getIndexName());
187                 setMappings(request, jsons, types);
188                 CreateIndexResponse response = request.execute().actionGet();
189                 if(!response.isAcknowledged()){
190                     throw new RuntimeException("can not create the index: " + configs.getIndexName());
191                 }
192                 LOGGER.info("index {} created", configs.getIndexName());
193             }catch(IndexAlreadyExistsException ex){
194                 LOGGER.debug("index {} already exists, isIndexExists failed", configs.getIndexName());
195             }
196         }else{
197             uriHandler =  uriHandler = new URIHandler(setWeakCompr, setStrongCompr, client);
198             LOGGER.info("index {} already exists", configs.getIndexName());
199         }
200         
201         return new MappingHandler(client.admin().indices().prepareGetMappings(configs.getIndexName())
202                 .setTypes(types).execute().actionGet());
203     }
204     
205     private void setMappings(CreateIndexRequestBuilder req, String[] jsons, String[] types){
206         for(int i=0; i< types.length; i++){
207             LOGGER.debug(types[i] + " mapping:\n" + jsons[i]);
208         }
209         int size = jsons.length;
210         if(size > types.length) size = types.length;
211         for(int i=0; i<size; i++){
212             req.addMapping(types[i], jsons[i]);
213         }
214     }
215     
216     private void setMappings(String[] types, String[] jsons){
217         int size = jsons.length;
218         if(types.length < jsons.length)
219             size = types.length;
220         
221         for(int i=0; i<size; i++){
222             LOGGER.info("setting {} mapping", types[i]);
223             PutMappingResponse mappingMentionResponse = client.admin().indices().preparePutMapping(configs.getIndexName())
224                     .setType(types[i]).setSource(jsons[i]).execute().actionGet();
225             if(!mappingMentionResponse.isAcknowledged())
226                 throw new RuntimeException("can not set the mapping for the type " + types[i]);
227         }
228         
229     }
230     
231     private String readMappingFromFile(String fileName) throws IOException{
232         String json;
233         URL url = DataTransactionElastic.class.getClassLoader().getResource(fileName);
234         if(url != null){
235             try {
236                 json = Resources.toString(url, Charsets.UTF_8);
237             }catch (IOException ex){
238                 throw new IllegalArgumentException("can not find file: " + fileName + " in the ClassPath", ex);
239             }
240         }else{
241             try {
242                 json = Files.toString(new File(fileName), Charsets.UTF_8);
243             } catch (IOException ex) {
244                 throw new IllegalArgumentException("can not find the specified file: " + fileName, ex);
245             }
246         }
247         
248         return json;
249     }
250     
251     private HashSet<URI> extractLeafProperties(Map<String, Object> map, HashSet<URI> set){
252         Set<String> propStrings = map.keySet();
253         for(String propStr : propStrings){
254             try{
255                 URI propUri = (URI)Data.parseValue(propStr, null);
256                 URI propNamespace = new URIImpl(propUri.getNamespace());
257                 set.add(propNamespace); //add the property name to the set.
258             }catch(ParseException ex){
259                 //do nothing.
260             }
261             Object value = map.get(propStr);
262             if(value instanceof Map){ //if it's not a leaf
263                 set = extractLeafProperties((Map)value, set);
264             }
265         }
266         return set;
267     }
268     
269     private HashSet<URI> readNamespaceSetFromFile(String fileName, HashSet<URI> set){
270         List<String> propStrings;
271         URL url = DataTransactionElastic.class.getClassLoader().getResource(fileName);
272         if(url != null){
273             try {
274                 propStrings = Resources.readLines(url, Charsets.UTF_8);
275             }catch (IOException ex){
276                 throw new IllegalArgumentException("can not find file: " + fileName + " in the ClassPath", ex);
277             }
278         }else{
279             try {
280                 propStrings = Files.readLines(new File(fileName), Charsets.UTF_8);
281             } catch (IOException ex) {
282                 throw new IllegalArgumentException("can not find the specified file: " + fileName, ex);
283             }
284         }
285         //add to the set.
286         for(String propStr : propStrings){
287             URI propUri = (URI)Data.parseValue(propStr, null);
288             set.add(propUri);
289         }
290         return set;
291     }
292     
293     private Map<String, Object> compressMapping(Map<String, Object> map) throws IOException{
294         HashMap<String, Object> destMap = new HashMap<>();
295         return compressMapping(destMap, map);
296     }
297     private Map<String, Object> compressMapping(Map<String, Object> destMap, Map<String, Object> sourceMap) throws IOException{
298         for(String key : sourceMap.keySet()){
299             Object value = sourceMap.get(key);
300             if(value instanceof Map){
301                 value = compressMapping((Map)value);
302             }
303             try{ //if it's a URI
304                 URI uriKey = (URI)Data.parseValue(key, null);
305                 key = uriHandler.encode(uriKey);
306                 destMap.put(key, value);
307             }catch(ParseException ex){
308                 //else put the couple in as it is.
309                 destMap.put(key, value);
310             }
311         }
312         return destMap;
313     }
314 }
315