1   package eu.fbk.knowledgestore.populator.rdf;
2   
import java.io.File;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
13  
14  import javax.annotation.Nullable;
15  
16  import com.google.common.base.Charsets;
17  import com.google.common.base.Function;
18  import com.google.common.base.Throwables;
19  import com.google.common.collect.ImmutableList;
20  import com.google.common.collect.ImmutableSet;
21  import com.google.common.collect.Iterables;
22  import com.google.common.collect.Lists;
23  import com.google.common.collect.Maps;
24  import com.google.common.io.ByteStreams;
25  import com.google.common.io.CharStreams;
26  import com.google.common.io.Files;
27  
28  import org.apache.commons.cli.CommandLine;
29  import org.apache.commons.cli.GnuParser;
30  import org.apache.commons.cli.HelpFormatter;
31  import org.apache.commons.cli.Option;
32  import org.apache.commons.cli.OptionBuilder;
33  import org.apache.commons.cli.Options;
34  import org.apache.commons.cli.ParseException;
35  import org.openrdf.model.Statement;
36  import org.openrdf.model.URI;
37  import org.openrdf.rio.RDFFormat;
38  import org.openrdf.rio.RDFHandler;
39  import org.openrdf.rio.RDFHandlerException;
40  import org.openrdf.rio.helpers.RDFHandlerBase;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import ch.qos.logback.classic.LoggerContext;
45  import ch.qos.logback.classic.joran.JoranConfigurator;
46  import ch.qos.logback.core.joran.spi.JoranException;
47  
48  import eu.fbk.knowledgestore.Operation;
49  import eu.fbk.knowledgestore.Outcome;
50  import eu.fbk.knowledgestore.Session;
51  import eu.fbk.knowledgestore.client.Client;
52  import eu.fbk.knowledgestore.data.Criteria;
53  import eu.fbk.knowledgestore.data.Data;
54  import eu.fbk.knowledgestore.data.Handler;
55  import eu.fbk.knowledgestore.data.Record;
56  import eu.fbk.knowledgestore.data.Stream;
57  import eu.fbk.knowledgestore.internal.Compression;
58  import eu.fbk.knowledgestore.internal.Util;
59  import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
60  import eu.fbk.knowledgestore.vocabulary.CKR;
61  import eu.fbk.knowledgestore.vocabulary.KS;
62  
63  public final class RDFPopulator {
64  
65      private static final String VERSION = Util.getVersion("eu.fbk.knowledgestore",
66              "ks-populator-rdf", "devel");
67  
68      private static final String HEADER = Util.getResource(RDFPopulator.class, "header").trim();
69  
70      private static final String FOOTER = Util.getResource(RDFPopulator.class, "footer").trim();
71  
72      private static final String DISCLAIMER = Util.getResource(RDFPopulator.class, "disclaimer")
73              .trim();
74  
75      private static final Logger MAIN_LOGGER = LoggerFactory.getLogger(RDFPopulator.class);
76  
77      private static final Logger STATUS_LOGGER = LoggerFactory.getLogger("status");
78  
79      public static void main(final String... args) {
80          try {
81              // Parse command line, handling -h and -v commands
82              final CommandLine cmd = parseCommandLine(args);
83  
84              // Extract command line options
85              final String base = cmd.getOptionValue('b');
86              final int parallelism = !cmd.hasOption('p') ? 1 : //
87                      Integer.parseInt(cmd.getOptionValue('p'));
88              final boolean listStdin = cmd.hasOption('@');
89              final String listFile = cmd.getOptionValue('T');
90              final List<String> sourceFiles = cmd.getArgList();
91              final boolean sourceStdin = !cmd.hasOption('@') && !cmd.hasOption('T')
92                      && cmd.getArgs().length == 0;
93              final String sourceFormat = cmd.getOptionValue('s');
94              final String errorFile = cmd.getOptionValue('e');
95              final String target = cmd.getOptionValue('o');
96              final String targetFormat = cmd.getOptionValue('t');
97              final boolean validate = !cmd.hasOption('i');
98              final Criteria criteria = !cmd.hasOption('c') ? Criteria.overwrite() : //
99                      Criteria.parse(cmd.getOptionValue('c'), Data.getNamespaceMap());
100             final URI globalURI = cmd.hasOption('g') ? (URI) Data.parseValue(
101                     cmd.getOptionValue('g'), Data.getNamespaceMap()) : CKR.GLOBAL;
102             final String credentials = cmd.getOptionValue('u');
103 
104             // Split username / password
105             String username = null;
106             String password = null;
107             if (credentials != null) {
108                 final int index = credentials.indexOf(':');
109                 username = credentials.substring(0, index < 0 ? credentials.length() : index);
110                 password = index < 0 ? null : credentials.substring(index + 1);
111             }
112 
113             // Select input files based on supplied options and arguments
114             final List<File> sources = select(listStdin, listFile, sourceFiles, sourceStdin);
115             for (final File file : sources) {
116                 checkFileParseable(file, sourceFormat);
117             }
118 
119             // Setup axiom decoding
120             final Stream<Record> axioms = decode(sources, globalURI, parallelism, base,
121                     sourceFormat);
122 
123             // Handle 3 cases based on option -o
124             if (target == null) {
125                 // (1) emit axioms to STDOUT
126                 disableLogging();
127                 final OutputStream out = System.out;
128                 System.setOut(new PrintStream(ByteStreams.nullOutputStream()));
129                 write(axioms, out, targetFormat);
130 
131             } else if (!target.startsWith("http://") && !target.startsWith("https://")) {
132                 // (2) emit axioms to FILE
133                 write(axioms, new File(target), targetFormat);
134 
135             } else {
136                 // (3) upload axioms to KS, emit rejected axioms to FILE / STDERR
137                 Session session = null;
138                 final Client client = Client.builder(target).maxConnections(2)
139                         .validateServer(validate).build();
140                 try {
141                     session = client.newSession(username, password);
142                     final Stream<Record> rejected = upload(session, criteria, axioms);
143                     if (errorFile == null) {
144                         write(rejected, System.err, targetFormat);
145                     } else {
146                         write(rejected, new File(errorFile), targetFormat);
147                     }
148                 } finally {
149                     Util.closeQuietly(session);
150                     client.close();
151                 }
152             }
153 
154             // Signal success
155             System.exit(0);
156 
157         } catch (final IllegalArgumentException ex) {
158             // Signal wrong user input
159             ex.printStackTrace();
160             System.err.println("INVALID INPUT. " + ex.getMessage());
161             System.exit(-1);
162 
163         } catch (final ParseException ex) {
164             // Signal syntax error
165             System.err.println("SYNTAX ERROR. " + ex.getMessage());
166             System.exit(-1);
167 
168         } catch (final Throwable ex) {
169             // Signal other error
170             System.err.println("EXECUTION FAILED. " + ex.getMessage() + "\n");
171             ex.printStackTrace();
172             System.exit(-2);
173         }
174     }
175 
176     private static CommandLine parseCommandLine(final String... args) throws ParseException {
177 
178         // Define input options
179         final List<Option> inputOpts = Lists.newArrayList();
180         newOption(inputOpts, '@', "files-from-stdin", 0, false, null,
181                 "read names of input files from STDIN");
182         newOption(inputOpts, 'T', "files-from", 1, false, "FILE",
183                 "read names of input files from FILE");
184         newOption(inputOpts, 's', "source-format", 1, false, "FMT",
185                 "use input RDF format/compression FMT (eg: ttl.gz; default: "
186                         + "autodetect based on file name)");
187         newOption(inputOpts, 'b', "base", 1, false, "URI",
188                 "base URI for resolving parsed relative URIs");
189         newOption(inputOpts, 'p', "parallel-files", 1, false, "N",
190                 "parse at most N files in parallel (default: 1)");
191 
192         // Define extraction options
193         final List<Option> extractOpts = Lists.newArrayList();
194         newOption(extractOpts, 'g', "global-uri", 1, false, "URI",
195                 "use URI in place of ckr:global (default: ckr:global)");
196         newOption(extractOpts, 'd', "default", 1, false, "FILE",
197                 "augment axioms with default metadata/context in FILE");
198 
199         // Define output options
200         final List<Option> outputOpts = Lists.newArrayList();
201         newOption(outputOpts, 'o', "output", 1, false, "FILE|URL",
202                 "send axioms to FILE | server URL (default: STDOUT)");
203         newOption(outputOpts, 'e', "error", 1, false, "FILE",
204                 "write non-uploaded axioms to FILE (default: STDERR)");
205         newOption(outputOpts, 't', "target-format", 1, false, "FMT",
206                 "use output file RDF format/compression FMT (e.g., ttl.gz; "
207                         + "default: autodetect based on file name)");
208         newOption(outputOpts, 'u', "user", 1, false, "user[:pwd]",
209                 "upload using login user:pwd (default: anonymous)");
210         newOption(outputOpts, 'i', "ignore-certificate", 0, false, null,
211                 "don't check server certificate (default: check)");
212         newOption(outputOpts, 'c', "criteria", 1, false, "C",
213                 "upload with merge criteria C (default: overwrite *)");
214         // -U|--proxy-user "user[:password]" proxy
215         // -x|--proxy host:port
216 
217         // Define miscellaneous options
218         final List<Option> miscOpts = Lists.newArrayList();
219         newOption(miscOpts, 'h', "help", 0, false, null, "print this help message and exit");
220         newOption(miscOpts, 'v', "version", 0, false, null, "print version information and exit");
221 
222         // Define combined option list
223         final List<Option> allOpts = ImmutableList.copyOf(Iterables.concat(inputOpts, extractOpts,
224                 outputOpts, miscOpts));
225 
226         // Parse command line
227         final CommandLine cmd = new GnuParser().parse(newOptions(allOpts), args);
228 
229         // Handle help and version commands
230         if (cmd.hasOption('h')) {
231             final HelpFormatter formatter = new HelpFormatter();
232             formatter.setOptionComparator(new Comparator<Option>() {
233 
234                 @Override
235                 public int compare(final Option option1, final Option option2) {
236                     return allOpts.indexOf(option1) - allOpts.indexOf(option2);
237                 }
238 
239             });
240             final PrintWriter out = new PrintWriter(System.out);
241             formatter.printUsage(out, 80, "ksrdf [-o URL|FILE] [OPTIONS] [INPUT_FILE ...]");
242             out.println();
243             formatter.printWrapped(out, 80, HEADER);
244             formatter.printWrapped(out, 80, DISCLAIMER);
245             out.println("\nInput options:");
246             formatter.printOptions(out, 80, newOptions(inputOpts), 2, 2);
247             out.println("\nExtraction options:");
248             formatter.printOptions(out, 80, newOptions(extractOpts), 2, 5);
249             out.println("\nOutput options:");
250             formatter.printOptions(out, 80, newOptions(outputOpts), 2, 2);
251             out.println("\nMiscellaneous options:");
252             formatter.printOptions(out, 80, newOptions(miscOpts), 2, 14);
253             out.println();
254             out.println(FOOTER);
255             out.flush();
256             System.exit(0);
257 
258         } else if (cmd.hasOption('v')) {
259             System.out.println(String.format(
260                     "ksrdf (FBK KnowledgeStore) %s\njava %s bit (%s) %s\n%s", VERSION,
261                     System.getProperty("sun.arch.data.model"), System.getProperty("java.vendor"),
262                     System.getProperty("java.version"), DISCLAIMER));
263             System.exit(0);
264         }
265 
266         // Return parsed options
267         return cmd;
268     }
269 
270     private static List<File> select(final boolean listStdin, final String listFile,
271             final List<String> sourceFiles, final boolean sourceStdin) throws IOException {
272 
273         // Extract file names from non-option command line arguments
274         final List<String> inputs = Lists.newArrayList(sourceFiles);
275 
276         // Extract file names from the file pointed by option -T, if any
277         if (listFile != null) {
278             final File file = new File(listFile);
279             checkFileExist(file);
280             for (final String line : Files.readLines(file, Charsets.UTF_8)) {
281                 final String trimmedLine = line.trim();
282                 if (!"".equals(trimmedLine)) {
283                     inputs.add(line);
284                 }
285             }
286         }
287 
288         // Extract file names from STDIN, if option -@ has been specified
289         if (listStdin) {
290             for (final String line : CharStreams.readLines(new InputStreamReader(System.in))) {
291                 final String trimmedLine = line.trim();
292                 if (!"".equals(trimmedLine)) {
293                     inputs.add(line);
294                 }
295             }
296         }
297 
298         // Convert to File objects and return the result
299         final List<File> files = Lists.newArrayListWithCapacity(inputs.size());
300         for (final String input : inputs) {
301             files.add(new File(input));
302         }
303 
304         // Add null in case STDIN should be included
305         if (sourceStdin) {
306             files.add(null);
307         }
308         return files;
309     }
310 
311     private static Stream<Record> decode(final List<File> files, final URI globalURI,
312             final int parallelism, @Nullable final String base, //
313             @Nullable final String formatString) {
314 
315         // Determine source RDF format and compression based on format string
316         final Compression compression = detectCompression(formatString, null);
317         final RDFFormat format = detectRDFFormat(formatString, null);
318 
319         // Return a stream that read input RDF and decodes contained axioms
320         return new Stream<Record>() {
321 
322             @Override
323             protected void doToHandler(final Handler<? super Record> handler) throws Throwable {
324 
325                 // Create the decoder
326                 final Decoder decoder = new Decoder(handler, globalURI);
327 
328                 // Wrap the decoder in a RDFHandler
329                 RDFHandler rdfHandler = new RDFHandlerBase() {
330 
331                     @Override
332                     public void handleStatement(final Statement stmt) throws RDFHandlerException {
333                         emit(stmt);
334                     }
335 
336                     @Override
337                     public void endRDF() throws RDFHandlerException {
338                         emit(null);
339                     }
340 
341                     private void emit(@Nullable final Statement stmt) throws RDFHandlerException {
342                         try {
343                             decoder.handle(stmt);
344                         } catch (final Throwable ex) {
345                             Throwables.propagateIfPossible(ex, RDFHandlerException.class);
346                             Throwables.propagate(ex);
347                         }
348                     }
349 
350                 };
351 
352                 // Add logging
353                 rdfHandler = RDFUtil.newLoggingHandler(rdfHandler, STATUS_LOGGER, null,
354                         "parsing: %d triples (%d triples/s, %d triples/s avg)", null);
355 
356                 // Add decoupling queue to parallelize parsing and encoding
357                 rdfHandler = RDFUtil.newDecouplingHandler(rdfHandler, null);
358 
359                 // Perform parallel parsing.
360                 final Map<File, RDFHandler> map = Maps.newLinkedHashMap();
361                 for (final File file : files) {
362                     final RDFHandler fileHandler = RDFUtil.newLoggingHandler(rdfHandler,
363                             MAIN_LOGGER, null, null,
364                             "parsed " + (file == null ? "STDIN" : file.getAbsolutePath())
365                                     + ": %d triples, (%d triples/s avg)");
366                     map.put(file, fileHandler);
367                 }
368                 rdfHandler.startRDF();
369                 RDFUtil.readRDF(map, format, null, base, false, compression, parallelism);
370                 rdfHandler.endRDF();
371                 STATUS_LOGGER.info("");
372             }
373 
374         };
375     }
376 
377     private static Stream<Record> upload(final Session session, final Criteria criteria,
378             final Stream<Record> axioms) {
379 
380         return axioms.transform(null, new Function<Handler<Record>, Handler<Record>>() {
381 
382             @Override
383             public Handler<Record> apply(final Handler<Record> handler) {
384                 return new UploadHandler(session, criteria, handler);
385             }
386 
387         });
388     }
389 
390     private static void write(final Stream<Record> axioms, final OutputStream stream,
391             @Nullable final String formatString) throws IOException {
392 
393         // Determine target RDF format and compression based on format string
394         final Compression compression = detectCompression(formatString, Compression.NONE);
395         final RDFFormat format = detectRDFFormat(formatString, null);
396         if (format == null) {
397             if (formatString == null) {
398                 throw new IllegalArgumentException(
399                         "Must specify output format (-t) if writing to STDOUT");
400             } else {
401                 throw new IllegalArgumentException("Cannot detect RDF format for " + formatString);
402             }
403         }
404 
405         // Setup compression, if necessary
406         final OutputStream actualStream = compression.write(Data.getExecutor(), stream);
407 
408         // Performs writing
409         RDFUtil.writeRDF(actualStream, format, Data.getNamespaceMap(), null,
410                 Record.encode(axioms, ImmutableSet.of(KS.AXIOM)));
411     }
412 
413     private static void write(final Stream<Record> axioms, final File file,
414             @Nullable final String formatString) throws IOException {
415 
416         // Determine target RDF format and compression based on format string
417         Compression compression = detectCompression(file.getName(), null);
418         if (compression == null) {
419             compression = detectCompression(formatString, Compression.NONE);
420         }
421         RDFFormat format = detectRDFFormat(file.getName(), null);
422         if (format == null) {
423             format = detectRDFFormat(formatString, null);
424         }
425         if (format == null) {
426             throw new IllegalArgumentException("Cannot detect RDF format of " + file);
427         }
428 
429         // Setup compression, if necessary
430         final OutputStream actualStream = compression.write(Data.getExecutor(), file);
431 
432         // Performs writing
433         try {
434             RDFUtil.writeRDF(actualStream, format, Data.getNamespaceMap(), null,
435                     Record.encode(axioms, ImmutableSet.of(KS.AXIOM)));
436         } finally {
437             Util.closeQuietly(actualStream);
438         }
439     }
440 
441     private static Options newOptions(final Iterable<? extends Option> options) {
442         final Options result = new Options();
443         for (final Object option : options) {
444             result.addOption((Option) option);
445         }
446         return result;
447     }
448 
449     private static void newOption(final Collection<? super Option> options,
450             @Nullable final Character shortName, final String longName, final int argCount,
451             final boolean argOpt, @Nullable final String argName, final String description) {
452 
453         OptionBuilder.withLongOpt(longName);
454         OptionBuilder.withDescription(description);
455         if (argCount != 0) {
456             OptionBuilder.withArgName(argName);
457             if (argOpt) {
458                 if (argCount == 1) {
459                     OptionBuilder.hasOptionalArg();
460                 } else if (argCount > 1) {
461                     OptionBuilder.hasOptionalArgs(argCount);
462                 } else {
463                     OptionBuilder.hasOptionalArgs();
464                 }
465             } else {
466                 if (argCount == 1) {
467                     OptionBuilder.hasArg();
468                 } else if (argCount > 1) {
469                     OptionBuilder.hasArgs(argCount);
470                 } else {
471                     OptionBuilder.hasArgs();
472                 }
473             }
474         }
475         options.add(shortName == null ? OptionBuilder.create() : OptionBuilder.create(shortName));
476     }
477 
478     private static RDFFormat detectRDFFormat(@Nullable final String string,
479             final RDFFormat fallback) {
480         return string == null ? fallback : RDFFormat.forFileName("dummy." + string.trim(),
481                 fallback);
482     }
483 
484     private static Compression detectCompression(@Nullable final String string,
485             final Compression fallback) {
486         return string == null ? fallback : Compression.forFileName("dummy." + string.trim(),
487                 fallback);
488     }
489 
490     private static void checkFileExist(@Nullable final File file) {
491         if (file == null) {
492             return;
493         } else if (!file.exists()) {
494             throw new IllegalArgumentException("File '" + file + "' does not exist");
495         } else if (file.isDirectory()) {
496             throw new IllegalArgumentException("Path '" + file + "' denotes a directory");
497         }
498     }
499 
500     private static void checkFileParseable(@Nullable final File file,
501             @Nullable final String formatString) {
502         if (file == null) {
503             if (formatString == null) {
504                 throw new IllegalArgumentException("Cannot detect RDF format "
505                         + "and compression of STDIN: please specify option -s");
506             }
507             return;
508         }
509         checkFileExist(file);
510         final RDFFormat defaultFormat = detectRDFFormat(formatString, null);
511         final Compression defaultCompression = detectCompression(formatString, null);
512         final RDFFormat format = RDFFormat.forFileName(file.getName());
513         if (format == null && defaultFormat == null) {
514             throw new IllegalArgumentException("Unknown RDF format for file " + file);
515         } else if (format != null && defaultFormat != null && !format.equals(defaultFormat)) {
516             System.err.println("Warning: detected RDF format for file " + file
517                     + " doesn't match specified format");
518         }
519         final Compression compression = Compression.forFileName(file.getName(), Compression.NONE);
520         if (defaultCompression != null && !compression.equals(defaultCompression)) {
521             System.err.println("Warning: detected compression format for file " + file
522                     + " doesn't match specified format");
523         }
524     }
525 
526     private static void disableLogging() {
527         final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
528         try {
529             final JoranConfigurator configurator = new JoranConfigurator();
530             configurator.setContext(context);
531             context.reset();
532             configurator.doConfigure(RDFPopulator.class.getResource("logback.disabled.xml"));
533         } catch (final JoranException je) {
534             // ignore
535         }
536     }
537 
538     private static final class UploadHandler implements Handler<Record> {
539 
540         private static final int BUFFER_SIZE = 1024;
541 
542         private final Session session;
543 
544         private final Criteria criteria;
545 
546         private final Handler<Record> errorHandler;
547 
548         private final Map<URI, Record> buffer;
549 
550         UploadHandler(final Session session, final Criteria criteria,
551                 final Handler<Record> errorHandler) {
552             this.session = session;
553             this.criteria = criteria;
554             this.errorHandler = errorHandler;
555             this.buffer = Maps.newHashMapWithExpectedSize(BUFFER_SIZE);
556         }
557 
558         @Override
559         public void handle(final Record axiom) throws Throwable {
560             if (axiom == null) {
561                 flush(true);
562             } else {
563                 this.buffer.put(axiom.getID(), axiom);
564                 if (this.buffer.size() == BUFFER_SIZE) {
565                     flush(false);
566                 }
567             }
568         }
569 
570         private void flush(final boolean done) throws Throwable {
571             if (!this.buffer.isEmpty()) {
572                 try {
573                     final Operation.Merge operation = this.session.merge(KS.AXIOM)
574                             .criteria(this.criteria).records(this.buffer.values());
575                     operation.exec(new Handler<Outcome>() {
576 
577                         @Override
578                         public void handle(final Outcome outcome) throws Throwable {
579                             if (outcome.getStatus().isOK()) {
580                                 UploadHandler.this.buffer.remove(outcome.getObjectID());
581                             }
582                         }
583 
584                     });
585                 } catch (final Throwable ex) {
586                     MAIN_LOGGER.error("Upload failure: " + ex.getMessage(), ex);
587                 }
588                 for (final Record record : this.buffer.values()) {
589                     this.errorHandler.handle(record);
590                 }
591             }
592             if (done) {
593                 this.errorHandler.handle(null);
594             }
595         }
596 
597     }
598 
599 }