Issue
I need to come up with a strategy to process and update documents in an elasticsearch index periodically and efficiently. I do not have to look at documents that I processed before.
My setting is that I have a long running process, which continuously inserts documents to an index, say approx. 500 documents per hour (think about the common logging example).
I need to find a solution to update some amount of documents periodically (via cron job, e.g) to run some code on a specific field (text field, eg.) to enhance that document with a number of new fields. I want to do this to offer more fine grained aggregations on the index. In the logging analogy, this could be, e.g., I get the UserAgent-string from a log entry (document), do some parsing on that, and add some new fields back to that document and index it.
So my approach would be:
- Get some amount of documents (or even all) that I haven't looked at before. I could query them by combining
must_not
andexists
, for instance. - Run my code on these documents (run the parser, compute some new stuff, whatever).
- Update the documents obtained previously (probably most preferably via bulk api).
I know there is the Update by query API. But this does not seem to be right here, since I need to run my own code (which btw depends on external libraries), on my server and not as a painless script, which would not offer that comprehensive tasks I need.
I am accessing elasticsearch via python.
The problem is now that I don't know how to implement the above approach. E.g. what if the amount of document obtained in step 1. is larger than myindex.settings.index.max_result_window
?
Any ideas?
Solution
I considered @Jay's comment and ended up with this pattern, for the moment:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan
from my_module.postprocessing import post_process_doc
es = Elasticsearch(...)
es.ping()
def update_docs( docs ):
""""""
for idx,doc in enumerate(docs):
if idx % 10000 == 0:
print( 'next 10k' )
new_field_value = post_process_doc( doc )
doc_update = {
"_index": doc["_index"],
"_id" : doc["_id"],
"_op_type" : "update",
"doc" : { <<the new field>> : new_field_value }
}
yield doc_update
docs = scan( es, query='{ "query" : { "bool": { "must_not": { "exists": { "field": <<the new field>> }} } }}', index=index, scroll="1m", preserve_order=True )
bulk( es, update_docs( docs ) )
Comments:
- I learned that elasticsearch keeps a view of the search results when you do a scroll and pass the corresponding ids with the query request. The scan abstraction method will handle that for you. The scroll-parameter in the method above tells elasticsearch how long the view will be open, i.e., how long the view will be consistant.
- As stated in my comment the documentation says that they no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging use .. point in time (PIT), but I haven't tried it yet.
- In my implementation, I needed to pass
preserve_over=True
, otherwise an error was thrown. - Remember to update the mapping of the index beforehand, e.g., when you want to add a nested fields as another field in your document.
Answered By - matthaeus Answer Checked By - Mildred Charles (WPSolving Admin)