[feat][admin] PIP-415: Support getting message ID by index #24222
The PIP in #24220 has had a lot of adjustments.
@liangyepianzhou please ping me after you update the code and I will review it again.
Good job.
Out of the scope of this PR, I think it would be better to add a method to |
[2] rename getMessageIDByIndexAndPartitionID to getMessageIDByIndex
Yes, we should add a new managed ledger interface to support customized ML implementations. Additionally, the current message index retrieval solution has suboptimal performance. We could improve this by adding a field in I've created an issue to track this enhancement. |
Overall LGTM, but the current implementation scans the entire ML; maybe you can optimize it via PIP-404.
Interesting—I originally proposed directly adding the first entry's index in |
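To illustrate the kind of optimization discussed above, here is a minimal sketch that assumes each ledger records the index of its first entry. The class `LedgerIndexLookup` and its methods are hypothetical and are not part of Pulsar or of PIP-404; the sketch only shows how such a field would let a lookup jump to the right ledger instead of scanning every one.

```java
import java.util.Map;
import java.util.TreeMap;

public class LedgerIndexLookup {
    // Hypothetical mapping: index of the first entry in a ledger -> ledger ID.
    private final TreeMap<Long, Long> firstIndexToLedger = new TreeMap<>();

    public void addLedger(long firstIndex, long ledgerId) {
        firstIndexToLedger.put(firstIndex, ledgerId);
    }

    /** Returns the ledger expected to hold the index, or -1 if the index precedes every ledger. */
    public long findLedger(long index) {
        // floorEntry finds the greatest first-index that is <= the requested index.
        Map.Entry<Long, Long> entry = firstIndexToLedger.floorEntry(index);
        return entry == null ? -1L : entry.getValue();
    }

    public static void main(String[] args) {
        LedgerIndexLookup lookup = new LedgerIndexLookup();
        lookup.addLedger(0, 10);
        lookup.addLedger(100, 11);
        lookup.addLedger(250, 12);
        System.out.println(lookup.findLedger(120)); // prints 11
    }
}
```

The sketch only narrows the search to a ledger; locating the exact entry inside that ledger would still need a search within it.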
        managedLedger)
        .thenCompose(firstIndex -> {
            if (index < firstIndex) {
                return CompletableFuture.completedFuture(PositionFactory.EARLIEST);
Or consider returning null?
Can you explain why you think null is better than earliest?
Explicit signal that no valid position exists for the given index.
Maybe you can get more context here
https://github.com/apache/pulsar/pull/24220/files#r2090841224
OK, got it.
The name PositionFactory.EARLIEST feels odd, as its value (-1, -1) doesn't correspond to a real message and may mislead developers into thinking it's a valid starting point. Returning null would more clearly indicate that the index is out of range and no valid position exists when used as the public value.
It's just my opinion.
Returning earliest is OK, but do we pass the index to the clients?
I have a vague impression that we do, but I'm not sure.
> (-1, -1) doesn't correspond to a real message and may mislead developers into thinking it's a valid starting point.

It's already documented, so it's good.

> but do we pass the index to the clients?

Do you mean the actual first available index? If so, I think the API could become unnecessarily complicated (a good API does not do multiple things).
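The two options debated in this thread can be contrasted with a small sketch. Everything here is illustrative: `Pos` is a stand-in and not Pulsar's position type, the in-range mapping is a placeholder, and `Optional.empty()` is used in place of a bare null to make the "no valid position" case explicit.

```java
import java.util.Optional;

public class OutOfRangeSketch {
    // Stand-in for a position; not Pulsar's own Position type.
    record Pos(long ledgerId, long entryId) {}

    // Sentinel mirroring the (-1, -1) value discussed in the thread.
    static final Pos EARLIEST = new Pos(-1, -1);

    // Option A: return a sentinel when the index is below the first available index.
    static Pos lookupWithSentinel(long index, long firstIndex) {
        if (index < firstIndex) {
            return EARLIEST;
        }
        return new Pos(1, index - firstIndex); // placeholder mapping for the sketch
    }

    // Option B: return an empty result so callers must handle the out-of-range case.
    static Optional<Pos> lookupWithOptional(long index, long firstIndex) {
        if (index < firstIndex) {
            return Optional.empty();
        }
        return Optional.of(new Pos(1, index - firstIndex)); // placeholder mapping
    }

    public static void main(String[] args) {
        System.out.println(lookupWithSentinel(3, 10));  // the (-1, -1) sentinel
        System.out.println(lookupWithOptional(3, 10).isPresent()); // prints false
    }
}
```

With the sentinel, a caller has to know that (-1, -1) is not a real message; with the empty result, the type itself signals that nothing was found.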
Pull Request Overview
This PR introduces a new HTTP API to retrieve a Pulsar message ID given its index, enabling caching between message IDs and offsets and providing new CLI and admin functionalities.
- Adds a new CLI command and corresponding API methods in the client admin components.
- Implements REST endpoint and internal broker logic to support retrieving message IDs by index.
- Adds tests to verify the new functionality for both partitioned and non-partitioned topics.
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java | Introduces the new CLI command "get-message-id-by-index". |
| pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | Adds tests verifying the new CLI command functionality. |
| pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java | Implements synchronous and asynchronous API methods for getMessageIdByIndex. |
| pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java | Updates the Topics interface with new method signatures and documentation. |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java | Adds integration test scenarios for the new message ID by index functionality. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java | Implements new REST endpoint to support retrieving message ID by index. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | Provides the internal broker logic for the getMessageIdByIndex API. |
Comments suppressed due to low confidence (1)
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:5033
- The variable 'topicName' is not defined in this scope. It appears you intended to log the resolved topic name (perhaps 'encodedTopic' or a combination of tenant, namespace, and encodedTopic).
log.error("[{}] Failed to get message id by index for topic {}, partition id {}, index {}", clientAppId(), topicName, index, ex);
Just some small comments. Overall LGTM.
By the way, if you rely on the AppendMessageIndexInterceptor, I'd suggest checking whether the interceptor is configured before starting the broker.
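A check like the one suggested here could look roughly as follows. This is a sketch only: it assumes the configured interceptors are available as a set of class names, and `InterceptorCheck` and `isConfigured` are hypothetical helpers rather than existing broker code.

```java
import java.util.Set;

public class InterceptorCheck {
    /**
     * Returns true if any configured class name equals the given simple name
     * or ends with "." followed by it (i.e. a fully qualified match).
     */
    static boolean isConfigured(Set<String> configuredClassNames, String simpleName) {
        return configuredClassNames.stream()
                .anyMatch(n -> n.equals(simpleName) || n.endsWith("." + simpleName));
    }

    public static void main(String[] args) {
        // The package name below is made up for the example.
        Set<String> configured = Set.of("com.example.AppendMessageIndexInterceptor");
        if (!isConfigured(configured, "AppendMessageIndexInterceptor")) {
            throw new IllegalStateException("Index interceptor is not configured");
        }
        System.out.println("interceptor configured");
    }
}
```

Failing fast in this way would surface a missing interceptor as a clear error instead of as unexpected lookup results later.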
                PulsarAdminException.class, NotFoundException.class);
    }

    private void assertThrowsWithCause(Executable executable,
Suggested change:
-    private void assertThrowsWithCause(Executable executable,
+    private void assertThrowsWithCause(Runnable executable,
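For context, a helper with the suggested `Runnable` signature might be written as below. Only the method name and the two class arguments come from the diff; the body is an assumption about what such a helper would do, written without any test framework.

```java
public class AssertThrowsSketch {
    /** Fails unless the action throws expectedType whose cause is an instance of expectedCause. */
    static void assertThrowsWithCause(Runnable action,
                                      Class<? extends Throwable> expectedType,
                                      Class<? extends Throwable> expectedCause) {
        try {
            action.run();
        } catch (Throwable t) {
            if (!expectedType.isInstance(t)) {
                throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
            }
            // isInstance(null) is false, so a missing cause also fails here.
            if (!expectedCause.isInstance(t.getCause())) {
                throw new AssertionError("Unexpected cause: " + t.getCause(), t);
            }
            return;
        }
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        assertThrowsWithCause(
                () -> { throw new IllegalStateException(new IllegalArgumentException("bad index")); },
                IllegalStateException.class, IllegalArgumentException.class);
        System.out.println("ok");
    }
}
```

One trade-off of this signature: `Runnable.run()` declares no checked exceptions, so a call that throws a checked exception has to be wrapped by the caller.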
@@ -41,6 +46,7 @@
 import org.apache.pulsar.common.util.FutureUtil;
 import org.assertj.core.util.Sets;
 import org.awaitility.Awaitility;
+import org.junit.jupiter.api.function.Executable;
Suggested change:
-import org.junit.jupiter.api.function.Executable;
PIP: #24220
Motivation
We can already obtain the offset of a message from its message ID:
- get-message-by-id
- cmdMessage.getIndex()
However, we cannot obtain the message ID from an offset, so a new API is needed to get the message ID by offset.
Modifications
Add a new HTTP API to retrieve the message ID by offset.
This enables us to cache the mapping between message ID and offset, so that offsets can be used for seek and acknowledgment operations when consuming messages through the standardized API.
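The caching idea mentioned above could be sketched like this. The class `OffsetCacheSketch` is hypothetical, message IDs are represented as plain strings, and the lookup function is a stand-in for whatever call retrieves a message ID by offset; no real Pulsar API is used here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

public class OffsetCacheSketch {
    private final Map<Long, String> offsetToMessageId = new ConcurrentHashMap<>();
    private final LongFunction<String> lookup; // stand-in for the remote lookup by offset

    public OffsetCacheSketch(LongFunction<String> lookup) {
        this.lookup = lookup;
    }

    /** Returns the cached message ID for the offset, calling the lookup only on a miss. */
    public String messageIdFor(long offset) {
        return offsetToMessageId.computeIfAbsent(offset, lookup::apply);
    }

    public static void main(String[] args) {
        // Fake lookup that formats an ID; a real one would query the broker.
        OffsetCacheSketch cache = new OffsetCacheSketch(offset -> "1:" + offset + ":-1");
        System.out.println(cache.messageIdFor(42)); // prints 1:42:-1
        System.out.println(cache.messageIdFor(42)); // served from the cache this time
    }
}
```

A real cache would also need an eviction policy, since entries for trimmed ledgers eventually become stale.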