1   package eu.fbk.knowledgestore.internal.rdf;
2   
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
20  
21  import javax.annotation.Nullable;
22  
23  import com.google.common.base.Function;
24  import com.google.common.base.MoreObjects;
25  import com.google.common.base.Preconditions;
26  import com.google.common.base.Strings;
27  import com.google.common.base.Throwables;
28  import com.google.common.collect.ImmutableList;
29  import com.google.common.collect.Lists;
30  import com.google.common.collect.Maps;
31  import com.google.common.collect.Ordering;
32  
33  import org.openrdf.model.BNode;
34  import org.openrdf.model.Literal;
35  import org.openrdf.model.Namespace;
36  import org.openrdf.model.Resource;
37  import org.openrdf.model.Statement;
38  import org.openrdf.model.URI;
39  import org.openrdf.model.Value;
40  import org.openrdf.model.impl.NamespaceImpl;
41  import org.openrdf.query.BindingSet;
42  import org.openrdf.query.QueryEvaluationException;
43  import org.openrdf.query.QueryResultHandlerException;
44  import org.openrdf.query.TupleQueryResultHandlerBase;
45  import org.openrdf.query.TupleQueryResultHandlerException;
46  import org.openrdf.query.resultio.BasicQueryWriterSettings;
47  import org.openrdf.query.resultio.BooleanQueryResultFormat;
48  import org.openrdf.query.resultio.BooleanQueryResultParser;
49  import org.openrdf.query.resultio.BooleanQueryResultWriter;
50  import org.openrdf.query.resultio.QueryResultIO;
51  import org.openrdf.query.resultio.TupleQueryResultFormat;
52  import org.openrdf.query.resultio.TupleQueryResultParser;
53  import org.openrdf.query.resultio.TupleQueryResultWriter;
54  import org.openrdf.rio.ParserConfig;
55  import org.openrdf.rio.RDFFormat;
56  import org.openrdf.rio.RDFHandler;
57  import org.openrdf.rio.RDFHandlerException;
58  import org.openrdf.rio.RDFParseException;
59  import org.openrdf.rio.RDFParser;
60  import org.openrdf.rio.RDFWriter;
61  import org.openrdf.rio.Rio;
62  import org.openrdf.rio.RioSetting;
63  import org.openrdf.rio.WriterConfig;
64  import org.openrdf.rio.helpers.BasicParserSettings;
65  import org.openrdf.rio.helpers.BasicWriterSettings;
66  import org.openrdf.rio.helpers.JSONLDMode;
67  import org.openrdf.rio.helpers.JSONLDSettings;
68  import org.openrdf.rio.helpers.NTriplesParserSettings;
69  import org.openrdf.rio.helpers.RDFHandlerBase;
70  import org.openrdf.rio.helpers.RDFJSONParserSettings;
71  import org.openrdf.rio.helpers.RDFParserBase;
72  import org.openrdf.rio.helpers.TriXParserSettings;
73  import org.openrdf.rio.helpers.XMLParserSettings;
74  import org.openrdf.rio.helpers.XMLWriterSettings;
75  import org.slf4j.Logger;
76  
77  import info.aduna.iteration.CloseableIteration;
78  import info.aduna.iteration.Iteration;
79  
80  import eu.fbk.knowledgestore.data.Data;
81  import eu.fbk.knowledgestore.data.Handler;
82  import eu.fbk.knowledgestore.data.ParseException;
83  import eu.fbk.knowledgestore.data.Stream;
84  import eu.fbk.knowledgestore.internal.Compression;
85  import eu.fbk.knowledgestore.internal.Logging;
86  import eu.fbk.knowledgestore.internal.Util;
87  import eu.fbk.rdfpro.jsonld.JSONLD;
88  import eu.fbk.rdfpro.tql.TQL;
89  
90  // TODO: reorganize code in this class
91  
92  public final class RDFUtil {
93  
    /**
     * Name of the {@link Stream} property holding the {@code List<String>} of variable names of a
     * stream of {@link BindingSet}s. It is set by {@link #toBindingsStream} and
     * {@link #readSparqlTuples}, and read by {@link #writeSparqlTuples}.
     */
    public static final String PROPERTY_VARIABLES = "variables";

    // Set to true by writeRDF when putting the JSONLD.ROOT_TYPES setting throws (presumably
    // because rdfpro-jsonld is not on the classpath), so that later calls skip that setting.
    // NOTE(review): plain static field accessed without synchronization; seems acceptable for a
    // best-effort flag, but confirm.
    private static boolean jsonldDisabled = false;
97  
98      public static void toHtml(final Value value, @Nullable final Map<String, String> prefixes,
99              final Appendable sink) throws IOException {
100         if (value instanceof Literal) {
101             final Literal literal = (Literal) value;
102             sink.append("<span");
103             if (literal.getLanguage() != null) {
104                 sink.append(" title=\"@").append(literal.getLanguage()).append("\"");
105             } else if (literal.getDatatype() != null) {
106                 sink.append(" title=\"&lt;").append(literal.getDatatype().stringValue())
107                         .append("&gt;\"");
108             }
109             sink.append(">").append(value.stringValue()).append("</span>");
110         } else if (value instanceof BNode) {
111             sink.append("_:").append(((BNode) value).getID());
112         } else if (value instanceof URI) {
113             final URI uri = (URI) value;
114             sink.append("<a href=\"").append(uri.stringValue()).append("\">");
115             String prefix = null;
116             if (prefixes != null) {
117                 prefix = prefixes.get(uri.getNamespace());
118             }
119             if (prefix == null) {
120                 prefix = Data.namespaceToPrefix(uri.getNamespace(), Data.getNamespaceMap());
121             }
122             if (prefix != null) {
123                 sink.append(prefix).append(':').append(uri.getLocalName());
124             } else {
125                 final int index = uri.stringValue().lastIndexOf('/');
126                 if (index >= 0) {
127                     sink.append("&lt;..").append(uri.stringValue().substring(index))
128                             .append("&gt;");
129                 } else {
130                     sink.append("&lt;").append(uri.stringValue()).append("&gt;");
131                 }
132             }
133             sink.append("</a>");
134         }
135     }
136 
137     public static Stream<Statement> toStatementStream(
138             final Iteration<? extends BindingSet, ?> iteration) {
139 
140         Preconditions.checkNotNull(iteration);
141 
142         return Stream.create(iteration).transform(new Function<BindingSet, Statement>() {
143 
144             @Override
145             @Nullable
146             public Statement apply(final BindingSet bindings) {
147                 final Value subject = bindings.getValue("subject");
148                 final Value predicate = bindings.getValue("predicate");
149                 final Value object = bindings.getValue("object");
150                 final Value context = bindings.getValue("context");
151                 if (subject instanceof Resource && predicate instanceof URI && object != null) {
152                     final Resource subj = (Resource) subject;
153                     final URI pred = (URI) predicate;
154                     if (context == null) {
155                         return Data.getValueFactory().createStatement(subj, pred, object);
156                     } else if (context instanceof Resource) {
157                         final Resource ctx = (Resource) context;
158                         return Data.getValueFactory().createStatement(subj, pred, object, ctx);
159                     }
160                 }
161                 return null;
162             }
163 
164         }, 0);
165     }
166 
167     public static Stream<BindingSet> toBindingsStream(
168             final CloseableIteration<BindingSet, QueryEvaluationException> iteration,
169             final Iterable<? extends String> variables) {
170 
171         Preconditions.checkNotNull(iteration);
172 
173         final List<String> variableList = ImmutableList.copyOf(variables);
174         final CompactBindingSet.Builder builder = CompactBindingSet.builder(variableList);
175 
176         return Stream.create(iteration).transform(new Function<BindingSet, BindingSet>() {
177 
178             @Override
179             @Nullable
180             public BindingSet apply(final BindingSet bindings) {
181                 final int variableCount = variableList.size();
182                 for (int i = 0; i < variableCount; ++i) {
183                     final String variable = variableList.get(i);
184                     builder.set(variable, bindings.getValue(variable));
185                 }
186                 return builder.build();
187             }
188 
189         }, 0).setProperty(PROPERTY_VARIABLES, variableList);
190     }
191 
192     public static int detectSparqlProlog(final String string) {
193         final int length = string.length();
194         int index = 0;
195         while (index < length) {
196             final char ch = string.charAt(index);
197             if (ch == '#') { // comment
198                 while (index < length && string.charAt(index) != '\n') {
199                     ++index;
200                 }
201             } else if (ch == 'p' || ch == 'b' || ch == 'P' || ch == 'B') { // prefix or base
202                 while (index < length && string.charAt(index) != '>') {
203                     ++index;
204                 }
205             } else if (!Character.isWhitespace(ch)) { // found
206                 return index;
207             }
208             ++index;
209         }
210         throw new ParseException(string, "Cannot detect SPARQL prolog");
211     }
212 
213     public static String detectSparqlForm(final String string) {
214         final int start = detectSparqlProlog(string);
215         for (int i = start; i < string.length(); ++i) {
216             final char ch = string.charAt(i);
217             if (Character.isWhitespace(ch)) {
218                 final String form = string.substring(start, i).toLowerCase();
219                 if (!form.equals("select") && !form.equals("construct")
220                         && !form.equals("describe") && !form.equals("ask")) {
221                     throw new ParseException(string, "Invalid query form: " + form);
222                 }
223                 return form;
224             }
225         }
226         throw new ParseException(string, "Cannot detect query form");
227     }
228 
229     public static long writeSparqlTuples(final TupleQueryResultFormat format,
230             final OutputStream out, final Stream<? extends BindingSet> stream) {
231 
232         final TupleQueryResultWriter writer = RDFUtil.newSparqlTupleWriter(format, out);
233 
234         try {
235             final AtomicLong result = new AtomicLong();
236             stream.toHandler(new Handler<BindingSet>() {
237 
238                 private boolean started = false;
239 
240                 private long count = 0L;
241 
242                 @Override
243                 public void handle(final BindingSet bindings) throws QueryResultHandlerException {
244                     if (!this.started) {
245                         @SuppressWarnings("unchecked")
246                         final List<String> variables = (List<String>) stream.getProperty(
247                                 PROPERTY_VARIABLES, Object.class);
248                         writer.startDocument();
249                         writer.startHeader();
250                         writer.startQueryResult(variables);
251                         this.started = true;
252                     }
253                     if (bindings != null) {
254                         writer.handleSolution(bindings);
255                         ++this.count;
256                     } else if (this.started) {
257                         writer.endQueryResult();
258                         result.set(this.count);
259                     }
260                 }
261 
262             });
263             return result.get();
264 
265         } catch (final Exception ex) {
266             throw Throwables.propagate(ex);
267 
268         } finally {
269             Util.closeQuietly(stream);
270         }
271     }
272 
    /**
     * Returns a stream over the bindings parsed from a SPARQL tuple query result.
     * <p>
     * Parsing is performed in {@code doToHandler}, i.e., when the returned stream is consumed.
     * It runs with the logging MDC captured when this method was called. Processing works as
     * follows:
     * <ul>
     * <li>The header variables are published as stream property {@link #PROPERTY_VARIABLES}.</li>
     * <li>Each parsed solution is copied into a {@link CompactBindingSet} before being passed
     * downstream.</li>
     * <li>The end of the result is signalled to the downstream handler with a {@code null}
     * element.</li>
     * </ul>
     * </p>
     *
     * @param format
     *            the format of the query result
     * @param in
     *            the stream to read from; not closed here (NOTE(review): presumably the caller's
     *            responsibility — confirm)
     * @return a stream over the parsed bindings
     */
    public static Stream<BindingSet> readSparqlTuples(final TupleQueryResultFormat format,
            final InputStream in) {

        // Create a parser for the specified format
        final TupleQueryResultParser parser = newSparqlTupleParser(format);

        // Return a source over parsed bindings; capture the caller's MDC so that log messages
        // emitted while parsing (possibly in another thread) keep the same context
        final Map<String, String> mdc = Logging.getMDC();
        return new Stream<BindingSet>() {

            @Override
            protected void doToHandler(final Handler<? super BindingSet> handler) throws Throwable {
                final Map<String, String> oldMdc = Logging.getMDC();
                try {
                    Logging.setMDC(mdc);
                    parser.setQueryResultHandler(new TupleQueryResultHandlerBase() {

                        // Created once the variable names are known (startQueryResult)
                        private CompactBindingSet.Builder builder;

                        @Override
                        public void startQueryResult(final List<String> vars)
                                throws TupleQueryResultHandlerException {
                            final List<String> variables = ImmutableList.copyOf(vars);
                            // resolves to the enclosing Stream's setProperty
                            setProperty(PROPERTY_VARIABLES, variables);
                            this.builder = CompactBindingSet.builder(variables);
                        }

                        @Override
                        public void handleSolution(final BindingSet bindings)
                                throws TupleQueryResultHandlerException {
                            // null is reserved downstream as the end-of-result marker
                            if (bindings != null) {
                                emit(bindings);
                            }
                        }

                        @Override
                        public void endQueryResult() throws TupleQueryResultHandlerException {
                            emit(null);
                        }

                        // Forwards (a compact copy of) the bindings to the downstream handler;
                        // failures are rethrown as-is when unchecked or already a
                        // TupleQueryResultHandlerException, otherwise wrapped in one
                        private void emit(final BindingSet bindings)
                                throws TupleQueryResultHandlerException {
                            try {
                                BindingSet compactBindings = bindings;
                                if (bindings != null) {
                                    this.builder.setAll(bindings);
                                    compactBindings = this.builder.build();
                                }
                                handler.handle(compactBindings);
                            } catch (final Throwable ex) {
                                Throwables.propagateIfPossible(ex,
                                        TupleQueryResultHandlerException.class);
                                throw new TupleQueryResultHandlerException(ex);
                            }
                        }

                    });
                    parser.parseQueryResult(in);
                } finally {
                    // restore the MDC of the consuming thread
                    Logging.setMDC(oldMdc);
                }
            }

        };
    }
338 
339     public static void writeSparqlBoolean(final BooleanQueryResultFormat format,
340             final OutputStream out, final boolean value) {
341 
342         final BooleanQueryResultWriter writer = newSparqlBooleanWriter(format, out);
343 
344         try {
345             writer.startDocument();
346             writer.startHeader();
347             writer.handleBoolean(value);
348 
349         } catch (final Exception ex) {
350             Throwables.propagate(ex);
351         }
352     }
353 
354     public static boolean readSparqlBoolean(final BooleanQueryResultFormat format,
355             final InputStream in) {
356 
357         final BooleanQueryResultParser parser = newSparqlBooleanReader(format);
358 
359         try {
360             final AtomicBoolean resultHolder = new AtomicBoolean();
361             parser.setQueryResultHandler(new TupleQueryResultHandlerBase() {
362 
363                 @Override
364                 public void handleBoolean(final boolean result) throws QueryResultHandlerException {
365                     resultHolder.set(result);
366                 }
367 
368             });
369             parser.parseQueryResult(in);
370             return resultHolder.get();
371 
372         } catch (final Exception ex) {
373             throw Throwables.propagate(ex);
374         }
375     }
376 
377     public static long writeRDF(final OutputStream out, final RDFFormat format,
378             @Nullable final Map<String, String> namespaces,
379             @Nullable final Map<? extends RioSetting<?>, ? extends Object> settings,
380             final Stream<? extends Statement> stream) {
381 
382         final Map<RioSetting<?>, Object> actualSettings = Maps.newHashMap();
383         if (settings != null) {
384             actualSettings.putAll(settings);
385         }
386         final Object types = stream.getProperty("types", Object.class);
387         if (types instanceof Set && !jsonldDisabled) {
388             try {
389                 actualSettings.put(JSONLD.ROOT_TYPES, types);
390             } catch (final Throwable ex) {
391                 jsonldDisabled = true; // rdfpro-jsonld not available
392             }
393         }
394 
395         try {
396             final RDFHandler handler = writeRDF(out, format, namespaces, actualSettings);
397             final AtomicLong result = new AtomicLong();
398             stream.toHandler(new Handler<Statement>() {
399 
400                 private boolean started = false;
401 
402                 private long count = 0L;
403 
404                 @Override
405                 public void handle(final Statement statement) throws RDFHandlerException {
406                     if (!this.started) {
407                         handler.startRDF();
408                         this.started = true;
409                     }
410                     if (statement != null) {
411                         handler.handleStatement(statement);
412                         ++this.count;
413                     } else if (this.started) {
414                         handler.endRDF();
415                         result.set(this.count);
416                     }
417                 }
418 
419             });
420             return result.get();
421 
422         } catch (final Exception ex) {
423             throw Throwables.propagate(ex);
424 
425         } finally {
426             Util.closeQuietly(stream);
427         }
428     }
429 
430     @SuppressWarnings({ "unchecked", "rawtypes" })
431     public static RDFHandler writeRDF(final OutputStream out, final RDFFormat format,
432             @Nullable final Map<String, String> namespaces,
433             @Nullable final Map<? extends RioSetting<?>, ? extends Object> settings)
434             throws IOException, RDFHandlerException {
435 
436         final RDFWriter writer = Rio.createWriter(format, out);
437 
438         final WriterConfig config = writer.getWriterConfig();
439         config.set(BasicWriterSettings.PRETTY_PRINT, true);
440         config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
441         config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
442 
443         if (format.equals(RDFFormat.RDFXML)) {
444             config.set(XMLWriterSettings.INCLUDE_XML_PI, true);
445             config.set(XMLWriterSettings.INCLUDE_ROOT_RDF_TAG, true);
446         }
447 
448         if (settings != null) {
449             for (final Map.Entry entry : settings.entrySet()) {
450                 config.set((RioSetting) entry.getKey(), entry.getValue());
451             }
452         }
453 
454         return namespaces == null ? writer : newNamespaceHandler(writer, namespaces, null);
455     }
456 
    /**
     * Returns a stream over the statements parsed from the given RDF input.
     * <p>
     * Parsing is performed in {@code doToHandler}, i.e., when the returned stream is consumed.
     * It runs with the logging MDC captured when this method was called and delegates to
     * {@link #readRDF(InputStream, RDFFormat, Map, String, boolean, RDFHandler)}. The end of the
     * data is signalled downstream with a {@code null} element.
     * </p>
     *
     * @param in
     *            the stream to read from; not closed here (NOTE(review): presumably the caller's
     *            responsibility — confirm)
     * @param format
     *            the RDF format
     * @param namespaces
     *            optional namespace map passed to the parser
     * @param base
     *            optional base URI (null is treated as the empty string)
     * @param preserveBNodes
     *            whether blank node IDs are preserved by the parser
     * @return a stream over the parsed statements
     */
    public static Stream<Statement> readRDF(final InputStream in, final RDFFormat format,
            @Nullable final Map<String, String> namespaces, @Nullable final String base,
            final boolean preserveBNodes) {

        // capture the caller's MDC so that log messages emitted during parsing keep its context
        final Map<String, String> mdc = Logging.getMDC();
        return new Stream<Statement>() {

            @Override
            protected void doToHandler(final Handler<? super Statement> handler) throws Throwable {
                final Map<String, String> oldMdc = Logging.getMDC();
                try {
                    Logging.setMDC(mdc);
                    final RDFHandler rdfHandler = new RDFHandlerBase() {

                        @Override
                        public void handleStatement(final Statement statement)
                                throws RDFHandlerException {
                            emit(statement);
                        }

                        @Override
                        public void endRDF() throws RDFHandlerException {
                            // null marks the end of the data for the downstream handler
                            emit(null);
                        }

                        // Forwards to the downstream handler; unchecked exceptions and
                        // RDFHandlerExceptions are rethrown as-is, anything else is wrapped in a
                        // RuntimeException (unlike readSparqlTuples, which wraps in the
                        // handler-specific exception type)
                        private void emit(final Statement statement) throws RDFHandlerException {
                            try {
                                handler.handle(statement);
                            } catch (final Throwable ex) {
                                Throwables.propagateIfPossible(ex, RDFHandlerException.class);
                                throw new RuntimeException(ex);
                            }
                        }

                    };
                    readRDF(in, format, namespaces, base, preserveBNodes, rdfHandler);
                } finally {
                    // restore the MDC of the consuming thread
                    Logging.setMDC(oldMdc);
                }
            }

        };
    }
500 
501     public static void readRDF(final InputStream in, @Nullable final RDFFormat format,
502             @Nullable final Map<String, String> namespaces, @Nullable final String base,
503             final boolean preserveBNodes, final RDFHandler handler) throws IOException,
504             RDFParseException, RDFHandlerException {
505 
506         final RDFParser parser = Rio.createParser(format);
507         parser.setValueFactory(Data.getValueFactory());
508 
509         final ParserConfig config = parser.getParserConfig();
510         config.set(BasicParserSettings.FAIL_ON_UNKNOWN_DATATYPES, true);
511         config.set(BasicParserSettings.FAIL_ON_UNKNOWN_LANGUAGES, true);
512         config.set(BasicParserSettings.VERIFY_DATATYPE_VALUES, false);
513         config.set(BasicParserSettings.VERIFY_LANGUAGE_TAGS, true);
514         config.set(BasicParserSettings.VERIFY_RELATIVE_URIS, true);
515         config.set(BasicParserSettings.NORMALIZE_DATATYPE_VALUES, true);
516         config.set(BasicParserSettings.NORMALIZE_LANGUAGE_TAGS, true);
517         config.set(BasicParserSettings.PRESERVE_BNODE_IDS, preserveBNodes);
518 
519         if (format.equals(RDFFormat.NTRIPLES)) {
520             config.set(NTriplesParserSettings.FAIL_ON_NTRIPLES_INVALID_LINES, true);
521 
522         } else if (format.equals(RDFFormat.JSONLD)) {
523             // following parameters are currently ignored by used library
524             config.set(JSONLDSettings.COMPACT_ARRAYS, true);
525             config.set(JSONLDSettings.OPTIMIZE, true);
526             config.set(JSONLDSettings.USE_NATIVE_TYPES, false);
527             config.set(JSONLDSettings.USE_RDF_TYPE, false);
528             config.set(JSONLDSettings.JSONLD_MODE, JSONLDMode.COMPACT);
529 
530         } else if (format.equals(RDFFormat.RDFJSON)) {
531             config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_DATATYPES, true);
532             config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_LANGUAGES, true);
533             config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_TYPES, true);
534             config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_VALUES, true);
535             config.set(RDFJSONParserSettings.FAIL_ON_UNKNOWN_PROPERTY, true);
536             config.set(RDFJSONParserSettings.SUPPORT_GRAPHS_EXTENSION, true);
537 
538         } else if (format.equals(RDFFormat.TRIX)) {
539             config.set(TriXParserSettings.FAIL_ON_TRIX_INVALID_STATEMENT, true);
540             config.set(TriXParserSettings.FAIL_ON_TRIX_MISSING_DATATYPE, false);
541 
542         } else if (format.equals(RDFFormat.RDFXML)) {
543             config.set(XMLParserSettings.FAIL_ON_DUPLICATE_RDF_ID, true);
544             config.set(XMLParserSettings.FAIL_ON_INVALID_NCNAME, true);
545             config.set(XMLParserSettings.FAIL_ON_INVALID_QNAME, true);
546             config.set(XMLParserSettings.FAIL_ON_MISMATCHED_TAGS, true);
547             config.set(XMLParserSettings.FAIL_ON_NON_STANDARD_ATTRIBUTES, false);
548             config.set(XMLParserSettings.FAIL_ON_SAX_NON_FATAL_ERRORS, false);
549         }
550 
551         if (namespaces != null && parser instanceof RDFParserBase) {
552             try {
553                 final Field field = RDFParserBase.class.getDeclaredField("namespaceTable");
554                 field.setAccessible(true);
555                 field.set(parser, Data.newNamespaceMap(Data.newNamespaceMap(), namespaces));
556             } catch (final Throwable ex) {
557                 // ignore
558                 ex.printStackTrace();
559             }
560         }
561 
562         parser.setRDFHandler(handler);
563         parser.parse(in, Strings.nullToEmpty(base));
564     }
565 
566     public static void readRDF(final Map<File, ? extends RDFHandler> sources,
567             @Nullable final RDFFormat format, @Nullable final Map<String, String> namespaces,
568             @Nullable final String base, final boolean preserveBNodes,
569             @Nullable final Compression compression, final int parallelism) throws IOException,
570             RDFParseException, RDFHandlerException {
571 
572         // Sort files based on size, placing larger files first to better parallelism
573         final Map<File, RDFHandler> actualSources = Maps.newHashMap(sources);
574         final List<File> sortedFiles = Lists.newArrayList(sources.keySet());
575         Collections.sort(sortedFiles, new Comparator<File>() {
576 
577             @Override
578             public int compare(final File first, final File second) {
579                 if (first == null) {
580                     return second == null ? 0 : -1;
581                 } else {
582                     return second == null ? 1 : (int) (second.length() - first.length());
583                 }
584             }
585 
586         });
587 
588         // Compute parallelism degree as minimum of supplied value and available files
589         final int actualParallelism = Math.max(1, Math.min(parallelism, sortedFiles.size()));
590 
591         // If parallelism is not needed, just loop through the files using this thread
592         if (actualParallelism == 1) {
593             for (final File file : sortedFiles) {
594                 final RDFHandler handler = actualSources.get(file);
595                 readRDFHelper(file, format, namespaces, base, preserveBNodes, compression, handler);
596             }
597             return;
598         }
599 
600         // Allocate a latch to wait for threads to finish, and a variable to store exceptions
601         final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>(null);
602         final CountDownLatch latch = new CountDownLatch(actualParallelism);
603 
604         // Parse the files using multiple threads until the list is empty or an error occurs
605         for (int i = 0; i < actualParallelism; ++i) {
606             Data.getExecutor().execute(new Runnable() {
607 
608                 @Override
609                 public void run() {
610                     try {
611                         while (exceptionHolder.get() == null) {
612                             final File file;
613                             final RDFHandler handler;
614                             synchronized (sortedFiles) {
615                                 if (sortedFiles.isEmpty() || exceptionHolder.get() != null) {
616                                     break;
617                                 }
618                                 file = sortedFiles.remove(0);
619                                 handler = actualSources.get(file);
620                             }
621                             readRDFHelper(file, format, namespaces, base, preserveBNodes,
622                                     compression, handler);
623                         }
624                     } catch (final Throwable ex) {
625                         exceptionHolder.set(ex);
626                     } finally {
627                         latch.countDown();
628                     }
629                 }
630 
631             });
632         }
633 
634         try {
635             latch.await();
636         } catch (final InterruptedException ex) {
637             // restore interrupted status
638             Thread.currentThread().interrupt();
639         }
640 
641         // Propagate an exception occurred during parsing
642         final Throwable ex = exceptionHolder.get();
643         if (ex != null) {
644             Throwables.propagateIfPossible(ex, IOException.class);
645             Throwables.propagateIfPossible(ex, RDFHandlerException.class);
646             Throwables.propagateIfPossible(ex, RDFParseException.class);
647             throw new RuntimeException(ex);
648         }
649     }
650 
651     private static void readRDFHelper(@Nullable final File file, @Nullable final RDFFormat format,
652             @Nullable final Map<String, String> namespaces, @Nullable final String base,
653             final boolean preserveBNodes, @Nullable final Compression compression,
654             final RDFHandler handler) throws IOException, RDFParseException, RDFHandlerException {
655 
656         // Detect file format
657         RDFFormat actualFormat = format;
658         if (actualFormat == null) {
659             if (file == null) {
660                 throw new IllegalArgumentException("Cannot detect RDF format of STDIN");
661             }
662             actualFormat = RDFFormat.forFileName(file.getName());
663         }
664 
665         // Detect file compression
666         Compression actualCompression = compression;
667         if (actualCompression == null) {
668             actualCompression = file == null ? Compression.NONE : Compression.forFileName(
669                     file.getName(), Compression.NONE);
670         }
671 
672         // Perform parsing, wrapping possible exceptions so to report the file name
673         InputStream stream = null;
674         try {
675             stream = file == null ? System.in : actualCompression.read(Data.getExecutor(), file);
676             readRDF(stream, actualFormat, namespaces, base, preserveBNodes, handler);
677 
678         } catch (final Throwable ex) {
679             final String message = "Parsing of " + (file == null ? "STDIN" : file)
680                     + " using format " + actualFormat + " and compression " + actualCompression
681                     + " failed: " + ex.getMessage();
682             if (ex instanceof IOException) {
683                 throw new IOException(message, ex);
684             } else if (ex instanceof RDFParseException) {
685                 throw new RDFParseException(message, ex);
686             } else if (ex instanceof RDFHandlerException) {
687                 throw new RDFHandlerException(message, ex);
688             }
689             throw new RuntimeException(message, ex);
690         } finally {
691             if (stream != System.in) {
692                 Util.closeQuietly(stream);
693             }
694         }
695     }
696 
697     public static TupleQueryResultWriter newSparqlTupleWriter(final TupleQueryResultFormat format,
698             final OutputStream stream) {
699 
700         final TupleQueryResultWriter writer = QueryResultIO.createWriter(format, stream);
701 
702         final WriterConfig config = writer.getWriterConfig();
703         if (format.equals(TupleQueryResultFormat.JSON)) {
704             config.set(BasicWriterSettings.PRETTY_PRINT, true);
705             config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
706             config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
707 
708         } else if (format.equals(TupleQueryResultFormat.SPARQL)) {
709             config.set(BasicWriterSettings.PRETTY_PRINT, true);
710             config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
711             config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
712             config.set(BasicQueryWriterSettings.ADD_SESAME_QNAME, false);
713         }
714 
715         return writer;
716     }
717 
718     public static TupleQueryResultParser newSparqlTupleParser(final TupleQueryResultFormat format) {
719 
720         final TupleQueryResultParser parser = QueryResultIO.createParser(format);
721         parser.setValueFactory(CompactValueFactory.getInstance());
722 
723         return parser;
724     }
725 
726     public static BooleanQueryResultWriter newSparqlBooleanWriter(
727             final BooleanQueryResultFormat format, final OutputStream stream) {
728 
729         final BooleanQueryResultWriter writer = QueryResultIO.createWriter(format, stream);
730 
731         final WriterConfig config = writer.getWriterConfig();
732         if (format.equals(BooleanQueryResultFormat.JSON)) {
733             config.set(BasicWriterSettings.PRETTY_PRINT, true);
734             config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
735             config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
736 
737         } else if (format.equals(TupleQueryResultFormat.SPARQL)) {
738             config.set(BasicWriterSettings.PRETTY_PRINT, true);
739             config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
740             config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
741             config.set(BasicQueryWriterSettings.ADD_SESAME_QNAME, false);
742         }
743 
744         return writer;
745     }
746 
747     public static BooleanQueryResultParser newSparqlBooleanReader(
748             final BooleanQueryResultFormat format) {
749 
750         final BooleanQueryResultParser parser = QueryResultIO.createParser(format);
751         parser.setValueFactory(Data.getValueFactory());
752 
753         return parser;
754     }
755 
756     public static RDFHandler newMergingHandler(final RDFHandler handler) {
757         return new MergingHandler(handler);
758     }
759 
760     public static RDFHandler newDecouplingHandler(final RDFHandler handler,
761             @Nullable final Integer queueSize) {
762         return new DecouplingHandler(handler, queueSize);
763     }
764 
765     public static RDFHandler newNamespaceHandler(final RDFHandler handler,
766             final Map<String, String> namespaces, @Nullable final Integer bufferSize) {
767         return new NamespaceHandler(handler, namespaces, bufferSize);
768     }
769 
770     public static RDFHandler newLoggingHandler(final RDFHandler handler, final Logger logger,
771             @Nullable final String startMessage, @Nullable final String progressMessage,
772             @Nullable final String endMessage) {
773 
774         Preconditions.checkNotNull(handler);
775         Preconditions.checkNotNull(logger);
776         if (startMessage == null && progressMessage == null && endMessage == null) {
777             return handler;
778         } else {
779             return new LoggingHandler(handler, logger, startMessage, progressMessage, endMessage);
780         }
781     }
782 
783     private static final class MergingHandler implements RDFHandler {
784 
785         private final RDFHandler handler;
786 
787         private int depth;
788 
789         MergingHandler(final RDFHandler handler) {
790             this.handler = Preconditions.checkNotNull(handler);
791             this.depth = 0;
792         }
793 
794         @Override
795         public synchronized void startRDF() throws RDFHandlerException {
796             if (this.depth == 0) {
797                 this.handler.startRDF();
798             }
799             ++this.depth;
800         }
801 
802         @Override
803         public synchronized void handleComment(final String comment) throws RDFHandlerException {
804             this.handler.handleComment(comment);
805         }
806 
807         @Override
808         public synchronized void handleNamespace(final String prefix, final String uri)
809                 throws RDFHandlerException {
810             this.handler.handleNamespace(prefix, uri);
811         }
812 
813         @Override
814         public synchronized void handleStatement(final Statement statement)
815                 throws RDFHandlerException {
816             this.handler.handleStatement(statement);
817         }
818 
819         @Override
820         public synchronized void endRDF() throws RDFHandlerException {
821             --this.depth;
822             if (this.depth == 0) {
823                 this.handler.endRDF();
824             }
825         }
826 
827     }
828 
829     private static final class DecouplingHandler implements RDFHandler {
830 
831         private static final int DEFAULT_QUEUE_SIZE = 1024;
832 
833         private static final Object EOF = new Object();
834 
835         private final RDFHandler handler;
836 
837         private final int queueSize;
838 
839         private BlockingQueue<Object> queue;
840 
841         private AtomicReference<Throwable> exception;
842 
843         private Future<?> future;
844 
845         private int depth;
846 
847         DecouplingHandler(final RDFHandler handler, @Nullable final Integer queueSize) {
848             this.handler = Preconditions.checkNotNull(handler);
849             this.queueSize = MoreObjects.firstNonNull(queueSize, DEFAULT_QUEUE_SIZE);
850             this.queue = null;
851             this.exception = null;
852             this.future = null;
853             this.depth = 0;
854         }
855 
856         @Override
857         public synchronized void startRDF() throws RDFHandlerException {
858 
859             // Accept nested startRDF/endRDF calls
860             if (this.depth++ > 0) {
861                 return;
862             }
863 
864             // Initialize queue and exception holder
865             this.queue = new ArrayBlockingQueue<Object>(this.queueSize);
866             this.exception = new AtomicReference<Throwable>(null);
867 
868             // Run a background task to move comments, namespaces and statements off the queue and
869             // forward it to the wrapped handler
870             this.future = Data.getExecutor().submit(new Runnable() {
871 
872                 @Override
873                 public void run() {
874                     Object object;
875                     try {
876                         DecouplingHandler.this.handler.startRDF();
877                         while ((object = DecouplingHandler.this.queue.take()) != EOF) {
878                             if (object instanceof Statement) {
879                                 DecouplingHandler.this.handler.handleStatement((Statement) object);
880                             } else if (object instanceof Namespace) {
881                                 final Namespace ns = (Namespace) object;
882                                 DecouplingHandler.this.handler.handleNamespace(ns.getPrefix(),
883                                         ns.getName());
884                             } else if (object instanceof String) {
885                                 DecouplingHandler.this.handler.handleComment((String) object);
886                             }
887                         }
888                         DecouplingHandler.this.handler.endRDF();
889                     } catch (final Throwable ex) {
890                         DecouplingHandler.this.exception.set(ex);
891                     }
892                 }
893 
894             });
895         }
896 
897         @Override
898         public void handleComment(final String comment) throws RDFHandlerException {
899 
900             // Enqueue comment and propagate exceptions from background task, if any
901             put(comment);
902             propagateOnFailure();
903         }
904 
905         @Override
906         public void handleNamespace(final String prefix, final String uri)
907                 throws RDFHandlerException {
908 
909             // Enqueue namespace and propagate exceptions from background task, if any
910             put(new NamespaceImpl(prefix, uri));
911             propagateOnFailure();
912         }
913 
914         @Override
915         public void handleStatement(final Statement statement) throws RDFHandlerException {
916 
917             // Enqueue statement and propagate exceptions from background task, if any
918             put(statement);
919             propagateOnFailure();
920         }
921 
922         @Override
923         public synchronized void endRDF() throws RDFHandlerException {
924 
925             // Accept nested startRDF/endRDF calls
926             if (--this.depth > 0) {
927                 return;
928             }
929 
930             // Signal end of RDF
931             put(EOF);
932 
933             // Wait for the background task to complete
934             try {
935                 this.future.get();
936             } catch (final Throwable ex) {
937                 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
938                 Throwables.propagate(ex);
939             }
940 
941             // Propagate exception from background task, if any
942             propagateOnFailure();
943         }
944 
945         private void put(final Object object) throws RDFHandlerException {
946             try {
947                 this.queue.put(object);
948             } catch (final InterruptedException ex) {
949                 throw new RDFHandlerException(ex);
950             }
951         }
952 
953         private void propagateOnFailure() throws RDFHandlerException {
954             final Throwable ex = this.exception.get();
955             if (ex != null) {
956                 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
957                 Throwables.propagate(ex);
958             }
959         }
960 
961     }
962 
963     private static final class NamespaceHandler implements RDFHandler {
964 
965         private static final int DEFAULT_BUFFER_SIZE = 1024;
966 
967         private final RDFHandler handler;
968 
969         private final Map<String, String> namespaces;
970 
971         private final int bufferSize;
972 
973         private List<Statement> buffer;
974 
975         private boolean buffering;
976 
977         private Map<String, String> bindings;
978 
979         NamespaceHandler(final RDFHandler handler, final Map<String, String> namespaces,
980                 @Nullable final Integer bufferSize) {
981             this.handler = Preconditions.checkNotNull(handler);
982             this.namespaces = Preconditions.checkNotNull(namespaces);
983             this.bufferSize = MoreObjects.firstNonNull(bufferSize, DEFAULT_BUFFER_SIZE);
984             this.buffer = null;
985             this.buffering = false;
986             this.bindings = null;
987         }
988 
989         @Override
990         public void startRDF() throws RDFHandlerException {
991             this.bindings = Maps.newHashMap();
992             this.buffer = Lists.newArrayListWithCapacity(this.bufferSize);
993             this.buffering = true;
994             this.handler.startRDF();
995         }
996 
997         @Override
998         public void handleComment(final String comment) throws RDFHandlerException {
999             flush();
1000             this.handler.handleComment(comment);
1001         }
1002 
1003         @Override
1004         public void handleNamespace(final String prefix, final String uri)
1005                 throws RDFHandlerException {
1006             if (this.buffering) {
1007                 this.bindings.put(uri, prefix);
1008             }
1009         }
1010 
1011         @Override
1012         public void handleStatement(final Statement statement) throws RDFHandlerException {
1013             if (this.buffering) {
1014                 extractNamespace(statement.getSubject());
1015                 extractNamespace(statement.getPredicate());
1016                 extractNamespace(statement.getObject());
1017                 extractNamespace(statement.getContext());
1018                 this.buffer.add(statement);
1019                 if (this.buffer.size() == this.bufferSize) {
1020                     flush();
1021                 }
1022             } else {
1023                 this.handler.handleStatement(statement);
1024             }
1025         }
1026 
1027         @Override
1028         public void endRDF() throws RDFHandlerException {
1029             flush();
1030             this.handler.endRDF();
1031         }
1032 
1033         private void extractNamespace(final Value value) {
1034             if (value instanceof URI) {
1035                 final String ns = ((URI) value).getNamespace();
1036                 this.bindings.put(ns, this.bindings.get(ns));
1037             } else if (value instanceof Literal) {
1038                 extractNamespace(((Literal) value).getDatatype());
1039             }
1040         }
1041 
1042         private void flush() throws RDFHandlerException {
1043             if (!this.buffering) {
1044                 return;
1045             }
1046             for (final String namespace : Ordering.natural().sortedCopy(this.bindings.keySet())) {
1047                 String prefix = this.bindings.get(namespace);
1048                 if (prefix == null) {
1049                     prefix = Data.namespaceToPrefix(namespace, this.namespaces);
1050                 }
1051                 if (prefix != null) {
1052                     this.handler.handleNamespace(prefix, namespace);
1053                 }
1054             }
1055             for (final Statement statement : this.buffer) {
1056                 this.handler.handleStatement(statement);
1057             }
1058             this.bindings = null;
1059             this.buffer = null;
1060             this.buffering = false;
1061         }
1062 
1063     }
1064 
1065     private static final class LoggingHandler implements RDFHandler {
1066 
1067         private final RDFHandler handler;
1068 
1069         @Nullable
1070         private final Logger logger;
1071 
1072         @Nullable
1073         private final String startMessage;
1074 
1075         @Nullable
1076         private final String progressMessage;
1077 
1078         @Nullable
1079         private final String endMessage;
1080 
1081         private long totalTs;
1082 
1083         private long totalCounter = 0;
1084 
1085         private long lastTs;
1086 
1087         private long lastCounter = 0;
1088 
1089         LoggingHandler(final RDFHandler handler, final Logger logger,
1090                 @Nullable final String startMessage, @Nullable final String progressMessage,
1091                 @Nullable final String endMessage) {
1092             this.handler = Preconditions.checkNotNull(handler);
1093             this.logger = logger;
1094             this.startMessage = startMessage;
1095             this.progressMessage = progressMessage;
1096             this.endMessage = endMessage;
1097         }
1098 
1099         @Override
1100         public void startRDF() throws RDFHandlerException {
1101             this.handler.startRDF();
1102             this.totalTs = System.currentTimeMillis();
1103             this.lastTs = this.totalTs;
1104             if (this.startMessage != null) {
1105                 this.logger.info(this.startMessage);
1106             }
1107         }
1108 
1109         @Override
1110         public void handleComment(final String comment) throws RDFHandlerException {
1111             this.handler.handleComment(comment);
1112         }
1113 
1114         @Override
1115         public void handleNamespace(final String prefix, final String uri)
1116                 throws RDFHandlerException {
1117             this.handler.handleNamespace(prefix, uri);
1118         }
1119 
1120         @Override
1121         public void handleStatement(final Statement statement) throws RDFHandlerException {
1122             this.handler.handleStatement(statement);
1123             ++this.totalCounter;
1124             if (this.progressMessage != null && this.totalCounter % 1000 == 0) {
1125                 final long ts = System.currentTimeMillis();
1126                 if (ts - this.lastTs >= 1000) {
1127                     final long throughput = (this.totalCounter - this.lastCounter) * 1000
1128                             / (ts - this.lastTs);
1129                     final long avgThroughput = this.totalCounter * 1000 / (ts - this.totalTs);
1130                     this.lastTs = ts;
1131                     this.lastCounter = this.totalCounter;
1132                     this.logger.info(String.format(this.progressMessage, this.totalCounter,
1133                             throughput, avgThroughput));
1134                 }
1135             }
1136         }
1137 
1138         @Override
1139         public void endRDF() throws RDFHandlerException {
1140             if (this.endMessage != null) {
1141                 final long ts = System.currentTimeMillis();
1142                 final long avgThroughput = this.totalCounter * 1000 / (ts - this.totalTs + 1);
1143                 this.logger.info(String.format(this.endMessage, this.totalCounter, avgThroughput));
1144             }
1145             this.handler.endRDF();
1146         }
1147 
1148     }
1149 
1150     private RDFUtil() {
1151     }
1152 
1153     {
1154         TQL.register();
1155         System.setProperty("entityExpansionLimit", "" + Integer.MAX_VALUE);
1156     }
1157 
1158 }