1   package eu.fbk.knowledgestore.populator.naf;
2   
3   import java.io.BufferedReader;
4   import java.io.BufferedWriter;
5   import java.io.File;
6   import java.io.FileInputStream;
7   import java.io.FileOutputStream;
8   import java.io.IOException;
9   import java.io.InputStreamReader;
10  import java.io.OutputStream;
11  import java.io.OutputStreamWriter;
12  import java.io.Reader;
13  import java.io.Writer;
14  import java.lang.reflect.InvocationTargetException;
15  import java.lang.reflect.Method;
16  import java.util.Hashtable;
17  import java.util.LinkedList;
18  import java.util.zip.ZipEntry;
19  import java.util.zip.ZipInputStream;
20  
21  import javax.xml.bind.JAXBException;
22  
23  import org.apache.commons.compress.archivers.tar.*;
24  import org.apache.commons.compress.compressors.gzip.*;
25  import org.apache.commons.compress.utils.IOUtils;
26  import org.slf4j.Logger;
27  
28  public class NAFRunner {
29      
30      void generate(){
31          try {
32  	    if (nafPopulator.FInFile) {
33  		/* 
34  		   input is a file whose content is a list of NAF paths to be processed (one for line)
35  		*/
36  		FileInputStream in = new FileInputStream(nafPopulator.INpath);
37  		Reader reader = new InputStreamReader(in, "utf8");
38  		BufferedReader br = new BufferedReader(reader);
39  		String line = "";
40  		LinkedList<File> fileslist = new LinkedList<File>();
41  		while ((line = br.readLine()) != null) {
42              
43  		    if(fileslist.size() >= nafPopulator.batchSize){
44  			RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
45  			fileslist.clear();
46  		    }
47             
48  		    File e=new File(line);
49  		    if(e.exists())
50  			fileslist.addLast(e);
51  		    else {
52  			System.err.println("Path not exist!" + e.getPath());
53  
54  		    }
55             
56  		}
57  		if(fileslist.size()>0){
58  		    RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
59  		    fileslist.clear();
60  		}
61  		in.close();
62  	    } else if (nafPopulator.ZInFile) {
63  		/* 
64  		   input is a zip archive containing NAF files to be processed
65  		*/
66  		String ZIP_OUTPUT_DIR = "/tmp/nafPopulatorZipOutDir";
67  		byte[] buffer = new byte[1024];
68  		LinkedList<File> fileslist = new LinkedList<File>();
69  		boolean multipleFileFlag = (nafPopulator.batchSize > 1);
70  
71  		// create output directory is not exists
72  		File zipDir = new File(ZIP_OUTPUT_DIR);
73  		if (!zipDir.exists()) {
74  		    zipDir.mkdir();
75  		}
76  
77  		// get the zip file content
78  		ZipInputStream zis = new ZipInputStream(new FileInputStream(nafPopulator.INpath));
79  
80  		// iterate over zipped file list entry
81  		ZipEntry ze = zis.getNextEntry();
82  		while (ze != null) {
83  
84  		    // if it is a directory, then skip it, else copy the file contents
85  		    //
86  		    if (ze.isDirectory()) {
87  			/*
88  			  String zeName = ze.getName();
89  			  File extractedDir = new File(ZIP_OUTPUT_DIR + File.separator + zeName);
90  			  extractedDir.mkdirs();
91  			  System.out.println("ROL2: created new dir " + extractedDir.getAbsoluteFile());
92  			*/
93  		    } else {
94  
95  			String zeName = ze.getName();
96  
97  			// just use the basename of the file
98  			File tmpFile = new File(ZIP_OUTPUT_DIR + File.separator + zeName);
99  			String basename = tmpFile.getName();
100 
101 			File extractedFile = new File(ZIP_OUTPUT_DIR + File.separator + basename);
102 			String extractedPath = extractedFile.getAbsolutePath();
103 
104 			// create all non existing directories
105 			// 
106 			// new File(extractedFile.getParent()).mkdirs();
107 
108 			FileOutputStream fos = new FileOutputStream(extractedFile);
109 			int len;
110 			while ((len = zis.read(buffer)) > 0) {
111 			    fos.write(buffer, 0, len);
112 			}
113 			fos.close();
114 			// System.out.println("ROL3: created new file " + extractedPath + " |" + zeName + "|");
115 
116 			if (multipleFileFlag) {
117 			    // if needed invocate populator on the fileslist and delete the extracted files
118 			    //
119 			    if (fileslist.size() >= nafPopulator.batchSize) {
120 				RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
121 				fileslist.clear();
122 			    }
123 			    // add the file to the fileslist
124 			    //
125 			    fileslist.addLast(extractedFile);
126 			} else {
127 			    // invocate the populator of the extracted file
128 			    //
129 			    analyzePathAndRunSystem(extractedPath, nafPopulator.disabledItems, nafPopulator.recursion);
130 			}
131 		    }
132 		    // close entry and get a new one
133 		    zis.closeEntry();
134 		    ze = zis.getNextEntry();
135 		}
136 
137 		// if needed invocate populator on the fileslist and delete the extracted files
138 		//
139 		if (multipleFileFlag && (fileslist.size() > 0)) {
140 		    RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
141 		    fileslist.clear();
142 		}
143 		
144 		// close entry and zip
145 		zis.closeEntry();
146 		zis.close();
147 
148 		
149 	    } else if (nafPopulator.TInFile) {
150 		/* 
151 		   input is a compressed tar archive containing NAF files to be processed
152 		*/
153 		String TAR_OUTPUT_DIR = "/tmp/nafPopulatorTarOutDir";
154 		byte[] buffer = new byte[1024];
155 		LinkedList<File> fileslist = new LinkedList<File>();
156 		boolean multipleFileFlag = (nafPopulator.batchSize > 1);
157 
158 		// create output directory is not exists
159 		File tgzDir = new File(TAR_OUTPUT_DIR);
160 		if (!tgzDir.exists()) {
161 		    tgzDir.mkdir();
162 		}
163 
164 		// get the tgz file content
165 		TarArchiveInputStream is = new TarArchiveInputStream(new GzipCompressorInputStream(new FileInputStream(nafPopulator.INpath)));
166 
167 		// iterate over tgz file list entry
168 		TarArchiveEntry te = (TarArchiveEntry)is.getNextEntry();
169 		while (te != null) {
170 
171 		    // if it is a directory, then skip it, else copy the file contents
172 		    //
173 		    if (te.isDirectory()) {
174 			/*
175 			  String teName = te.getName();
176 			  File extractedDir = new File(TAR_OUTPUT_DIR + File.separator + teName);
177 			  extractedDir.mkdirs();
178 			  System.out.println("ROL2: created new dir " + extractedDir.getAbsoluteFile());
179 			*/
180 		    } else {
181 
182 			String teName = te.getName();
183 
184 			// just use the basename of the file
185 			File tmpFile = new File(TAR_OUTPUT_DIR + File.separator + teName);
186 			String basename = tmpFile.getName();
187 
188 			File extractedFile = new File(TAR_OUTPUT_DIR + File.separator + basename);
189 			String extractedPath = extractedFile.getAbsolutePath();
190 
191 			// create all non existing directories
192 			// 
193 			// new File(extractedFile.getParent()).mkdirs();
194 
195 			OutputStream outputFileStream = new FileOutputStream(extractedFile); 
196 			IOUtils.copy(is, outputFileStream);
197 			outputFileStream.close();
198 			// System.out.println("ROL2: created new file " + extractedPath + " |" + teName + "|");
199 
200 			if (multipleFileFlag) {
201 			    // if needed invocate populator on the fileslist and delete the extracted files
202 			    //
203 			    if (fileslist.size() >= nafPopulator.batchSize) {
204 				RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
205 				fileslist.clear();
206 			    }
207 			    // add the file to the fileslist
208 			    //
209 			    fileslist.addLast(extractedFile);
210 			} else {
211 			    // invocate the populator of the extracted file
212 			    //
213 			    analyzePathAndRunSystem(extractedPath, nafPopulator.disabledItems, nafPopulator.recursion);
214 			}
215 		    }
216 		    // get a new entry
217 		    te = (TarArchiveEntry)is.getNextEntry();
218 		}
219 
220 		// if needed invocate populator on the fileslist and delete the extracted files
221 		//
222 		if (multipleFileFlag && (fileslist.size() > 0)) {
223 		    RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
224 		    fileslist.clear();
225 		}
226 		
227 		// close the tgz
228 		is.close();
229 		
230 	    } else {
231 		/* 
232 		   input is either a NAF file or a NAF directory
233 		*/
234 		analyzePathAndRunSystem(nafPopulator.INpath, nafPopulator.disabledItems, nafPopulator.recursion);
235 	    }
236         } catch(Exception e) {
237             e.printStackTrace();
238             nafPopulator.logger.error(nafPopulator.INpath + " Processing phase: file discarded!\n");
239         }
240         nafPopulator.JobFinished=true;
241     }
242 
243     private  void RunSystemOnList(LinkedList<File> fileslist, String disabledItems, boolean rec)
244             throws JAXBException, IOException, InstantiationException, IllegalAccessException,
245             NoSuchMethodException, SecurityException, ClassNotFoundException, InterruptedException {
246         Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
247 	boolean submittedFlag = false;
248         for(File filePath:fileslist){
249             if (filePath.exists() && filePath.isDirectory()) {
250                 
251                 File[] listOfFiles = filePath.listFiles();
252                 
253                 for (int i = 0; i < listOfFiles.length; i++) {
254                     if (listOfFiles[i].exists() && listOfFiles[i].isFile()) {
255                         // System.err.println(i + ") working with: " + listOfFiles[i].getName());
256                         // out.append("\n" + i + "=" + listOfFiles[i].getName() + "\n");
257                         runClass(listOfFiles[i].getPath(), disabledItems,mentions);
258                     } else if (listOfFiles[i].exists() && listOfFiles[i].isDirectory()) {
259                         analyzePathAndRunSystem(listOfFiles[i].getPath(), disabledItems, rec);
260                     }
261                     nafPopulator.out.flush();
262                     //this is bug applied once it should be i%mod nafPopulator.batchSize==0
263                    submittedFlag = checkAddOrSubmit(mentions);
264 
265                 }
266               
267                 if ((nafPopulator.batchSize == -1) && (! submittedFlag)) {
268                     addAndFreeMemory(mentions);
269                 }
270             } else if (filePath.exists() && filePath.isFile()) {
271                 
272                 // out.append(filePath.getPath() + "\n");
273                 runClass(filePath.getPath(), disabledItems,mentions);
274                 submittedFlag = checkAddOrSubmit(mentions);
275             }
276         }
277         if(! submittedFlag){
278             addAndFreeMemory(mentions);
279         }
280         
281 	nafPopulator.out.flush();
282 	if (nafPopulator.printToFile && (nafPopulator.mentionFile != null)) {
283 	    nafPopulator.mentionFile.flush();
284 	}
285     }
286 
287     /*
288       return true if mentions have been submitted on the queue
289     */
290     boolean checkAddOrSubmit(Hashtable<String, KSPresentation> mentions) throws InterruptedException{
291         if (((mentions.size() % nafPopulator.batchSize) == 0) && (nafPopulator.batchSize != -1)) {
292             addAndFreeMemory(mentions);
293 	    return true;
294         } else {
295 	    return false;
296 	}
297     }
298 
299     void addAndFreeMemory(Hashtable<String, KSPresentation> mentions) throws InterruptedException{
300         Producer.queue.put(mentions);
301         // empty the heap memory
302         mentions = new Hashtable<String, KSPresentation>();
303         System.gc();
304         Runtime.getRuntime().gc();
305     }
306     
307     private  void analyzePathAndRunSystem(String path, String disabledItems, boolean rec)
308             throws JAXBException, IOException, InstantiationException, IllegalAccessException,
309             NoSuchMethodException, SecurityException, ClassNotFoundException, InterruptedException {
310         File filePath = new File(path);
311         if (filePath.exists()) {
312             
313             if (filePath.exists() && filePath.isDirectory()) {
314                 // create report file in the same directory of the input file path.
315                 File[] listOfFiles = filePath.listFiles();
316                 Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
317                 for (int i = 0; i < listOfFiles.length; i++) {
318                     if (listOfFiles[i].exists() && listOfFiles[i].isFile()) {
319                         // System.err.println(i + ") working with: " + listOfFiles[i].getName());
320                         // out.append("\n" + i + "=" + listOfFiles[i].getName() + "\n");
321                         runClass(listOfFiles[i].getPath(), disabledItems,mentions);
322                     } else if (listOfFiles[i].exists() && listOfFiles[i].isDirectory()) {
323                         analyzePathAndRunSystem(listOfFiles[i].getPath(), disabledItems, rec);
324                     }
325                     //this is bug applied once it should be i%mod nafPopulator.batchSize==0
326                     if (nafPopulator.batchSize != -1&&mentions.size() % nafPopulator.batchSize==0 ) {
327                         // submit the collected data to KS.
328                       /*  if (!nafPopulator.printToFile) {
329                             submitCollectedData();
330                         } else {
331                             appendCollectedDataToFile();
332                         }*/
333                         
334                         Producer.queue.put(mentions);
335                         // empty the heap memory
336                         mentions = new Hashtable<String, KSPresentation>();
337                         System.gc();
338                         Runtime.getRuntime().gc();
339                     }
340 
341                 }
342                 if(mentions.size()>0){
343                     Producer.queue.put(mentions);
344                     // empty the heap memory
345                     mentions = new Hashtable<String, KSPresentation>();
346                     System.gc();
347                     Runtime.getRuntime().gc();
348                 }
349                 //TODO if batchsize ==-1 so submit all once?! check it
350                 if (nafPopulator.batchSize == -1) {
351                     // submit the collected data to KS then it should finish as no other files
352                   /*  if (!nafPopulator.printToFile) {
353                         submitCollectedData();
354                     } else {
355                         appendCollectedDataToFile();
356                     }*/
357                     Producer.queue.put(mentions);
358                     // empty the heap memory
359                     mentions = new Hashtable<String, KSPresentation>();
360                     System.gc();
361                     Runtime.getRuntime().gc();
362                 }
363             } else if (filePath.exists() && filePath.isFile()) {
364                
365                 // out.append(filePath.getPath() + "\n");
366                 Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
367                 runClass(filePath.getPath(), disabledItems,mentions);
368                 /*if (!nafPopulator.printToFile) {
369                     submitCollectedData();
370                 } else {
371                     appendCollectedDataToFile();
372                 }*/
373                 Producer.queue.put(mentions);
374                 // empty the heap memory
375                 mentions = new Hashtable<String, KSPresentation>();
376                 System.gc();
377                 Runtime.getRuntime().gc();
378             }
379             if (nafPopulator.printToFile && 
380 		(nafPopulator.mentionFile != null)) {
381                 nafPopulator.mentionFile.flush();
382             }
383         } else {
384             System.err.println("Path not exist!" + filePath.getPath());
385 
386         }
387     }
388 
389     
390     
391     public  void runClass(String path, String disabledItems, Hashtable<String, KSPresentation> mentions) throws InstantiationException,
392             IllegalAccessException, NoSuchMethodException, SecurityException,
393             ClassNotFoundException, IOException {
394 
395         System.out.println(path); // TODO
396         String className = "eu.fbk.knowledgestore.populator.naf.processNAF";
397         Class clazz = Class.forName(className);
398         Class[] parameters = new Class[] { String.class, Writer.class, String.class, boolean.class };
399         Method method = clazz.getMethod("init", parameters);
400         Object obj = clazz.newInstance();
401         try {
402             KSPresentation as = (KSPresentation) method.invoke(obj, path, nafPopulator.out, disabledItems,
403                     nafPopulator.store_partial_info);
404             if (as != null) {
405                 mentions.put(path, as);
406             } else {
407                 nafPopulator.logger.error(path + " null is returned from processNAF procedure! -  Processing phase: file discarded!\n");
408             }
409         } catch (IllegalAccessException e) {
410             e.printStackTrace();
411             nafPopulator.logger.error(path + " Processing phase: file discarded!\n");
412         } catch (IllegalArgumentException e) {
413             e.printStackTrace();
414             nafPopulator.logger.error(path + " Processing phase: file discarded!\n");
415         } catch (InvocationTargetException e) {
416             e.printStackTrace();
417             nafPopulator.logger.error(path + " Processing phase: file discarded!\n");
418         }
419 
420     }
421 
422 }