1   package eu.fbk.knowledgestore.internal.jaxrs;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.FilterInputStream;
6   import java.io.IOException;
7   import java.io.InputStream;
8   import java.io.OutputStream;
9   import java.lang.annotation.Annotation;
10  import java.lang.reflect.Type;
11  import java.util.Map;
12  import java.util.concurrent.atomic.AtomicLong;
13  
14  import javax.annotation.Nullable;
15  import javax.ws.rs.Consumes;
16  import javax.ws.rs.Produces;
17  import javax.ws.rs.WebApplicationException;
18  import javax.ws.rs.core.HttpHeaders;
19  import javax.ws.rs.core.MediaType;
20  import javax.ws.rs.core.MultivaluedMap;
21  import javax.ws.rs.ext.MessageBodyReader;
22  import javax.ws.rs.ext.MessageBodyWriter;
23  import javax.ws.rs.ext.Provider;
24  
25  import com.google.common.base.Charsets;
26  import com.google.common.base.Splitter;
27  import com.google.common.base.Throwables;
28  import com.google.common.collect.ImmutableSet;
29  import com.google.common.io.CountingInputStream;
30  import com.google.common.io.CountingOutputStream;
31  import com.google.common.reflect.TypeToken;
32  
33  import org.openrdf.model.Statement;
34  import org.openrdf.model.URI;
35  import org.openrdf.model.vocabulary.DCTERMS;
36  import org.openrdf.query.BindingSet;
37  import org.openrdf.query.resultio.BooleanQueryResultFormat;
38  import org.openrdf.query.resultio.TupleQueryResultFormat;
39  import org.openrdf.rio.RDFFormat;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import eu.fbk.knowledgestore.Outcome;
44  import eu.fbk.knowledgestore.data.Data;
45  import eu.fbk.knowledgestore.data.Record;
46  import eu.fbk.knowledgestore.data.Representation;
47  import eu.fbk.knowledgestore.data.Stream;
48  import eu.fbk.knowledgestore.internal.Logging;
49  import eu.fbk.knowledgestore.internal.Util;
50  import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
51  import eu.fbk.knowledgestore.vocabulary.KS;
52  import eu.fbk.knowledgestore.vocabulary.NFO;
53  import eu.fbk.knowledgestore.vocabulary.NIE;
54  import eu.fbk.rdfpro.tql.TQL;
55  
56  @Provider
57  @Consumes(MediaType.WILDCARD)
58  @Produces(MediaType.WILDCARD)
59  public class Serializer implements MessageBodyReader<Object>, MessageBodyWriter<Object> {
60  
61      // TODO: supported types depend on imported libraries
62  
63      private static final Logger LOGGER = LoggerFactory.getLogger(Serializer.class);
64  
65      @Override
66      public boolean isReadable(final Class<?> type, final Type genericType,
67              final Annotation[] annotations, final MediaType mediaType) {
68  
69          final boolean result = type.isAssignableFrom(Representation.class)
70                  || isAssignable(genericType, Protocol.STREAM_OF_RECORDS.getType())
71                  || isAssignable(genericType, Protocol.STREAM_OF_OUTCOMES.getType())
72                  || isAssignable(genericType, Protocol.STREAM_OF_STATEMENTS.getType())
73                  || isAssignable(genericType, Protocol.STREAM_OF_TUPLES.getType())
74                  || isAssignable(genericType, Protocol.STREAM_OF_BOOLEANS.getType());
75  
76          if (!result) {
77              LOGGER.debug("Non deserializable stream: {} ({})", genericType, mediaType);
78          }
79  
80          return result;
81      }
82  
83      @Override
84      public boolean isWriteable(final Class<?> type, final Type genericType,
85              final Annotation[] annotations, final MediaType mediaType) {
86  
87          final boolean result = Representation.class.isAssignableFrom(type)
88                  || isAssignable(Protocol.STREAM_OF_RECORDS.getType(), genericType)
89                  || isAssignable(Protocol.STREAM_OF_OUTCOMES.getType(), genericType)
90                  || isAssignable(Protocol.STREAM_OF_STATEMENTS.getType(), genericType)
91                  || isAssignable(Protocol.STREAM_OF_TUPLES.getType(), genericType)
92                  || isAssignable(Protocol.STREAM_OF_BOOLEANS.getType(), genericType);
93  
94          if (!result) {
95              LOGGER.debug("Non serializable stream: {} ({})", genericType, mediaType);
96          }
97  
98          return result;
99      }
100 
101     @Override
102     public long getSize(final Object object, final Class<?> type, final Type genericType,
103             final Annotation[] annotations, final MediaType mediaType) {
104         throw new UnsupportedOperationException(); // JAX-RS promises never to call this method
105     }
106 
107     @SuppressWarnings("resource")
108     @Override
109     public Object readFrom(final Class<Object> type, final Type genericType,
110             final Annotation[] annotations, final MediaType mediaType,
111             final MultivaluedMap<String, String> headers, final InputStream input)
112             throws IOException, WebApplicationException {
113 
114         final String mimeType = mediaType.getType() + "/" + mediaType.getSubtype();
115 
116         final CountingInputStream in = new CountingInputStream(input);
117         final boolean chunked = "true".equalsIgnoreCase(headers.getFirst(Protocol.HEADER_CHUNKED));
118         final long ts = System.currentTimeMillis();
119 
120         try {
121             if (type.isAssignableFrom(Representation.class)) {
122                 final InputStream stream = interceptClose(in, ts);
123                 final Representation representation = Representation.create(stream);
124                 readMetadata(representation.getMetadata(), headers);
125                 return representation;
126 
127             } else if (isAssignable(genericType, Protocol.STREAM_OF_RECORDS.getType())) {
128                 final RDFFormat format = formatFor(mimeType);
129                 final AtomicLong numStatements = new AtomicLong();
130                 final AtomicLong numRecords = new AtomicLong();
131                 Stream<Statement> statements = RDFUtil.readRDF(in, format, null, null, false);
132                 statements = statements.track(numStatements, null);
133                 Stream<Record> records = Record.decode(statements, null, chunked);
134                 records = records.track(numRecords, null);
135                 interceptClose(records, in, ts, numRecords, "record(s)", numStatements,
136                         "statement(s)");
137                 return records;
138 
139             } else if (isAssignable(genericType, Protocol.STREAM_OF_OUTCOMES.getType())) {
140                 final RDFFormat format = formatFor(mimeType);
141                 final AtomicLong numStatements = new AtomicLong();
142                 final AtomicLong numOutcomes = new AtomicLong();
143                 Stream<Statement> statements = RDFUtil.readRDF(in, format, null, null, false);
144                 statements = statements.track(numStatements, null);
145                 Stream<Outcome> outcomes = Outcome.decode(statements, chunked);
146                 outcomes = outcomes.track(numOutcomes, null);
147                 interceptClose(outcomes, in, ts, numOutcomes, "outcome(s)", numStatements,
148                         "statement(s)");
149                 return outcomes;
150 
151             } else if (isAssignable(genericType, Protocol.STREAM_OF_STATEMENTS.getType())) {
152                 final RDFFormat format = formatFor(mimeType);
153                 final AtomicLong numStatements = new AtomicLong();
154                 Stream<Statement> statements = RDFUtil.readRDF(in, format, null, null, false);
155                 statements = statements.track(numStatements, null);
156                 interceptClose(statements, in, ts, numStatements, "statement(s)");
157                 return statements;
158 
159             } else if (isAssignable(genericType, Protocol.STREAM_OF_TUPLES.getType())) {
160                 final TupleQueryResultFormat format;
161                 format = TupleQueryResultFormat.forMIMEType(mimeType);
162                 final AtomicLong numTuples = new AtomicLong();
163                 Stream<BindingSet> tuples = RDFUtil.readSparqlTuples(format, in);
164                 tuples = tuples.track(numTuples, null);
165                 interceptClose(tuples, in, ts, numTuples, "tuple(s)");
166                 return tuples;
167 
168             } else if (isAssignable(genericType, Protocol.STREAM_OF_BOOLEANS.getType())) {
169                 final BooleanQueryResultFormat format;
170                 format = BooleanQueryResultFormat.forMIMEType(mimeType);
171                 final boolean result = RDFUtil.readSparqlBoolean(format, in);
172                 final Stream<Boolean> stream = Stream.create(result);
173                 interceptClose(stream, in, ts, 1, "boolean");
174                 return stream;
175             }
176         } catch (final Throwable ex) {
177             Util.closeQuietly(in); // done even if advised against it
178             Throwables.propagateIfPossible(ex, IOException.class);
179             throw Throwables.propagate(ex);
180         }
181 
182         throw new IllegalArgumentException("Cannot deserialize " + genericType + " from "
183                 + mimeType);
184     }
185 
186     @SuppressWarnings("unchecked")
187     @Override
188     public void writeTo(final Object object, final Class<?> type, final Type genericType,
189             final Annotation[] annotations, final MediaType mediaType,
190             final MultivaluedMap<String, Object> headers, final OutputStream output)
191             throws IOException, WebApplicationException {
192 
193         final String mimeType = mediaType.getType() + "/" + mediaType.getSubtype();
194 
195         final Map<String, String> namespaces = Data.getNamespaceMap();
196         final CountingOutputStream out = new CountingOutputStream(output);
197         final long ts = System.currentTimeMillis();
198 
199         try {
200             if (Representation.class.isAssignableFrom(type)) {
201                 final Representation representation = (Representation) object;
202                 writeMetadata(representation.getMetadata(), headers);
203                 representation.writeTo(out);
204                 logWrite(ts, out);
205 
206             } else if (isAssignable(Protocol.STREAM_OF_RECORDS.getType(), genericType)) {
207                 headers.putSingle(Protocol.HEADER_CHUNKED, "true");
208                 final String mime = setupType(mimeType, Protocol.MIME_TYPES_RDF, headers);
209                 final RDFFormat format = formatFor(mime);
210                 final AtomicLong recordCounter = new AtomicLong();
211                 Stream<? extends Record> records = (Stream<? extends Record>) object;
212                 records = records.track(recordCounter, null);
213                 final Stream<Statement> stmt = Record.encode(records, null);
214                 final long count = RDFUtil.writeRDF(out, format, namespaces, null, stmt);
215                 logWrite(ts, out, recordCounter.get(), "record(s)", count, "statement(s)");
216 
217             } else if (isAssignable(Protocol.STREAM_OF_OUTCOMES.getType(), genericType)) {
218                 headers.putSingle(Protocol.HEADER_CHUNKED, "true");
219                 final String mime = setupType(mimeType, Protocol.MIME_TYPES_RDF, headers);
220                 final RDFFormat format = formatFor(mime);
221                 final AtomicLong outcomeCounter = new AtomicLong();
222                 Stream<? extends Outcome> outcomes = (Stream<? extends Outcome>) object;
223                 outcomes = outcomes.track(outcomeCounter, null);
224                 final Stream<Statement> stmt = Outcome.encode(outcomes);
225                 final long count = RDFUtil.writeRDF(out, format, namespaces, null, stmt);
226                 logWrite(ts, out, outcomeCounter.get(), "outcome(s)", count, "statement(s)");
227 
228             } else if (isAssignable(Protocol.STREAM_OF_STATEMENTS.getType(), genericType)) {
229                 final String mime = setupType(mimeType, Protocol.MIME_TYPES_RDF, headers);
230                 final RDFFormat format = formatFor(mime);
231                 final Stream<? extends Statement> stmt = (Stream<? extends Statement>) object;
232                 final long count = RDFUtil.writeRDF(out, format, namespaces, null, stmt);
233                 logWrite(ts, out, count, "statement(s)");
234 
235             } else if (isAssignable(Protocol.STREAM_OF_TUPLES.getType(), genericType)) {
236                 final String mime = setupType(mimeType, Protocol.MIME_TYPES_SPARQL_TUPLE, headers);
237                 final TupleQueryResultFormat format = TupleQueryResultFormat.forMIMEType(mime);
238                 final Stream<? extends BindingSet> tuples = (Stream<? extends BindingSet>) object;
239                 final long count = RDFUtil.writeSparqlTuples(format, out, tuples);
240                 logWrite(ts, out, count, "tuple(s)");
241 
242             } else if (isAssignable(Protocol.STREAM_OF_BOOLEANS.getType(), genericType)) {
243                 final String mime = setupType(mimeType, Protocol.MIME_TYPES_SPARQL_BOOLEAN,
244                         headers);
245                 final BooleanQueryResultFormat format = BooleanQueryResultFormat.forMIMEType(mime);
246                 final boolean bool = ((Stream<? extends Boolean>) object).getUnique();
247                 RDFUtil.writeSparqlBoolean(format, out, bool);
248                 logWrite(ts, out, 1, "boolean");
249 
250             } else {
251                 throw new IllegalArgumentException("Cannot serialize " + genericType + " to "
252                         + mediaType);
253             }
254 
255         } finally {
256             Util.closeQuietly(object);
257             Util.closeQuietly(out); // done even if asked not to do so
258         }
259     }
260 
261     @Nullable
262     private static void readMetadata(final Record metadata,
263             final MultivaluedMap<String, String> headers) {
264 
265         // Read Content-Type header
266         final String mime = headers.getFirst(HttpHeaders.CONTENT_TYPE);
267         metadata.set(NIE.MIME_TYPE, mime != null ? mime : MediaType.APPLICATION_OCTET_STREAM);
268 
269         // Read Content-MD5 header, if available
270         final String md5 = headers.getFirst("Content-MD5");
271         if (md5 != null) {
272             final Record hash = Record.create();
273             hash.set(NFO.HASH_ALGORITHM, "MD5");
274             hash.set(NFO.HASH_VALUE, md5);
275             metadata.set(NFO.HAS_HASH, hash);
276         }
277 
278         // Read Content-Language header, if possible
279         final String language = headers.getFirst(HttpHeaders.CONTENT_LANGUAGE);
280         try {
281             metadata.set(DCTERMS.LANGUAGE, Data.languageCodeToURI(language));
282         } catch (final Throwable ex) {
283             LOGGER.warn("Invalid {}: {}", HttpHeaders.CONTENT_LANGUAGE, language);
284         }
285 
286         // Read custom X-KS-Meta header
287         final String encodedMeta = headers.getFirst(Protocol.HEADER_META);
288         if (encodedMeta != null) {
289             final InputStream in = new ByteArrayInputStream(encodedMeta.getBytes(Charsets.UTF_8));
290             final Stream<Statement> statements = RDFUtil.readRDF(in, RDFFormat.TURTLE,
291                     Data.getNamespaceMap(), null, true);
292             final Record record = Record.decode(statements,
293                     ImmutableSet.<URI>of(KS.REPRESENTATION), true).getUnique();
294             metadata.setID(record.getID());
295             for (final URI property : record.getProperties()) {
296                 metadata.set(property, record.get(property));
297             }
298         }
299     }
300 
301     @Nullable
302     private static void writeMetadata(final Record metadata,
303             final MultivaluedMap<String, Object> headers) {
304 
305         // Write Content-Type header
306         headers.putSingle(HttpHeaders.CONTENT_TYPE, metadata.getUnique(NIE.MIME_TYPE,
307                 String.class, MediaType.APPLICATION_OCTET_STREAM));
308 
309         // Write Content-MD5 header, if possible
310         final Record hash = metadata.getUnique(NFO.HAS_HASH, Record.class, null);
311         final String md5 = hash == null ? null : !"MD5".equals(hash.getUnique(NFO.HASH_ALGORITHM,
312                 String.class, null)) ? null //
313                 : hash.getUnique(NFO.HASH_VALUE, String.class, null);
314         headers.putSingle("Content-MD5", md5);
315 
316         // Write Content-Language header, if possible
317         String language = metadata.getUnique(NIE.LANGUAGE, String.class, null);
318         if (language == null) {
319             final URI languageURI = metadata.getUnique(DCTERMS.LANGUAGE, URI.class, null);
320             try {
321                 language = Data.languageURIToCode(languageURI);
322             } catch (final Throwable ex) {
323                 LOGGER.warn("Invalid language URI: ", languageURI);
324             }
325         }
326         headers.putSingle(HttpHeaders.CONTENT_LANGUAGE, language);
327 
328         // Write custom X-KS-Meta header
329         final ByteArrayOutputStream out = new ByteArrayOutputStream();
330         final Stream<Statement> statements = Record.encode(Stream.create(metadata),
331                 ImmutableSet.<URI>of(KS.REPRESENTATION));
332         RDFUtil.writeRDF(out, RDFFormat.TURTLE, Data.getNamespaceMap(), null, statements);
333         final String string = new String(out.toByteArray(), Charsets.UTF_8);
334         final StringBuilder builder = new StringBuilder();
335         String separator = "";
336         for (final String line : Splitter.on('\n').trimResults().omitEmptyStrings().split(string)) {
337             if (!line.toLowerCase().startsWith("@prefix")) {
338                 builder.append(separator).append(line);
339                 separator = " ";
340             }
341         }
342         headers.putSingle(Protocol.HEADER_META, builder.toString());
343     }
344 
345     private static boolean isAssignable(final Type lhs, final Type rhs) {
346         return TypeToken.of(lhs).isAssignableFrom(rhs);
347     }
348 
349     private static String setupType(final String jaxrsType, final String supportedTypes,
350             final MultivaluedMap<String, Object> headers) {
351 
352         if (jaxrsType != null) {
353             return jaxrsType;
354         }
355 
356         final int index = supportedTypes.indexOf(',');
357         final String mediaType = index < 0 ? supportedTypes : supportedTypes.substring(0, index);
358 
359         headers.putSingle("Content-Type", mediaType);
360         headers.remove("ETag"); // to stay on the safe side
361 
362         return mediaType;
363     }
364 
365     private static void interceptClose(final Stream<?> stream, final CountingInputStream in,
366             final long startTime, final Object... args) {
367         final Map<String, String> mdc = Logging.getMDC();
368         stream.onClose(new Runnable() {
369 
370             @Override
371             public void run() {
372                 final Map<String, String> oldMdc = Logging.getMDC();
373                 try {
374                     Logging.setMDC(mdc);
375                     logRead(in, startTime, args);
376                     // closing the stream should not be done, but GZIPFilter seems not to detect
377                     // EOF and does not release the underlying stream, causing the connection not
378                     // to be released
379                     Util.closeQuietly(in);
380                 } finally {
381                     Logging.setMDC(oldMdc);
382                 }
383             }
384 
385         });
386     }
387 
388     private static InputStream interceptClose(final CountingInputStream stream,
389             final long startTime, final Object... args) {
390         final Map<String, String> mdc = Logging.getMDC();
391         return new FilterInputStream(stream) {
392 
393             private boolean closed;
394 
395             @Override
396             public void close() throws IOException {
397                 if (this.closed) {
398                     return;
399                 }
400                 final Map<String, String> oldMdc = Logging.getMDC();
401                 try {
402                     Logging.setMDC(mdc);
403                     logRead(stream, startTime, args);
404                     // closing the stream should not be done, but GZIPFilter seems not to detect
405                     // EOF and does not release the underlying stream, causing the connection not
406                     // to be released
407                     Util.closeQuietly(this.in);
408                 } finally {
409                     this.closed = true;
410                     Logging.setMDC(oldMdc);
411                     super.close();
412                 }
413             }
414 
415         };
416     }
417 
418     private static void logRead(final CountingInputStream in, final long startTime,
419             final Object... args) {
420         if (LOGGER.isDebugEnabled()) {
421             boolean eof = false;
422             try {
423                 eof = in.read() == -1;
424             } catch (final Throwable ex) {
425                 // ignore
426             }
427             final long elapsed = System.currentTimeMillis() - startTime;
428             final StringBuilder builder = new StringBuilder();
429             builder.append("Http: read complete, ");
430             for (int i = 0; i < args.length; i += 2) {
431                 builder.append(args[i]).append(" ").append(args[i + 1]).append(", ");
432             }
433             builder.append(in.getCount()).append(" byte(s), ");
434             if (eof) {
435                 builder.append("EOF, ");
436             }
437             builder.append(elapsed).append(" ms");
438             LOGGER.debug(builder.toString());
439         }
440     }
441 
442     private static void logWrite(final long startTime, final CountingOutputStream stream,
443             final Object... args) {
444         if (LOGGER.isDebugEnabled()) {
445             final long elapsed = System.currentTimeMillis() - startTime;
446             final StringBuilder builder = new StringBuilder();
447             builder.append("Http: write complete, ");
448             for (int i = 0; i < args.length; i += 2) {
449                 builder.append(args[i]).append(" ").append(args[i + 1]).append(", ");
450             }
451             builder.append(stream.getCount()).append(" byte(s), ");
452             builder.append(elapsed).append(" ms");
453             LOGGER.debug(builder.toString());
454         }
455     }
456 
457     private static RDFFormat formatFor(final String mimeType) {
458         final RDFFormat format = RDFFormat.forMIMEType(mimeType);
459         if (format == null) {
460             throw new IllegalArgumentException("No RDF format for MIME type '" + mimeType + "'");
461         }
462         return format;
463     }
464 
465     static {
466         RDFFormat.register(TQL.FORMAT);
467     }
468 
469 }