1   package eu.fbk.knowledgestore;
2   
3   import java.util.Arrays;
4   import java.util.Collection;
5   import java.util.List;
6   import java.util.Map;
7   import java.util.Set;
8   import java.util.regex.Matcher;
9   import java.util.regex.Pattern;
10  
11  import javax.annotation.Nullable;
12  
13  import com.google.common.base.MoreObjects;
14  import com.google.common.base.Preconditions;
15  import com.google.common.collect.ImmutableList;
16  import com.google.common.collect.ImmutableMap;
17  import com.google.common.collect.ImmutableSet;
18  import com.google.common.collect.Iterables;
19  import com.google.common.collect.Lists;
20  import com.google.common.collect.Maps;
21  import com.google.common.collect.Sets;
22  
23  import org.openrdf.model.Statement;
24  import org.openrdf.model.URI;
25  import org.openrdf.query.BindingSet;
26  
27  import eu.fbk.knowledgestore.Outcome.Status;
28  import eu.fbk.knowledgestore.data.Criteria;
29  import eu.fbk.knowledgestore.data.Data;
30  import eu.fbk.knowledgestore.data.Handler;
31  import eu.fbk.knowledgestore.data.ParseException;
32  import eu.fbk.knowledgestore.data.Record;
33  import eu.fbk.knowledgestore.data.Representation;
34  import eu.fbk.knowledgestore.data.Stream;
35  import eu.fbk.knowledgestore.data.XPath;
36  import eu.fbk.knowledgestore.vocabulary.KS;
37  
38  // best effort basis: will interrupt thread, then it is up to the implementation to
39  // decide what to do
40  // for read operations, HTTP connection can be interrupted; for create / merge
41  // operations, no more data should be sent; delete / update will make the connection
42  // fail
43  
44  // properties for which there is no criteria are not modified
45  
46  public abstract class Operation {
47  
48      private final Map<String, String> inheritedNamespaces;
49  
50      Map<String, String> namespaces;
51  
52      Long timeout;
53  
54      Operation(@Nullable final Map<String, String> inheritedNamespaces) {
55          this.inheritedNamespaces = inheritedNamespaces != null ? inheritedNamespaces
56                  : ImmutableMap.<String, String>of();
57          this.namespaces = this.inheritedNamespaces;
58          this.timeout = null;
59      }
60  
61      /**
62       * Sets the optional timeout for this operation in milliseconds. Passing null or a
63       * non-positive value will remove any timeout previously set.
64       * 
65       * @param timeout
66       *            the timeout; null or non-positive to reset
67       * @return this operation object for call chaining
68       */
69      public synchronized Operation timeout(@Nullable final Long timeout) {
70          this.timeout = timeout == null || timeout > 0 ? timeout : null;
71          return this;
72      }
73  
74      /**
75       * Sets the optional namespaces for this operation. Supplied namespaces overrides the
76       * namespaces inherited from the {@code Session}. Passing null will remove any namespace map
77       * previously set on the operation.
78       * 
79       * @param namespaces
80       *            the namespace map overriding session namespaces; null to reset
81       * @return this operation object for call chaining
82       */
83      public synchronized Operation namespaces(@Nullable final Map<String, String> namespaces) {
84          this.namespaces = namespaces == null ? this.inheritedNamespaces : Data.newNamespaceMap(
85                  namespaces, this.inheritedNamespaces);
86          return this;
87      }
88  
89      // UTILITY METHODS
90  
91      static URI checkType(final URI type) {
92          Preconditions.checkNotNull(type, "No type specified");
93          if (!type.equals(KS.RESOURCE) && !type.equals(KS.MENTION) && !type.equals(KS.ENTITY)
94                  && !type.equals(KS.AXIOM)) {
95              throw new IllegalArgumentException("Invalid type: " + type);
96          }
97          return type;
98      }
99  
100     static XPath conditionFor(final URI property, final Object... allowedValues) {
101         final String namespace = property.getNamespace();
102         final String prefix = MoreObjects.firstNonNull(//
103                 Data.namespaceToPrefix(namespace, Data.getNamespaceMap()), "ns");
104         return XPath.parse(
105                 String.format("with %s: <%s> : %s:%s = {}", prefix, namespace, prefix,
106                         property.getLocalName()), allowedValues);
107     }
108 
109     static <T> Handler<T> handlerFor(@Nullable final Collection<? super T> collection) {
110         if (collection == null) {
111             return null;
112         } else {
113             return new Handler<T>() {
114 
115                 @Override
116                 public void handle(final T element) {
117                     if (element != null) {
118                         collection.add(element);
119                     }
120                 }
121 
122             };
123         }
124     }
125 
126     @Nullable
127     static XPath merge(final Iterable<XPath> conditions) {
128         return conditions == null || Iterables.isEmpty(conditions) ? null : XPath.compose("and",
129                 (Object[]) Iterables.toArray(conditions, XPath.class));
130     }
131 
132     static <T> List<T> add(@Nullable final List<T> list, final T element) {
133         if (list == null) {
134             return ImmutableList.of(element);
135         } else {
136             final List<T> tmp = Lists.newArrayList(list);
137             tmp.add(element);
138             return ImmutableList.copyOf(tmp);
139         }
140     }
141 
142     static <T> Set<T> add(@Nullable final Set<T> set, final T element) {
143         if (set == null) {
144             return ImmutableSet.of(element);
145         } else {
146             final List<T> tmp = Lists.newArrayList(set);
147             tmp.add(element);
148             return ImmutableSet.copyOf(tmp);
149         }
150     }
151 
152     /**
153      * Download operation.
154      * <p>
155      * This operation attempts at fetching the representation associated to a resource with the ID
156      * specified. The operation is controlled by two optional parameters:
157      * {@link #caching(boolean)} enables or disables the use of intermediate caches, if available,
158      * while {@link #accept(String...)} / {@link #accept(Iterable)} require the returned
159      * representation to have a certain MIME type (if it is not the case, the invocation fails).
160      * </p>
161      */
162     public abstract static class Download extends Operation {
163 
164         private final URI resourceID;
165 
166         @Nullable
167         private Set<String> mimeTypes;
168 
169         private boolean caching;
170 
171         /**
172          * Creates a new {@code Download} operation instance.
173          * 
174          * @param inheritedNamespaces
175          *            an <b>immutable</b> map of inherited namespaces, possibly null
176          * @param resourceID
177          *            the ID of the resource whose representation should be retrieved
178          */
179         protected Download(@Nullable final Map<String, String> inheritedNamespaces,
180                 final URI resourceID) {
181             super(inheritedNamespaces);
182             this.resourceID = Preconditions.checkNotNull(resourceID, "Null resource ID");
183             this.mimeTypes = null;
184             this.caching = true;
185         }
186 
187         @Override
188         public Download timeout(@Nullable final Long timeout) {
189             return (Download) super.timeout(timeout);
190         }
191 
192         @Override
193         public Download namespaces(@Nullable final Map<String, String> namespaces) {
194             return (Download) super.namespaces(namespaces);
195         }
196 
197         /**
198          * Sets the acceptable MIME type (default: accept everything). Supplied values override
199          * previously configured MIME types; passing null will drop the constraint.
200          * 
201          * @param mimeTypes
202          *            the acceptable MIME types; null (default) to drop any constraint
203          * @return this operation object, for call chaining
204          */
205         public final synchronized Download accept(@Nullable final String... mimeTypes) {
206             return accept(mimeTypes == null ? null : Arrays.asList(mimeTypes));
207         }
208 
209         /**
210          * Sets the acceptable MIME types (default: accept everything). Supplied values override
211          * previously configured MIME types; passing null will drop the constraint.
212          * 
213          * @param mimeTypes
214          *            the acceptable MIME types; null (default) to drop any constraint
215          * @return this operation object, for call chaining
216          */
217         public final synchronized Download accept(
218                 @Nullable final Iterable<? extends String> mimeTypes) {
219             this.mimeTypes = mimeTypes == null ? null : Sets.newLinkedHashSet(mimeTypes);
220             return this;
221         }
222 
223         /**
224          * Sets whether the representation can be retrieved from caches (default true).
225          * 
226          * @param caching
227          *            true, if the representation can be retrieved from caches
228          * @return this operation object, for call chaining
229          */
230         public final synchronized Download caching(final boolean caching) {
231             this.caching = caching;
232             return this;
233         }
234 
235         /**
236          * Executes the operation, returning the requested representation or null, if it does not
237          * exist. Note that returned representations MUST be closed after use.
238          * 
239          * @return the requested representation, or null if it does not exist
240          * @throws OperationException
241          *             in case of failure (see possible outcome status codes)
242          */
243         public final synchronized Representation exec() throws OperationException {
244             return doExec(this.timeout, this.resourceID, this.mimeTypes, this.caching);
245         }
246 
247         /**
248          * Implementation method responsible of executing the download operation.
249          * 
250          * @param timeout
251          *            the optional timeout for the operation; null if there is no timeout
252          * @param resourceID
253          *            the ID of the resource whose representation must be returned
254          * @param mimeTypes
255          *            the acceptable MIME types; null if there is no constraint
256          * @param caching
257          *            true, if the representation can be retrieved from a cache
258          * @return the resource representation, if exists, otherwise null
259          * @throws OperationException
260          *             in case of failure (see possible outcome status codes)
261          */
262         @Nullable
263         protected abstract Representation doExec(@Nullable Long timeout, URI resourceID,
264                 @Nullable Set<String> mimeTypes, boolean caching) throws OperationException;
265 
266     }
267 
268     /**
269      * Upload operation.
270      * <p>
271      * The operation outcome may assume one among the following {@code Status} codes:
272      * </p>
273      * <table border="1">
274      * <tr>
275      * <th>Status</th>
276      * <th>Explanation</th>
277      * <th>Reported as</td>
278      * </tr>
279      * <tr>
280      * <td>{@link Status#OK_CREATED}</td>
281      * <td>the file was successfully uploaded without replacing an existing file for the same
282      * resource</td>
283      * <td>{@code exec()} return value</td>
284      * </tr>
285      * <tr>
286      * <td>{@link Status#OK_MODIFIED}</td>
287      * <td>the file was successfully uploaded and replaced an existing file bound to the same
288      * resource</td>
289      * <td>{@code exec()} return value</td>
290      * </tr>
291      * <tr>
292      * <td>{@link Status#OK_DELETED}</td>
293      * <td>the file was successfully deleted</td>
294      * <td>{@code exec()} return value</td>
295      * </tr>
296      * <tr>
297      * <td>{@link Status#ERROR_INVALID_INPUT}</td>
298      * <td>in case the supplied URI is not valid or a problem is detected in the uploaded file</td>
299      * <td>{@code OperationException}</td>
300      * </tr>
301      * <tr>
302      * <td>{@link Status#ERROR_OBJECT_NOT_FOUND}</td>
303      * <td>the file cannot be deleted because it does not exist (the associated resource may or
304      * may not exist)</td>
305      * <td>{@code OperationException}</td>
306      * </tr>
307      * <tr>
308      * <td>{@link Status#ERROR_DEPENDENCY_NOT_FOUND}</td>
309      * <td>supplied file cannot be stored because the referenced resource does not exist</td>
310      * <td>{@code OperationException}</td>
311      * </tr>
312      * <tr>
313      * <td>{@link Status#ERROR_UNEXPECTED}</td>
314      * <td>an unexpected error occurred preventing the successful execution of the operation</td>
315      * <td>{@code OperationException}</td>
316      * </tr>
317      * <tr>
318      * <td>{@link Status#ERROR_UNKNOWN}</td>
319      * <td>a connectivity problem caused the interruption of the operation whose outcome is
320      * unknown</td>
321      * <td>{@code OperationException}</td>
322      * </tr>
323      * </table>
324      */
325     public abstract static class Upload extends Operation {
326 
327         private final URI resourceID;
328 
329         @Nullable
330         private Representation representation;
331 
332         protected Upload(final Map<String, String> inheritedNamespaces, final URI id) {
333             super(inheritedNamespaces);
334             this.resourceID = Preconditions.checkNotNull(id, "Null resource ID");
335             this.representation = null;
336         }
337 
338         @Override
339         public Upload timeout(@Nullable final Long timeout) {
340             return (Upload) super.timeout(timeout);
341         }
342 
343         @Override
344         public Upload namespaces(@Nullable final Map<String, String> namespaces) {
345             return (Upload) super.namespaces(namespaces);
346         }
347 
348         /**
349          * Sets the representation to upload, if any. Setting null will cause any previously
350          * stored representation to be dropped.
351          * 
352          * @param representation
353          *            the representation to upload, if any
354          * @return this operation object, for call chaining
355          */
356         public final synchronized Upload representation(
357                 @Nullable final Representation representation) {
358             this.representation = representation;
359             return this;
360         }
361 
362         /**
363          * Executes the operation, returning its outcome.
364          * 
365          * @return the outcome of the operation
366          * @throws OperationException
367          *             in case of failure (see possible outcome status codes)
368          */
369         public final synchronized Outcome exec() throws OperationException {
370             return doExec(this.timeout, this.resourceID, this.representation);
371         }
372 
373         protected abstract Outcome doExec(@Nullable Long timeout, URI resourceID,
374                 @Nullable Representation representation) throws OperationException;
375 
376     }
377 
378     public abstract static class Count extends Operation {
379 
380         private final URI type;
381 
382         @Nullable
383         private List<XPath> conditions;
384 
385         @Nullable
386         private Set<URI> ids;
387 
388         protected Count(final Map<String, String> namespaces, final URI type) {
389             super(namespaces);
390             this.type = checkType(type);
391             this.conditions = null;
392             this.ids = null;
393         }
394 
395         @Override
396         public Count timeout(@Nullable final Long timeout) {
397             return (Count) super.timeout(timeout);
398         }
399 
400         @Override
401         public Count namespaces(@Nullable final Map<String, String> namespaces) {
402             return (Count) super.namespaces(namespaces);
403         }
404 
405         public final synchronized Count conditions(@Nullable final XPath... conditions) {
406             return conditions(conditions == null ? null : Arrays.asList(conditions));
407         }
408 
409         public final synchronized Count conditions(
410                 @Nullable final Iterable<? extends XPath> conditions) {
411             this.conditions = conditions == null ? null : ImmutableList.copyOf(conditions);
412             return this;
413         }
414 
415         public final synchronized Count condition(final String condition,
416                 final Object... arguments) throws ParseException {
417             this.conditions = add(this.conditions,
418                     XPath.parse(this.namespaces, condition, arguments));
419             return this;
420         }
421 
422         public final synchronized Count condition(final URI property,
423                 final Object... allowedValues) {
424             this.conditions = add(this.conditions, conditionFor(property, allowedValues));
425             return this;
426         }
427 
428         public final synchronized Count ids(@Nullable final URI... ids) {
429             return ids(ids == null ? null : Arrays.asList(ids));
430         }
431 
432         public final synchronized Count ids(@Nullable final Iterable<? extends URI> ids) {
433             this.ids = ids == null ? null : ImmutableSet.copyOf(ids);
434             return this;
435         }
436 
437         public final synchronized long exec() throws OperationException {
438             return doExec(this.timeout, this.type, merge(this.conditions), this.ids);
439         }
440 
441         protected abstract long doExec(@Nullable Long timeout, URI type,
442                 @Nullable XPath condition, @Nullable Set<URI> ids) throws OperationException;
443 
444     }
445 
446     public abstract static class Retrieve extends Operation {
447 
448         private final URI type;
449 
450         @Nullable
451         private List<XPath> conditions;
452 
453         @Nullable
454         private Set<URI> ids;
455 
456         @Nullable
457         private Set<URI> properties;
458 
459         @Nullable
460         private Long offset;
461 
462         @Nullable
463         private Long limit;
464 
465         protected Retrieve(final Map<String, String> namespaces, final URI type) {
466             super(namespaces);
467             this.type = checkType(type);
468             this.conditions = null;
469             this.ids = null;
470             this.properties = null;
471             this.offset = null;
472             this.limit = null;
473         }
474 
475         @Override
476         public Retrieve timeout(@Nullable final Long timeout) {
477             return (Retrieve) super.timeout(timeout);
478         }
479 
480         @Override
481         public Retrieve namespaces(@Nullable final Map<String, String> namespaces) {
482             return (Retrieve) super.namespaces(namespaces);
483         }
484 
485         public final synchronized Retrieve conditions(@Nullable final XPath... conditions) {
486             return conditions(conditions == null ? null : Arrays.asList(conditions));
487         }
488 
489         public final synchronized Retrieve conditions(
490                 @Nullable final Iterable<? extends XPath> conditions) {
491             this.conditions = conditions == null ? null : ImmutableList.copyOf(conditions);
492             return this;
493         }
494 
495         public final synchronized Retrieve condition(final String condition,
496                 final Object... arguments) throws ParseException {
497             this.conditions = add(this.conditions,
498                     XPath.parse(this.namespaces, condition, arguments));
499             return this;
500         }
501 
502         public final synchronized Retrieve condition(final URI property,
503                 final Object... allowedValues) {
504             this.conditions = add(this.conditions, conditionFor(property, allowedValues));
505             return this;
506         }
507 
508         public final synchronized Retrieve ids(@Nullable final URI... ids) {
509             return ids(ids == null ? null : Arrays.asList(ids));
510         }
511 
512         public final synchronized Retrieve ids(@Nullable final Iterable<? extends URI> ids) {
513             this.ids = ids == null ? null : ImmutableSet.copyOf(ids);
514             return this;
515         }
516 
517         // no call or empty = all properties
518 
519         public final synchronized Retrieve properties(@Nullable final URI... properties) {
520             return properties(properties == null ? null : Arrays.asList(properties));
521         }
522 
523         public final synchronized Retrieve properties(
524                 @Nullable final Iterable<? extends URI> properties) {
525             this.properties = properties == null ? null : ImmutableSet.copyOf(properties);
526             return this;
527         }
528 
529         public final synchronized Retrieve offset(@Nullable final Long offset) {
530             this.offset = offset == null || offset <= 0 ? null : offset;
531             return this;
532         }
533 
534         public final synchronized Retrieve limit(@Nullable final Long limit) {
535             this.limit = limit == null || limit <= 0 ? null : limit;
536             return this;
537         }
538 
539         public final synchronized Stream<Record> exec() throws OperationException {
540             return doExec(this.timeout, this.type, merge(this.conditions), this.ids,
541                     this.properties, this.offset, this.limit);
542         }
543 
544         protected abstract Stream<Record> doExec(@Nullable final Long timeout, final URI type,
545                 @Nullable final XPath condition, @Nullable final Set<URI> ids,
546                 @Nullable final Set<URI> properties, @Nullable Long offset, @Nullable Long limit)
547                 throws OperationException;
548 
549     }
550 
551     public abstract static class Create extends Operation {
552 
553         private final URI type;
554 
555         @Nullable
556         private Stream<? extends Record> records;
557 
558         protected Create(final Map<String, String> namespaces, final URI type) {
559             super(namespaces);
560             this.type = checkType(type);
561             this.records = null;
562         }
563 
564         @Override
565         public Create timeout(@Nullable final Long timeout) {
566             return (Create) super.timeout(timeout);
567         }
568 
569         @Override
570         public Create namespaces(@Nullable final Map<String, String> namespaces) {
571             return (Create) super.namespaces(namespaces);
572         }
573 
574         public final synchronized Create records(@Nullable final Record... records) {
575             this.records = records == null ? null : Stream.create(records);
576             return this;
577         }
578 
579         public final synchronized Create records(//
580                 @Nullable final Iterable<? extends Record> records) {
581             this.records = records == null ? null : Stream.create(records);
582             return this;
583         }
584 
585         public final synchronized Outcome exec() throws OperationException {
586             return doExec(this.timeout, this.type, this.records, null);
587         }
588 
589         // TODO: errors from the handlers are logged and cause the invocation to be interrupted
590         // eventually; still, the handler will be notified of all the outcomes until the
591         // invocation ends
592 
593         public final synchronized Outcome exec(@Nullable final Handler<? super Outcome> handler)
594                 throws OperationException {
595             return doExec(this.timeout, this.type, this.records, handler);
596         }
597 
598         public final synchronized Outcome exec(
599                 @Nullable final Collection<? super Outcome> collection) throws OperationException {
600             return doExec(this.timeout, this.type, this.records, handlerFor(collection));
601         }
602 
603         protected abstract Outcome doExec(@Nullable Long timeout, final URI type,
604                 @Nullable final Stream<? extends Record> records,
605                 @Nullable final Handler<? super Outcome> handler) throws OperationException;
606 
607     }
608 
609     public abstract static class Merge extends Operation {
610 
611         private final URI type;
612 
613         @Nullable
614         private Stream<? extends Record> records;
615 
616         @Nullable
617         private Criteria criteria;
618 
619         protected Merge(final Map<String, String> namespaces, final URI type) {
620             super(namespaces);
621             this.type = checkType(type);
622             this.records = null;
623             this.criteria = null;
624         }
625 
626         @Override
627         public Merge timeout(@Nullable final Long timeout) {
628             return (Merge) super.timeout(timeout);
629         }
630 
631         @Override
632         public Merge namespaces(@Nullable final Map<String, String> namespaces) {
633             return (Merge) super.namespaces(namespaces);
634         }
635 
636         public final synchronized Merge records(@Nullable final Record... records) {
637             this.records = records == null ? null : Stream.create(records);
638             return this;
639         }
640 
641         public final synchronized Merge records(//
642                 @Nullable final Iterable<? extends Record> records) {
643             this.records = records == null ? null : Stream.create(records);
644             return this;
645         }
646 
647         public final synchronized Merge criteria(@Nullable final Criteria... criteria) {
648             return criteria(criteria == null ? null : Arrays.asList(criteria));
649         }
650 
651         public final synchronized Merge criteria(
652                 @Nullable final Iterable<? extends Criteria> criteria) {
653             this.criteria = criteria == null || Iterables.isEmpty(criteria) ? null : Criteria
654                     .compose(Iterables.toArray(criteria, Criteria.class));
655             return this;
656         }
657 
658         public final synchronized Merge criteria(@Nullable final String criteria)
659                 throws ParseException {
660             this.criteria = criteria == null ? null : Criteria.parse(criteria, this.namespaces);
661             return this;
662         }
663 
664         public final synchronized Outcome exec() throws OperationException {
665             return doExec(this.timeout, this.type, this.records, this.criteria, null);
666         }
667 
668         public final synchronized Outcome exec(@Nullable final Handler<? super Outcome> handler)
669                 throws OperationException {
670             return doExec(this.timeout, this.type, this.records, this.criteria, handler);
671         }
672 
673         public final synchronized Outcome exec(
674                 @Nullable final Collection<? super Outcome> collection) throws OperationException {
675             return doExec(this.timeout, this.type, this.records, this.criteria,
676                     handlerFor(collection));
677         }
678 
679         protected abstract Outcome doExec(@Nullable Long timeout, URI type,
680                 @Nullable Stream<? extends Record> stream, @Nullable Criteria criteria,
681                 @Nullable Handler<? super Outcome> handler) throws OperationException;
682 
683     }
684 
685     public abstract static class Update extends Operation {
686 
687         private final URI type;
688 
689         @Nullable
690         private List<XPath> conditions;
691 
692         @Nullable
693         private Set<URI> ids;
694 
695         @Nullable
696         private Record record;
697 
698         @Nullable
699         private Criteria criteria;
700 
701         protected Update(final Map<String, String> namespaces, final URI type) {
702             super(namespaces);
703             this.type = checkType(type);
704             this.conditions = null;
705             this.ids = null;
706             this.record = null;
707             this.criteria = null;
708         }
709 
710         @Override
711         public Update timeout(@Nullable final Long timeout) {
712             return (Update) super.timeout(timeout);
713         }
714 
715         @Override
716         public Update namespaces(@Nullable final Map<String, String> namespaces) {
717             return (Update) super.namespaces(namespaces);
718         }
719 
720         public final synchronized Update conditions(@Nullable final XPath... conditions) {
721             return conditions(conditions == null ? null : Arrays.asList(conditions));
722         }
723 
724         public final synchronized Update conditions(
725                 @Nullable final Iterable<? extends XPath> conditions) {
726             this.conditions = conditions == null ? null : ImmutableList.copyOf(conditions);
727             return this;
728         }
729 
730         public final synchronized Update condition(final String condition,
731                 final Object... arguments) throws ParseException {
732             this.conditions = add(this.conditions,
733                     XPath.parse(this.namespaces, condition, arguments));
734             return this;
735         }
736 
737         public final synchronized Update condition(final URI property,
738                 final Object... allowedValues) {
739             this.conditions = add(this.conditions, conditionFor(property, allowedValues));
740             return this;
741         }
742 
743         public final synchronized Update ids(@Nullable final URI... ids) {
744             return ids(ids == null ? null : Arrays.asList(ids));
745         }
746 
747         public final synchronized Update ids(@Nullable final Iterable<? extends URI> ids) {
748             this.ids = ids == null ? null : ImmutableSet.copyOf(ids);
749             return this;
750         }
751 
752         public final synchronized Update record(@Nullable final Record record) {
753             this.record = record;
754             return this;
755         }
756 
757         public final synchronized Update criteria(@Nullable final Criteria... criteria) {
758             return criteria(criteria == null ? null : Arrays.asList(criteria));
759         }
760 
761         public final synchronized Update criteria(
762                 @Nullable final Iterable<? extends Criteria> criteria) {
763             this.criteria = criteria == null || Iterables.isEmpty(criteria) ? null : Criteria
764                     .compose(Iterables.toArray(criteria, Criteria.class));
765             return this;
766         }
767 
768         public final synchronized Update criteria(@Nullable final String criteria)
769                 throws ParseException {
770             this.criteria = criteria == null ? null : Criteria.parse(criteria, this.namespaces);
771             return this;
772         }
773 
774         public final synchronized Outcome exec() throws OperationException {
775             return doExec(this.timeout, this.type, merge(this.conditions), this.ids, this.record,
776                     this.criteria, null);
777         }
778 
779         public final synchronized Outcome exec(@Nullable final Handler<? super Outcome> handler)
780                 throws OperationException {
781             final Record record = this.record != null ? this.record : Record.create();
782             return doExec(this.timeout, this.type, merge(this.conditions), this.ids, record,
783                     this.criteria, handler);
784         }
785 
786         public final synchronized Outcome exec(
787                 @Nullable final Collection<? super Outcome> collection) throws OperationException {
788             return doExec(this.timeout, this.type, merge(this.conditions), this.ids, this.record,
789                     this.criteria, handlerFor(collection));
790         }
791 
792         protected abstract Outcome doExec(@Nullable Long timeout, URI type,
793                 @Nullable XPath condition, @Nullable Set<URI> ids, @Nullable Record record,
794                 @Nullable Criteria criteria, @Nullable Handler<? super Outcome> handler)
795                 throws OperationException;
796 
797     }
798 
799     public abstract static class Delete extends Operation {
800 
801         private final URI type;
802 
803         @Nullable
804         private List<XPath> conditions;
805 
806         @Nullable
807         private Set<URI> ids;
808 
809         protected Delete(final Map<String, String> namespaces, final URI type) {
810             super(namespaces);
811             this.type = checkType(type);
812             this.conditions = null;
813             this.ids = null;
814         }
815 
816         @Override
817         public Delete timeout(@Nullable final Long timeout) {
818             return (Delete) super.timeout(timeout);
819         }
820 
821         @Override
822         public Delete namespaces(@Nullable final Map<String, String> namespaces) {
823             return (Delete) super.namespaces(namespaces);
824         }
825 
826         public final synchronized Delete conditions(@Nullable final XPath... conditions) {
827             return conditions(conditions == null ? null : Arrays.asList(conditions));
828         }
829 
830         public final synchronized Delete conditions(
831                 @Nullable final Iterable<? extends XPath> conditions) {
832             this.conditions = conditions == null ? null : ImmutableList.copyOf(conditions);
833             return this;
834         }
835 
836         public final synchronized Delete condition(final String condition,
837                 final Object... arguments) throws ParseException {
838             this.conditions = add(this.conditions,
839                     XPath.parse(this.namespaces, condition, arguments));
840             return this;
841         }
842 
843         public final synchronized Delete condition(final URI property,
844                 final Object... allowedValues) {
845             this.conditions = add(this.conditions, conditionFor(property, allowedValues));
846             return this;
847         }
848 
849         public final synchronized Delete ids(@Nullable final URI... ids) {
850             return ids(ids == null ? null : Arrays.asList(ids));
851         }
852 
853         public final synchronized Delete ids(@Nullable final Iterable<? extends URI> ids) {
854             this.ids = ids == null ? null : ImmutableSet.copyOf(ids);
855             return this;
856         }
857 
858         public final synchronized Outcome exec() throws OperationException {
859             return doExec(this.timeout, this.type, merge(this.conditions), this.ids, null);
860         }
861 
862         public final synchronized Outcome exec(@Nullable final Handler<? super Outcome> handler)
863                 throws OperationException {
864             return doExec(this.timeout, this.type, merge(this.conditions), this.ids, handler);
865         }
866 
867         public final synchronized Outcome exec(
868                 @Nullable final Collection<? super Outcome> collection) throws OperationException {
869             return doExec(this.timeout, this.type, merge(this.conditions), this.ids,
870                     handlerFor(collection));
871         }
872 
873         protected abstract Outcome doExec(@Nullable Long timeout, URI type,
874                 @Nullable XPath condition, @Nullable Set<URI> ids,
875                 @Nullable Handler<? super Outcome> handler) throws OperationException;
876 
877     }
878 
879     public abstract static class Match extends Operation {
880 
881         private static final Map<URI, URI> COMPONENT_NORMALIZATION_MAP = //
882         ImmutableMap.<URI, URI>builder().put(KS.RESOURCE, KS.RESOURCE)
883                 .put(KS.MATCHED_RESOURCE, KS.RESOURCE).put(KS.MENTION, KS.MENTION)
884                 .put(KS.MATCHED_MENTION, KS.MENTION).put(KS.ENTITY, KS.ENTITY)
885                 .put(KS.MATCHED_ENTITY, KS.ENTITY).build();
886 
887         private final Map<URI, Set<XPath>> conditions;
888 
889         private final Map<URI, Set<URI>> ids;
890 
891         private final Map<URI, Set<URI>> properties;
892 
893         protected Match(final Map<String, String> namespaces) {
894             super(namespaces);
895             this.conditions = Maps.newHashMap();
896             this.ids = Maps.newHashMap();
897             this.properties = Maps.newHashMap();
898         }
899 
900         @Override
901         public Match timeout(@Nullable final Long timeout) {
902             return (Match) super.timeout(timeout);
903         }
904 
905         @Override
906         public Match namespaces(@Nullable final Map<String, String> namespaces) {
907             return (Match) super.namespaces(namespaces);
908         }
909 
910         public final synchronized Match conditions(@Nullable final URI component,
911                 @Nullable final XPath... conditions) {
912             return conditions(component, conditions == null ? null : Arrays.asList(conditions));
913         }
914 
915         public final synchronized Match conditions(@Nullable final URI component,
916                 @Nullable final Iterable<? extends XPath> conditions) {
917             if (component == null) {
918                 this.conditions.clear();
919             } else {
920                 this.conditions.put(checkComponent(component), conditions == null ? null
921                         : ImmutableSet.copyOf(conditions));
922             }
923             return this;
924         }
925 
926         public final synchronized Match condition(final URI component, final String condition,
927                 final Object... arguments) throws ParseException {
928             final URI comp = checkComponent(component);
929             final XPath cond = XPath.parse(this.namespaces, condition, arguments);
930             this.conditions.put(comp, add(this.conditions.get(comp), cond));
931             return this;
932         }
933 
934         public final synchronized Match condition(final URI component, final URI property,
935                 final Object... allowedValues) {
936             final URI comp = checkComponent(component);
937             final XPath cond = conditionFor(property, allowedValues);
938             this.conditions.put(comp, add(this.conditions.get(comp), cond));
939             return this;
940         }
941 
942         public final synchronized Match ids(@Nullable final URI component,
943                 @Nullable final URI... ids) {
944             return ids(component, ids == null ? null : Arrays.asList(ids));
945         }
946 
947         public final synchronized Match ids(@Nullable final URI component,
948                 @Nullable final Iterable<? extends URI> ids) {
949             if (component == null) {
950                 this.ids.clear();
951             } else {
952                 this.ids.put(checkComponent(component),
953                         ids == null ? null : ImmutableSet.copyOf(ids));
954             }
955             return this;
956         }
957 
958         // no call = exclude class; empty = all properties
959 
960         public final synchronized Match properties(@Nullable final URI component,
961                 @Nullable final URI... properties) {
962             return properties(component, properties == null ? null : Arrays.asList(properties));
963         }
964 
965         public final synchronized Match properties(@Nullable final URI component,
966                 @Nullable final Iterable<? extends URI> properties) {
967             if (component == null) {
968                 this.properties.clear();
969             } else {
970                 this.properties.put(checkComponent(component), properties == null ? null
971                         : ImmutableSet.copyOf(properties));
972             }
973             return this;
974         }
975 
976         public final synchronized Stream<Record> exec() throws OperationException {
977             final Map<URI, XPath> conditions = Maps.newHashMap();
978             for (final URI component : this.conditions.keySet()) {
979                 conditions.put(component, merge(this.conditions.get(component)));
980             }
981             return doExec(this.timeout, conditions, this.ids, this.properties);
982         }
983 
984         protected abstract Stream<Record> doExec(@Nullable final Long timeout,
985                 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
986                 final Map<URI, Set<URI>> properties) throws OperationException;
987 
988         private URI checkComponent(final URI component) {
989             final URI uri = COMPONENT_NORMALIZATION_MAP.get(component);
990             Preconditions.checkArgument(uri != null, "Invalid match component %s", component);
991             return uri;
992         }
993 
994     }
995 
996     // TODO: add missing namespaces based on available namespaces
997 
998     public abstract static class SparqlUpdate extends Operation {
999 
1000         @Nullable
1001         private Stream<? extends Statement> statements;
1002 
1003         protected SparqlUpdate(final Map<String, String> namespaces) throws ParseException {
1004             super(namespaces);
1005         }
1006 
1007         public final synchronized SparqlUpdate statements(@Nullable final Statement... statements) {
1008             this.statements = statements == null ? null : Stream.create(statements);
1009             return this;
1010         }
1011 
1012         public final synchronized SparqlUpdate statements(//
1013                                                  @Nullable final Iterable<? extends Statement> statements) {
1014             this.statements = statements == null ? null : Stream.create(statements);
1015             return this;
1016         }
1017 
1018         public final synchronized Outcome exec() throws OperationException {
1019             return doExec(this.timeout, this.statements);
1020         }
1021 
1022         protected abstract Outcome doExec(@Nullable Long timeout,
1023                                           @Nullable final Stream<? extends Statement> statements) throws OperationException;
1024 
1025     }
1026 
1027 
1028     public abstract static class SparqlDelete extends Operation {
1029 
1030         @Nullable
1031         private Stream<? extends Statement> statements;
1032 
1033         protected SparqlDelete(final Map<String, String> namespaces) throws ParseException {
1034             super(namespaces);
1035         }
1036 
1037         public final synchronized SparqlDelete statements(@Nullable final Statement... statements) {
1038             this.statements = statements == null ? null : Stream.create(statements);
1039             return this;
1040         }
1041 
1042         public final synchronized SparqlDelete statements(//
1043                                                           @Nullable final Iterable<? extends Statement> statements) {
1044             this.statements = statements == null ? null : Stream.create(statements);
1045             return this;
1046         }
1047 
1048         public final synchronized Outcome exec() throws OperationException {
1049             return doExec(this.timeout, this.statements);
1050         }
1051 
1052         protected abstract Outcome doExec(@Nullable Long timeout,
1053                                           @Nullable final Stream<? extends Statement> statements) throws OperationException;
1054 
1055     }
1056 
1057     /**
1058      * SPARQL query operation.
1059      * <p>
1060      * This operation evaluates a SPARQL query on the KnowledgeStore SPARQL endpoint. All SPARQL
1061      * query forms are supported:
1062      * <ul>
1063      * <li>SELECT queries return a {@code Stream} of {@code BindingSet}s (call
1064      * {@link Sparql#execTuples()});</li>
1065      * <li>CONSTRUCT and DESCRIBE queries return a {@code Stream} of {@link Statement}s (call
1066      * {@link Sparql#execTriples()});</li>
1067      * <li>ASK queries return a boolean result (call {@link Sparql#execBoolean()}).</li>
1068      * </ul>
1069      * </p>
1070      * <p>
1071      * The operation can be configured, as usual, by specifying a timeout and supplying optional
1072      * namespace bindings to be used for parsing the SPARQL expression. In addition,
1073      * {@link Sparql#defaultGraphs(Iterable) default} and {@link Sparql#namedGraphs(Iterable)
1074      * named} graphs can be specified at the operation level, overriding the corresponding
1075      * declarations in the {@code FROM} and {@code FROM NAMED} clauses of the SPARQL query
1076      * expression.
1077      * </p>
1078      */
1079     public abstract static class Sparql extends Operation {
1080 
1081         private static final Pattern PLACEHOLDER_PATTERN = Pattern
1082                 .compile("(?:(?<=\\A|[^\\\\]))([$][$])(?:(?=\\z|.))");
1083 
1084         private final String expression;
1085 
1086         @Nullable
1087         private Set<URI> defaultGraphs;
1088 
1089         @Nullable
1090         private Set<URI> namedGraphs;
1091 
1092         /**
1093          * Creates a new {@code Sparql} operation instance (to be used in {@code Session}
1094          * implementations).
1095          * 
1096          * @param namespaces
1097          *            an <b>immutable</b> map of inherited namespaces, possibly null
1098          * @param expression
1099          *            the SPARQL query expression, not null and possibly containing {@code $$}
1100          *            placeholders
1101          * @param arguments
1102          *            the values to assign to {@code $$} placeholders
1103          * @throws ParseException
1104          *             in case the query expression is not valid
1105          */
1106         protected Sparql(final Map<String, String> namespaces, final String expression,
1107                 final Object... arguments) throws ParseException {
1108             super(namespaces);
1109             this.expression = expand(expression, arguments);
1110             this.defaultGraphs = null;
1111             this.namedGraphs = null;
1112         }
1113 
1114         @Override
1115         public Sparql timeout(@Nullable final Long timeout) {
1116             return (Sparql) super.timeout(timeout);
1117         }
1118 
1119         @Override
1120         public Sparql namespaces(@Nullable final Map<String, String> namespaces) {
1121             return (Sparql) super.namespaces(namespaces);
1122         }
1123 
1124         /**
1125          * Sets the optional default graphs for the query, overriding the default graphs specified
1126          * in the <tt>FROM</tt> clause. Passing null will remove any default graph previously set.
1127          * 
1128          * @param defaultGraphs
1129          *            a vararg array with the non-null URIs of the default graphs, possibly empty;
1130          *            pass null to remove any default graph previously set
1131          * @return this operation object for call chaining
1132          */
1133         public final synchronized Sparql defaultGraphs(@Nullable final URI... defaultGraphs) {
1134             return defaultGraphs(defaultGraphs == null ? null : Arrays.asList(defaultGraphs));
1135         }
1136 
1137         /**
1138          * Sets the optional default graphs for the query, overriding default graphs specified in
1139          * the <tt>FROM</tt> clause. Passing null will remove any default graph previously set.
1140          * 
1141          * @param defaultGraphs
1142          *            an {@code Iterable} with the non-null URIs of the default graphs, possibly
1143          *            empty; pass null to remove any default graph previously set
1144          * @return this operation object for call chaining
1145          */
1146         public final synchronized Sparql defaultGraphs(//
1147                 @Nullable final Iterable<URI> defaultGraphs) {
1148             this.defaultGraphs = defaultGraphs == null ? null : ImmutableSet.copyOf(defaultGraphs);
1149             return this;
1150         }
1151 
1152         /**
1153          * Sets the optional named graphs for the query, overriding named graphs specified in the
1154          * <tt>FROM NAMED</tt> clause. Passing null will remove any named graph previously set.
1155          * 
1156          * @param namedGraphs
1157          *            a vararg array with the non-null URIs of the named graphs, possibly empty;
1158          *            pass null to remove any named graph previously set
1159          * @return this operation object for call chaining
1160          */
1161         public final synchronized Sparql namedGraphs(@Nullable final URI... namedGraphs) {
1162             return namedGraphs(namedGraphs == null ? null : Arrays.asList(namedGraphs));
1163         }
1164 
1165         /**
1166          * Sets the optional named graphs for the query, overriding named graphs specified in the
1167          * <tt>FROM NAMED</tt> clause. Passing null will remove any named graph previously set.
1168          * 
1169          * @param namedGraphs
1170          *            an {@code Iterable} with the non-null URIs of the named graphs, possibly
1171          *            empty; pass null to remove any named graph previously set
1172          * @return this operation object for call chaining
1173          */
1174         public final synchronized Sparql namedGraphs(@Nullable final Iterable<URI> namedGraphs) {
1175             this.namedGraphs = namedGraphs == null ? null : ImmutableSet.copyOf(namedGraphs);
1176             return this;
1177         }
1178 
1179         /**
1180          * Evaluates the query, returning its boolean result; applicable to ASK queries.
1181          * 
1182          * @return the boolean result of the query
1183          * @throws OperationException
1184          *             on failure (see possible outcome status codes)
1185          */
1186         public final synchronized boolean execBoolean() throws OperationException {
1187             return doExec(this.timeout, Boolean.class, this.expression, this.defaultGraphs,
1188                     this.namedGraphs).getUnique();
1189         }
1190 
1191         /**
1192          * Evaluates the query, returning a {@code Stream} with the resulting {@code Statement}s;
1193          * applicable to CONSTRUCT and DESCRIBE queries.
1194          * 
1195          * @return the resulting {@code Stream} of {@code Statement}s
1196          * @throws OperationException
1197          *             on failure (see possible outcome status codes)
1198          */
1199         public final synchronized Stream<Statement> execTriples() throws OperationException {
1200             return doExec(this.timeout, Statement.class, this.expression, this.defaultGraphs,
1201                     this.namedGraphs);
1202         }
1203 
1204         /**
1205          * Evaluates the query, returning a {@code Stream} with the resulting {@code BindingSet}s;
1206          * applicable to SELECT queries. After access to the returned {@code Stream}, a
1207          * {@code List<String>} with output variables can be obtained by querying the
1208          * {@code Stream} metadata attribute {@code variables}.
1209          * 
1210          * @return the resulting {@code Stream} of {@code BindingSet}s
1211          * @throws OperationException
1212          *             on failure (see possible outcome status codes)
1213          */
1214         public final synchronized Stream<BindingSet> execTuples() throws OperationException {
1215             return doExec(this.timeout, BindingSet.class, this.expression, this.defaultGraphs,
1216                     this.namedGraphs);
1217         }
1218 
1219         /**
1220          * Implementation method responsible of executing the SPARQL operation.
1221          * 
1222          * @param timeout
1223          *            the optional timeout for the operation; null if there is no timeout
1224          * @param type
1225          *            the expected type of elements for the resulting {@code Stream}; either
1226          *            {@link Statement}, {@link BindingSet} or {@link Boolean}
1227          * @param expression
1228          *            the SPARQL query expression
1229          * @param defaultGraphs
1230          *            the optional set of default graphs overriding the ones possibly specified in
1231          *            the {@code FROM} query clause; if null, no override should take place
1232          * @param namedGraphs
1233          *            the optional set of named graphs overriding the ones possibly specified in
1234          *            the {@code FROM NAMED} query clause; if null, no override should take place
1235          * @param <T>
1236          *            the type of result elements
1237          * @return a {@code Stream} with the result of the query
1238          * @throws OperationException
1239          *             in case of failure (see possible outcome status codes)
1240          */
1241         protected abstract <T> Stream<T> doExec(@Nullable final Long timeout, final Class<T> type,
1242                 final String expression, @Nullable final Set<URI> defaultGraphs,
1243                 @Nullable final Set<URI> namedGraphs) throws OperationException;
1244 
1245         private String expand(final String expression, final Object... arguments)
1246                 throws ParseException {
1247             int expansions = 0;
1248             String result = expression;
1249             final Matcher matcher = PLACEHOLDER_PATTERN.matcher(expression);
1250             try {
1251                 if (matcher.find()) {
1252                     final StringBuilder builder = new StringBuilder();
1253                     int last = 0;
1254                     do {
1255                         Object arg = arguments[expansions++];
1256                         builder.append(expression.substring(last, matcher.start(1)));
1257                         builder.append(arg instanceof Number ? arg : Data.toString(arg, null,
1258                                 false));
1259                         last = matcher.end(1);
1260                     } while (matcher.find());
1261                     builder.append(expression.substring(last, expression.length()));
1262                     result = builder.toString();
1263                 }
1264             } catch (final IndexOutOfBoundsException ex) {
1265                 throw new ParseException(expression, "No argument supplied for placeholder #"
1266                         + expansions);
1267             }
1268             if (expansions != arguments.length) {
1269                 throw new ParseException(expression, "Expression string contains " + expansions
1270                         + " placholders, but " + arguments.length + " arguments where supplied");
1271             }
1272             return result;
1273         }
1274 
1275     }
1276 
1277 }