1   package eu.fbk.knowledgestore.server;
2   
3   import com.google.common.base.*;
4   import com.google.common.collect.AbstractIterator;
5   import com.google.common.collect.*;
6   import com.google.common.hash.Hashing;
7   import com.google.common.hash.HashingOutputStream;
8   import com.google.common.html.HtmlEscapers;
9   import com.google.common.io.ByteStreams;
10  import com.google.common.io.CountingOutputStream;
11  import com.google.common.io.FileBackedOutputStream;
12  import com.google.common.net.MediaType;
13  import com.google.common.net.UrlEscapers;
14  import eu.fbk.knowledgestore.*;
15  import eu.fbk.knowledgestore.Outcome.Status;
16  import eu.fbk.knowledgestore.data.*;
17  import eu.fbk.knowledgestore.datastore.DataStore;
18  import eu.fbk.knowledgestore.datastore.DataTransaction;
19  import eu.fbk.knowledgestore.filestore.FileStore;
20  import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
21  import eu.fbk.knowledgestore.triplestore.TripleStore;
22  import eu.fbk.knowledgestore.triplestore.TripleTransaction;
23  import eu.fbk.knowledgestore.vocabulary.KS;
24  import eu.fbk.knowledgestore.vocabulary.NFO;
25  import eu.fbk.knowledgestore.vocabulary.NIE;
26  import info.aduna.iteration.CloseableIteration;
27  import org.openrdf.model.Statement;
28  import org.openrdf.model.URI;
29  import org.openrdf.model.ValueFactory;
30  import org.openrdf.model.vocabulary.RDF;
31  import org.openrdf.query.BindingSet;
32  import org.openrdf.query.Dataset;
33  import org.openrdf.query.QueryEvaluationException;
34  import org.openrdf.query.algebra.TupleExpr;
35  import org.openrdf.query.impl.DatasetImpl;
36  import org.openrdf.query.parser.ParsedQuery;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import javax.annotation.Nullable;
41  import java.io.*;
42  import java.util.Date;
43  import java.util.List;
44  import java.util.Map;
45  import java.util.Set;
46  import java.util.concurrent.atomic.AtomicLong;
47  
48  // TODO file garbage collection
49  
50  public final class Server extends AbstractKnowledgeStore {
51  
    /** Logger shared by the server and its sessions. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

    /** Default number of records/IDs grouped into one chunk by modify() (see chunkSize). */
    private static final int DEFAULT_CHUNK_SIZE = 1024;

    /** Default for bufferSize (16 * 1024 * 1024; presumably bytes - TODO confirm unit). */
    private static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;

    // Last version number handed out for generated file names. Shared by all Server instances
    // and only accessed while holding the Server.class monitor; it is bumped to at least the
    // current time in milliseconds on every use, so versions are strictly increasing.
    private static long fileVersionCounter = 0L;

    /** Store holding the binary content of uploaded resource files. */
    private final FileStore fileStore;

    /** Store holding resource/mention records and file metadata. */
    private final DataStore dataStore;

    /** Triple store managed alongside the other two stores. */
    private final TripleStore tripleStore;

    /** Number of records/IDs processed per chunk by modify(); always positive. */
    private final int chunkSize;

    // Buffer size setting; always positive. NOTE(review): not read anywhere in the visible part
    // of this file - confirm where (and whether) it is used.
    private final int bufferSize;
69  
    /**
     * Creates a server on top of the file, data and triple stores held by the builder.
     * <p>
     * Chunk and buffer sizes fall back to {@link #DEFAULT_CHUNK_SIZE} and
     * {@link #DEFAULT_BUFFER_SIZE} when the builder leaves them unset, and must be positive. The
     * stores are then initialized in the order file, data, triple. If anything inside the
     * {@code try} block fails, all three stores are closed quietly before the failure propagates.
     *
     * @param builder
     *            the builder supplying stores and optional settings; its stores must be non-null
     * @throws NullPointerException
     *             if the builder lacks one of the three stores
     * @throws IllegalArgumentException
     *             if the resolved chunk or buffer size is not positive
     * @throws Error
     *             wrapping any exception thrown by a store's {@code init()} method
     */
    private Server(final Builder builder) {

        boolean success = false;

        // NOTE(review): these null checks run before the try block, so if e.g. the data store is
        // missing, an already-supplied file store is not closed by the cleanup below.
        this.fileStore = Preconditions.checkNotNull(builder.fileStore);
        this.dataStore = Preconditions.checkNotNull(builder.dataStore);
        this.tripleStore = Preconditions.checkNotNull(builder.tripleStore);

        try {
            this.chunkSize = MoreObjects.firstNonNull(builder.chunkSize, DEFAULT_CHUNK_SIZE);
            this.bufferSize = MoreObjects.firstNonNull(builder.bufferSize, DEFAULT_BUFFER_SIZE);
            Preconditions.checkArgument(this.chunkSize > 0);
            Preconditions.checkArgument(this.bufferSize > 0);

            // TODO (left unspecified in the original) - presumably: replace the blanket wrapping
            // of store initialization failures in java.lang.Error with proper error reporting
            try {
                this.fileStore.init();
                this.dataStore.init();
                this.tripleStore.init();
            } catch (final Exception ex) {
                throw new Error(ex);
            }

            success = true;

        } finally {
            // On any failure inside the try block, close all three stores quietly
            if (!success) {
                closeQuietly(this.fileStore);
                closeQuietly(this.dataStore);
                closeQuietly(this.tripleStore);
            }
        }
    }
103 
104     @Override
105     protected Session doNewSession(@Nullable final String username, @Nullable final String password) {
106         return new SessionImpl(username, password);
107     }
108 
109     @Override
110     protected void doClose() {
111         closeQuietly(this.fileStore);
112         closeQuietly(this.dataStore);
113         closeQuietly(this.tripleStore);
114     }
115 
116     private static void closeQuietly(@Nullable final Closeable closeable) {
117         if (closeable != null) {
118             try {
119                 closeable.close();
120             } catch (final Throwable ex) {
121                 LOGGER.error(
122                         "Error closing " + closeable.getClass().getSimpleName() + ": "
123                                 + ex.getMessage(), ex);
124             }
125         }
126     }
127 
128     private final class SessionImpl extends AbstractSession {
129 
130         SessionImpl(@Nullable final String username, @Nullable final String password) {
131             super(Data.newNamespaceMap(Data.newNamespaceMap(), Data.getNamespaceMap()), username,
132                     password);
133         }
134 
135         private void check(final boolean condition, final Status status,
136                 @Nullable final URI objectID, @Nullable final String message, final Object... args)
137                 throws OperationException {
138             if (!condition) {
139                 throw newException(status, objectID,
140                         message == null ? null : String.format(message, args));
141             }
142         }
143 
144         private Outcome newOutcome(@Nullable final Status status, @Nullable final URI objectID,
145                 @Nullable final String message, final Object... args) {
146             return Outcome.create(status == null ? Status.ERROR_UNEXPECTED : status,
147                     getInvocationID(), objectID,
148                     message == null ? null : String.format(message, args));
149         }
150 
151         private OperationException newException(@Nullable final Status status,
152                 @Nullable final URI objectID, @Nullable final String message,
153                 final Throwable... causes) {
154             return new OperationException(newOutcome(status, objectID, message), causes);
155         }
156 
157         private <T> Stream<T> attach(final DataTransaction transaction, final Stream<T> stream) {
158             return stream.onClose(new Closeable() {
159 
160                 @Override
161                 public void close() throws IOException {
162                     transaction.end(true);
163                 }
164 
165             });
166         }
167 
168         private <T> Stream<T> attach(final TripleTransaction transaction, final Stream<T> stream) {
169             return stream.onClose(new Closeable() {
170 
171                 @Override
172                 public void close() throws IOException {
173                     transaction.end(true);
174                 }
175 
176             });
177         }
178 
179         @Override
180         protected Representation doDownload(@Nullable final Long timeout, final URI resourceID,
181                 @Nullable final Set<String> mimeTypes, final boolean useCaches) throws Throwable {
182 
183             // Note: no caches used at this moment, so useCaches is ignored
184 
185             // Start a new read-only datastore TX to retrieve file metadata
186             final DataTransaction transaction = Server.this.dataStore.begin(true);
187 
188             try {
189                 // Retrieve file metadata stored as part of the resource record
190                 final Record resource = transaction.lookup(KS.RESOURCE,
191                         ImmutableSet.of(resourceID), ImmutableSet.of(KS.STORED_AS)).getUnique();
192 
193                 // Return null if resource does not exist
194                 if (resource == null) {
195                     return null; // resource does not exist
196                 }
197 
198                 // Retrieve the file metadata; return null if there is no file stored
199                 final Record metadata = resource.getUnique(KS.STORED_AS, Record.class);
200                 if (metadata == null) {
201                     return null;
202                 }
203 
204                 // Retrieve the stored file name (must exist)
205                 final String fileName = metadata.getUnique(NFO.FILE_NAME, String.class);
206                 check(fileName != null, null, resourceID, "No filename stored for resource (!)");
207 
208                 // Check mimeType constraint, if any
209                 String transformToType = null;
210                 final String fileTypeString = metadata.getUnique(NIE.MIME_TYPE, String.class);
211                 if (mimeTypes != null) {
212                     check(fileTypeString != null, Status.ERROR_NOT_ACCEPTABLE, resourceID,
213                             "No MIME type stored for file %s", fileName);
214                     boolean compatible = false;
215                     final MediaType fileType = MediaType.parse(fileTypeString);
216                     for (final String type : mimeTypes) {
217                         try {
218                             final boolean matches = fileType.is(MediaType.parse(type)
219                                     .withoutParameters());
220                             final boolean transform = !matches && !compatible
221                                     && canTransform(fileTypeString, type);
222                             compatible = compatible || matches || transform;
223                             transformToType = transform ? type : null;
224                         } catch (final IllegalArgumentException ex) {
225                             // ignore error if supplied mime type is malformed
226                         }
227                     }
228                     check(compatible, Status.ERROR_NOT_ACCEPTABLE, resourceID,
229                             "Incompatible MIME type %s for file %s", fileType, fileName);
230                 }
231 
232                 // Open a stream over file contents
233                 InputStream stream = Server.this.fileStore.read(fileName);
234                 check(stream != null, null, resourceID, "File %s missing for resource %s (!)",
235                         fileName);
236 
237                 if (transformToType != null) {
238                     // Transformation required: do it and return a subset of metadata
239                     final String ext = Iterables.getFirst(
240                             Data.mimeTypeToExtensions(transformToType), "bin");
241                     final String name = MoreObjects.firstNonNull(
242                             metadata.getUnique(NFO.FILE_NAME, String.class, null), "download")
243                             + "." + ext;
244                     stream = transform(fileTypeString, transformToType, stream);
245                     final Representation representation = Representation.create(stream);
246                     final Record meta = representation.getMetadata();
247                     meta.setID(metadata.getID());
248                     meta.set(NIE.MIME_TYPE, transformToType);
249                     meta.set(NFO.FILE_NAME, name);
250                     meta.set(NFO.FILE_LAST_MODIFIED, metadata.getUnique(NFO.FILE_LAST_MODIFIED));
251                     return representation;
252 
253                 } else {
254                     // No transformation required: build and return the resulting representation
255                     final Representation representation = Representation.create(stream);
256                     representation.getMetadata().setID(metadata.getID());
257                     for (final URI property : metadata.getProperties()) {
258                         representation.getMetadata().set(property, metadata.get(property));
259                     }
260                     return representation;
261                 }
262 
263             } finally {
264                 // End the transaction (commit or rollback is irrelevant)
265                 transaction.end(true);
266             }
267         }
268 
269         private boolean canTransform(final String fromType, final String toType) {
270             final String type = toType.trim().toLowerCase();
271             return type.equals("text/html") || type.equals("text/plain");
272         }
273 
274         private InputStream transform(final String fromType, final String toType,
275                 final InputStream fromStream) throws IOException {
276             final String type = toType.trim().toLowerCase();
277             if (type.equals("text/html")) {
278                 // TODO inefficient + conversion to String may not work as charset is unknown
279                 final byte[] data = ByteStreams.toByteArray(fromStream);
280                 final String string = new String(data, Charsets.UTF_8);
281                 final ByteArrayOutputStream out = new ByteArrayOutputStream();
282                 final OutputStreamWriter writer = new OutputStreamWriter(out, Charsets.UTF_8);
283                 writer.append("<html>\n");
284                 writer.append("<head>\n");
285                 writer.append("<meta http-equiv=\"Content-type\" "
286                         + "content=\"text/html;charset=UTF-8\"/>\n");
287                 writer.append("</head>\n");
288                 writer.append("<body>\n");
289                 writer.append("<pre>");
290                 writer.append(HtmlEscapers.htmlEscaper().escape(string));
291                 writer.append("</pre>\n");
292                 writer.append("</body>\n");
293                 writer.append("</html>\n");
294                 writer.close();
295                 return new ByteArrayInputStream(out.toByteArray());
296             } else if (type.equals("text/plain")) {
297                 return fromStream; // pretend it can be interpreted as UTF-8 data
298             } else {
299                 throw new UnsupportedOperationException();
300             }
301         }
302 
303         @Override
304         protected Outcome doUpload(@Nullable final Long timeout, final URI resourceID,
305                 @Nullable final Representation representation) throws Throwable {
306 
307             // Keep track of the new file name and the status to return
308             String fileName = null;
309             Status status;
310 
311             // Start a read write datastore TX to update resource metadata
312             final DataTransaction transaction = Server.this.dataStore.begin(false);
313 
314             try {
315                 // Retrieve the resource record and the old metadata; fail if it does not exist
316                 final Record resource = transaction.lookup(KS.RESOURCE,
317                         ImmutableSet.of(resourceID), null).getUnique();
318                 if (resource == null) {
319                     throw newException(Status.ERROR_DEPENDENCY_NOT_FOUND, resourceID,
320                             "Specified resource does not exist");
321                 }
322 
323                 // Retrieve old metadata
324                 final Record oldMetadata = resource.getUnique(KS.STORED_AS, Record.class);
325 
326                 // Differentiate between delete and store representation
327                 if (representation == null) {
328                     // In case of deletions, update the resource record dropping the file metadata
329                     status = oldMetadata == null ? Status.OK_UNMODIFIED : Status.OK_DELETED;
330                     resource.set(KS.STORED_AS, null);
331 
332                 } else {
333                     // Otherwise, assign file name and file type, considering supplied values
334                     status = oldMetadata == null ? Status.OK_CREATED : Status.OK_MODIFIED;
335                     final Record metadata = representation.getMetadata();
336                     metadata.setID(Data.getValueFactory().createURI(resourceID + "_file"));
337                     fileName = metadata.getUnique(NFO.FILE_NAME, String.class);
338                     String fileType = metadata.getUnique(NIE.MIME_TYPE, String.class);
339                     if (fileType != null) {
340                         try {
341                             MediaType.parse(fileType);
342                         } catch (final IllegalArgumentException ex) {
343                             fileType = null; // invalid MIME type, drop
344                             metadata.set(NIE.MIME_TYPE, null);
345                         }
346                     }
347                     fileName = generateFileName(resourceID, fileName, fileType);
348                     fileType = fileType != null ? fileType : Data.extensionToMimeType(fileName);
349                     metadata.set(NFO.FILE_NAME, fileName);
350                     metadata.set(NIE.MIME_TYPE, fileType);
351 
352                     // Create new file using the assigned file name
353                     final OutputStream stream = Server.this.fileStore.write(fileName);
354                     try {
355                         // Store the representation, counting written bytes and computing MD5
356                         final CountingOutputStream cos = new CountingOutputStream(stream);
357                         final HashingOutputStream hos = new HashingOutputStream(Hashing.md5(), cos);
358                         representation.writeTo(hos);
359                         hos.close();
360 
361                         // Update metadata attributes
362                         final Record hash = Record.create();
363                         hash.set(NFO.HASH_ALGORITHM, "MD5");
364                         hash.set(NFO.HASH_VALUE, hos.hash().toString());
365                         metadata.set(NFO.HAS_HASH, hash);
366                         metadata.set(NFO.FILE_SIZE, cos.getCount());
367                         if (metadata.isNull(NFO.FILE_LAST_MODIFIED)) {
368                             metadata.set(NFO.FILE_LAST_MODIFIED, new Date());
369                         }
370                     } finally {
371                         stream.close();
372                     }
373 
374                     // Update the resource record
375                     resource.set(KS.STORED_AS, Record.create(metadata, true));
376                 }
377 
378                 // Update the resource record if necessary.
379                 if (status != Status.OK_UNMODIFIED) {
380                     transaction.store(KS.RESOURCE, resource);
381                 }
382 
383                 // Always delete the old file, if previously stored
384                 if (oldMetadata != null) {
385                     deleteFileQuietly(oldMetadata.getUnique(NFO.FILE_NAME, String.class));
386                 }
387 
388                 // Commit transaction
389                 transaction.end(true);
390 
391                 // Compute and return outcome
392                 return newOutcome(status, resourceID, null);
393 
394             } catch (final Throwable ex) {
395                 // Rollback changes on failure
396                 deleteFileQuietly(fileName);
397                 transaction.end(false);
398                 throw ex;
399             }
400         }
401 
402         private String generateFileName(final URI resourceID,
403                 @Nullable final String suppliedFileName, @Nullable final String suppliedFileType) {
404 
405             // Start with default values for file name, extension and MIME type
406             String fileName = "file";
407             String fileExt = "bin"; // default ext for application/octet-stream
408 
409             // Revise file name, extension and MIME type from supplied fileName, if any
410             if (suppliedFileName != null) {
411                 final String name = UrlEscapers.urlPathSegmentEscaper().escape(suppliedFileName);
412                 final int index = name.lastIndexOf('.');
413                 if (index > 0 && index < name.length() - 1) {
414                     fileName = name.substring(0, index);
415                     fileExt = name.substring(index + 1);
416                 }
417             }
418 
419             // Revise file extension and/or MIME type based on supplied MIME type, if any
420             if (suppliedFileType != null) {
421                 final List<String> mimeExtensions = Data.mimeTypeToExtensions(suppliedFileType);
422                 if (!mimeExtensions.isEmpty()) {
423                     fileExt = mimeExtensions.get(0);
424                 }
425             }
426 
427             // Revise file name based on resource ID, if possible
428             final String uri = resourceID.stringValue();
429             int start = 0;
430             int end = uri.length();
431             for (int index = 0; index < uri.length(); ++index) {
432                 final char ch = uri.charAt(index);
433                 if (ch == '/' || ch == ':') {
434                     start = index + 1;
435                 } else if (ch == '.') {
436                     end = index;
437                 } else if (ch == '#' || ch == '?') {
438                     end = Math.min(end, index);
439                     break;
440                 }
441             }
442             if (start < end) {
443                 fileName = uri.substring(start, end);
444             }
445 
446             // Obtain the file version
447             long fileVersion;
448             final long ts = System.currentTimeMillis();
449             synchronized (Server.class) {
450                 ++Server.fileVersionCounter;
451                 if (Server.fileVersionCounter < ts) {
452                     Server.fileVersionCounter = ts;
453                 }
454                 fileVersion = Server.fileVersionCounter;
455             }
456 
457             // Generate and return the filename
458             return fileName + "." + Long.toString(fileVersion, 32) + "." + fileExt;
459         }
460 
461         private void deleteFileQuietly(@Nullable final String fileName) {
462             if (fileName != null) {
463                 try {
464                     Server.this.fileStore.delete(fileName);
465                 } catch (final Throwable ex) {
466                     LOGGER.error("Failed to delete file " + fileName
467                             + " (will be garbage collected)", ex);
468                 }
469             }
470         }
471 
472         @Override
473         protected long doCount(@Nullable final Long timeout, final URI type,
474                 @Nullable final XPath condition, @Nullable final Set<URI> ids) throws Throwable {
475 
476             // Try to transform a retrieve with condition to a faster lookup, if possible
477             final Set<URI> actualIDs = ids != null ? ids : retrieveToLookup(type, condition);
478 
479             // If IDs have been supplied, we prefer to retrieve the records and apply the optional
480             // condition locally (more efficient if few IDs are used)
481             if (actualIDs != null) {
482                 return doRetrieve(timeout, type, condition, actualIDs, condition == null ? null : condition.getProperties(),
483                         null, null).count();
484             }
485 
486             // Otherwise, we resort to the count operation within a read-only datastore TX
487             final DataTransaction tx = Server.this.dataStore.begin(true);
488             try {
489                 return tx.count(type, condition);
490             } finally {
491                 tx.end(true); // commit or rollback irrelevant
492             }
493         }
494 
495         @Override
496         protected Stream<Record> doRetrieve(@Nullable final Long timeout, final URI type,
497                 @Nullable final XPath condition, @Nullable final Set<URI> ids,
498                 @Nullable final Set<URI> properties, @Nullable final Long offset,
499                 @Nullable final Long limit) throws Throwable {
500 
501             // Try to transform a retrieve with condition to a faster lookup, if possible
502             final Set<URI> actualIDs = ids != null ? ids : retrieveToLookup(type, condition);
503 
504             // Start a read-only datastore TX that will end when the resulting cursor is closed
505             final DataTransaction tx = Server.this.dataStore.begin(true);
506 
507             Stream<Record> stream;
508             if (actualIDs == null) {
509                 // 1st approach: do a retrieve() if no ID was supplied
510                 stream = tx.retrieve(type, condition, properties);
511 
512             } else {
513                 // 2nd approach: do a lookup() and apply condition locally
514                 Set<URI> props = properties;
515                 if (props != null && condition != null
516                         && !props.containsAll(condition.getProperties())) {
517                     props = Sets.union(properties, condition.getProperties());
518                 }
519                 stream = tx.lookup(type, actualIDs, props);
520                 if (condition != null) {
521                     stream = stream.filter(condition.asPredicate(), 0);
522                 }
523                 if (props != properties) {
524                     final URI[] array = properties.toArray(new URI[properties.size()]);
525                     stream = stream.transform(new Function<Record, Record>() {
526 
527                         @Override
528                         public Record apply(final Record record) {
529                             record.retain(array);
530                             return record;
531                         }
532 
533                     }, 0);
534                 }
535             }
536 
537             // Apply offset and limit directives
538             if (offset != null || limit != null) {
539                 stream = stream.slice(MoreObjects.firstNonNull(offset, 0L),
540                         MoreObjects.firstNonNull(limit, Long.MAX_VALUE));
541             }
542 
543             // Attach the transaction to the cursor, so that it ends when the latter is closed
544             return attach(tx, stream);
545         }
546 
547         @SuppressWarnings({ "unchecked", "rawtypes" })
548         private Set<URI> retrieveToLookup(final URI type, @Nullable final XPath condition)
549                 throws IOException {
550 
551             if (condition == null) {
552                 return null;
553             }
554 
555             final Map<URI, Set<Object>> restrictions = Maps.newHashMap();
556             condition.decompose(restrictions);
557 
558             DataTransaction tx = null;
559             Set<URI> ids = null;
560             try {
561                 if (KS.RESOURCE.equals(type) && restrictions.containsKey(KS.HAS_MENTION)) {
562                     ids = Sets.newHashSet();
563                     tx = Server.this.dataStore.begin(true);
564                     tx.lookup(KS.MENTION, (Set) restrictions.get(KS.HAS_MENTION),
565                             ImmutableSet.of(KS.MENTION_OF))
566                             .transform(URI.class, true, KS.MENTION_OF).toCollection(ids);
567 
568                 } else if (KS.MENTION.equals(type) && restrictions.containsKey(KS.MENTION_OF)) {
569                     ids = Sets.newHashSet();
570                     tx = Server.this.dataStore.begin(true);
571                     tx.lookup(KS.RESOURCE, (Set) restrictions.get(KS.MENTION_OF),
572                             ImmutableSet.of(KS.HAS_MENTION))
573                             .transform(URI.class, true, KS.HAS_MENTION).toCollection(ids);
574                 }
575             } finally {
576                 if (tx != null) {
577                     tx.end(false);
578                 }
579             }
580 
581             return ids;
582         }
583 
584         @Override
585         protected void doCreate(@Nullable final Long timeout, final URI type,
586                 @Nullable final Stream<? extends Record> records,
587                 final Handler<? super Outcome> handler) throws Throwable {
588 
589             modify(new RecordUpdater() {
590 
591                 @Override
592                 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
593                         @Nullable final Record suppliedRecord) throws Throwable {
594                     assert suppliedRecord != null;
595                     check(oldRecord == null, Status.ERROR_OBJECT_ALREADY_EXISTS, id, null);
596                     return suppliedRecord;
597                 }
598 
599             }, type, null, records, handler);
600         }
601 
602         @Override
603         protected void doMerge(@Nullable final Long timeout, final URI type,
604                 @Nullable final Stream<? extends Record> records,
605                 @Nullable final Criteria criteria, final Handler<? super Outcome> handler)
606                 throws Throwable {
607 
608             modify(new RecordUpdater() {
609 
610                 @Override
611                 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
612                         @Nullable final Record suppliedRecord) throws Throwable {
613                     assert suppliedRecord != null;
614                     if (criteria == null) {
615                         return oldRecord; // NOP
616                     } else {
617                         final Record record = oldRecord == null ? Record.create(id, type)
618                                 : Record.create(oldRecord, true);
619                         criteria.merge(record, suppliedRecord);
620                         return record;
621                     }
622                 }
623 
624             }, type, null, records, handler);
625         }
626 
627         @Override
628         protected void doUpdate(@Nullable final Long timeout, final URI type,
629                 @Nullable final XPath condition, @Nullable final Set<URI> ids,
630                 @Nullable final Record record, @Nullable final Criteria criteria,
631                 final Handler<? super Outcome> handler) throws Throwable {
632 
633             modify(new RecordUpdater() {
634 
635                 @Override
636                 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
637                         @Nullable final Record suppliedRecord) throws Throwable {
638                     assert oldRecord != null;
639                     assert suppliedRecord == null;
640                     final Record newRecord = Record.create(oldRecord, true);
641                     criteria.merge(newRecord, record);
642                     return newRecord;
643                 }
644 
645             }, type, condition, ids == null ? null : Stream.create(ids), handler);
646         }
647 
648         @Override
649         protected void doDelete(@Nullable final Long timeout, final URI type,
650                 @Nullable final XPath condition, @Nullable final Set<URI> ids,
651                 final Handler<? super Outcome> handler) throws Throwable {
652 
653             modify(new RecordUpdater() {
654 
655                 @Override
656                 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
657                         @Nullable final Record suppliedRecord) throws Throwable {
658                     assert oldRecord != null;
659                     assert suppliedRecord == null;
660                     return null;
661                 }
662 
663             }, type, condition, ids == null ? null : Stream.create(ids), handler);
664         }
665 
666         private void modify(final RecordUpdater updater, final URI type,
667                 @Nullable final XPath condition, @Nullable final Stream<?> recordOrIDStream,
668                 final Handler<? super Outcome> handler) throws Throwable {
669 
670             // If no cursor was supplied, do a retrieve operation to obtain one
671             final Stream<?> stream = recordOrIDStream != null ? recordOrIDStream : //
672                     retrieveIDs(type, condition);
673 
674             try {
675                 // Process records in chunks, keeping track of chunk start index
676                 stream.chunk(Server.this.chunkSize).toHandler(new Handler<List<?>>() {
677 
678                     private final AtomicLong index = new AtomicLong(0L);
679 
680                     @Override
681                     public void handle(@Nullable final List<?> chunk) throws Throwable {
682                         final long startIndex = this.index.get();
683                         if (chunk != null) {
684                             // Attempt to process the chunk in a single transaction
685                             final boolean success = modifyChunk(updater, type, condition, chunk,
686                                     handler, this.index, false);
687 
688                             // On failure, process elementary 1-element chunks, notifying failures
689                             if (!success) {
690                                 this.index.set(startIndex);
691                                 for (int i = 0; !Thread.interrupted() && i < chunk.size(); ++i) {
692                                     final List<?> newChunk = ImmutableList.of(chunk.get(i));
693                                     modifyChunk(updater, type, condition, newChunk, handler,
694                                             this.index, true);
695                                 }
696                             }
697                         }
698                     }
699 
700                 });
701 
702                 // Notify handler of completion
703                 handler.handle(null);
704 
705             } finally {
706                 // Ensure to close the cursor
707                 closeQuietly(stream);
708             }
709         }
710 
711         private boolean modifyChunk(final RecordUpdater updater, final URI type,
712                 @Nullable final XPath condition, final List<?> suppliedRecordsOrIDs,
713                 final Handler<? super Outcome> handler, final AtomicLong index,
714                 final boolean reportFailure) throws Throwable {
715 
716             // Extract IDs and allocate list for outcomes
717             final long startIndex = index.get();
718             final ValueFactory factory = Data.getValueFactory();
719             final int size = suppliedRecordsOrIDs.size();
720             final List<Outcome> outcomes = Lists.newArrayListWithCapacity(size);
721             final List<URI> ids = Lists.newArrayListWithCapacity(size);
722             final List<Record> suppliedRecords = Lists.newArrayListWithExpectedSize(size);
723             for (final Object input : suppliedRecordsOrIDs) {
724                 if (input instanceof URI) {
725                     ids.add((URI) input);
726                     suppliedRecords.add(null);
727                 } else {
728                     final Record record = (Record) input;
729                     ids.add(record.getID());
730                     suppliedRecords.add(record);
731                 }
732             }
733 
734             // Start a read-write TX to process the chunk
735             final DataTransaction tx = Server.this.dataStore.begin(false);
736 
737             try {
738                 // Retrieve old records for the IDs of this chunk
739                 final Stream<Record> stream = tx.lookup(type, ImmutableSet.copyOf(ids), null);
740                 final Map<URI, Record> oldRecords = stream.toMap(new Function<Record, URI>() {
741 
742                     @Override
743                     public URI apply(final Record record) {
744                         return record.getID();
745                     }
746 
747                 }, Functions.<Record>identity());
748 
749                 // Process old/new record pairs (only those whose old record matches the
750                 // optional condition - this must be checked again as we work in new TX)
751                 for (int i = 0; !Thread.interrupted() && i < size; ++i) {
752                     final URI id = ids.get(i);
753                     final Record oldRecord = oldRecords.get(id);
754                     final Record suppliedRecord = suppliedRecords.get(i);
755                     if (id == null) {
756                         assert suppliedRecord != null;
757                         outcomes.add(newOutcome(Status.ERROR_INVALID_INPUT, null,
758                                 "Missing ID for record:\n" + suppliedRecord //
759                                         .toString(Data.getNamespaceMap(), true)));
760 
761                     } else if (suppliedRecord != null || oldRecord != null
762                             && (condition == null || condition.evalBoolean(oldRecord))) {
763                         final URI oldInvocationID = getInvocationID();
764                         setInvocationID(factory.createURI(oldInvocationID + "#"
765                                 + index.incrementAndGet()));
766                         try {
767                             outcomes.add(modifyRecord(updater, tx, id, oldRecord, suppliedRecord));
768                         } catch (final OperationException ex) {
769                             outcomes.add(ex.getOutcome());
770                         } finally {
771                             setInvocationID(oldInvocationID);
772                         }
773                     }
774                 }
775 
776                 // Attempt commit
777                 tx.end(true);
778 
779                 // Notify handlers and signal success
780                 for (final Outcome outcome : outcomes) {
781                     handler.handle(outcome);
782                 }
783                 return true;
784 
785             } catch (final Throwable ex) {
786                 // Log exception
787                 LOGGER.error("Data processing error", ex);
788 
789                 // Report failure to handler, if requested to do so
790                 if (reportFailure) {
791                     for (int i = 0; i < ids.size(); ++i) {
792                         index.set(startIndex);
793                         handler.handle(Outcome.create(
794                                 Status.ERROR_UNEXPECTED,
795                                 factory.createURI(getInvocationID() + "#"
796                                         + index.incrementAndGet()), ids.get(i), ex.getMessage()));
797                     }
798                 }
799 
800                 // Rollback TX and signal failure
801                 tx.end(false);
802                 return false;
803             }
804         }
805 
806         private Outcome modifyRecord(final RecordUpdater updater,
807                 final DataTransaction transaction, final URI recordID,
808                 @Nullable final Record oldRecord, @Nullable final Record suppliedRecord)
809                 throws Throwable {
810 
811             // Allocate three maps where to track the modifications that have to be done
812             final Set<Record> recordsToStore = Sets.newHashSet();
813             final Set<Record> recordsToDelete = Sets.newHashSet();
814 
815             // Preprocess supplied record
816             if (suppliedRecord != null) {
817                 preprocess(suppliedRecord);
818             }
819 
820             // Compute the new status of the target object; if not deleted, expand and validate it
821             final Record newRecord = updater.computeNewRecord(recordID, oldRecord, suppliedRecord);
822             if (newRecord != null) {
823                 expand(newRecord);
824             }
825 
826             // Register the modification for the target object, determining the status on success
827             Status status = Status.OK_UNMODIFIED;
828             if (newRecord == null) {
829                 if (oldRecord != null) {
830                     recordsToDelete.add(oldRecord);
831                     status = Status.OK_DELETED;
832                 }
833             } else {
834                 if (oldRecord == null) {
835                     recordsToStore.add(newRecord);
836                     status = Status.OK_CREATED;
837                 } else if (!oldRecord.hash().equals(newRecord.hash())) {
838                     recordsToStore.add(newRecord);
839                     status = Status.OK_MODIFIED;
840                 }
841             }
842             if (status == Status.OK_UNMODIFIED) {
843                 return newOutcome(status, recordID, null); // nothing to do here
844             }
845 
846             // Extract related records before and after the modification to be performed
847             final Map<URI, Record> nilMap = ImmutableMap.of();
848             final Map<URI, Record> oldMap = oldRecord == null ? nilMap : extractRelated(oldRecord);
849             final Map<URI, Record> newMap = newRecord == null ? nilMap : extractRelated(newRecord);
850 
851             // For each related record, determine if it has to be changed and how
852             for (final URI id : Sets.union(oldMap.keySet(), newMap.keySet())) {
853 
854                 // Compute what to removed (oldRel/oldProp.) and to add (newRel/newProp.)
855                 final Record oldRel = oldMap.get(id);
856                 final Record newRel = newMap.get(id);
857                 final URI type = MoreObjects.firstNonNull(oldRel, newRel).getSystemType();
858                 if (oldRel != null && newRel != null) {
859                     for (final URI property : oldRel.getProperties()) {
860                         final List<URI> newValues = newRel.get(property, URI.class);
861                         if (!newValues.isEmpty()) {
862                             final List<URI> oldValues = oldRel.get(property, URI.class);
863                             oldRel.remove(property, newValues);
864                             newRel.remove(property, oldValues);
865                         }
866                     }
867                 }
868                 final List<URI> nilList = ImmutableList.of();
869                 final List<URI> oldProperties = oldRel == null ? nilList : oldRel.getProperties();
870                 final List<URI> newProperties = newRel == null ? nilList : newRel.getProperties();
871 
872                 // If there are changes to apply, fetch the record, update it locally, expand and
873                 // validate it and register the required modification (either creation or update)
874                 if (!oldProperties.isEmpty() || !newProperties.isEmpty()) {
875                     Record related = transaction.lookup(type, ImmutableSet.of(id), null)
876                             .getUnique();
877                     if (related == null) {
878                         related = Record.create(id, type);
879                     }
880                     recordsToStore.add(related);
881                     for (final URI property : oldProperties) {
882                         assert oldRel != null;
883                         if (!property.equals(RDF.TYPE)) {
884                             related.remove(property, oldRel.get(property));
885                         }
886                     }
887                     for (final URI property : newProperties) {
888                         assert newRel != null;
889                         if (!property.equals(RDF.TYPE)) {
890                             related.add(property, newRel.get(property));
891                         }
892                     }
893                     expand(related);
894                 }
895             }
896 
897             // If new state for involved records is OK, apply registered modifications
898             for (final Record record : recordsToStore) {
899                 transaction.store(record.getSystemType(), record);
900             }
901             for (final Record record : recordsToDelete) {
902                 transaction.delete(record.getSystemType(), record.getID());
903             }
904 
905             // On success, return Status referred to target object
906             return newOutcome(status, recordID, null);
907         }
908 
909         private Stream<URI> retrieveIDs(final URI type, @Nullable final XPath condition)
910                 throws Throwable {
911 
912             // Allocate a memory buffer that will overflow to disk after a certain size
913             final FileBackedOutputStream buffer = new FileBackedOutputStream(
914                     Server.this.bufferSize);
915 
916             try {
917                 // Store the IDs of all matching records one per line in the buffer
918                 final Writer writer = new OutputStreamWriter(buffer, Charsets.UTF_8);
919                 final DataTransaction tx = Server.this.dataStore.begin(true);
920                 Stream<Record> cursor = null;
921                 try {
922                     cursor = tx.retrieve(type, condition, ImmutableSet.<URI>of());
923                     cursor.toHandler(new Handler<Record>() {
924 
925                         @Override
926                         public void handle(final Record record) throws Throwable {
927                             if (record != null) {
928                                 writer.write(record.getID().stringValue());
929                                 writer.write("\n");
930                             }
931                         }
932 
933                     });
934                 } finally {
935                     closeQuietly(cursor);
936                     tx.end(true); // does not matter
937                     writer.flush();
938                 }
939 
940                 // Return a cursor over buffered IDs
941                 final BufferedReader reader = buffer.asByteSource().asCharSource(Charsets.UTF_8)
942                         .openBufferedStream();
943                 return Stream.create(new AbstractIterator<URI>() {
944 
945                     @Override
946                     protected URI computeNext() {
947                         try {
948                             final String line = reader.readLine();
949                             return line == null ? endOfData() : Data.getValueFactory().createURI(
950                                     line);
951                         } catch (final Throwable ex) {
952                             throw Throwables.propagate(ex);
953                         }
954                     }
955 
956                 }).onClose(buffer);
957 
958             } catch (final Throwable ex) {
959                 // Release the buffer and propagate
960                 buffer.close();
961                 throw ex;
962             }
963         }
964 
965         @Override
966         protected Stream<Record> doMatch(@Nullable final Long timeout,
967                 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
968                 final Map<URI, Set<URI>> properties) throws Throwable {
969             // TODO
970             throw new UnsupportedOperationException();
971         }
972 
973         @SuppressWarnings("unchecked")
974         @Override
975         protected Outcome doSparqlUpdate(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
976 
977             LOGGER.debug("Server.UPDATING");
978             final TripleTransaction tx = Server.this.tripleStore.begin(false);
979             try {
980                 tx.add(statements);
981                 Outcome outcome = newOutcome(Status.OK_BULK, null, null);
982                 tx.end(true);
983                 return outcome;
984             } catch (final Throwable ex) {
985                 ex.printStackTrace();
986                 tx.end(false); // commit or rollback does not matter
987                 throw ex;
988             }
989             finally {
990                 closeQuietly(statements);
991             }
992         }
993 
994         @SuppressWarnings("unchecked")
995         @Override
996         protected Outcome doSparqlDelete(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
997 
998             LOGGER.debug("Server.REMOVING");
999             final TripleTransaction tx = Server.this.tripleStore.begin(false);
1000             try {
1001                 tx.remove(statements);
1002                 Outcome outcome = newOutcome(Status.OK_BULK, null, null);
1003                 tx.end(true);
1004                 return outcome;
1005             } catch (final Throwable ex) {
1006                 tx.end(false); // commit or rollback does not matter
1007                 throw ex;
1008             }
1009             finally {
1010                 closeQuietly(statements);
1011             }
1012         }
1013 
1014         @SuppressWarnings("unchecked")
1015         @Override
1016         protected <T> Stream<T> doSparql(@Nullable final Long timeout, final Class<T> type,
1017                 final String expression, @Nullable final Set<URI> defaultGraphs,
1018                 @Nullable final Set<URI> namedGraphs) throws Throwable {
1019 
1020             // Parse the query
1021             final ParsedQuery parsedQuery;
1022             try {
1023                 parsedQuery = SparqlHelper.parse(expression, null);
1024             } catch (final Throwable ex) {
1025                 throw newException(Status.ERROR_INVALID_INPUT, null, ex.getMessage(), ex);
1026             }
1027 
1028             // Override the query dataset, if provided in the operation parameters
1029             Dataset dataset = parsedQuery.getDataset();
1030             if (defaultGraphs != null || namedGraphs != null) {
1031                 final DatasetImpl ds = new DatasetImpl();
1032                 final Set<URI> emptyGraphs = ImmutableSet.of();
1033                 for (final URI graph : MoreObjects.firstNonNull(defaultGraphs, emptyGraphs)) {
1034                     ds.addDefaultGraph(graph);
1035                 }
1036                 for (final URI graph : MoreObjects.firstNonNull(namedGraphs, emptyGraphs)) {
1037                     ds.addNamedGraph(graph);
1038                 }
1039                 dataset = ds;
1040             }
1041 
1042             // Operate inside a triple store transaction
1043             final TripleTransaction tx = Server.this.tripleStore.begin(true);
1044             try {
1045                 // Start executing the query, obtaining a Sesame CloseableIteration object
1046                 final TupleExpr expr = parsedQuery.getTupleExpr();
1047                 final CloseableIteration<BindingSet, QueryEvaluationException> iteration;
1048                 iteration = SparqlHelper.evaluate(tx, expr, dataset, null, timeout);
1049 
1050                 // Wrap the iteration object dependings on the requested result
1051                 if (type == BindingSet.class) {
1052                     return attach(tx, (Stream<T>) RDFUtil.toBindingsStream(iteration, parsedQuery
1053                             .getTupleExpr().getBindingNames()));
1054                 } else if (type == Statement.class) {
1055                     return (Stream<T>) attach(tx, RDFUtil.toStatementStream(iteration));
1056                 } else if (type == Boolean.class) {
1057                     try {
1058                         return (Stream<T>) attach(tx, Stream.create(iteration.hasNext()));
1059                     } finally {
1060                         iteration.close();
1061                     }
1062                 } else {
1063                     throw new Error("Unexpected result type: " + type);
1064                 }
1065 
1066             } catch (final Throwable ex) {
1067                 tx.end(true); // commit or rollback does not matter
1068                 throw ex;
1069             }
1070         }
1071 
1072         @Override
1073         protected void doClose() {
1074             evictClosedSessions();
1075             // TODO
1076         }
1077 
1078     }
1079 
1080     private void preprocess(final Record record) throws Throwable {
1081 
1082         // Ignore ks:storedAs possibly supplied by clients, as it is computed with file upload
1083         if (KS.RESOURCE.equals(record.getSystemType())) {
1084             record.set(KS.STORED_AS, null);
1085         }
1086 
1087         // TODO: add here filtering logic to be applied to records coming from the client
1088     }
1089 
    /**
     * Hook invoked on every record about to be stored (target and related records); it is
     * currently a no-op.
     */
    private void expand(final Record record) throws Throwable {

        // TODO: validation and inference can be triggered here (perhaps using a Schema object)
    }
1094 
1095     private Map<URI, Record> extractRelated(final Record record) throws Throwable {
1096 
1097         // TODO: this has to be done better using some Schema object
1098 
1099         final URI id = record.getID();
1100         final URI type = record.getSystemType();
1101 
1102         final Map<URI, Record> map = Maps.newHashMap();
1103         if (type.equals(KS.RESOURCE)) {
1104             for (final URI mentionID : record.get(KS.HAS_MENTION, URI.class)) {
1105                 map.put(mentionID, Record.create(mentionID, KS.MENTION).add(KS.MENTION_OF, id));
1106             }
1107 
1108         } else if (type.equals(KS.MENTION)) {
1109             final URI resourceID = record.getUnique(KS.MENTION_OF, URI.class);
1110             if (resourceID != null) {
1111                 map.put(resourceID, Record.create(resourceID, KS.RESOURCE).add(KS.HAS_MENTION, id));
1112             }
1113 
1114         } else {
1115             // TODO: handle entities, axioms and contexts
1116             throw new Error("Unexpected type: " + type);
1117         }
1118 
1119         return map;
1120     }
1121 
1122     private interface RecordUpdater {
1123 
1124         @Nullable
1125         Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
1126                 @Nullable final Record suppliedRecord) throws Throwable;
1127 
1128     }
1129 
1130     public static Builder builder(final FileStore fileStore, final DataStore dataStore,
1131             final TripleStore tripleStore) {
1132         return new Builder(fileStore, dataStore, tripleStore);
1133     }
1134 
1135     public static class Builder {
1136 
1137         private final FileStore fileStore;
1138 
1139         private final DataStore dataStore;
1140 
1141         private final TripleStore tripleStore;
1142 
1143         @Nullable
1144         private Integer chunkSize;
1145 
1146         @Nullable
1147         private Integer bufferSize;
1148 
1149         Builder(final FileStore fileStore, final DataStore dataStore, final TripleStore tripleStore) {
1150             this.fileStore = Preconditions.checkNotNull(fileStore);
1151             this.dataStore = Preconditions.checkNotNull(dataStore);
1152             this.tripleStore = Preconditions.checkNotNull(tripleStore);
1153         }
1154 
1155         public Builder chunkSize(@Nullable final Integer chunkSize) {
1156             this.chunkSize = chunkSize;
1157             return this;
1158         }
1159 
1160         public Builder bufferSize(@Nullable final Integer bufferSize) {
1161             this.bufferSize = bufferSize;
1162             return this;
1163         }
1164 
1165         public Server build() {
1166             return new Server(this);
1167         }
1168 
1169     }
1170 
1171 }