1   package eu.fbk.knowledgestore.runtime;
2   
3   import java.util.concurrent.Semaphore;
4   
5   import javax.annotation.Nullable;
6   
7   import com.google.common.base.Preconditions;
8   import com.google.common.base.Throwables;
9   
10  public final class Synchronizer {
11  
12      public static final int WX = -1;
13  
14      public static final int CX = -2;
15  
16      private final int maxConcurrentTx;
17  
18      private final int maxWriteTx; // can assume special values 0, WX or CX
19  
20      private final Semaphore mainSemaphore;
21  
22      @Nullable
23      private final Semaphore writeSemaphore;
24  
25      @Nullable
26      private final Semaphore commitSemaphore;
27  
28      private Synchronizer(final int maxConcurrentTx, final int maxWriteTx) {
29  
30          Preconditions.checkArgument(maxConcurrentTx > 0);
31          Preconditions.checkArgument(maxWriteTx >= 0 && maxWriteTx <= maxConcurrentTx
32                  || maxWriteTx == WX || maxWriteTx == CX);
33  
34          this.maxConcurrentTx = maxConcurrentTx;
35          this.maxWriteTx = maxWriteTx;
36          this.mainSemaphore = new Semaphore(maxConcurrentTx, true);
37          this.writeSemaphore = maxWriteTx != 0 ? new Semaphore(Math.max(maxWriteTx, 1), true)
38                  : null;
39          this.commitSemaphore = maxWriteTx == CX ? new Semaphore(1, true) : null;
40      }
41  
42      public static Synchronizer create(final String spec) {
43  
44          final int index = spec.indexOf(':');
45          final String first = (index <= 0 ? spec : spec.substring(0, index)).trim();
46          final String second = index <= 0 ? null : spec.substring(index + 1).trim().toUpperCase();
47  
48          try {
49              final int maxConcurrentTx = Integer.parseInt(first);
50              final int maxWriteTx = second == null ? 0 : second.equals("WX") ? WX : //
51                      second.equals("CX") ? CX : Integer.parseInt(second);
52              return new Synchronizer(maxConcurrentTx, maxWriteTx);
53          } catch (final Throwable ex) {
54              throw new IllegalArgumentException(
55                      "Illegal synchronizer specification '" + spec + "'", ex);
56          }
57      }
58  
59      public static Synchronizer create(final int maxConcurrentTx, final int maxWriteTx) {
60          return new Synchronizer(maxConcurrentTx, maxWriteTx);
61      }
62  
63      public void beginExclusive() {
64          try {
65              this.mainSemaphore.acquire(this.maxConcurrentTx);
66          } catch (final Throwable ex) {
67              Throwables.propagate(ex);
68          }
69      }
70  
71      public void endExclusive() {
72          this.mainSemaphore.release(this.maxConcurrentTx);
73      }
74  
75      public void beginTransaction(final boolean readOnly) {
76          boolean writeAcquired = false;
77          try {
78              if (readOnly) {
79                  this.mainSemaphore.acquire();
80              } else if (this.writeSemaphore == null) {
81                  throw new IllegalStateException("Write transactions have been disabled");
82              } else {
83                  this.writeSemaphore.acquire();
84                  writeAcquired = true;
85                  this.mainSemaphore.acquire(this.maxWriteTx == WX ? this.maxConcurrentTx : 1);
86              }
87          } catch (final Throwable ex) {
88              if (writeAcquired) {
89                  this.writeSemaphore.release();
90              }
91              Throwables.propagate(ex);
92          }
93      }
94  
95      public void endTransaction(final boolean readOnly) {
96          if (readOnly) {
97              this.mainSemaphore.release(1);
98          } else {
99              this.mainSemaphore.release(this.maxWriteTx == WX ? this.maxConcurrentTx : 1);
100             this.writeSemaphore.release();
101         }
102     }
103 
104     public void beginCommit() {
105         if (this.maxWriteTx == CX) {
106             boolean commitAcquired = false;
107             try {
108                 this.commitSemaphore.acquire();
109                 commitAcquired = true;
110                 this.mainSemaphore.acquire(this.maxConcurrentTx - 1);
111             } catch (final Throwable ex) {
112                 if (commitAcquired) {
113                     this.commitSemaphore.release();
114                 }
115                 Throwables.propagate(ex);
116             }
117         }
118     }
119 
120     public void endCommit() {
121         if (this.maxWriteTx == CX) {
122             this.mainSemaphore.release(this.maxConcurrentTx - 1);
123             this.commitSemaphore.release();
124         }
125     }
126 
127     @Override
128     public String toString() {
129         return this.maxConcurrentTx + ":"
130                 + (this.maxWriteTx == WX ? "WX" : this.maxWriteTx == CX ? "CX" : this.maxWriteTx);
131     }
132 
133 }