1   package eu.fbk.knowledgestore.triplestore;
2   
3   import com.google.common.base.Preconditions;
4   import eu.fbk.knowledgestore.data.Data;
5   import eu.fbk.knowledgestore.data.Handler;
6   import eu.fbk.knowledgestore.data.Stream;
7   import info.aduna.iteration.CloseableIteration;
8   import info.aduna.iteration.IterationWrapper;
9   import org.openrdf.model.Resource;
10  import org.openrdf.model.Statement;
11  import org.openrdf.model.URI;
12  import org.openrdf.model.Value;
13  import org.openrdf.query.BindingSet;
14  import org.openrdf.query.QueryEvaluationException;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  import javax.annotation.Nullable;
19  import java.io.IOException;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  import java.util.concurrent.atomic.AtomicLong;
22  
23  /**
24   * A {@code TripleStore} wrapper that log calls to the operations of a wrapped {@code TripleStore}
25   * and their execution times.
26   * <p>
27   * This wrapper intercepts calls to an underlying {@code TripleStore} and to the
28   * {@code TripleTransaction}s it creates, and logs request information and execution times via
29   * SLF4J (level DEBUG, logger named after this class). The overhead introduced by this wrapper
30   * when logging is disabled is negligible.
31   * </p>
32   */
33  public final class LoggingTripleStore extends ForwardingTripleStore {
34  
35      private static final Logger LOGGER = LoggerFactory.getLogger(LoggingTripleStore.class);
36  
37      private final TripleStore delegate;
38  
39      /**
40       * Creates a new instance for the wrapped {@code TripleStore} specified.
41       *
42       * @param delegate
43       *            the wrapped {@code TripleStore}
44       */
45      public LoggingTripleStore(final TripleStore delegate) {
46          this.delegate = Preconditions.checkNotNull(delegate);
47          LOGGER.debug("{} configured", getClass().getSimpleName());
48      }
49  
50      @Override
51      protected TripleStore delegate() {
52          return this.delegate;
53      }
54  
55      @Override
56      public void init() throws IOException {
57          if (LOGGER.isDebugEnabled()) {
58              final long ts = System.currentTimeMillis();
59              super.init();
60              LOGGER.debug("{} - initialized in {} ms", this, System.currentTimeMillis() - ts);
61          } else {
62              super.init();
63          }
64      }
65  
66      @Override
67      public TripleTransaction begin(final boolean readOnly) throws IOException {
68          if (LOGGER.isDebugEnabled()) {
69              final long ts = System.currentTimeMillis();
70              final TripleTransaction transaction = new LoggingTripleTransaction(
71                      super.begin(readOnly), ts);
72              LOGGER.debug("{} - started in {} mode in {} ms", transaction, readOnly ? "read-only"
73                      : "read-write", System.currentTimeMillis() - ts);
74              return transaction;
75          } else {
76              return super.begin(readOnly);
77          }
78      }
79  
80      @Override
81      public void reset() throws IOException {
82          if (LOGGER.isDebugEnabled()) {
83              final long ts = System.currentTimeMillis();
84              super.reset();
85              LOGGER.debug("{} - reset done in {} ms", this, System.currentTimeMillis() - ts);
86          } else {
87              super.reset();
88          }
89      }
90  
91      @Override
92      public void close() {
93          if (LOGGER.isDebugEnabled()) {
94              final long ts = System.currentTimeMillis();
95              super.close();
96              LOGGER.debug("{} - closed in {} ms", this, System.currentTimeMillis() - ts);
97          } else {
98              super.close();
99          }
100     }
101 
102     private static final class LoggingTripleTransaction extends ForwardingTripleTransaction {
103 
104         private final TripleTransaction delegate;
105 
106         private final long ts;
107 
108         LoggingTripleTransaction(final TripleTransaction delegate, final long ts) {
109             this.delegate = Preconditions.checkNotNull(delegate);
110             this.ts = ts;
111         }
112 
113         @Override
114         protected TripleTransaction delegate() {
115             return this.delegate;
116         }
117 
118         private String format(@Nullable final Value value) {
119             return value == null ? "*" : Data.toString(value, Data.getNamespaceMap());
120         }
121 
122         @Nullable
123         private <T, E extends Exception> CloseableIteration<T, E> logClose(
124                 @Nullable final CloseableIteration<T, E> iteration, final String name,
125                 final long ts) {
126             return iteration == null ? null : new IterationWrapper<T, E>(iteration) {
127 
128                 private int count = 0;
129 
130                 private boolean hasNext = true;
131 
132                 @Override
133                 public boolean hasNext() throws E {
134                     this.hasNext = super.hasNext();
135                     return this.hasNext;
136                 }
137 
138                 @Override
139                 public T next() throws E {
140                     final T result = super.next();
141                     ++this.count;
142                     return result;
143                 }
144 
145                 @Override
146                 protected void handleClose() throws E {
147                     try {
148                         super.handleClose();
149                     } finally {
150                         if (LOGGER.isDebugEnabled()) {
151                             LOGGER.debug("{} - {} closed after {} ms, {} results retrieved{}",
152                                     LoggingTripleTransaction.this, name,
153                                     System.currentTimeMillis() - ts, this.count, this.hasNext ? ""
154                                             : " (exhausted)");
155                         }
156                     }
157                 }
158 
159             };
160         }
161 
162         @Override
163         public CloseableIteration<? extends Statement, ? extends Exception> get(
164                 @Nullable final Resource subject, @Nullable final URI predicate,
165                 @Nullable final Value object, @Nullable final Resource context)
166                 throws IOException, IllegalStateException {
167 
168             if (LOGGER.isDebugEnabled()) {
169                 final String name = "statement iteration for <" + format(subject) + ", "
170                         + format(predicate) + ", " + format(object) + ", " + format(context) + ">";
171                 final long ts = System.currentTimeMillis();
172                 CloseableIteration<? extends Statement, ? extends Exception> result;
173                 result = logClose(super.get(subject, predicate, object, context), name, ts);
174                 LOGGER.debug("{} - {} obtained in {} ms", this, name, System.currentTimeMillis()
175                         - ts);
176                 return result;
177             } else {
178                 return super.get(subject, predicate, object, context);
179             }
180         }
181 
182         @Override
183         public CloseableIteration<BindingSet, QueryEvaluationException> query(
184                 final SelectQuery query, @Nullable final BindingSet bindings, final Long timeout)
185                 throws IOException, UnsupportedOperationException {
186 
187             if (LOGGER.isDebugEnabled()) {
188                 LOGGER.debug("{} - evaluating query ({} bindings, {} timeout):\n{}", this,
189                         bindings == null ? 0 : bindings.size(), timeout, query);
190                 final String name = "query result iteration";
191                 final long ts = System.currentTimeMillis();
192                 CloseableIteration<BindingSet, QueryEvaluationException> result;
193                 result = logClose(super.query(query, bindings, timeout), name, ts);
194                 LOGGER.debug("{} - {} obtained in {} ms", this, name, System.currentTimeMillis()
195                         - ts);
196                 return result;
197             } else {
198                 return super.query(query, bindings, timeout);
199             }
200         }
201 
202         @Override
203         public void infer(@Nullable final Handler<? super Statement> handler) throws IOException,
204                 IllegalStateException {
205 
206             if (LOGGER.isDebugEnabled()) {
207                 LOGGER.debug("{} - start materializing inferences");
208                 final long ts = System.currentTimeMillis();
209                 super.infer(handler);
210                 LOGGER.debug("{} - inferences materialized in {} ms", this,
211                         System.currentTimeMillis() - ts);
212             } else {
213                 super.infer(handler);
214             }
215         }
216 
217         @Override
218         public void add(final Iterable<? extends Statement> statements) throws IOException,
219                 IllegalStateException {
220 
221             if (LOGGER.isDebugEnabled()) {
222                 final AtomicLong count = new AtomicLong();
223                 final AtomicBoolean eof = new AtomicBoolean();
224                 @SuppressWarnings("unchecked")
225                 Iterable<Statement> stmts = (Iterable<Statement>) statements;
226                 final Stream<Statement> stream = Stream.create(stmts).track(count, eof);
227                 final long ts = System.currentTimeMillis();
228                 super.add(stream);
229                 LOGGER.debug("{} - {} statements added in {} ms{}", this, count,
230                         System.currentTimeMillis() - ts, eof.get() ? ", EOF" : "");
231             } else {
232                 super.add(statements);
233             }
234 
235         }
236 
237         @Override
238         public void remove(final Iterable<? extends Statement> statements) throws IOException,
239                 IllegalStateException {
240 
241             if (LOGGER.isDebugEnabled()) {
242                 final AtomicLong count = new AtomicLong();
243                 final AtomicBoolean eof = new AtomicBoolean();
244                 @SuppressWarnings("unchecked")
245                 Iterable<Statement> stmts = (Iterable<Statement>) statements;
246                 final Stream<Statement> stream = Stream.create(stmts).track(count, eof);
247                 final long ts = System.currentTimeMillis();
248                 super.remove(stream);
249                 LOGGER.debug("{} - {} statements removed in {} ms{}", this, count,
250                         System.currentTimeMillis() - ts, eof.get() ? ", EOF" : "");
251             } else {
252                 super.remove(statements);
253             }
254         }
255 
256         @Override
257         public void end(final boolean commit) throws IOException {
258 
259             if (LOGGER.isDebugEnabled()) {
260                 final long ts = System.currentTimeMillis();
261                 super.end(commit);
262                 final long ts2 = System.currentTimeMillis();
263                 LOGGER.debug("{} - {} done in {} ms, tx duration {} ms", this, commit ? "commit"
264                         : "rollback", ts2 - ts, ts2 - this.ts);
265             } else {
266                 super.end(commit);
267             }
268         }
269 
270     }
271 
272 }