1   package eu.fbk.knowledgestore.tool;
2   
3   import java.io.BufferedReader;
4   import java.io.File;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.io.Writer;
8   import java.nio.file.Path;
9   import java.nio.file.Paths;
10  import java.util.Arrays;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.Properties;
14  import java.util.Random;
15  import java.util.Set;
16  import java.util.concurrent.ExecutorService;
17  import java.util.concurrent.Executors;
18  import java.util.concurrent.TimeUnit;
19  import java.util.concurrent.atomic.AtomicInteger;
20  import java.util.concurrent.atomic.AtomicLong;
21  import java.util.concurrent.atomic.AtomicReference;
22  import java.util.regex.Matcher;
23  import java.util.regex.Pattern;
24  
25  import javax.annotation.Nullable;
26  
27  import com.google.common.base.Charsets;
28  import com.google.common.base.Joiner;
29  import com.google.common.base.Preconditions;
30  import com.google.common.base.Splitter;
31  import com.google.common.base.Strings;
32  import com.google.common.collect.ImmutableList;
33  import com.google.common.collect.ImmutableSet;
34  import com.google.common.collect.Iterables;
35  import com.google.common.collect.Lists;
36  import com.google.common.collect.Maps;
37  import com.google.common.collect.Ordering;
38  import com.google.common.collect.Sets;
39  import com.google.common.primitives.Ints;
40  import com.google.common.primitives.Longs;
41  import com.google.common.util.concurrent.ThreadFactoryBuilder;
42  
43  import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
44  import org.openrdf.model.URI;
45  import org.openrdf.model.Value;
46  import org.openrdf.model.ValueFactory;
47  import org.openrdf.model.impl.ValueFactoryImpl;
48  import org.openrdf.query.Binding;
49  import org.openrdf.query.BindingSet;
50  import org.openrdf.query.impl.MapBindingSet;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  import org.slf4j.MDC;
54  
55  import eu.fbk.knowledgestore.Operation.Sparql;
56  import eu.fbk.knowledgestore.Session;
57  import eu.fbk.knowledgestore.client.Client;
58  import eu.fbk.knowledgestore.data.Data;
59  import eu.fbk.knowledgestore.data.Record;
60  import eu.fbk.knowledgestore.data.Representation;
61  import eu.fbk.knowledgestore.data.Stream;
62  import eu.fbk.knowledgestore.internal.CommandLine;
63  import eu.fbk.knowledgestore.internal.Logging;
64  import eu.fbk.knowledgestore.vocabulary.KS;
65  import eu.fbk.rdfpro.util.IO;
66  import eu.fbk.rdfpro.util.Namespaces;
67  import eu.fbk.rdfpro.util.Statements;
68  import eu.fbk.rdfpro.util.Tracker;
69  
70  public final class TestDriver {
71  
    /** Logger used by the driver and by the client threads it spawns. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TestDriver.class);

    /** Value factory used by nested query classes to create literals. */
    private static final ValueFactory FACTORY = ValueFactoryImpl.getInstance();

    /** URL of the system under test. */
    private final String url; // test.url

    /** Optional username used to open sessions; null if not configured. */
    private final String username; // test.username

    /** Optional password used to open sessions; null if not configured. */
    private final String password; // test.password

    /** Total number of query mixes to run in the warmup phase (default 0). */
    private final int warmupMixes; // test.warmupmixes

    /** Total number of query mixes to run in the measurement phase (default 1). */
    private final int testMixes; // test.testmixes

    /** Maximum warmup duration in milliseconds (configured in seconds). */
    private final long warmupTime; // test.warmuptime

    /** Maximum measurement duration in milliseconds (configured in seconds). */
    private final long testTime; // test.testtime

    /** Number of concurrent clients (default 1). */
    private final int clients; // test.clients

    /** Default query timeout; a negative value (default -1) means no default is applied. */
    private final long timeout; // test.timeout

    /** Enabled queries, in the order listed in the configuration. */
    private final Query[] queries; // test.queries

    /** Names of the attributes of each emitted output tuple, in column order. */
    private final List<String> outputVariables;

    /** Names of the input variables, read from the header line of the data file. */
    private List<String> inputVariables;

    /** Input tuples, one UTF-8 encoded data file line per element. */
    private final byte[][] inputData;

    /** File the output tuples are written to; null if no output was requested. */
    private final File outputFile;

    /** Seed of the random number generator used to pick input tuples (default 0). */
    private final long seed;
105 
106     public static void main(final String... args) {
107         try {
108             // MDC.put(Logging.MDC_CONTEXT, "main");
109 
110             final CommandLine cmd = CommandLine
111                     .parser()
112                     .withName("ks-test-driver")
113                     .withHeader(
114                             "Perform a query scalability test against a KnowledgeStore. "
115                                     + "Test parameters and queries are supplied in a .properties file. "
116                                     + "Test data (produced with query-test-generator) "
117                                     + "is supplied in a .tsv file.")
118                     .withOption("c", "config", "the configuration file", "FILE",
119                             CommandLine.Type.FILE_EXISTING, true, false, true)
120                     .withFooter(
121                             "Test configuration may be overridden by supplying additional "
122                                     + "property=value\narguments on the command line.")
123                     .withLogger(LoggerFactory.getLogger("eu.fbk.nwrtools")).parse(args);
124 
125             final File configFile = cmd.getOptionValue("c", File.class);
126 
127             final Properties config = new Properties();
128             try (InputStream configStream = IO.read(configFile.getAbsolutePath())) {
129                 config.load(configStream);
130             }
131 
132             for (final String arg : cmd.getArgs(String.class)) {
133                 final int index = arg.indexOf('=');
134                 if (index > 0) {
135                     final String name = arg.substring(0, index);
136                     final String value = arg.substring(index + 1);
137                     config.setProperty(name, value);
138                 }
139             }
140 
141             new TestDriver(config, configFile.getParentFile()).run();
142 
143         } catch (final Throwable ex) {
144             CommandLine.fail(ex);
145         }
146     }
147 
148     public TestDriver(final Properties properties, @Nullable final File baseDir)
149             throws IOException {
150 
151         // Get base path
152         final Path base = (baseDir != null ? baseDir : new File(System.getProperty("user.dir")))
153                 .toPath();
154 
155         // Parse seed
156         this.seed = TestUtil.read(properties, "test.seed", Long.class, 0L);
157 
158         // Parse file names
159         final String dataArg = TestUtil.read(properties, "test.data", String.class);
160         final String outputArg = TestUtil.read(properties, "test.out", String.class);
161         final File dataFile = base.resolve(Paths.get(dataArg)).toFile();
162         this.outputFile = outputArg == null ? null : base.resolve(Paths.get(outputArg)).toFile();
163 
164         // Parse server URL, username and password
165         this.url = TestUtil.read(properties, "test.url", String.class);
166         this.username = TestUtil.read(properties, "test.username", String.class, null);
167         this.password = TestUtil.read(properties, "test.password", String.class, null);
168         LOGGER.info("SUT: {}{}", this.url,
169                 this.username == null && this.password == null ? " (anonymous access)"
170                         : " (authenticated access)");
171 
172         // Parse number of mixes, max times and client counts
173         this.warmupMixes = TestUtil.read(properties, "test.warmupmixes", Integer.class, 0);
174         this.testMixes = TestUtil.read(properties, "test.testmixes", Integer.class, 1);
175         this.warmupTime = TestUtil.read(properties, "test.warmuptime", Long.class, 3600L) * 1000;
176         this.testTime = TestUtil.read(properties, "test.testtime", Long.class, 3600L) * 1000;
177         this.clients = TestUtil.read(properties, "test.clients", Integer.class, 1);
178         LOGGER.info("{} mix(es), {} s warmup; {} mix(es), {} s test; {} client(s)",
179                 this.warmupMixes, this.warmupTime / 1000, this.testMixes, this.testTime / 1000,
180                 this.clients);
181 
182         // Parse default timeout
183         this.timeout = TestUtil.read(properties, "test.timeout", Long.class, -1L);
184 
185         // Parse test data
186         Preconditions.checkArgument(dataFile.exists(), "File " + dataFile + " does not exist");
187         final List<byte[]> data = Lists.newArrayList();
188         try (BufferedReader reader = new BufferedReader(IO.utf8Reader(IO.buffer(IO.read(dataFile
189                 .getAbsolutePath()))))) {
190             String line = reader.readLine();
191             final String[] inputVariables = line.split("\t");
192             for (int i = 0; i < inputVariables.length; ++i) {
193                 inputVariables[i] = inputVariables[i].substring(1);
194             }
195             this.inputVariables = ImmutableList.copyOf(inputVariables);
196             LOGGER.info("Input schema: ({})", Joiner.on(", ").join(this.inputVariables));
197             final Tracker tracker = new Tracker(LOGGER, null, //
198                     "Parsed " + dataFile + ": %d tuples (%d tuple/s avg)", //
199                     "Parsed %d tuples (%d tuple/s, %d tuple/s avg)");
200             tracker.start();
201             while ((line = reader.readLine()) != null) {
202                 data.add(line.getBytes(Charsets.UTF_8));
203                 tracker.increment();
204             }
205             tracker.end();
206         }
207         this.inputData = data.toArray(new byte[data.size()][]);
208 
209         // Parse queries
210         final Properties defaultQueryProperties = new Properties();
211         if (this.timeout >= 0) {
212             defaultQueryProperties.setProperty("timeout", Long.toString(this.timeout));
213         }
214         final List<Query> allQueries = Query.create(properties, defaultQueryProperties);
215         final List<Query> enabledQueries = Lists.newArrayList();
216         final Set<String> enabledNames = Sets.newLinkedHashSet(Arrays.asList(TestUtil.read(
217                 properties, "test.queries", String.class).split("\\s*[,]\\s*")));
218         for (final String name : enabledNames) {
219             boolean added = false;
220             for (final Query query : allQueries) {
221                 if (query.getName().equals(name)) {
222                     enabledQueries.add(query);
223                     added = true;
224                     break;
225                 }
226             }
227             Preconditions.checkArgument(added, "Unknown query " + name);
228         }
229         this.queries = enabledQueries.toArray(new Query[enabledQueries.size()]);
230         LOGGER.info("{} queries enabled ({} defined): {}", enabledQueries.size(),
231                 allQueries.size(), Joiner.on(", ").join(enabledQueries));
232 
233         // Check query variables and build list of output variables
234         final List<String> outputVariables = Lists.newArrayList("mix.client", "mix.index",
235                 "mix.input", "mix.start", "mix.time");
236         for (final String variable : this.inputVariables) {
237             outputVariables.add("input." + variable);
238         }
239         for (final Query query : this.queries) {
240             for (final String inputVariable : query.getInputVariables()) {
241                 if (!this.inputVariables.contains(inputVariable)) {
242                     throw new IllegalArgumentException("Query " + query
243                             + " refers to unknown input variable " + inputVariable);
244                 }
245             }
246             for (final String outputVariable : Ordering.natural().immutableSortedCopy(
247                     query.getOutputVariables())) {
248                 outputVariables.add(query.getName() + "." + outputVariable);
249             }
250         }
251         this.outputVariables = ImmutableList.copyOf(outputVariables);
252         LOGGER.info("Output schema: {} attributes", this.outputVariables.size());
253     }
254 
255     public void run() throws Throwable {
256 
257         // Open writer if possible
258         final Writer writer = this.outputFile == null ? null : IO.utf8Writer(IO.buffer(IO
259                 .write(this.outputFile.getAbsolutePath())));
260 
261         try {
262             // Allocate a random number generator
263             final Random random = new Random(this.seed);
264 
265             // Take a timestamp and allocate data structure for computing statistics
266             final long ts = System.currentTimeMillis();
267             final List<String> queryNames = Lists.newArrayList();
268             for (final Query query : this.queries) {
269                 queryNames.add(query.getName());
270             }
271             final Statistics stats = new Statistics(queryNames);
272 
273             // Log test started
274             LOGGER.info("Test started");
275 
276             // Perform warmup (if enabled)
277             if (this.warmupMixes > 0) {
278                 runClients(this.warmupMixes, this.warmupTime, random, "Warmup", null, null);
279             }
280 
281             // Perform the real test (if enabled)
282             if (this.clients > 0 && this.testMixes > 0) {
283                 if (writer != null) {
284                     for (int i = 0; i < this.outputVariables.size(); ++i) {
285                         writer.write(i == 0 ? "?" : "\t?");
286                         writer.write(this.outputVariables.get(i));
287                     }
288                     writer.write("\n");
289                 }
290                 runClients(this.testMixes, this.testTime, random, "Measurement", writer, stats);
291             }
292 
293             // Log test completion
294             LOGGER.info("Test completed in {} ms\n\n{}\n", System.currentTimeMillis() - ts, stats);
295 
296         } finally {
297             // Close TSV file
298             IO.closeQuietly(writer);
299         }
300     }
301 
302     private void runClients(final int maxMixes, final long maxTime, final Random random,
303             final String phaseName, @Nullable final Writer writer, @Nullable final Statistics stats)
304             throws Throwable {
305 
306         // Log start
307         LOGGER.info("{} started ({} clients, {} mix(es), {} queries/mix)", phaseName,
308                 this.clients, maxMixes, this.queries.length);
309 
310         // Create a Tracker to track the progress of the process
311         final Tracker tracker = new Tracker(LOGGER, null,
312                 "Completed %d query mixes (%d mixes/s avg)",
313                 "Completed %d query mixes (%d mixes/s, %d mixes/s avg)");
314         tracker.start();
315 
316         // Start a thread for each concurrent client
317         final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
318         final Thread mainThread = Thread.currentThread();
319         final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
320                 .setDaemon(true).setNameFormat("client-%02d").build());
321         final AtomicLong startTimestamp = new AtomicLong(Long.MAX_VALUE);
322         final AtomicLong endTimestamp = new AtomicLong(Long.MIN_VALUE);
323         final long[] clientExecutionTimes = new long[this.clients];
324         final int[] clientMixes = new int[this.clients];
325         try {
326             final AtomicInteger globalMixCounter = new AtomicInteger(maxMixes);
327             for (int i = 0; i < this.clients; ++i) {
328                 final int clientId = i;
329                 executor.submit(new Runnable() {
330 
331                     @Override
332                     public void run() {
333                         final String oldContext = MDC.get(Logging.MDC_CONTEXT);
334                         try {
335                             MDC.put(Logging.MDC_CONTEXT, String.format("client%d", clientId));
336                             final AtomicInteger localMixCounter = new AtomicInteger(0);
337                             final long startTs = System.currentTimeMillis();
338                             synchronized (startTimestamp) {
339                                 if (startTs < startTimestamp.get()) {
340                                     startTimestamp.set(startTs);
341                                 }
342                             }
343                             final long endTs = runClient(clientId, globalMixCounter,
344                                     localMixCounter, maxTime, random, tracker, startTs, writer,
345                                     stats);
346                             clientExecutionTimes[clientId] = endTs - startTs;
347                             clientMixes[clientId] = localMixCounter.get();
348                             synchronized (endTimestamp) {
349                                 if (endTs > endTimestamp.get()) {
350                                     endTimestamp.set(endTs);
351                                 }
352                             }
353                         } catch (final Throwable ex) {
354                             LOGGER.error("[{}] Client failed", clientId, ex);
355                             exceptionHolder.compareAndSet(null, ex);
356                             mainThread.interrupt(); // Stop waiting for other threads
357                         } finally {
358                             MDC.put(Logging.MDC_CONTEXT, oldContext);
359                         }
360                     }
361 
362                 });
363             }
364             executor.shutdown();
365             executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
366 
367         } catch (final InterruptedException ex) {
368             final Throwable ex2 = exceptionHolder.get();
369             if (ex2 != null) {
370                 ex.addSuppressed(ex2); // keep track of both exceptions
371             }
372             throw ex;
373 
374         } finally {
375             executor.shutdownNow();
376             tracker.end();
377         }
378 
379         // Report exception, if any
380         if (exceptionHolder.get() != null) {
381             throw exceptionHolder.get();
382         }
383 
384         // Compute and report elapsed time
385         final long elapsed = endTimestamp.get() - startTimestamp.get();
386         if (stats != null) {
387             stats.reportElapsedTest(elapsed);
388         }
389 
390         // Log completion
391         LOGGER.info("{} completed in {} ms (client time: {}-{} ms; client mixes: {}-{})",
392                 phaseName, elapsed, Longs.min(clientExecutionTimes),
393                 Longs.max(clientExecutionTimes), Ints.min(clientMixes), Ints.max(clientMixes));
394     }
395 
396     private long runClient(final int clientId, final AtomicInteger globalMixCounter,
397             final AtomicInteger localMixCounter, final long maxTime, final Random random,
398             @Nullable final Tracker tracker, final long startTimestamp,
399             @Nullable final Writer writer, @Nullable final Statistics stats) throws IOException {
400 
401         // Log start
402         LOGGER.debug("Client started");
403 
404         // Initialize a client and open a session with the SUT
405         long timestamp = startTimestamp;
406         try (Client client = Client.builder(this.url).compressionEnabled(true)
407                 .validateServer(false).build()) {
408             try (final Session session = client.newSession(this.username, this.password)) {
409 
410                 // Log connection acquired
411                 LOGGER.debug("Client ready");
412 
413                 // Perform as many query mixes as requested
414                 final String clientContext = MDC.get(Logging.MDC_CONTEXT);
415                 try {
416                     while (globalMixCounter.getAndDecrement() > 0
417                             && timestamp < startTimestamp + maxTime) {
418                         // Update MDC context
419                         final int mixIndex = localMixCounter.incrementAndGet();
420                         final String mixContext = String.format("%s.mix%d", clientContext,
421                                 mixIndex);
422                         MDC.put(Logging.MDC_CONTEXT, mixContext);
423 
424                         // Pick up a random input tuple
425                         final int index;
426                         synchronized (random) {
427                             index = random.nextInt(this.inputData.length);
428                         }
429                         final String inputLine = new String(this.inputData[index], Charsets.UTF_8);
430                         final BindingSet input = TestUtil.decode(this.inputVariables, inputLine);
431 
432                         // Start building the output tuple adding data identifying this mix
433                         final ValueFactory vf = Statements.VALUE_FACTORY;
434                         final MapBindingSet output = new MapBindingSet();
435                         final long mixStartTimestamp = timestamp;
436                         output.addBinding("mix.client", vf.createLiteral(clientId));
437                         output.addBinding("mix.index", vf.createLiteral(mixIndex));
438                         output.addBinding("mix.input", vf.createLiteral(index));
439                         output.addBinding("mix.start", vf.createLiteral(mixStartTimestamp));
440 
441                         // Log beginning of query mix
442                         LOGGER.debug("Started for input #{}", index);
443 
444                         // Evaluate the queries of the mix, augmenting the output tuple
445                         for (final Query query : this.queries) {
446                             try {
447                                 MDC.put(Logging.MDC_CONTEXT,
448                                         String.format("%s.%s", mixContext, query.getName()));
449                                 final MapBindingSet queryOutput = new MapBindingSet();
450                                 timestamp = query.evaluate(session, timestamp, input, queryOutput,
451                                         stats);
452                                 for (final Binding binding : queryOutput) {
453                                     output.addBinding(query.getName() + "." + binding.getName(),
454                                             binding.getValue());
455                                 }
456                             } finally {
457                                 MDC.put(Logging.MDC_CONTEXT, mixContext);
458                             }
459                         }
460                         final long elapsed = timestamp - mixStartTimestamp;
461 
462                         // Log completion of query mix
463                         LOGGER.debug("Completed in {} ms", elapsed);
464                         if (tracker != null) {
465                             tracker.increment();
466                         }
467 
468                         // Store query mix time and update associated statistics, if supplied
469                         output.addBinding("mix.time", vf.createLiteral(elapsed));
470                         if (stats != null) {
471                             stats.reportQueryMixCompletion(elapsed);
472                         }
473 
474                         // Emit the output tuple if a Writer has been supplied
475                         if (writer != null) {
476                             LOGGER.trace("Emitting:\n{}",
477                                     TestUtil.format(this.outputVariables, output, "\n"));
478                             final String outputLine = TestUtil
479                                     .encode(this.outputVariables, output);
480                             synchronized (writer) {
481                                 writer.write(outputLine);
482                                 writer.write("\n");
483                             }
484                         }
485                     }
486                 } finally {
487                     MDC.put(Logging.MDC_CONTEXT, clientContext);
488                 }
489             }
490         }
491 
492         // Log end
493         LOGGER.debug("Client terminated ({} query mixes)", localMixCounter.get());
494 
495         // Return end timestamp of last executed query
496         return timestamp;
497     }
498 
499     private static abstract class Query {
500 
        /** Logger used for query-level messages. */
        private static final Logger LOGGER = LoggerFactory.getLogger(Query.class);

        /** Name of the query. */
        private final String name;

        /** Timeout parsed from the "timeout" query property; null if the property is absent. */
        private final Long timeout;

        /** Names of the input variables the query depends on. */
        private final Set<String> inputVariables;

        /** Names of the output variables; always includes "start", "time" and "error". */
        private final Set<String> outputVariables;
510 
511         Query(final String name, final Properties properties,
512                 final Iterable<String> inputVariables, final Iterable<String> outputVariables) {
513 
514             final String timeout = properties.getProperty("timeout");
515 
516             this.name = name;
517             this.timeout = timeout != null ? Long.parseLong(timeout) : null;
518             this.inputVariables = ImmutableSet.copyOf(inputVariables);
519             this.outputVariables = ImmutableSet.copyOf(Iterables.concat( //
520                     ImmutableSet.of("start", "time", "error"), outputVariables));
521         }
522 
523         public static List<Query> create(final Properties properties,
524                 final Properties defaultQueryProperties) {
525 
526             final Map<String, Properties> map = Maps.newLinkedHashMap();
527             for (final Object key : properties.keySet()) {
528                 final String keyString = key.toString();
529                 final int index = keyString.indexOf(".");
530                 if (index > 0) {
531                     final String queryName = keyString.substring(0, index);
532                     final String propertyName = keyString.substring(index + 1);
533                     final String propertyValue = properties.getProperty(keyString);
534                     Properties queryProperties = map.get(queryName);
535                     if (queryProperties == null) {
536                         queryProperties = new Properties();
537                         queryProperties.putAll(defaultQueryProperties);
538                         map.put(queryName, queryProperties);
539                     }
540                     queryProperties.setProperty(propertyName, propertyValue);
541                 }
542             }
543 
544             final List<Query> queries = Lists.newArrayList();
545             for (final Map.Entry<String, Properties> entry : map.entrySet()) {
546                 final String queryName = entry.getKey();
547                 final Properties queryProperties = entry.getValue();
548                 final String queryType = queryProperties.getProperty("type");
549                 if ("download".equalsIgnoreCase(queryType)) {
550                     queries.add(new DownloadQuery(queryName, queryProperties));
551                 } else if ("retrieve".equalsIgnoreCase(queryType)) {
552                     queries.add(new RetrieveQuery(queryName, queryProperties));
553                 } else if ("lookup".equalsIgnoreCase(queryType)) {
554                     queries.add(new LookupQuery(queryName, queryProperties));
555                 } else if ("lookupall".equalsIgnoreCase(queryType)) {
556                     queries.add(new LookupAllQuery(queryName, queryProperties));
557                 } else if ("count".equalsIgnoreCase(queryType)) {
558                     queries.add(new CountQuery(queryName, queryProperties));
559                 } else if ("sparql".equalsIgnoreCase(queryType)) {
560                     queries.add(new SparqlQuery(queryName, queryProperties));
561                 }
562             }
563             return queries;
564         }
565 
        /**
         * Returns the name of this query.
         *
         * @return the query name
         */
        public String getName() {
            return this.name;
        }
569 
        /**
         * Returns the timeout of this query.
         *
         * @return the value parsed from the "timeout" query property, or null if the property
         *         was not specified
         */
        public Long getTimeout() {
            return this.timeout;
        }
573 
        /**
         * Returns the input variables this query depends on.
         *
         * @return an immutable set with the names of the input variables
         */
        public Set<String> getInputVariables() {
            return this.inputVariables;
        }
577 
        /**
         * Returns the output variables produced by this query.
         *
         * @return an immutable set with the names of the output variables, which always
         *         includes "start", "time" and "error"
         */
        public Set<String> getOutputVariables() {
            return this.outputVariables;
        }
581 
582         public long evaluate(final Session session, final long startTimestamp,
583                 final BindingSet input, final MapBindingSet output,
584                 @Nullable final Statistics stats) {
585 
586             final ValueFactory vf = ValueFactoryImpl.getInstance();
587 
588             if (LOGGER.isDebugEnabled()) {
589                 final StringBuilder builder = new StringBuilder();
590                 builder.append("Started: ");
591                 builder.append(TestUtil.format(this.inputVariables, input, " "));
592                 LOGGER.debug(builder.toString());
593             }
594 
595             String error = "";
596             try {
597                 doEvaluate(session, input, output);
598             } catch (final Throwable ex) {
599                 error = ex.getClass().getSimpleName() + " - "
600                         + Strings.nullToEmpty(ex.getMessage());
601                 LOGGER.warn("Got exception", ex);
602             }
603 
604             long size = -1;
605             try {
606                 final Value value = output.getValue("size");
607                 if (value != null) {
608                     size = Long.parseLong(value.stringValue());
609                 }
610             } catch (final Throwable ex) {
611                 // Ignore
612             }
613 
614             final long endTimestamp = System.currentTimeMillis();
615             final long elapsed = endTimestamp - startTimestamp;
616 
617             if (stats != null) {
618                 stats.reportQueryCompletion(this.name, !"".equals(error), elapsed, size);
619             }
620 
621             output.addBinding("time", vf.createLiteral(elapsed));
622 
623             if (LOGGER.isDebugEnabled()) {
624                 final StringBuilder builder = new StringBuilder();
625                 builder.append("".equals(error) ? "Success" : "Failure");
626                 builder.append(": ");
627                 builder.append(TestUtil.format(this.inputVariables, input, " "));
628                 builder.append(" -> ");
629                 builder.append(TestUtil.format(this.outputVariables, output, " "));
630                 LOGGER.debug(builder.toString());
631             }
632 
633             output.addBinding("start", vf.createLiteral(startTimestamp));
634             output.addBinding("error", vf.createLiteral(error));
635 
636             return endTimestamp;
637         }
638 
        /**
         * Performs the query-specific evaluation, storing the results in the output tuple.
         * Called by {@code evaluate}, which catches and records any exception thrown here.
         *
         * @param session
         *            the session to use
         * @param input
         *            the input tuple
         * @param output
         *            the output tuple to fill
         * @throws Throwable
         *             on failure
         */
        abstract void doEvaluate(Session session, BindingSet input, MapBindingSet output)
                throws Throwable;
641 
        /**
         * Returns the name of the query as its string representation.
         */
        @Override
        public String toString() {
            return this.name;
        }
646 
647         private static class DownloadQuery extends Query {
648 
649             private final Template id;
650 
651             private final boolean caching;
652 
653             DownloadQuery(final String name, final Properties properties) {
654                 this(name, properties, new Template(properties.getProperty("id")));
655             }
656 
657             private DownloadQuery(final String name, final Properties properties, final Template id) {
658                 super(name, properties, id.getVariables(), ImmutableList.of("size"));
659                 this.id = id;
660                 this.caching = "false".equalsIgnoreCase(properties.getProperty("caching"));
661             }
662 
663             @Override
664             void doEvaluate(final Session session, final BindingSet input,
665                     final MapBindingSet output) throws Throwable {
666 
667                 final URI id = (URI) Statements.parseValue(this.id.instantiate(input),
668                         Namespaces.DEFAULT);
669 
670                 long size = 0L;
671                 try (final Representation representation = session.download(id)
672                         .caching(this.caching).timeout(getTimeout()).exec()) {
673                     if (representation != null) {
674                         size = representation.writeToByteArray().length;
675                     } else {
676                         LOGGER.warn("No results for DOWNLOAD request, id " + id);
677                     }
678                 } catch (final Throwable ex) {
679                     throw new RuntimeException("Failed DOWNLOAD, id " + TestUtil.format(id)
680                             + ", caching " + this.caching, ex);
681                 } finally {
682                     output.addBinding("size", FACTORY.createLiteral(size));
683                 }
684             }
685         }
686 
687         private static class RetrieveQuery extends Query {
688 
689             private final URI layer;
690 
691             @Nullable
692             private final Template condition;
693 
694             @Nullable
695             private final Long offset;
696 
697             @Nullable
698             private final Long limit;
699 
700             @Nullable
701             private final List<URI> properties;
702 
703             RetrieveQuery(final String name, final Properties properties) {
704                 this(name, properties, Template.forString(properties.getProperty("condition")));
705             }
706 
707             private RetrieveQuery(final String name, final Properties properties,
708                     @Nullable final Template condition) {
709 
710                 super(name, properties, condition.getVariables(), ImmutableList.of("size"));
711 
712                 final String offset = properties.getProperty("offset");
713                 final String limit = properties.getProperty("limit");
714 
715                 List<URI> props = null;
716                 if (properties.containsKey("properties")) {
717                     props = Lists.newArrayList();
718                     for (final String token : Splitter.onPattern("[ ,;]").omitEmptyStrings()
719                             .trimResults().split(properties.getProperty("properties"))) {
720                         props.add((URI) Statements.parseValue(token));
721                     }
722                 }
723 
724                 this.layer = (URI) Statements.parseValue(properties.getProperty("layer"),
725                         Namespaces.DEFAULT);
726                 this.condition = condition;
727                 this.offset = offset == null ? null : Long.parseLong(offset);
728                 this.limit = limit == null ? null : Long.parseLong(limit);
729                 this.properties = props;
730             }
731 
732             @Override
733             void doEvaluate(final Session session, final BindingSet input,
734                     final MapBindingSet output) throws Throwable {
735 
736                 final String condition = Strings.nullToEmpty(this.condition.instantiate(input));
737 
738                 long numTriples = 0L;
739                 try {
740                     // FIXME: conditions do not seem to work
741                     final Stream<Record> stream = session.retrieve(this.layer)
742                             .condition(condition).offset(this.offset).limit(this.limit)
743                             .properties(this.properties).exec();
744                     numTriples = Record.encode(stream, ImmutableList.of(this.layer)).count();
745                     if (numTriples == 0) {
746                         LOGGER.warn("No results for RETRIEVE request, layer "
747                                 + TestUtil.format(this.layer) + ", condition '" + condition
748                                 + "', offset " + this.offset + ", limit " + this.limit);
749                     }
750 
751                 } catch (final Throwable ex) {
752                     throw new RuntimeException("Failed RETRIEVE " + TestUtil.format(this.layer)
753                             + ", condition " + condition + ", offset " + this.offset + ", limit"
754                             + this.limit + ", properties " + this.properties, ex);
755                 } finally {
756                     output.addBinding("size", FACTORY.createLiteral(numTriples));
757                 }
758             }
759         }
760 
761         private static class LookupQuery extends Query {
762 
763             private final URI layer;
764 
765             @Nullable
766             private final Template id;
767 
768             @Nullable
769             private final List<URI> properties;
770 
771             LookupQuery(final String name, final Properties properties) {
772                 this(name, properties, Template.forString(properties.getProperty("id")));
773             }
774 
775             private LookupQuery(final String name, final Properties properties,
776                     @Nullable final Template id) {
777 
778                 super(name, properties, id.getVariables(), ImmutableList.of("size"));
779 
780                 List<URI> props = null;
781                 if (properties.containsKey("properties")) {
782                     props = Lists.newArrayList();
783                     for (final String token : Splitter.onPattern("[ ,;]").omitEmptyStrings()
784                             .trimResults().split(properties.getProperty("properties"))) {
785                         props.add((URI) Statements.parseValue(token));
786                     }
787                 }
788 
789                 this.layer = (URI) Statements.parseValue(properties.getProperty("layer"),
790                         Namespaces.DEFAULT);
791                 this.id = id;
792                 this.properties = props;
793             }
794 
795             @Override
796             void doEvaluate(final Session session, final BindingSet input,
797                     final MapBindingSet output) throws Throwable {
798 
799                 final URI id = (URI) Statements.parseValue(this.id.instantiate(input),
800                         Namespaces.DEFAULT);
801 
802                 long numTriples = 0L;
803                 try {
804                     final Stream<Record> stream = session.retrieve(this.layer).ids(id)
805                             .properties(this.properties).exec();
806                     numTriples = Record.encode(stream, ImmutableList.of(this.layer)).count();
807                     if (numTriples == 0) {
808                         LOGGER.warn("No results for LOOKUP request, layer "
809                                 + TestUtil.format(this.layer) + ", id " + id);
810                     }
811 
812                 } catch (final Throwable ex) {
813                     throw new RuntimeException("Failed LOOKUP " + TestUtil.format(this.layer)
814                             + ", id " + TestUtil.format(id) + ", properties " + this.properties,
815                             ex);
816                 } finally {
817                     output.addBinding("size", FACTORY.createLiteral(numTriples));
818                 }
819             }
820 
821         }
822 
823         private static class LookupAllQuery extends Query {
824 
825             @Nullable
826             private final Template id;
827 
828             LookupAllQuery(final String name, final Properties properties) {
829                 this(name, properties, Template.forString(properties.getProperty("id")));
830             }
831 
832             private LookupAllQuery(final String name, final Properties properties,
833                     @Nullable final Template id) {
834                 super(name, properties, id.getVariables(), ImmutableList.of("size"));
835                 this.id = id;
836             }
837 
838             @Override
839             void doEvaluate(final Session session, final BindingSet input,
840                     final MapBindingSet output) throws Throwable {
841 
842                 final URI id = (URI) Statements.parseValue(this.id.instantiate(input),
843                         Namespaces.DEFAULT);
844 
845                 long numTriples = 0L;
846                 try {
847                     numTriples += Record.encode(session.retrieve(KS.RESOURCE).ids(id).exec(),
848                             ImmutableList.of(KS.RESOURCE)).count();
849                     numTriples += Record.encode(
850                             session.retrieve(KS.MENTION).condition("ks:mentionOf = $$", id)
851                                     .limit(100000L).exec(), ImmutableList.of(KS.MENTION)).count();
852                     if (numTriples == 0) {
853                         LOGGER.warn("No results for LOOKUP ALL request, id " + id);
854                     }
855                 } catch (final Throwable ex) {
856                     throw new RuntimeException("Failed LOOKUP ALL, id " + TestUtil.format(id), ex);
857                 } finally {
858                     output.addBinding("size", FACTORY.createLiteral(numTriples));
859                 }
860             }
861         }
862 
863         private static class CountQuery extends Query {
864 
865             private final URI layer;
866 
867             @Nullable
868             private final Template condition;
869 
870             CountQuery(final String name, final Properties properties) {
871                 this(name, properties, Template.forString(properties.getProperty("condition")));
872             }
873 
874             private CountQuery(final String name, final Properties properties,
875                     @Nullable final Template condition) {
876 
877                 super(name, properties, condition.getVariables(), ImmutableList.of("size"));
878 
879                 this.layer = (URI) Statements.parseValue(properties.getProperty("layer"));
880                 this.condition = condition;
881             }
882 
883             @Override
884             void doEvaluate(final Session session, final BindingSet input,
885                     final MapBindingSet output) throws Throwable {
886 
887                 final String condition = Strings.nullToEmpty(this.condition.instantiate(input));
888                 long numResults = 0L;
889                 try {
890                     numResults = session.count(this.layer).condition(condition).exec();
891                     if (numResults == 0) {
892                         LOGGER.warn("No results for COUNT request, layer "
893                                 + TestUtil.format(this.layer) + ", condition '" + condition + "'");
894                     }
895                 } catch (final Throwable ex) {
896                     throw new RuntimeException("Count " + TestUtil.format(this.layer) + " where "
897                             + condition + " failed", ex);
898                 } finally {
899                     output.addBinding("size", FACTORY.createLiteral(numResults));
900                 }
901             }
902 
903         }
904 
905         private static final class SparqlQuery extends Query {
906 
907             private final Template query;
908 
909             private final String form;
910 
911             SparqlQuery(final String name, final Properties properties) {
912                 this(name, properties, Template.forString(properties.getProperty("query")));
913             }
914 
915             private SparqlQuery(final String name, final Properties properties,
916                     final Template query) {
917                 super(name, properties, query.getVariables(), ImmutableList.of("size"));
918                 this.query = query;
919                 this.form = detectQueryForm(query.getText());
920             }
921 
922             private static String detectQueryForm(final String query) {
923 
924                 final int length = query.length();
925 
926                 int start = 0;
927                 while (start < length) {
928                     final char ch = query.charAt(start);
929                     if (ch == '#') { // comment
930                         while (start < length && query.charAt(start) != '\n') {
931                             ++start;
932                         }
933                     } else if (ch == 'p' || ch == 'b' || ch == 'P' || ch == 'B') { // prefix/base
934                         while (start < length && query.charAt(start) != '>') {
935                             ++start;
936                         }
937                     } else if (!Character.isWhitespace(ch)) { // found
938                         break;
939                     }
940                     ++start;
941                 }
942 
943                 for (int i = start; i < query.length(); ++i) {
944                     final char ch = query.charAt(i);
945                     if (Character.isWhitespace(ch)) {
946                         final String form = query.substring(start, i).toLowerCase();
947                         if (!"select".equals(form) && !"construct".equals(form)
948                                 && !"describe".equals(form) && !"ask".equals(form)) {
949                             throw new IllegalArgumentException("Unknown query form: " + form);
950                         }
951                         return form;
952                     }
953                 }
954 
955                 throw new IllegalArgumentException("Cannot detect query form");
956             }
957 
958             @Override
959             void doEvaluate(final Session session, final BindingSet input,
960                     final MapBindingSet output) throws Throwable {
961 
962                 long numResults = 0;
963                 final String queryString = this.query.instantiate(input);
964                 final Sparql operation = session.sparql(queryString).timeout(getTimeout());
965 
966                 try {
967                     switch (this.form) {
968                     case "select":
969                         numResults = operation.execTuples().count();
970                         break;
971                     case "construct":
972                     case "describe":
973                         numResults = operation.execTriples().count();
974                         break;
975                     case "ask":
976                         operation.execBoolean();
977                         numResults = 1;
978                         break;
979                     default:
980                         throw new Error();
981                     }
982                     if (numResults == 0) {
983                         LOGGER.warn("No results for SPARQL request, query is\n" + queryString);
984                     }
985                     output.addBinding("size", FACTORY.createLiteral(numResults));
986                 } catch (final Throwable ex) {
987                     throw new RuntimeException("Failed SPARQL, form " + this.form.toUpperCase()
988                             + ", query:\n" + queryString, ex);
989                 } finally {
990                 }
991             }
992 
993         }
994 
995         private static final class Template {
996 
997             private static final Pattern PATTERN = Pattern.compile("\\$\\{([^\\}]+)\\}");
998 
999             private static Template EMPTY = new Template("");
1000 
1001             private final String text;
1002 
1003             private final String[] placeholderVariables;
1004 
1005             private final Set<String> variables;
1006 
1007             private Template(final String string) {
1008                 Preconditions.checkNotNull(string);
1009                 final List<String> variables = Lists.newArrayList();
1010                 final StringBuilder builder = new StringBuilder();
1011                 final Matcher matcher = PATTERN.matcher(string);
1012                 int offset = 0;
1013                 while (matcher.find()) {
1014                     builder.append(string.substring(offset, matcher.start()).replace("%", "%%"));
1015                     builder.append("%s");
1016                     variables.add(matcher.group(1));
1017                     offset = matcher.end();
1018                 }
1019                 builder.append(string.substring(offset).replace("%", "%%"));
1020                 this.text = builder.toString();
1021                 this.placeholderVariables = variables.toArray(new String[variables.size()]);
1022                 this.variables = ImmutableSet.copyOf(variables);
1023             }
1024 
1025             static Template forString(@Nullable final String string) {
1026                 return string == null ? EMPTY : new Template(string);
1027             }
1028 
1029             String getText() {
1030                 return this.text;
1031             }
1032 
1033             Set<String> getVariables() {
1034                 return this.variables;
1035             }
1036 
1037             String instantiate(final BindingSet bindings) {
1038                 final Object[] placeholderValues = new String[this.placeholderVariables.length];
1039                 for (int i = 0; i < placeholderValues.length; ++i) {
1040                     final Value value = bindings.getValue(this.placeholderVariables[i]);
1041                     placeholderValues[i] = Data.toString(value, null);
1042                 }
1043                 return String.format(this.text, placeholderValues);
1044             }
1045 
1046         }
1047 
1048     }
1049 
1050     private static final class Statistics {
1051 
1052         private static final String EMPTY = String.format("%-8s", "");
1053 
1054         private final DescriptiveStatistics queryMixTime;
1055 
1056         private final Map<String, QueryInfo> queryInfos;
1057 
1058         private final QueryInfo globalInfo;
1059 
1060         private long elapsedTime;
1061 
1062         public Statistics(final Iterable<String> queryNames) {
1063             this.queryMixTime = new DescriptiveStatistics();
1064             this.queryInfos = Maps.newLinkedHashMap();
1065             this.globalInfo = new QueryInfo();
1066             this.elapsedTime = 0L;
1067             for (final String queryName : queryNames) {
1068                 this.queryInfos.put(queryName, new QueryInfo());
1069             }
1070         }
1071 
1072         public synchronized void reportQueryCompletion(final String queryName,
1073                 final boolean failure, final long time, final long size) {
1074             final QueryInfo info = this.queryInfos.get(queryName);
1075             info.time.addValue(time);
1076             this.globalInfo.time.addValue(time);
1077             if (size >= 0) {
1078                 info.size.addValue(size);
1079                 this.globalInfo.size.addValue(size);
1080             }
1081             if (failure) {
1082                 ++info.numFailures;
1083                 ++this.globalInfo.numFailures;
1084             }
1085         }
1086 
1087         public synchronized void reportQueryMixCompletion(final long time) {
1088             this.queryMixTime.addValue(time);
1089         }
1090 
1091         public synchronized void reportElapsedTest(final long elapsedTime) {
1092             this.elapsedTime = elapsedTime;
1093         }
1094 
1095         @Override
1096         public synchronized String toString() {
1097 
1098             // Compute sum of query execution times
1099             long testTotalTime = 0;
1100             for (final QueryInfo info : this.queryInfos.values()) {
1101                 testTotalTime += (long) info.time.getSum();
1102             }
1103 
1104             // Create and return a statistics table
1105             final StringBuilder builder = new StringBuilder();
1106             emitHeader(builder);
1107             emitSeparator(builder);
1108             for (final Map.Entry<String, QueryInfo> entry : this.queryInfos.entrySet()) {
1109                 emitStats(builder, testTotalTime, entry.getKey(), entry.getValue());
1110             }
1111             emitSeparator(builder);
1112             emitStats(builder, testTotalTime, "query (avg)", this.globalInfo);
1113             emitSeparator(builder);
1114             emitStats(builder, testTotalTime, "query mix", (int) this.queryMixTime.getN(), -1,
1115                     null, this.queryMixTime);
1116             return builder.toString();
1117         }
1118 
1119         private void emitHeader(final StringBuilder builder) {
1120 
1121             builder.append(String.format("%-12s%-16s%-64s%-64s%-24s%-16s\n", "", "   Executions",
1122                     "     Result size [solutions, triples or bytes]", "     Execution time [ms]",
1123                     "     Total time [ms]", "    Rate"));
1124 
1125             builder.append(Strings.repeat(" ", 12));
1126             for (final String field : new String[] { "Total", "Error", "Min", "Q1", "Q2", "Q3",
1127                     "Max", "Geom", "Mean", "Std", "Min", "Q1", "Q2", "Q3", "Max", "Geom", "Mean",
1128                     "Std", "Sum", "Clock", "Share", "/Sec", "/Hour" }) {
1129                 builder.append(String.format("%8s", field));
1130             }
1131             builder.append("\n");
1132         }
1133 
1134         private void emitSeparator(final StringBuilder builder) {
1135             builder.append(Strings.repeat("-", 8 * 23 + 12)).append("\n");
1136         }
1137 
1138         private void emitStats(final StringBuilder builder, final long testTotalTime,
1139                 final String label, final QueryInfo info) {
1140             emitStats(builder, testTotalTime, label, (int) info.time.getN(), info.numFailures,
1141                     info.size, info.time);
1142         }
1143 
1144         private void emitStats(final StringBuilder builder, final long testTotalTime,
1145                 final String label, final int numSuccesses, final int numFailures,
1146                 @Nullable final DescriptiveStatistics size, final DescriptiveStatistics time) {
1147 
1148             builder.append(String.format("%-12s", label));
1149 
1150             builder.append(numSuccesses >= 0 ? String.format("%8d", numSuccesses) : EMPTY);
1151             builder.append(numFailures >= 0 ? String.format("%8d", numFailures) : EMPTY);
1152 
1153             if (size != null) {
1154                 builder.append(String.format("%8d%8d%8d%8d%8d%8.0f%8.0f%8.0f",
1155                         (long) size.getMin(), (long) size.getPercentile(25),
1156                         (long) size.getPercentile(50), (long) size.getPercentile(75),
1157                         (long) size.getMax(), size.getGeometricMean(), size.getMean(),
1158                         size.getStandardDeviation()));
1159             } else {
1160                 builder.append(Strings.repeat(EMPTY, 8));
1161             }
1162 
1163             if (time != null) {
1164                 final long queryTotalTime = (long) time.getSum();
1165                 final double share = (double) queryTotalTime / testTotalTime;
1166                 final long elapsed = (long) (this.elapsedTime * share);
1167                 final double rate = 1000.0 * time.getN() / elapsed;
1168 
1169                 builder.append(String.format("%8d%8d%8d%8d%8d%8.0f%8.0f%8.0f",
1170                         (long) time.getMin(), (long) time.getPercentile(25),
1171                         (long) time.getPercentile(50), (long) time.getPercentile(75),
1172                         (long) time.getMax(), time.getGeometricMean(), time.getMean(),
1173                         time.getStandardDeviation()));
1174 
1175                 builder.append(String.format("%8d%8d%8.2f", queryTotalTime, elapsed, share));
1176                 builder.append(String.format("%8.2f%8.0f", rate, rate * 3600));
1177 
1178             } else {
1179                 builder.append(Strings.repeat(EMPTY, 13));
1180             }
1181 
1182             builder.append("\n");
1183         }
1184 
1185         private static class QueryInfo {
1186 
1187             public final DescriptiveStatistics time = new DescriptiveStatistics();
1188 
1189             public final DescriptiveStatistics size = new DescriptiveStatistics();
1190 
1191             public int numFailures;
1192 
1193         }
1194 
1195     }
1196 
1197 }