Dask-ELK usage

In order to user dask-elk, first you need to create an instance of the DaskElasticClient

To connect to an Elasticsearch cluster in localhost:

from dask_elk.client import DaskElasticClient
client = DaskElasticClient(host='localhost')

Read dataframe

To read from data from the elasticsearch:

my_index ='myindex'
df = client.read(index=my_index, doc_type='_doc')

Client will do the following work:

  • Identify the publish address of the cluster’s nodes
  • Obtain information on the index:
    • Mapping of the index
    • In how many shards is index divided
    • Which nodes contain the shards of the index

By default client will try to connect to each node containing the shards and issue the request on each node. Thus it will create number of tasks equal to the number of shards per requested index You can increase the number of tasks by setting the number of documents per partition:

df = client.read(index=my_index, doc_type='_doc', number_of_docs_per_partition=1000)

The returned object is a dask dataframe.

You can also specify a dsl query to be pushed down to Elasticsearch to minimize the data returned back

df = client.read(query=query, index=my_index, doc_type='_doc')

The above code will push down the query to the Elasticsearch and minimize the returned data. Since DaskElasticClient uses the scroll api from Elasticsearch, aggregation queries are not supported.

Save dataframe

You can save your data back to an Elasticsearch index:

df = client.save(df, index='index_to_save', doc_type='_doc', action='index')

Save method uses bulk actions under the hood to save documents back to elasticsearch. The number of tasks depends on the number of paritions of the passed df

You can use some python fromat functions to dynamically create indices during save (The operation is experimental) e.g assuming that timestamp is a datetime column then the folloing code will try to index documents on idices splited per day

index = 'index-{timestamp:%Y.%m.%d}'
df = client.save(df, index=index, doc_type='_doc', action='index')

To update documents provided df needs to contain _id column. You should also provide update as action parameter