1   package eu.fbk.knowledgestore.client;
2   
3   import com.google.common.base.Charsets;
4   import com.google.common.base.MoreObjects;
5   import com.google.common.base.Preconditions;
6   import com.google.common.base.Strings;
7   import com.google.common.collect.ImmutableSet;
8   import com.google.common.collect.Maps;
9   import com.google.common.escape.Escaper;
10  import com.google.common.io.BaseEncoding;
11  import com.google.common.net.HttpHeaders;
12  import com.google.common.net.UrlEscapers;
13  import eu.fbk.knowledgestore.AbstractKnowledgeStore;
14  import eu.fbk.knowledgestore.AbstractSession;
15  import eu.fbk.knowledgestore.Outcome;
16  import eu.fbk.knowledgestore.Outcome.Status;
17  import eu.fbk.knowledgestore.Session;
18  import eu.fbk.knowledgestore.data.*;
19  import eu.fbk.knowledgestore.internal.Util;
20  import eu.fbk.knowledgestore.internal.jaxrs.Protocol;
21  import eu.fbk.knowledgestore.internal.jaxrs.Serializer;
22  import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
23  import eu.fbk.knowledgestore.vocabulary.NIE;
24  import org.apache.http.client.config.RequestConfig;
25  import org.apache.http.config.RegistryBuilder;
26  import org.apache.http.conn.HttpClientConnectionManager;
27  import org.apache.http.conn.socket.ConnectionSocketFactory;
28  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
29  import org.apache.http.conn.ssl.DefaultHostnameVerifier;
30  import org.apache.http.conn.ssl.NoopHostnameVerifier;
31  import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
32  import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
33  import org.glassfish.jersey.apache.connector.ApacheClientProperties;
34  import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
35  import org.glassfish.jersey.client.ClientConfig;
36  import org.glassfish.jersey.client.ClientProperties;
37  import org.glassfish.jersey.client.RequestEntityProcessing;
38  import org.glassfish.jersey.message.GZipEncoder;
39  import org.openrdf.model.Statement;
40  import org.openrdf.model.URI;
41  import org.openrdf.model.Value;
42  import org.openrdf.query.BindingSet;
43  import org.openrdf.rio.RDFFormat;
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  import javax.annotation.Nullable;
48  import javax.net.ssl.HostnameVerifier;
49  import javax.net.ssl.SSLContext;
50  import javax.net.ssl.TrustManager;
51  import javax.net.ssl.X509TrustManager;
52  import javax.ws.rs.HttpMethod;
53  import javax.ws.rs.WebApplicationException;
54  import javax.ws.rs.client.ClientBuilder;
55  import javax.ws.rs.client.Entity;
56  import javax.ws.rs.client.Invocation;
57  import javax.ws.rs.client.ResponseProcessingException;
58  import javax.ws.rs.core.*;
59  import java.io.InputStream;
60  import java.lang.reflect.Type;
61  import java.security.cert.X509Certificate;
62  import java.text.SimpleDateFormat;
63  import java.util.Date;
64  import java.util.Map;
65  import java.util.Set;
66  import java.util.concurrent.atomic.AtomicReference;
67  
68  // TODO: decide where to place the Configuration class
69  
70  public final class Client extends AbstractKnowledgeStore {
71  
72      private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
73  
74      private static final String USER_AGENT = String.format(
75              "KnowledgeStore/%s Apache-HttpClient/%s",
76              Util.getVersion("eu.fbk.knowledgestore", "ks-core", "devel"),
77              Util.getVersion("org.apache.httpcomponents", "httpclient", "unknown"));
78  
79      private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
80  
81      private static final String MIME_TYPE_RDF = "application/x-tql"; // "text/turtle";
82  
83      private static final String MIME_TYPE_TUPLE = "text/tab-separated-values";
84  
85      private static final String MIME_TYPE_BOOLEAN = "text/boolean";
86  
87      private static final int DEFAULT_MAX_CONNECTIONS = 2;
88  
89      private static final boolean DEFAULT_VALIDATE_SERVER = true;
90  
91      private static final int DEFAULT_CONNECTION_TIMEOUT = 1000; // 1 sec
92      private static final int DEFAULT_SOCKET_TIMEOUT = 10000; // 10 sec
93  
94      private static final boolean DEFAULT_COMPRESSION_ENABLED = LoggerFactory.getLogger(
95              "org.apache.http.wire").isDebugEnabled();;
96  
97      private final String serverURL;
98  
99      private final boolean compressionEnabled;
100 
101     private final HttpClientConnectionManager connectionManager;
102 
103     private final javax.ws.rs.client.Client client;
104 
105     private final Map<String, String> targets; // path -> URI
106 
107     private Client(final Builder builder) {
108 
109         String url = Preconditions.checkNotNull(builder.serverURL);
110         if (url.endsWith("/")) {
111             url = url.substring(0, url.length() - 1);
112         }
113 
114         final int timeout;
115         timeout = MoreObjects.firstNonNull(builder.connectionTimeout, DEFAULT_CONNECTION_TIMEOUT);
116         Preconditions.checkArgument(timeout >= 0, "Invalid connection timeout %d", timeout);
117 
118         final int socketTimeout;
119         socketTimeout = MoreObjects.firstNonNull(builder.socketTimeout, DEFAULT_SOCKET_TIMEOUT);
120         Preconditions.checkArgument(socketTimeout >= 0, "Invalid connection timeout %d", socketTimeout);
121 
122         this.serverURL = url;
123         this.compressionEnabled = MoreObjects.firstNonNull(builder.compressionEnabled,
124                 DEFAULT_COMPRESSION_ENABLED);
125         this.connectionManager = createConnectionManager(
126                 MoreObjects.firstNonNull(builder.maxConnections, DEFAULT_MAX_CONNECTIONS),
127                 MoreObjects.firstNonNull(builder.validateServer, DEFAULT_VALIDATE_SERVER));
128         this.client = createJaxrsClient(this.connectionManager, timeout, socketTimeout, builder.proxy);
129         this.targets = Maps.newConcurrentMap();
130     }
131 
132     public synchronized String getServerURL() {
133         checkNotClosed();
134         return this.serverURL;
135     }
136 
137     @Override
138     protected Session doNewSession(@Nullable final String username, @Nullable final String password) {
139         return new SessionImpl(username, password);
140     }
141 
142     @Override
143     protected void doClose() {
144         try {
145             this.client.close();
146         } finally {
147             this.connectionManager.shutdown();
148         }
149     }
150 
151     private static PoolingHttpClientConnectionManager createConnectionManager(
152             final int maxConnections, final boolean validateServer) {
153 
154         // Setup SSLContext and HostnameVerifier based on validateServer parameter
155         final SSLContext sslContext;
156         HostnameVerifier hostVerifier;
157         try {
158             if (validateServer) {
159                 sslContext = SSLContext.getDefault();
160                 hostVerifier = new DefaultHostnameVerifier();
161             } else {
162                 sslContext = SSLContext.getInstance(Protocol.HTTPS_PROTOCOLS[0]);
163                 sslContext.init(null, new TrustManager[] { new X509TrustManager() {
164 
165                     @Override
166                     public void checkClientTrusted(final X509Certificate[] chain,
167                             final String authType) {
168                     }
169 
170                     @Override
171                     public void checkServerTrusted(final X509Certificate[] chain,
172                             final String authType) {
173                     }
174 
175                     @Override
176                     public X509Certificate[] getAcceptedIssuers() {
177                         return null;
178                     }
179 
180                 } }, null);
181                 hostVerifier = NoopHostnameVerifier.INSTANCE;
182             }
183         } catch (final Throwable ex) {
184             throw new RuntimeException("SSL configuration failed", ex);
185         }
186 
187         // Create HTTP connection factory
188         final ConnectionSocketFactory httpConnectionFactory = PlainConnectionSocketFactory
189                 .getSocketFactory();
190 
191         // Create HTTPS connection factory
192         final ConnectionSocketFactory httpsConnectionFactory = new SSLConnectionSocketFactory(
193                 sslContext, Protocol.HTTPS_PROTOCOLS, Protocol.HTTPS_CIPHER_SUITES, hostVerifier);
194 
195         // Create pooled connection manager
196         final PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(
197                 RegistryBuilder.<ConnectionSocketFactory>create()
198                         .register("http", httpConnectionFactory)
199                         .register("https", httpsConnectionFactory).build());
200 
201         // Setup max concurrent connections
202         manager.setMaxTotal(maxConnections);
203         manager.setDefaultMaxPerRoute(maxConnections);
204         manager.setValidateAfterInactivity(1000); // validate connection after 1s idle
205         return manager;
206     }
207 
208     private static javax.ws.rs.client.Client createJaxrsClient(
209             final HttpClientConnectionManager connectionManager, final int connectionTimeout,
210             final int socketTimeout, @Nullable final ProxyConfig proxy) {
211 
212         // Configure requests
213         final RequestConfig requestConfig = RequestConfig.custom()//
214                 .setExpectContinueEnabled(false) //
215                 .setRedirectsEnabled(false) //
216                 .setConnectionRequestTimeout(connectionTimeout) //
217                 .setConnectTimeout(connectionTimeout) //
218                 .setSocketTimeout(socketTimeout)
219                 .build();
220 
221         // Configure client
222         final ClientConfig config = new ClientConfig();
223         config.connectorProvider(new ApacheConnectorProvider());
224         config.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);
225         config.property(ApacheClientProperties.REQUEST_CONFIG, requestConfig);
226         config.property(ApacheClientProperties.DISABLE_COOKIES, true); // not needed
227         config.property(ClientProperties.REQUEST_ENTITY_PROCESSING,
228                 RequestEntityProcessing.CHUNKED); // required to stream data to the server
229         if (proxy != null) {
230             config.property(ClientProperties.PROXY_URI, proxy.getURL());
231             config.property(ClientProperties.PROXY_USERNAME, proxy.getUsername());
232             config.property(ClientProperties.PROXY_PASSWORD, proxy.getPassword());
233         }
234 
235         // Register filter and custom serializer
236         config.register(Serializer.class);
237         config.register(GZipEncoder.class);
238 
239         // Create and return a configured JAX-RS client
240         return ClientBuilder.newClient(config);
241     }
242 
243     private final class SessionImpl extends AbstractSession {
244 
245         private final String authorization;
246 
247         SessionImpl(@Nullable final String username, @Nullable final String password) {
248             super(Data.newNamespaceMap(Data.newNamespaceMap(), Data.getNamespaceMap()), username,
249                     password);
250             final String actualUsername = MoreObjects.firstNonNull(username, "");
251             final String actualPassword = MoreObjects.firstNonNull(password, "");
252             final String authorizationString = actualUsername + ":" + actualPassword;
253             final byte[] authorizationBytes = authorizationString.getBytes(Charsets.ISO_8859_1);
254             this.authorization = "Basic " + BaseEncoding.base64().encode(authorizationBytes);
255         }
256 
257         @Override
258         protected Status doFail(final Throwable ex, final AtomicReference<String> message)
259                 throws Throwable {
260 
261             if (ex instanceof WebApplicationException) {
262                 final Response response = ((WebApplicationException) ex).getResponse();
263                 try {
264                     final RDFFormat format = RDFFormat.forMIMEType(response.getMediaType()
265                             .toString());
266                     final Outcome outcome = Outcome.decode(
267                             RDFUtil.readRDF((InputStream) response.getEntity(), format, null,
268                                     null, false), false).getUnique();
269                     message.set(outcome.getMessage());
270                     return outcome.getStatus();
271                 } catch (final Throwable ex2) {
272                     LOGGER.error("Unable to decode error body", ex2);
273                     return Status.valueOf(response.getStatus());
274                 } finally {
275                     response.close();
276                 }
277 
278             } else if (ex instanceof ResponseProcessingException) {
279                 final Response response = ((ResponseProcessingException) ex).getResponse();
280                 try {
281                     final StringBuilder builder = new StringBuilder(
282                             "Client side error (server response: ");
283                     builder.append(response.getStatus());
284                     if (response.hasEntity()) {
285                         final String etag = response.getHeaderString(HttpHeaders.ETAG);
286                         builder.append(", ").append(etag != null ? etag : response.getMediaType());
287                         final Date lastModified = response.getLastModified();
288                         if (lastModified != null) {
289                             synchronized (DATE_FORMAT) {
290                                 builder.append(", ").append(DATE_FORMAT.format(lastModified));
291                             }
292                         }
293                     }
294                     message.set(builder.toString());
295                     return Status.valueOf(response.getStatus());
296                 } finally {
297                     response.close();
298                 }
299 
300             } else {
301                 return super.doFail(ex, message);
302             }
303         }
304 
305         @Override
306         @Nullable
307         protected Representation doDownload(@Nullable final Long timeout, final URI id,
308                 @Nullable final Set<String> mimeTypes, final boolean useCaches) throws Throwable {
309 
310             final String query = query(Protocol.PARAMETER_ID, id);
311 
312             final Map<String, Object> headers = Maps.newHashMap();
313             if (mimeTypes != null) {
314                 headers.put(HttpHeaders.ACCEPT, mimeTypes);
315             }
316             if (!useCaches) {
317                 final CacheControl cacheControl = new CacheControl();
318                 cacheControl.setNoStore(true);
319                 headers.put(HttpHeaders.CACHE_CONTROL, cacheControl);
320             }
321 
322             try {
323                 return invoke(HttpMethod.GET, Protocol.PATH_REPRESENTATIONS, query, headers, null,
324                         new GenericType<Representation>(Representation.class), timeout);
325 
326             } catch (final WebApplicationException ex) {
327                 if (ex.getResponse().getStatus() == 404) {
328                     ex.getResponse().close();
329                     return null;
330                 }
331                 throw ex;
332             }
333         }
334 
335         @Override
336         protected Outcome doUpload(@Nullable final Long timeout, final URI id,
337                 final Representation representation) throws Exception {
338 
339             final String path = Protocol.PATH_REPRESENTATIONS;
340             final String query = query(Protocol.PARAMETER_ID, id);
341             final Entity<?> entity = representation == null ? null : entity(representation);
342             return invoke(HttpMethod.PUT, path, query, null, entity, Protocol.STREAM_OF_OUTCOMES,
343                     timeout).getUnique();
344         }
345 
346         @Override
347         protected long doCount(@Nullable final Long timeout, final URI type,
348                 @Nullable final XPath condition, @Nullable final Set<URI> ids) throws Throwable {
349 
350             final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_COUNT;
351             final String query = query(Protocol.PARAMETER_CONDITION, condition,
352                     Protocol.PARAMETER_ID, ids);
353             final Statement result = invoke(HttpMethod.GET, path, query, null, null,
354                     Protocol.STREAM_OF_STATEMENTS, timeout).getUnique();
355             return Data.convert(result.getObject(), Long.class);
356         }
357 
358         @Override
359         protected Stream<Record> doRetrieve(@Nullable final Long timeout, final URI type,
360                 @Nullable final XPath condition, @Nullable final Set<URI> ids,
361                 @Nullable final Set<URI> properties, @Nullable final Long offset,
362                 @Nullable final Long limit) throws Throwable {
363 
364             final String path = Protocol.pathFor(type);
365             final String query = query(//
366                     Protocol.PARAMETER_CONDITION, condition, //
367                     Protocol.PARAMETER_ID, ids, //
368                     Protocol.PARAMETER_PROPERTY, properties, //
369                     Protocol.PARAMETER_OFFSET, offset, //
370                     Protocol.PARAMETER_LIMIT, limit);
371             final Stream<Record> result = invoke(HttpMethod.GET, path, query, null, null,
372                     Protocol.STREAM_OF_RECORDS, timeout);
373             result.setProperty("types", ImmutableSet.of(type));
374             return result;
375         }
376 
377         @Override
378         protected void doCreate(@Nullable final Long timeout, final URI type,
379                 final Stream<? extends Record> records, final Handler<? super Outcome> handler)
380                 throws Exception {
381 
382             final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_CREATE;
383             final Entity<?> entity = entity(records, type);
384             final Stream<Outcome> result = invoke(HttpMethod.POST, path, null, null, entity,
385                     Protocol.STREAM_OF_OUTCOMES, timeout);
386             result.toHandler(handler);
387         }
388 
389         @Override
390         protected void doMerge(@Nullable final Long timeout, final URI type,
391                 final Stream<? extends Record> records, final Criteria criteria,
392                 final Handler<? super Outcome> handler) throws Exception {
393 
394             final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_MERGE;
395             final String query = query(Protocol.PARAMETER_CRITERIA, criteria);
396             final Entity<?> entity = entity(records, type);
397             final Stream<Outcome> result = invoke(HttpMethod.POST, path, query, null, entity,
398                     Protocol.STREAM_OF_OUTCOMES, timeout);
399             result.toHandler(handler);
400         }
401 
402         @Override
403         protected void doUpdate(@Nullable final Long timeout, final URI type,
404                 final XPath condition, final Set<URI> ids, final Record record,
405                 final Criteria criteria, final Handler<? super Outcome> handler) throws Exception {
406 
407             final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_UPDATE;
408             final String query = query(//
409                     Protocol.PARAMETER_CRITERIA, criteria, //
410                     Protocol.PARAMETER_CONDITION, condition, //
411                     Protocol.PARAMETER_ID, ids);
412             final Entity<?> entity = entity(Stream.create(record), type);
413             final Stream<Outcome> result = invoke(HttpMethod.POST, path, query, null, entity,
414                     Protocol.STREAM_OF_OUTCOMES, timeout);
415             result.toHandler(handler);
416         }
417 
418         @Override
419         protected void doDelete(@Nullable final Long timeout, final URI type,
420                 final XPath condition, final Set<URI> ids, final Handler<? super Outcome> handler)
421                 throws Exception {
422 
423             final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_DELETE;
424             final String query = query(//
425                     Protocol.PARAMETER_CONDITION, condition, //
426                     Protocol.PARAMETER_ID, ids);
427             final Stream<Outcome> result = invoke(HttpMethod.POST, path, query, null, null,
428                     Protocol.STREAM_OF_OUTCOMES, timeout);
429             result.toHandler(handler);
430         }
431 
432         @Override
433         protected Stream<Record> doMatch(@Nullable final Long timeout,
434                 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
435                 final Map<URI, Set<URI>> properties) throws Exception {
436             // TODO
437             throw new UnsupportedOperationException();
438         }
439 
440         @SuppressWarnings("unchecked")
441         @Override
442         protected <T> Stream<T> doSparql(@Nullable final Long timeout, final Class<T> type,
443                 final String expression, final Set<URI> defaultGraphs, final Set<URI> namedGraphs)
444                 throws Exception {
445 
446             final String path = Protocol.PATH_SPARQL;
447             final String query = query(//
448                     Protocol.PARAMETER_QUERY, expression, //
449                     Protocol.PARAMETER_DEFAULT_GRAPH, defaultGraphs, //
450                     Protocol.PARAMETER_NAMED_GRAPH, namedGraphs);
451             GenericType<?> responseType;
452             if (type == Statement.class) {
453                 responseType = Protocol.STREAM_OF_STATEMENTS;
454             } else if (type == BindingSet.class) {
455                 responseType = Protocol.STREAM_OF_TUPLES;
456             } else if (type == Boolean.class) {
457                 responseType = Protocol.STREAM_OF_BOOLEANS;
458             } else {
459                 throw new Error("Unexpected result type: " + type);
460             }
461             return (Stream<T>) invoke(HttpMethod.GET, path, query, null, null, responseType,
462                     timeout);
463         }
464 
465         @Override
466         protected Outcome doSparqlUpdate(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
467             final String path = Protocol.PATH_UPDATE;
468             final GenericEntity<Stream<Statement>> entity = new GenericEntity<Stream<Statement>>((Stream<Statement>) statements, Protocol.STREAM_OF_STATEMENTS.getType());
469             Entity<?> entityEntity = Entity.entity(entity, new Variant(MediaType.valueOf(MIME_TYPE_RDF), (String) null, Client.this.compressionEnabled ? "gzip" : "identity"));
470             return invoke(HttpMethod.POST, path, null, null, entityEntity, Protocol.STREAM_OF_OUTCOMES, timeout).getUnique();
471         }
472 
473         @Override
474         protected Outcome doSparqlDelete(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
475             final String path = Protocol.PATH_DELETE;
476             final GenericEntity<Stream<Statement>> entity = new GenericEntity<Stream<Statement>>((Stream<Statement>) statements, Protocol.STREAM_OF_STATEMENTS.getType());
477             Entity<?> entityEntity = Entity.entity(entity, new Variant(MediaType.valueOf(MIME_TYPE_RDF), (String) null, Client.this.compressionEnabled ? "gzip" : "identity"));
478             return invoke(HttpMethod.POST, path, null, null, entityEntity, Protocol.STREAM_OF_OUTCOMES, timeout).getUnique();
479         }
480 
481         private String query(final Object... queryNameValues) {
482             final StringBuilder builder = new StringBuilder();
483             final Escaper escaper = UrlEscapers.urlFormParameterEscaper();
484             String separator = "?";
485             for (int i = 0; i < queryNameValues.length; i += 2) {
486                 final Object name = queryNameValues[i].toString();
487                 final Object value = queryNameValues[i + 1];
488                 if (value == null) {
489                     continue;
490                 }
491                 final Iterable<?> iterable = value instanceof Iterable<?> ? (Iterable<?>) value
492                         : ImmutableSet.of(value);
493                 for (final Object element : iterable) {
494                     if (element == null) {
495                         continue;
496                     }
497                     String encoded;
498                     if (element instanceof Value && !name.equals(Protocol.PARAMETER_DEFAULT_GRAPH)
499                             && !name.equals(Protocol.PARAMETER_NAMED_GRAPH)) {
500                         encoded = Data.toString(element, Data.getNamespaceMap());
501                     } else {
502                         encoded = element.toString();
503                     }
504                     builder.append(separator).append(name).append("=");
505                     builder.append(escaper.escape(encoded));
506                     separator = "&";
507                 }
508             }
509             return builder.toString();
510         }
511 
512         private Entity<Representation> entity(final Representation representation) {
513             final String mimeType = representation.getMetadata().getUnique(NIE.MIME_TYPE,
514                     String.class, MediaType.APPLICATION_OCTET_STREAM);
515             final Variant variant = new Variant(MediaType.valueOf(mimeType), (String) null,
516                     Client.this.compressionEnabled ? "gzip" : "identity");
517             return Entity.entity(representation, variant);
518         }
519 
520         @SuppressWarnings("unchecked")
521         private Entity<GenericEntity<Stream<Record>>> entity(
522                 final Stream<? extends Record> records, final URI type) {
523 
524             records.setProperty("types", ImmutableSet.of(type));
525             final GenericEntity<Stream<Record>> entity = new GenericEntity<Stream<Record>>(
526                     (Stream<Record>) records, Protocol.STREAM_OF_RECORDS.getType());
527             return Entity.entity(entity, new Variant(MediaType.valueOf(MIME_TYPE_RDF),
528                     (String) null, Client.this.compressionEnabled ? "gzip" : "identity"));
529         }
530 
531         private <T> T invoke(final String method, final String path, @Nullable final String query,
532                 @Nullable final Map<String, Object> headers,
533                 @Nullable final Entity<?> requestEntity, final GenericType<T> responseType,
534                 @Nullable final Long timeout) {
535 
536             // Determine target URI based on path and stored redirections
537             final String action = method + ":" + path;
538             final String target = Client.this.targets.get(action);
539             final String uri = target != null ? target : Client.this.serverURL + "/" + path;
540 
541             // Do a probe first in case we don't know whether the method / URI pair is restricted
542             String actualQuery = query;
543             Entity<?> actualRequestEntity = requestEntity;
544             if (target == null) {
545                 actualQuery = Strings.isNullOrEmpty(query) ? "?probe=true" : query + "&probe=true";
546                 if (requestEntity != null) {
547                     final Variant variant = requestEntity.getVariant();
548                     actualRequestEntity = Entity.entity(new byte[0],
549                             new Variant(variant.getMediaType(), (String) null, "identity"));
550                 }
551             }
552 
553             // Encode timeout
554             if (timeout != null) {
555                 final long timeoutInSeconds = Math.max(1, timeout / 1000);
556                 actualQuery = Strings.isNullOrEmpty(actualQuery) ? "?timeout=" + timeoutInSeconds
557                         : actualQuery + "&timeout=" + timeoutInSeconds;
558             }
559 
560             // Determine Accept MIME type based on expected (Java) response type
561             String acceptType = MediaType.WILDCARD;
562             if (responseType.equals(Protocol.STREAM_OF_RECORDS)
563                     || responseType.equals(Protocol.STREAM_OF_OUTCOMES)
564                     || responseType.equals(Protocol.STREAM_OF_STATEMENTS)) {
565                 acceptType = MIME_TYPE_RDF;
566             } else if (responseType.equals(Protocol.STREAM_OF_TUPLES)) {
567                 acceptType = MIME_TYPE_TUPLE;
568             } else if (responseType.equals(Protocol.STREAM_OF_BOOLEANS)) {
569                 acceptType = MIME_TYPE_BOOLEAN;
570             }
571 
572             // Create an invocation builder for the target URI + query string
573             final Invocation.Builder invoker = Client.this.client.target(
574                     actualQuery == null ? uri : uri + actualQuery).request(acceptType);
575 
576             // Add custom headers, if any.
577             if (headers != null) {
578                 for (final Map.Entry<String, Object> entry : headers.entrySet()) {
579                     invoker.header(entry.getKey(), entry.getValue());
580                 }
581             }
582 
583             // Add invocation ID and User-Agent headers
584             invoker.header(HttpHeaders.USER_AGENT, USER_AGENT);
585             invoker.header(Protocol.HEADER_INVOCATION, getInvocationID().stringValue());
586 
587             // Reject response compression, if disabled
588             invoker.header(HttpHeaders.ACCEPT_ENCODING,
589                     Client.this.compressionEnabled ? "gzip, deflate, identity" : "identity");
590 
591             // Add credentials IFF the HTTPS scheme is used
592             if (uri.startsWith("https")) {
593                 invoker.header(HttpHeaders.AUTHORIZATION, this.authorization);
594             }
595 
596             // Log the request
597             if (LOGGER.isDebugEnabled()) {
598                 final StringBuilder builder = new StringBuilder("Http: ");
599                 builder.append(method).append(' ')
600                         .append(actualQuery == null ? uri : uri + actualQuery);
601                 if (actualRequestEntity != null) {
602                     Type type = actualRequestEntity.getEntity().getClass();
603                     if (type.equals(GenericEntity.class)) {
604                         type = ((GenericEntity<?>) actualRequestEntity.getEntity()).getType();
605                     }
606                     builder.append(' ').append(Util.formatType(type));
607                     builder.append(' ').append(actualRequestEntity.getMediaType());
608                 }
609                 if (getUsername() != null) {
610                     builder.append(' ').append(getUsername());
611                 }
612                 LOGGER.debug(builder.toString());
613             }
614 
615             // Perform the request
616             final long timestamp = System.currentTimeMillis();
617             final Response response = actualRequestEntity == null ? invoker.method(method) : //
618                     invoker.method(method, actualRequestEntity);
619             final long elapsed = System.currentTimeMillis() - timestamp;
620 
621             // Log the response
622             if (LOGGER.isDebugEnabled()) {
623                 final StringBuilder builder = new StringBuilder("Http: ");
624                 builder.append(response.getStatus());
625                 if (response.hasEntity()) {
626                     final String etag = response.getHeaderString(HttpHeaders.ETAG);
627                     builder.append(", ").append(etag != null ? etag : response.getMediaType());
628                     final Date lastModified = response.getLastModified();
629                     if (lastModified != null) {
630                         synchronized (DATE_FORMAT) {
631                             builder.append(", ").append(DATE_FORMAT.format(lastModified));
632                         }
633                     }
634                 }
635                 builder.append(", ").append(elapsed).append(" ms");
636                 LOGGER.debug(builder.toString());
637             }
638 
639             // On redirection, close response, update targets map and try again
640             final int status = response.getStatus();
641             if (status == 302 || status == 307 || status == 308) {
642                 response.close();
643                 String newURI = response.getHeaderString(HttpHeaders.LOCATION);
644                 final int index = newURI.indexOf('?');
645                 newURI = index < 0 ? newURI : newURI.substring(0, index);
646                 Client.this.targets.put(action, newURI);
647                 LOGGER.debug("Http: stored redirection: {} -> {}", path, newURI);
648                 return invoke(method, path, query, headers, requestEntity, responseType, timeout);
649             }
650 
651             // Otherwise, update targets map and either return response or fail
652             Client.this.targets.put(action, uri);
653             if (status / 100 == 2) {
654                 if (Representation.class.isAssignableFrom(responseType.getRawType())) {
655                     response.bufferEntity();
656                 }
657                 final T result = response.readEntity(responseType);
658                 if (result instanceof Stream<?>) {
659                     ((Stream<?>) result).onClose(new Runnable() {
660 
661                         @Override
662                         public void run() {
663                             response.close();
664                         }
665 
666                     });
667                 }
668                 return result;
669             } else {
670                 Util.closeQuietly(response);
671                 throw new WebApplicationException(response);
672             }
673         }
674     }
675 
676     public static Builder builder(final String serverURL) {
677         return new Builder(serverURL);
678     }
679 
680     public static class Builder {
681 
682         String serverURL;
683 
684         @Nullable
685         Integer maxConnections;
686 
687         @Nullable
688         Integer connectionTimeout;
689 
690         @Nullable
691         Integer socketTimeout;
692 
693         @Nullable
694         Boolean compressionEnabled;
695 
696         @Nullable
697         Boolean validateServer;
698 
699         @Nullable
700         ProxyConfig proxy;
701 
702         Builder(final String serverURL) {
703             this.serverURL = Preconditions.checkNotNull(serverURL);
704         }
705 
706         public Builder maxConnections(@Nullable final Integer maxConnections) {
707             this.maxConnections = maxConnections;
708             return this;
709         }
710 
711         public Builder connectionTimeout(@Nullable final Integer connectionTimeout) {
712             this.connectionTimeout = connectionTimeout;
713             return this;
714         }
715 
716         public Builder socketTimeout(@Nullable final Integer socketTimeout) {
717             this.socketTimeout = socketTimeout;
718             return this;
719         }
720 
721         public Builder compressionEnabled(@Nullable final Boolean compressionEnabled) {
722             this.compressionEnabled = compressionEnabled;
723             return this;
724         }
725 
726         public Builder validateServer(@Nullable final Boolean validateServer) {
727             this.validateServer = validateServer;
728             return this;
729         }
730 
731         public Builder proxy(@Nullable final ProxyConfig proxy) {
732             this.proxy = proxy;
733             return this;
734         }
735 
736         public Client build() {
737             return new Client(this);
738         }
739 
740     }
741 
742 }