Enriching your postal addresses with Elastic stack - part 2

2018-03-23

This blog post is the second in a series of three.

In the previous post, we described how we indexed the data coming from the BANO project, so we now have indices containing all the French postal addresses.

Let’s see what we can do now with this dataset.

Searching for addresses

Good. Can we actually use our search engine to search?

GET .bano/_search?search_type=dfs_query_then_fetch
{
  "size": 1,
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "address.number": "23"
          }
        },
        {
          "match": {
            "address.street_name": "r verdiere"
          }
        },
        {
          "match": {
            "address.city": "rochelle"
          }
        }

      ]
    }
  }
}

This gives us back:

{
  "took": 170,
  "timed_out": false,
  "_shards": {
    "total": 99,
    "successful": 99,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 10380977,
    "max_score": 23.681055,
    "hits": [
      {
        "_index": ".bano-17",
        "_type": "doc",
        "_id": "173008250H-23",
        "_score": 23.681055,
        "_source": {
          "address": {
            "zipcode": "17000",
            "number": "23",
            "city": "La Rochelle",
            "street_name": "Rue Verdière"
          },
          "location": {
            "lon": -1.155167,
            "lat": 46.157353
          },
          "id": "173008250H-23",
          "source": "C+O",
          "region": "17"
        }
      }
    ]
  }
}

This takes 170ms on my machine, but it can be faster if we know the department number in advance, since we can then reduce the number of shards to hit:

GET .bano-17/_search?search_type=dfs_query_then_fetch
{
  // Same query
}

It now gives the same result but much faster:

{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 212872,
    "max_score": 18.955963,
    "hits": [
      {
        "_index": ".bano-17",
        "_type": "doc",
        "_id": "173008250H-23",
        "_score": 18.955963,
        "_source": {
          "address": {
            "zipcode": "17000",
            "number": "23",
            "city": "La Rochelle",
            "street_name": "Rue Verdière"
          },
          "location": {
            "lon": -1.155167,
            "lat": 46.157353
          },
          "id": "173008250H-23",
          "source": "C+O",
          "region": "17"
        }
      }
    ]
  }
}

Note that this query matches 212,872 addresses, but the most relevant one is the one we are looking for. Relevancy is one of the key strengths of a search engine.

Searching by geo point

We know that we may also have to search by a geo point.

Let’s try a naive approach first: we search across all the points in the dataset and sort them by distance from our input point, which means that the first point we get back is the closest address:

GET .bano/_search
{
  "size": 1, 
  "sort": [
    {
      "_geo_distance": {
        "location": {
          "lat": 46.15735,
          "lon": -1.1551
        }
      }
    }
  ]
}

This gives:

{
  "took": 403,
  "timed_out": false,
  "_shards": {
    "total": 99,
    "successful": 99,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 16402853,
    "max_score": null,
    "hits": [
      {
        "_index": ".bano-17",
        "_type": "doc",
        "_id": "173008250H-23",
        "_score": null,
        "_source": {
          "address": {
            "zipcode": "17000",
            "number": "23",
            "city": "La Rochelle",
            "street_name": "Rue Verdière"
          },
          "location": {
            "lon": -1.155167,
            "lat": 46.157353
          },
          "id": "173008250H-23",
          "source": "C+O",
          "region": "17"
        },
        "sort": [
          5.176690615711886
        ]
      }
    ]
  }
}

A few things to note here. First, we asked for:

{
  "lat": 46.15735,
  "lon": -1.1551  
}

We are getting back a slightly different point. Of course, BANO does not have a point for every single centimeter, just actual addresses:

{
  "lat": 46.157353,
  "lon": -1.155167
}

Second, look at the total number of hits we are sorting on: 16,402,853, which is the full dataset. That has a direct consequence on the response time: 403ms. Also note the sort value in the response (5.17…): it is the distance in meters between our input point and the returned address.

We can safely assume that when we are looking for an address, we will most likely find one within an area of maybe 2 kilometers around the point. So instead of sorting all the points, we can first filter the dataset by distance:

GET .bano/_search
{
  "size": 1,
  "query": {
    "bool": {
      "filter": {
        "geo_distance": {
          "distance": "1km",
          "location": {
            "lat": 46.15735,
            "lon": -1.1551
          }
        }
      }
    }
  },
  "sort": [
    {
      "_geo_distance": {
        "location": {
          "lat": 46.15735,
          "lon": -1.1551
        }
      }
    }
  ]
}

This gives back the same point but with a different header:

{
  "took": 45,
  "timed_out": false,
  "_shards": {
    "total": 99,
    "successful": 99,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4467,
    "max_score": null,
    "hits": [ /* ... */ ]
  }
}

45ms, because we dramatically reduced the number of points to sort down to 4,467.

And if we know the department, that can be even better:

GET .bano-17/_search
{
  // Same query
}

We now have a really decent response time. Note that the filesystem cache plays a big role here as well:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

So we know how to query the data. Let’s use all that to read an existing dataset and enrich it with Logstash.

Incrementally building a Logstash enrichment pipeline

If you have been following this blog, you know that I always start with a default pipeline like this one (in a bano-enrich.conf file):

input { 
  stdin { } 
}

filter {
}

output {
  stdout { codec => rubydebug }
}

It’s simple to use, as you just run:

head -1 mydata | bin/logstash -f bano-enrich.conf

So far so good. But what is the problem?

Every time you change your configuration and want to test it, you need to run the same command line again. That in itself is not an issue; your terminal probably has a history :).

The problem is the time taken by Logstash to start the process (the JVM and Logstash initialization itself). On my laptop, it can take around 20 seconds.

And this is not super developer friendly in my opinion.

Let me share a small trick to make that better: use the http input plugin instead:

input {
  http { }
}

This starts an HTTP server listening on port 8080, which you can call with something like:

curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with text",
  "name": "Joe Smith",
  "address": {
    "number": "23",
    "street_name": "r verdiere",
    "city": "rochelle",
    "country": "France"
  }
}'

What is the difference? Well, because you are no longer using stdin, you can ask Logstash to hot-reload your pipeline every time you save a new version of the file:

bin/logstash -r -f bano-enrich.conf

Then, when you update bano-enrich.conf, you can see in the logs:

[2018-03-24T11:06:17,680][INFO ][logstash.pipelineaction.reload] Reloading pipeline {"pipeline.id"=>:main}
[2018-03-24T11:06:18,007][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x457a565f run>"}
[2018-03-24T11:06:18,082][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-24T11:06:18,122][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x6d4b91c4 sleep>"}
[2018-03-24T11:06:18,133][INFO ][logstash.agent           ] Pipelines running {:count=>2, :pipelines=>[".monitoring-logstash", "main"]}

So less than 1 second to reload the pipeline. That’s a huge win!

Sending our first data

Let’s call our sample again. I actually created a samples.sh script which lets me add more use cases as we go (a sketch of the script is shown after the output below):

curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with text",
  "name": "Joe Smith",
  "address": {
    "number": "23",
    "street_name": "r verdiere",
    "city": "rochelle",
    "country": "France"
  }
}'

Running it gives:

{
     "test_case" => "Address with text",
      "@version" => "1",
          "host" => "0:0:0:0:0:0:0:1",
    "@timestamp" => 2018-03-24T09:09:58.749Z,
          "name" => "Joe Smith",
       "address" => {
               "city" => "rochelle",
        "street_name" => "r verdiere",
             "number" => "23",
            "country" => "France"
    },
       "headers" => {
         "content_length" => "158",
         "request_method" => "POST",
           "content_type" => "application/json",
              "http_host" => "localhost:8080",
           "request_path" => "/",
        "http_user_agent" => "curl/7.54.0",
            "request_uri" => "/",
           "http_version" => "HTTP/1.1",
            "http_accept" => "*/*"
    }
}
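
For reference, the post does not show samples.sh itself, so here is a minimal sketch of what it can look like, assuming it simply stacks up the curl calls used throughout this post:

#!/bin/sh
# samples.sh: replays the test documents against the Logstash http input.
# Each new use case introduced in this post is appended as another curl call.
curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with text",
  "name": "Joe Smith",
  "address": {
    "number": "23",
    "street_name": "r verdiere",
    "city": "rochelle",
    "country": "France"
  }
}'

Running sh samples.sh then sends every use case to the pipeline in one go.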

Calling Elasticsearch with addresses

We saw at the beginning of this post how we can query Elasticsearch to get meaningful data from our BANO dataset.

Let’s connect it to Logstash by using the elasticsearch filter plugin:

elasticsearch {
  query_template => "search-by-name.json"
  index => ".bano"
  fields => {
    "location" => "[location]"
    "address" => "[address]"
  }
  remove_field => ["headers", "host", "@version", "@timestamp"]
}

Let’s explain some new parameters we have not covered before.

query_template lets you write an Elasticsearch query in an external file instead of having to flatten it inside the Logstash configuration. That makes the code more readable, but it has a drawback: when you update search-by-name.json, Logstash does not detect this “change” and does not reload the pipeline. You have to fake it by making a tiny change in the pipeline file itself to trigger a reload (see below).
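
The post does not give an exact recipe for that small change, so here is one possible workaround of my own (not a feature of the plugin): append a throwaway comment to the pipeline file so the automatic reloader sees new content.

# Append a throwaway comment to bano-enrich.conf so the -r watcher
# detects a configuration change and reloads the pipeline.
# Remember to remove these comments once you are done.
echo "# force reload" >> bano-enrich.conf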

Here is the content of the search-by-name.json file:

{
  "size": 1,
  "query":{
    "bool": {
      "should": [
        {
          "match": {
            "address.number": "%{[address][number]}"
          }
        },
        {
          "match": {
            "address.street_name": "%{[address][street_name]}"
          }
        },
        {
          "match": {
            "address.city": "%{[address][city]}"
          }
        }
      ]
    }
  }
}

This looks familiar, right?

index is the Elasticsearch index or alias we want to query. Here we want to query the .bano alias.

fields lists the fields we want to extract from the Elasticsearch response to populate our “event” (or document, if you prefer).

Running samples.sh again gives:

{
    "test_case" => "Address with text",
     "location" => {
        "lon" => -1.155167,
        "lat" => 46.157353
    },
         "name" => "Joe Smith",
      "address" => {
               "city" => "La Rochelle",
        "street_name" => "Rue Verdière",
            "zipcode" => "17000",
             "number" => "23"
    }
}

Great! It works!

Calling Elasticsearch with geo points

But we have another use case: we also want to be able to search by location, with a document like the following, which I added to the samples.sh script:

curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with geo",
  "location": {
    "lat": 46.15735,
    "lon": -1.1551
  }
}'

Running it gives:

{
    "test_case" => "Address with geo",
     "location" => {
        "lat" => 46.15735,
        "lon" => -1.1551
    }
}

We need to add a conditional to the pipeline so we can search with another query_template whenever a location is provided:

if [location][lat] and [location][lon] {
  # We search by distance in that case
  elasticsearch {
    query_template => "search-by-geo.json"
    index => ".bano"
    fields => {
      "location" => "[location]"
      "address" => "[address]"
    }
    remove_field => ["headers", "host", "@version", "@timestamp"]
  }
} else {
  # We search by address in that case
  elasticsearch {
    query_template => "search-by-name.json"
    index => ".bano"
    fields => {
      "location" => "[location]"
      "address" => "[address]"
    }
    remove_field => ["headers", "host", "@version", "@timestamp"]
  }
}

The search-by-geo.json template should also look familiar:

{
  "size": 1,
  "query": {
    "bool": {
      "filter": {
        "geo_distance": {
          "distance": "1km",
          "location": {
            "lat": %{[location][lat]},
            "lon": %{[location][lon]}
          }
        }
      }
    }
  },
  "sort": [
    {
      "_geo_distance": {
        "location": {
          "lat": %{[location][lat]},
          "lon": %{[location][lon]}
        }
      }
    }
  ]
}

Running our samples again now gives:

{
    "test_case" => "Address with geo",
     "location" => {
        "lon" => -1.155167,
        "lat" => 46.157353
    },
      "address" => {
               "city" => "La Rochelle",
        "street_name" => "Rue Verdière",
            "zipcode" => "17000",
             "number" => "23"
    }
}

Optimizing the queries

We have also seen that it can be really inefficient to read the whole dataset every time. If we have the department number, we can help Elasticsearch by querying just the right index instead of all indices.

Let’s assume that the zipcode or part of the zipcode is provided in the input event. For example, let’s add the following use cases to our samples.sh:

curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with geo and zipcode",
  "address": {
    "zipcode": "17000"
  },
  "location": {
    "lat": 46.15735,
    "lon": -1.1551
  }
}'
curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with geo and partial zipcode",
  "address": {
    "zipcode": "17"
  },
  "location": {
    "lat": 46.15735,
    "lon": -1.1551
  }
}'

Let’s first check (before calling Elasticsearch) whether we have a zipcode and, if so, build a temporary dept field that we truncate to keep only the first 2 digits. This is the department number in France:

if [address][zipcode] {
  mutate { add_field => { "dept" => "%{[address][zipcode]}" } }
  truncate { 
    fields => ["dept"]
    length_bytes => 2
  }
}

Then, let’s create an index_suffix field:

mutate { add_field => { "index_suffix" => "-%{dept}" } }

But if there is no zipcode, we need to add some default values:

else {
  mutate { add_field => { "dept" => "" } }
  mutate { add_field => { "index_suffix" => "" } }
}

So far, so good. But wait, I mentioned that nothing can be that simple in France. And yeah… We have departments with 3 digits! o_O

Fortunately, they all start with 97 (97400, for example, belongs to department 974, La Réunion). So we need to take that into account as well:

if [address][zipcode] {
  mutate { add_field => { "dept" => "%{[address][zipcode]}" } }
  truncate { 
    fields => ["dept"]
    length_bytes => 2
  }
  if [dept] == "97" {
    mutate { replace => { "dept" => "%{[address][zipcode]}" } }
    truncate { 
      fields => ["dept"]
      length_bytes => 3
    }
  }
  mutate { add_field => { "index_suffix" => "-%{dept}" } }
} else {
  mutate { add_field => { "dept" => "" } }
  mutate { add_field => { "index_suffix" => "" } }
}

We can now change the index name with:

index => ".bano%{index_suffix}"

And also remove the temporary fields we created:

remove_field => ["headers", "host", "@version", "@timestamp", "index_suffix", "dept"]

At the end, we check that we have no regression, specifically with department 974:

curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with geo and zipcode in 974",
  "address": {
    "zipcode": "97400"
  },
  "location": {
    "lat": -21.214204,
    "lon": 55.361034
  }
}'

Which indeed gives us the right value:

{
    "test_case" => "Address with geo and zipcode in 974",
     "location" => {
        "lon" => 55.361034,
        "lat" => -21.214204
    },
      "address" => {
               "city" => "Les Avirons",
        "street_name" => "Chemin des Acacias",
            "zipcode" => "97425",
             "number" => "13"
    }
}

Next steps

Have a look at the next post to see how you can now use this technique to enrich your existing data.

The full Logstash pipeline

Here is the full pipeline I ended up with:

input {
  http { }
}

filter {
  if [address][zipcode] {
    mutate { add_field => { "dept" => "%{[address][zipcode]}" } }
    truncate { 
      fields => ["dept"]
      length_bytes => 2
    }
    if [dept] == "97" {
      mutate { replace => { "dept" => "%{[address][zipcode]}" } }
      truncate { 
        fields => ["dept"]
        length_bytes => 3
      }
    }
    mutate { add_field => { "index_suffix" => "-%{dept}" } }
  } else {
    mutate { add_field => { "dept" => "" } }
    mutate { add_field => { "index_suffix" => "" } }
  }

  if [location][lat] and [location][lon] {
    elasticsearch {
      query_template => "search-by-geo.json"
      index => ".bano"
      fields => {
        "location" => "[location]"
        "address" => "[address]"
      }
      remove_field => ["headers", "host", "@version", "@timestamp", "index_suffix", "dept"]
    }
  } else {
    elasticsearch {
      query_template => "search-by-name.json"
      index => ".bano"
      fields => {
        "location" => "[location]"
        "address" => "[address]"
      }
      remove_field => ["headers", "host", "@version", "@timestamp", "index_suffix", "dept"]
    }
  }
}

output {
  stdout { codec => rubydebug }
}
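
To try the whole thing end to end, you can start Logstash with automatic reload and replay the samples from another terminal (assuming the samples.sh sketch shown earlier):

# Start Logstash with hot reload of the pipeline
bin/logstash -r -f bano-enrich.conf
# In another terminal, send all the test documents
sh samples.sh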