1   package eu.fbk.knowledgestore.datastore;
2   
3   import com.google.common.collect.Iterables;
4   import com.zaxxer.hikari.HikariConfig;
5   import com.zaxxer.hikari.HikariDataSource;
6   import eu.fbk.knowledgestore.data.Record;
7   import eu.fbk.knowledgestore.data.Stream;
8   import eu.fbk.knowledgestore.data.XPath;
9   import eu.fbk.knowledgestore.runtime.DataCorruptedException;
10  import eu.fbk.knowledgestore.vocabulary.KS;
11  import org.openrdf.model.URI;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import javax.annotation.Nullable;
16  import java.io.*;
17  import java.sql.*;
18  import java.util.*;
19  
20  /**
21   * Created with IntelliJ IDEA.
22   * User: alessio
23   * Date: 08/09/14
24   * Time: 17:57
25   * To change this template use File | Settings | File Templates.
26   */
27  
28  public class MySQLDataStore implements DataStore {
29  
30  //	private String connectionString;
31  //	private String dbUser;
32  //	private String dbPass;
33  
34  	static Logger logger = LoggerFactory.getLogger(MySQLDataStore.class);
35  	public HikariDataSource dataSource;
36  	final HikariConfig config = new HikariConfig();
37  
38  	public MySQLDataStore(String host, String username, String password, String databaseName) {
39  		config.setMinimumIdle(2); // default = max
40  		config.setMaximumPoolSize(10); // default = 10
41  		config.setConnectionTimeout(30000); // default 30000 ms (30 s)
42  		config.setIdleTimeout(600000); // default 600000 ms (10 m)
43  		config.setMaxLifetime(1800000); // default 1800000 ms (30 m)
44  		config.setLeakDetectionThreshold(600000); // default 0 ms = disabled
45  
46  		config.setDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
47  		config.addDataSourceProperty("serverName", host);
48  		config.addDataSourceProperty("port", "3306");
49  		config.addDataSourceProperty("databaseName", databaseName);
50  		config.addDataSourceProperty("user", username);
51  		config.addDataSourceProperty("password", password);
52  
53  		config.addDataSourceProperty("cachePrepStmts", true);
54  		config.addDataSourceProperty("prepStmtCacheSize", 250);
55  		config.addDataSourceProperty("prepStmtCacheSqlLimit", 2048);
56  		config.addDataSourceProperty("useServerPrepStmts", true);
57  
58  //		try {
59  //			Class.forName("com.mysql.jdbc.Driver");
60  //		} catch (Exception e) {
61  //			throw new Error(e);
62  //		}
63  //		connectionString = "jdbc:mysql://" + host + ":3306/" + databaseName;
64  //		logger.debug(String.format("Connection string: %s", connectionString));
65  //		dbUser = username;
66  //		dbPass = password;
67  	}
68  
69  	public class MySQLTransaction implements DataTransaction {
70  
71  		private Connection con;
72  		boolean readOnly;
73  
74  		private final static String insertQuery = "INSERT INTO $tableName (`key`, `value`) VALUES (MD5(?), ?) ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)";
75  		private final static String selectQuery = "SELECT `value` FROM $tableName WHERE `key` = MD5(?)";
76  		private final static String deleteQuery = "DELETE FROM $tableName WHERE `key` = MD5(?)";
77  		private final static String countQuery = "SELECT COUNT(*) FROM $tableName";
78  		private final static String selectAllQuery = "SELECT `value` FROM $tableName";
79  
80  		HashMap<URI, PreparedStatement> insertBatchStatements = new HashMap<>();
81  
82  		public MySQLTransaction(boolean readOnly) throws SQLException {
83  			this.readOnly = readOnly;
84  			this.con = dataSource.getConnection();
85  			this.con.setAutoCommit(false);
86  
87  			insertBatchStatements.put(KS.MENTION, con.prepareStatement(insertQuery.replace("$tableName", "mentions")));
88  			insertBatchStatements.put(KS.RESOURCE, con.prepareStatement(insertQuery.replace("$tableName", "resources")));
89  
90  //			this.connect(dbUser, dbPass);
91  		}
92  
93  		private void connect(String dbUser, String dbPass) throws SQLException {
94  //			con = DriverManager.getConnection(connectionString, dbUser, dbPass);
95  		}
96  
97  		private String getTableName(URI type) throws IOException {
98  			if (type.equals(KS.MENTION)) {
99  				return "mentions";
100 			}
101 			else if (type.equals(KS.RESOURCE)) {
102 				return "resources";
103 			}
104 			throw new IOException(String.format("Unknown URI: %s", type));
105 		}
106 
107 		private byte[] serializeRecord(Record record) throws IOException {
108 			ObjectOutput out = null;
109 			byte[] returnBytes;
110 
111 			try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
112 				out = new ObjectOutputStream(bos);
113 				out.writeObject(record);
114 				returnBytes = bos.toByteArray();
115 			} finally {
116 				if (out != null) {
117 					out.close();
118 				}
119 			}
120 
121 			return returnBytes;
122 		}
123 
124 		private Record unserializeRecord(byte[] bytes) throws IOException {
125 			ObjectInput in = null;
126 			Record returnRecord;
127 
128 			try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
129 				in = new ObjectInputStream(bis);
130 				try {
131 					returnRecord = (Record) in.readObject();
132 				} catch (ClassNotFoundException e) {
133 					throw new IOException(e);
134 				}
135 			} finally {
136 				if (in != null) {
137 					in.close();
138 				}
139 			}
140 
141 			return returnRecord;
142 		}
143 
144 		@Override
145 		public Stream<Record> lookup(URI type, Set<? extends URI> ids, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
146 			String tableName = getTableName(type);
147 			List<Record> returns = new ArrayList<>();
148 
149 			for (URI id : ids) {
150 				String uri;
151 				try {
152 					uri = id.toString();
153 				} catch (NullPointerException e) {
154 					throw new IOException(e);
155 				}
156 
157 				logger.debug(String.format("Selecting %s", uri));
158 				String query = selectQuery.replace("$tableName", tableName);
159 				try {
160 					PreparedStatement stmt = con.prepareStatement(query);
161 					stmt.setString(1, uri);
162 
163 					ResultSet set = stmt.executeQuery();
164 
165 					while (set.next()) {
166 						Record r = unserializeRecord(set.getBytes("value"));
167 						if (properties != null) {
168 							r.retain(Iterables.toArray(properties, URI.class));
169 						}
170 						returns.add(r);
171 					}
172 				} catch (SQLException e) {
173 					throw new IOException(e);
174 				}
175 
176 			}
177 
178 			return Stream.create(returns);
179 		}
180 
181 		@Override
182 		public Stream<Record> retrieve(URI type, @Nullable XPath condition, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
183 			String tableName = getTableName(type);
184 			List<Record> returns = new ArrayList<>();
185 
186 			logger.debug("Retrieving all lines");
187 			String query = selectAllQuery.replace("$tableName", tableName);
188 
189 			try {
190 				Statement statement = con.createStatement();
191 				ResultSet resultSet = statement.executeQuery(query);
192 
193 				while (resultSet.next()) {
194 					Record r = unserializeRecord(resultSet.getBytes("value"));
195 					if (condition != null && !condition.evalBoolean(r)) {
196 						continue;
197 					}
198 
199 					if (properties != null) {
200 						r.retain(Iterables.toArray(properties, URI.class));
201 					}
202 					returns.add(r);
203 				}
204 			} catch (SQLException e) {
205 				throw new IOException(e);
206 			}
207 
208 			return Stream.create(returns);
209 		}
210 
211 		@Override
212 		public long count(URI type, @Nullable XPath condition) throws IOException, IllegalArgumentException, IllegalStateException {
213 			String tableName = getTableName(type);
214 			logger.debug("Counting rows");
215 			String query = countQuery.replace("$tableName", tableName);
216 
217 			try {
218 				Statement statement = con.createStatement();
219 				ResultSet resultSet = statement.executeQuery(query);
220 
221 				if (resultSet.next()) {
222 					return resultSet.getLong(1);
223 				}
224 			} catch (SQLException e) {
225 				throw new IOException(e);
226 			}
227 
228 			throw new IOException();
229 		}
230 
231 		@Override
232 		public Stream<Record> match(Map<URI, XPath> conditions, Map<URI, Set<URI>> ids, Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
233 			return null;  //To change body of implemented methods use File | Settings | File Templates.
234 		}
235 
236 		@Override
237 		public void store(URI type, Record record) throws IOException, IllegalStateException {
238 //			String tableName = getTableName(type);
239 //			String query = insertQuery.replace("$tableName", tableName);
240 
241 			String uri;
242 			try {
243 				uri = record.getID().toString();
244 			} catch (NullPointerException e) {
245 				throw new IOException(e);
246 			}
247 
248 			logger.debug(String.format("Inserting %s", uri));
249 			try {
250 //				PreparedStatement stmt = con.prepareStatement(query);
251 //				stmt.setString(1, uri);
252 //				stmt.setBytes(2, serializeRecord(record));
253 //				stmt.executeUpdate();
254 				insertBatchStatements.get(type).setString(1, uri);
255 				insertBatchStatements.get(type).setBytes(2, serializeRecord(record));
256 				insertBatchStatements.get(type).addBatch();
257 			} catch (SQLException e) {
258 				throw new IOException(e);
259 			}
260 		}
261 
262 		@Override
263 		public void delete(URI type, URI id) throws IOException, IllegalStateException {
264 			String tableName = getTableName(type);
265 
266 			String uri;
267 			try {
268 				uri = id.toString();
269 			} catch (NullPointerException e) {
270 				throw new IOException(e);
271 			}
272 
273 			logger.debug(String.format("Deleting %s", uri));
274 			String query = deleteQuery.replace("$tableName", tableName);
275 			try {
276 				PreparedStatement stmt = con.prepareStatement(query);
277 				stmt.setString(1, uri);
278 				stmt.executeUpdate();
279 			} catch (SQLException e) {
280 				throw new IOException(e);
281 			}
282 		}
283 
284 		@Override
285 		public void end(boolean commit) throws DataCorruptedException, IOException, IllegalStateException {
286 			try {
287 				if (commit) {
288 					for (URI type : insertBatchStatements.keySet()) {
289 						insertBatchStatements.get(type).executeBatch();
290 					}
291 					con.commit();
292 				}
293 				else {
294 					con.rollback();
295 				}
296 				con.close();
297 			} catch (Exception e) {
298 				throw new IOException(e);
299 			}
300 		}
301 	}
302 
303 	@Override
304 	public DataTransaction begin(boolean readOnly) throws DataCorruptedException, IOException, IllegalStateException {
305 
306 		MySQLTransaction ret = null;
307 		try {
308 			ret = new MySQLTransaction(readOnly);
309 		} catch (Exception e) {
310 			throw new IOException(e);
311 		}
312 
313 		return ret;
314 	}
315 
316 	@Override
317 	public void init() throws IOException, IllegalStateException {
318 		dataSource = new HikariDataSource(config);
319 		//To change body of implemented methods use File | Settings | File Templates.
320 	}
321 
322 	@Override
323 	public void close() {
324 		dataSource.close();
325 		//To change body of implemented methods use File | Settings | File Templates.
326 	}
327 }