Introduce cursor tracking akin to jdbc input #205

Merged
merged 37 commits on Apr 7, 2025
Changes from 29 commits
Commits
37 commits
2cab0c4
Add cursor to follow new data on an index through a tracked field
jsvd May 29, 2024
1526e9f
Update lib/logstash/inputs/elasticsearch/cursor_tracker.rb
jsvd May 29, 2024
96d2294
Update lib/logstash/inputs/elasticsearch/cursor_tracker.rb
jsvd Mar 6, 2025
64bba49
add :present injection to safeguard against PIT right-edge inconsiste…
jsvd Mar 14, 2025
50a19b9
revert incorrect changes in aggs
jsvd Mar 14, 2025
5df7291
Revert "revert incorrect changes in aggs"
jsvd Mar 14, 2025
2b922c5
fix tests
jsvd Mar 14, 2025
887463f
add docs
jsvd Mar 14, 2025
30bed1b
document the use case for tailing an index
jsvd Mar 17, 2025
1a307db
minor fixes
jsvd Mar 17, 2025
ba1dff2
fix docs references
jsvd Mar 17, 2025
d598f37
cursor checkpoint after each page
jsvd Mar 17, 2025
2c4af0b
Update lib/logstash/inputs/elasticsearch.rb
jsvd Mar 18, 2025
d44c978
fix get_query_object
jsvd Mar 18, 2025
ceabf2e
Update lib/logstash/inputs/elasticsearch.rb
jsvd Mar 18, 2025
3c32b91
fix log entry
jsvd Mar 18, 2025
92362df
add cursor unit testing
jsvd Mar 18, 2025
eed4295
fix testing
jsvd Mar 18, 2025
f45f271
allow test to work with es-ruby 7 and 8
jsvd Mar 19, 2025
5bb7a4f
Fix ordered list formatting in documentation
jsvd Mar 19, 2025
6217204
store last_run_value in a location namespaced by pipeline id
jsvd Mar 19, 2025
b3d81c9
minor test tweaks
jsvd Mar 19, 2025
713d528
[skip ci] Update docs/index.asciidoc
jsvd Mar 19, 2025
0062138
default seed value to nanosecond epoch
jsvd Mar 20, 2025
7eb964e
improve name of now_in_nanos to now_minus_30s
jsvd Mar 20, 2025
7141ad9
Apply suggestions from code review
jsvd Mar 20, 2025
5ab5546
de-emphasize the cursor docs section
jsvd Mar 20, 2025
8de96d8
Update documentation for tracking field usage
jsvd Mar 24, 2025
67b7926
fix es doc links
jsvd Mar 28, 2025
a19f666
Update docs/index.asciidoc
jsvd Apr 4, 2025
f57b915
Update docs/index.asciidoc
jsvd Apr 4, 2025
13b80b7
Update docs/index.asciidoc
jsvd Apr 4, 2025
e1e85f2
Apply suggestions from code review
jsvd Apr 4, 2025
920e047
Update docs/index.asciidoc
jsvd Apr 4, 2025
8d5f71e
Update docs/index.asciidoc
jsvd Apr 4, 2025
1806d33
Fix formatting error in heading
karenzone Apr 4, 2025
86995a1
[skip ci] bump version to 5.1.0
jsvd Apr 7, 2025
169 changes: 168 additions & 1 deletion docs/index.asciidoc
@@ -48,7 +48,7 @@ This would create an Elasticsearch query with the following format:
"sort": [ "_doc" ]
}'


[id="plugins-{type}s-{plugin}-scheduling"]
==== Scheduling

Input from this plugin can be scheduled to run periodically according to a specific
@@ -103,6 +103,123 @@ Common causes are:
- When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
- When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.

[id="plugins-{type}s-{plugin}-cursor"]
==== Tracking a field's value across runs

NOTE: experimental:[] `tracking_field` and related settings are experimental and subject to change in the future.

It is sometimes desirable to track the value of a particular field between two jobs:

* avoid re-processing the entire result set of a long query after an unplanned restart
* only grab new data from an index instead of processing the entire set on each job

For this, the Elasticsearch input plugin provides the <<plugins-{type}s-{plugin}-tracking_field>> and <<plugins-{type}s-{plugin}-tracking_field_seed>> options.
When <<plugins-{type}s-{plugin}-tracking_field>> is set, the plugin will record the value of that field for the last document retrieved in a run into
a file (location defaults to <<plugins-{type}s-{plugin}-last_run_metadata_path>>).

The user can then inject this value into the query using the placeholder `:last_value`. The value will be injected into the query
before execution, and then updated after the query completes if new data was found.

This feature works best when:

. the query sorts by the tracking field;
. the timestamp field is added by {es};
. the field type has enough resolution so that two events are unlikely to have the same value.

It is recommended to use a tracking field whose type is https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[date nanoseconds].
If the tracking field is of this data type, an extra placeholder called `:present` can be used to inject the nanosecond-based value of "now-30s".
This placeholder is useful as the right-hand side of a range filter, allowing the collection of
new data but leaving partially-searchable bulk request data to the next scheduled job.
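
For illustration, the value that `:present` resolves to is simply a nanosecond-precision ISO8601 timestamp roughly 30 seconds in the past. Below is a minimal Ruby sketch of that shape; it is not the plugin's exact implementation:

[source,ruby]
now_minus_30s = Time.now.utc - 30                      # "now minus 30 seconds"
puts now_minus_30s.strftime("%Y-%m-%dT%H:%M:%S.%9NZ")  # e.g. 2025-04-07T12:34:56.789012345Z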

Below is a series of steps to help set up the "tailing" of data being written to a set of indices, using a date nanosecond field
added by an Elasticsearch ingest pipeline, and the `tracking_field` capability of this plugin.

. create an ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested`:

[source, json]
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.putIfAbsent(\"event\", [:]); ctx.event.ingested = metadata().now.format(DateTimeFormatter.ISO_INSTANT);"
      }
    }
  ]
}

[start=2]
. create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline:

[source, json]
PUT /_template/my_template
{
  "index_patterns": ["test-*"],
  "settings": {
    "index.default_pipeline": "my-pipeline"
  },
  "mappings": {
    "properties": {
      "event": {
        "properties": {
          "ingested": {
            "type": "date_nanos",
            "format": "strict_date_optional_time_nanos"
          }
        }
      }
    }
  }
}

[start=3]
. define a query that looks at all the data in the indices, sorted by the tracking field, with a range filter from the last value seen up to the present:

[source,json]
{
  "query": {
    "range": {
      "event.ingested": {
        "gt": ":last_value",
        "lt": ":present"
      }
    }
  },
  "sort": [
    {
      "event.ingested": {
        "order": "asc",
        "format": "strict_date_optional_time_nanos",
        "numeric_type": "date_nanos"
      }
    }
  ]
}

[start=4]
. configure the Elasticsearch input to query the indices with the query defined above, every minute, and track the `event.ingested` field:

[source, ruby]
input {
  elasticsearch {
    id => "tail_test_index"
    hosts => [ 'https://..']
    api_key => '....'
    index => 'test-*'
    query => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lt": ":present"}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] }'
    tracking_field => "[event][ingested]"
    slices => 5 # optional use of slices to speed data processing, should be equal to or less than number of primary shards
    schedule => '* * * * *' # every minute
    schedule_overlap => false # don't accumulate jobs if one takes longer than 1 minute
  }
}

With this setup, as new documents are indexed into a `test-*` index, the next scheduled run will:

. select all new documents since the last observed value of the tracking field;
. use {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data;
. update the value of the field at the end of the pagination.

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

@@ -126,12 +243,14 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
| <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-hosts>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-index>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-last_run_metadata_path>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-scroll>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-search_api>> |<<string,string>>, one of `["auto", "search_after", "scroll"]`|No
| <<plugins-{type}s-{plugin}-size>> |<<number,number>>|No
Expand All @@ -151,6 +270,8 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
| <<plugins-{type}s-{plugin}-ssl_verification_mode>> |<<string,string>>, one of `["full", "none"]`|No
| <<plugins-{type}s-{plugin}-socket_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-target>> | {logstash-ref}/field-references-deepdive.html[field reference] | No
| <<plugins-{type}s-{plugin}-tracking_field>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-tracking_field_seed>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-retries>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
|=======================================================================
@@ -330,6 +451,17 @@ Check out {ref}/api-conventions.html#api-multi-index[Multi Indices
documentation] in the Elasticsearch documentation for info on
referencing multiple indices.

[id="plugins-{type}s-{plugin}-last_run_metadata_path"]
===== `last_run_metadata_path`

* Value type is <<string,string>>
* There is no default value for this setting.

The path to store the last observed value of the tracking field, when used.
By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value`.

This setting should point to a file, not a directory, and Logstash must have read+write access to this file.
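
For example, a pipeline can pin the cursor file to an explicit location; the path below is purely illustrative:

[source,ruby]
input {
  elasticsearch {
    # ... connection, query and schedule settings ...
    tracking_field => "[event][ingested]"
    last_run_metadata_path => "/var/lib/logstash/es_input/tail_test_index_last_run_value"
  }
}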

[id="plugins-{type}s-{plugin}-password"]
===== `password`

@@ -410,6 +542,19 @@ for example: "* * * * *" (execute query every minute, on the minute)
There is no schedule by default. If no schedule is given, then the statement is run
exactly once.

[id="plugins-{type}s-{plugin}-schedule_overlap"]
===== `schedule_overlap`

* Value type is <<boolean,boolean>>
* Default value is `true`

Whether to allow queuing of a scheduled run while a previous run is still in progress.
Queuing ensures a new run starts immediately after the previous one finishes when there is a lot of work to do,
but because the queue is unbounded it can lead to an out-of-memory error over long periods of time
if runs keep accumulating.

When in doubt, set `schedule_overlap` to `false` (it may become the default value in the future).

[id="plugins-{type}s-{plugin}-scroll"]
===== `scroll`

@@ -622,6 +767,28 @@ When the `target` is set to a field reference, the `_source` of the hit is place
This option can be useful to avoid populating unknown fields when a downstream schema such as ECS is enforced.
It is also possible to target an entry in the event's metadata, which will be available during event processing but not exported to your outputs (e.g., `target \=> "[@metadata][_source]"`).

[id="plugins-{type}s-{plugin}-tracking_field"]
===== `tracking_field`

* Value type is <<string,string>>
* There is no default value for this setting.

Which field from the last event of a previous run will be used as a cursor value for the following run.
The value of this field is injected into each query if the query uses the placeholder `:last_value`.
For the first query after a pipeline is started, the value used is either read from the <<plugins-{type}s-{plugin}-last_run_metadata_path>> file,
or taken from the <<plugins-{type}s-{plugin}-tracking_field_seed>> setting.

Note: The tracking value is updated after each page is read and at the end of each Point in Time. In case of a crash, the last saved value will be used, so some duplication of data can occur. For this reason, using unique document IDs for each event in the downstream destination is recommended.
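
For example, when <<plugins-{type}s-{plugin}-docinfo>> is enabled on this input, a downstream `elasticsearch` output can reuse the source document id so that re-processed events overwrite their earlier copies instead of creating duplicates. This is only a sketch; the index name is illustrative and the metadata path depends on your `docinfo_target`:

[source,ruby]
output {
  elasticsearch {
    hosts       => [ 'https://..' ]
    index       => "tailed-data"
    document_id => "%{[@metadata][input][elasticsearch][_id]}"
  }
}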

[id="plugins-{type}s-{plugin}-tracking_field_seed"]
===== `tracking_field_seed`

* Value type is <<string,string>>
* Default value is `"1970-01-01T00:00:00.000000000Z"`

The starting value for the <<plugins-{type}s-{plugin}-tracking_field>> if there is no <<plugins-{type}s-{plugin}-last_run_metadata_path>> file yet.
This setting defaults to the nanosecond-precision ISO8601 representation of the epoch, "1970-01-01T00:00:00.000000000Z", since nanosecond-precision
timestamps are the most reliable format to use for this feature.
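
For example, to start the very first run from a specific point in time instead of the epoch (the timestamp shown is illustrative):

[source,ruby]
input {
  elasticsearch {
    # ... other settings ...
    tracking_field => "[event][ingested]"
    tracking_field_seed => "2025-01-01T00:00:00.000000000Z"
  }
}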

[id="plugins-{type}s-{plugin}-user"]
===== `user`
64 changes: 62 additions & 2 deletions lib/logstash/inputs/elasticsearch.rb
@@ -73,6 +73,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base

require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -124,6 +125,20 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# by this pipeline input.
config :slices, :validate => :number

# Enable tracking the value of a given field to be used as a cursor
# Main concerns:
# * using anything other than _event.timestamp easily leads to data loss
# * the first "synchronization" run can take a long time
config :tracking_field, :validate => :string

# Define the initial seed value of the tracking_field
config :tracking_field_seed, :validate => :string, :default => "1970-01-01T00:00:00.000000000Z"

# The location where the tracking field value will be stored
# The value is persisted after each scheduled run (and not per result)
# If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value'
config :last_run_metadata_path, :validate => :string

# If set, include Elasticsearch document information such as index, type, and
# the id in the event.
#
@@ -250,6 +265,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# exactly once.
config :schedule, :validate => :string

# Allow scheduled runs to overlap (enabled by default). Setting to false will
# only start a new scheduled run after the previous one completes.
config :schedule_overlap, :validate => :boolean

# If set, the _source of each hit will be added nested under the target instead of at the top-level
config :target, :validate => :field_reference

@@ -328,16 +347,30 @@ def register

  setup_query_executor

  setup_cursor_tracker

  @client
end

def run(output_queue)
  if @schedule
    scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
    scheduler.cron(@schedule, :overlap => @schedule_overlap) do
      @query_executor.do_run(output_queue, get_query_object())
    end
    scheduler.join
  else
    @query_executor.do_run(output_queue)
    @query_executor.do_run(output_queue, get_query_object())
  end
end

def get_query_object
  if @cursor_tracker
    query = @cursor_tracker.inject_cursor(@query)
    @logger.debug("new query is #{query}")
  else
    query = @query
  end
  LogStash::Json.load(query)
end

##
@@ -347,6 +380,11 @@ def push_hit(hit, output_queue, root_field = '_source')
  event = event_from_hit(hit, root_field)
  decorate(event)
  output_queue << event
  record_last_value(event)
end

def record_last_value(event)
  @cursor_tracker.record_last_value(event) if @tracking_field
end

def event_from_hit(hit, root_field)
@@ -640,6 +678,28 @@ def setup_query_executor
  end
end

def setup_cursor_tracker
  return unless @tracking_field
  return unless @query_executor.is_a?(LogStash::Inputs::Elasticsearch::SearchAfter)

  if @resolved_search_api != "search_after" || @response_type != "hits"
    raise ConfigurationError.new("The `tracking_field` feature can only be used with `search_after` non-aggregation queries")
  end

  @cursor_tracker = CursorTracker.new(last_run_metadata_path: last_run_metadata_path,
                                      tracking_field: @tracking_field,
                                      tracking_field_seed: @tracking_field_seed)
  @query_executor.cursor_tracker = @cursor_tracker
end

def last_run_metadata_path
  return @last_run_metadata_path if @last_run_metadata_path

  last_run_metadata_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", pipeline_id, "last_run_value")
  FileUtils.mkdir_p ::File.dirname(last_run_metadata_path)
  last_run_metadata_path
end

def get_transport_client_class
  # LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
  # And now `elasticsearch-transport` is old, instead we have `elastic-transport`.
19 changes: 11 additions & 8 deletions lib/logstash/inputs/elasticsearch/aggregation.rb
@@ -12,14 +12,9 @@ def initialize(client, plugin)
  @client = client
  @plugin_params = plugin.params

  @index = @plugin_params["index"]
  @size = @plugin_params["size"]
  @query = @plugin_params["query"]
  @retries = @plugin_params["retries"]
  @agg_options = {
    :index => @plugin_params["index"],
    :size => 0
  }.merge(:body => @query)

  @plugin = plugin
end

@@ -33,10 +28,18 @@ def retryable(job_name, &block)
  false
end

def do_run(output_queue)
def aggregation_options(query_object)
  {
    :index => @index,
    :size => 0,
    :body => query_object
  }
end

def do_run(output_queue, query_object)
  logger.info("Aggregation starting")
  r = retryable(AGGREGATION_JOB) do
    @client.search(@agg_options)
    @client.search(aggregation_options(query_object))
  end
  @plugin.push_hit(r, output_queue, 'aggregations') if r
end