
Introduce cursor tracking akin to jdbc input #205


Merged

jsvd merged 37 commits into logstash-plugins:main from jsvd:cursor on Apr 7, 2025

Conversation

@jsvd jsvd (Member) commented May 29, 2024

With this change it's possible to configure a "tracking_field" whose value is injected into the query using the :last_value keyword, along with a :present placeholder that represents now - 30s.

Example configuration:

input {
  elasticsearch {
    id => "tail"
    hosts => [ 'https://host...']
    api_key => "..."
    index => "my_data"
    query => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lt": ":present"}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] }'
    tracking_field => "[event][ingested]"
    tracking_field_seed => "1980-01-01T23:59:59.999999999Z"
    slices => 5
    schedule => '* * * * *'
    schedule_overlap => false
  }
}
output {
  elasticsearch { .. }
}

To test this, follow the guidance in this PR's doc changes on how to set up Elasticsearch, or use a script that prepares the index/ingest_pipeline and writes documents to ES, like https://gist.github.com/jsvd/6ea96197078d906362227494e0c542dc

resolves #184
resolves #93

@jsvd jsvd changed the title from "move cursor tracking to separate class" to "Introduce cursor tracking akin to jdbc input" May 29, 2024
@JAndritsch JAndritsch (Contributor) commented Mar 1, 2025

I haven't had a chance to test this extensively yet, but after looking at the code and deploying the plugin for a few brief tests, I have some thoughts/observations.

Before I get to that, I'd like to provide some context as to how I'm achieving similar behavior that ensures "at least once" delivery of documents without using a cursor or sincedb-like mechanism. This will be helpful for comparing that solution to the one in this PR.

Context

My organization uses Logstash to ship logs and metrics from one Elastic Stack to another. We handle this by using the Elasticsearch input to periodically fetch documents from relevant indices and forward them to another destination over a variety of outputs.

To avoid resending the same docs over and over, we add a tracking field (a simple boolean type) to each document that passes through the pipeline, then save the modified document back to Elasticsearch once the pipeline's other outputs succeed. The query powering the Elasticsearch input has criteria that exclude documents where that tracking field exists (sketched below).

This has worked pretty reliably so far. It also ensures retries, as documents that were not sent successfully will be picked up on subsequent iterations of the scheduled input until the tracking field exists in their source. However, it means we have to perform lots of writes back to the index. This is not ideal.
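
For illustration, a minimal sketch of that exclusion criterion as an Elasticsearch bool query, assuming the boolean tracking field is named reporting.transferred (the name referenced later in this thread):

{
  "query": {
    "bool": {
      "must_not": [
        { "exists": { "field": "reporting.transferred" } }
      ]
    }
  }
}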

Observations

Here are some of my initial thoughts after looking at this PR. I may come back and edit this with additional findings/thoughts so that they're consolidated in one comment rather than spread out across several.

Picking the right tracking field

  • All documents in the queried indices need to have the tracking field present in their source for this to work. This means there needs to be a process at ingestion time that ensures a non-blank tracking field exists for every document. This probably needs to be stated explicitly somewhere in the documentation for this plugin.
  • Any documents that lack the tracking field at the time this plugin is used will either need to be modified to include that field or somehow excluded from the query by other means. The plugin supports a tracking_field_seed which can help account for starting at a later point in the stream, but if there are documents which have already been processed by some forwarder pipeline and do not have this tracking field set, then you'll need a separate mechanism to exclude them.
    • For my case, I'd probably need to make sure that my query criteria continues to exclude documents with my former boolean tracking field in combination with using the cursor tracking. This isn't necessarily an issue but was worth calling out in case there are others who might be in the same state as me.
  • @timestamp may not be reliable if historical documents are indexed with a value that occurs before the tracking field's most recently recorded value. Using event.ingested is probably the better choice.
  • Tracking only works with time-parsable fields, which makes sense.

Delivery guarantees and edge cases

  • There's no way to recover a missed document once tracking position has moved beyond it. You can maybe try modifying the cached last value by hand, but that would be a hacky workaround. This would also mean the cursor will pick up duplicate documents on its next tick.
    • This is in contrast to my current solution, which accounts for this by saving tracking status on each document. In this scenario, it doesn't matter when the event occurred, time-wise. The absence of the tracking field means the query will find it on next run.
  • Cursor tracking is updated once the event is added to the output queue, and then the input assumes it'll be handled from there and shouldn't find that same event in future iterations. If the event fails at the output, future invocations of the query will not pick it up again as tracking state was already recorded by this point. This means the output needs to be properly configured for retries, persistence, and/or a DLQ. That's something that could probably be included in the documentation for this plugin as just a reminder to those wanting to use it.
    • Alternatively, it might be interesting if we could control when the tracking data is updated. I think that would require a new output plugin to go along with this one, where users need to explicitly invoke a plugin at the output stage that updates the tracking state only after all of the pipeline's other outputs have succeeded. This would help ensure a failure to output a batch of events results in those events being picked up on the next scheduled query. Although I'd imagine this gets tricky when considering how the scheduling of the input works in relation to what I'm suggesting.
  • Should :schedule_overlap be forced to true when using a tracking field? Otherwise I'm not sure what behavior we'll get if we allow schedules to overlap and change the cursor value.
  • What happens if I change the tracking field in my input after a previous one has been in use? For example, maybe I start with event.ingested for the logs-* index but then later decide I'd rather use some custom timestamp field I'm adding. I can reset state by configuring the input to use a new file path for tracking, but then I won't have a way to ensure documents previously sent based on event.ingested won't be re-queried. I'd have to somehow include that last tracking field's value in my query to exclude those documents from being found again.

Errors

  • (I added a suggestion that fixes this): It appears the inject_cursor method is throwing an error for a pipeline of mine. Seems to be caused by passing a Timestamp into gsub without an explicit conversion to a string. I've configured the tracking_field to be @timestamp. When the query runs, here's a snippet of the stack trace I see:
{
    "exception": "TypeError",
    "message": "no implicit conversion of LogStash::Timestamp into String",
    "backtrace": [
        "org/jruby/RubyString.java:3219:in `gsub'",
        "/usr/share/logstash/vendor/local_gems/0bcf64c0/logstash-input-elasticsearch-5.0.1/lib/logstash/inputs/elasticsearch/cursor_tracker.rb:52:in `inject_cursor'",
        "/usr/share/logstash/vendor/local_gems/0bcf64c0/logstash-input-elasticsearch-5.0.1/lib/logstash/inputs/elasticsearch.rb:375:in `get_query_object'",
        "/usr/share/logstash/vendor/local_gems/0bcf64c0/logstash-input-elasticsearch-5.0.1/lib/logstash/inputs/elasticsearch.rb:364:in `block in run'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:241:in `block in do_call'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler.rb:130:in `around_trigger'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:240:in `do_call'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:288:in `trigger_now'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:339:in `block in start_work_thread'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:342:in `block in start_work_thread'",
        "org/jruby/RubyKernel.java:1722:in `loop'",
        "/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:325:in `block in start_work_thread'"
    ]
}

@strawgate strawgate commented Mar 2, 2025

Wow! Thanks for taking the time to provide some great feedback, we really appreciate it!

Picking the right tracking field

  • All documents in the queried indices need to have the tracking field present in their source for this to work. This means there needs to be a process at ingestion time that ensures a non-blank tracking field exists for every document. This probably needs to be stated explicitly somewhere in the documentation for this plugin.
  • Any documents that lack the tracking field at the time this plugin is used will either need to be modified to include that field or somehow excluded from the query by other means. The plugin supports a tracking_field_seed which can help account for starting at a later point in the stream, but if there are documents which have already been processed by some forwarder pipeline and do not have this tracking field set, then you'll need a separate mechanism to exclude them.

I think about this as two things: backfilling historical data and streaming new data. Both are solved with the cursor but in different ways:

  1. For the streaming use-case we want a field like event.ingested, and the moment we set up the ingest pipeline to tag the documents (see the pipeline sketch after this list), our streaming use-case is good to go.
  2. For the backfill use-case we can, surprisingly, just use the @timestamp field! We just need to exclude any documents handled by our streaming use-case with: not exists event.ingested. The disadvantages of using the @timestamp field disappear when we are no longer trying to use it to query newly inserted documents.
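
A minimal sketch of such an ingest pipeline, using Elasticsearch's set processor with the _ingest.timestamp metadata field (the pipeline name here is made up for illustration):

PUT _ingest/pipeline/add-event-ingested
{
  "description": "Stamp every document with its ingestion time",
  "processors": [
    { "set": { "field": "event.ingested", "value": "{{_ingest.timestamp}}" } }
  ]
}

Pointing the index's default_pipeline setting (or an index template) at this pipeline ensures every new document gets the field.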

So we have one elasticsearch input (cursor < event.ingested) to grab all new documents, which all have event.ingested, and another input (cursor < @timestamp and not exists event.ingested) targeting @timestamp and filtering out documents with event.ingested to grab all historical documents:

input {
  elasticsearch {
    id => "streaming"
    query => '{ "query": { "range": {"event.ingested": { "gt": ":last_value", "lte": "now"}}}, "sort": [ { "event.ingested": "asc" } ] }'
    tracking_field => "[event][ingested]"
    slices => 2
    schedule => '*/30 * * * * *'
  }
  elasticsearch {
    id => "backfill"
    query => '{ "query": { "bool": { "must_not": [{ "exists": { "field": "event.ingested" }}], "filter": [{ "range": { "@timestamp": { "gte": ":last_value" }}}]}}, "sort": [ { "@timestamp": "asc" } ] }'
    tracking_field => "@timestamp"
    slices => 2
    schedule => '*/30 * * * * *'
  }
}

Once you have completed your backfill, you can remove the backfill input, leaving just the streaming input.

So now let's imagine you've built a fantastic alternative to the cursor (like your reporting.transferred: true), but you want to switch to our input cursors. You want to exclude any documents that already have reporting.transferred. Well, you simply add the event.ingested ingest pipeline and add not exists reporting.transferred to both queries (sketched after this list):

  1. Adding it to the @timestamp input ensures the backfill process does not transfer anything that was already transferred. It also covers documents in that awkward window between when the reporting.transferred: true step was removed and the event.ingested pipeline was added.
  2. Adding it to the event.ingested input ensures that the streaming process does not transfer any data that was already transferred. It specifically ensures that any documents that got ingested after event.ingested was added but before the reporting.transferred: true step was removed do not get sent twice.
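
A sketch of the two inputs with that extra exclusion applied, reusing the query shapes from the earlier example (the bool/must_not form is one way to express the "not exists" criteria):

input {
  elasticsearch {
    id => "streaming"
    query => '{ "query": { "bool": { "filter": [{ "range": { "event.ingested": { "gt": ":last_value", "lte": "now" }}}], "must_not": [{ "exists": { "field": "reporting.transferred" }}]}}, "sort": [ { "event.ingested": "asc" } ] }'
    tracking_field => "[event][ingested]"
    schedule => '*/30 * * * * *'
  }
  elasticsearch {
    id => "backfill"
    query => '{ "query": { "bool": { "filter": [{ "range": { "@timestamp": { "gte": ":last_value" }}}], "must_not": [{ "exists": { "field": "event.ingested" }}, { "exists": { "field": "reporting.transferred" }}]}}, "sort": [ { "@timestamp": "asc" } ] }'
    tracking_field => "@timestamp"
    schedule => '*/30 * * * * *'
  }
}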

Delivery guarantees and edge cases

  • There's no way to recover a missed document once tracking position has moved beyond it. You can maybe try modifying the cached last value by hand, but that would be a hacky workaround. This would also mean the cursor will pick up duplicate documents on its next tick.
    • This is in contrast to my current solution, which accounts for this by saving tracking status on each document. In this scenario, it doesn't matter when the event occurred, time-wise. The absence of the tracking field means the query will find it on next run.
  • Cursor tracking is updated once the event is added to the output queue, and then the input assumes it'll be handled from there and shouldn't find that same event in future iterations. If the event fails at the output, future invocations of the query will not pick it up again as tracking state was already recorded by this point. This means the output needs to be properly configured for retries, persistence, and/or a DLQ. That's something that could probably be included in the documentation for this plugin as just a reminder to those wanting to use it.

This is actually also the case when using a boolean field on the documents. Because you need to use two outputs (1 to rewrite the document and 1 to send data to the remote destination), and that is not atomic (it's not even ordered) -- if the write to your destination fails, your write to the local system will almost certainly succeed. In this case, you have marked the documents as forwarded while actual forwarding has failed. If you're using a memory queue and Logstash restarts while the remote output is failing, you will have documents marked as transferred that were actually still being retried by the remote output.

Ultimately, for both cases, if the destination output fails entirely, the default configuration is that the output retries indefinitely. For both cases, using a persistent queue ensures that an unexpected restart of Logstash or a force shutdown of a pipeline while the output is failing doesn't result in data loss.

For both cases, capturing document-level failures requires a DLQ, and in both cases the tracking mechanism will consider documents in the DLQ as having been forwarded.

But I believe there's actually a big advantage to the input cursor once you reach steady state and are no longer pulling 10s of thousands of documents per query. In this case, when the output is blocked, the input is allowed to keep pushing events into the queue until the queue fills and then blocks the input. With the cursor, the timestamp increments at the end of each query, so until the queue fills up and blocks, each batch pushed into the queue consists of new documents.

With the reporting.transferred solution, when the destination output blocks, the reporting.transferred output also blocks. With the reporting.transferred output blocked, each subsequent invocation of the ES query will return the same documents that are already in the queue, pushing the same documents into the queue over and over again until it fills up.

  • Should :schedule_overlap be forced to true when using a tracking field? Otherwise I'm not sure what behavior we'll get if we allow schedules to overlap and change the cursor value.

A great question for @jsvd for sure. My read is that either true or false should be fine, with them leading to slightly different, but acceptable, behavior -- it looks like we force the max worker threads to 1 for each instance of the scheduler, so the scheduler will only ever run one thread at a time. When overlap is true, jobs will continue to be scheduled into the work queue while the worker thread is running; when overlap is false, they will not. I think the only real impact is that if your cron job is set to every 10 minutes and the job takes 15 minutes, then with overlap: false it will take 5 more minutes before the next job runs, while with overlap: true the next job will run immediately.

  • What happens if I change the tracking field in my input after a previous one has been in use? For example, maybe I start with event.ingested for the logs-* index but then later decide I'd rather use some custom timestamp field I'm adding. I can reset state by configuring the input to use a new file path for tracking, but then I won't have a way to ensure documents previously sent based on event.ingested won't be re-queried. I'd have to somehow include that last tracking field's value in my query to exclude those documents from being found again.

You could set up a third input (sketched below); no need to reset state:

  1. (backfill) cursor 1 < timestamp if (event.ingested does not exist and event.new_ingested does not exist)
  2. (backfill) cursor 2 < event.ingested (if event.new_ingested does not exist)
  3. (streaming) cursor 3 < event.new_ingested

Once the "backfill" inputs have completed, you can safely remove them (or just leave them).

@strawgate strawgate commented Mar 2, 2025

I did see the PIT docs recommend "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" on the sort to ensure that sorting treats date and date_nanos the same.
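
For reference, that matches the sort clause already used in the query from the PR description:

"sort": [
  { "event.ingested": { "order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type": "date_nanos" } }
]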

I think we have maybe two options for dealing with the long scroll problem --

The first option would be to limit slices to one -- in this case it looks like we could persist the cursor to the value of the second-biggest timestamp at the end of each page, and the max timestamp at the end of each scroll?

Perhaps later we could add support for staggered check pointing across multiple slices?

The second option would be to encourage users to use date math, perhaps we provide next_value as well? We'd need something like cursor.increment_on_empty_results and maybe cursor.next_value_limit, which could be used to ensure that you're not trying to pull from a time range into which data could still be inserted?

@JAndritsch JAndritsch (Contributor) commented

Hey @strawgate, thanks for jumping in with a super detailed response!

I think about this as two things: backfilling historical data and streaming new data. Both are solved with the cursor but in different ways...

Backfill versus Streaming is a good distinction to point out. I had been thinking about how I might solve these problems with a single input, and using two of them with different query criteria hadn't crossed my mind. But that does seem like the right way to go about it.

So now let's imagine you've built a fantastic alternative to the cursor (like your reporting.transferred: true), but you want to switch to our input cursors. You want to exclude any documents that don't have reporting.transferred. Well, you simply add the event.ingested ingest pipeline and add not exists reporting.transferred to both queries...

Yeah, that makes a lot of sense and I believe addresses a hypothetical scenario where someone using this cursor tracking feature starts out with event.ingested but then later decides to move to a different field for tracking. They'll need to maintain a backwards-compatible input that excludes documents that would otherwise overlap with the new input.

Quoting myself here...

Should :schedule_overlap be forced to true when using a tracking field? Otherwise I'm not sure what behavior we'll get if we allow schedules to overlap and change the cursor value.

I meant forced to false. Must not have had enough coffee when I initially typed the question XD.

Maybe it doesn't matter given the outcome isn't really that impactful as you had suggested.

This is actually also the case when using a boolean field on the documents. Because you need to use two outputs (1 to rewrite the document and 1 to send data to the remote destination), and that is not atomic (it's not even ordered) -- if the write to your destination fails, your write to the local system will almost certainly succeed. In this case, you have marked the documents as forwarded while actual forwarding has failed. If you're using a memory queue and Logstash restarts while the remote output is failing, you will have documents marked as transferred that were actually still being retried by the remote output.

Ultimately, for both cases, if the destination output fails entirely, the default configuration is that the output retries indefinitely. For both cases, using a persistent queue ensures that an unexpected restart of Logstash or a force shutdown of a pipeline while the output is failing doesn't result in data loss.

For both cases, capturing document-level failures requires a DLQ, and in both cases the tracking mechanism will consider documents in the DLQ as having been forwarded.

But I believe there's actually a big advantage to the input cursor once you reach steady state and are no longer pulling 10s of thousands of documents per query. In this case, when the output is blocked, the input is allowed to keep pushing events into the queue until the queue fills and then blocks the input. With the cursor, the timestamp increments at the end of each query, so until the queue fills up and blocks, each batch pushed into the queue consists of new documents.

With the reporting.transferred solution, when the destination output blocks, the reporting.transferred output also blocks. With the reporting.transferred output blocked, each subsequent invocation of the ES query will return the same documents that are already in the queue, pushing the same documents into the queue over and over again until it fills up.

I've been mulling over your feedback and thinking about how cursor tracking would work in comparison to what I have now in a few scenarios. I think most of my concerns around using a cursor would be addressed by switching from an in-memory queue to a persistent queue + DLQ.

Using a tracking field on a document in Elasticsearch sort of emulates the idea of a persistent queue state, where under certain circumstances like a pipeline reload or Logstash restart, the document doesn't get marked as delivered and thereby allows it to be re-queried on the next input tick. But if I switched to a persistent queue then that gives me the same thing, without the added cost of having to write state back to documents in bulk. And the fact that the input wouldn't be able to queue up duplicate events that are currently in the queue or being worked on is a huge win for many reasons.

I'm going to follow your suggestion of setting up multiple inputs to handle the backfill versus streaming scenarios and post an update once I've had a chance to see it work.

Thanks again!

@strawgate strawgate commented

@JAndritsch I shared a tool with you where you can set up various scenarios and measure data loss. I added your scenarios so you can play around with it. It's actually quite difficult to lose data with the cursor + memory queue for the backfill scenario, as currently the whole read is one scroll and the cursor is only updated after the whole read. I think you'll be surprised by how much data is lost when using the reporting.transferred method + a memory queue.

@JAndritsch JAndritsch (Contributor) commented

@strawgate That's awesome. Thank you for setting that up! I'll try to find some time to play around with it and share my thoughts.

@JAndritsch JAndritsch (Contributor) commented

@strawgate Finally got a chance to play with the tool and added some new scenarios.

You were right -- it's very surprising to see that the cursor results in better data reliability even without a persistent queue. There's more I need to dig into here to better understand why.

@jsvd jsvd (Member, Author) commented Mar 17, 2025

I've done a few modifications:

  • I've introduced :present as a second placeholder to pin down the concept of now-30s in the initial PIT+search and all the subsequent search_after queries within the same PIT.
  • Using a range query from :last_value up to :present ensures the range filter is cached during all queries within the same job run, and avoids data loss at the edge of the PIT timestamps.

This significantly improves the index-tailing use case: I was able to consistently tail an index being written to, using slices, without any data loss.

For catch-up use cases where it is known that data already exists in the index, a range such as :last_value to :last_value||+1w can be used instead, combined with a second range filter to query up to a known boundary (e.g. up to January 31st), as sketched below.
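
A sketch of such a catch-up query, reusing the tracking field from the PR description; the January 31st boundary is illustrative, and note that Elasticsearch date math needs an explicit operator (hence ||+1w):

query => '{ "query": { "bool": { "filter": [
  { "range": { "event.ingested": { "gt": ":last_value", "lt": ":last_value||+1w" }}},
  { "range": { "event.ingested": { "lte": "2025-01-31T00:00:00Z" }}}
]}}, "sort": [ { "event.ingested": "asc" } ] }'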

I have also added a documentation section describing the ideal setup for tailing a file, including ingest pipeline, template mapping definitions and sample configuration: https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205/files#diff-cae5619b3d18ec99c5ccd0a9f6de0c6d3f53343c64692444551a7d29da6863e7R53

@jsvd jsvd marked this pull request as ready for review March 17, 2025 14:26
@jsvd jsvd closed this Mar 17, 2025
@jsvd jsvd reopened this Mar 17, 2025
@JAndritsch JAndritsch (Contributor) commented

Thanks, @jsvd! I'll find a bit of time this week to test out the latest updates and report back on how that goes.

@jsvd jsvd (Member, Author) commented Mar 17, 2025

A few more modifications are in place: now the cursor is persisted per search + per slice and at the end of the PIT.

So if slices is set to 5 and there's a PIT open followed by 10 queries per slice, there will be 50 checkpoints plus a final one when closing the PIT.

This means that, in a backfilling use case, terminating and restarting Logstash will not require re-processing the entire original date range.

Also, we should note that in this example, for the first 50 checkpoints the persisted value corresponds to the earliest value across all slices, and in the last checkpoint it is the latest value that is persisted.

This ensures that if Logstash is restarted between PITs there won't be duplicate events, but a minimal number of duplicates can happen if the termination occurs while the PIT is open and queries are being executed.

@robbavey robbavey (Contributor) left a comment

First pass through - great stuff

@jsvd jsvd requested a review from robbavey March 20, 2025 10:49
@robbavey robbavey (Contributor) left a comment

Some minor doc suggestions, but other than that LGTM

@jsvd jsvd requested a review from karenzone March 26, 2025 13:27
@karenzone karenzone (Contributor) left a comment

Nice work on these docs! The config samples are especially helpful.
I left some suggestions inline for consideration, mostly around style, capitalization, indents, etc. Please use what you like.

Ping me when you're ready for final check. I'll update the content in this test PR to make sure everything checks out.

jsvd and others added 5 commits April 4, 2025 11:14
Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com>
@jsvd jsvd requested a review from karenzone April 4, 2025 12:39
@karenzone karenzone (Contributor) left a comment

@jsvd jsvd merged commit d9bf375 into logstash-plugins:main Apr 7, 2025
1 check passed
jsvd added a commit to jsvd/logstash-input-elasticsearch that referenced this pull request Apr 7, 2025
Provide field value tracking, persisted to disk on each search_after page.

Adds `:last_value` and `:present` placeholders, allowing the plugin to inject the cursor value and now-30 seconds, respectively, in the query string.

Useful to track new data being written to an index or series of indices.

Works best with nanosecond-precision timestamps added by Elasticsearch's Ingest Pipelines.

---------

Co-authored-by: Joel Andritsch <joel.andritsch@gmail.com>
Co-authored-by: Rob Bavey <rob.bavey@elastic.co>
Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com>
@jsvd jsvd deleted the cursor branch April 7, 2025 12:31
jsvd added a commit that referenced this pull request Apr 7, 2025
Development

Successfully merging this pull request may close these issues.

Add a "sincedb" type of mecanism [feature] Support configurations so it behaves like JDBC input