1   package eu.fbk.knowledgestore.triplestore.virtuoso;
2   
3   import java.io.Closeable;
4   import java.io.IOException;
5   import java.lang.reflect.Field;
6   import java.sql.Connection;
7   import java.sql.SQLException;
8   import java.util.Collections;
9   import java.util.Iterator;
10  import java.util.concurrent.Future;
11  import java.util.concurrent.TimeUnit;
12  
13  import javax.annotation.Nullable;
14  
15  import com.google.common.base.Preconditions;
16  import com.google.common.base.Throwables;
17  import com.google.common.collect.Iterators;
18  
19  import org.openrdf.model.Resource;
20  import org.openrdf.model.Statement;
21  import org.openrdf.model.URI;
22  import org.openrdf.model.Value;
23  import org.openrdf.model.impl.ContextStatementImpl;
24  import org.openrdf.query.Binding;
25  import org.openrdf.query.BindingSet;
26  import org.openrdf.query.MalformedQueryException;
27  import org.openrdf.query.QueryEvaluationException;
28  import org.openrdf.query.QueryLanguage;
29  import org.openrdf.query.TupleQuery;
30  import org.openrdf.repository.RepositoryException;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import info.aduna.iteration.CloseableIteration;
35  import info.aduna.iteration.CloseableIteratorIteration;
36  import info.aduna.iteration.EmptyIteration;
37  import info.aduna.iteration.IterationWrapper;
38  
39  import virtuoso.sesame2.driver.VirtuosoRepositoryConnection;
40  
41  import eu.fbk.knowledgestore.data.Data;
42  import eu.fbk.knowledgestore.data.Handler;
43  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
44  import eu.fbk.knowledgestore.triplestore.SelectQuery;
45  import eu.fbk.knowledgestore.triplestore.TripleTransaction;
46  
47  final class VirtuosoTripleTransaction implements TripleTransaction {
48  
49      private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoTripleTransaction.class);
50  
51      private final VirtuosoTripleStore store;
52  
53      private final VirtuosoRepositoryConnection connection;
54  
55      private final boolean readOnly;
56  
57      private final long ts;
58  
59      VirtuosoTripleTransaction(final VirtuosoTripleStore store, final boolean readOnly)
60              throws IOException {
61  
62          assert store != null;
63  
64          // try to connect to Virtuoso - under the hoods, a (pooled) JDBC connection is obtained
65          final long ts = System.currentTimeMillis();
66          final VirtuosoRepositoryConnection connection;
67          try {
68              connection = (VirtuosoRepositoryConnection) store.getVirtuoso().getConnection();
69          } catch (final RepositoryException ex) {
70              throw new IOException("Could not connect to Virtuoso", ex);
71          }
72  
73          this.store = store;
74          this.connection = connection;
75          this.readOnly = readOnly;
76          this.ts = ts;
77  
78          try {
79              // Following lines (that cause setAutoCommit(false) on underlying connection) will
80              // cause a major slow-down in some queries
81              // connection.begin();
82  
83              connection.getQuadStoreConnection().setAutoCommit(true);
84              connection.getQuadStoreConnection().setReadOnly(readOnly);
85              // connection.getQuadStoreConnection().prepareCall("log_enable(2)").execute();
86  
87          } catch (final Throwable ex) {
88              try {
89                  connection.close();
90              } catch (final RepositoryException ex2) {
91                  LOGGER.error("Cannot close connection after begin() failure", ex);
92              }
93              throw new IOException("Cannot setup read-only transaction", ex);
94          }
95  
96          if (LOGGER.isDebugEnabled()) {
97              LOGGER.debug(this + " started in " + (readOnly ? "read-only" : "read-write")
98                      + " mode, " + (System.currentTimeMillis() - ts) + " ms");
99          }
100     }
101 
102     private void checkWritable() {
103         if (this.readOnly) {
104             throw new IllegalStateException("Write operation not allowed on read-only transaction");
105         }
106     }
107 
108     @Nullable
109     private <T, E extends Exception> CloseableIteration<T, E> logClose(
110             @Nullable final CloseableIteration<T, E> iteration) {
111         if (iteration == null || !LOGGER.isDebugEnabled()) {
112             return iteration;
113         }
114         final long ts = System.currentTimeMillis();
115         return new IterationWrapper<T, E>(iteration) {
116 
117             @Override
118             protected void handleClose() throws E {
119                 try {
120                     super.handleClose();
121                 } finally {
122                     LOGGER.debug("Virtuoso iteration closed after {} ms",
123                             System.currentTimeMillis() - ts);
124                 }
125             }
126 
127         };
128     }
129 
130     @Override
131     public CloseableIteration<? extends Statement, ? extends Exception> get(
132             @Nullable final Resource subject, @Nullable final URI predicate,
133             @Nullable final Value object, @Nullable final Resource context) throws IOException,
134             IllegalStateException {
135 
136         try {
137             final long ts = System.currentTimeMillis();
138             final CloseableIteration<? extends Statement, ? extends Exception> result;
139             if (subject == null || predicate == null || object == null || context == null) {
140                 result = logClose(this.connection.getStatements(subject, predicate, object, false,
141                         context));
142                 LOGGER.debug("Virtuoso getStatements() iteration obtained in {} ms",
143                         System.currentTimeMillis() - ts);
144             } else {
145                 Iterator<Statement> iterator;
146                 if (this.connection.hasStatement(subject, predicate, object, false, context)) {
147                     iterator = Collections.emptyIterator();
148                 } else {
149                     iterator = Iterators.<Statement>singletonIterator(new ContextStatementImpl(
150                             subject, predicate, object, context));
151                 }
152                 result = new CloseableIteratorIteration<Statement, RuntimeException>(iterator);
153                 LOGGER.debug("Virtuoso hasStatement() evaluated in {} ms",
154                         System.currentTimeMillis() - ts);
155             }
156             return result;
157         } catch (final RepositoryException re) {
158             throw new IOException("Error while checking statement.", re);
159         }
160     }
161 
162     @Override
163     public CloseableIteration<BindingSet, QueryEvaluationException> query(final SelectQuery query,
164             @Nullable final BindingSet bindings, @Nullable final Long timeout)
165             throws DataCorruptedException, IOException, UnsupportedOperationException {
166 
167         LOGGER.debug("Evaluating query:\n{}", query);
168 
169         final TupleQuery tupleQuery;
170         try {
171             tupleQuery = this.connection
172                     .prepareTupleQuery(QueryLanguage.SPARQL, query.getString());
173 
174         } catch (final RepositoryException ex) {
175             throw new IOException("Failed to prepare SPARQL tuple query:\n" + query, ex);
176 
177         } catch (final MalformedQueryException ex) {
178             // should not happen, as SelectQuery can only be created with valid queries
179             throw new UnsupportedOperationException(
180                     "SPARQL query rejected as malformed by Virtuoso:\n" + query, ex);
181         }
182 
183         if (bindings != null) {
184             for (final Binding binding : bindings) {
185                 tupleQuery.setBinding(binding.getName(), binding.getValue());
186             }
187         }
188 
189         // note: the following setting seems to be totally ignored by Virtuoso
190         // if (timeout != null) {
191         // tupleQuery.setMaxQueryTime(timeout.intValue());
192         // }
193 
194         final int msTimeout = timeout == null ? 0 : timeout.intValue();
195         try {
196             this.connection.getQuadStoreConnection()
197                     .prepareCall("set result_timeout = " + msTimeout).execute();
198         } catch (final Throwable ex) {
199             LOGGER.warn("Failed to set result_timeout = " + msTimeout
200                     + " on Virtuoso JDBC connection", ex);
201         }
202 
203         try {
204             final long ts = System.currentTimeMillis();
205             CloseableIteration<BindingSet, QueryEvaluationException> result;
206             result = tupleQuery.evaluate();
207             result = new IterationWrapper<BindingSet, QueryEvaluationException>(result) {
208 
209                 @Override
210                 public boolean hasNext() throws QueryEvaluationException {
211                     try {
212                         return super.hasNext();
213                     } catch (final QueryEvaluationException ex) {
214                         if (isPartialResultException(ex)) {
215                             return false;
216                         }
217                         throw ex;
218                     }
219                 }
220 
221             };
222             result = logClose(result);
223             LOGGER.debug("Virtuoso iteration obtained in {} ms", System.currentTimeMillis() - ts);
224             return result;
225         } catch (final QueryEvaluationException ex) {
226             if (isPartialResultException(ex)) {
227                 return new EmptyIteration<>();
228             }
229             throw new IOException("Failed to execute query - " + ex.getMessage(), ex);
230         }
231     }
232 
233     @Override
234     public void infer(@Nullable final Handler<? super Statement> handler) throws IOException,
235             IllegalStateException {
236 
237         checkWritable();
238 
239         // No inference done at this level (to be implemented in a decorator).
240         if (handler != null) {
241             try {
242                 handler.handle(null);
243             } catch (final Throwable ex) {
244                 Throwables.propagateIfPossible(ex, IOException.class);
245                 throw new RuntimeException(ex);
246             }
247         }
248     }
249 
250     /**
251      * Adds the specified RDF statement to the triple store. Virtuoso may buffer the operation,
252      * performing it when more opportune and in any case ensuring that the same effects are
253      * produced as obtainable by directly executing the operation.
254      *
255      * @param statement
256      *            the RDF statement to add
257      * @throws DataCorruptedException
258      *             in case a non-recoverable data corruption situation is detected
259      * @throws IOException
260      *             in case another IO error occurs not implying data corruption
261      * @throws IllegalStateException
262      *             in case the transaction is read-only
263      */
264     public void add(final Statement statement) throws DataCorruptedException, IOException {
265 
266         Preconditions.checkNotNull(statement);
267         checkWritable();
268 
269         try {
270             this.connection.add(statement);
271         } catch (final RepositoryException ex) {
272             throw new IOException("Failed to add statement: " + statement, ex);
273         }
274     }
275 
276     @Override
277     public void add(final Iterable<? extends Statement> stream) throws IOException,
278             IllegalStateException {
279         addBulk(stream, false);
280     }
281 
282     /**
283      * Adds the specified RDF statements to the triple store. Implementations are designed to
284      * perform high throughput insertion.
285      *
286      * @param statements
287      *            the RDF statements to add
288      * @throws DataCorruptedException
289      *             in case a non-recoverable data corruption situation is detected
290      * @throws IOException
291      *             in case another IO error occurs not implying data corruption
292      * @throws IllegalStateException
293      *             in case the transaction is read-only
294      */
295     public void addBulk(final Iterable<? extends Statement> statements, final boolean transaction)
296             throws DataCorruptedException, IOException {
297 
298         Preconditions.checkNotNull(statements);
299         checkWritable();
300 
301         try {
302             if (!transaction && !this.store.existsTransactionMarker()) {
303                 this.store.addTransactionMarker();
304                 // log_enable affects only the current transaction.
305                 this.connection.getQuadStoreConnection().prepareCall("log_enable(2)").execute();
306             }
307             this.connection.add(statements);
308             this.connection.commit();
309 
310         } catch (final SQLException sqle) {
311             throw new IllegalStateException("Invalid internal operation.", sqle);
312         } catch (final RepositoryException e) {
313             throw new DataCorruptedException("Error while adding bulk data.", e);
314         }
315     }
316 
317     /**
318      * Removes the specified RDF statement from the triple store. Virtuoso may buffer the
319      * operation, performing it when more opportune and in any case ensuring that the same effects
320      * are produced as obtainable by directly executing the operation.
321      *
322      * @param statement
323      *            the RDF statement to remove
324      * @throws DataCorruptedException
325      *             in case a non-recoverable data corruption situation is detected
326      * @throws IOException
327      *             in case another IO error occurs not implying data corruption
328      * @throws IllegalStateException
329      *             in case the transaction is read-only
330      */
331     public void remove(final Statement statement) throws DataCorruptedException, IOException {
332 
333         Preconditions.checkState(!this.readOnly);
334         checkWritable();
335 
336         try {
337             this.connection.remove(statement);
338         } catch (final RepositoryException ex) {
339             throw new IOException("Failed to remove statement: " + statement, ex);
340         }
341     }
342 
343     @Override
344     public void remove(final Iterable<? extends Statement> stream) throws IOException,
345             IllegalStateException {
346         removeBulk(stream, false);
347     }
348 
349     /**
350      * Removes the specified RDF statements from the triple store. Implementations are designed to
351      * perform high throughput insertion.
352      *
353      * @param statements
354      *            the RDF statements to add
355      * @throws DataCorruptedException
356      *             in case a non-recoverable data corruption situation is detected
357      * @throws IOException
358      *             in case another IO error occurs not implying data corruption
359      * @throws IllegalStateException
360      *             in case the transaction is read-only
361      */
362     public void removeBulk(final Iterable<? extends Statement> statements,
363             final boolean transaction) throws DataCorruptedException, IOException {
364 
365         Preconditions.checkNotNull(statements);
366         checkWritable();
367 
368         try {
369             if (!transaction && !this.store.existsTransactionMarker()) {
370                 this.store.addTransactionMarker();
371                 // log_enable affects only the current transaction.
372                 this.connection.getQuadStoreConnection().prepareCall("log_enable(2)").execute();
373             }
374 
375             this.connection.remove(statements);
376             this.connection.commit();
377 
378         } catch (final SQLException sqle) {
379             throw new IllegalStateException("Invalid internal operation.", sqle);
380         } catch (final RepositoryException e) {
381             throw new DataCorruptedException("Error while adding bulk data.", e);
382         }
383     }
384 
385     @Override
386     public void end(final boolean commit) throws IOException {
387 
388         final long ts = System.currentTimeMillis();
389         boolean committed = false;
390 
391         try {
392             if (!this.readOnly) {
393                 if (commit) {
394                     try {
395                         if (this.store.existsTransactionMarker()) {
396                             this.connection.getQuadStoreConnection().prepareCall("log_enable(1)")
397                                     .execute();
398                             this.store.removeTransactionMarker();
399                         }
400                         this.connection.commit();
401                         committed = true;
402 
403                     } catch (final Throwable ex) {
404                         try {
405                             if (this.store.existsTransactionMarker()) {
406                                 throw new DataCorruptedException("Cannot rollback! "
407                                         + "Modifications performed outside a transaction.");
408                             }
409                             this.connection.rollback();
410                             LOGGER.debug("{} rolled back after commit failure", this);
411 
412                         } catch (final RepositoryException ex2) {
413                             throw new DataCorruptedException(
414                                     "Failed to rollback transaction after commit failure", ex);
415                         }
416                         throw new IOException("Failed to commit transaction (rollback forced)", ex);
417                     }
418                 } else {
419                     try {
420                         this.connection.rollback();
421                     } catch (final Throwable ex) {
422                         throw new DataCorruptedException("Failed to rollback transaction", ex);
423                     }
424                 }
425             }
426         } finally {
427             try {
428                 closeVirtuosoRepositoryConnection(this.connection);
429             } catch (final RepositoryException ex) {
430                 LOGGER.error("Failed to close connection", ex);
431             } finally {
432                 if (LOGGER.isDebugEnabled()) {
433                     final long now = System.currentTimeMillis();
434                     LOGGER.debug("{} {} and closed in {} ms, tx duration {} ms", this,
435                             committed ? "committed" : "rolled back", now - ts, now - this.ts);
436                 }
437             }
438         }
439     }
440 
441     @Override
442     public String toString() {
443         return getClass().getSimpleName();
444     }
445 
446     private static boolean isPartialResultException(final QueryEvaluationException ex) {
447         return ex.getMessage() != null && ex.getMessage().contains("Returning incomplete results");
448     }
449 
450     private static void closeVirtuosoRepositoryConnection(
451             final VirtuosoRepositoryConnection connection) throws RepositoryException {
452 
453         final Future<?> future = Data.getExecutor().schedule(new Runnable() {
454 
455             @Override
456             public void run() {
457                 final Connection jdbcConnection = connection.getQuadStoreConnection();
458                 try {
459                     final Field field = jdbcConnection.getClass().getDeclaredField("socket");
460                     field.setAccessible(true);
461                     final Closeable socket = (Closeable) field.get(jdbcConnection);
462                     socket.close(); // as Virtuoso driver ignores polite interrupt requests...
463                     LOGGER.warn("Closed socket backing virtuoso connection");
464                 } catch (final Throwable ex) {
465                     LOGGER.debug("Failed to close socket backing virtuoso connection "
466                             + "(connection class is " + jdbcConnection.getClass() + ")", ex);
467                 }
468             }
469 
470         }, 1000, TimeUnit.MILLISECONDS);
471 
472         try {
473             connection.close();
474         } finally {
475             future.cancel(false);
476         }
477     }
478 
479 }