1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.io.OutputStream;
8   import java.util.GregorianCalendar;
9   import java.util.List;
10  import java.util.Set;
11  import java.util.TimeZone;
12  
13  import javax.annotation.Nullable;
14  import javax.xml.datatype.DatatypeFactory;
15  import javax.xml.datatype.XMLGregorianCalendar;
16  
17  import com.google.common.base.Preconditions;
18  import com.google.common.collect.ImmutableList;
19  import com.google.common.collect.Iterables;
20  import com.google.common.collect.Lists;
21  
22  import org.apache.avro.Schema;
23  import org.apache.avro.generic.GenericData;
24  import org.apache.avro.generic.GenericDatumReader;
25  import org.apache.avro.generic.GenericDatumWriter;
26  import org.apache.avro.generic.GenericRecord;
27  import org.apache.avro.io.DatumReader;
28  import org.apache.avro.io.DatumWriter;
29  import org.apache.avro.io.Decoder;
30  import org.apache.avro.io.DecoderFactory;
31  import org.apache.avro.io.Encoder;
32  import org.apache.avro.io.EncoderFactory;
33  import org.openrdf.model.BNode;
34  import org.openrdf.model.Literal;
35  import org.openrdf.model.Resource;
36  import org.openrdf.model.Statement;
37  import org.openrdf.model.URI;
38  import org.openrdf.model.Value;
39  import org.openrdf.model.ValueFactory;
40  import org.openrdf.model.vocabulary.RDF;
41  import org.openrdf.model.vocabulary.XMLSchema;
42  
43  import eu.fbk.knowledgestore.data.Data;
44  import eu.fbk.knowledgestore.data.Dictionary;
45  import eu.fbk.knowledgestore.data.Record;
46  
47  // NOTE: supports only serialization and deserialization of Record, URI, BNode, Literal,
48  // Statement objects. For records, it is possible to specify which properties to serialize /
49  // deserialize.
50  
51  // TODO: add ideas from smaz/jsmaz to dictionary-compress short strings / uris
52  // <https://github.com/icedrake/jsmaz> (30-50% string reduction achievable)
53  
54  public final class AvroSerializer {
55  
56      private final Dictionary<URI> dictionary;
57  
58      private final ValueFactory factory;
59  
60      private final DatatypeFactory datatypeFactory;
61  
62      public AvroSerializer() {
63          this(null);
64      }
65  
66      public AvroSerializer(@Nullable final Dictionary<URI> dictionary) {
67          this.dictionary = dictionary;
68          this.factory = Data.getValueFactory();
69          this.datatypeFactory = Data.getDatatypeFactory();
70      }
71  
72      public Dictionary<URI> getDictionary() {
73          return this.dictionary;
74      }
75  
76      public byte[] compressURI(final URI uri) {
77          Preconditions.checkNotNull(uri);
78          try {
79              final ByteArrayOutputStream stream = new ByteArrayOutputStream();
80              final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
81              final DatumWriter<Object> writer = new GenericDatumWriter<Object>(
82                      AvroSchemas.COMPRESSED_IDENTIFIER);
83              this.dictionary.keyFor(uri); // ensure a compressed version of URI is available
84              final Object generic = encodeIdentifier(uri);
85              writer.write(generic, encoder);
86              return stream.toByteArray();
87  
88          } catch (final IOException ex) {
89              throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
90          }
91      }
92  
93      public URI expandURI(final byte[] bytes) {
94          Preconditions.checkNotNull(bytes);
95          try {
96              final InputStream stream = new ByteArrayInputStream(bytes);
97              final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
98              final DatumReader<Object> reader = new GenericDatumReader<Object>(
99                      AvroSchemas.COMPRESSED_IDENTIFIER);
100             final Object generic = reader.read(null, decoder);
101             return (URI) decodeNode(generic);
102 
103         } catch (final IOException ex) {
104             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
105         }
106     }
107 
108     public byte[] toBytes(final Object object) {
109         try {
110             final ByteArrayOutputStream stream = new ByteArrayOutputStream();
111             this.toStream(stream, object);
112             return stream.toByteArray();
113         } catch (final IOException ex) {
114             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
115         }
116     }
117 
118     public byte[] toBytes(final Record object, @Nullable final Set<URI> propertiesToSerialize) {
119         try {
120             final ByteArrayOutputStream stream = new ByteArrayOutputStream();
121             this.toStream(stream, object, propertiesToSerialize);
122             return stream.toByteArray();
123         } catch (final IOException ex) {
124             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
125         }
126     }
127 
128     public Object fromBytes(final byte[] bytes) {
129         try {
130             return this.fromStream(new ByteArrayInputStream(bytes));
131         } catch (final IOException ex) {
132             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
133         }
134     }
135 
136     public Record fromBytes(final byte[] bytes, final @Nullable Set<URI> propertiesToDeserialize) {
137         try {
138             return this.fromStream(new ByteArrayInputStream(bytes), propertiesToDeserialize);
139         } catch (final IOException ex) {
140             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
141         }
142     }
143 
144     public void toStream(final OutputStream stream, final Object object) throws IOException {
145         final Object generic = encodeNode(object);
146         final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
147         final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.NODE);
148         writer.write(generic, encoder);
149         encoder.flush();
150     }
151 
152     public void toStream(final OutputStream stream, final Record object,
153             @Nullable final Set<URI> propertiesToSerialize) throws IOException {
154         final Object generic = encodeRecord(object, propertiesToSerialize);
155         final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
156         final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.NODE);
157         writer.write(generic, encoder);
158         encoder.flush();
159     }
160 
161     public Object fromStream(final InputStream stream) throws IOException {
162         final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
163         final DatumReader<Object> reader = new GenericDatumReader<Object>(AvroSchemas.NODE);
164         final Object generic = reader.read(null, decoder);
165         return decodeNode(generic);
166     }
167 
168     public Record fromStream(final InputStream stream,
169             @Nullable final Set<URI> propertiesToDeserialize) throws IOException {
170         final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
171         final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(
172                 AvroSchemas.NODE);
173         final GenericRecord generic = reader.read(null, decoder);
174         return decodeRecord(generic, propertiesToDeserialize);
175     }
176 
177     private List<Object> decodeNodes(final Object generic) {
178         if (generic instanceof Iterable<?>) {
179             final Iterable<?> iterable = (Iterable<?>) generic;
180             final int size = Iterables.size(iterable);
181             final List<Object> nodes = Lists.<Object>newArrayListWithCapacity(size);
182             for (final Object element : iterable) {
183                 nodes.add(decodeNode(element));
184             }
185             return nodes;
186         }
187         Preconditions.checkNotNull(generic);
188         return ImmutableList.of(decodeNode(generic));
189     }
190 
191     private Object decodeNode(final Object generic) {
192         if (generic instanceof GenericRecord) {
193             final GenericRecord record = (GenericRecord) generic;
194             final Schema schema = record.getSchema();
195             if (schema.equals(AvroSchemas.RECORD)) {
196                 return decodeRecord(record, null);
197             } else if (schema.equals(AvroSchemas.PLAIN_IDENTIFIER)
198                     || schema.equals(AvroSchemas.COMPRESSED_IDENTIFIER)) {
199                 return decodeIdentifier(record);
200             } else if (schema.equals(AvroSchemas.STATEMENT)) {
201                 return decodeStatement(record);
202             }
203         }
204         return decodeLiteral(generic);
205     }
206 
207     @SuppressWarnings("unchecked")
208     private Record decodeRecord(final GenericRecord generic,
209             @Nullable final Set<URI> propertiesToDecode) {
210         final Record record = Record.create();
211         final GenericRecord encodedID = (GenericRecord) generic.get(0);
212         if (encodedID != null) {
213             record.setID((URI) decodeIdentifier(encodedID));
214         }
215         for (final GenericRecord prop : (Iterable<GenericRecord>) generic.get(1)) {
216             final URI property = (URI) decodeIdentifier((GenericRecord) prop.get(0));
217             final List<Object> values = decodeNodes(prop.get(1));
218             if (propertiesToDecode == null || propertiesToDecode.contains(property)) {
219                 record.set(property, values);
220             }
221         }
222         return record;
223     }
224 
225     private Value decodeValue(final Object generic) {
226         if (generic instanceof GenericRecord) {
227             final GenericRecord record = (GenericRecord) generic;
228             final Schema schema = record.getSchema();
229             if (schema.equals(AvroSchemas.COMPRESSED_IDENTIFIER)
230                     || schema.equals(AvroSchemas.PLAIN_IDENTIFIER)) {
231                 return decodeIdentifier(record);
232             }
233         }
234         return decodeLiteral(generic);
235     }
236 
237     private Resource decodeIdentifier(final GenericRecord record) {
238         final Schema schema = record.getSchema();
239         if (schema.equals(AvroSchemas.COMPRESSED_IDENTIFIER)) {
240             try {
241                 return this.dictionary.objectFor((Integer) record.get(0));
242             } catch (final IOException ex) {
243                 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
244             }
245         } else if (schema.equals(AvroSchemas.PLAIN_IDENTIFIER)) {
246             final String string = record.get(0).toString();
247             if (string.startsWith("_:")) {
248                 return this.factory.createBNode(string.substring(2));
249             } else {
250                 return this.factory.createURI(string);
251             }
252         }
253         throw new IllegalArgumentException("Unsupported encoded identifier: " + record);
254     }
255 
256     private Literal decodeLiteral(final Object generic) {
257         if (generic instanceof GenericRecord) {
258             final GenericRecord record = (GenericRecord) generic;
259             final Schema schema = record.getSchema();
260             if (schema.equals(AvroSchemas.STRING_LANG)) {
261                 final String label = record.get(0).toString(); // Utf8 class used
262                 final Object language = record.get(1);
263                 return this.factory.createLiteral(label, language.toString());
264             } else if (schema.equals(AvroSchemas.SHORT)) {
265                 return this.factory.createLiteral(((Integer) record.get(0)).shortValue());
266             } else if (schema.equals(AvroSchemas.BYTE)) {
267                 return this.factory.createLiteral(((Integer) record.get(0)).byteValue());
268             } else if (schema.equals(AvroSchemas.BIGINTEGER)) {
269                 return this.factory.createLiteral(record.get(0).toString(), XMLSchema.INTEGER);
270             } else if (schema.equals(AvroSchemas.BIGDECIMAL)) {
271                 return this.factory.createLiteral(record.get(0).toString(), XMLSchema.DECIMAL);
272             } else if (schema.equals(AvroSchemas.CALENDAR)) {
273                 final int tz = (Integer) record.get(0);
274                 final GregorianCalendar calendar = new GregorianCalendar();
275                 calendar.setTimeInMillis((Long) record.get(1));
276                 calendar.setTimeZone(TimeZone.getTimeZone(String.format("GMT%s%02d:%02d",
277                         tz >= 0 ? "+" : "-", Math.abs(tz) / 60, Math.abs(tz) % 60)));
278                 return this.factory.createLiteral(this.datatypeFactory
279                         .newXMLGregorianCalendar(calendar));
280             }
281         } else if (generic instanceof CharSequence) {
282             return this.factory.createLiteral(generic.toString()); // Utf8 class used
283         } else if (generic instanceof Boolean) {
284             return this.factory.createLiteral((Boolean) generic);
285         } else if (generic instanceof Long) {
286             return this.factory.createLiteral((Long) generic);
287         } else if (generic instanceof Integer) {
288             return this.factory.createLiteral((Integer) generic);
289         } else if (generic instanceof Double) {
290             return this.factory.createLiteral((Double) generic);
291         } else if (generic instanceof Float) {
292             return this.factory.createLiteral((Float) generic);
293         }
294         Preconditions.checkNotNull(generic);
295         throw new IllegalArgumentException("Unsupported generic data: " + generic);
296     }
297 
298     private Statement decodeStatement(final GenericRecord record) {
299         final Resource subj = decodeIdentifier((GenericRecord) record.get(0));
300         final URI pred = (URI) decodeIdentifier((GenericRecord) record.get(1));
301         final Value obj = decodeValue(record.get(2));
302         final Resource ctx = decodeIdentifier((GenericRecord) record.get(3));
303         if (ctx == null) {
304             return this.factory.createStatement(subj, pred, obj);
305         } else {
306             return this.factory.createStatement(subj, pred, obj, ctx);
307         }
308     }
309 
310     private Object encodeNodes(final Iterable<? extends Object> nodes) {
311         final int size = Iterables.size(nodes);
312         if (size == 1) {
313             return encodeNode(Iterables.get(nodes, 0));
314         }
315         final List<Object> list = Lists.<Object>newArrayListWithCapacity(size);
316         for (final Object node : nodes) {
317             list.add(encodeNode(node));
318         }
319         return list;
320     }
321 
322     private Object encodeNode(final Object node) {
323         if (node instanceof Record) {
324             return encodeRecord((Record) node, null);
325         } else if (node instanceof Literal) {
326             return encodeLiteral((Literal) node);
327         } else if (node instanceof Resource) {
328             return encodeIdentifier((Resource) node);
329         } else if (node instanceof Statement) {
330             return encodeStatement((Statement) node);
331         }
332         Preconditions.checkNotNull(node);
333         throw new IllegalArgumentException("Unsupported node: " + node);
334     }
335 
336     private Object encodeRecord(final Record record, @Nullable final Set<URI> propertiesToEncode) {
337         final URI id = record.getID();
338         final Object encodedID = id == null ? null : encodeIdentifier(id);
339         final List<Object> props = Lists.newArrayList();
340         for (final URI property : record.getProperties()) {
341             if (propertiesToEncode == null || propertiesToEncode.contains(property)) {
342                 ensureInDictionary(property);
343                 final List<? extends Object> nodes = record.get(property);
344                 if (property.equals(RDF.TYPE)) {
345                     for (final Object value : nodes) {
346                         if (value instanceof URI) {
347                             ensureInDictionary((URI) value);
348                         }
349                     }
350                 }
351                 final GenericData.Record prop = new GenericData.Record(AvroSchemas.PROPERTY);
352                 prop.put("propertyURI", encodeIdentifier(property));
353                 prop.put("propertyValue", encodeNodes(nodes));
354                 props.add(prop);
355             }
356         }
357         return AvroSerializer.newGenericRecord(AvroSchemas.RECORD, encodedID, props);
358     }
359 
360     private Object encodeValue(final Value value) {
361         if (value instanceof Literal) {
362             return encodeLiteral((Literal) value);
363         } else if (value instanceof Resource) {
364             return encodeIdentifier((Resource) value);
365         } else {
366             throw new IllegalArgumentException("Unsupported value: " + value);
367         }
368     }
369 
370     private Object encodeIdentifier(final Resource identifier) {
371         if (identifier instanceof URI) {
372             try {
373                 final Integer key = this.dictionary.keyFor((URI) identifier, false);
374                 if (key != null) {
375                     return AvroSerializer.newGenericRecord(AvroSchemas.COMPRESSED_IDENTIFIER, key);
376                 }
377             } catch (final IOException ex) {
378                 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
379             }
380         }
381         final String id = identifier instanceof BNode ? "_:" + ((BNode) identifier).getID()
382                 : identifier.stringValue();
383         return AvroSerializer.newGenericRecord(AvroSchemas.PLAIN_IDENTIFIER, id);
384     }
385 
386     private Object encodeLiteral(final Literal literal) {
387         final URI datatype = literal.getDatatype();
388         if (datatype == null || datatype.equals(XMLSchema.STRING)) {
389             final String language = literal.getLanguage();
390             if (language == null) {
391                 return literal.getLabel();
392             } else {
393                 return AvroSerializer.newGenericRecord(AvroSchemas.STRING_LANG,
394                         literal.getLabel(), language);
395             }
396         } else if (datatype.equals(XMLSchema.BOOLEAN)) {
397             return literal.booleanValue();
398         } else if (datatype.equals(XMLSchema.LONG)) {
399             return literal.longValue();
400         } else if (datatype.equals(XMLSchema.INT)) {
401             return literal.intValue();
402         } else if (datatype.equals(XMLSchema.DOUBLE)) {
403             return literal.doubleValue();
404         } else if (datatype.equals(XMLSchema.FLOAT)) {
405             return literal.floatValue();
406         } else if (datatype.equals(XMLSchema.SHORT)) {
407             return AvroSerializer.newGenericRecord(AvroSchemas.SHORT, literal.intValue());
408         } else if (datatype.equals(XMLSchema.BYTE)) {
409             return AvroSerializer.newGenericRecord(AvroSchemas.BYTE, literal.intValue());
410         } else if (datatype.equals(XMLSchema.INTEGER)) {
411             return AvroSerializer.newGenericRecord(AvroSchemas.BIGINTEGER, literal.stringValue());
412         } else if (datatype.equals(XMLSchema.DECIMAL)) {
413             return AvroSerializer.newGenericRecord(AvroSchemas.BIGDECIMAL, literal.stringValue());
414         } else if (datatype.equals(XMLSchema.DATETIME)) {
415             final XMLGregorianCalendar calendar = literal.calendarValue();
416             return AvroSerializer.newGenericRecord(AvroSchemas.CALENDAR, calendar.getTimezone(),
417                     calendar.toGregorianCalendar().getTimeInMillis());
418         }
419         throw new IllegalArgumentException("Unsupported literal: " + literal);
420     }
421 
422     private Object encodeStatement(final Statement statement) {
423         return AvroSerializer.newGenericRecord(AvroSchemas.STATEMENT,
424                 encodeIdentifier(statement.getSubject()),
425                 encodeIdentifier(statement.getPredicate()), //
426                 encodeValue(statement.getObject()), //
427                 encodeIdentifier(statement.getContext()));
428     }
429 
430     private URI ensureInDictionary(final URI uri) {
431         try {
432             this.dictionary.keyFor(uri);
433             return uri;
434         } catch (final IOException ex) {
435             throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
436         }
437     }
438 
439     private static GenericData.Record newGenericRecord(final Schema schema,
440             final Object... fieldValues) {
441 
442         final GenericData.Record record = new GenericData.Record(schema);
443         for (int i = 0; i < fieldValues.length; ++i) {
444             record.put(i, fieldValues[i]);
445         }
446         return record;
447     }
448 
449 }