The correct way to index data into Elasticsearch with (Python) elasticsearch-dsl

14 May 2021   Python, MDN, Elasticsearch

This is how MDN Web Docs uses Elasticsearch. Every day, we build all the content and then upload it to Elasticsearch using elasticsearch-dsl, with an alias pointing at the freshly built index. Because there are no good, complete guides on how to do this, I thought I'd write it down for the next person who needs to do something similar. Let's jump straight into the code. You'll need a healthy dose of imagination to fill in your own details.

Indexing

# models.py

from datetime import datetime

from elasticsearch_dsl import Document, Text

PREFIX = "myprefix"


class MyDocument(Document):
    title = Text()
    body = Text()
    # ...

    class Index:
        # Evaluated once, when this module is imported.
        name = (
            f'{PREFIX}_{datetime.utcnow().strftime("%Y%m%d%H%M%S")}'
        )

What's important to note here is that MyDocument.Index.name is computed anew every single time the module is imported. It doesn't matter much exactly what it's called, but it matters that it's unique each time (here, unique down to the second).
This means that when you start using MyDocument, it automatically knows which index to use. Now it's time to create the index and bulk publish to it.
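
To make the naming scheme concrete, here is a minimal sketch of the same timestamp-suffix idea as standalone functions. `make_index_name` and `parse_index_timestamp` are hypothetical helpers I made up for illustration; they are not part of elasticsearch-dsl. Only the format string is taken from models.py.

```python
from datetime import datetime

PREFIX = "myprefix"
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"


def make_index_name(prefix: str, now: datetime) -> str:
    """Build an index name such as 'myprefix_20210514141421'."""
    return f"{prefix}_{now.strftime(TIMESTAMP_FORMAT)}"


def parse_index_timestamp(name: str, prefix: str) -> datetime:
    """Recover the timestamp encoded in an index name (hypothetical helper)."""
    suffix = name[len(prefix) + 1:]  # skip the prefix and the underscore
    return datetime.strptime(suffix, TIMESTAMP_FORMAT)


name = make_index_name(PREFIX, datetime(2021, 5, 14, 14, 14, 21))
created = parse_index_timestamp(name, PREFIX)
print(name, created)
```

Because the suffix is zero-padded and fixed-width, sorting these names alphabetically also sorts them chronologically, which can be handy when you need to tell which of several indexes is the newest.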

# index.py
# Note! This example code skips over things like progress bars
# and verbose logging and misc sanity checks and stuff.

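import json
from pathlib import Path

from elasticsearch.helpers import parallel_bulk
from elasticsearch_dsl import Index
from elasticsearch_dsl.connections import connections

from .models import MyDocument, PREFIX


class IndexAliasError(Exception):
    """Raised when no existing index matches the prefix."""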


def index(buildroot: Path, url: str, update=False):
    """
    * 'buildroot' is where the files are we're going to read and index
    * 'url' is the host URL for the Elasticsearch server
    * 'update' is for when you just want to "cake on" a couple of documents
      instead of starting over and doing a complete indexing.
    """

    # Connect and stuff
    connections.create_connection(hosts=[url], retry_on_timeout=True)
    connection = connections.get_connection()
    health = connection.cluster.health()
    status = health["status"]
    if status not in ("green", "yellow"):
        raise Exception(f"status {status} not green or yellow")

    if update:
        for name in connection.indices.get_alias():
            if name.startswith(f"{PREFIX}_"):
                document_index = Index(name)
                break
        else:
            raise IndexAliasError(
                f"Unable to find an index called {PREFIX}_*"
            )

    else:
        # Confusingly, `._index` is actually not a private API.
        # It's the documented way you're supposed to reach it.
        document_index = MyDocument._index
        document_index.create()

    def generator():
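        # A Path object isn't iterable by itself. This assumes the documents
        # are .json files somewhere under buildroot; adjust to your layout.
        for doc in Path(buildroot).rglob("*.json"):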
            # The reason for specifying the exact index name is that we might
            # be doing an update and if you don't specify it, elasticsearch_dsl
            # will fall back to using whatever Document._meta.Index automatically
            # becomes in this moment.
            yield to_search(doc, _index=document_index._name).to_dict(True)

    for success, info in parallel_bulk(connection, generator()):
        # 'success' is a boolean
        # 'info' has stuff like:
        #  - info["index"]["error"]
        #  - info["index"]["_shards"]["successful"]
        #  - info["index"]["_shards"]["failed"]
        pass

    if update:
        # When you do an update, Elasticsearch only marks the previous
        # docs (matched on the _id primary key we set) as deleted.
        # They are physically removed when segments get merged, which
        # Elasticsearch does in the background on its own schedule.
        # Forcing a merge now reclaims that space right away.
        # See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
        document_index.forcemerge()
    else:
        # Now we're going to bundle the change to set the alias to point
        # to the new index and delete all old indexes.
        # The reason for doing this together in one update is to make it atomic.
        alias_updates = [
            {"add": {"index": document_index._name, "alias": PREFIX}}
        ]
        for index_name in connection.indices.get_alias():
            if index_name.startswith(f"{PREFIX}_"):
                if index_name != document_index._name:
                    alias_updates.append({"remove_index": {"index": index_name}})
        connection.indices.update_aliases({"actions": alias_updates})

    print("All done!")



def to_search(file: Path, _index=None):
    with open(file) as f:
        data = json.load(f)
    return MyDocument(
        _index=_index,
        _id=data["identifier"],
        title=data["title"],
        body=data["body"]
    )
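
The loop over parallel_bulk in index.py just does `pass`. Here is a minimal sketch of what you might do there instead: tally successes and failures from the (success, info) pairs. `summarize_bulk` is a hypothetical helper, and the pairs below are hand-written stand-ins shaped like the comments in index.py, not real Elasticsearch output. Also note that, as far as I know, the bulk helpers raise on errors by default, so you would mostly see unsuccessful pairs if you pass `raise_on_error=False`.

```python
from collections import Counter
from typing import Iterable, Tuple


def summarize_bulk(results: Iterable[Tuple[bool, dict]]) -> Counter:
    """Count successes and failures from (success, info) pairs."""
    counts = Counter()
    for success, info in results:
        if success:
            counts["ok"] += 1
        else:
            counts["failed"] += 1
            # 'info' is keyed by the action type, which is "index" here.
            error = info.get("index", {}).get("error")
            print(f"failed to index: {error}")
    return counts


# Hand-written stand-ins for what the bulk helper would yield.
fake_results = [
    (True, {"index": {"_id": "a", "_shards": {"successful": 1, "failed": 0}}}),
    (False, {"index": {"_id": "b", "error": "something went wrong"}}),
]
counts = summarize_bulk(fake_results)
print(dict(counts))
```

In index.py you would call it as `summarize_bulk(parallel_bulk(connection, generator()))`, since it only needs something it can iterate over.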

A lot is left to the reader as an exercise, but these are the most important operations. The code in index.py demonstrates how to:

  1. Correctly create indexes
  2. Atomically create an alias and clean up old indexes (and aliases)
  3. Add to an existing index

After you've run this you'll see something like this:

$ curl http://localhost:9200/_cat/indices?v
...
health status index                   uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   myprefix_20210514141421 vulVt5EKRW2MNV47j403Mw   1   1      11629            0     28.7mb         28.7mb

$ curl http://localhost:9200/_cat/aliases?v
...
alias    index                   filter routing.index routing.search is_write_index
myprefix myprefix_20210514141421 -      -             -              -
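
The alias swap is the part that's easiest to get wrong, so here is that logic pulled out of index.py as a pure function you can test without a running cluster. `build_alias_actions` is a hypothetical name of mine; the action dictionaries are the same ones index.py sends to update_aliases.

```python
from typing import Iterable, List


def build_alias_actions(
    existing: Iterable[str], new_index: str, prefix: str
) -> List[dict]:
    """Point the alias at 'new_index' and drop every older prefixed index."""
    actions = [{"add": {"index": new_index, "alias": prefix}}]
    for name in existing:
        # Only touch our own indexes, and never the one we just built.
        if name.startswith(f"{prefix}_") and name != new_index:
            actions.append({"remove_index": {"index": name}})
    return actions


existing = [
    "myprefix_20210513141421",  # yesterday's build
    "myprefix_20210514141421",  # the index we just created
    "unrelated_index",
]
actions = build_alias_actions(existing, "myprefix_20210514141421", "myprefix")
print(actions)
```

Sending all of these actions in one request is what makes the switch atomic: searches against the alias hit either the old index or the new one, never neither.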

Searching

When it comes to using the index, it depends on where your search code lives. For example, on MDN Web Docs, the code that searches the index is in an entirely different code base. It happens to be Python (and elasticsearch-dsl) in both places, but other than that they have nothing in common. So for searching, you need to manually put the name of the index (or the name of the alias, if you prefer) into the code that searches. For example:

from elasticsearch_dsl import Search

# 'settings' stands in for wherever your app keeps its configuration.


def search(params):
    search_query = Search(index=settings.SEARCH_INDEX_NAME)

    # Do stuff to 'search_query' based on 'params'

    response = search_query.execute()
    for hit in response:
        # ...
        pass

If you're in the same code base as the models.MyDocument from the first example above, you can simply do things like this:

from elasticsearch_dsl import Index
from elasticsearch_dsl.connections import connections

from .models import PREFIX


def analyze(
    url: str,
    text: str,
    analyzer: str,
):
    connections.create_connection(hosts=[url])
    index = Index(PREFIX)
    analysis = index.analyze(body={"text": text, "analyzer": analyzer})
    # ...
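
As for what to do with `analysis`, here is a minimal sketch that pulls the token strings out of it. It assumes the response looks like what the Elasticsearch _analyze API returns, namely a "tokens" list of objects that each have a "token" key. `tokens_from_analysis` is a hypothetical helper and the sample response is hand-written.

```python
from typing import List


def tokens_from_analysis(analysis: dict) -> List[str]:
    """Return just the token strings from an _analyze style response."""
    return [entry["token"] for entry in analysis["tokens"]]


# Hand-written sample shaped like an _analyze response.
fake_analysis = {
    "tokens": [
        {"token": "quick", "start_offset": 0, "end_offset": 5, "position": 0},
        {"token": "brown", "start_offset": 6, "end_offset": 11, "position": 1},
    ]
}
tokens = tokens_from_analysis(fake_analysis)
print(tokens)
```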
