1   package eu.fbk.knowledgestore.runtime;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.io.OutputStream;
8   import java.util.GregorianCalendar;
9   import java.util.List;
10  import java.util.Set;
11  import java.util.TimeZone;
12  
13  import javax.annotation.Nullable;
14  import javax.xml.datatype.DatatypeFactory;
15  import javax.xml.datatype.XMLGregorianCalendar;
16  
17  import com.google.common.base.Preconditions;
18  import com.google.common.collect.ImmutableList;
19  import com.google.common.collect.Iterables;
20  import com.google.common.collect.Lists;
21  
22  import org.apache.avro.Schema;
23  import org.apache.avro.generic.GenericData;
24  import org.apache.avro.generic.GenericDatumReader;
25  import org.apache.avro.generic.GenericDatumWriter;
26  import org.apache.avro.generic.GenericRecord;
27  import org.apache.avro.io.DatumReader;
28  import org.apache.avro.io.DatumWriter;
29  import org.apache.avro.io.Decoder;
30  import org.apache.avro.io.DecoderFactory;
31  import org.apache.avro.io.Encoder;
32  import org.apache.avro.io.EncoderFactory;
33  import org.openrdf.model.BNode;
34  import org.openrdf.model.Literal;
35  import org.openrdf.model.Resource;
36  import org.openrdf.model.Statement;
37  import org.openrdf.model.URI;
38  import org.openrdf.model.Value;
39  import org.openrdf.model.ValueFactory;
40  import org.openrdf.model.vocabulary.RDF;
41  import org.openrdf.model.vocabulary.XMLSchema;
42  
43  import eu.fbk.knowledgestore.data.Data;
44  import eu.fbk.knowledgestore.data.Dictionary;
45  import eu.fbk.knowledgestore.data.Record;
46  
47  // NOTE: supports only serialization and deserialization of Record, URI, BNode, Literal,
48  // Statement objects. For records, it is possible to specify which properties to serialize /
49  // deserialize.
50  
51  // TODO: add ideas from smaz/jsmaz to dictionary-compress short strings / uris
52  // <https://github.com/icedrake/jsmaz> (30-50% string reduction achievable)
53  
54  public final class SerializerAvro {
55  
56      private final Dictionary<URI> dictionary;
57  
58      private final ValueFactory factory;
59  
60      private final DatatypeFactory datatypeFactory;
61  
62      public SerializerAvro() {
63          this((Dictionary<URI>) null);
64      }
65  
66      public SerializerAvro(@Nullable final Dictionary<URI> dictionary) {
67          this.dictionary = dictionary;
68          this.factory = Data.getValueFactory();
69          this.datatypeFactory = Data.getDatatypeFactory();
70      }
71  
72      public SerializerAvro(final String fileName) throws IOException {
73          this.dictionary = Dictionary.createHadoopDictionary(URI.class, fileName);
74          this.factory = Data.getValueFactory();
75          this.datatypeFactory = Data.getDatatypeFactory();
76      }
77  
78      public Dictionary<URI> getDictionary() {
79          return this.dictionary;
80      }
81  
82      public byte[] compressURI(final URI uri) {
83          Preconditions.checkNotNull(uri);
84          try {
85              final ByteArrayOutputStream stream = new ByteArrayOutputStream();
86              final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
87              final DatumWriter<Object> writer = new GenericDatumWriter<Object>(
88                      Schemas.COMPRESSED_IDENTIFIER);
89              this.dictionary.keyFor(uri); // ensure a compressed version of URI is available
90              final Object generic = encodeIdentifier(uri);
91              writer.write(generic, encoder);
92              return stream.toByteArray();
93  
94          } catch (final IOException ex) {
95              throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
96          }
97      }
98  
99      public URI expandURI(final byte[] bytes) {
100         Preconditions.checkNotNull(bytes);
101         try {
102             final InputStream stream = new ByteArrayInputStream(bytes);
103             final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
104             final DatumReader<Object> reader = new GenericDatumReader<Object>(
105                     Schemas.COMPRESSED_IDENTIFIER);
106             final Object generic = reader.read(null, decoder);
107             return (URI) decodeNode(generic);
108 
109         } catch (final IOException ex) {
110             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
111         }
112     }
113 
114     public byte[] toBytes(final Object object) {
115         try {
116             final ByteArrayOutputStream stream = new ByteArrayOutputStream();
117             this.toStream(stream, object);
118             return stream.toByteArray();
119         } catch (final IOException ex) {
120             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
121         }
122     }
123 
124     public byte[] toBytes(final Record object, @Nullable final Set<URI> propertiesToSerialize) {
125         try {
126             final ByteArrayOutputStream stream = new ByteArrayOutputStream();
127             this.toStream(stream, object, propertiesToSerialize);
128             return stream.toByteArray();
129         } catch (final IOException ex) {
130             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
131         }
132     }
133 
134     public Object fromBytes(final byte[] bytes) {
135         try {
136             return this.fromStream(new ByteArrayInputStream(bytes));
137         } catch (final IOException ex) {
138             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
139         }
140     }
141 
142     public Record fromBytes(final byte[] bytes, final @Nullable Set<URI> propertiesToDeserialize) {
143         try {
144             return this.fromStream(new ByteArrayInputStream(bytes), propertiesToDeserialize);
145         } catch (final IOException ex) {
146             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
147         }
148     }
149 
150     public void toStream(final OutputStream stream, final Object object) throws IOException {
151         final Object generic = encodeNode(object);
152         final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
153         final DatumWriter<Object> writer = new GenericDatumWriter<Object>(Schemas.NODE);
154         writer.write(generic, encoder);
155         encoder.flush();
156     }
157 
158     public void toStream(final OutputStream stream, final Record object,
159             @Nullable final Set<URI> propertiesToSerialize) throws IOException {
160         final Object generic = encodeRecord(object, propertiesToSerialize);
161         final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
162         final DatumWriter<Object> writer = new GenericDatumWriter<Object>(Schemas.NODE);
163         writer.write(generic, encoder);
164         encoder.flush();
165     }
166 
167     public Object fromStream(final InputStream stream) throws IOException {
168         final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
169         final DatumReader<Object> reader = new GenericDatumReader<Object>(Schemas.NODE);
170         final Object generic = reader.read(null, decoder);
171         return decodeNode(generic);
172     }
173 
174     public Record fromStream(final InputStream stream,
175             @Nullable final Set<URI> propertiesToDeserialize) throws IOException {
176         final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
177         final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(
178                 Schemas.NODE);
179         final GenericRecord generic = reader.read(null, decoder);
180         return decodeRecord(generic, propertiesToDeserialize);
181     }
182 
183     private List<Object> decodeNodes(final Object generic) {
184         if (generic instanceof Iterable<?>) {
185             final Iterable<?> iterable = (Iterable<?>) generic;
186             final int size = Iterables.size(iterable);
187             final List<Object> nodes = Lists.<Object>newArrayListWithCapacity(size);
188             for (final Object element : iterable) {
189                 nodes.add(decodeNode(element));
190             }
191             return nodes;
192         }
193         Preconditions.checkNotNull(generic);
194         return ImmutableList.of(decodeNode(generic));
195     }
196 
197     private Object decodeNode(final Object generic) {
198         if (generic instanceof GenericRecord) {
199             final GenericRecord record = (GenericRecord) generic;
200             final Schema schema = record.getSchema();
201             if (schema.equals(Schemas.RECORD)) {
202                 return decodeRecord(record, null);
203             } else if (schema.equals(Schemas.PLAIN_IDENTIFIER)
204                     || schema.equals(Schemas.COMPRESSED_IDENTIFIER)) {
205                 return decodeIdentifier(record);
206             } else if (schema.equals(Schemas.STATEMENT)) {
207                 return decodeStatement(record);
208             }
209         }
210         return decodeLiteral(generic);
211     }
212 
213     @SuppressWarnings("unchecked")
214     private Record decodeRecord(final GenericRecord generic,
215             @Nullable final Set<URI> propertiesToDecode) {
216         final Record record = Record.create();
217         final GenericRecord encodedID = (GenericRecord) generic.get(0);
218         if (encodedID != null) {
219             record.setID((URI) decodeIdentifier(encodedID));
220         }
221         for (final GenericRecord prop : (Iterable<GenericRecord>) generic.get(1)) {
222             final URI property = (URI) decodeIdentifier((GenericRecord) prop.get(0));
223             final List<Object> values = decodeNodes(prop.get(1));
224             if (propertiesToDecode == null || propertiesToDecode.contains(property)) {
225                 record.set(property, values);
226             }
227         }
228         return record;
229     }
230 
231     private Value decodeValue(final Object generic) {
232         if (generic instanceof GenericRecord) {
233             final GenericRecord record = (GenericRecord) generic;
234             final Schema schema = record.getSchema();
235             if (schema.equals(Schemas.COMPRESSED_IDENTIFIER)
236                     || schema.equals(Schemas.PLAIN_IDENTIFIER)) {
237                 return decodeIdentifier(record);
238             }
239         }
240         return decodeLiteral(generic);
241     }
242 
243     private Resource decodeIdentifier(final GenericRecord record) {
244         final Schema schema = record.getSchema();
245         if (schema.equals(Schemas.COMPRESSED_IDENTIFIER)) {
246             try {
247                 return this.dictionary.objectFor((Integer) record.get(0));
248             } catch (final IOException ex) {
249                 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
250             }
251         } else if (schema.equals(Schemas.PLAIN_IDENTIFIER)) {
252             final String string = record.get(0).toString();
253             if (string.startsWith("_:")) {
254                 return this.factory.createBNode(string.substring(2));
255             } else {
256                 return this.factory.createURI(string);
257             }
258         }
259         throw new IllegalArgumentException("Unsupported encoded identifier: " + record);
260     }
261 
262     private Literal decodeLiteral(final Object generic) {
263         if (generic instanceof GenericRecord) {
264             final GenericRecord record = (GenericRecord) generic;
265             final Schema schema = record.getSchema();
266             if (schema.equals(Schemas.STRING_LANG)) {
267                 final String label = record.get(0).toString(); // Utf8 class used
268                 final Object language = record.get(1);
269                 return this.factory.createLiteral(label, language.toString());
270             } else if (schema.equals(Schemas.SHORT)) {
271                 return this.factory.createLiteral(((Integer) record.get(0)).shortValue());
272             } else if (schema.equals(Schemas.BYTE)) {
273                 return this.factory.createLiteral(((Integer) record.get(0)).byteValue());
274             } else if (schema.equals(Schemas.BIGINTEGER)) {
275                 return this.factory.createLiteral(record.get(0).toString(), XMLSchema.INTEGER);
276             } else if (schema.equals(Schemas.BIGDECIMAL)) {
277                 return this.factory.createLiteral(record.get(0).toString(), XMLSchema.DECIMAL);
278             } else if (schema.equals(Schemas.CALENDAR)) {
279                 final int tz = (Integer) record.get(0);
280                 final GregorianCalendar calendar = new GregorianCalendar();
281                 calendar.setTimeInMillis((Long) record.get(1));
282                 calendar.setTimeZone(TimeZone.getTimeZone(String.format("GMT%s%02d:%02d",
283                         tz >= 0 ? "+" : "-", Math.abs(tz) / 60, Math.abs(tz) % 60)));
284                 return this.factory.createLiteral(this.datatypeFactory
285                         .newXMLGregorianCalendar(calendar));
286             }
287         } else if (generic instanceof CharSequence) {
288             return this.factory.createLiteral(generic.toString()); // Utf8 class used
289         } else if (generic instanceof Boolean) {
290             return this.factory.createLiteral((Boolean) generic);
291         } else if (generic instanceof Long) {
292             return this.factory.createLiteral((Long) generic);
293         } else if (generic instanceof Integer) {
294             return this.factory.createLiteral((Integer) generic);
295         } else if (generic instanceof Double) {
296             return this.factory.createLiteral((Double) generic);
297         } else if (generic instanceof Float) {
298             return this.factory.createLiteral((Float) generic);
299         }
300         Preconditions.checkNotNull(generic);
301         throw new IllegalArgumentException("Unsupported generic data: " + generic);
302     }
303 
304     private Statement decodeStatement(final GenericRecord record) {
305         final Resource subj = decodeIdentifier((GenericRecord) record.get(0));
306         final URI pred = (URI) decodeIdentifier((GenericRecord) record.get(1));
307         final Value obj = decodeValue(record.get(2));
308         final Resource ctx = decodeIdentifier((GenericRecord) record.get(3));
309         if (ctx == null) {
310             return this.factory.createStatement(subj, pred, obj);
311         } else {
312             return this.factory.createStatement(subj, pred, obj, ctx);
313         }
314     }
315 
316     private Object encodeNodes(final Iterable<? extends Object> nodes) {
317         final int size = Iterables.size(nodes);
318         if (size == 1) {
319             return encodeNode(Iterables.get(nodes, 0));
320         }
321         final List<Object> list = Lists.<Object>newArrayListWithCapacity(size);
322         for (final Object node : nodes) {
323             list.add(encodeNode(node));
324         }
325         return list;
326     }
327 
328     private Object encodeNode(final Object node) {
329         if (node instanceof Record) {
330             return encodeRecord((Record) node, null);
331         } else if (node instanceof Literal) {
332             return encodeLiteral((Literal) node);
333         } else if (node instanceof Resource) {
334             return encodeIdentifier((Resource) node);
335         } else if (node instanceof Statement) {
336             return encodeStatement((Statement) node);
337         }
338         Preconditions.checkNotNull(node);
339         throw new IllegalArgumentException("Unsupported node: " + node);
340     }
341 
342     private Object encodeRecord(final Record record, @Nullable final Set<URI> propertiesToEncode) {
343         final URI id = record.getID();
344         final Object encodedID = id == null ? null : encodeIdentifier(id);
345         final List<Object> props = Lists.newArrayList();
346         for (final URI property : record.getProperties()) {
347             if (propertiesToEncode == null || propertiesToEncode.contains(property)) {
348                 ensureInDictionary(property);
349                 final List<? extends Object> nodes = record.get(property);
350                 if (property.equals(RDF.TYPE)) {
351                     for (final Object value : nodes) {
352                         if (value instanceof URI) {
353                             ensureInDictionary((URI) value);
354                         }
355                     }
356                 }
357                 final GenericData.Record prop = new GenericData.Record(Schemas.PROPERTY);
358                 prop.put("propertyURI", encodeIdentifier(property));
359                 prop.put("propertyValue", encodeNodes(nodes));
360                 props.add(prop);
361             }
362         }
363         return SerializerAvro.newGenericRecord(Schemas.RECORD, encodedID, props);
364     }
365 
366     private Object encodeValue(final Value value) {
367         if (value instanceof Literal) {
368             return encodeLiteral((Literal) value);
369         } else if (value instanceof Resource) {
370             return encodeIdentifier((Resource) value);
371         } else {
372             throw new IllegalArgumentException("Unsupported value: " + value);
373         }
374     }
375 
376     private Object encodeIdentifier(final Resource identifier) {
377         if (identifier instanceof URI) {
378             try {
379                 final Integer key = this.dictionary.keyFor((URI) identifier, false);
380                 if (key != null) {
381                     return SerializerAvro.newGenericRecord(Schemas.COMPRESSED_IDENTIFIER, key);
382                 }
383             } catch (final IOException ex) {
384                 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
385             }
386         }
387         final String id = identifier instanceof BNode ? "_:" + ((BNode) identifier).getID()
388                 : identifier.stringValue();
389         return SerializerAvro.newGenericRecord(Schemas.PLAIN_IDENTIFIER, id);
390     }
391 
392     private Object encodeLiteral(final Literal literal) {
393         final URI datatype = literal.getDatatype();
394         if (datatype == null || datatype.equals(XMLSchema.STRING)) {
395             final String language = literal.getLanguage();
396             if (language == null) {
397                 return literal.getLabel();
398             } else {
399                 return SerializerAvro.newGenericRecord(Schemas.STRING_LANG, literal.getLabel(),
400                         language);
401             }
402         } else if (datatype.equals(XMLSchema.BOOLEAN)) {
403             return literal.booleanValue();
404         } else if (datatype.equals(XMLSchema.LONG)) {
405             return literal.longValue();
406         } else if (datatype.equals(XMLSchema.INT)) {
407             return literal.intValue();
408         } else if (datatype.equals(XMLSchema.DOUBLE)) {
409             return literal.doubleValue();
410         } else if (datatype.equals(XMLSchema.FLOAT)) {
411             return literal.floatValue();
412         } else if (datatype.equals(XMLSchema.SHORT)) {
413             return SerializerAvro.newGenericRecord(Schemas.SHORT, literal.intValue());
414         } else if (datatype.equals(XMLSchema.BYTE)) {
415             return SerializerAvro.newGenericRecord(Schemas.BYTE, literal.intValue());
416         } else if (datatype.equals(XMLSchema.INTEGER)) {
417             return SerializerAvro.newGenericRecord(Schemas.BIGINTEGER, literal.stringValue());
418         } else if (datatype.equals(XMLSchema.DECIMAL)) {
419             return SerializerAvro.newGenericRecord(Schemas.BIGDECIMAL, literal.stringValue());
420         } else if (datatype.equals(XMLSchema.DATETIME)) {
421             final XMLGregorianCalendar calendar = literal.calendarValue();
422             return SerializerAvro.newGenericRecord(Schemas.CALENDAR, calendar.getTimezone(),
423                     calendar.toGregorianCalendar().getTimeInMillis());
424         }
425         throw new IllegalArgumentException("Unsupported literal: " + literal);
426     }
427 
428     private Object encodeStatement(final Statement statement) {
429         return SerializerAvro.newGenericRecord(Schemas.STATEMENT,
430                 encodeIdentifier(statement.getSubject()),
431                 encodeIdentifier(statement.getPredicate()), //
432                 encodeValue(statement.getObject()), //
433                 encodeIdentifier(statement.getContext()));
434     }
435 
436     private URI ensureInDictionary(final URI uri) {
437         try {
438             this.dictionary.keyFor(uri);
439             return uri;
440         } catch (final IOException ex) {
441             throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
442         }
443     }
444 
445     private static GenericData.Record newGenericRecord(final Schema schema,
446             final Object... fieldValues) {
447 
448         final GenericData.Record record = new GenericData.Record(schema);
449         for (int i = 0; i < fieldValues.length; ++i) {
450             record.put(i, fieldValues[i]);
451         }
452         return record;
453     }
454 
455     private static final class Schemas {
456 
457         /** The namespace for KS-specific AVRO schemas. */
458         public static final String NAMESPACE = "eu.fbk.knowledgestore";
459 
460         /** AVRO schema for NULL. */
461         public static final Schema NULL = Schema.create(Schema.Type.NULL);
462 
463         /** AVRO schema for boolean literals. */
464         public static final Schema BOOLEAN = Schema.create(Schema.Type.BOOLEAN);
465 
466         /** AVRO schema for string literals. */
467         public static final Schema STRING = Schema.create(Schema.Type.STRING);
468 
469         /** AVRO schema for string literals with a language. */
470         public static final Schema STRING_LANG = Schema.createRecord("stringlang", null,
471                 Schemas.NAMESPACE, false);
472 
473         /** AVRO schema for long literals. */
474         public static final Schema LONG = Schema.create(Schema.Type.LONG);
475 
476         /** AVRO schema for int literals. */
477         public static final Schema INT = Schema.create(Schema.Type.INT);
478 
479         /** AVRO schema for short literals. */
480         public static final Schema SHORT = Schema.createRecord("short", null, Schemas.NAMESPACE,
481                 false);
482 
483         /** AVRO schema for byte literals. */
484         public static final Schema BYTE = Schema.createRecord("byte", null, Schemas.NAMESPACE,
485                 false);
486 
487         /** AVRO schema for double literals. */
488         public static final Schema DOUBLE = Schema.create(Schema.Type.DOUBLE);
489 
490         /** AVRO schema for float literals. */
491         public static final Schema FLOAT = Schema.create(Schema.Type.FLOAT);
492 
493         /** AVRO schema for big integer literals. */
494         public static final Schema BIGINTEGER = Schema.createRecord("biginteger", null,
495                 Schemas.NAMESPACE, false);
496 
497         /** AVRO schema for big decimal literals. */
498         public static final Schema BIGDECIMAL = Schema.createRecord("bigdecimal", null,
499                 Schemas.NAMESPACE, false);
500 
501         /** AVRO schema for non-compressed IDs (URIs, BNodes). */
502         public static final Schema PLAIN_IDENTIFIER = Schema //
503                 .createRecord("plainidentifier", null, Schemas.NAMESPACE, false);
504 
505         /** AVRO schema for compressed ID (URIs, BNodes). */
506         public static final Schema COMPRESSED_IDENTIFIER = Schema //
507                 .createRecord("compressedidentifier", null, Schemas.NAMESPACE, false);
508 
509         /** AVRO schema for any ID (URIs, BNodes). */
510         public static final Schema IDENTIFIER = Schema.createUnion(ImmutableList.<Schema>of(
511                 PLAIN_IDENTIFIER, COMPRESSED_IDENTIFIER));
512 
513         /** AVRO schema for calendar literals. */
514         public static final Schema CALENDAR = Schema.createRecord("calendar", null,
515                 Schemas.NAMESPACE, false);
516 
517         /** AVRO schema for RDF statements. */
518         public static final Schema STATEMENT = Schema.createRecord("statement", null,
519                 Schemas.NAMESPACE, false);
520 
521         /** AVRO schema for record nodes ({@code Record}). */
522         public static final Schema RECORD = Schema.createRecord("struct", null, Schemas.NAMESPACE,
523                 false);
524 
525         /** AVRO schema for generic data model nodes. */
526         public static final Schema NODE = Schema.createUnion(ImmutableList.<Schema>of(
527                 Schemas.BOOLEAN, Schemas.STRING, Schemas.STRING_LANG, Schemas.LONG, Schemas.INT,
528                 Schemas.SHORT, Schemas.BYTE, Schemas.DOUBLE, Schemas.FLOAT, Schemas.BIGINTEGER,
529                 Schemas.BIGDECIMAL, Schemas.PLAIN_IDENTIFIER, Schemas.COMPRESSED_IDENTIFIER,
530                 Schemas.CALENDAR, Schemas.STATEMENT, Schemas.RECORD));
531 
532         /** AVRO schema for lists of nodes. */
533         public static final Schema LIST = Schema.createArray(Schemas.NODE);
534 
535         /** AVRO schema for properties of a record node. */
536         public static final Schema PROPERTY = Schema.createRecord("property", null,
537                 Schemas.NAMESPACE, false);
538 
539         private Schemas() {
540         }
541 
542         static {
543             Schemas.STRING_LANG.setFields(ImmutableList.<Schema.Field>of(new Schema.Field("label",
544                     Schemas.STRING, null, null), new Schema.Field("language", Schemas.STRING,
545                     null, null)));
546             Schemas.SHORT.setFields(ImmutableList.<Schema.Field>of(new Schema.Field("short",
547                     Schemas.INT, null, null)));
548             Schemas.BYTE.setFields(ImmutableList.<Schema.Field>of(new Schema.Field("byte",
549                     Schemas.INT, null, null)));
550             Schemas.BIGINTEGER.setFields(ImmutableList.<Schema.Field>of(new Schema.Field(
551                     "biginteger", Schemas.STRING, null, null)));
552             Schemas.BIGDECIMAL.setFields(ImmutableList.<Schema.Field>of(new Schema.Field(
553                     "bigdecimal", Schemas.STRING, null, null)));
554             Schemas.PLAIN_IDENTIFIER.setFields(ImmutableList.<Schema.Field>of(new Schema.Field(
555                     "identifier", Schemas.STRING, null, null)));
556             Schemas.COMPRESSED_IDENTIFIER.setFields(ImmutableList
557                     .<Schema.Field>of(new Schema.Field("identifier", Schemas.INT, null, null)));
558             Schemas.CALENDAR.setFields(ImmutableList.<Schema.Field>of(new Schema.Field("timezone",
559                     Schemas.INT, null, null), new Schema.Field("timestamp", Schemas.LONG, null,
560                     null)));
561 
562             Schemas.STATEMENT.setFields(ImmutableList.<Schema.Field>of(
563                     new Schema.Field("subject", Schemas.IDENTIFIER, null, null),
564                     new Schema.Field("predicate", Schemas.IDENTIFIER, null, null),
565                     new Schema.Field("object", Schema.createUnion(ImmutableList.<Schema>of(
566                             Schemas.BOOLEAN, Schemas.STRING, Schemas.STRING_LANG, Schemas.LONG,
567                             Schemas.INT, Schemas.SHORT, Schemas.BYTE, Schemas.DOUBLE,
568                             Schemas.FLOAT, Schemas.BIGINTEGER, Schemas.BIGDECIMAL,
569                             Schemas.CALENDAR, Schemas.PLAIN_IDENTIFIER,
570                             Schemas.COMPRESSED_IDENTIFIER)), null, null), //
571                     new Schema.Field("context", Schemas.IDENTIFIER, null, null)));
572 
573             Schemas.PROPERTY.setFields(ImmutableList.<Schema.Field>of(
574                     new Schema.Field("propertyURI", Schemas.COMPRESSED_IDENTIFIER, null, null),
575                     new Schema.Field("propertyValue", Schema.createUnion(ImmutableList.<Schema>of(
576                             Schemas.BOOLEAN, Schemas.STRING, Schemas.STRING_LANG, Schemas.LONG,
577                             Schemas.INT, Schemas.SHORT, Schemas.BYTE, Schemas.DOUBLE,
578                             Schemas.FLOAT, Schemas.BIGINTEGER, Schemas.BIGDECIMAL,
579                             Schemas.CALENDAR, Schemas.PLAIN_IDENTIFIER,
580                             Schemas.COMPRESSED_IDENTIFIER, Schemas.STATEMENT, Schemas.RECORD,
581                             Schemas.LIST)), null, null)));
582 
583             Schemas.RECORD.setFields(ImmutableList.<Schema.Field>of(
584                     new Schema.Field("id",
585                             Schema.createUnion(ImmutableList.<Schema>of(Schemas.NULL,
586                                     Schemas.PLAIN_IDENTIFIER, Schemas.COMPRESSED_IDENTIFIER)),
587                             null, null), //
588                     new Schema.Field("properties", Schema.createArray(Schemas.PROPERTY), null,
589                             null)));
590         }
591 
592     }
593 
594 }