1   package eu.fbk.knowledgestore.triplestore;
2   
3   import java.io.IOException;
4   import java.lang.ref.WeakReference;
5   import java.util.List;
6   import java.util.concurrent.atomic.AtomicBoolean;
7   import java.util.concurrent.atomic.AtomicInteger;
8   
9   import javax.annotation.Nullable;
10  
11  import com.google.common.base.Preconditions;
12  import com.google.common.collect.Lists;
13  
14  import org.openrdf.model.Resource;
15  import org.openrdf.model.Statement;
16  import org.openrdf.model.URI;
17  import org.openrdf.model.Value;
18  import org.openrdf.query.BindingSet;
19  import org.openrdf.query.QueryEvaluationException;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import info.aduna.iteration.CloseableIteration;
24  
25  import eu.fbk.knowledgestore.data.Handler;
26  import eu.fbk.knowledgestore.internal.Util;
27  import eu.fbk.knowledgestore.runtime.Component;
28  import eu.fbk.knowledgestore.runtime.Synchronizer;
29  
30  /**
31   * A {@code TripleStore} wrapper that synchronizes and enforces a proper access to a wrapped
32   * {@code TripleStore}.
33   * <p>
34   * This wrapper provides the following guarantees with respect to external access to the wrapped
35   * {@link TripleStore}:
36   * <ul>
 * <li>transactions are started and committed according to the synchronization strategy enforced
 * by a supplied {@link Synchronizer};</li>
39   * <li>at most one thread at a time can access the wrapped {@code TripleStore} and its
40   * {@code TripleTransaction}s, with the only exception of {@link TripleStore#close()} and
41   * {@link TripleTransaction#end(boolean)} which may be called concurrently with other active
42   * operations;</li>
 * <li>access to the wrapped {@code TripleStore} and its {@code TripleTransaction}s is enforced
 * to occur strictly in adherence with the lifecycle defined for {@link Component}s (
 * {@code IllegalStateException}s are thrown to the caller otherwise);</li>
 * <li>method {@link #reset()} of the wrapped {@code TripleStore} is called with no transactions
 * active (this implies waiting for completion of pending transactions);</li>
 * <li>before a {@code TripleTransaction} is ended, all the iterations previously returned and
 * still open are forcibly closed;</li>
 * <li>before the {@code TripleStore} is closed, pending {@code TripleTransaction}s are forcibly
 * ended with a rollback.</li>
52   * </ul>
53   * </p>
54   */
55  public class SynchronizedTripleStore extends ForwardingTripleStore {
56  
57      private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedTripleStore.class);
58  
59      private static final int NEW = 0;
60  
61      private static final int INITIALIZED = 1;
62  
63      private static final int CLOSED = 2;
64  
65      private final TripleStore delegate;
66  
67      private final Synchronizer synchronizer;
68  
69      private final List<TripleTransaction> transactions;
70  
71      private final AtomicInteger state;
72  
73      /**
74       * Creates a new instance for the wrapped {@code TripleStore} and the {@code Synchronizer}
75       * specification string supplied.
76       *
77       * @param delegate
78       *            the wrapped {@code DataStore}
79       * @param synchronizerSpec
80       *            the synchronizer specification string (see {@link Synchronizer})
81       */
82      public SynchronizedTripleStore(final TripleStore delegate, final String synchronizerSpec) {
83          this(delegate, Synchronizer.create(synchronizerSpec));
84      }
85  
86      /**
87       * Creates a new instance for the wrapped {@code TripleStore} and {@code Synchronizer}
88       * specified.
89       *
90       * @param delegate
91       *            the wrapped {@code TripleStore}
92       * @param synchronizer
93       *            the synchronizer responsible to regulate the access to the wrapped
94       *            {@code TripleStore}
95       */
96      public SynchronizedTripleStore(final TripleStore delegate, final Synchronizer synchronizer) {
97          this.delegate = Preconditions.checkNotNull(delegate);
98          this.synchronizer = Preconditions.checkNotNull(synchronizer);
99          this.transactions = Lists.newArrayList();
100         this.state = new AtomicInteger(NEW);
101         LOGGER.debug("{} configured, synchronizer={}", getClass().getSimpleName(), synchronizer);
102     }
103 
104     @Override
105     protected TripleStore delegate() {
106         return this.delegate;
107     }
108 
109     private void checkState(final int expected) {
110         final int state = this.state.get();
111         if (state != expected) {
112             throw new IllegalStateException("TripleStore "
113                     + (state == NEW ? "not initialized"
114                             : state == INITIALIZED ? "already initialized" : "already closed"));
115         }
116     }
117 
118     @Override
119     public synchronized void init() throws IOException {
120         checkState(NEW);
121         super.init();
122         this.state.set(INITIALIZED);
123     }
124 
125     @Override
126     public TripleTransaction begin(final boolean readOnly) throws IOException {
127         checkState(INITIALIZED);
128         this.synchronizer.beginTransaction(readOnly);
129         TripleTransaction transaction = null;
130         try {
131             synchronized (this) {
132                 checkState(INITIALIZED);
133                 transaction = delegate().begin(readOnly);
134                 if (Thread.interrupted()) {
135                     transaction.end(false);
136                     throw new IllegalStateException("Interrupted");
137                 }
138                 transaction = new SynchronizedTripleTransaction(transaction, readOnly);
139                 synchronized (this.transactions) {
140                     this.transactions.add(transaction);
141                 }
142             }
143         } finally {
144             if (transaction == null) {
145                 this.synchronizer.endTransaction(readOnly);
146             }
147         }
148         return transaction;
149     }
150 
151     @Override
152     public void reset() throws IOException {
153         checkState(INITIALIZED);
154         this.synchronizer.beginExclusive();
155         try {
156             synchronized (this) {
157                 checkState(INITIALIZED);
158                 delegate().reset();
159             }
160         } finally {
161             this.synchronizer.endExclusive();
162         }
163     }
164 
165     @Override
166     public void close() {
167         if (!this.state.compareAndSet(INITIALIZED, CLOSED)
168                 && !this.state.compareAndSet(NEW, CLOSED)) {
169             return;
170         }
171         List<TripleTransaction> transactionsToEnd;
172         synchronized (this.transactions) {
173             transactionsToEnd = Lists.newArrayList(this.transactions);
174         }
175         try {
176             for (final TripleTransaction transaction : transactionsToEnd) {
177                 try {
178                     LOGGER.warn("Forcing rollback of tx " + transaction
179                             + " due to closure of TripleStore");
180                     transaction.end(false);
181                 } catch (final Throwable ex) {
182                     LOGGER.error("Exception caught while ending tx " + transaction
183                             + " (rollback assumed): " + ex.getMessage(), ex);
184                 }
185             }
186         } finally {
187             super.close();
188         }
189     }
190 
191     private final class SynchronizedTripleTransaction extends ForwardingTripleTransaction {
192 
193         private final TripleTransaction delegate;
194 
195         private final List<WeakReference<CloseableIteration<?, ?>>> iterations;
196 
197         private final boolean readOnly;
198 
199         private final AtomicBoolean ended;
200 
201         SynchronizedTripleTransaction(final TripleTransaction delegate, final boolean readOnly) {
202             this.delegate = delegate;
203             this.iterations = Lists.newArrayList();
204             this.readOnly = readOnly;
205             this.ended = new AtomicBoolean(false);
206         }
207 
208         @Override
209         protected TripleTransaction delegate() {
210             return this.delegate;
211         }
212 
213         private <T extends CloseableIteration<?, ?>> T registerIteration(
214                 @Nullable final T iteration) {
215             synchronized (this.iterations) {
216                 if (iteration == null) {
217                     return null;
218                 } else if (this.ended.get() || Thread.interrupted()) {
219                     Util.closeQuietly(iteration);
220                     throw new IllegalStateException("Closed / interrupted");
221                 } else {
222                     final int size = this.iterations.size();
223                     for (int i = size - 1; i >= 0; --i) {
224                         if (this.iterations.get(i).get() == null) {
225                             this.iterations.remove(i);
226                         }
227                     }
228                     this.iterations.add(new WeakReference<CloseableIteration<?, ?>>(iteration));
229                 }
230             }
231             return iteration;
232         }
233 
234         private void closeIterations() {
235             synchronized (this.iterations) {
236                 final int size = this.iterations.size();
237                 for (int i = size - 1; i >= 0; --i) {
238                     Util.closeQuietly(this.iterations.remove(i).get());
239                 }
240             }
241         }
242 
243         private void checkState() {
244             if (this.ended.get()) {
245                 throw new IllegalStateException("DataTransaction already ended");
246             }
247             if (Thread.interrupted()) {
248                 throw new IllegalStateException("Interrupted");
249             }
250         }
251 
252         @Override
253         public synchronized CloseableIteration<? extends Statement, ? extends Exception> get(
254                 @Nullable final Resource subject, @Nullable final URI predicate,
255                 @Nullable final Value object, @Nullable final Resource context)
256                 throws IOException, IllegalStateException {
257             checkState();
258             return registerIteration(super.get(subject, predicate, object, context));
259         }
260 
261         @Override
262         public synchronized CloseableIteration<BindingSet, QueryEvaluationException> query(
263                 final SelectQuery query, @Nullable final BindingSet bindings,
264                 @Nullable final Long timeout) throws IOException, UnsupportedOperationException {
265             checkState();
266             return registerIteration(super.query(query, bindings, timeout));
267         }
268 
269         @Override
270         public synchronized void infer(@Nullable final Handler<? super Statement> handler)
271                 throws IOException, IllegalStateException {
272             checkState();
273             super.infer(handler);
274         }
275 
276         @Override
277         public synchronized void add(final Iterable<? extends Statement> stream)
278                 throws IOException, IllegalStateException {
279             checkState();
280             super.add(stream);
281         }
282 
283         @Override
284         public synchronized void remove(final Iterable<? extends Statement> stream)
285                 throws IOException, IllegalStateException {
286             checkState();
287             super.remove(stream);
288         }
289 
290         @Override
291         public void end(final boolean commit) throws IOException {
292             if (!this.ended.compareAndSet(false, true)) {
293                 return;
294             }
295             closeIterations();
296             SynchronizedTripleStore.this.synchronizer.beginCommit();
297             try {
298                 super.end(commit);
299             } finally {
300                 SynchronizedTripleStore.this.synchronizer.endCommit();
301                 SynchronizedTripleStore.this.synchronizer.endTransaction(this.readOnly);
302                 synchronized (SynchronizedTripleStore.this.transactions) {
303                     SynchronizedTripleStore.this.transactions.remove(this);
304                 }
305             }
306         }
307 
308     }
309 
310 }