1   package eu.fbk.knowledgestore.triplestore.virtuoso;
2   
3   import com.google.common.base.MoreObjects;
4   import com.google.common.base.Preconditions;
5   import com.google.common.base.Throwables;
6   import com.google.common.collect.Lists;
7   import eu.fbk.knowledgestore.data.Data;
8   import eu.fbk.knowledgestore.data.Handler;
9   import eu.fbk.knowledgestore.internal.Util;
10  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
11  import eu.fbk.knowledgestore.triplestore.SelectQuery;
12  import eu.fbk.knowledgestore.triplestore.TripleStore;
13  import eu.fbk.knowledgestore.triplestore.TripleTransaction;
14  import info.aduna.iteration.CloseableIteration;
15  import info.aduna.iteration.ConvertingIteration;
16  import org.openrdf.model.*;
17  import org.openrdf.model.Statement;
18  import org.openrdf.model.vocabulary.SESAME;
19  import org.openrdf.model.vocabulary.XMLSchema;
20  import org.openrdf.query.BindingSet;
21  import org.openrdf.query.Dataset;
22  import org.openrdf.query.QueryEvaluationException;
23  import org.openrdf.query.impl.ListBindingSet;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  import virtuoso.jdbc4.ConnectionWrapper;
27  import virtuoso.jdbc4.VirtuosoConnection;
28  import virtuoso.jdbc4.VirtuosoConnectionPoolDataSource;
29  import virtuoso.jdbc4.VirtuosoPooledConnection;
30  import virtuoso.sql.ExtendedString;
31  import virtuoso.sql.RdfBox;
32  
33  import javax.annotation.Nullable;
34  import java.io.Closeable;
35  import java.io.IOException;
36  import java.lang.reflect.Field;
37  import java.sql.*;
38  import java.util.Collections;
39  import java.util.List;
40  import java.util.NoSuchElementException;
41  import java.util.Set;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicLong;
45  
46  public final class VirtuosoJdbcTripleStore implements TripleStore {
47  
48  	// see also the following resources for reference:
49  	// - https://newsreader.fbk.eu/trac/wiki/TripleStoreNotes
50  	// - http://docs.openlinksw.com/sesame/ (Virtuoso javadoc)
51  	// - http://www.openlinksw.com/vos/main/Main/VirtSesame2Provider
52  
53  	private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoJdbcTripleStore.class);
54  
55  	private static final String DEFAULT_HOST = "localhost";
56  
57  	private static final int DEFAULT_PORT = 1111;
58  
59  	private static final String DEFAULT_USERNAME = "dba";
60  
61  	private static final String DEFAULT_PASSWORD = "dba";
62  
63  	private static final int DEFAULT_FETCH_SIZE = 200;
64  
65  	private static final long GRACE_PERIOD = 5000; // 5s more for client-side timeout
66  
67  	private final VirtuosoConnectionPoolDataSource source;
68  
69  	private final int fetchSize;
70  
71  	private final AtomicLong transactionCounter;
72  
73  	/**
74  	 * Creates a new instance based on the supplied configuration properties.
75  	 *
76  	 * @param host      the name / IP address of the host where virtuoso is running; if null defaults to
77  	 *                  localhost
78  	 * @param port      the port Virtuoso is listening to; if null defaults to 1111
79  	 * @param username  the username to login into Virtuoso; if null defaults to dba
80  	 * @param password  the password to login into Virtuoso; if null default to dba
81  	 * @param fetchSize the number of results (solutions, triples, ...) to fetch from Virtuoso in a
82  	 *                  single operation when query results are iterated; if null defaults to 200
83  	 * @param charset   the charset to use for serializing / deserializing textual data exchanged with
84  	 *                  the server; if null defaults to UTF-8
85  	 */
86  	public VirtuosoJdbcTripleStore(@Nullable final String host, @Nullable final Integer port,
87  								   @Nullable final String username, @Nullable final String password,
88  								   @Nullable final Integer fetchSize, @Nullable final String charset) {
89  
90  		// Configure the data source
91  		// (see http://docs.openlinksw.com/virtuoso/VirtuosoDriverJDBC.html, section 7.4.4.2)
92  		this.source = new VirtuosoConnectionPoolDataSource();
93  		this.source.setServerName(MoreObjects.firstNonNull(host, DEFAULT_HOST));
94  		this.source.setPortNumber(MoreObjects.firstNonNull(port, DEFAULT_PORT));
95  		this.source.setUser(MoreObjects.firstNonNull(username, DEFAULT_USERNAME));
96  		this.source.setPassword(MoreObjects.firstNonNull(password, DEFAULT_PASSWORD));
97  		this.source.setCharset(charset != null ? charset : "UTF-8");
98  
99  		// Configure and validate other parameters
100 		this.fetchSize = MoreObjects.firstNonNull(fetchSize, DEFAULT_FETCH_SIZE);
101 		this.transactionCounter = new AtomicLong(0L);
102 		Preconditions.checkArgument(this.fetchSize > 0);
103 
104 		// Log relevant information
105 		LOGGER.info("VirtuosoTripleStore configured, URL={}, fetchSize={}",
106 				this.source.getServerName() + ":" + this.source.getPortNumber(), fetchSize);
107 	}
108 
109 	@Override
110 	public void init() throws IOException {
111 		// Nothing to do here
112 	}
113 
114 	@Override
115 	public TripleTransaction begin(final boolean readOnly) throws DataCorruptedException,
116 			IOException {
117 		return new VirtuosoTransaction(readOnly);
118 	}
119 
120 	@Override
121 	public void reset() throws IOException {
122 		Connection connection = null;
123 		try {
124 			connection = this.source.getConnection();
125 			connection.setReadOnly(false);
126 			connection.setAutoCommit(true);
127 			connection.prepareCall("RDF_GLOBAL_RESET ()").execute();
128 		} catch (final SQLException ex) {
129 			throw new IOException(ex);
130 		} finally {
131 			Util.closeQuietly(connection);
132 		}
133 	}
134 
135 	@Override
136 	public void close() {
137 		// no need to terminate pending transactions: this is done externally
138 		try {
139 			this.source.close();
140 		} catch (final SQLException ex) {
141 			LOGGER.error("Failed to shutdown Virtuoso driver", ex);
142 		}
143 	}
144 
145 	@Override
146 	public String toString() {
147 		return getClass().getSimpleName();
148 	}
149 
150 	private static Value castValue(final Object value) throws IllegalArgumentException {
151 
152 		final ValueFactory vf = Data.getValueFactory();
153 
154 		if (value == null) {
155 			return null;
156 		}
157 		else if (value instanceof ExtendedString) {
158 			final ExtendedString es = (ExtendedString) value;
159 			String string = es.toString();
160 			try {
161 				if (es.getIriType() == ExtendedString.IRI && (es.getStrType() & 0x01) == 0x01) {
162 					if (string.startsWith("_:")) {
163 						string = string.substring(2);
164 						return vf.createBNode(string);
165 					}
166 					else if (string.indexOf(':') < 0) {
167 						return vf.createURI(":" + string);
168 					}
169 					else {
170 						return vf.createURI(string);
171 					}
172 				}
173 				else if (es.getIriType() == ExtendedString.BNODE) {
174 					return vf.createBNode(string);
175 				}
176 				else {
177 					return vf.createLiteral(string);
178 				}
179 			} catch (final Throwable ex) {
180 				throw new IllegalArgumentException("Invalid value from Virtuoso: \"" + string
181 						+ "\", STRTYPE = " + es.getIriType(), ex);
182 			}
183 		}
184 		else if (value instanceof RdfBox) {
185 			final RdfBox rb = (RdfBox) value;
186 			if (rb.getLang() != null) {
187 				return vf.createLiteral(rb.toString(), rb.getLang());
188 			}
189 			else if (rb.getType() != null) {
190 				return vf.createLiteral(rb.toString(), vf.createURI(rb.getType()));
191 			}
192 			else {
193 				return vf.createLiteral(rb.toString());
194 			}
195 		}
196 		else if (value instanceof Blob) {
197 			return vf.createLiteral(value.toString(), XMLSchema.HEXBINARY);
198 		}
199 		else if (value instanceof Date) {
200 			return Data.convert(new java.util.Date(((Date) value).getTime()), Value.class);
201 		}
202 		else if (value instanceof Timestamp) {
203 			return Data.convert(new Date(((Timestamp) value).getTime()), Value.class);
204 		}
205 		else if (value instanceof Time) {
206 			return vf.createLiteral(value.toString(), XMLSchema.TIME);
207 		}
208 		else {
209 			try {
210 				return Data.convert(value, Value.class);
211 			} catch (final Throwable ex) {
212 				throw new IllegalArgumentException("Could not parse value: " + value, ex);
213 			}
214 		}
215 	}
216 
217 	private static String sqlForQuery(final String query, @Nullable final Dataset dataset,
218 									  @Nullable final BindingSet bindings) {
219 
220 		// Start composing a 'sparql' SQL command
221 		final StringBuilder builder = new StringBuilder("sparql\n ");
222 
223 		// Generate define directives for graphs in the FROM / FROM NAMED clauses
224 		if (dataset != null) {
225 			final Set<URI> empty = Collections.emptySet();
226 			for (final URI uri : MoreObjects.firstNonNull(dataset.getDefaultGraphs(), empty)) {
227 				builder.append(" define input:default-graph-uri <" + uri + "> \n");
228 			}
229 			for (final URI uri : MoreObjects.firstNonNull(dataset.getNamedGraphs(), empty)) {
230 				builder.append(" define input:named-graph-uri <" + uri + "> \n");
231 			}
232 		}
233 
234 		// Apply variable bindings, i.e., replace certain variables with supplied values
235 		if (bindings != null && bindings.size() > 0) {
236 			int i = 0;
237 			final int length = query.length();
238 			while (i < query.length()) {
239 				final char ch = query.charAt(i++);
240 				if (ch == '\\' && i < length) {
241 					builder.append(ch).append(query.charAt(i++));
242 				}
243 				else if (ch == '"' || ch == '\'') {
244 					builder.append(ch);
245 					while (i < length) {
246 						final char c = query.charAt(i++);
247 						builder.append(c);
248 						if (c == ch) {
249 							break;
250 						}
251 					}
252 				}
253 				else if ((ch == '?' || ch == '$') && i < length
254 						&& isVarFirstChar(query.charAt(i))) {
255 					int j = i + 1;
256 					while (j < length && isVarMiddleChar(query.charAt(j))) {
257 						++j;
258 					}
259 					final String name = query.substring(i, j);
260 					final Value value = bindings.getValue(name);
261 					if (value != null) {
262 						builder.append(Data.toString(value, null));
263 					}
264 					else {
265 						builder.append(ch).append(name);
266 					}
267 					i = j;
268 				}
269 				else {
270 					builder.append(ch);
271 				}
272 			}
273 		}
274 		else {
275 			builder.append(query);
276 		}
277 
278 		// Return the resulting SQL statement
279 		return builder.toString();
280 	}
281 
282 	private static boolean isVarFirstChar(final char c) {
283 		// Returns true if the supplied char can be used as first char in a variable name
284 		return '0' <= c && c <= '9' || 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || c == '_'
285 				|| 0x00C0 <= c && c <= 0x00D6 || 0x00D8 <= c && c <= 0x00F6 || 0x00F8 <= c
286 				&& c <= 0x02FF || 0x0370 <= c && c <= 0x037D || 0x037F <= c && c <= 0x1FFF
287 				|| 0x200C <= c && c <= 0x200D || 0x2070 <= c && c <= 0x218F || 0x2C00 <= c
288 				&& c <= 0x2FEF || 0x3001 <= c && c <= 0xD7FF || 0xF900 <= c && c <= 0xFDCF
289 				|| 0xFDF0 <= c && c <= 0xFFFD;
290 	}
291 
292 	private static boolean isVarMiddleChar(final char c) {
293 		// Returns true if the supplied char can be used as i-th char (i > 1) in a variable name
294 		return isVarFirstChar(c) || c == 0x00B7 || 0x0300 <= c && c <= 0x036F || 0x203F <= c
295 				&& c <= 0x2040;
296 	}
297 
298 	private static boolean isPartialResultException(final Throwable ex) {
299 		// TODO: try to better implement this method without checking for string containment
300 		// (perhaps now we have access to some SQL code)
301 		return ex.getMessage() != null && ex.getMessage().contains("Returning incomplete results");
302 	}
303 
304 	private static void killConnection(final Object connection) throws Throwable {
305 		if (connection instanceof ConnectionWrapper) {
306 			final Field field = ConnectionWrapper.class.getDeclaredField("pconn");
307 			field.setAccessible(true);
308 			killConnection(field.get(connection));
309 		}
310 		else if (connection instanceof VirtuosoPooledConnection) {
311 			killConnection(((VirtuosoPooledConnection) connection).getVirtuosoConnection());
312 		}
313 		else if (connection instanceof VirtuosoConnection) {
314 			final Field field = VirtuosoConnection.class.getDeclaredField("socket");
315 			field.setAccessible(true);
316 			final Closeable socket = (Closeable) field.get(connection);
317 			socket.close(); // as Virtuoso driver ignores polite interrupt
318 		}
319 		else {
320 			throw new Exception("Don't know how to kill connection "
321 					+ connection.getClass().getName());
322 		}
323 	}
324 
325 	private final class VirtuosoTransaction implements TripleTransaction {
326 
327 		private final Connection connection; // the underlying JDBC connection
328 
329 		private final boolean readOnly; // whether only get() and query() are allowed
330 
331 		private final String id; // transaction name (for logging purposes)
332 
333 		VirtuosoTransaction(final boolean readOnly) throws IOException {
334 
335 			// Setup ID and read-only setting
336 			this.readOnly = readOnly;
337 			this.id = "Virtuoso TX"
338 					+ VirtuosoJdbcTripleStore.this.transactionCounter.incrementAndGet();
339 
340 			// Acquire a JDBC connection from the pool
341 			Connection connection = null;
342 			try {
343 				connection = VirtuosoJdbcTripleStore.this.source.getConnection();
344 				connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
345 				connection.setReadOnly(readOnly);
346 				connection.setAutoCommit(true);
347 			} catch (final SQLException ex) {
348 				Util.closeQuietly(connection);
349 				throw new IOException("Could not connect to Virtuoso", ex);
350 			}
351 			this.connection = connection;
352 		}
353 
354 		private void checkWritable() {
355 			if (this.readOnly) {
356 				throw new IllegalStateException(
357 						"Write operation not allowed on read-only transaction");
358 			}
359 		}
360 
361 		@Override
362 		public CloseableIteration<? extends Statement, ? extends Exception> get(
363 				final Resource subject, final URI predicate, final Value object,
364 				final Resource context) throws IOException, IllegalStateException {
365 
366 			// Generate the query retrieving subset of variables ?s, ?p, ?o, ?c for req. stmts
367 			// NOTE: we always use GRAPH as triple in Virtuoso are *always* stored inside a graph
368 			final StringBuilder builder = new StringBuilder();
369 			builder.append("SELECT * WHERE { GRAPH ");
370 			builder.append(context == null ? "?c" : Data.toString(context, null));
371 			builder.append(" { ");
372 			builder.append(subject == null ? "?s" : Data.toString(subject, null));
373 			builder.append(' ');
374 			builder.append(predicate == null ? "?p" : Data.toString(predicate, null));
375 			builder.append(' ');
376 			builder.append(object == null ? "?o" : Data.toString(object, null));
377 			builder.append(" } }");
378 			final String query = builder.toString();
379 
380 			// Execute the query, converting each result from BindingSet to Statement
381 			return new ConvertingIteration<BindingSet, Statement, Exception>(
382 					new VirtuosoQueryIteration(query, null, null, null)) {
383 
384 				@Override
385 				protected Statement convert(final BindingSet tuple) throws Exception {
386 					final Resource s = subject != null ? subject : (Resource) tuple.getValue("s");
387 					final URI p = predicate != null ? predicate : (URI) tuple.getValue("p");
388 					final Value o = object != null ? object : tuple.getValue("o");
389 					final Resource c = context != null ? context : (Resource) tuple.getValue("c");
390 					return Data.getValueFactory().createStatement(s, p, o, c);
391 				}
392 
393 			};
394 		}
395 
396 		@Override
397 		public CloseableIteration<BindingSet, QueryEvaluationException> query(
398 				final SelectQuery query, final BindingSet bindings, final Long timeout)
399 				throws IOException, UnsupportedOperationException, IllegalStateException {
400 
401 			// Delegate to VirtuosoQueryIteration
402 			return new VirtuosoQueryIteration(query.getString(), query.getDataset(), bindings,
403 					timeout);
404 		}
405 
406 		@Override
407 		public void infer(final Handler<? super Statement> handler) throws IOException,
408 				IllegalStateException {
409 
410 			checkWritable();
411 
412 			// No inference done at this level (to be implemented in a decorator).
413 			if (handler != null) {
414 				try {
415 					handler.handle(null);
416 				} catch (final Throwable ex) {
417 					Throwables.propagateIfPossible(ex, IOException.class);
418 					throw new RuntimeException(ex);
419 				}
420 			}
421 		}
422 
423 		@Override
424 		public void add(final Iterable<? extends Statement> statements) throws IOException,
425 				IllegalStateException {
426 			LOGGER.debug("UPDATING");
427 			update(true, statements);
428 		}
429 
430 		@Override
431 		public void remove(final Iterable<? extends Statement> statements) throws IOException,
432 				IllegalStateException {
433 			LOGGER.debug("REMOVING");
434 			update(false, statements);
435 		}
436 
437 /*        private void update(final boolean insert, final Iterable<? extends Statement> statements)
438 				throws IOException, IllegalStateException {
439 
440             // Check arguments and state
441 
442 //            try {
443 //                VirtuosoTransaction.this.connection.prepareCall("log_enable(2)").execute();
444 //            } catch (SQLException e) {
445 //                e.printStackTrace();
446 //            }
447 
448             Preconditions.checkNotNull(statements);
449             checkWritable();
450 
451             try {
452                 // Compose the SQL statement
453                 final StringBuilder builder = new StringBuilder();
454                 builder.append("SPARQL ");
455                 builder.append(insert ? "INSERT" : "DELETE");
456                 builder.append(" DATA {");
457                 for (final Statement stmt : statements) {
458                     final Resource ctx = MoreObjects.firstNonNull(stmt.getContext(), SESAME.NIL);
459                     builder.append("\n  GRAPH ");
460                     builder.append(Data.toString(ctx, null));
461                     builder.append(" { ");
462                     builder.append(Data.toString(stmt.getSubject(), null));
463                     builder.append(' ');
464                     builder.append(Data.toString(stmt.getPredicate(), null));
465                     builder.append(' ');
466                     builder.append(Data.toString(stmt.getObject(), null));
467                     builder.append(" }");
468                 }
469                 builder.append("\n}");
470 
471                 // Issue the statement
472                 final java.sql.Statement statement = this.connection.createStatement();
473                 LOGGER.info("Starting Virtuoso ingestion");
474                 statement.executeUpdate(builder.toString());
475                 LOGGER.info("Virtuoso ingestion finished (burp!)");
476 
477 //                try {
478 //                    VirtuosoTransaction.this.connection.prepareCall("checkpoint").execute();
479 //                } catch (SQLException e) {
480 //                    e.printStackTrace();
481 //                }
482 
483             } catch (final SQLException ex) {
484                 throw new IOException(ex);
485             }
486         }          */
487 
488 		private void update(final boolean insert, final Iterable<? extends Statement> statements)
489 				throws IOException, IllegalStateException {
490 
491 //			try {
492 //				VirtuosoTransaction.this.connection.prepareCall("log_enable(2)").execute();
493 //			} catch (SQLException e) {
494 //				e.printStackTrace();
495 //			}
496 
497 			// Check arguments and state
498 			Preconditions.checkNotNull(statements);
499 			checkWritable();
500 
501 			try {
502 				String command = "DB.DBA.rdf_insert_triple_c (?,?,?,?,?,?)";
503 				if (!insert) {
504 					command = "DB.DBA.rdf_delete_triple_c (?,?,?,?,?,?)";
505 				}
506 
507 				PreparedStatement insertStmt;
508 				insertStmt = this.connection.prepareStatement(command);
509 
510 				for (Statement stmt : statements) {
511 					insertStmt.setString(1, stmt.getSubject().toString());
512 					insertStmt.setString(2, stmt.getPredicate().toString());
513 					if (stmt.getObject() instanceof Resource) {
514 						insertStmt.setString(3, stmt.getObject().toString());
515 						insertStmt.setNull(4, 12);
516 						insertStmt.setInt(5, 0);
517 					}
518 					else if (stmt.getObject() instanceof Literal) {
519 						Literal lit = (Literal) stmt.getObject();
520 						insertStmt.setString(3, lit.getLabel());
521 						if (lit.getLanguage() != null) { // NOTA: LANGUAGE VA CONTROLLATO ***PRIMA*** DI DATATYPE
522 							insertStmt.setString(4, lit.getLanguage());
523 							insertStmt.setInt(5, 2);
524 						}
525 						else if (lit.getDatatype() != null) {
526 							insertStmt.setString(4, lit.getDatatype().toString());
527 							insertStmt.setInt(5, 3);
528 						}
529 						else {
530 							insertStmt.setNull(4, 12);
531 							insertStmt.setInt(5, 1);
532 						}
533 					}
534 
535 					Resource context = stmt.getContext();
536 					if (context == null) {
537 						context = SESAME.NIL;
538 					}
539 					insertStmt.setString(6, context.toString());
540 					insertStmt.addBatch();
541 				}
542 
543 				LOGGER.info("Starting Virtuoso ingestion");
544 				insertStmt.executeBatch();
545 				LOGGER.info("Finishing Virtuoso ingestion (burp!)");
546 
547 				insertStmt.clearBatch();
548 				insertStmt.close();
549 
550 				LOGGER.info("Starting full text index update");
551 				try {
552 					VirtuosoTransaction.this.connection.prepareCall("DB.DBA.VT_INC_INDEX_DB_DBA_RDF_OBJ ()").execute();
553 				} catch (SQLException e) {
554 					e.printStackTrace();
555 				}
556 				LOGGER.info("Ending full text index update");
557 
558 			} catch (final SQLException ex) {
559 				throw new IOException(ex);
560 			}
561 		}
562 
563 		@Override
564 		public void end(final boolean commit) throws DataCorruptedException, IOException,
565 				IllegalStateException {
566 
567 			try {
568 				// Schedule a task for killing the connection by forcedly closing its socket
569 				final Future<?> future = Data.getExecutor().schedule(new Runnable() {
570 
571 					@Override
572 					public void run() {
573 						try {
574 							killConnection(VirtuosoTransaction.this.connection);
575 							LOGGER.warn("{} - killed Virtuoso JDBC connection", this);
576 						} catch (final Throwable ex) {
577 							LOGGER.debug(this + " - failed to kill Virtuoso JDBC connection "
578 									+ "(connection class is "
579 									+ VirtuosoTransaction.this.connection.getClass() + ")", ex);
580 						}
581 					}
582 
583 				}, 1000, TimeUnit.MILLISECONDS);
584 
585 				// Perform commit or rollback, in case of a non-read-only transaction
586 				if (!this.readOnly) {
587 					if (commit) {
588 						this.connection.commit();
589 					}
590 					else {
591 						this.connection.rollback();
592 					}
593 				}
594 
595 				// Try to close the connection 'kindly'. On success, unschedule the killing job
596 				this.connection.close();
597 				future.cancel(false);
598 
599 			} catch (final SQLException ex) {
600 				LOGGER.error(this + " - failed to close connection", ex);
601 			}
602 		}
603 
604 		@Override
605 		public String toString() {
606 			return this.id;
607 		}
608 
609 		private final class VirtuosoQueryIteration implements
610 				CloseableIteration<BindingSet, QueryEvaluationException> {
611 
612 			private final List<String> variables; // output variables
613 
614 			private java.sql.Statement statement; // to be closed at the end of the iteration
615 
616 			private ResultSet cursor; // to be closed at the end of the iteration
617 
618 			private BindingSet tuple; // next tuple to be returned
619 
620 			public VirtuosoQueryIteration(final String query, @Nullable final Dataset dataset,
621 										  @Nullable final BindingSet bindings, @Nullable final Long timeout)
622 					throws IOException {
623 
624 				try {
625 					// Convert the SPARQL query to the corresponding Virtuoso SQL command
626 					final String sql = sqlForQuery(query, dataset, bindings);
627 
628 					// Set the server-side timeout that control returning of partial results
629 					final int msTimeout = timeout == null ? 0 : timeout.intValue();
630 					try {
631 						VirtuosoTransaction.this.connection.prepareCall(
632 								"set result_timeout = " + msTimeout).execute();
633 					} catch (final Throwable ex) {
634 						LOGGER.warn(VirtuosoTransaction.this
635 								+ " - failed to set result_timeout = " + msTimeout
636 								+ " on Virtuoso JDBC connection (proceeding anyway)", ex);
637 					}
638 
639 					// Create and configure the SQL Statement object for executing the query
640 					this.statement = VirtuosoTransaction.this.connection.createStatement();
641 					this.statement.setFetchDirection(ResultSet.FETCH_FORWARD);
642 					this.statement.setFetchSize(VirtuosoJdbcTripleStore.this.fetchSize);
643 					if (timeout != null) {
644 						// Set the client-side timeout in seconds for getting the first result
645 						this.statement.setQueryTimeout((int) ((timeout + GRACE_PERIOD) / 1000));
646 					}
647 
648 					// Start with empty variable list and null output tuple (they will be changed
649 					// upon successful query execution).
650 					this.variables = Lists.newArrayList();
651 					this.tuple = null;
652 
653 					// Execute the query (this may fail in case of partial results)
654 					this.cursor = this.statement.executeQuery(sql);
655 
656 					// Retrieve output variables.
657 					final ResultSetMetaData metadata = this.cursor.getMetaData();
658 					for (int i = 1; i <= metadata.getColumnCount(); ++i) {
659 						this.variables.add(metadata.getColumnName(i));
660 					}
661 
662 				} catch (final Throwable ex) {
663 					if (isPartialResultException(ex)) {
664 						// Ignore the excetion and close immediately the allocated resources.
665 						// An empty iteration will be returned
666 						LOGGER.debug(
667 								"{} -no results / partial results returned due to expired timeout",
668 								VirtuosoTransaction.this);
669 						Util.closeQuietly(this);
670 					}
671 					throw new IOException("Could not obtain query result set", ex);
672 				}
673 			}
674 
675 			@Override
676 			public boolean hasNext() throws QueryEvaluationException {
677 				if (this.tuple == null) {
678 					this.tuple = advance();
679 				}
680 				return this.tuple != null;
681 			}
682 
683 			@Override
684 			public BindingSet next() throws QueryEvaluationException {
685 				if (this.tuple == null) {
686 					this.tuple = advance();
687 				}
688 				if (this.tuple == null) {
689 					throw new NoSuchElementException();
690 				}
691 				final BindingSet result = this.tuple;
692 				this.tuple = null;
693 				return result;
694 			}
695 
696 			@Override
697 			public void remove() throws QueryEvaluationException {
698 				throw new UnsupportedOperationException();
699 			}
700 
701 			@Override
702 			public void close() throws QueryEvaluationException {
703 				if (this.statement != null) {
704 
705 					final Future<?> future = Data.getExecutor().schedule(new Runnable() {
706 
707 						@Override
708 						public void run() {
709 							try {
710 								end(false);
711 								LOGGER.warn(VirtuosoTransaction.this
712 										+ " - forced closure of Virtuoso transaction "
713 										+ "after unsuccessfull attempt at closing Virtuoso iteration");
714 							} catch (final Throwable ex) {
715 								LOGGER.debug(VirtuosoTransaction.this
716 												+ " - failed to close Virtuoso transaction after "
717 												+ "unsuccessfull attempt at closing Virtuoso iteration",
718 										ex);
719 							}
720 						}
721 
722 					}, 1000, TimeUnit.MILLISECONDS);
723 
724 					try {
725 						// Try to close 'politely' the Virtuoso cursor and the associated
726 						// statements. After 1 second blocked in this operation, we will force
727 						// closure of the whole triple transaction, which in turn may cause the
728 						// killing of the Virtuoso JDBC connection (after one second waiting for
729 						// politely closing it)
730 						this.cursor.close();
731 						this.statement.close();
732 						future.cancel(false);
733 
734 					} catch (final SQLException e) {
735 						throw new QueryEvaluationException(e);
736 
737 					} finally {
738 						this.statement = null;
739 						this.cursor = null;
740 					}
741 				}
742 			}
743 
744 			@Override
745 			protected void finalize() throws Throwable {
746 				Util.closeQuietly(this);
747 			}
748 
749 			private BindingSet advance() throws QueryEvaluationException {
750 				try {
751 					final BindingSet result = null;
752 					if (this.cursor != null) {
753 						if (this.cursor.next()) {
754 							final int size = this.variables.size();
755 							final Value[] values = new Value[size];
756 							for (int i = 0; i < size; ++i) {
757 								values[i] = castValue(this.cursor.getObject(this.variables.get(i)));
758 							}
759 							return new ListBindingSet(this.variables, values);
760 						}
761 						else {
762 							close();
763 						}
764 					}
765 					return result;
766 				} catch (final Exception ex) {
767 					if (isPartialResultException(ex)) {
768 						// On partial results, terminate the iteration ignoring the exception
769 						LOGGER.debug("{} - partial results returned due to expired timeout",
770 								VirtuosoTransaction.this);
771 						return null;
772 					}
773 					throw new QueryEvaluationException("Could not retrieve next query result", ex);
774 				}
775 			}
776 
777 		}
778 
779 	}
780 
781 }