1   package eu.fbk.knowledgestore.populator.naf;
2   
3   import eu.fbk.knowledgestore.Session;
4   import eu.fbk.knowledgestore.data.Data;
5   import eu.fbk.knowledgestore.data.Record;
6   
7   import java.io.IOException;
8   import java.lang.reflect.InvocationTargetException;
9   import java.lang.reflect.Method;
10  import java.util.Hashtable;
11  import java.util.Map.Entry;
12  import java.util.concurrent.BlockingQueue;
13  
14  public class Consumer implements Runnable {
15  
16      private BlockingQueue<Hashtable<String, KSPresentation>> queue;
17  
18      public Consumer(BlockingQueue<Hashtable<String, KSPresentation>> q) {
19          this.queue = q;
20      }
21      static boolean called = false;
22      static int cc=1;
23      @Override
24      public void run() {
25          Session session = null;
26          try {
27              System.out.println("Start Consumer:"+cc);
28              int cN= cc;
29              cc++;
30              if(nafPopulator.store!=null&&!nafPopulator.store.isClosed()){
31              	session = nafPopulator.store.newSession(nafPopulator.USERNAME, nafPopulator.PASSWORD);
32              }
33              // consuming messages until exit message is received
34                  while (!nafPopulator.JobFinished||queue.size()>0) {
35                      // Thread.sleep(10);
36                      // System.out.println("Consumed "+msg.getMsg());
37                     // System.out.println("submitting Consumer");
38                      Hashtable<String, KSPresentation> obl = queue.poll();
39                      if(obl!=null){
40                         System.out.println("Consumer:"+cc+" is serving{"+obl.keySet()+"}");
41                      if (!nafPopulator.printToFile) {
42                          submitCollectedData(obl,session);
43                          
44                      } else {
45                          appendCollectedDataToFile(obl);
46                          
47                      }
48                      }
49                     /* try 
50                      {
51                       // 
52                          Hashtable<String, KSPresentation> obl = queue.poll(100, TimeUnit.MILLISECONDS);
53                          if(nafPopulator.JobFinished && queue.isEmpty()&&obl==null)
54                              return;
55                          if(obl!= null)
56                          {
57                             
58                          }
59                      } 
60                      catch (InterruptedException e) 
61                      {                   
62                          return;
63                      }*/
64  
65                  }
66                  
67                  /*Thread.currentThread().interrupt();
68                  
69                  if(allThreadsDied())
70                      footer();*/
71              
72                  
73          } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | SecurityException | IOException  e) {
74              e.printStackTrace();
75          }
76          finally {
77              if (session != null) {
78                  session.close();
79              }
80          }
81      }
82      
83  
84  
85  
86      private static void submitCollectedData(Hashtable<String, KSPresentation> mentions, Session session) throws ClassNotFoundException,
87              InstantiationException, IllegalAccessException, NoSuchMethodException,
88              SecurityException, IOException {
89          String className = "eu.fbk.knowledgestore.populator.naf.submitKS";
90          Class clazz = Class.forName(className);
91          Class[] parameters = new Class[] { Hashtable.class, boolean.class, Session.class };
92          Method method = clazz.getMethod("init", parameters);
93          Object obj = clazz.newInstance();
94          try {
95  
96              nafPopulator.checkSession();
97              int status = (Integer) method.invoke(obj, mentions, nafPopulator.store_partial_info, session);
98              if (status == 1) {// resourse submitted and states updated in submitKS.java
99              }
100             if (status == 0) {
101                 // error happens, rollback done. redo file by file,
102                 Hashtable<String, KSPresentation> mentmp = new Hashtable<String, KSPresentation>();
103                 for (Entry<String, KSPresentation> rc : mentions.entrySet()) {
104                     mentmp.put(rc.getKey(), rc.getValue());
105 
106                     int status2 = (Integer) method
107                             .invoke(obj, mentmp, nafPopulator.store_partial_info, nafPopulator.session);
108                     if (status2 == 1) {
109                     	// resourse submitted and states updated in submitKS.java
110                         mentmp.clear();
111                     }
112                     if (status2 == 0) {
113                         nafPopulator.logger.error("Error storing this file to KS: "
114                                 + rc.getValue().getNaf_file_path());
115                     }
116 
117                 }
118             }
119             mentions.clear();
120         } catch (IllegalAccessException e) {
121             e.printStackTrace();
122             String error = " Involved file(s):";
123             for (KSPresentation vl : mentions.values()) {
124                 error += vl.getNaf_file_path() + ",";
125             }
126             error += ((e.getMessage() != null) ? e.getMessage() : "")
127                     + "\nStoring to KS phase: Populating mentions interrupted!";
128             nafPopulator.logger.error(error);
129         } catch (IllegalArgumentException e) {
130             e.printStackTrace();
131             String error = " Involved file(s):";
132             for (KSPresentation vl : mentions.values()) {
133                 error += vl.getNaf_file_path() + ",";
134             }
135             error += ((e.getMessage() != null) ? e.getMessage() : "")
136                     + "\nStoring to KS phase: Populating mentions interrupted!";
137             nafPopulator.logger.error(error);
138         } catch (InvocationTargetException e) {
139             e.printStackTrace();
140             String error = " Involved file(s):";
141             for (KSPresentation vl : mentions.values()) {
142                 error += vl.getNaf_file_path() + ",";
143             }
144             error += ((e.getMessage() != null) ? e.getMessage() : "")
145                     + "\nStoring to KS phase: Populating mentions interrupted!";
146             nafPopulator.logger.error(error);
147         } finally {
148             mentions.clear();
149         }
150     }
151 
152     private static void appendCollectedDataToFile(Hashtable<String, KSPresentation> mentions) throws IOException {
153 
154         for (Entry<String, KSPresentation> mn : mentions.entrySet()) {
155             String naf_file_path = mn.getValue().getNaf_file_path();
156             String stats = mn.getValue().getStats().getStats();
157             if (nafPopulator.out != null) {
158                 nafPopulator.out.append("NAF: " + naf_file_path);
159                 nafPopulator.out.append(stats);
160                 nafPopulator.out.append("\n");
161                 nafPopulator.out.flush();
162             }
163             nafPopulator.updatestats(mn.getValue().getStats());
164             nafPopulator.mentionFile.append(mn.getValue().getNewsResource()
165                     .toString(Data.getNamespaceMap(), true)
166                     + "\n");
167             nafPopulator.mentionFile.append(mn.getValue().getNaf().toString(Data.getNamespaceMap(), true)
168                     + "\n");
169             for (Record mnMen : mn.getValue().getMentions().values()) {
170                 nafPopulator.mentionFile.append(mnMen.toString(Data.getNamespaceMap(), true) + "\n");
171             }
172 
173         }
174         if (nafPopulator.out != null) {
175             nafPopulator.out.flush();
176         }
177         nafPopulator.mentionFile.flush();
178         mentions.clear();
179 
180     }
181 }