1   /*
2   * Copyright 2015 FBK-irst.
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  package eu.fbk.knowledgestore.elastic;
17  
18  import org.elasticsearch.common.settings.ImmutableSettings;
19  import org.elasticsearch.common.settings.Settings;
20  import org.elasticsearch.common.settings.SettingsException;
21  import org.elasticsearch.common.transport.InetSocketTransportAddress;
22  import org.elasticsearch.common.transport.TransportAddress;
23  import org.elasticsearch.common.unit.ByteSizeValue;
24  import org.elasticsearch.common.unit.TimeValue;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import java.io.FileInputStream;
29  import java.io.FileNotFoundException;
30  import java.io.InputStream;
31  import java.net.InetAddress;
32  import java.net.URL;
33  import java.net.UnknownHostException;
34  import java.util.Arrays;
35  import java.util.concurrent.TimeUnit;
36  
37  /**
38   *
39   * @author enrico
40   */
41  public class ElasticConfigurations {
42      
43      private static final Logger LOGGER = LoggerFactory.getLogger(ElasticConfigurations.class);
44      //path where to find the file with all the configurations.
45      
46  //settings handled by ES directly.
47      private final Settings nodeSettings;
48      //path where to find 2 files: the mapping of the resource type and the one for the mention type
49      private final String resourceMapping;
50      private final String mentionMapping;
51  
52      private final String indexName; //name of the index.
53      
54   //only for TransportClient.
55      private final TransportAddress[] addresses;  //addresses for TrasportClient.
56  //for scroll
57      private final TimeValue timeout; //timeout between the request of a scroll page and the next one.
58      
59  //for bulk
60      private final TimeValue bulkTime; //for bulk timeout
61      private final ByteSizeValue bulkSize; //threshold size for flush the bulk
62      private final TimeValue flushInterval; //time interval for flush the bulk
63      private final int concurrentRequests; //number of concurrent request that the bulk can execute
64      
65      //configuration for URIHandler.
66      private final String weakCompressionPath;
67      private final String strongCompressionPath;
68      
69      ElasticConfigurations(String path){
70          //load configurations from the file.
71          if(path != null){
72              URL url = DataTransactionElastic.class.getClassLoader().getResource(path);
73              if(url != null){
74                  try{
75                      LOGGER.debug("loading configuration from classPath");
76                      nodeSettings = ImmutableSettings.settingsBuilder().loadFromClasspath(path).build();
77                  }catch(SettingsException ex){
78                      throw new IllegalArgumentException("failed to load settings from classpath", ex);
79                  }
80              }else{
81                  try{
82                      LOGGER.debug("loading configuration from source");
83                      InputStream input = null;
84                      try {
85                          input = new FileInputStream(path);
86                      } catch (FileNotFoundException ex) {
87                          throw new IllegalArgumentException("file" + path + "not found", ex);
88                      }
89                      nodeSettings = ImmutableSettings.settingsBuilder().loadFromStream(path, input).build();
90                  }catch(SettingsException ex){
91                      throw new IllegalArgumentException("failed to load settings from source", ex);
92                  }
93              }
94              LOGGER.debug("nodeSettings: " + nodeSettings.names());
95              mentionMapping = nodeSettings.get("index.mapping.mention");
96              resourceMapping = nodeSettings.get("index.mapping.resource");
97              LOGGER.debug("mention mapping url: " + mentionMapping + " ; resource mapping url: " + resourceMapping);
98              indexName = nodeSettings.get("index.name");
99              if(indexName == null){
100                 throw new IllegalArgumentException("no index name found in the configuration file: " + path);
101             }
102             LOGGER.debug("index name: " + indexName);
103             bulkTime = new TimeValue(nodeSettings.getAsLong("bulk.timeout", 60000L), TimeUnit.MILLISECONDS);
104             bulkSize = new ByteSizeValue(getNodeSettings().getAsLong("bulk.size", 1024L));
105             concurrentRequests = nodeSettings.getAsInt("bulk.concurrent_request", 0);
106             flushInterval = new TimeValue(getNodeSettings().getAsLong("bulk.interval", 60000L), TimeUnit.MILLISECONDS);
107             
108             timeout = new TimeValue(getNodeSettings().getAsLong("scroll.timeout", 600000L), TimeUnit.MILLISECONDS);
109             
110             addresses = parseAddresses(nodeSettings.getAsArray("transport.client.initial_nodes"));
111             
112             weakCompressionPath = nodeSettings.get("uri_handler.weakcompression_path");
113             strongCompressionPath = nodeSettings.get("uri_handler.strongcompression_path");
114             
115         }else{//if there are no settings don't start.
116             throw new IllegalArgumentException("can not load the settings from path: " + path);
117             /*
118             LOGGER.warn("loading default settings");
119             nodeSettings = null;
120             indexName = "eu.fkb.dkm.elasticindex";
121             bulkTime = new TimeValue(1, TimeUnit.MINUTES);
122             timeout = new TimeValue(1, TimeUnit.MINUTES);
123             LOGGER.info("loaded the default settings");
124             mentionMapping = null;
125             resourceMapping = null;
126             addresses = null;
127             bulkSize = null;
128             flushInterval = null;
129             concurrentRequests = -1;
130             weakCompressionPath = null;
131             strongCompressionPath = null;
132             */
133         }
134         
135         LOGGER.debug("elasticsearch configuration loaded");
136     }
137     
138     private TransportAddress[] parseAddresses(String[] addresses){
139         LOGGER.debug("addresses string: " + Arrays.toString(addresses));
140         
141         if(addresses == null || addresses.length == 0) return null;
142         
143         TransportAddress[] res = new InetSocketTransportAddress[addresses.length];
144         for(int i=0; i< addresses.length; i++){
145             String[] splitted = addresses[i].split(":", 2); //192.168.0.1:1024
146             InetAddress address = null;
147             int port = -1;
148             try {
149                 address = InetAddress.getByName(splitted[0]);
150                 port = Integer.parseInt(splitted[1]);
151                 LOGGER.debug("adress: " + address + " : " + port);
152                 res[i] = new InetSocketTransportAddress(address, port);
153             } catch (UnknownHostException | NumberFormatException ex) {
154                 LOGGER.error("can not find the host with IP: " + splitted[0] + " and port: " + port);
155             }
156         }
157         return res;
158     }
159     
160 
161     /**
162      * @return the nodeSettings
163      */
164     public Settings getNodeSettings() {
165         return nodeSettings;
166     }
167 
168     /**
169      * @return the resourceMapping
170      */
171     public String getResourceMapping() {
172         return resourceMapping;
173     }
174 
175     /**
176      * @return the mentionMapping
177      */
178     public String getMentionMapping() {
179         return mentionMapping;
180     }
181 
182     /**
183      * @return the indexName
184      */
185     public String getIndexName() {
186         return indexName;
187     }
188 
189     /**
190      * @return the addresses
191      */
192     public TransportAddress[] getAddresses() {
193         return addresses;
194     }
195 
196     /**
197      * @return the timeout
198      */
199     public TimeValue getTimeout() {
200         return timeout;
201     }
202 
203     /**
204      * @return the bulkTime
205      */
206     public TimeValue getBulkTime() {
207         return bulkTime;
208     }
209 
210     
211     /**
212      * @return the bulkSize
213      */
214     public ByteSizeValue getBulkSize() {
215         return bulkSize;
216     }
217 
218     /**
219      * @return the flushInterval
220      */
221     public TimeValue getFlushInterval() {
222         return flushInterval;
223     }
224 
225     /**
226      * @return the concurrentRequests
227      */
228     public int getConcurrentRequests() {
229         return concurrentRequests;
230     }
231     
232     /**
233      * @return the weakCompressionPath
234      */
235     public String getWeakCompressionPath(){
236         return weakCompressionPath;
237     }
238     
239     /**
240      * @return the strongCompressionPath
241      */
242     public String getStrongCompressionPath(){
243         return strongCompressionPath;
244     }
245 }