1   package eu.fbk.knowledgestore.triplestore.virtuoso;
2   
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.SQLException;

import javax.annotation.Nullable;
import javax.sql.ConnectionPoolDataSource;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.openrdf.repository.RepositoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import virtuoso.jdbc4.VirtuosoConnectionPoolDataSource;
import virtuoso.sesame2.driver.VirtuosoRepository;
import virtuoso.sesame2.driver.VirtuosoRepositoryConnection;

import eu.fbk.knowledgestore.runtime.DataCorruptedException;
import eu.fbk.knowledgestore.triplestore.SynchronizedTripleStore;
import eu.fbk.knowledgestore.triplestore.TripleStore;
import eu.fbk.knowledgestore.triplestore.TripleTransaction;
26  
27  /**
28   * A {@code TripleStore} implementation accessing an external OpenLink Virtuoso server.
29   * <p>
30   * This class stores and access triples in an external Virtuoso triple store, communicating to it
31   * via the Virtuoso Sesame driver. Data modification is performed without relying on Virtuoso
32   * transactions, in order to support bulk loading. When writing data in a read-write
33   * {@code TripleTransaction}, a Virtuoso transaction is thus not created; a marker file is instead
34   * stored and later removed upon successful 'commit' of the {@code TripleTransaction}; in case of
35   * failure, the marker file remain on disk and signals that the triplestore is in a potentially
36   * corrupted state, triggering repopulation starting from master data. Given this mechanism, it is
37   * thus important for the component to be wrapped in a {@link SynchronizedTripleStore} that allows
38   * at most a write transaction at a time, preventing simultaneous read transactions
39   * (synchronization N:WX). Note that configuration, startup, shutdown and management in general of
40   * the Virtuoso server are a responsibility of the user, with the {@code VirtuosoTripleStore}
41   * component limiting to access Virtuoso for reading and writing triples.
42   * </p>
43   */
44  public final class VirtuosoTripleStore implements TripleStore {
45  
46      // see also the following resources for reference:
47      // - https://newsreader.fbk.eu/trac/wiki/TripleStoreNotes
48      // - http://docs.openlinksw.com/sesame/ (Virtuoso javadoc)
49      // - http://www.openlinksw.com/vos/main/Main/VirtSesame2Provider
50  
51      private static final String DEFAULT_HOST = "localhost";
52  
53      private static final int DEFAULT_PORT = 1111;
54  
55      private static final String DEFAULT_USERNAME = "dba";
56  
57      private static final String DEFAULT_PASSWORD = "dba";
58  
59      private static final boolean DEFAULT_POOLING = false;
60  
61      private static final int DEFAULT_BATCH_SIZE = 5000;
62  
63      private static final int DEFAULT_FETCH_SIZE = 200;
64  
65      private static final String DEFAULT_MARKER_FILENAME = "virtuoso.bulk.transaction";
66  
67      private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoTripleStore.class);
68  
69      private final VirtuosoRepository virtuoso;
70  
71      private final FileSystem fileSystem;
72  
73      private final Path markerPath;
74  
75      /**
76       * Creates a new instance based on the supplied most relevant properties.
77       * 
78       * @param fileSystem
79       *            the file system where to store the marker file
80       * @param host
81       *            the name / IP address of the host where virtuoso is running; if null defaults to
82       *            localhost
83       * @param port
84       *            the port Virtuoso is listening to; if null defaults to 1111
85       * @param username
86       *            the username to login into Virtuoso; if null defaults to dba
87       * @param password
88       *            the password to login into Virtuoso; if null default to dba
89       */
90      public VirtuosoTripleStore(final FileSystem fileSystem, @Nullable final String host,
91              @Nullable final Integer port, @Nullable final String username,
92              @Nullable final String password) {
93          this(fileSystem, host, port, username, password, null, null, null, null);
94      }
95  
96      /**
97       * Creates a new instance based the supplied complete set of configuration properties.
98       * 
99       * @param fileSystem
100      *            the file system where to store the marker file
101      * @param host
102      *            the name / IP address of the host where virtuoso is running; if null defaults to
103      *            localhost
104      * @param port
105      *            the port Virtuoso is listening to; if null defaults to 1111
106      * @param username
107      *            the username to login into Virtuoso; if null defaults to dba
108      * @param password
109      *            the password to login into Virtuoso; if null default to dba
110      * @param pooling
111      *            true if connection pooling should be used (impact on performances is
112      *            negligible); if null defaults to false
113      * @param batchSize
114      *            the number of added/removed triples to buffer on the client before sending them
115      *            to Virtuoso as a single chunk; if null defaults to 5000
116      * @param fetchSize
117      *            the number of results (solutions, triples, ...) to fetch from Virtuoso in a
118      *            single operation when query results are iterated; if null defaults to 200
119      * @param markerFilename
120      *            the name of the marker file created to signal Virtuoso is being used in a
121      *            non-transactional mode; if null defaults to virtuoso.bulk.transaction
122      */
123     public VirtuosoTripleStore(final FileSystem fileSystem, @Nullable final String host,
124             @Nullable final Integer port, @Nullable final String username,
125             @Nullable final String password, @Nullable final Boolean pooling,
126             @Nullable final Integer batchSize, @Nullable final Integer fetchSize,
127             @Nullable final String markerFilename) {
128 
129         // Apply default values
130         final String actualMarkerFilename = MoreObjects.firstNonNull(markerFilename,
131                 DEFAULT_MARKER_FILENAME);
132         final String actualHost = MoreObjects.firstNonNull(host, DEFAULT_HOST);
133         final int actualPort = MoreObjects.firstNonNull(port, DEFAULT_PORT);
134         final String actualUsername = MoreObjects.firstNonNull(username, DEFAULT_USERNAME);
135         final String actualPassword = MoreObjects.firstNonNull(password, DEFAULT_PASSWORD);
136         final boolean actualPooling = MoreObjects.firstNonNull(pooling, DEFAULT_POOLING);
137         final int actualBatchSize = MoreObjects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE);
138         final int actualFetchSize = MoreObjects.firstNonNull(fetchSize, DEFAULT_FETCH_SIZE);
139 
140         // Check parameters
141         Preconditions.checkArgument(actualPort > 0 && actualPort < 65536);
142         Preconditions.checkArgument(actualBatchSize > 0);
143         Preconditions.checkArgument(actualFetchSize > 0);
144 
145         // Instantiate the VirtuosoRepository
146         if (actualPooling) {
147             // Pooling (see http://docs.openlinksw.com/virtuoso/VirtuosoDriverJDBC.html, section
148             // 7.4.4.2) doesn't seem to affect performances. We keep this implementation: perhaps
149             // things may change with future versions of Virtuoso.
150             final VirtuosoConnectionPoolDataSource source = new VirtuosoConnectionPoolDataSource();
151             source.setServerName(actualHost);
152             source.setPortNumber(actualPort);
153             source.setUser(actualUsername);
154             source.setPassword(actualPassword);
155             this.virtuoso = new VirtuosoRepository((ConnectionPoolDataSource) source,
156                     "sesame:nil", true);
157         } else {
158             final String url = String.format("jdbc:virtuoso://%s:%d", actualHost, actualPort);
159             this.virtuoso = new VirtuosoRepository(url, actualUsername, actualPassword,
160                     "sesame:nil", true);
161         }
162 
163         // Further configure the VirtuosoRepository
164         this.virtuoso.setBatchSize(actualBatchSize);
165         this.virtuoso.setFetchSize(actualFetchSize);
166 
167         // Setup marker variables
168         this.fileSystem = Preconditions.checkNotNull(fileSystem);
169         this.markerPath = new Path(actualMarkerFilename).makeQualified(fileSystem);
170 
171         // Log relevant information
172         LOGGER.info("VirtuosoTripleStore URL: {}", actualHost + ":" + actualPort);
173         LOGGER.info("VirtuosoTripleStore marker: {}", this.markerPath);
174     }
175 
176     @Override
177     public void init() throws IOException {
178         try {
179             this.virtuoso.initialize(); // looking at Virtuoso code this seems a NOP
180         } catch (final RepositoryException ex) {
181             throw new IOException("Failed to initialize Virtuoso driver", ex);
182         }
183     }
184 
185     @Override
186     public TripleTransaction begin(final boolean readOnly) throws DataCorruptedException,
187             IOException {
188         // Check if there was an interrupted transaction.
189         if (existsTransactionMarker()) {
190             throw new DataCorruptedException("The triple store performed a bulk operation "
191                     + "that didn't complete successfully.");
192         }
193         return new VirtuosoTripleTransaction(this, readOnly);
194     }
195 
196     @Override
197     public void reset() throws IOException {
198         VirtuosoRepositoryConnection connection = null;
199         try {
200             connection = (VirtuosoRepositoryConnection) this.virtuoso.getConnection();
201             connection.getQuadStoreConnection().prepareCall("RDF_GLOBAL_RESET ()").execute();
202         } catch (final RepositoryException ex) {
203             throw new IOException("Could not connect to Virtuoso server", ex);
204         } catch (final SQLException e) {
205             throw new IOException("Something went wrong while invoking stored procedure.", e);
206         } finally {
207             if (connection != null) {
208                 try {
209                     connection.close();
210                 } catch (final RepositoryException re) {
211                     throw new IOException("Error while closing connection.", re);
212                 }
213             }
214         }
215 
216         final boolean removedTransactionMarker = removeTransactionMarker();
217         LOGGER.info("Database reset. Transaction marker removed: " + removedTransactionMarker);
218     }
219 
220     @Override
221     public void close() {
222         // no need to terminate pending transactions: this is done externally
223         try {
224             this.virtuoso.shutDown(); // looking at Virtuoso code this should be a NOP
225         } catch (final RepositoryException ex) {
226             LOGGER.error("Failed to shutdown Virtuoso driver", ex);
227         }
228     }
229 
230     @Override
231     public String toString() {
232         return getClass().getSimpleName();
233     }
234 
235     VirtuosoRepository getVirtuoso() {
236         return this.virtuoso;
237     }
238 
239     /**
240      * Checks if the transaction file exists.
241      * 
242      * @return <code>true</code> if the marker is present, <code>false</code> otherwise.
243      */
244     boolean existsTransactionMarker() throws IOException {
245         // try {
246         // return this.fileSystem.exists(this.markerPath)
247         // } catch (final IOException ioe) {
248         // throw new IOException("Error while checking virtuoso transaction file.", ioe);
249         // }
250         return false; // TODO disabled so not to depend on HDFS
251     }
252 
253     /**
254      * Adds the transaction file.
255      * 
256      * @return <code>true</code> if the marker was not present, <code>false</code> otherwise.
257      */
258     boolean addTransactionMarker() throws IOException {
259         // try {
260         // return this.fileSystem.createNewFile(this.markerPath);
261         // } catch (final IOException ioe) {
262         // throw new IOException("Error while adding virtuoso transaction file.", ioe);
263         // }
264         return false; // TODO disabled so not to depend on HDFS
265     }
266 
267     /**
268      * Removes the transaction file.
269      * 
270      * @return <code>true</code> if the marker was present, <code>false</code> otherwise.
271      */
272     boolean removeTransactionMarker() throws IOException {
273         // try {
274         // return this.fileSystem.delete(this.markerPath, false);
275         // } catch (final IOException ioe) {
276         // throw new IOException("Error while adding virtuoso transaction file.", ioe);
277         // }
278         return false; // TODO disabled so not to depend on HDFS
279     }
280 
281 }