
Commit 896e394

document the use case for tailing an index
1 parent 7c976e1 commit 896e394

File tree

1 file changed

+115
-0
lines changed

docs/index.asciidoc

+115
@@ -49,6 +49,119 @@ This would create an Elasticsearch query with the following format:
}'


[id="plugins-{type}s-{plugin}-cursor"]
==== Tracking a field's value across runs

It is sometimes desirable to track the value of a particular field between two jobs:

* avoid re-processing the entire result set of a long query after an unplanned restart
* only grab new data from an index instead of processing the entire set on each job

For this, the Elasticsearch input plugin provides the <<tracking_field>> and <<tracking_field_seed>> options.
When <<tracking_field>> is set, the plugin will record the value of that field for the last document retrieved in a run into
a file (the location defaults to <<last_run_metadata_path>>).

The user can then inject this value into the query using the placeholder `:last_value`. The value will be injected into the query
before execution and then updated after the query completes, assuming new data was found.

The plugin also offers another placeholder called `:present`, used to inject the nano-second precision timestamp of the current moment.

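As a rough illustration (the timestamps below are made-up values, and `event.ingested` is the tracking field used in the example further down), a range clause written with the two placeholders could end up being sent to Elasticsearch like this after substitution:

[source, json]
{
  "range": {
    "event.ingested": {
      "gt": "2024-05-01T10:00:00.123456789Z",
      "lt": "2024-05-01T10:01:00.000000000Z"
    }
  }
}

Elasticsearch only ever sees the concrete values; the placeholders are resolved by the plugin before the query is executed.
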
This feature works best when:

* the query sorts by the tracking field
* the field type has enough resolution so that two events are unlikely to have the same value for the field

A suggestion is to use a tracking field that has nanosecond precision, like the
https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[date nanoseconds] field type.

A good use case for this feature is to track new data in an index, which can be achieved by:

1. create an ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested` (a way to check the pipeline with the simulate API is sketched after this procedure):

[source, json]
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.putIfAbsent(\"event\", [:]); ctx.event.ingested = metadata().now.format(DateTimeFormatter.ISO_INSTANT);"
      }
    }
  ]
}

2. create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline:

[source, json]
PUT /_template/my_template
{
  "index_patterns": ["test-*"],
  "settings": {
    "index.default_pipeline": "my-pipeline"
  },
  "mappings": {
    "properties": {
      "event": {
        "properties": {
          "ingested": {
            "type": "date_nanos",
            "format": "strict_date_optional_time_nanos"
          }
        }
      }
    }
  }
}

3. define a query that looks at all data of the indices, sorted by the tracking field, and with a range filter from the last value seen until the present:

[source,json]
{
  "query": {
    "range": {
      "event.ingested": {
        "gt": ":last_value",
        "lt": ":present"
      }
    }
  },
  "sort": [
    {
      "event.ingested": {
        "order": "asc",
        "format": "strict_date_optional_time_nanos",
        "numeric_type": "date_nanos"
      }
    }
  ]
}

4. configure the Elasticsearch input to query the indices with the query defined above, every minute, and track the `event.ingested` field:

[source, ruby]
input {
  elasticsearch {
    id => "tail_test_index"
    hosts => [ 'https://..']
    api_key => '....'
    index => 'test-*'
    query => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lt": ":present"}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] }'
    tracking_field => "[event][ingested]"
    # set the seed to a value known to be older than any value of `event.ingested`
    tracking_field_seed => "1980-01-01T23:59:59.999999999Z"
    slices => 5 # optional use of slices to speed up data processing; should be less than the number of primary shards
    schedule => '* * * * *' # every minute
    schedule_overlap => false # don't accumulate jobs if one takes longer than 1 minute
  }
}

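Before starting the Logstash pipeline, it can be worth verifying that the ingest pipeline from step 1 populates `event.ingested` as expected. The following is a minimal sketch using Elasticsearch's simulate pipeline API, with a made-up sample document:

[source, json]
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    { "_index": "test-1", "_source": { "message": "a sample document" } }
  ]
}

Each document in the response should carry an `event.ingested` timestamp in ISO instant format, which is the value the tracking feature relies on.
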
With this setup, as new documents are indexed into a `test-*` index, the next scheduled run will:

1. select all new documents since the last observed value of the tracking field;
2. use PIT+search_after to paginate through all the data (a rough sketch of these requests is shown below);
3. update the value of the field at the end of the pagination.

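The PIT+search_after mechanics are handled by the plugin itself, but roughly correspond to the following kind of requests (a sketch only, not the plugin's exact implementation; the PIT id, page size, and timestamp values are placeholders):

[source, json]
POST /test-*/_pit?keep_alive=1m

The returned PIT `id` is then used in follow-up searches together with `search_after`, passing the sort value of the last hit from the previous page:

[source, json]
GET /_search
{
  "size": 1000,
  "query": {
    "range": {
      "event.ingested": { "gt": "2024-05-01T10:00:00.123456789Z", "lt": "2024-05-01T10:01:00.000000000Z" }
    }
  },
  "pit": { "id": "<pit-id-from-the-previous-call>", "keep_alive": "1m" },
  "sort": [
    { "event.ingested": { "order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type": "date_nanos" } }
  ],
  "search_after": ["2024-05-01T10:00:12.345678901Z"]
}
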
[id="plugins-{type}s-{plugin}-scheduling"]
==== Scheduling

Input from this plugin can be scheduled to run periodically according to a specific
@@ -659,6 +772,8 @@ The value of this field is injected into each query if the query uses the placeholder
For the first query after a pipeline is started, the value used is either read from the <<last_run_metadata_path>> file,
or taken from the <<tracking_field_seed>> setting.

Note: The tracking value is updated only after the PIT+search_after run completes; it won't update during the search_after pagination. This is to allow the use of slices.

[id="plugins-{type}s-{plugin}-tracking_field_seed"]
===== `tracking_field_seed`
