1   package eu.fbk.knowledgestore.internal;
2   
3   import java.io.Closeable;
4   import java.io.File;
5   import java.io.FilterInputStream;
6   import java.io.FilterOutputStream;
7   import java.io.IOException;
8   import java.io.InputStream;
9   import java.io.OutputStream;
10  import java.lang.Thread.UncaughtExceptionHandler;
11  import java.lang.reflect.Type;
12  import java.net.URL;
13  import java.util.Collection;
14  import java.util.List;
15  import java.util.Map;
16  import java.util.Properties;
17  import java.util.concurrent.Callable;
18  import java.util.concurrent.ExecutionException;
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.Future;
22  import java.util.concurrent.ScheduledExecutorService;
23  import java.util.concurrent.ThreadFactory;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.TimeoutException;
26  
27  import javax.annotation.Nullable;
28  
29  import com.google.common.base.Charsets;
30  import com.google.common.base.Preconditions;
31  import com.google.common.base.Throwables;
32  import com.google.common.collect.Lists;
33  import com.google.common.io.Resources;
34  import com.google.common.reflect.TypeToken;
35  import com.google.common.util.concurrent.ListenableFuture;
36  import com.google.common.util.concurrent.ListenableScheduledFuture;
37  import com.google.common.util.concurrent.ListeningExecutorService;
38  import com.google.common.util.concurrent.ListeningScheduledExecutorService;
39  import com.google.common.util.concurrent.MoreExecutors;
40  import com.google.common.util.concurrent.ThreadFactoryBuilder;
41  
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  import org.slf4j.MDC;
45  
46  public final class Util {
47  
48      private static final Logger LOGGER = LoggerFactory.getLogger(Util.class);
49  
50      public static URL getURL(final String location) {
51          URL url = null;
52          try {
53              url = Resources.getResource(location.startsWith("/") ? location.substring(1)
54                      : location);
55              if (url != null) {
56                  return url;
57              }
58          } catch (final Exception ex) {
59              // not a classpath resource - ignore
60          }
61          try {
62              final File file = new File(location);
63              if (file.exists() && file.isFile()) {
64                  return file.toURI().toURL();
65              }
66          } catch (final Exception ex) {
67              // not a file - ignore
68          }
69          try {
70              return new URL(location);
71          } catch (final Exception ex) {
72              // not a valid URL
73              throw new IllegalArgumentException("Cannot extract a URL from: " + location);
74          }
75      }
76  
77      public static String getResource(final Class<?> referenceClass, final String resourceName) {
78          try {
79              final URL url = referenceClass.getResource(resourceName);
80              return Resources.toString(url, Charsets.UTF_8);
81          } catch (final IOException ex) {
82              throw new Error("Missing resource '" + resourceName + "': " + ex.getMessage(), ex);
83          }
84      }
85  
86      public static String getVersion(final String groupId, final String artifactId,
87              final String defaultValue) {
88          final URL url = Util.class.getClassLoader().getResource(
89                  "META-INF/maven/" + groupId + "/" + artifactId + "/pom.properties");
90          String version = defaultValue;
91          if (url != null) {
92              try {
93                  final InputStream stream = url.openStream();
94                  try {
95                      final Properties properties = new Properties();
96                      properties.load(stream);
97                      version = properties.getProperty("version").trim();
98                  } finally {
99                      stream.close();
100                 }
101             } catch (final IOException ex) {
102                 version = "unknown";
103             }
104         }
105         return version;
106     }
107 
108     @SuppressWarnings({ "unchecked", "rawtypes" })
109     public static String formatType(final Type type) {
110         final TypeToken<?> token = TypeToken.of(type);
111         final Class<?> clazz = token.getRawType();
112         String name = clazz.getSimpleName();
113         if (name.isEmpty()) {
114             Class<?> parent = clazz.getSuperclass();
115             if (parent == null && clazz.getInterfaces().length > 0) {
116                 parent = clazz.getInterfaces()[0];
117             }
118             if (parent != null) {
119                 name = token.getSupertype((Class) parent).toString();
120             }
121         }
122         return name;
123     }
124 
125     @Nullable
126     public static <T> T closeQuietly(@Nullable final T object) {
127         if (object instanceof Closeable) {
128             try {
129                 ((Closeable) object).close();
130             } catch (final Throwable ex) {
131                 LOGGER.error("Error closing " + object.getClass().getSimpleName(), ex);
132             }
133         }
134         return object;
135     }
136 
137     @Nullable
138     public static InputStream interceptClose(@Nullable final InputStream stream,
139             @Nullable final Runnable runnable) {
140         if (stream == null || runnable == null) {
141             return stream;
142         }
143         final Map<String, String> mdc = Logging.getMDC();
144         return new FilterInputStream(stream) {
145 
146             private boolean closed;
147 
148             @Override
149             public void close() throws IOException {
150                 if (this.closed) {
151                     return;
152                 }
153                 final Map<String, String> oldMdc = Logging.getMDC();
154                 try {
155                     Logging.setMDC(mdc);
156                     super.close();
157                     runnable.run();
158                 } finally {
159                     this.closed = true;
160                     Logging.setMDC(oldMdc);
161                 }
162             }
163 
164         };
165     }
166 
167     @Nullable
168     public static OutputStream interceptClose(@Nullable final OutputStream stream,
169             @Nullable final Runnable runnable) {
170         if (stream == null || runnable == null) {
171             return stream;
172         }
173         final Map<String, String> mdc = Logging.getMDC();
174         return new FilterOutputStream(stream) {
175 
176             private boolean closed;
177 
178             @Override
179             public void close() throws IOException {
180                 if (this.closed) {
181                     return;
182                 }
183                 final Map<String, String> oldMdc = Logging.getMDC();
184                 try {
185                     Logging.setMDC(mdc);
186                     super.close();
187                     runnable.run();
188                 } finally {
189                     this.closed = true;
190                     Logging.setMDC(oldMdc);
191                 }
192             }
193 
194         };
195     }
196 
197     public static ListeningScheduledExecutorService newScheduler(final int numThreads,
198             final String nameFormat, final boolean daemon) {
199         final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(daemon)
200                 .setNameFormat(nameFormat)
201                 .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
202 
203                     @Override
204                     public void uncaughtException(final Thread thread, final Throwable ex) {
205                         LOGGER.error("Uncaught exception in thread " + thread.getName(), ex);
206                     }
207 
208                 }).build();
209         return decorate(Executors.newScheduledThreadPool(numThreads, factory));
210     }
211 
212     public static ListeningExecutorService decorate(final ExecutorService executor) {
213         Preconditions.checkNotNull(executor);
214         if (executor instanceof MDCExecutorService) {
215             return (MDCExecutorService) executor;
216         } else if (executor instanceof ListeningExecutorService) {
217             return new MDCExecutorService((ListeningExecutorService) executor);
218         } else {
219             return new MDCExecutorService(MoreExecutors.listeningDecorator(executor));
220         }
221     }
222 
223     public static ListeningScheduledExecutorService decorate(
224             final ScheduledExecutorService executor) {
225         if (executor instanceof MDCScheduledExecutorService) {
226             return (MDCScheduledExecutorService) executor;
227         } else if (executor instanceof ListeningScheduledExecutorService) {
228             return new MDCScheduledExecutorService((ListeningScheduledExecutorService) executor);
229         } else {
230             // return MoreExecutors.listeningDecorator(executor);
231             return new MDCScheduledExecutorService(MoreExecutors.listeningDecorator(executor));
232         }
233     }
234 
235     private static class MDCScheduledExecutorService extends MDCExecutorService implements
236             ListeningScheduledExecutorService {
237 
238         MDCScheduledExecutorService(final ListeningScheduledExecutorService delegate) {
239             super(Preconditions.checkNotNull(delegate));
240         }
241 
242         @Override
243         ListeningScheduledExecutorService delegate() {
244             return (ListeningScheduledExecutorService) super.delegate();
245         }
246 
247         @Override
248         public ListenableScheduledFuture<?> schedule(final Runnable command, final long delay,
249                 final TimeUnit unit) {
250             return delegate().schedule(wrap(command, MDC.getCopyOfContextMap()), delay, unit);
251         }
252 
253         @Override
254         public <V> ListenableScheduledFuture<V> schedule(final Callable<V> callable,
255                 final long delay, final TimeUnit unit) {
256             return delegate().schedule(wrap(callable, MDC.getCopyOfContextMap()), delay, unit);
257         }
258 
259         @Override
260         public ListenableScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
261                 final long initialDelay, final long period, final TimeUnit unit) {
262             return delegate().scheduleAtFixedRate(wrap(command, MDC.getCopyOfContextMap()),
263                     initialDelay, period, unit);
264         }
265 
266         @Override
267         public ListenableScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
268                 final long initialDelay, final long delay, final TimeUnit unit) {
269             return delegate().scheduleWithFixedDelay(wrap(command, MDC.getCopyOfContextMap()),
270                     initialDelay, delay, unit);
271         }
272 
273     }
274 
275     private static class MDCExecutorService implements ListeningExecutorService {
276 
277         private final ListeningExecutorService delegate;
278 
279         MDCExecutorService(final ListeningExecutorService delegate) {
280             this.delegate = Preconditions.checkNotNull(delegate);
281         }
282 
283         ListeningExecutorService delegate() {
284             return this.delegate;
285         }
286 
287         Runnable wrap(final Runnable runnable, final Map<String, String> mdcMap) {
288             return new Runnable() {
289 
290                 @Override
291                 public void run() {
292                     Map<String, String> oldMap = null;
293                     try {
294                         if (mdcMap != null) {
295                             oldMap = MDC.getCopyOfContextMap();
296                             MDC.setContextMap(mdcMap);
297                         }
298                         runnable.run();
299                     } catch (final Throwable ex) {
300                         LOGGER.error("Uncaught exception in thread "
301                                 + Thread.currentThread().getName() + ": " + ex.getMessage(), ex);
302                         throw Throwables.propagate(ex);
303                     } finally {
304                         if (oldMap != null) {
305                             MDC.setContextMap(oldMap);
306                         }
307                     }
308                 }
309             };
310         }
311 
312         <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> mdcMap) {
313             return new Callable<T>() {
314 
315                 @Override
316                 public T call() throws Exception {
317                     Map<String, String> oldMap = null;
318                     try {
319                         if (mdcMap != null) {
320                             oldMap = MDC.getCopyOfContextMap();
321                             MDC.setContextMap(mdcMap);
322                         }
323                         return callable.call();
324                     } catch (final Throwable ex) {
325                         LOGGER.error("Uncaught exception in thread "
326                                 + Thread.currentThread().getName() + ": " + ex.getMessage(), ex);
327                         Throwables.propagateIfPossible(ex, Exception.class);
328                         throw new RuntimeException(ex);
329                     } finally {
330                         if (oldMap != null) {
331                             MDC.setContextMap(oldMap);
332                         }
333                     }
334                 }
335             };
336         }
337 
338         <T> Collection<Callable<T>> wrap(final Collection<? extends Callable<T>> callables,
339                 final Map<String, String> mdcMap) {
340             final List<Callable<T>> result = Lists.newArrayListWithCapacity(callables.size());
341             for (final Callable<T> callable : callables) {
342                 result.add(wrap(callable, mdcMap));
343             }
344             return result;
345         }
346 
347         @Override
348         public void shutdown() {
349             delegate().shutdown();
350         }
351 
352         @Override
353         public List<Runnable> shutdownNow() {
354             return delegate().shutdownNow();
355         }
356 
357         @Override
358         public boolean isShutdown() {
359             return delegate().isShutdown();
360         }
361 
362         @Override
363         public boolean isTerminated() {
364             return delegate().isTerminated();
365         }
366 
367         @Override
368         public boolean awaitTermination(final long timeout, final TimeUnit unit)
369                 throws InterruptedException {
370             return delegate().awaitTermination(timeout, unit);
371         }
372 
373         @Override
374         public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
375                 throws InterruptedException, ExecutionException {
376             return delegate().invokeAny(wrap(tasks, MDC.getCopyOfContextMap()));
377         }
378 
379         @Override
380         public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout,
381                 final TimeUnit unit) throws InterruptedException, ExecutionException,
382                 TimeoutException {
383             return delegate().invokeAny(wrap(tasks, MDC.getCopyOfContextMap()), timeout, unit);
384         }
385 
386         @Override
387         public void execute(final Runnable command) {
388             delegate().execute(wrap(command, MDC.getCopyOfContextMap()));
389         }
390 
391         @Override
392         public <T> ListenableFuture<T> submit(final Callable<T> task) {
393             return delegate().submit(wrap(task, MDC.getCopyOfContextMap()));
394         }
395 
396         @Override
397         public ListenableFuture<?> submit(final Runnable task) {
398             return delegate().submit(wrap(task, MDC.getCopyOfContextMap()));
399         }
400 
401         @Override
402         public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
403             return delegate().submit(wrap(task, MDC.getCopyOfContextMap()), result);
404         }
405 
406         @Override
407         public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
408                 throws InterruptedException {
409             return delegate().invokeAll(wrap(tasks, MDC.getCopyOfContextMap()));
410         }
411 
412         @Override
413         public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
414                 final long timeout, final TimeUnit unit) throws InterruptedException {
415             return delegate().invokeAll(wrap(tasks, MDC.getCopyOfContextMap()), timeout, unit);
416         }
417 
418     }
419 
420 }