1   package eu.fbk.knowledgestore.populator.naf;
2   
3   import eu.fbk.knowledgestore.KnowledgeStore;
4   import eu.fbk.knowledgestore.OperationException;
5   import eu.fbk.knowledgestore.Session;
6   import eu.fbk.knowledgestore.client.Client;
7   import eu.fbk.knowledgestore.populator.naf.connection.KnowledgestoreServer;
8   import org.apache.commons.cli.*;
9   import org.openrdf.model.impl.URIImpl;
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import javax.xml.bind.JAXBContext;
14  import javax.xml.bind.JAXBException;
15  import javax.xml.bind.Unmarshaller;
16  import java.io.*;
17  import java.net.URL;
18  import java.util.Hashtable;
19  import java.util.LinkedList;
20  import java.util.concurrent.ArrayBlockingQueue;
21  import java.util.concurrent.BlockingQueue;
22  import java.util.logging.Level;
23  
24  public class nafPopulator {
25  
    // How to treat content already present in the KS; main() overwrites it with the -ksm value.
    public static int KSresourceReplacement = 1;     	// 1 = discard the new (default), 2 = ignore previous content and populate, 3 = delete previous content and repopulate
	// Running totals: updatestats() adds the counters of another statistics object into this one.
	static statistics globalStats = new statistics();
    // Writers for the report file (-o) and the records file (-or); both stay null unless main() opens them.
    static Writer out, mentionFile;
    // batchSize is taken from -b and consumer_threads from -ct; both default to 1.
    static int batchSize = 1, consumer_threads = 1;

    // disabledItems holds the -x argument; reportFileName / mentionsF hold the -o / -or paths.
    static String disabledItems = "", reportFileName = "report.txt", mentionsF = "records.txt";
    // recursion is set by -r and printToFile by -p. JobFinished is never written in this class
    // (NOTE(review): presumably set by the worker/finalizer threads - confirm).
    static boolean recursion = false, printToFile = false,JobFinished=false;
    // Set by -spi.
    static boolean store_partial_info = false;
    static boolean FInFile=false; // true when the input (-f) is a file listing NAF paths
    static boolean ZInFile=false; // true when the input (-z) is a zip archive containing NAF files
    static boolean TInFile=false; // true when the input (-t) is a compressed tar archive containing NAF files
    // Socket timeout handed to the KS client in checkSession(); main() stores the -T value
    // multiplied by 1000, and it stays null when -T is absent.
    static Integer timeout = null;
    // Input path taken from whichever of -n/-d/-f/-z/-t was given.
    static String INpath="";
    // KS endpoint: the -u value if given, otherwise the URL read by readConnectionFile().
    private static String SERVER_URL = "";
    // Credentials read from the connection file by readConnectionFile().
     static String USERNAME = "";
     static String PASSWORD = "";
    // KS session and client, created on demand by checkSession() and released by closeConnection().
    static Session session = null;
    static KnowledgeStore store = null;

   // static Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
    // Version string printed by -v.
    static String populatorVersion = "V0.1";
    static Logger logger = LoggerFactory.getLogger(nafPopulator.class);
    // Consumer threads started by main(), in start order.
    static LinkedList<Thread> threads = new LinkedList<Thread>();
    // Queue handed to both the producer and the consumer; main() creates it with the -qs
    // capacity (2 when -qs is absent).
    static BlockingQueue<Hashtable<String, KSPresentation>> queue ;
    // Built by main() around the queue above.
    static Producer producer ;
    static Consumer consumer ;
53      /**
54       * @param args
55       * @throws IOException
56       * @throws JAXBException
57       * @throws ClassNotFoundException
58       * @throws SecurityException
59       * @throws NoSuchMethodException
60       * @throws IllegalAccessException
61       * @throws InstantiationException
62       */
63      public static void main(String[] args) throws JAXBException, IOException,
64              InstantiationException, IllegalAccessException, NoSuchMethodException,
65              SecurityException, ClassNotFoundException {
66  
67         init();
68          // Configure command line options
69          final Options options = new Options();
70          options.addOption("u", "ks_server_url", true, "the URL of the ks server");
71          options.addOption("n", "NAF_file", true, "the path to a NAF file to be processed.");
72          options.addOption("d", "NAF_directory", true,
73                  "the path of a directory whose files are NAF files to be processed.");
74          options.addOption("f", "file", true,
75                  "the path of a file whose content is a list of NAF paths to be processed (one for line).");
76          options.addOption("r", "recursive", false,
77                  "process recursively the given NAF directory (in conjunction with -d)");
78          options.addOption("x", "exclude", true,
79                  "the given layer is excluded wen populating the K. Currently only the 'Entity' layer can be provided as argument.");
80          options.addOption("b", "batchsize", true,
81                  "the number of NAF files to be processed and submitted to the KS in a single step; -1 means all (WARNING: very memory consuming!), defaults to 1.");
82          options.addOption("qs", "queueSize", true,
83                  "the number of batch queue items to be hold in memory; defaults to 2.");
84          options.addOption("ct", "consumerThreads", true,
85                  "the number of consumer threads to be thrown simultaneously, 1 is default.");
86          options.addOption("ksm", "ksModality", true,
87                  "Submitting to KS modality:  (1=discard the new,Default) , (2=ignore previous content and populate), (3=delete previous content and repopulate)");
88  
89          options.addOption("v", "version", false,
90                  "display version and copyright information, then exit");
91          options.addOption("h", "help", false, "display usage information, then exit");
92          options.addOption("spi", "store_partial_info", false,
93                  "store in the KS even partial information in case of error (try to maximize data stored), defaults to false.");
94          options.addOption("o", "outputreportfilepath", true,
95                  "the path of the 'report' file, where individual and overall statistics are saved");
96          options.addOption("or", "outputrecordsfilepath", true,
97                  "the path of the 'record' file, where mentions and resources objects are saved");
98          options.addOption("p", "parsingOnly", false,
99                  "perform NAF parsing only (do not store information in the KS)");
100         options.addOption("z", "zip", true,
101                 "the path to a zip archive containing NAF files to be processed.");
102         options.addOption("t", "tgz", true,
103                 "the path to a compressed tar archive (.tar.gz or .tgz) containing NAF files to be processed.");
104         options.addOption("T", "timeout", true,
105                 "timeout (in seconds, default 10)");
106 
107         try {
108             final CommandLine cmd = new GnuParser().parse(options, args);
109 	    {
110 
111             java.util.logging.Logger COM_SUN_JERSEY_LOGGER = java.util.logging.Logger.getLogger( "org.glassfish.jersey.message.internal.HeaderUtils" );
112             COM_SUN_JERSEY_LOGGER.setLevel(Level.SEVERE);
113 
114             //check if we have many inputs in the same call, error and exit
115 		int nafFileModalitiesCount = 0;
116 		if (cmd.hasOption("ksm")) {
117 			KSresourceReplacement=Integer.parseInt(cmd.getOptionValue("ksm")) ;
118 			}
119 
120 		if (cmd.hasOption("n")) { nafFileModalitiesCount++;}
121 		if (cmd.hasOption("d")) { nafFileModalitiesCount++;}
122 		if (cmd.hasOption("f")) { nafFileModalitiesCount++;}
123 		if (cmd.hasOption("z")) { nafFileModalitiesCount++;}
124 		if (nafFileModalitiesCount > 1) {
125 		    System.err.println("Cannot manage multiple options(-n|-d|-f|-z): please choice one of them.");
126 		    printUsage(options);
127 		    System.exit(0);
128 		}
129             }
130 
131             if (cmd.hasOption("u")) {
132                 SERVER_URL = cmd.getOptionValue('u');
133             }
134 
135             if (cmd.hasOption("T")) {
136                 timeout=Integer.parseInt(cmd.getOptionValue("T")) * 1000 ;
137             }
138             if (cmd.hasOption("ct")) {
139                 consumer_threads=Integer.parseInt(cmd.getOptionValue("ct"))  ;
140             }
141             //TODO important if any illegal input crash with error message
142 
143             if (cmd.hasOption("qs")) {
144                 queue = new ArrayBlockingQueue<>(Integer.parseInt(cmd.getOptionValue("qs")))  ;
145             }else{
146                 queue = new ArrayBlockingQueue<>(2)  ;
147             }
148              producer = new Producer(queue);
149              consumer = new Consumer(queue);
150 
151             if (cmd.hasOption("o")) {
152                 reportFileName = cmd.getOptionValue('o');
153                 File tst = new File(reportFileName);
154                 if(tst.exists()&&!tst.isFile()&&tst.isDirectory()){
155                     reportFileName = reportFileName +"/report.txt";
156                 }
157                 nafPopulator.out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(
158                         // filePath.getPath(),
159                                     nafPopulator.reportFileName)), "utf-8"));
160             }
161             if (cmd.hasOption("or")) {
162                 mentionsF = cmd.getOptionValue("or");
163                 File tst = new File(mentionsF);
164                 if(tst.exists()&&!tst.isFile()&&tst.isDirectory()){
165                     mentionsF = mentionsF +"/records.txt";
166                 }
167                 nafPopulator.mentionFile = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(
168                         // filePath.getPath(),
169                                     nafPopulator.mentionsF)), "utf-8"));
170             }
171 
172             if (cmd.hasOption("p")) {
173                 printToFile = true;
174             }
175 
176             if (cmd.hasOption("spi")) {
177                 store_partial_info = true;
178             }
179             if (cmd.hasOption("v")) {
180                 System.out.println("KnowledgeStore.populator.version: " + populatorVersion);
181                 System.exit(0);
182             }
183             if (cmd.hasOption("h")
184 		|| (!cmd.hasOption("n") && !cmd.hasOption("d") && !cmd.hasOption("f") && !cmd.hasOption("z") && !cmd.hasOption("t"))) {
185                 printUsage(options);
186                 System.exit(0);
187             }
188 
189             if (cmd.hasOption("b")) {
190                 batchSize = Integer.parseInt(cmd.getOptionValue('b'));
191             } else {
192                 batchSize = 1;
193             }
194 
195             if (cmd.hasOption("x")) {
196                 disabledItems = cmd.getOptionValue('x');
197             }
198             if (cmd.hasOption("r")) {
199                 recursion = true;
200             }
201             if (!printToFile &&
202 		(cmd.hasOption("n") || cmd.hasOption("d") || cmd.hasOption("f") || cmd.hasOption("z") || cmd.hasOption("t"))) {
203                 readConnectionFile();
204             }
205             if (cmd.hasOption("n") || cmd.hasOption("d") || cmd.hasOption("f") || cmd.hasOption("z") || cmd.hasOption("t")) {
206 
207                 if (cmd.hasOption("n")){
208                     INpath = cmd.getOptionValue('n');
209                     //analyzePathAndRunSystem(cmd.getOptionValue('n'), disabledItems, recursion);
210                 } else if (cmd.hasOption("d")){
211                     INpath = cmd.getOptionValue('d');
212                    // analyzePathAndRunSystem(cmd.getOptionValue('d'), disabledItems, recursion);
213                 } else if (cmd.hasOption("f")) {
214                     FInFile=true;
215                     INpath = cmd.getOptionValue('f');
216                 } else if (cmd.hasOption("z")) {
217                     ZInFile=true;
218                     INpath = cmd.getOptionValue('z');
219                 }  else if (cmd.hasOption("t")) {
220                     TInFile=true;
221                     INpath = cmd.getOptionValue('t');
222                 }
223                 //starting producer to produce messages in queue
224                 new Thread(producer).start();
225                 //starting consumer to consume messages from queue
226               /*  ExecutorService threadPool = Executors.newFixedThreadPool(consumer_threads);
227                 threadPool.submit(consumer);
228                 threadPool.shutdown();*/
229                for(int i=0;i<consumer_threads;i++){
230                 Thread a = new Thread(consumer);
231                 a.start();
232                 threads.addLast(a);
233                 }
234 
235                 // new Thread(consumer).start();
236                finalizeThread finalizeThreadObj = new finalizeThread();
237                 new Thread(finalizeThreadObj).start();
238 
239 
240             }
241 
242         } catch (final ParseException ex) {
243             // Display error message and then usage on syntax error
244             System.err.println("SYNTAX ERROR: " + ex.getMessage());
245             printUsage(options);
246         } catch (final Throwable ex) {
247             // Display error message and stack trace on generic error
248             System.err.print("EXECUTION FAILED: ");
249             ex.printStackTrace();
250             printUsage(options);
251         }
252 
253 
254     }
255 
256     private static void init() {
257         globalStats = new statistics();
258         out=null;
259         mentionFile = null;
260         disabledItems = ""; reportFileName = "report.txt"; mentionsF = "records.txt";
261         recursion = false; printToFile = false;
262         store_partial_info = false;
263         SERVER_URL = "";
264         USERNAME = "";
265         PASSWORD = "";
266         session = null;
267         store = null;
268         //mentions = new Hashtable<String, KSPresentation>();
269         logger = LoggerFactory.getLogger(nafPopulator.class);
270 
271     }
272 
273     static void nullObjects() throws IOException {
274         nafPopulator.closeConnection();
275         if (nafPopulator.mentionFile != null) {
276             nafPopulator.mentionFile.flush();
277             nafPopulator.mentionFile.close();
278         }
279 		if (nafPopulator.out != null) {
280 			nafPopulator.out.flush();
281 			nafPopulator.out.close();
282 		}
283         globalStats = null;
284         out=null;
285         mentionFile = null;
286         batchSize = 1;
287         disabledItems = null;
288         reportFileName = null;
289         mentionsF = null;
290         recursion = false;
291         printToFile = false;
292         store_partial_info = false;
293         session = null;
294         store = null;
295        // mentions = null;
296 
297     }
298 
299     private static void printUsage(Options options) {
300         int WIDTH = 80;
301         final PrintWriter out = new PrintWriter(System.out);
302         final HelpFormatter formatter = new HelpFormatter();
303         formatter.printUsage(out, WIDTH, "eu.fbk.knowledgestore.populator.naf.nafPopulator",
304                 options);
305         out.println("\nOptions");
306         formatter.printOptions(out, WIDTH, options, 2, 2);
307         out.flush();
308     }
309 
310 
311 
312 
313     static void updatestats(statistics st) {
314         globalStats.setObjectMention(globalStats.getObjectMention() + st.getObjectMention());
315         globalStats.setPER(globalStats.getPER() + st.getPER());
316         globalStats.setORG(globalStats.getORG() + st.getORG());
317         globalStats.setLOC(globalStats.getLOC() + st.getLOC());
318         globalStats.setFin(globalStats.getFin() + st.getFin());
319         globalStats.setMix(globalStats.getMix() + st.getMix());
320         globalStats.setPRO(globalStats.getPRO() + st.getPRO());
321         globalStats.setNo_mapping(globalStats.getNo_mapping() + st.getNo_mapping());
322         globalStats.setTimeMention(globalStats.getTimeMention() + st.getTimeMention());
323         globalStats.setEventMention(globalStats.getEventMention() + st.getEventMention());
324         globalStats.setParticipationMention(globalStats.getParticipationMention()
325                 + st.getParticipationMention());
326         globalStats.setEntity(globalStats.getEntity() + st.getEntity());
327         globalStats.setCoref(globalStats.getCoref() + st.getCoref());
328         globalStats.setFactuality(globalStats.getFactuality() + st.getFactuality());
329         globalStats.setRole(globalStats.getRole() + st.getRole());
330         globalStats.setRolewithEntity(globalStats.getRolewithEntity() + st.getRolewithEntity());
331         globalStats.setRolewithoutEntity(globalStats.getRolewithoutEntity()
332                 + st.getRolewithoutEntity());
333         globalStats.setSrl(globalStats.getSrl() + st.getSrl());
334         globalStats.setTimex(globalStats.getTimex() + st.getTimex());
335         globalStats.setTlinkMention(globalStats.getTlinkMention() + st.getTlinkMention());
336         globalStats.setTlinkMentionDiscarded(globalStats.getTlinkMentionDiscarded() + st.getTlinkMentionDiscarded());
337         globalStats.setClinkMention(globalStats.getClinkMention() + st.getClinkMention());
338         globalStats.setClinkMentionDiscarded(globalStats.getClinkMentionDiscarded() + st.getClinkMentionDiscarded());
339         globalStats.setTlinkMentionsEnriched(globalStats.getTlinkMentionsEnriched() + st.getTlinkMentionsEnriched());
340 
341         globalStats.setCorefMentionEvent(globalStats.getCorefMentionEvent() + st.getCorefMentionEvent());
342         globalStats.setCorefMentionNotEvent(globalStats.getCorefMentionNotEvent() + st.getCorefMentionNotEvent());
343     }
344 
345     public static void readConnectionFile() throws UnsupportedEncodingException, JAXBException,
346             IOException {
347         String resourceName = "populator-ks-connection.xml";
348         URL url = nafPopulator.class.getResource(resourceName);
349 
350         JAXBContext jc = JAXBContext.newInstance("eu.fbk.knowledgestore.populator.naf.connection");
351         Unmarshaller unmarshaller = jc.createUnmarshaller();
352         if (url != null) {
353             KnowledgestoreServer myFile = (KnowledgestoreServer) unmarshaller
354 					.unmarshal(new InputStreamReader(url.openStream(), "UTF-8"));
355 	    // read SERVER_URL from XML file unless previously defined with command line parameters
356 	    if (SERVER_URL.equals("")) {
357 		SERVER_URL = myFile.getUrl();
358 	    }
359             USERNAME = myFile.getUsername();
360             PASSWORD = myFile.getPassword();
361             checkSession();
362         } else {
363             System.err
364 					.println("Error: populator-ks-connection.xml.xml file not found!\nYou should first create the connection file to the KS.");
365         }
366 
367     }
368 
369     static void checkSession() {
370 	logger.info("checkSession SERVER_URL |" + SERVER_URL + "|");
371         if (store == null) {
372             // Initialize a KnowledgeStore client
373             store = Client.builder(SERVER_URL).maxConnections(16).validateServer(false).socketTimeout(timeout).build();
374         }
375         if (store != null && (session == null || session.isClosed())) {
376             // Acquire a session for a given username/password pair
377             session = store.newSession(USERNAME, PASSWORD);
378         }
379         if (store == null || session == null || session.isClosed()) {
380 	    String errMsg = "";
381 	    if (store == null) { errMsg = "null store"; }
382 	    else if (session == null) { errMsg = "null session"; }
383 	    else { errMsg = "closed session"; }
384             logger.error("checkSession with SERVER_URL " + SERVER_URL + " : " + errMsg);
385             System.exit(0);
386         }
387         try {
388             session.download(new URIImpl("http://localhost/test")).exec();
389         } catch (IllegalStateException e) {
390             e.printStackTrace();
391             logger.error("checkSession with SERVER_URL " + SERVER_URL + " : IllegalStateException");
392             System.exit(0);
393         } catch (OperationException e) {
394             e.printStackTrace();
395             logger.error("checkSession with SERVER_URL " + SERVER_URL + " : OperationException");
396             System.exit(0);
397         }
398 
399     }
400 
401     static void closeConnection() {
402     	if(session!=null && !session.isClosed()){
403         // Close the session
404         session.close();
405     	}
406     	if(store!=null&&!store.isClosed()){
407         // Ensure to close the KS (will also close pending sessions)
408         store.close();
409     	}
410     }
411 }