1   package eu.fbk.knowledgestore.datastore;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.util.List;
7   import java.util.Map;
8   import java.util.Set;
9   
10  import javax.annotation.Nullable;
11  
12  import com.google.common.base.Function;
13  import com.google.common.base.MoreObjects;
14  import com.google.common.base.Preconditions;
15  import com.google.common.collect.ImmutableSet;
16  import com.google.common.collect.Iterables;
17  import com.google.common.collect.Lists;
18  import com.google.common.collect.Maps;
19  
20  import org.apache.hadoop.fs.FileSystem;
21  import org.apache.hadoop.fs.Path;
22  import org.openrdf.model.URI;
23  import org.openrdf.rio.RDFFormat;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  import eu.fbk.knowledgestore.data.Data;
28  import eu.fbk.knowledgestore.data.Record;
29  import eu.fbk.knowledgestore.data.Stream;
30  import eu.fbk.knowledgestore.data.XPath;
31  import eu.fbk.knowledgestore.internal.Util;
32  import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
33  import eu.fbk.knowledgestore.runtime.Files;
34  import eu.fbk.knowledgestore.vocabulary.KS;
35  
/**
 * A {@code DataStore} implementation that keeps all data in memory, with persistence provided by
 * loading / saving data to file.
 * <p>
 * This class realizes a low-performance, functional implementation of the {@code DataStore}
 * component. Record data is loaded at startup from a configurable file and then indexed in
 * memory. Each read-write transaction works on its own copy of the data; upon successful commit
 * the changes are merged back in the component and the whole data is written back to the file.
 * Closing the component does not write any data.
 * </p>
 */
47  public class MemoryDataStore implements DataStore {
48  
49      private static final Logger LOGGER = LoggerFactory.getLogger(MemoryDataStore.class);
50  
51      private static final String PATH_DEFAULT = "datastore.ttl";
52  
53      private Map<URI, Map<URI, Record>> tables;
54  
55      private int revision;
56  
57      private boolean initialized;
58  
59      private boolean closed;
60  
61      private final FileSystem fileSystem;
62  
63      private final Path filePath;
64  
65      /**
66       * Creates a new {@code MemoryDataStore} instance loading/storing data in the file at the path
67       * and file system specified.
68       * 
69       * @param fileSystem
70       *            the filesystem containing the file where to read/write data
71       * @param path
72       *            the path of the file where to read/write data, possibly relative to the file
73       *            system working directory; if null defaults to {@code datastore.ttl}
74       */
75      public MemoryDataStore(final FileSystem fileSystem, @Nullable final String path) {
76          this.fileSystem = Preconditions.checkNotNull(fileSystem);
77          this.filePath = new Path(MoreObjects.firstNonNull(path, MemoryDataStore.PATH_DEFAULT))
78                  .makeQualified(this.fileSystem); // resolve against working directory
79          this.tables = Maps.newHashMap();
80          this.revision = 1;
81          this.initialized = false;
82          this.closed = false;
83          for (final URI supportedType : DataStore.SUPPORTED_TYPES) {
84              this.tables.put(supportedType, Maps.<URI, Record>newLinkedHashMap());
85          }
86          MemoryDataStore.LOGGER.info("{} configured, path={}", this.getClass().getSimpleName(),
87                  this.filePath);
88      }
89  
90      @Override
91      public synchronized void init() throws IOException, IllegalStateException {
92          Preconditions.checkState(!this.initialized && !this.closed);
93          this.initialized = true;
94  
95          InputStream stream = null;
96          try {
97              if (this.fileSystem.exists(this.filePath)) {
98                  stream = Files.readWithBackup(this.fileSystem, this.filePath);
99                  final RDFFormat format = RDFFormat.forFileName(this.filePath.getName());
100                 final List<Record> records = Record.decode(
101                         RDFUtil.readRDF(stream, format, null, null, false),
102                         ImmutableSet.of(KS.RESOURCE, KS.MENTION, KS.ENTITY, KS.CONTEXT), false)
103                         .toList();
104                 for (final Record record : records) {
105                     final URI id = Preconditions.checkNotNull(record.getID());
106                     final URI type = Preconditions.checkNotNull(record.getSystemType());
107                     MemoryDataStore.this.tables.get(type).put(id, record);
108                 }
109                 MemoryDataStore.LOGGER.info("{} initialized, {} records loaded", this.getClass()
110                         .getSimpleName(), records.size());
111             } else {
112                 MemoryDataStore.LOGGER.info("{} initialized, no record loaded", this.getClass()
113                         .getSimpleName());
114             }
115         } finally {
116             Util.closeQuietly(stream);
117         }
118     }
119 
120     @Override
121     public synchronized DataTransaction begin(final boolean readOnly) throws IOException,
122             IllegalStateException {
123         Preconditions.checkState(this.initialized && !this.closed);
124         return new MemoryDataTransaction(readOnly);
125     }
126 
127     @Override
128     public synchronized void close() {
129 
130         if (this.closed) {
131             return;
132         }
133         this.closed = true;
134 
135     }
136 
137     @Override
138     public String toString() {
139         return this.getClass().getSimpleName();
140     }
141 
142     private synchronized void update(final Map<URI, Map<URI, Record>> tables, final int revision)
143             throws IOException {
144         if (this.revision != revision) {
145             throw new IOException("Commit failed due to concurrent modifications " + this.revision
146                     + ", " + revision);
147         }
148 
149         OutputStream stream = null;
150         try {
151             stream = Files.writeWithBackup(this.fileSystem, this.filePath);
152             final List<Record> records = Lists.newArrayList();
153             for (final URI type : tables.keySet()) {
154                 records.addAll(tables.get(type).values());
155             }
156             final RDFFormat format = RDFFormat.forFileName(this.filePath.getName());
157             RDFUtil.writeRDF(stream, format, Data.getNamespaceMap(), null,
158                     Record.encode(Stream.create(records), ImmutableSet.<URI>of()));
159             ++this.revision;
160             this.tables = tables;
161             MemoryDataStore.LOGGER.info("MemoryDataStore updated, {} records persisted",
162                     records.size());
163 
164         } catch (final Throwable ex) {
165             MemoryDataStore.LOGGER.error("MemoryDataStore update failed", ex);
166 
167         } finally {
168             Util.closeQuietly(stream);
169         }
170     }
171 
172     private class MemoryDataTransaction implements DataTransaction {
173 
174         private final Map<URI, Map<URI, Record>> tables;
175 
176         private final int revision;
177 
178         private final boolean readOnly;
179 
180         private boolean ended;
181 
182         MemoryDataTransaction(final boolean readOnly) {
183 
184             Map<URI, Map<URI, Record>> tables = MemoryDataStore.this.tables;
185             if (!readOnly) {
186                 tables = Maps.newHashMap();
187                 for (final Map.Entry<URI, Map<URI, Record>> entry : MemoryDataStore.this.tables
188                         .entrySet()) {
189                     tables.put(entry.getKey(), Maps.newLinkedHashMap(entry.getValue()));
190                 }
191             }
192 
193             this.tables = tables;
194             this.revision = MemoryDataStore.this.revision;
195             this.readOnly = readOnly;
196             this.ended = false;
197         }
198 
199         private Map<URI, Record> getTable(final URI type) {
200             final Map<URI, Record> table = this.tables.get(type);
201             if (table != null) {
202                 return table;
203             }
204             throw new IllegalArgumentException("Unsupported type " + type);
205         }
206 
207         private Stream<Record> select(final Map<URI, Record> table,
208                 final Stream<? extends URI> stream) {
209             return stream.transform(new Function<URI, Record>() {
210 
211                 @Override
212                 public Record apply(final URI id) {
213                     return table.get(id);
214                 }
215 
216             }, 0);
217         }
218 
219         private Stream<Record> filter(final Stream<Record> stream, @Nullable final XPath xpath) {
220             if (xpath == null) {
221                 return stream;
222             }
223             return stream.filter(xpath.asPredicate(), 0);
224         }
225 
226         private Stream<Record> project(final Stream<Record> stream,
227                 @Nullable final Iterable<? extends URI> properties) {
228             final URI[] array = properties == null ? null : Iterables.toArray(properties,
229                     URI.class);
230             return stream.transform(new Function<Record, Record>() {
231 
232                 @Override
233                 public final Record apply(final Record input) {
234                     final Record result = Record.create(input, true);
235                     if (array != null) {
236                         result.retain(array);
237                     }
238                     return result;
239                 }
240 
241             }, 0);
242         }
243 
244         @Override
245         public synchronized Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
246                 @Nullable final Set<? extends URI> properties) throws IOException,
247                 IllegalArgumentException, IllegalStateException {
248             Preconditions.checkState(!this.ended);
249             final Map<URI, Record> table = this.getTable(type);
250             return this.project(this.select(table, Stream.create(ids)), properties);
251         }
252 
253         @Override
254         public synchronized Stream<Record> retrieve(final URI type,
255                 @Nullable final XPath condition, @Nullable final Set<? extends URI> properties)
256                 throws IOException, IllegalArgumentException, IllegalStateException {
257             Preconditions.checkState(!this.ended);
258             final Map<URI, Record> table = this.getTable(type);
259             return this.project(this.filter(Stream.create(table.values()), condition), properties);
260         }
261 
262         @Override
263         public synchronized long count(final URI type, @Nullable final XPath condition)
264                 throws IOException, IllegalArgumentException, IllegalStateException {
265             Preconditions.checkState(!this.ended);
266             final Map<URI, Record> table = this.getTable(type);
267             return this.filter(Stream.create(table.values()), condition).count();
268         }
269 
270         @Override
271         public Stream<Record> match(final Map<URI, XPath> conditions,
272                 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
273                 throws IOException, IllegalStateException {
274             throw new UnsupportedOperationException(); // TODO
275         }
276 
277         @Override
278         public void store(final URI type, final Record record) throws IOException,
279                 IllegalStateException {
280             Preconditions.checkState(!this.ended);
281             Preconditions.checkState(!this.readOnly);
282             Preconditions.checkArgument(record.getID() != null);
283             final Map<URI, Record> table = this.getTable(type);
284             table.put(record.getID(), Record.create(record, true));
285         }
286 
287         @Override
288         public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
289             Preconditions.checkState(!this.ended);
290             Preconditions.checkState(!this.readOnly);
291             Preconditions.checkArgument(id != null);
292             final Map<URI, Record> table = this.getTable(type);
293             table.remove(id);
294         }
295 
296         @Override
297         public synchronized void end(final boolean commit) throws IOException,
298                 IllegalStateException {
299             if (!this.ended) {
300                 this.ended = true;
301                 if (commit && !this.readOnly) {
302                     MemoryDataStore.this.update(this.tables, this.revision);
303                 }
304             }
305         }
306 
307         @Override
308         public String toString() {
309             return this.getClass().getSimpleName();
310         }
311 
312     }
313 
314 }