1   package eu.fbk.knowledgestore.tool;
2   
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Locale;
7   
8   import javax.annotation.Nullable;
9   
10  import com.google.common.base.Strings;
11  import com.google.common.collect.ImmutableSet;
12  import com.google.common.collect.Lists;
13  import com.google.common.io.CountingOutputStream;
14  
15  import org.openrdf.model.Statement;
16  import org.openrdf.model.URI;
17  import org.openrdf.model.impl.URIImpl;
18  import org.openrdf.model.vocabulary.OWL;
19  import org.openrdf.model.vocabulary.RDF;
20  import org.openrdf.model.vocabulary.RDFS;
21  import org.openrdf.rio.RDFFormat;
22  import org.openrdf.rio.RDFHandlerException;
23  import org.openrdf.rio.RDFWriter;
24  import org.openrdf.rio.Rio;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import jersey.repackaged.com.google.common.base.Throwables;
29  
30  import eu.fbk.knowledgestore.KnowledgeStore;
31  import eu.fbk.knowledgestore.Operation.Retrieve;
32  import eu.fbk.knowledgestore.Session;
33  import eu.fbk.knowledgestore.client.Client;
34  import eu.fbk.knowledgestore.data.Dictionary;
35  import eu.fbk.knowledgestore.data.Handler;
36  import eu.fbk.knowledgestore.data.Record;
37  import eu.fbk.knowledgestore.data.Serializer;
38  import eu.fbk.knowledgestore.data.Stream;
39  import eu.fbk.knowledgestore.internal.CommandLine;
40  import eu.fbk.knowledgestore.vocabulary.KS;
41  import eu.fbk.knowledgestore.vocabulary.NFO;
42  import eu.fbk.knowledgestore.vocabulary.NIE;
43  import eu.fbk.knowledgestore.vocabulary.NIF;
44  import eu.fbk.knowledgestore.vocabulary.NWR;
45  import eu.fbk.knowledgestore.vocabulary.SEM;
46  import eu.fbk.knowledgestore.vocabulary.TIME;
47  import eu.fbk.rdfpro.util.IO;
48  import eu.fbk.rdfpro.util.Statements;
49  
50  public class Dumper {
51  
52      private static Logger LOGGER = LoggerFactory.getLogger(Dumper.class);
53  
54      public static void main(final String[] args) throws Throwable {
55          try {
56              final CommandLine cmd = CommandLine
57                      .parser()
58                      .withName("ks-dumper")
59                      .withHeader("Downloads the contents of the resource " //
60                              + "or mention layer as a single RDF file")
61                      .withOption("s", "server", "the URL of the KS instance", "URL",
62                              CommandLine.Type.STRING, true, false, true)
63                      .withOption("u", "username", "the KS username (if required)", "USER",
64                              CommandLine.Type.STRING, true, false, false)
65                      .withOption("p", "password", "the KS password (if required)", "PWD",
66                              CommandLine.Type.STRING, true, false, false)
67                      .withOption("r", "resources", "dump resources data (default: false)")
68                      .withOption("m", "mentions", "dump mentions data (default: false)")
69                      .withOption("i", "identifier", "the URI identifier of the single record " //
70                              + "to download (default unspecified = download all records)", "URI",
71                              CommandLine.Type.STRING, true, false, false)
72                      .withOption("b", "binary", "use binary format (two files produced)")
73                      .withOption("o", "output", "the output file", "FILE", CommandLine.Type.FILE,
74                              true, false, true)
75                      .withFooter(
76                              "The RDF format and compression type is automatically detected based on the\n"
77                                      + "file extension (e.g., 'tql.gz' = gzipped TQL file)")
78                      .withLogger(LoggerFactory.getLogger("eu.fbk.knowledgestore")).parse(args);
79  
80              final String serverURL = cmd.getOptionValue("s", String.class);
81              final String username = Strings.emptyToNull(cmd.getOptionValue("u", String.class));
82              final String password = Strings.emptyToNull(cmd.getOptionValue("p", String.class));
83              final boolean dumpResources = cmd.hasOption("r");
84              final boolean dumpMentions = cmd.hasOption("m");
85              final String id = Strings.emptyToNull(cmd.getOptionValue("i", String.class, null));
86              final boolean binary = cmd.hasOption("b");
87              final File outputFile = cmd.getOptionValue("o", File.class);
88  
89              final KnowledgeStore ks = Client.builder(serverURL).compressionEnabled(true)
90                      .maxConnections(2).validateServer(false).build();
91              try {
92                  final Session session;
93                  if (username != null && password != null) {
94                      session = ks.newSession(username, password);
95                  } else {
96                      session = ks.newSession();
97                  }
98                  final Stream<Record> records = download(session, dumpResources, dumpMentions, id);
99                  if (binary) {
100                     writeBinary(records, outputFile);
101                 } else {
102                     writeRDF(records, outputFile);
103                 }
104                 session.close();
105             } finally {
106                 ks.close();
107             }
108 
109         } catch (final Throwable ex) {
110             CommandLine.fail(ex);
111         }
112     }
113 
114     private static Stream<Record> download(final Session session, final boolean dumpResources,
115             final boolean dumpMentions, @Nullable final String id) throws Throwable {
116 
117         final List<URI> types = Lists.newArrayList();
118         if (dumpResources) {
119             types.add(KS.RESOURCE);
120         }
121         if (dumpMentions) {
122             types.add(KS.MENTION);
123         }
124 
125         return Stream.concat(Stream.create(types)
126                 .transform(
127                         (final URI type) -> {
128                             LOGGER.info("Downloading {} data", type.getLocalName().toLowerCase());
129                             try {
130                                 final Retrieve retrieve = session.retrieve(type)
131                                         .limit((long) Integer.MAX_VALUE)
132                                         .timeout(7 * 24 * 60 * 60 * 1000L); // 1 week
133                                 if (id != null) {
134                                     retrieve.ids(new URIImpl(id));
135                                 }
136                                 return retrieve.exec();
137                                 // return session.retrieve(type).limit((long) Integer.MAX_VALUE)
138                                 // .timeout(24 * 60 * 60 * 1000L).exec();
139                             } catch (final Throwable ex) {
140                                 throw Throwables.propagate(ex);
141                             }
142                         }, 1));
143     }
144 
145     private static void writeRDF(final Stream<Record> records, final File file)
146             throws RDFHandlerException, IOException {
147 
148         final RDFFormat format = Rio.getParserFormatForFileName(file.getAbsolutePath());
149         final OutputStream stream = IO.write(file.getAbsolutePath());
150 
151         try {
152             final RDFWriter writer = Rio.createWriter(format, stream);
153             writer.startRDF();
154 
155             writer.handleNamespace("rdf", RDF.NAMESPACE);
156             writer.handleNamespace("rdfs", RDFS.NAMESPACE);
157             writer.handleNamespace("owl", OWL.NAMESPACE);
158             writer.handleNamespace("ks", KS.NAMESPACE);
159             writer.handleNamespace("nwr", NWR.NAMESPACE);
160             writer.handleNamespace("nif", NIF.NAMESPACE);
161             writer.handleNamespace("nfo", NFO.NAMESPACE);
162             writer.handleNamespace("nie", NIE.NAMESPACE);
163             writer.handleNamespace("sem", SEM.NAMESPACE);
164             writer.handleNamespace("time", TIME.NAMESPACE);
165 
166             records.toHandler(new Handler<Record>() {
167 
168                 private int records = 0;
169 
170                 private long triples = 0;
171 
172                 @Override
173                 public void handle(final Record record) throws Throwable {
174                     if (record == null || this.records > 0 && this.records % 1000 == 0) {
175                         LOGGER.info(this.records + " records, " + this.triples
176                                 + " triples processed");
177                     }
178                     if (record != null) {
179                         final List<Statement> statements = Record.encode(Stream.create(record),
180                                 ImmutableSet.of()).toList();
181                         writer.handleComment(record.getID().toString());
182                         for (final Statement statement : statements) {
183                             writer.handleStatement(statement);
184                         }
185                         ++this.records;
186                         this.triples += statements.size();
187                     }
188                 }
189 
190             });
191 
192             writer.endRDF();
193 
194         } finally {
195             stream.close();
196         }
197     }
198 
199     private static void writeBinary(final Stream<Record> records, final File file)
200             throws IOException {
201 
202         final String base = file.getAbsolutePath();
203         final Dictionary<URI> dictionary = Dictionary.createLocalDictionary(URI.class, new File(
204                 base + ".dict"));
205         final Serializer serializer = new Serializer(false, dictionary, Statements.VALUE_FACTORY);
206         final CountingOutputStream stream = new CountingOutputStream(IO.write(base + ".gz"));
207         try {
208             records.toHandler(new Handler<Record>() {
209 
210                 private int records = 0;
211 
212                 @Override
213                 public void handle(final Record record) throws Throwable {
214                     if (record == null || this.records > 0 && this.records % 1000 == 0) {
215                         LOGGER.info("{} records, {} bytes processed ({} bytes/record)",
216                                 this.records, stream.getCount(), stream.getCount() / this.records);
217                     }
218                     if (record != null) {
219                         serializer.toStream(stream, record);
220                         ++this.records;
221                     }
222                 }
223 
224             });
225 
226         } finally {
227             IO.closeQuietly(records);
228             IO.closeQuietly(stream);
229         }
230     }
231 
232 }