In practice, though, you will probably want to execute some actions against a Node or call some internal services.
So the implementation we wrote needs to be modified a bit.
Concepts
Before going further, we need to understand some of the concepts and roles behind some classes:
Action: defines an action against a Node. It has a unique name and methods that provide a RequestBuilder object and a Response object.
ActionRequest: represents the request to execute (basically its parameters). It must be serializable because, depending on your implementation, it may be executed on another node in the cluster. Imagine a Client Node that proxies the execution to the actual cluster: the REST layer builds a Request, which is then sent to the cluster.
ActionRequestBuilder: manages all the asynchronous aspects (Future, Listeners…)
ActionResponse: the object which is sent back to the REST Layer. It also needs to be serializable.
TransportAction: where the execution is actually launched.
So basically, the REST layer gets a REST request and transforms it into an ActionRequest, which is forwarded to a TransportAction where the implementation logic is executed (or called). An ActionResponse object is then created and sent back to the REST layer.
The REST layer serializes it as JSON and returns that JSON response to the user.
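The flow described above can be sketched in plain Java, without any Elasticsearch dependency. Every class in this block (SketchRequest, SketchResponse, SketchTransportAction, SketchRestLayer) is a hypothetical stand-in, and the listener is invoked synchronously to keep things short, so treat it only as an illustration of who calls whom:

```java
import java.util.function.Consumer;

// Hypothetical stand-in for an ActionRequest: it only carries parameters.
class SketchRequest {
    final String name;
    SketchRequest(String name) { this.name = name; }
}

// Hypothetical stand-in for an ActionResponse: it only carries the result.
class SketchResponse {
    final String message;
    SketchResponse(String message) { this.message = message; }
}

// Hypothetical stand-in for a TransportAction: this is where the logic runs.
class SketchTransportAction {
    void execute(SketchRequest request, Consumer<SketchResponse> listener) {
        String name = request.name == null ? "World" : request.name;
        // The listener is called synchronously here; a real node may answer later.
        listener.accept(new SketchResponse("Hello " + name + "!"));
    }
}

// Hypothetical stand-in for the REST layer: build the request, forward it, render JSON.
class SketchRestLayer {
    private final SketchTransportAction action = new SketchTransportAction();

    String handle(String nameParam) {
        StringBuilder json = new StringBuilder();
        // Naive JSON rendering without escaping, good enough for this sketch only.
        action.execute(new SketchRequest(nameParam),
                response -> json.append("{\"message\":\"").append(response.message).append("\"}"));
        return json.toString();
    }
}

class FlowSketch {
    public static void main(String[] args) {
        System.out.println(new SketchRestLayer().handle("David"));
        System.out.println(new SketchRestLayer().handle(null));
    }
}
```

The REST stand-in never builds the message itself: it only translates between the outside world and the request and response objects.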
Action Request
We have a parameter coming from the REST layer, either as a String or as JSON content.
See the previous article if needed.
Let’s create a HelloRequest class:
    public class HelloRequest extends ActionRequest<HelloRequest> {

        private String name;

        public void setName(String name) {
            this.name = name;
        }

        public void setRestContent(BytesReference restContent) {
            // Let's try to find the name from the body
            Map<String, Object> map = XContentHelper.convertToMap(restContent, false).v2();
            if (map.containsKey("name")) {
                name = (String) map.get("name");
            }
        }

        public String getName() {
            return name;
        }

        @Override
        public ActionRequestValidationException validate() {
            return null;
        }
    }
The validate() method is called to validate a request before it’s sent to a Node.
You could imagine having mandatory fields or parameters here and failing the request
if they are not provided. For example (we won’t implement that in our example):
    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
        if (name == null) {
            validationException = new ActionRequestValidationException();
            validationException.addValidationError("You must provide a name");
            return validationException;
        }
        return null;
    }
As we want our ActionRequest to be serializable, let’s implement the serialization layer:
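To illustrate what serializing a request involves, here is a minimal round trip in plain Java. It is only an analogy: java.io.DataOutputStream and DataInputStream stand in for Elasticsearch's StreamOutput and StreamInput, and NameRequestSketch is a hypothetical class, not the HelloRequest from this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical request holding a single optional field.
class NameRequestSketch {
    String name;

    // Write a presence flag first so that a null name survives the round trip.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeBoolean(name != null);
        if (name != null) {
            out.writeUTF(name);
        }
    }

    // Read the fields back in exactly the order they were written.
    void readFrom(DataInputStream in) throws IOException {
        name = in.readBoolean() ? in.readUTF() : null;
    }

    // Simulates sending the request to another node: object to bytes, bytes to object.
    static NameRequestSketch roundTrip(NameRequestSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.writeTo(out);
            }
            NameRequestSketch copy = new NameRequestSketch();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFrom(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class SerializationSketch {
    public static void main(String[] args) {
        NameRequestSketch request = new NameRequestSketch();
        request.name = "David";
        System.out.println(NameRequestSketch.roundTrip(request).name);
    }
}
```

The point to retain is the symmetry: whatever the write method emits, the read method must consume in the same order, otherwise the receiving node decodes garbage.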
This class will hold our response message. It can be serialized between nodes
because, once again, we implemented the readFrom(StreamInput) and writeTo(StreamOutput) methods.
But that’s not enough to serialize it as a JSON response. To do that, we need to implement
the ToXContent interface and add the toXContent(XContentBuilder, Params) method:
The NAME is a unique identifier. It helps a node to know which TransportAction class should be executed.
This name is also used in the context of X-Pack Security to filter what users can and cannot do.
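One way to picture the role of NAME is a lookup table from action names to handlers. The sketch below is a hypothetical plain-Java model (ActionRegistrySketch is not an Elasticsearch class, and the real registration mechanism is more involved); only the action name cluster:admin/hello comes from this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry: maps a unique action name to the code that handles it.
class ActionRegistrySketch {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Registering the same name twice is rejected, since names must be unique.
    void register(String actionName, Function<String, String> handler) {
        if (handlers.putIfAbsent(actionName, handler) != null) {
            throw new IllegalArgumentException("action [" + actionName + "] is already registered");
        }
    }

    // A node receiving a request uses the name to find the handler to run.
    String dispatch(String actionName, String payload) {
        Function<String, String> handler = handlers.get(actionName);
        if (handler == null) {
            throw new IllegalArgumentException("no handler for action [" + actionName + "]");
        }
        return handler.apply(payload);
    }
}

class ActionRegistryDemo {
    public static void main(String[] args) {
        ActionRegistrySketch registry = new ActionRegistrySketch();
        registry.register("cluster:admin/hello", name -> "Hello " + name + "!");
        System.out.println(registry.dispatch("cluster:admin/hello", "David"));
    }
}
```

A structured name also explains why a security layer can filter on it: a prefix such as cluster:admin/ is easy to match against a list of allowed patterns.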
Transport Action
This is where the execution actually happens. Let’s create a TransportHelloAction class.
It extends HandledTransportAction so our new action will be automatically registered in the TransportService.
When the action cluster:admin/hello is called, doExecute() will be called.
So you can implement the logic there. Or you can also delegate it to a service.
For example, if you want to run something against the search service, you just have
to inject the SearchService class in the constructor, define a field which points to it
and then call the search service in doExecute. You can call more than one service here if you wish.
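The injection pattern described above looks roughly like this in plain Java. GreetingServiceSketch and DelegatingActionSketch are hypothetical names used only for illustration; in a real plugin the service would be an Elasticsearch class handed to the constructor by the dependency injection mechanism.

```java
// Hypothetical service, standing in for something like the search service.
interface GreetingServiceSketch {
    String greet(String name);
}

// Hypothetical stand-in for a transport action that delegates to an injected service.
class DelegatingActionSketch {
    // The field pointing to the injected service.
    private final GreetingServiceSketch greetingService;

    // The service arrives through the constructor.
    DelegatingActionSketch(GreetingServiceSketch greetingService) {
        this.greetingService = greetingService;
    }

    // Plays the role of doExecute(): applies a default, then calls the service.
    String doExecute(String name) {
        return greetingService.greet(name == null ? "World" : name);
    }
}

class DelegationDemo {
    public static void main(String[] args) {
        DelegatingActionSketch action = new DelegatingActionSketch(name -> "Hello " + name + "!");
        System.out.println(action.doExecute("David"));
    }
}
```

Keeping the service behind a constructor parameter also makes the action easy to exercise in a unit test, since a lambda or a fake can be passed instead of the real service.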
But here for our example, we will still keep it simple:
    @Override
    public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        RestToXContentListener<ToXContent> listener = new RestToXContentListener<>(channel);

        String name = request.param("name");

        if (name == null && request.content().length() > 0) {
            // Let's try to find the name from the body
            Map<String, Object> map = XContentHelper.convertToMap(request.content(), false).v2();
            if (map.containsKey("name")) {
                name = (String) map.get("name");
            }
        }

        listener.onResponse(new Message(name));
    }

    class Message implements ToXContent {

        private final String name;

        public Message(String name) {
            if (name == null) {
                this.name = "World";
            } else {
                this.name = name;
            }
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            return builder.field("message", "Hello " + name + "!");
        }
    }
Here we are basically associating our HelloAction with its TransportHelloAction class.
Add transport layer tests
Our REST tests are still working but now we execute our logic within a Node instead of on the REST layer.
We can also test that the transport layer is actually working as expected.
To achieve this goal we can add a new JUnit Test which extends ESIntegTestCase:
In this class, we specify that we want a TransportClient 90% of the time.
The rest of the time, we will have a NodeClient.
We also define plugins that we need to load in every node and in the Transport Client as well.
Then we can add our first test:
    public void testHelloWithFuture() throws ExecutionException, InterruptedException {
        HelloRequest request = new HelloRequest();
        request.setName("David");
        ActionFuture<HelloResponse> future = client().execute(HelloAction.INSTANCE, request);

        // Do something else if you wish...

        HelloResponse response = future.get();
        assertThat(response.getMessage(), is("Hello David!"));
    }
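The Future and Listener styles mentioned in the Concepts section can be compared with a small plain-Java sketch. It assumes java.util.concurrent.CompletableFuture as a stand-in for ActionFuture, and executeHello is a hypothetical method, not part of the Elasticsearch client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

class AsyncStylesSketch {
    // Hypothetical asynchronous call; CompletableFuture stands in for ActionFuture.
    static CompletableFuture<String> executeHello(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name + "!");
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Future style: start the call, optionally do other work, then block on get().
        CompletableFuture<String> future = executeHello("David");
        System.out.println(future.get());

        // Listener style: register a callback that runs once the result is ready.
        AtomicReference<String> received = new AtomicReference<>();
        executeHello("David").thenAccept(received::set).get(); // get() only waits for the callback
        System.out.println(received.get());
    }
}
```

Both styles deliver the same result; the difference is whether the calling thread waits for it or hands over a callback and moves on.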
A BanoRequest basically just contains a dept field.
A BanoResponse provides a list, much like the Indices inner class we wrote previously.
In IngestBanoPlugin, we just have to register the new action with:
    @ESIntegTestCase.ClusterScope(transportClientRatio = 0.9)
    public class TransportBanoTest extends ESIntegTestCase {

        @Override
        protected Collection<Class<? extends Plugin>> nodePlugins() {
            return Collections.singletonList(IngestBanoPlugin.class);
        }

        @Override
        protected Collection<Class<? extends Plugin>> transportClientPlugins() {
            return Collections.singletonList(IngestBanoPlugin.class);
        }

        private int numIndices;

        @Before
        public void createIndices() {
            // We first create some indices
            numIndices = randomIntBetween(1, 10);
            for (int i = 0; i < numIndices; i++) {
                createIndex(".bano-" + i);
            }

            // We create some manual indices
            createIndex(".bano-17", ".bano-29", ".bano-95");

            // We create some other indices
            int numOtherIndices = randomIntBetween(1, 10);
            for (int i = 0; i < numOtherIndices; i++) {
                createIndex(randomAsciiOfLengthBetween(6, 10).toLowerCase(Locale.getDefault()));
            }

            ensureYellow();
        }

        public void testBanoNoDept() throws ExecutionException, InterruptedException {
            BanoRequest request = new BanoRequest();
            BanoResponse response = client().execute(BanoAction.INSTANCE, request).get();

            for (int i = 0; i < numIndices; i++) {
                assertThat(response.getIndices(), hasItem(".bano-" + i));
            }
            assertThat(response.getIndices(), hasItem(".bano-17"));
            assertThat(response.getIndices(), hasItem(".bano-29"));
            assertThat(response.getIndices(), hasItem(".bano-95"));
        }

        public void testBanoOneDept() throws ExecutionException, InterruptedException {
            BanoRequest request = new BanoRequest();
            request.setDept("17");
            BanoResponse response = client().execute(BanoAction.INSTANCE, request).get();

            assertThat(response.getIndices(), iterableWithSize(1));
            assertThat(response.getIndices(), hasItem(".bano-17"));
        }

        public void testBanoNonExistingDept() throws ExecutionException, InterruptedException {
            BanoRequest request = new BanoRequest();
            request.setDept("idontbelieverandomizedtestingwillgeneratethat");
            BanoResponse response = client().execute(BanoAction.INSTANCE, request).get();

            assertThat(response.getIndices(), iterableWithSize(0));
        }
    }
What’s next?
Et voilà!
In the next post, we will discover how we can use the Task Management API to execute long-running tasks
which can be run on a cluster, monitored, and stopped if needed.