1   package eu.fbk.knowledgestore.data;
2   
3   import java.io.File;
4   import java.io.FilterOutputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.io.ObjectInputStream;
8   import java.io.ObjectOutputStream;
9   import java.io.OutputStream;
10  import java.io.Serializable;
11  import java.net.URI;
12  import java.util.Collections;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.NoSuchElementException;
16  
17  import javax.annotation.Nullable;
18  
19  import com.google.common.base.Joiner;
20  import com.google.common.base.Preconditions;
21  import com.google.common.base.Strings;
22  import com.google.common.collect.ImmutableList;
23  import com.google.common.collect.ImmutableMap;
24  import com.google.common.collect.Iterables;
25  import com.google.common.collect.Lists;
26  import com.google.common.collect.Maps;
27  
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  
32  import eu.fbk.rdfpro.util.IO;
33  
// NOTE: the current implementation rewrites the dictionary file each time a mapping is added,
// always keeping a backup copy. This is safe, but performance is severely limited when many
// insertions are performed. A better scheme based on a log file should be used.
37  
38  /**
39   * A persistent, synchronized, monotonic (add-only) dictionary mapping positive {@code int} keys
40   * to {@code Serializable} objects.
41   */
42  public abstract class Dictionary<T extends Serializable> {
43  
44      private static final long MAX_CLOCK_SKEW = 60 * 1000; // 60 sec
45  
46      private final Class<T> clazz;
47  
48      private final String url;
49  
50      private volatile List<T> keyToObjectIndex; // immutable, replaced on reload
51  
52      private volatile Map<T, Integer> objectToKeyIndex; // immutable, replaced on reload
53  
54      private long lastAccessed;
55  
56      public static <T extends Serializable> Dictionary<T> createLocalDictionary(
57              final Class<T> objectClass, final File file) throws IOException {
58  
59          // Check parameters
60          Preconditions.checkNotNull(objectClass);
61          Preconditions.checkNotNull(file);
62  
63          // Build, initialize and return the dictionary
64          final Dictionary<T> dictionary = new LocalDictionary<T>(objectClass, file.toURI()
65                  .toString(), file);
66          dictionary.reload();
67          return dictionary;
68      }
69  
70      public static <T extends Serializable> Dictionary<T> createHadoopDictionary(
71              final Class<T> objectClass, final String fileURL) throws IOException {
72  
73          // Check parameters
74          Preconditions.checkNotNull(objectClass);
75          Preconditions.checkNotNull(fileURL);
76  
77          // Resolve the supplied Hadoop URL, retrieving FileSystem and Path objects
78          final FileSystem fs = FileSystem.get(URI.create(fileURL),
79                  new org.apache.hadoop.conf.Configuration(true));
80          final Path path = new Path(URI.create(fileURL).getPath());
81  
82          // Build normalized URL
83          String urlBase = fs.getUri().toString();
84          String urlPath = path.toString();
85          if (urlBase.endsWith("/")) {
86              urlBase = urlBase.substring(0, urlBase.length() - 1);
87          }
88          if (!urlPath.startsWith("/")) {
89              urlPath = "/" + urlPath;
90          }
91          final String url = urlBase + urlPath;
92  
93          // Build a Dictionary using the Hadoop API for all the I/O
94          final Dictionary<T> dictionary = new HadoopDictionary<T>(objectClass, url, fs, path);
95  
96          // Load dictionary data and return the initialized dictionary
97          dictionary.reload();
98          return dictionary;
99      }
100 
101     Dictionary(final Class<T> objectClass, final String url) {
102         this.clazz = Preconditions.checkNotNull(objectClass);
103         this.url = Preconditions.checkNotNull(url);
104         this.keyToObjectIndex = Lists.newArrayList();
105         this.objectToKeyIndex = Maps.newHashMap();
106     }
107 
108     @Nullable
109     abstract Long lastModified(String suffix) throws IOException;
110 
111     abstract InputStream read(String suffix) throws IOException;
112 
113     abstract OutputStream write(String suffix) throws IOException;
114 
115     abstract void delete(String suffix) throws IOException;
116 
117     abstract void rename(String oldSuffix, String newSuffix) throws IOException;
118 
119     public Class<T> getObjectClass() {
120         return this.clazz;
121     }
122 
123     public String getDictionaryURL() {
124         return this.url;
125     }
126 
127     public T objectFor(final int key) throws IOException, NoSuchElementException {
128         return this.objectFor(key, true);
129     }
130 
131     @Nullable
132     public T objectFor(final int key, final boolean mustExist) throws IOException,
133             NoSuchElementException {
134 
135         Preconditions.checkArgument(key > 0, "Non-positive key %d", key);
136 
137         // local cache of keyToObjectIndex, which may change concurrently
138         List<T> index = this.keyToObjectIndex;
139 
140         if (key > index.size()) {
141             this.reload(); // object might have been added by another process
142             index = this.keyToObjectIndex; // pick up updated index
143         }
144 
145         if (key <= index.size()) {
146             return index.get(key - 1); // 1-based indexes
147         } else if (!mustExist) {
148             return null;
149         }
150 
151         throw new NoSuchElementException("No object for key " + key);
152     }
153 
154     public List<T> objectsFor(final Iterable<? extends Integer> keys, final boolean mustExist)
155             throws IOException, NoSuchElementException {
156 
157         // local cache of keyToObjectIndex, which may change within for cycles
158         List<T> index = this.keyToObjectIndex;
159 
160         for (final int key : keys) {
161             Preconditions.checkArgument(key > 0, "Non-positive key %d", key);
162             if (key > index.size()) {
163                 this.reload(); // missing objects might have been added by other processes
164                 index = this.keyToObjectIndex;
165                 break;
166             }
167         }
168 
169         final List<T> result = Lists.newArrayListWithCapacity(Iterables.size(keys));
170         List<Integer> missing = null;
171         for (final int key : keys) {
172             if (key <= index.size()) {
173                 result.add(index.get(key - 1)); // 1-based indexes
174             } else if (mustExist) {
175                 if (missing == null) {
176                     missing = Lists.newArrayList();
177                 }
178                 missing.add(key);
179             }
180         }
181 
182         if (missing != null) {
183             throw new NoSuchElementException("No objects for keys "
184                     + Joiner.on(", ").join(missing));
185         }
186 
187         return result;
188     }
189 
190     public Integer keyFor(final T object) throws IOException {
191         final Integer key = this.keyFor(object, true);
192         assert key != null;
193         return key;
194     }
195 
196     @Nullable
197     public Integer keyFor(final T object, final boolean mayGenerate) throws IOException {
198 
199         Preconditions.checkNotNull(object);
200 
201         Integer key = this.objectToKeyIndex.get(object);
202 
203         if (key == null && mayGenerate) {
204             this.update(Collections.singletonList(object));
205             key = this.objectToKeyIndex.get(object);
206         }
207 
208         return key;
209     }
210 
211     public List<Integer> keysFor(final Iterable<? extends T> objects, final boolean mayGenerate)
212             throws IOException {
213 
214         Preconditions.checkNotNull(objects);
215 
216         // local cache of objectToKeyIndex, which may change within for cycles
217         Map<T, Integer> index = this.objectToKeyIndex;
218 
219         final List<Integer> result = Lists.newArrayListWithCapacity(Iterables.size(objects));
220 
221         List<T> missingObjects = null;
222         List<Integer> missingOffsets = null;
223 
224         for (final T object : objects) {
225             final Integer key = index.get(object);
226             result.add(key);
227             if (key == null) {
228                 Preconditions.checkNotNull(object);
229                 if (missingOffsets == null) {
230                     missingObjects = Lists.newArrayList();
231                     missingOffsets = Lists.newArrayList();
232                 }
233                 assert missingObjects != null; // to make Eclipse happy :-(
234                 missingObjects.add(object);
235                 missingOffsets.add(result.size());
236             }
237         }
238 
239         if (missingObjects != null && mayGenerate) {
240             assert missingOffsets != null; // to make Eclipse happy :-(
241             this.update(missingObjects);
242             index = this.objectToKeyIndex; // pick up updated index
243             for (int i = 0; i < missingObjects.size(); ++i) {
244                 final int offset = missingOffsets.get(i);
245                 final T object = missingObjects.get(i);
246                 final Integer key = this.objectToKeyIndex.get(object);
247                 result.set(offset, key);
248             }
249         }
250 
251         return result;
252     }
253 
254     public <M extends Map<? super Integer, ? super T>> M toMap(@Nullable final M map)
255             throws IOException {
256 
257         @SuppressWarnings("unchecked")
258         final M actualMap = map != null ? map : (M) Maps.newHashMap();
259 
260         this.reload(); // make sure to read the most recently persisted data
261 
262         // local cache of objectToKeyIndex, which may change within the for cycle
263         final List<T> index = this.keyToObjectIndex;
264 
265         for (int i = 0; i < index.size(); ++i) {
266             actualMap.put(i, index.get(i));
267         }
268         return actualMap;
269     }
270 
271     public <L extends List<? super T>> L toList(@Nullable final L list) throws IOException {
272 
273         @SuppressWarnings("unchecked")
274         final L actualList = list != null ? list : (L) Lists.newArrayList();
275 
276         this.reload(); // make sure to read the most recently persisted data
277 
278         actualList.addAll(this.keyToObjectIndex);
279         return actualList;
280     }
281 
282     private synchronized void reload() throws IOException {
283 
284         // abort if the file was not modified after the last time we loaded it
285         final Long lastModified = lastModifiedWithBackup();
286         if (lastModified == null || lastModified < this.lastAccessed - Dictionary.MAX_CLOCK_SKEW) {
287             return;
288         }
289 
290         // prepare two builders for re-creating the in-memory indexes
291         final ImmutableList.Builder<T> keyToObjectIndexBuilder = ImmutableList.builder();
292         final ImmutableMap.Builder<T, Integer> objectToKeyIndexBuilder = ImmutableMap.builder();
293 
294         // read from the file, putting data in the builders
295         final ObjectInputStream stream = new ObjectInputStream(readWithBackup());
296         assert stream != null;
297 
298         T object = null;
299         try {
300             final int size = stream.readInt();
301             for (int key = 1; key <= size; ++key) {
302                 object = this.clazz.cast(stream.readObject());
303                 keyToObjectIndexBuilder.add(object);
304                 objectToKeyIndexBuilder.put(object, key);
305             }
306 
307         } catch (final ClassCastException ex) {
308             assert object != null;
309             throw new IOException("Cannot read from " + this.url + ": found "
310                     + object.getClass().getName() + ", expected " + this.clazz.getName());
311 
312         } catch (final ClassNotFoundException ex) {
313             throw new IOException("Cannot read from " + this.url + ": either the content is "
314                     + "malformed, or it encodes data of another dictionary using classes not "
315                     + "available in this JVM");
316         } finally {
317             stream.close();
318         }
319 
320         // on success, build the new in-memory indexes and store them in the object fields
321         this.keyToObjectIndex = keyToObjectIndexBuilder.build();
322         this.objectToKeyIndex = objectToKeyIndexBuilder.build();
323 
324         // update the last accessed time
325         this.lastAccessed = System.currentTimeMillis();
326     }
327 
328     private synchronized void update(final Iterable<T> newObjects) throws IOException {
329 
330         // make sure to have the most recent data (we rely on the fact locking is reentrant)
331         this.reload();
332 
333         // access current versions of in-memory indexes (after the reload)
334         List<T> keyToObjectIndex = this.keyToObjectIndex;
335         Map<T, Integer> objectToKeyIndex = this.objectToKeyIndex;
336 
337         // detect missing objects. nothing to do if there are no missing objects
338         final List<T> missing = Lists.newArrayList();
339         for (final T object : newObjects) {
340             if (!objectToKeyIndex.containsKey(object)) {
341                 missing.add(object);
342             }
343         }
344         if (missing.isEmpty()) {
345             return;
346         }
347 
348         // create new key -> object index that includes the missing objects
349         keyToObjectIndex = ImmutableList.copyOf(Iterables.concat(keyToObjectIndex, missing));
350 
351         // create new object -> key index that includes the missing objects
352         final ImmutableMap.Builder<T, Integer> builder = ImmutableMap.builder();
353         builder.putAll(objectToKeyIndex);
354         int key = objectToKeyIndex.size();
355         for (final T object : missing) {
356             builder.put(object, ++key);
357         }
358         objectToKeyIndex = builder.build();
359 
360         // write the new index to the file
361         final ObjectOutputStream stream = new ObjectOutputStream(writeWithBackup());
362         try {
363             stream.writeInt(keyToObjectIndex.size());
364             for (final T object : keyToObjectIndex) {
365                 stream.writeObject(object);
366             }
367         } finally {
368             stream.close();
369         }
370 
371         // update last accessed time
372         this.lastAccessed = System.currentTimeMillis();
373 
374         // update index member variables
375         this.keyToObjectIndex = keyToObjectIndex;
376         this.objectToKeyIndex = objectToKeyIndex;
377     }
378 
379     private InputStream readWithBackup() throws IOException {
380 
381         // we keep track of filesystem exceptions (but it's unclear when they are thrown)
382         IOException exception = null;
383 
384         try {
385             // 1. try to read the requested file
386             final InputStream result = read("");
387             if (result != null) {
388                 return result;
389             }
390         } catch (final IOException ex) {
391             exception = ex;
392         }
393 
394         try {
395             // 2. on failure, try to read its backup
396             final InputStream result = read(".backup");
397             if (result != null) {
398                 return result;
399             }
400         } catch (final IOException ex) {
401             if (exception == null) {
402                 exception = ex;
403             }
404         }
405 
406         // 3. only on failure check whether the two files exist
407         final boolean fileExists = lastModified("") != null;
408         final boolean backupExists = lastModified(".backup") != null;
409 
410         // 4. if they don't exist it's ok, just report this returning null
411         if (!fileExists && !backupExists) {
412             return null;
413         }
414 
415         // 5. otherwise we throw an exception (possibly the ones got before)
416         if (exception == null) {
417             exception = new IOException("Cannot read "
418                     + (fileExists ? this.url : this.url + ".backup") + " (file reported to exist)");
419         }
420         throw exception;
421     }
422 
423     private OutputStream writeWithBackup() throws IOException {
424 
425         // 1. delete filename.new if it exists
426         delete(".new");
427 
428         // 2. if filename exists, rename it to filename.backup (deleting old backup)
429         if (lastModified("") != null) {
430             delete(".backup");
431             rename("", ".backup");
432         }
433 
434         // 3. create filename.new, returning a stream for writing its content
435         return new FilterOutputStream(write(".new")) {
436 
437             @Override
438             public void close() throws IOException {
439                 super.close();
440                 rename(".new", "");
441             }
442 
443         };
444     }
445 
446     private Long lastModifiedWithBackup() throws IOException {
447 
448         Long lastModified = lastModified("");
449         if (lastModified == null) {
450             lastModified = lastModified(".backup");
451         }
452         return lastModified;
453     }
454 
455     private static final class LocalDictionary<T extends Serializable> extends Dictionary<T> {
456 
457         private final File file;
458 
459         LocalDictionary(final Class<T> objectClass, final String url, final File file) {
460             super(objectClass, url);
461             this.file = file;
462         }
463 
464         @Override
465         @Nullable
466         Long lastModified(final String suffix) throws IOException {
467             final long modifiedTime = applySuffix(suffix).lastModified();
468             return modifiedTime > 0 ? modifiedTime : null;
469         }
470 
471         @Override
472         InputStream read(final String suffix) throws IOException {
473             return IO.read(applySuffix(suffix).getAbsolutePath());
474         }
475 
476         @Override
477         OutputStream write(final String suffix) throws IOException {
478             return IO.write(applySuffix(suffix).getAbsolutePath());
479         }
480 
481         @Override
482         void delete(final String suffix) throws IOException {
483             applySuffix(suffix).delete();
484         }
485 
486         @Override
487         void rename(final String oldSuffix, final String newSuffix) throws IOException {
488             java.nio.file.Files.move(applySuffix(oldSuffix).toPath(), applySuffix(newSuffix)
489                     .toPath());
490         }
491 
492         private File applySuffix(final String suffix) {
493             return Strings.isNullOrEmpty(suffix) ? this.file : new File(
494                     this.file.getAbsolutePath() + suffix);
495         }
496 
497     }
498 
499     private static final class HadoopDictionary<T extends Serializable> extends Dictionary<T> {
500 
501         private final FileSystem fs;
502 
503         private final Path path;
504 
505         HadoopDictionary(final Class<T> objectClass, final String url, final FileSystem fs,
506                 final Path path) {
507             super(objectClass, url);
508             this.fs = fs;
509             this.path = path;
510         }
511 
512         @Override
513         @Nullable
514         Long lastModified(final String suffix) throws IOException {
515             final Path path = applySuffix(suffix);
516             try {
517                 final FileStatus status = this.fs.getFileStatus(path);
518                 if (status != null) {
519                     return status.getModificationTime();
520                 }
521 
522             } catch (final IOException ex) {
523                 if (this.fs.exists(path)) {
524                     throw ex;
525                 }
526             }
527             return null;
528         }
529 
530         @Override
531         InputStream read(final String suffix) throws IOException {
532             return this.fs.open(applySuffix(suffix));
533         }
534 
535         @Override
536         OutputStream write(final String suffix) throws IOException {
537             return this.fs.create(applySuffix(suffix));
538         }
539 
540         @Override
541         void delete(final String suffix) throws IOException {
542             final Path path = applySuffix(suffix);
543             IOException exception = null;
544             try {
545                 if (this.fs.delete(path, false)) {
546                     return;
547                 }
548             } catch (final IOException ex) {
549                 exception = ex;
550             }
551             if (this.fs.exists(path)) {
552                 throw exception != null ? exception : new IOException("Cannot delete " + path);
553             }
554         }
555 
556         @Override
557         void rename(final String oldSuffix, final String newSuffix) throws IOException {
558             if (oldSuffix.equals(newSuffix)) {
559                 return;
560             }
561             final Path from = applySuffix(oldSuffix);
562             final Path to = applySuffix(newSuffix);
563             final boolean renamed = this.fs.rename(from, to);
564             if (!renamed) {
565                 String message = "Cannot rename " + from + " to " + to;
566                 if (this.fs.exists(to)) {
567                     message += ": destination already exists";
568                 } else if (this.fs.exists(from)) {
569                     message += ": source does not exist";
570                 }
571                 throw new IOException(message);
572             }
573         }
574 
575         private Path applySuffix(final String suffix) {
576             return Strings.isNullOrEmpty(suffix) ? this.path : new Path(this.path.getParent()
577                     + "/" + this.path.getName() + suffix);
578         }
579 
580     }
581 
582 }