1   package eu.fbk.knowledgestore.datastore;
2   
3   import java.io.IOException;
4   import java.lang.ref.WeakReference;
5   import java.util.List;
6   import java.util.Map;
7   import java.util.Set;
8   import java.util.concurrent.atomic.AtomicBoolean;
9   import java.util.concurrent.atomic.AtomicInteger;
10  
11  import javax.annotation.Nullable;
12  
13  import com.google.common.base.Preconditions;
14  import com.google.common.collect.Lists;
15  import com.google.common.collect.Sets;
16  
17  import org.openrdf.model.URI;
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  import eu.fbk.knowledgestore.data.Record;
22  import eu.fbk.knowledgestore.data.Stream;
23  import eu.fbk.knowledgestore.data.XPath;
24  import eu.fbk.knowledgestore.internal.Util;
25  import eu.fbk.knowledgestore.runtime.Component;
26  import eu.fbk.knowledgestore.runtime.Synchronizer;
27  
28  /**
29   * A {@code DataStore} wrapper that synchronizes and enforces a proper access to an another
30   * {@code DataStore}.
31   * <p>
32   * This wrapper provides the following guarantees with respect to external access to the wrapped
33   * {@link DataStore}:
34   * <ul>
35   * <li>transaction are started and committed according to the synchronization strategy enforced by
36   * a supplied {@link Synchronizer};</li>
37   * <li>at most one thread at a time can access the wrapped {@code DataStore} and its
38   * {@code DataTransaction}s, with the only exception of {@link DataStore#close()} and
39   * {@link DataTransaction#end(boolean)} which may be called concurrently with other active
40   * operations;</li>
41   * <li>access to the wrapped {@code DataStore} and its {@code DataTransaction} is enforced to
42   * occur strictly in adherence with the lifecycle defined for {@link Component}s (
43   * {@code IllegalStateException}s are thrown to the caller otherwise);</li>
44   * <li>before a {@code DataTransaction} is ended, all the streams previously returned and still
45   * open to be forcedly closed;</li>
46   * <li>before the {@code DataStore} is closed, pending {@code DataTransaction} are forcedly ended
47   * with a rollback;</li>
48   * </ul>
49   * </p>
50   */
51  public final class SynchronizedDataStore extends ForwardingDataStore {
52  
53      private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedDataStore.class);
54  
55      private static final int NEW = 0;
56  
57      private static final int INITIALIZED = 1;
58  
59      private static final int CLOSED = 2;
60  
61      private final DataStore delegate;
62  
63      private final Synchronizer synchronizer;
64  
65      private final Set<DataTransaction> transactions; // used also as lock object
66  
67      private final AtomicInteger state;
68  
69      /**
70       * Creates a new instance for the wrapped {@code DataStore} and the {@code Synchronizer}
71       * specification string supplied.
72       * 
73       * @param delegate
74       *            the wrapped {@code DataStore}
75       * @param synchronizerSpec
76       *            the synchronizer specification string (see {@link Synchronizer})
77       */
78      public SynchronizedDataStore(final DataStore delegate, final String synchronizerSpec) {
79          this(delegate, Synchronizer.create(synchronizerSpec));
80      }
81  
82      /**
83       * Creates a new instance for the wrapped {@code DataStore} and {@code Synchronizer}
84       * specified.
85       * 
86       * @param delegate
87       *            the wrapped {@code DataStore}
88       * @param synchronizer
89       *            the synchronizer responsible to regulate the access to the wrapped
90       *            {@code DataStore}
91       */
92      public SynchronizedDataStore(final DataStore delegate, final Synchronizer synchronizer) {
93          this.delegate = Preconditions.checkNotNull(delegate);
94          this.synchronizer = Preconditions.checkNotNull(synchronizer);
95          this.transactions = Sets.newHashSet();
96          this.state = new AtomicInteger(NEW);
97          LOGGER.debug("{} configured, synchronizer=", getClass().getSimpleName(), synchronizer);
98      }
99  
100     @Override
101     protected DataStore delegate() {
102         return this.delegate;
103     }
104 
105     private void checkState(final int expected) {
106         final int state = this.state.get();
107         if (state != expected) {
108             throw new IllegalStateException("DataStore "
109                     + (state == NEW ? "not initialized"
110                             : state == INITIALIZED ? "already initialized" : "already closed"));
111         }
112     }
113 
114     @Override
115     public synchronized void init() throws IOException {
116         checkState(NEW);
117         super.init();
118         this.state.set(INITIALIZED);
119     }
120 
121     @Override
122     public DataTransaction begin(final boolean readOnly) throws IOException, IllegalStateException {
123         checkState(INITIALIZED);
124         this.synchronizer.beginTransaction(readOnly);
125         DataTransaction transaction = null;
126         try {
127             synchronized (this) {
128                 checkState(INITIALIZED);
129                 transaction = delegate().begin(readOnly);
130                 transaction = new SynchronizedDataTransaction(transaction, readOnly);
131                 synchronized (this.transactions) {
132                     this.transactions.add(transaction);
133                 }
134             }
135         } finally {
136             if (transaction == null) {
137                 this.synchronizer.endTransaction(readOnly);
138             }
139         }
140         return transaction;
141     }
142 
143     @Override
144     public void close() {
145         if (!this.state.compareAndSet(INITIALIZED, CLOSED)
146                 && !this.state.compareAndSet(NEW, CLOSED)) {
147             return;
148         }
149         List<DataTransaction> transactionsToEnd;
150         synchronized (this.transactions) {
151             transactionsToEnd = Lists.newArrayList(this.transactions);
152         }
153         try {
154             for (final DataTransaction transaction : transactionsToEnd) {
155                 try {
156                     LOGGER.warn("Forcing rollback of DataTransaction " + transaction
157                             + "due to closure of DataStore");
158                     transaction.end(false);
159                 } catch (final Throwable ex) {
160                     LOGGER.error("Exception caught while ending DataTransaction " + transaction
161                             + "(rollback assumed): " + ex.getMessage(), ex);
162                 }
163             }
164         } finally {
165             super.close();
166         }
167     }
168 
169     private final class SynchronizedDataTransaction extends ForwardingDataTransaction {
170 
171         private final DataTransaction delegate;
172 
173         private final List<WeakReference<Stream<?>>> streams;
174 
175         private final boolean readOnly;
176 
177         private final AtomicBoolean ended;
178 
179         SynchronizedDataTransaction(final DataTransaction delegate, final boolean readOnly) {
180             this.delegate = Preconditions.checkNotNull(delegate);
181             this.streams = Lists.newArrayList();
182             this.readOnly = readOnly;
183             this.ended = new AtomicBoolean(false);
184         }
185 
186         @Override
187         protected DataTransaction delegate() {
188             return this.delegate;
189         }
190 
191         private <T extends Stream<?>> T registerStream(@Nullable final T stream) {
192             synchronized (this.streams) {
193                 if (stream == null) {
194                     return null;
195                 } else if (this.ended.get()) {
196                     Util.closeQuietly(stream);
197                 } else {
198                     final int size = this.streams.size();
199                     for (int i = size - 1; i >= 0; --i) {
200                         if (this.streams.get(i).get() == null) {
201                             this.streams.remove(i);
202                         }
203                     }
204                     this.streams.add(new WeakReference<Stream<?>>(stream));
205                 }
206             }
207             return stream;
208         }
209 
210         private void closeStreams() {
211             synchronized (this.streams) {
212                 final int size = this.streams.size();
213                 for (int i = size - 1; i >= 0; --i) {
214                     Util.closeQuietly(this.streams.remove(i).get());
215                 }
216             }
217         }
218 
219         private void checkState() {
220             if (this.ended.get()) {
221                 throw new IllegalStateException("DataTransaction already ended");
222             }
223         }
224 
225         private void checkWritable() {
226             if (this.readOnly) {
227                 throw new IllegalStateException("DataTransaction is read-only");
228             }
229         }
230 
231         @Override
232         public synchronized Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
233                 @Nullable final Set<? extends URI> properties) throws IOException,
234                 IllegalArgumentException, IllegalStateException {
235             checkState();
236             return registerStream(super.lookup(type, ids, properties));
237         }
238 
239         @Override
240         public synchronized Stream<Record> retrieve(final URI type,
241                 @Nullable final XPath condition, @Nullable final Set<? extends URI> properties)
242                 throws IOException, IllegalArgumentException, IllegalStateException {
243             checkState();
244             return registerStream(super.retrieve(type, condition, properties));
245         }
246 
247         @Override
248         public synchronized long count(final URI type, @Nullable final XPath condition)
249                 throws IOException, IllegalArgumentException, IllegalStateException {
250             checkState();
251             return super.count(type, condition);
252         }
253 
254         @Override
255         public Stream<Record> match(final Map<URI, XPath> conditions,
256                 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
257                 throws IOException, IllegalStateException {
258             checkState();
259             return registerStream(super.match(conditions, ids, properties));
260         }
261 
262         @Override
263         public void store(final URI type, final Record record) throws IOException,
264                 IllegalStateException {
265             checkState();
266             checkWritable();
267             super.store(type, record);
268         }
269 
270         @Override
271         public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
272             checkState();
273             checkWritable();
274             super.delete(type, id);
275         }
276 
277         @Override
278         public void end(final boolean commit) throws IOException, IllegalStateException {
279             if (!this.ended.compareAndSet(false, true)) {
280                 return;
281             }
282             closeStreams();
283             SynchronizedDataStore.this.synchronizer.beginCommit();
284             try {
285                 super.end(commit);
286             } finally {
287                 SynchronizedDataStore.this.synchronizer.endCommit();
288                 SynchronizedDataStore.this.synchronizer.endTransaction(this.readOnly);
289                 synchronized (SynchronizedDataStore.this.transactions) {
290                     SynchronizedDataStore.this.transactions.remove(this);
291                 }
292             }
293         }
294 
295     }
296 
297 }