1   package eu.fbk.knowledgestore.filestore;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.util.List;
7   import java.util.Set;
8   import java.util.concurrent.atomic.AtomicInteger;
9   
10  import com.google.common.base.Preconditions;
11  import com.google.common.base.Throwables;
12  import com.google.common.collect.Lists;
13  import com.google.common.collect.Sets;
14  
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  import eu.fbk.knowledgestore.data.Stream;
19  import eu.fbk.knowledgestore.runtime.Component;
20  import eu.fbk.knowledgestore.runtime.Synchronizer;
21  
22  /**
23   * A {@code FileStore} wrapper that synchronizes and enforces a proper access to an another
24   * {@code FileStore}.
25   * <p>
26   * This wrapper provides the following guarantees with respect to external access to the wrapped
27   * {@link FileStore}:
28   * <ul>
29   * <li>operations are started and committed according to the synchronization strategy enforced by
30   * a supplied {@link Synchronizer};</li>
31   * <li>access to the wrapped {@code DataStore} and its {@code DataTransaction} is enforced to
32   * occur strictly in adherence with the lifecycle defined for {@link Component}s (
33   * {@code IllegalStateException}s are thrown to the caller otherwise);</li>
34   * <li>before the {@code FileStore} is closed, all pending {@code FileStore#list()} operations are
35   * terminated.</li>
36   * </ul>
37   * </p>
38   */
39  public final class SynchronizedFileStore extends ForwardingFileStore {
40  
41      private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedFileStore.class);
42  
43      private static final int NUM_LOCKS = 255;
44  
45      private static final int NEW = 0;
46  
47      private static final int INITIALIZED = 1;
48  
49      private static final int CLOSED = 2;
50  
51      private final FileStore delegate;
52  
53      private final Synchronizer synchronizer;
54  
55      private final AtomicInteger state;
56  
57      private final Object[] fileLocks;
58  
59      private final Set<Stream<String>> pendingListStreams;
60  
61      /**
62       * Creates a new instance for the wrapped {@code FileStore} and the {@code Synchronizer}
63       * specification string supplied.
64       *
65       * @param delegate
66       *            the wrapped {@code FileStore}
67       * @param synchronizerSpec
68       *            the synchronizer specification string (see {@link Synchronizer})
69       */
70      public SynchronizedFileStore(final FileStore delegate, final String synchronizerSpec) {
71          this.delegate = Preconditions.checkNotNull(delegate);
72          this.synchronizer = Synchronizer.create(synchronizerSpec);
73          this.state = new AtomicInteger(NEW);
74          this.fileLocks = new Object[NUM_LOCKS];
75          for (int i = 0; i < NUM_LOCKS; ++i) {
76              this.fileLocks[i] = new Object();
77          }
78          this.pendingListStreams = Sets.newHashSet();
79      }
80  
81      @Override
82      protected FileStore delegate() {
83          return this.delegate;
84      }
85  
86      private void checkState(final int expected) {
87          final int state = this.state.get();
88          if (state != expected) {
89              throw new IllegalStateException("FileStore "
90                      + (state == NEW ? "not initialized"
91                              : state == INITIALIZED ? "already initialized" : "already closed"));
92          }
93      }
94  
95      private Object lockFor(final String fileName) {
96          return this.fileLocks[fileName.hashCode() % 0x7FFFFFFF % NUM_LOCKS];
97      }
98  
99      @Override
100     public void init() throws IOException {
101         checkState(NEW);
102         super.init();
103         this.state.set(INITIALIZED);
104     }
105 
106     @Override
107     public InputStream read(final String fileName) throws FileMissingException, IOException {
108         checkState(INITIALIZED);
109         Preconditions.checkNotNull(fileName);
110         this.synchronizer.beginTransaction(true);
111         try {
112             checkState(INITIALIZED);
113             synchronized (lockFor(fileName)) {
114                 return super.read(fileName);
115             }
116         } finally {
117             this.synchronizer.endTransaction(true);
118         }
119     }
120 
121     @Override
122     public OutputStream write(final String fileName) throws FileExistsException, IOException {
123         checkState(INITIALIZED);
124         Preconditions.checkNotNull(fileName);
125         this.synchronizer.beginTransaction(false);
126         try {
127             checkState(INITIALIZED);
128             synchronized (lockFor(fileName)) {
129                 return super.write(fileName);
130             }
131         } finally {
132             this.synchronizer.endTransaction(false);
133         }
134     }
135 
136     @Override
137     public void delete(final String fileName) throws FileMissingException, IOException {
138         checkState(INITIALIZED);
139         Preconditions.checkNotNull(fileName);
140         this.synchronizer.beginTransaction(false);
141         try {
142             checkState(INITIALIZED);
143             synchronized (lockFor(fileName)) {
144                 super.delete(fileName);
145             }
146         } finally {
147             this.synchronizer.endTransaction(false);
148         }
149     }
150 
151     @Override
152     public Stream<String> list() throws IOException {
153 
154         checkState(INITIALIZED);
155 
156         this.synchronizer.beginTransaction(true);
157         try {
158             checkState(INITIALIZED);
159             final Stream<String> stream = super.list();
160             synchronized (this.pendingListStreams) {
161                 this.pendingListStreams.add(stream);
162             }
163             stream.onClose(new Runnable() {
164 
165                 @Override
166                 public void run() {
167                     SynchronizedFileStore.this.synchronizer.endTransaction(true);
168                     synchronized (SynchronizedFileStore.this.pendingListStreams) {
169                         SynchronizedFileStore.this.pendingListStreams.remove(this);
170                     }
171                 }
172 
173             });
174             return stream;
175 
176         } catch (final Throwable ex) {
177             this.synchronizer.endTransaction(true);
178             Throwables.propagateIfPossible(ex, IOException.class);
179             throw Throwables.propagate(ex);
180         }
181     }
182 
183     @Override
184     public void close() {
185         if (!this.state.compareAndSet(INITIALIZED, CLOSED)
186                 && !this.state.compareAndSet(NEW, CLOSED)) {
187             return;
188         }
189         List<Stream<String>> streamsToEnd;
190         synchronized (this.pendingListStreams) {
191             streamsToEnd = Lists.newArrayList(this.pendingListStreams);
192         }
193         try {
194             for (final Stream<String> stream : streamsToEnd) {
195                 try {
196                     LOGGER.warn("Forcing closure of stream due to FileStore closure");
197                     stream.close();
198                 } catch (final Throwable ex) {
199                     LOGGER.error("Exception caught while closing stream: " + ex.getMessage(), ex);
200                 }
201             }
202         } finally {
203             super.close();
204         }
205     }
206 
207 }