Introduce cursor tracking akin to jdbc input #205
Conversation
I haven't had a chance to test this extensively yet, but after looking at the code and deploying the plugin for a few brief tests, I have some thoughts/observations. Before I get to that, I'd like to provide some context on how I'm achieving similar behavior that ensures "at least once" delivery of documents without using a cursor or sincedb-like mechanism. This will be helpful for comparing that solution to the one in this PR.

Context

My organization uses Logstash to ship logs and metrics from one Elastic Stack to another. We handle this by using the Elasticsearch input to periodically fetch documents from relevant indices and forward them to another destination over a variety of outputs. To avoid resending the same docs over and over, we add a tracking field (a simple boolean type) to each document that passes through the pipeline, then save the modified document back to Elasticsearch once the pipeline's other outputs succeed. The query powering the Elasticsearch input excludes documents where that tracking field exists. (A sketch of this setup appears after the error output at the end of this comment.) This has worked reliably so far. It also ensures retries, as documents that were not sent successfully will be picked up on subsequent iterations of the scheduled input until the tracking field exists in their source. However, it means we have to perform lots of writes back to the index, which is not ideal.

Observations

Here are some of my initial thoughts after looking at this PR. I may come back and edit this with additional findings/thoughts so that they're consolidated in one comment rather than spread out across several.

Picking the right tracking field
Delivery guarantees and edge cases
Errors
{
"exception": "TypeError",
"message": "no implicit conversion of LogStash::Timestamp into String",
"backtrace": [
"org/jruby/RubyString.java:3219:in `gsub'",
"/usr/share/logstash/vendor/local_gems/0bcf64c0/logstash-input-elasticsearch-5.0.1/lib/logstash/inputs/elasticsearch/cursor_tracker.rb:52:in `inject_cursor'",
"/usr/share/logstash/vendor/local_gems/0bcf64c0/logstash-input-elasticsearch-5.0.1/lib/logstash/inputs/elasticsearch.rb:375:in `get_query_object'",
"/usr/share/logstash/vendor/local_gems/0bcf64c0/logstash-input-elasticsearch-5.0.1/lib/logstash/inputs/elasticsearch.rb:364:in `block in run'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:241:in `block in do_call'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler.rb:130:in `around_trigger'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:240:in `do_call'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:288:in `trigger_now'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:339:in `block in start_work_thread'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:342:in `block in start_work_thread'",
"org/jruby/RubyKernel.java:1722:in `loop'",
"/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/rufus-scheduler-3.9.1/lib/rufus/scheduler/jobs_core.rb:325:in `block in start_work_thread'"
]
}
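For reference, the write-back approach described under "Context" above might look roughly like the sketch below. This is a hedged illustration, not the actual pipeline from the thread: the hosts, index names, and the `forwarded` field are made up, and mapping details (the real field is a boolean) are omitted.

```
input {
  elasticsearch {
    hosts          => ["https://source-es:9200"]
    index          => "app-logs-*"
    schedule       => "*/5 * * * *"
    # Only fetch documents that have not yet been marked as forwarded.
    query          => '{ "query": { "bool": { "must_not": { "exists": { "field": "forwarded" } } } } }'
    docinfo        => true
    docinfo_target => "[@metadata][doc]"
  }
}

filter {
  # Add the tracking field to the in-flight event so the write-back output
  # below persists it (added as a string here for simplicity).
  mutate { add_field => { "forwarded" => "true" } }
}

output {
  # Forward the document to the remote destination.
  elasticsearch {
    hosts => ["https://destination-es:9200"]
    index => "forwarded-logs"
  }
  # Save the modified document (now including "forwarded") back to the source,
  # so the next scheduled query skips it. Note: the two outputs are neither
  # atomic nor ordered, which is the weakness discussed later in this thread.
  elasticsearch {
    hosts       => ["https://source-es:9200"]
    index       => "%{[@metadata][doc][_index]}"
    document_id => "%{[@metadata][doc][_id]}"
  }
}
```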
Wow! Thanks for taking the time to provide some great feedback, we really appreciate it!
I think about this as two things: backfilling historical data and streaming new data. Both are solved with the cursor but in different ways:
So we have one elasticsearch input (
Once you have completed your backfill, you can remove the backfill input, leaving just the streaming input. So now let's imagine you've built a fantastic alternative to the cursor (like your
This is actually also the case when using a boolean field on the documents. Because you need to use two outputs (one to rewrite the document and one to send data to the remote destination), and that is not atomic (it's not even ordered), if the write to your destination fails, your write to the local system will almost certainly still succeed. In that case, you have marked the documents as forwarded while the actual forwarding has failed. If you're using a memory queue and Logstash restarts while the remote output is failing, you will have documents marked as transferred that were actually still being retried by the remote output.

Ultimately, for both cases, if the destination output fails entirely, the default configuration is that the output retries indefinitely. For both cases, using a persistent queue ensures that an unexpected restart of Logstash or a forced shutdown of a pipeline while the output is failing doesn't result in data loss. For both cases, capturing document-level failures requires a DLQ, and in both cases the tracking mechanism will consider documents in the DLQ as having been forwarded.

But I believe there's actually a big advantage to the input cursor once you reach steady state and are no longer pulling tens of thousands of documents per query. In this case, when the output is blocked, the input is allowed to keep pushing events into the queue until the queue fills and then blocks the input. With the cursor, the timestamp increments at the end of each query, so until the queue fills up and blocks, each batch pushed into the queue is new documents. With the
A great question for @jsvd for sure. My read is that either
You could set up a third input, no need to reset state:
Once the "backfill" inputs have completed, you can safely remove them (or just leave them).
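To make the backfill-versus-streaming split concrete, here is a rough sketch of what the two inputs could look like with this PR's cursor feature. It is an illustration only: the `tracking_field` value, index names, date boundary, and exact placeholder/query syntax are assumptions based on this PR's description, not a configuration taken from the thread.

```
input {
  # Streaming input: follows newly written data using the cursor.
  # :last_value is replaced with the persisted cursor, :present with now - 30s.
  elasticsearch {
    hosts          => ["https://source-es:9200"]
    index          => "app-logs-*"
    schedule       => "* * * * *"
    tracking_field => "event.ingested"
    query          => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lte": ":present" } } }, "sort": [ { "event.ingested": "asc" } ] }'
  }

  # Backfill input: a bounded pass over historical data. The fixed upper bound
  # keeps it from overlapping with the streaming input; a similar bounded input
  # can cover any remaining gap (the "third input" mentioned above) without
  # resetting any state.
  elasticsearch {
    hosts => ["https://source-es:9200"]
    index => "app-logs-*"
    query => '{ "query": { "range": { "event.ingested": { "lt": "2024-01-01T00:00:00Z" } } } }'
  }
}
```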
I did see the PIT docs recommend

I think we have maybe two options for dealing with the long scroll problem.

The first option would be to limit slices to one -- in this case it looks like we could persist the cursor to the value of the second-biggest timestamp at the end of each page, and the max timestamp at the end of each scroll? Perhaps later we could add support for staggered checkpointing across multiple slices?

The second option would be to encourage users to use date math, perhaps we provide
Hey @strawgate, thanks for jumping in with a super detailed response!
Backfill versus Streaming is a good distinction to point out. I had been thinking about how I might solve these problems with a single input, and using two of them with different query criteria hadn't crossed my mind. But that does seem like the right way to go about it.
Yeah, that makes a lot of sense, and I believe it addresses a hypothetical scenario where someone using this cursor tracking feature starts out with

Quoting myself here...
I meant forced to

Maybe it doesn't matter, given the outcome isn't really that impactful, as you had suggested.
I've been mulling over your feedback and thinking about how cursor tracking would work, in comparison to what I have now, in a few scenarios. I think most of my concerns around using a cursor would be addressed by switching from an in-memory queue to a persistent queue + DLQ.

Using a tracking field on a document in Elasticsearch sort of emulates the idea of persistent queue state: under certain circumstances, like a pipeline reload or Logstash restart, the document doesn't get marked as delivered and can therefore be re-queried on the next input tick. But if I switched to a persistent queue, that would give me the same thing without the added cost of having to write state back to documents in bulk. And the fact that the input wouldn't be able to queue up duplicate events that are currently in the queue or being worked on is a huge win for many reasons.

I'm going to follow your suggestion of setting up multiple inputs to handle the backfill versus streaming scenarios and post an update once I've had a chance to see it work. Thanks again!
@JAndritsch I shared a tool with you where you can set up various scenarios and measure data loss. I added your scenarios so you can play around with it. It's actually quite difficult to lose data with the cursor + memory queue for the backfill scenario, as currently the whole read is one scroll and it only updates the cursor after the whole read. I think you'll be surprised by how much data is lost when using the
@strawgate That's awesome. Thank you for setting that up! I'll try to find some time to play around with it and share my thoughts.
@strawgate Finally got a chance to play with the tool and added some new scenarios. You were right -- it's very surprising to see that the cursor results in better data reliability even without a persistent queue. There's more I need to dig into here to better understand why.
I've done a few modifications: I've introduced

This significantly improves the index-tailing use case: I was able to consistently tail an index being written to, using slices, without any data loss. For catch-up use cases where it is known that data already exists in the index, a range such as

I have also added a documentation section describing the ideal setup for tailing an index, including the ingest pipeline, template mapping definitions, and a sample configuration: https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205/files#diff-cae5619b3d18ec99c5ccd0a9f6de0c6d3f53343c64692444551a7d29da6863e7R53
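The Elasticsearch-side setup those docs describe (not reproduced verbatim here) is roughly of this shape: an ingest pipeline that stamps each document with its ingest time, and a mapping that stores that timestamp with nanosecond precision so the cursor can order documents reliably. The pipeline, index, and field names below are illustrative assumptions, not taken from the PR.

```
PUT _ingest/pipeline/add-ingest-timestamp
{
  "processors": [
    { "set": { "field": "event.ingested", "value": "{{{_ingest.timestamp}}}" } }
  ]
}

PUT app-logs
{
  "settings": { "index.default_pipeline": "add-ingest-timestamp" },
  "mappings": {
    "properties": {
      "event": {
        "properties": {
          "ingested": { "type": "date_nanos" }
        }
      }
    }
  }
}
```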
Thanks, @jsvd! I'll find a bit of time this week to test out the latest updates and report back on how that goes.
A few more modifications are in place: the cursor is now persisted per search, per slice, and at the end of the PIT. So if slices is set to 5 and there's a PIT open followed by 10 queries per slice, there will be 50 checkpoints and a final one when closing the PIT. This means that, in a backfilling use case, termination of Logstash will not require reprocessing of the entire original date range.

We should also note that in this example, for the first 50 checkpoints, the persisted value corresponds to the earliest value across all slices, while in the last checkpoint it is the latest value that is persisted. This ensures that if Logstash is restarted between PITs there won't be duplicate events, but a minimal amount of duplicates can happen if the termination occurs while the PIT is open and queries are being executed.
First pass through - great stuff
Some minor doc suggestions, but other than that LGTM
Nice work on these docs! The config samples are especially helpful.
I left some suggestions inline for consideration, mostly around style, capitalization, indents, etc. Please use what you like.
Ping me when you're ready for final check. I'll update the content in this test PR to make sure everything checks out.
LGTM! 🚀
Mocked up preview is here: https://logstash-docs_bk_1828.docs-preview.app.elstc.co/guide/en/logstash/8.x/plugins-inputs-elasticsearch.html
Provide field value tracking, persisted to disk on each search_after page. Adds `:last_value` and `:present` placeholders, allowing the plugin to inject the cursor value and `now - 30s`, respectively, into the query string. Useful for tracking new data being written to an index or series of indices. Works best with nanosecond-precision timestamps added by Elasticsearch's Ingest Pipelines. --------- Co-authored-by: Joel Andritsch <joel.andritsch@gmail.com> Co-authored-by: Rob Bavey <rob.bavey@elastic.co> Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com>
With this change it's possible to configure a `tracking_field` whose value is injected into the query using the `:last_value` keyword, and a `:present` field that represents `now - 30s`.

Example configuration:
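The example configuration itself was not captured in this excerpt; the sketch below is a hedged reconstruction based on the description above. Host, index, and field names are illustrative, and the exact placeholder/query syntax should be checked against the docs added in this PR.

```
input {
  elasticsearch {
    hosts          => ["https://es:9200"]
    index          => "app-logs-*"
    schedule       => "* * * * *"
    # Field whose value is persisted as the cursor after each page.
    tracking_field => "event.ingested"
    # :last_value is replaced with the persisted cursor value,
    # :present with "now - 30s", before the query is sent.
    query          => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lte": ":present" } } }, "sort": [ { "event.ingested": "asc" } ] }'
  }
}

output {
  stdout { codec => rubydebug }
}
```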
To test this, follow the guidance in the doc changes from this PR on how to set up Elasticsearch, or use a script that prepares the index/ingest_pipeline and writes documents to ES, like https://gist.github.com/jsvd/6ea96197078d906362227494e0c542dc
resolves #184
resolves #93