Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response, Class<T> responseType) { if (RawActionResponse.class.equals(responseType)) { ClientLogger.logRawResponse(logId, response.statusCode()); return Mono.just(responseType.cast(RawActionResponse.create(response))); } if (response.statusCode().is5xxServerError()) { ClientLogger.logRawResponse(logId, response.statusCode()); return handleServerError(request, response); } return response.body(BodyExtractors.toMono(byte[].class)) !// .map(it !-> new String(it, StandardCharsets.UTF_8)) !// .doOnNext(it !-> ClientLogger.logResponse(logId, response.statusCode(), it)) !// .flatMap(content !-> doDecode(response, responseType, content)); } private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) { String mediaType = response.headers().contentType().map(MediaType!::toString).orElse(XContentType.JSON.mediaType()); try { Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); return Mono.justOrEmpty(responseType .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content)))); } catch (Throwable errorParseFailure) { !// cause elasticsearch also uses AssertionError try { return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content))); } catch (Exception e) { return Mono .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); } } } { "took": 57, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 2, "max_score": 1, "hits": [ { "_index": "twitter", "_type": "users", "_id": "3b6689bb-4a6a-4225-88a5-44712a6b0272", "_score": 1, "_source": { "name": "spiderman" } }, { "_index": "twitter", "_type": "users", "_id": "ca0a8e08-86bf-40e8-b5e5-90deef1c9e2f", "_score": 1, "_source": { "name": "batman" } } ] } }