diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc00c0ce..f4b4dc23 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 5.1.0
+  - Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)
+
 ## 5.0.2
   - Add elastic-transport client support used in elasticsearch-ruby 8.x [#223](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/223)
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index f17c92e3..6478dff5 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -48,7 +48,7 @@ This would create an Elasticsearch query with the following format:
       "sort": [ "_doc" ]
   }'
-
+[id="plugins-{type}s-{plugin}-scheduling"]
 ==== Scheduling

 Input from this plugin can be scheduled to run periodically according to a specific
@@ -103,6 +103,133 @@ Common causes are:
 - When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
 - When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.

+[id="plugins-{type}s-{plugin}-cursor"]
+==== Tracking a field's value across runs
+
+.Technical Preview: Tracking a field's value
+****
+The feature that allows tracking a field's value across runs is in _Technical Preview_.
+Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings.
+****
+
+Some use cases require tracking the value of a particular field between two jobs.
+Examples include:
+
+* avoiding the need to re-process the entire result set of a long query after an unplanned restart
+* grabbing only new data from an index instead of processing the entire set on each job.
+
+The Elasticsearch input plugin provides the <<plugins-{type}s-{plugin}-tracking_field>> and <<plugins-{type}s-{plugin}-tracking_field_seed>> options.
+When <<plugins-{type}s-{plugin}-tracking_field>> is set, the plugin records the value of that field for the last document retrieved in a run into a file.
+The location of this file can be set with the <<plugins-{type}s-{plugin}-last_run_metadata_path>> option.
+
+You can then inject this value into the query using the placeholder `:last_value`.
+The value is injected into the query before execution and is updated after the query completes if new data was found.
+
+This feature works best when:
+
+* the query sorts by the tracking field,
+* the timestamp field is added by {es}, and
+* the field type has enough resolution so that two events are unlikely to have the same value.
+
+Consider using a tracking field whose type is https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[date nanoseconds].
+If the tracking field is of this data type, an extra placeholder called `:present` can be used to inject the nanosecond-based value of "now-30s".
+This placeholder is useful as the right-hand side of a range filter, allowing the collection of
+new data while leaving partially-searchable bulk request data to the next scheduled job.
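+
+For example, assuming a `date_nanos` tracking field named `event.ingested` (the same field used in the sample configuration below), a minimal sketch of a query using both placeholders could look like this:
+
+[source,json]
+    {
+      "query": {
+        "range": {
+          "event.ingested": { "gt": ":last_value", "lt": ":present" }
+        }
+      },
+      "sort": [ { "event.ingested": "asc" } ]
+    }
+
+At execution time the plugin replaces `:last_value` with the last tracked value and `:present` with a value representing "now-30s"; the step-by-step sample that follows expands this sketch into a complete setup.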
+
+[id="plugins-{type}s-{plugin}-tracking-sample"]
+===== Sample configuration: Track field value across runs
+
+This section contains a series of steps to help you set up the "tailing" of data being written to a set of indices, using a date nanosecond field added by an Elasticsearch ingest pipeline and the `tracking_field` capability of this plugin.
+
+. Create an ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested`:
++
+[source, json]
+    PUT _ingest/pipeline/my-pipeline
+    {
+      "processors": [
+        {
+          "script": {
+            "lang": "painless",
+            "source": "ctx.putIfAbsent(\"event\", [:]); ctx.event.ingested = metadata().now.format(DateTimeFormatter.ISO_INSTANT);"
+          }
+        }
+      ]
+    }
+
+[start=2]
+. Create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline:
++
+[source, json]
+    PUT /_template/my_template
+    {
+      "index_patterns": ["test-*"],
+      "settings": {
+        "index.default_pipeline": "my-pipeline"
+      },
+      "mappings": {
+        "properties": {
+          "event": {
+            "properties": {
+              "ingested": {
+                "type": "date_nanos",
+                "format": "strict_date_optional_time_nanos"
+              }
+            }
+          }
+        }
+      }
+    }
+
+[start=3]
+. Define a query that looks at all data in the indices, sorted by the tracking field, with a range filter from the last value seen up to the present:
++
+[source,json]
+{
+  "query": {
+    "range": {
+      "event.ingested": {
+        "gt": ":last_value",
+        "lt": ":present"
+      }
+    }
+  },
+  "sort": [
+    {
+      "event.ingested": {
+        "order": "asc",
+        "format": "strict_date_optional_time_nanos",
+        "numeric_type": "date_nanos"
+      }
+    }
+  ]
+}
+
+[start=4]
+. Configure the Elasticsearch input to run the query defined above against the indices every minute, and to track the `event.ingested` field:
++
+[source, ruby]
+    input {
+      elasticsearch {
+        id => tail_test_index
+        hosts => [ 'https://..']
+        api_key => '....'
+        index => 'test-*'
+        query => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lt": ":present"}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] }'
+        tracking_field => "[event][ingested]"
+        slices => 5 # optional use of slices to speed data processing, should be equal to or less than the number of primary shards
+        schedule => '* * * * *' # every minute
+        schedule_overlap => false # don't accumulate jobs if one takes longer than 1 minute
+      }
+    }
+
+With this sample setup, new documents are indexed into a `test-*` index.
+The next scheduled run:
+
+* selects all new documents since the last observed value of the tracking field,
+* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
+* updates the value of the field at the end of the pagination.
+
 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Input configuration options

@@ -126,12 +253,14 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
 | <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-hosts>> |<<array,array>>|No
 | <<plugins-{type}s-{plugin}-index>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-last_run_metadata_path>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
 | <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
 | <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
 | <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No
 | <<plugins-{type}s-{plugin}-scroll>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-search_api>> |<<string,string>>, one of `["auto", "search_after", "scroll"]`|No
 | <<plugins-{type}s-{plugin}-size>> |<<number,number>>|No
@@ -151,6 +280,8 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
 | <<plugins-{type}s-{plugin}-ssl_verification_mode>> |<<string,string>>, one of `["full", "none"]`|No
 | <<plugins-{type}s-{plugin}-socket_timeout_seconds>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-target>> | {logstash-ref}/field-references-deepdive.html[field reference] | No
+| <<plugins-{type}s-{plugin}-tracking_field>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-tracking_field_seed>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-retries>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
 |=======================================================================
@@ -330,6 +461,17 @@ Check out
 {ref}/api-conventions.html#api-multi-index[Multi Indices documentation] in the Elasticsearch
 documentation for info on referencing multiple indices.

+[id="plugins-{type}s-{plugin}-last_run_metadata_path"]
+===== `last_run_metadata_path`
+
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
+
+The path to store the last observed value of the tracking field, when used.
+By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value`.
+
+This setting should point to a file, not a directory, and Logstash must have read+write access to this file.
+
 [id="plugins-{type}s-{plugin}-password"]
 ===== `password`

@@ -410,6 +552,19 @@ for example: "* * * * *" (execute query every minute, on the minute)
 There is no schedule by default. If no schedule is given, then the statement is run
 exactly once.

+[id="plugins-{type}s-{plugin}-schedule_overlap"]
+===== `schedule_overlap`
+
+  * Value type is <<boolean,boolean>>
+  * Default value is `true`
+
+Whether to allow queuing of a scheduled run while a previous run is still in progress.
+Queuing ensures that a new run starts immediately after the previous one finishes when there is
+a lot of work to do, but because the queue is unbounded it can lead to out-of-memory errors
+over long periods of time if the queue keeps growing.
+
+When in doubt, set `schedule_overlap` to `false` (it may become the default value in the future).
+
 [id="plugins-{type}s-{plugin}-scroll"]
 ===== `scroll`

@@ -622,6 +777,28 @@ When the `target` is set to a field reference, the `_source` of the hit is place
 This option can be useful to avoid populating unknown fields when a downstream schema such as ECS is enforced.
 It is also possible to target an entry in the event's metadata, which will be available during event processing but not exported to your outputs (e.g., `target \=> "[@metadata][_source]"`).

+[id="plugins-{type}s-{plugin}-tracking_field"]
+===== `tracking_field`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+Which field from the last event of a previous run will be used as the cursor value for the following run.
+The value of this field is injected into each query if the query uses the placeholder `:last_value`.
+For the first query after a pipeline is started, the value used is either read from the <<plugins-{type}s-{plugin}-last_run_metadata_path>> file
+or taken from the <<plugins-{type}s-{plugin}-tracking_field_seed>> setting.
+
+Note: The tracking value is updated after each page is read and at the end of each Point in Time.
+In case of a crash, the last saved value is reused, so some duplication of data can occur.
+For this reason, using unique document IDs for each event in the downstream destination is recommended.
+
+[id="plugins-{type}s-{plugin}-tracking_field_seed"]
+===== `tracking_field_seed`
+
+* Value type is <<string,string>>
+* Default value is `"1970-01-01T00:00:00.000000000Z"`
+
+The starting value for the <<plugins-{type}s-{plugin}-tracking_field>> if there is no <<plugins-{type}s-{plugin}-last_run_metadata_path>> file yet.
+The default is the nanosecond-precision ISO8601 representation of `epoch` ("1970-01-01T00:00:00.000000000Z"),
+since nanosecond-precision timestamps are the most reliable data format to use for this feature.
+
 [id="plugins-{type}s-{plugin}-user"]
 ===== `user`
diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb
index c9d8b552..564acc6a 100644
--- a/lib/logstash/inputs/elasticsearch.rb
+++ b/lib/logstash/inputs/elasticsearch.rb
@@ -73,6 +73,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base

   require 'logstash/inputs/elasticsearch/paginated_search'
   require 'logstash/inputs/elasticsearch/aggregation'
+  require 'logstash/inputs/elasticsearch/cursor_tracker'

   include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
   include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -124,6 +125,20 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # by this pipeline input.
   config :slices, :validate => :number

+  # Enable tracking the value of a given field to be used as a cursor
+  # Main concerns:
+  # * using anything other than _event.timestamp easily leads to data loss
+  # * the first "synchronization run" can take a long time
+  config :tracking_field, :validate => :string
+
+  # Define the initial seed value of the tracking_field
+  config :tracking_field_seed, :validate => :string, :default => "1970-01-01T00:00:00.000000000Z"
+
+  # The location of where the tracking field value will be stored
+  # The value is persisted after each scheduled run (and not per result)
+  # If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value'
+  config :last_run_metadata_path, :validate => :string
+
   # If set, include Elasticsearch document information such as index, type, and
   # the id in the event.
   #
@@ -250,6 +265,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # exactly once.
   config :schedule, :validate => :string

+  # Allow scheduled runs to overlap (enabled by default). Setting to false will
+  # only start a new scheduled run after the previous one completes.
+ config :schedule_overlap, :validate => :boolean + # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference @@ -328,16 +347,30 @@ def register setup_query_executor + setup_cursor_tracker + @client end def run(output_queue) if @schedule - scheduler.cron(@schedule) { @query_executor.do_run(output_queue) } + scheduler.cron(@schedule, :overlap => @schedule_overlap) do + @query_executor.do_run(output_queue, get_query_object()) + end scheduler.join else - @query_executor.do_run(output_queue) + @query_executor.do_run(output_queue, get_query_object()) + end + end + + def get_query_object + if @cursor_tracker + query = @cursor_tracker.inject_cursor(@query) + @logger.debug("new query is #{query}") + else + query = @query end + LogStash::Json.load(query) end ## @@ -347,6 +380,11 @@ def push_hit(hit, output_queue, root_field = '_source') event = event_from_hit(hit, root_field) decorate(event) output_queue << event + record_last_value(event) + end + + def record_last_value(event) + @cursor_tracker.record_last_value(event) if @tracking_field end def event_from_hit(hit, root_field) @@ -640,6 +678,28 @@ def setup_query_executor end end + def setup_cursor_tracker + return unless @tracking_field + return unless @query_executor.is_a?(LogStash::Inputs::Elasticsearch::SearchAfter) + + if @resolved_search_api != "search_after" || @response_type != "hits" + raise ConfigurationError.new("The `tracking_field` feature can only be used with `search_after` non-aggregation queries") + end + + @cursor_tracker = CursorTracker.new(last_run_metadata_path: last_run_metadata_path, + tracking_field: @tracking_field, + tracking_field_seed: @tracking_field_seed) + @query_executor.cursor_tracker = @cursor_tracker + end + + def last_run_metadata_path + return @last_run_metadata_path if @last_run_metadata_path + + last_run_metadata_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", pipeline_id, "last_run_value") + FileUtils.mkdir_p ::File.dirname(last_run_metadata_path) + last_run_metadata_path + end + def get_transport_client_class # LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport` # And now `elasticsearch-transport` is old, instead we have `elastic-transport`. 
diff --git a/lib/logstash/inputs/elasticsearch/aggregation.rb b/lib/logstash/inputs/elasticsearch/aggregation.rb index e74a4357..91c12443 100644 --- a/lib/logstash/inputs/elasticsearch/aggregation.rb +++ b/lib/logstash/inputs/elasticsearch/aggregation.rb @@ -12,14 +12,9 @@ def initialize(client, plugin) @client = client @plugin_params = plugin.params + @index = @plugin_params["index"] @size = @plugin_params["size"] - @query = @plugin_params["query"] @retries = @plugin_params["retries"] - @agg_options = { - :index => @plugin_params["index"], - :size => 0 - }.merge(:body => @query) - @plugin = plugin end @@ -33,10 +28,18 @@ def retryable(job_name, &block) false end - def do_run(output_queue) + def aggregation_options(query_object) + { + :index => @index, + :size => 0, + :body => query_object + } + end + + def do_run(output_queue, query_object) logger.info("Aggregation starting") r = retryable(AGGREGATION_JOB) do - @client.search(@agg_options) + @client.search(aggregation_options(query_object)) end @plugin.push_hit(r, output_queue, 'aggregations') if r end diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb new file mode 100644 index 00000000..d43b1fd8 --- /dev/null +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -0,0 +1,58 @@ +require 'fileutils' + +module LogStash; module Inputs; class Elasticsearch + class CursorTracker + include LogStash::Util::Loggable + + attr_reader :last_value + + def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) + @last_run_metadata_path = last_run_metadata_path + @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new + @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed + @tracking_field = tracking_field + logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}" + @mutex = Mutex.new + end + + def checkpoint_cursor(intermediate: true) + @mutex.synchronize do + if intermediate + # in intermediate checkpoints pick the smallest + converge_last_value {|v1, v2| v1 < v2 ? v1 : v2} + else + # in the last search of a PIT choose the largest + converge_last_value {|v1, v2| v1 > v2 ? v1 : v2} + @last_value_hashmap.clear + end + IO.write(@last_run_metadata_path, @last_value) + end + end + + def converge_last_value(&block) + return if @last_value_hashmap.empty? + new_last_value = @last_value_hashmap.reduceValues(1000, &block) + logger.debug? && logger.debug("converge_last_value: got #{@last_value_hashmap.values.inspect}. won: #{new_last_value}") + return if new_last_value == @last_value + @last_value = new_last_value + logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" + end + + def record_last_value(event) + value = event.get(@tracking_field) + logger.trace? 
&& logger.trace("storing last_value if #{@tracking_field} for #{Thread.current.object_id}: #{value}") + @last_value_hashmap.put(Thread.current.object_id, value) + end + + def inject_cursor(query_json) + # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT + result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", now_minus_30s) + logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result) + result + end + + def now_minus_30s + Java::java.time.Instant.now.minusSeconds(30).to_s + end + end +end; end; end diff --git a/lib/logstash/inputs/elasticsearch/paginated_search.rb b/lib/logstash/inputs/elasticsearch/paginated_search.rb index 2e8236bc..dd66b2c0 100644 --- a/lib/logstash/inputs/elasticsearch/paginated_search.rb +++ b/lib/logstash/inputs/elasticsearch/paginated_search.rb @@ -21,9 +21,10 @@ def initialize(client, plugin) @pipeline_id = plugin.pipeline_id end - def do_run(output_queue) - return retryable_search(output_queue) if @slices.nil? || @slices <= 1 + def do_run(output_queue, query) + @query = query + return retryable_search(output_queue) if @slices.nil? || @slices <= 1 retryable_slice_search(output_queue) end @@ -122,6 +123,13 @@ class SearchAfter < PaginatedSearch PIT_JOB = "create point in time (PIT)" SEARCH_AFTER_JOB = "search_after paginated search" + attr_accessor :cursor_tracker + + def do_run(output_queue, query) + super(output_queue, query) + @cursor_tracker.checkpoint_cursor(intermediate: false) if @cursor_tracker + end + def pit?(id) !!id&.is_a?(String) end @@ -192,6 +200,8 @@ def search(output_queue:, slice_id: nil, pit_id:) end end + @cursor_tracker.checkpoint_cursor(intermediate: true) if @cursor_tracker + logger.info("Query completed", log_details) end diff --git a/logstash-input-elasticsearch.gemspec b/logstash-input-elasticsearch.gemspec index 8e36201b..469f4d88 100644 --- a/logstash-input-elasticsearch.gemspec +++ b/logstash-input-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-elasticsearch' - s.version = '5.0.2' + s.version = '5.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads query results from an Elasticsearch cluster" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/inputs/cursor_tracker_spec.rb b/spec/inputs/cursor_tracker_spec.rb new file mode 100644 index 00000000..291d6c61 --- /dev/null +++ b/spec/inputs/cursor_tracker_spec.rb @@ -0,0 +1,72 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/devutils/rspec/shared_examples" +require "logstash/inputs/elasticsearch" +require "logstash/inputs/elasticsearch/cursor_tracker" + +describe LogStash::Inputs::Elasticsearch::CursorTracker do + + let(:last_run_metadata_path) { Tempfile.new('cursor_tracker_testing').path } + let(:tracking_field_seed) { "1980-01-01T23:59:59.999999999Z" } + let(:options) do + { + :last_run_metadata_path => last_run_metadata_path, + :tracking_field => "my_field", + :tracking_field_seed => tracking_field_seed + } + end + + subject { described_class.new(**options) } + + it "creating a class works" do + expect(subject).to be_a described_class + end + + describe "checkpoint_cursor" do + before(:each) do + subject.checkpoint_cursor(intermediate: false) # store seed value + [ + Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-03T23:59:59.999999999Z")) }, + Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-01T23:59:59.999999999Z")) }, + Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-02T23:59:59.999999999Z")) }, + ].each(&:join) + end + context "when doing intermediate checkpoint" do + it "persists the smallest value" do + subject.checkpoint_cursor(intermediate: true) + expect(IO.read(last_run_metadata_path)).to eq("2025-01-01T23:59:59.999999999Z") + end + end + context "when doing non-intermediate checkpoint" do + it "persists the largest value" do + subject.checkpoint_cursor(intermediate: false) + expect(IO.read(last_run_metadata_path)).to eq("2025-01-03T23:59:59.999999999Z") + end + end + end + + describe "inject_cursor" do + let(:new_value) { "2025-01-03T23:59:59.999999999Z" } + let(:fake_now) { "2026-09-19T23:59:59.999999999Z" } + + let(:query) do + %q[ + { "query": { "range": { "event.ingested": { "gt": :last_value, "lt": :present}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] } + ] + end + + before(:each) do + subject.record_last_value(LogStash::Event.new("my_field" => new_value)) + subject.checkpoint_cursor(intermediate: false) + allow(subject).to receive(:now_minus_30s).and_return(fake_now) + end + + it "injects the value of the cursor into json query if it contains :last_value" do + expect(subject.inject_cursor(query)).to match(/#{new_value}/) + end + + it "injects current time into json query if it contains :present" do + expect(subject.inject_cursor(query)).to match(/#{fake_now}/) + end + end +end diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 5067400e..0c07992c 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1165,7 +1165,7 @@ def wait_receive_request context "when there's an exception" do before(:each) do - allow(client).to receive(:search).and_raise RuntimeError + allow(client).to receive(:search).and_raise RuntimeError.new("test exception") end it 'produces no events' do plugin.run queue @@ -1310,6 +1310,10 @@ def wait_receive_request let(:mock_queue) { double('queue', :<< => nil) } + before(:each) do + plugin.send(:setup_cursor_tracker) + end + it 'pushes a 
generated event to the queue' do plugin.send(:push_hit, hit, mock_queue) expect(mock_queue).to have_received(:<<) do |event| diff --git a/spec/inputs/integration/elasticsearch_spec.rb b/spec/inputs/integration/elasticsearch_spec.rb index 2e153495..a7c45bd5 100644 --- a/spec/inputs/integration/elasticsearch_spec.rb +++ b/spec/inputs/integration/elasticsearch_spec.rb @@ -76,6 +76,14 @@ shared_examples 'secured_elasticsearch' do it_behaves_like 'an elasticsearch index plugin' + let(:unauth_exception_class) do + begin + Elasticsearch::Transport::Transport::Errors::Unauthorized + rescue + Elastic::Transport::Transport::Errors::Unauthorized + end + end + context "incorrect auth credentials" do let(:config) do @@ -85,7 +93,7 @@ let(:queue) { [] } it "fails to run the plugin" do - expect { plugin.register }.to raise_error Elasticsearch::Transport::Transport::Errors::Unauthorized + expect { plugin.register }.to raise_error unauth_exception_class end end end