Creating Elasticsearch Transport Action

2016-08-01
2016-10-20
11 min read

This blog post is part of a series which will teach you:

In the previous article, we discovered how to add a REST plugin. It was a simple implementation as in RestHelloAction class we wrote something like:

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    String name = restRequest.param("name");
    return channel -> {
        Message message = new Message(name);
        XContentBuilder builder = channel.newBuilder();
        builder.startObject();
        message.toXContent(builder, restRequest);
        builder.endObject();
        channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
    };
}

But actually, you will probably like to execute some actions against a Node, call some internal services… So the implementation we wrote needs to be modified a bit.

Concepts

Before going further, we need to understand some of the concepts and roles behind some classes:

  • Action: defines an action against a Node. It has a unique name and methods which provides a RequestBuilder object and a Response object
  • ActionRequest: represents the request which must be executed (parameters basically). It should be serializable as its execution could potentially be executed on another node in the cluster (depends on your implementation). Imagine you have a Client Node which proxies the execution to the actual cluster. The REST layer will build a Request which will be sent to the cluster.
  • ActionRequestBuilder: manages all the asynchronous aspects (Future, Listeners…)
  • ActionResponse: the object which is sent back to the REST Layer. It also needs to be serializable.
  • TransportAction: where the execution is actually launched.

So basically, the REST layer gets a REST requests, transforms it in an ActionRequest which is forwarded to a TransportAction where the implementation logic is executed (or called). Then an ActionResponse object is created and sent back to the REST layer. The REST layer serialize it as JSON and gives back the response as JSON to the user.

Action Request

We have a parameter coming from the REST layer which is either a String or a JSON content. See the previous article if needed.

Let’s create a HelloRequest class:

public class HelloRequest extends ActionRequest<HelloRequest> {
    private String name;

    public void setName(String name) {
        this.name = name;
    }

    public void setRestContent(BytesReference restContent) {
        // Let's try to find the name from the body
        Map<String, Object> map = XContentHelper.convertToMap(restContent, false).v2();
        if (map.containsKey("name")) {
            name = (String) map.get("name");
        }
    }

    public String getName() {
        return name;
    }

    @Override
    public ActionRequestValidationException validate() {
        return null;
    }
}

validate() method is called to validate a request before it’s sent to a Node. You can imagine here having mandatory fields or parameters for example and fail the request if not provided. For example (we won’t implement that in our example):

@Override
public ActionRequestValidationException validate() {
    ActionRequestValidationException validationException = null;
    if (name == null) {
        validationException = new ActionRequestValidationException();
        validationException.addValidationError("You must provide a name");
        return validationException;
    }
    return null;
}

As we want to ActionRequest being serializable, let’s implement the serialization layer:

@Override
public void readFrom(StreamInput in) throws IOException {
    super.readFrom(in);
    name = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeOptionalString(name);
}

Action Response

So we have the request. Let’s build the response and create a HelloResponse:

public class HelloResponse extends ActionResponse {

    private String message;

    public HelloResponse() {
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        message = in.readOptionalString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalString(message);
    }
}

This class will hold our response message. It can be serializable between nodes because we implemented again readFrom(StreamInput) and writeTo(StreamOutput) methods.

But that’s not enough to serialize it as a JSON response. To do that, we need to implement ToXContent interface and add toXContent(XContentBuilder, Params) method:

public class HelloResponse extends ActionResponse implements ToXContent {
    // ...

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.field("message", message);
    }
}

Action Request Builder

We have to extend ActionRequestBuilder class. Let’s do it and create HelloRequestBuilder class:

public class HelloRequestBuilder extends ActionRequestBuilder<HelloRequest, HelloResponse, HelloRequestBuilder> {

    public HelloRequestBuilder(ElasticsearchClient client, Action<HelloRequest, HelloResponse, HelloRequestBuilder> action) {
        super(client, action, new HelloRequest());
    }

}

This looks quite easy… You have to know that a lot of magic is happening here behind the scene. We will see what it brings in tests.

Action

We have everything now to define our Action class. We create a HelloAction

public class HelloAction extends Action<HelloRequest, HelloResponse, HelloRequestBuilder> {

    public static final HelloAction INSTANCE = new HelloAction();
    public static final String NAME = "cluster:admin/hello";

    private HelloAction() {
        super(NAME);
    }

    @Override
    public HelloResponse newResponse() {
        return new HelloResponse();
    }

    @Override
    public HelloRequestBuilder newRequestBuilder(ElasticsearchClient elasticsearchClient) {
        return new HelloRequestBuilder(elasticsearchClient, INSTANCE);
    }
}

The NAME is a unique identifier. It helps a node to know which TransportAction class should be executed. Also, this name is used in the context of X-Pack Security to filter what users can and can not do.

Transport Action

This is where the actual execution really happens. Let’s create here a HelloTransportAction class. It extends HandledTransportAction so our new action will be automatically registered in the TransportService.

public class HelloTransportAction extends HandledTransportAction<HelloRequest, HelloResponse> {

    @Inject
    public HelloTransportAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
                                IndexNameExpressionResolver resolver, TransportService transportService) {
        super(settings, HelloAction.NAME, threadPool, transportService, actionFilters, resolver, HelloRequest::new);
    }

    @Override
    protected void doExecute(HelloRequest request, ActionListener<HelloResponse> listener) {
        // Implement here
    }
}

When the action cluster:admin/hello is called, doExecute() will be called. So you can implement the logic there. Or you can also delegate it to a service.

For example, if you want to run something against the search service, you just have to inject the SearchService class in the constructor, define a field which points to it and then call the search service in doExecute. You can call more than one service here if you wish.

But here for our example, we will still keep it simple:

@Override
protected void doExecute(HelloRequest request, ActionListener<HelloResponse> listener) {
    try {
        String name = request.getName();
        if (name == null) {
            name = "World";
        }
        HelloResponse response = new HelloResponse();
        response.setMessage("Hello " + name + "!");
        listener.onResponse(response);
    } catch (Exception e) {
        listener.onFailure(e);
    }
}

As you can see (even if it does not make sense here), you can also call the onFailure() method if you want to send an error to the caller.

Modify the REST layer

In the previous article, we wrote all the logic in our RestHelloAction class and Message inner class:

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    String name = request.param("name");

    if (name == null && request.content().length() > 0) {
        // Let's try to find the name from the body
        Map<String, Object> map = XContentHelper.convertToMap(request.content(), false).v2();
        if (map.containsKey("name")) {
            name = (String) map.get("name");
        }
    }

    return channel -> {
        Message message = new Message(name);
        XContentBuilder builder = channel.newBuilder();
        builder.startObject();
        message.toXContent(builder, restRequest);
        builder.endObject();
        channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
    };
}

class Message implements ToXContent {

    private final String name;

    public Message(String name) {
        if (name == null) {
            this.name = "World";
        } else {
            this.name = name;
        }
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.field("message", "Hello " + name + "!");
    }
}

Let’s replace all that now with:

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    HelloRequest request = new HelloRequest();

    String name = restRequest.param("name");
    if (name != null) {
        request.setName(name);
    } else if (restRequest.hasContent()){
        request.setRestContent(restRequest.content());
    }

    return channel -> client.execute(HelloAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

Basically, we create a HelloRequest object and just execute it on the cluster. Easy, right?

Register the action

Last but not least, we need to tell the plugin about this new action. In IngestBanoPlugin, we add this method:

@Override
public List<ActionHandler<? extends ActionRequest<?>, ? extends ActionResponse>> getActions() {
    return Collections.singletonList(new ActionHandler<>(HelloAction.INSTANCE, HelloTransportAction.class));
}

We are basically associating here our HelloAction with its HelloTransportAction class.

Add transport layer tests

Our REST tests are still working but now we execute our logic within a Node instead of on the REST layer. But we can test if the transport layer is actually working as expected.

To achieve this goal we can add a new JUnit Test which extends ESIntegTestCase:

@ESIntegTestCase.ClusterScope(transportClientRatio = 0.9)
public class TransportHelloTest extends ESIntegTestCase {
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singletonList(IngestBanoPlugin.class);
    }

    @Override
    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
        return Collections.singletonList(IngestBanoPlugin.class);
    }
}

In this class, we set that we want to have 90% of the time a TransportClient. Rest of the time, we will have a NodeClient. We also define plugins that we need to load in every node and in the Transport Client as well.

Then we can add our first test:

public void testHelloWithFuture() throws ExecutionException, InterruptedException {
    HelloRequest request = new HelloRequest();
    request.setName("David");
    ActionFuture<HelloResponse> future = client().execute(HelloAction.INSTANCE, request);

    // Do something else if you wish...

    HelloResponse response = future.get();
    assertThat(response.getMessage(), is("Hello David!"));
}

Even simpler:

public void testHelloWithFutureInlined() throws ExecutionException, InterruptedException {
    HelloRequest request = new HelloRequest();
    request.setName("David");
    HelloResponse response = client().execute(HelloAction.INSTANCE, request).get();
    assertThat(response.getMessage(), is("Hello David!"));
}

We can also use a listener.

public void testHelloWithListener() throws ExecutionException, InterruptedException {
    HelloRequest request = new HelloRequest();
    request.setName("David");

    final Boolean[] success = {false};

    client().execute(HelloAction.INSTANCE, request, new ActionListener<HelloResponse>() {
        @Override
        public void onResponse(HelloResponse helloResponse) {
            assertThat(helloResponse.getMessage(), is("Hello David!"));
            success[0] = true;
        }

        @Override
        public void onFailure(Exception e) {
            fail("We got an error: " + e.getMessage());
        }
    });

    awaitBusy(() -> success[0]);
}

What about our Bano REST action?

We can do the same thing for the RestBanoAction class. Remember it was:

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    return channel -> client.admin().indices().prepareGetIndex()
            .setIndices(".bano*")
            .execute(new RestBuilderListener<GetIndexResponse>(channel) {
                @Override
                public RestResponse buildResponse(GetIndexResponse getIndexResponse, XContentBuilder builder) throws Exception {
                    Indices indices = new Indices();
                    for (String index : getIndexResponse.getIndices()) {
                        indices.addIndex(index);
                    }
                    builder.startObject();
                    indices.toXContent(builder, restRequest);
                    builder.endObject();
                    return new BytesRestResponse(RestStatus.OK, builder);
                }
            });
}

class Indices implements ToXContent {

    private final List<String> indices;

    public Indices() {
        indices = new ArrayList<>();
    }

    public void addIndex(String index) {
        indices.add(index);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startArray("bano");
        for (String index : indices) {
            builder.value(index);
        }
        return builder.endArray();
    }
}

Let’s replace all that with:

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    BanoRequest request = new BanoRequest();

    String dept = restRequest.param("dept");
    if (dept != null) {
        request.setDept(dept);
    }

    return channel -> client.execute(BanoAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

A BanoRequest basically just contains a dept field. A BanoResponse is providing a list in a very similar way as the previous Indices inner class we wrote. In IngestBanoPlugin, we just have to register the new action with:

@Override
public List<ActionHandler<? extends ActionRequest<?>, ? extends ActionResponse>> getActions() {
    return Arrays.asList(
        new ActionHandler<>(HelloAction.INSTANCE, HelloTransportAction.class),
        new ActionHandler<>(BanoAction.INSTANCE, BanoTransportAction.class));
}

The BanoTransportAction class is:

public class BanoTransportAction extends HandledTransportAction<BanoRequest, BanoResponse> {

    private final ClusterService clusterService;

    @Inject
    public BanoTransportAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
                               IndexNameExpressionResolver resolver, TransportService transportService,
                               ClusterService clusterService) {
        super(settings, BanoAction.NAME, threadPool, transportService, actionFilters, resolver, BanoRequest::new);
        this.clusterService = clusterService;
    }

    @Override
    protected void doExecute(BanoRequest request, ActionListener<BanoResponse> listener) {
        String indices = ".bano-";

        if (request.getDept() != null) {
            indices += request.getDept();
        } else {
            indices += "*";
        }

        BanoResponse response = new BanoResponse();

        try {
            GetIndexRequest indexRequest = new GetIndexRequest().indices(indices);
            String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), indexRequest);

            for (String index : concreteIndices) {
                response.addIndex(index);
            }

            listener.onResponse(response);
        } catch (IndexNotFoundException e) {
            listener.onResponse(response);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }
}

Note that we injected here the ClusterService as we need it to retrieve the indices.

Cherry on the cake, we can test that with JUnit as we have seen before:

@ESIntegTestCase.ClusterScope(transportClientRatio = 0.9)
public class TransportBanoTest extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singletonList(IngestBanoPlugin.class);
    }

    @Override
    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
        return Collections.singletonList(IngestBanoPlugin.class);
    }

    private int numIndices;

    @Before
    public void createIndices() {
        // We first create some indices
        numIndices = randomIntBetween(1, 10);
        for (int i = 0; i < numIndices; i++) {
            createIndex(".bano-" + i);
        }

        // We create some manual indices
        createIndex(".bano-17", ".bano-29", ".bano-95");

        // We create some other indices
        int numOtherIndices = randomIntBetween(1, 10);
        for (int i = 0; i < numOtherIndices; i++) {
            createIndex(randomAsciiOfLengthBetween(6, 10).toLowerCase(Locale.getDefault()));
        }

        ensureYellow();
    }

    public void testBanoNoDept() throws ExecutionException, InterruptedException {
        BanoRequest request = new BanoRequest();
        BanoResponse response = client().execute(BanoAction.INSTANCE, request).get();
        for (int i = 0; i < numIndices; i++) {
            assertThat(response.getIndices(), hasItem(".bano-" + i));
        }
        assertThat(response.getIndices(), hasItem(".bano-17"));
        assertThat(response.getIndices(), hasItem(".bano-29"));
        assertThat(response.getIndices(), hasItem(".bano-95"));
    }

    public void testBanoOneDept() throws ExecutionException, InterruptedException {
        BanoRequest request = new BanoRequest();
        request.setDept("17");
        BanoResponse response = client().execute(BanoAction.INSTANCE, request).get();
        assertThat(response.getIndices(), iterableWithSize(1));
        assertThat(response.getIndices(), hasItem(".bano-17"));
    }

    public void testBanoNonExistingDept() throws ExecutionException, InterruptedException {
        BanoRequest request = new BanoRequest();
        request.setDept("idontbelieverandomizedtestingwillgeneratethat");
        BanoResponse response = client().execute(BanoAction.INSTANCE, request).get();
        assertThat(response.getIndices(), iterableWithSize(0));
    }
}

What’s next?

Et voilà!

In a next post, we will discover how we can use the Task Management API to execute long running tasks which can be run on a cluster, can be monitored and stopped if needed.

Share this article on
Avatar

David Pilato

20+ years of experience, mostly in Java. Living in Cergy, France.