Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Simply Reactive - Eventually build a reactive E...

Simply Reactive - Eventually build a reactive Elasticsearch client (Technologieplauscherl)

Christoph Strobl

April 24, 2019

More Decks by Christoph Strobl

Other Decks in Programming


  1. E V E N T U A L LY B

    U I L D A R E A C T I V E 
 E L A S T I C S E A R C H C L I E N T S I M P LY R E A C T I V E C H R I S T O P H S T R O B L , P I V O TA L S O F T WA R E I N C . 
 @ S T R O B L C H R I S T O P H Technologieplauscherl 2019-04-24
  2. I D I O M AT I C C L

    O S E T O E X I S T I N G N O F L U F F E V O LVA B L E D E S I G N G O A L S
  3. R E S T - B U T… R E

    S P O N S E F O R M AT S J S O N PA R S I N G R O U T I N G / R E S O U R C E M G M T O B S TA C L E S
  4. T E C H N O L O G Y

    S TA C K Spring Web Client Web Request Handling Project Reactor Reactive Composition Reactor Netty Transportation Elasticsearch You know, for search…
  5. Why not…? public Mono<SearchResponse> search(SearchRequest request) { return Mono.create(sink !->

    restHighLevelClient.searchAsync(request, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() { @Override public void onResponse(SearchResponse searchResponse) { sink.success(searchResponse); } @Override public void onFailure(Exception e) { sink.error(e); } })); }
  6. public interface ReactiveElasticsearchClient { /** * Execute a {@link SearchRequest}

    against the {@literal search} API. * * @param consumer never {@literal null}. * @see <a href="https:!//!!www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on * elastic.co!</a> * @return the {@link Flux} emitting {@link SearchHit hits} one by one. !*/ default Flux<SearchHit> search(Consumer<SearchRequest> consumer) { SearchRequest request = new SearchRequest(); consumer.accept(request); return search(request); } /** * Execute the given {@link SearchRequest} against the {@literal search} API. * * @param searchRequest must not be {@literal null}. * @see <a href="https:!//!!www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on * elastic.co!</a> * @return the {@link Flux} emitting {@link SearchHit hits} one by one. !*/ default Flux<SearchHit> search(SearchRequest searchRequest) { return search(HttpHeaders.EMPTY, searchRequest); } /** * Execute the given {@link SearchRequest} against the {@literal search} API. * * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param searchRequest must not be {@literal null}. * @see <a href="https:!//!!www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on * elastic.co!</a> * @return the {@link Flux} emitting {@link SearchHit hits} one by one. !*/ Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest); !// !!... } Evolvable Idiomatic No Fluff Close To Existing
  7. REST Interface Reactive Communication via WebClient client.search(request !-> request .indices("twitter")

    .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  8. REST Interface Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient,

    String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  9. REST Interface Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient,

    String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } POST client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  10. REST Interface Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient,

    String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } POST twitter/users/_search client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  11. REST Interface Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient,

    String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } POST twitter/users/_search { typed_keys=true, ignore_unavailable=false, … } client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  12. REST Interface Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient,

    String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } POST twitter/users/_search { typed_keys=true, ignore_unavailable=false, … } { query : { match_all: { boost :1.0 } } } client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  13. JSON, NDJSON, YAML, CBOR and Smile The XContentParser private <T>

    Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response, Class<T> responseType) { if (RawActionResponse.class.equals(responseType)) { ClientLogger.logRawResponse(logId, response.statusCode()); return Mono.just(responseType.cast(RawActionResponse.create(response))); } if (response.statusCode().is5xxServerError()) { ClientLogger.logRawResponse(logId, response.statusCode()); return handleServerError(request, response); } return response.body(BodyExtractors.toMono(byte[].class)) !// .map(it !-> new String(it, StandardCharsets.UTF_8)) !// .doOnNext(it !-> ClientLogger.logResponse(logId, response.statusCode(), it)) !// .flatMap(content !-> doDecode(response, responseType, content)); } private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) { String mediaType = response.headers().contentType().map(MediaType!::toString).orElse(XContentType.JSON.mediaType()); try { Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); return Mono.justOrEmpty(responseType .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content)))); } catch (Throwable errorParseFailure) { !// cause elasticsearch also uses AssertionError try { return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content))); } catch (Exception e) { return Mono .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); } } } { "took": 57, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 2, "max_score": 1, "hits": [ { "_index": "twitter", "_type": "users", "_id": "3b6689bb-4a6a-4225-88a5-44712a6b0272", "_score": 1, "_source": { "name": "spiderman" } }, { "_index": "twitter", "_type": "users", "_id": "ca0a8e08-86bf-40e8-b5e5-90deef1c9e2f", "_score": 1, "_source": { "name": "batman" } } ] } }
  14. private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse

    response, Class<T> responseType) { if (RawActionResponse.class.equals(responseType)) { ClientLogger.logRawResponse(logId, response.statusCode()); return Mono.just(responseType.cast(RawActionResponse.create(response))); } if (response.statusCode().is5xxServerError()) { ClientLogger.logRawResponse(logId, response.statusCode()); return handleServerError(request, response); } return response.body(BodyExtractors.toMono(byte[].class)) !// .map(it !-> new String(it, StandardCharsets.UTF_8)) !// .doOnNext(it !-> ClientLogger.logResponse(logId, response.statusCode(), it)) !// .flatMap(content !-> doDecode(response, responseType, content)); } JSON, NDJSON, YAML, CBOR and Smile …but… { "took": 57, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 2, "max_score": 1, "hits": [ { "_index": "twitter", "_type": "users", "_id": "3b6689bb-4a6a-4225-88a5-44712a6b0272", "_score": 1, "_source": { "name": "spiderman" } }, { "_index": "twitter", "_type": "users", "_id": "ca0a8e08-86bf-40e8-b5e5-90deef1c9e2f", "_score": 1, "_source": { "name": "batman" } } ] } } J S O N S T R E A M I N G Maybe in 8.0? Please!
  15. Resources Routing and Management ClientConfiguration config = ClientConfiguration.builder() .connectedTo("localhost:9200", "localhost:9201")

    .build(); ReactiveElasticsearchClient client = ReactiveRestClients.create(config); client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe(); public interface HostProvider { /** * Get the {@link WebClient} connecting to an active host utilizing cached * {@link ElasticsearchHost}. * * @return the {@link Mono} emitting the client for an active host or *. {@link Mono#error(Throwable) an error} if none * found. !*/ default Mono<WebClient> getActive() { return getActive(Verification.LAZY); } /** * Get the {@link WebClient} connecting to an active host. * * @param verification must not be {@literal null}. * @return the {@link Mono} emitting the client for an active host or * {@link Mono#error(Throwable) an error} if none * found. !*/ default Mono<WebClient> getActive(Verification verification) { return lookupActiveHost(verification).map(this!::createWebClient); } !// … } public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) { return this.hostProvider.getActive() .flatMap(callback!::doWithClient) .onErrorResume(throwable !-> { if (throwable instanceof ConnectException) { return hostProvider.getActive(Verification.ACTIVE) .flatMap(callback!::doWithClient); } return Mono.error(throwable); }); }
  16. Logging Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient, String

    logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe();
  17. Logging Reactive Communication via WebClient private Mono<ClientResponse> sendRequest(WebClient webClient, String

    logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) !// .uri(builder !-> { builder = builder.path(request.getEndpoint()); if (!ObjectUtils.isEmpty(request.getParameters())) { for (Entry<String, String> entry : request.getParameters().entrySet()) { builder = builder.queryParam(entry.getKey(), entry.getValue()); } } return builder.build(); }) .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) !// .headers(theHeaders !-> { theHeaders.addAll(headers); if (request.getOptions() !!= null) { if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { request.getOptions().getHeaders().forEach(it !-> theHeaders.add(it.getName(), it.getValue())); } } }); if (request.getEntity() !!= null) { Lazy<String> body = bodyExtractor(request); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body!::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body!::get), String.class); } else { ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec !// .exchange() !// .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } client.search(request !-> request .indices("twitter") .types("users") .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).subscribe(); TRACE o.s.d.e.client.WIRE: 82 - [667a467f] Sending request POST /twitter/users/_search with parameters: {typed_keys=true, …} Request body: {„query":{"match_all":{"boost":1.0}}} TRACE o.s.d.e.client.WIRE: 110 - [667a467f] Received response: 200 OK Response body: {„took“:57,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":2, …..
  18. V E R S I O N : : 

    3 . 2 . 0 . M 3 G I T H U B : : 
 s p r i n g - d a t a - e l a s t i c s e a rc h M A I N : : 
 R e a c t i v e R e s t C l i e n t s C O N TA C T: : 
 @ s t ro b l c h r i s t o p h