This blog post is part of a series of 3:
- Importing Bano dataset with Logstash
- Using Logstash to look up addresses in the Bano index
- Using Logstash to enrich an existing dataset with Bano
In the previous post, we described how to transform a postal address into a normalized one, with its geo location point, or transform a geo location point back into a postal address.
Let’s say we have an existing dataset we want to enrich.
We will consider 3 scenarios:
- We have a CSV file
- We have a Relational Database, MySQL
- We have data in elasticsearch
Let’s see how to enrich those datasets.
Enriching the CSV file
Anytime I have to read a file with Logstash, I actually much prefer using Filebeat for that.
So I changed the input part of Logstash: instead of using an http input plugin, I'm now using a beats input plugin.
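A beats input listening on the default port could look like this (a sketch, not necessarily the exact configuration used here):

```
input {
  beats {
    port => 5044
  }
}
```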
In the filebeat.yml file, I just configured the file to read and the Logstash output.
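A minimal filebeat.yml along those lines could be (the path is a placeholder, and the input syntax varies a bit between Filebeat versions):

```yml
filebeat.inputs:
- type: log
  paths:
    - /path/to/persons.csv
output.logstash:
  hosts: ["localhost:5044"]
```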
And I also added the X-Pack monitoring to get some insights about the pipeline execution.
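Enabling monitoring is a matter of a few settings in logstash.yml (the Elasticsearch address is an assumption, and the exact setting names depend on the stack version):

```yml
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
```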
I created a naive load test where I'm doing 10 iterations of processing the data.
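Such a load test could be sketched like this (the paths, the registry location and the Filebeat flags are assumptions):

```sh
#!/bin/bash
# Run the same file through the pipeline 10 times and time the whole thing.
time (
  for i in $(seq 1 10); do
    rm -rf data/registry*   # force Filebeat to read the file again from scratch
    ./filebeat --once
  done
)
```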
As an input, I have a CSV dataset of around 2500 lines.
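The original sample lines are not reproduced here; given the geo lookup done later in the pipeline, a hypothetical line would at least carry an id, a name and a latitude/longitude pair (the exact columns are assumptions):

```
1,Joe Smith,48.8566,2.3522
```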
We need to parse the data with a csv filter.
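A csv filter for such a dataset could look like this (the column names are assumptions):

```
filter {
  csv {
    separator => ","
    columns => ["id", "name", "latitude", "longitude"]
  }
}
```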
Here, because the input already contains the geo location points, we will use the slowest strategy that we saw in the previous post: sorting by geo distance.
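As a reminder, that lookup can be done with an elasticsearch filter querying the Bano index sorted by `_geo_distance` from the incoming point (the index name, the query template file and the copied fields are assumptions based on the previous post):

```
filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => ".bano"
    query_template => "search-by-geo.json"
    fields => { "address" => "address" }
  }
}
```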
To make sure the output itself was not what was slowing down the pipeline, I replaced the stdout codec with the dots codec.
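The console then only prints one dot per event (a sketch):

```
output {
  stdout { codec => dots }
}
```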
It took 3m3.842s to do the 10 runs, which means around 18 seconds to enrich the 2500 documents, so around 140 documents per second.
Not that bad.
If we look at the Logstash monitoring, we can see that the event latency is around 20-40ms.
We can easily spot the bottleneck.
Doing lookups in Elasticsearch is indeed slowing down our process here, but not by much I would say (34ms per event on average), which is pretty much acceptable for an ETL operation. That's one of the reasons doing slow operations in Logstash is much better than doing them directly in Elasticsearch as an ingest pipeline: the ingest pipeline is called during the indexing operation, and long-running index operations will probably start to fill up the indexing queue of Elasticsearch.
Connecting other datasources
Instead of reading a CSV with Filebeat, you can also imagine reading your existing data directly from a SQL database, for example with a jdbc input plugin.
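With MySQL, for instance, a jdbc input could be sketched as follows (the driver, credentials, database and query are all placeholders):

```
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/persons"
    jdbc_user => "user"
    jdbc_password => "pass"
    statement => "SELECT * FROM person"
  }
}
```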
We can also connect to Elasticsearch and enrich existing data which is already available in an index, with the elasticsearch input plugin.
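An elasticsearch input reading back a whole index could be sketched as (the index name and the query are assumptions):

```
input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "person"
    query => '{ "query": { "match_all": {} } }'
  }
}
```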
You now have all the tools to do a similar address conversion/enrichment. Note that you can use any available dataset. My plan is to index some other open data sources into Elasticsearch and try to cover more countries than just France.