add filter for logging principal of incoming requests #3649


Open
wants to merge 6 commits into base: 7.7.x
Changes from 5 commits

core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
@@ -0,0 +1,67 @@
/*
* Copyright 2025 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafka.schemaregistry.rest;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.security.Principal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.confluent.kafka.schemaregistry.utils.PrincipalContext;

/**
 * A servlet filter that logs the user principal of each incoming Schema Registry request
 * and stores it in {@link PrincipalContext}, so that resource associations between
 * principals and schemas can be built at the service level.
 */
public class PrincipalLoggingFilter implements Filter {

private static final Logger log = LoggerFactory.getLogger(PrincipalLoggingFilter.class);

@Override
public void init(FilterConfig filterConfig) throws ServletException {
  // No initialization required; this filter keeps no state across requests.
}

SonarQube-Confluent check failure on core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java#L40: Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

@Override
public void doFilter(ServletRequest request, ServletResponse servletResponse,
                     FilterChain filterChain) throws IOException, ServletException {
  HttpServletRequest req = (HttpServletRequest) request;
  Principal principal = req.getUserPrincipal();

  if (principal != null) {
    log.info("User Principal: {}", principal.getName());
    PrincipalContext.setPrincipal(principal.getName());
  } else {
    log.info("No User Principal found for the request.");
    PrincipalContext.clear();
  }

  try {
    filterChain.doFilter(request, servletResponse);
  } finally {
    PrincipalContext.clear(); // Clear the principal after the request is processed
  }
}

@Override
public void destroy() {
  // Nothing to release; this filter keeps no state across requests.
}

SonarQube-Confluent check failure on core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java#L65: Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.
}

core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
@@ -35,14 +35,17 @@
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfigException;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.resource.ResourceCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.DispatcherType;
import javax.ws.rs.core.Configurable;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -59,6 +62,10 @@ public SchemaRegistryRestApplication(Properties props) throws RestConfigException
@Override
protected void configurePreResourceHandling(ServletContextHandler context) {
super.configurePreResourceHandling(context);
PrincipalLoggingFilter principalLoggingFilter = new PrincipalLoggingFilter();
Reviewer: How does this pull out the user_id field from the REQUEST types? I am not super familiar with this code base, so I don't know what this does implicitly.

Author (Contributor): All requests that come into SR pass through this request filter, which sets the principal (the unique identifier of the request sender) in a PrincipalContext. This is needed because the logs are populated at the service level, not at the request level, so we keep the principal in a PrincipalContext that can be read when the actual CRUD SR calls happen. I have also added a sample log to the PR description; let me know if you want it reformatted another way.

Reviewer: Got it, yeah I wasn't sure which id or reference the principalContext would use by default. This and the example log message clarify that, thanks!
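
For reference, the association line emitted by logResourceAssociation (added to KafkaSchemaRegistry later in this diff) looks like the following; the principal name and hash here are illustrative placeholders, not the sample from the PR description:

INFO Resource association log - (Principal, schemaHash): (User:alice, 5d41402abc4b2a76b9719d911017c592)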

FilterHolder filterHolder = new FilterHolder(principalLoggingFilter);
filterHolder.setName("PrincipalLoggingFilter");
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
context.setErrorHandler(new JsonErrorHandler());
// This handler runs before first Session, Security or ServletHandler
context.insertHandler(new RequestHeaderHandler());

core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -84,6 +84,7 @@
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import io.confluent.kafka.schemaregistry.utils.PrincipalContext;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.confluent.rest.NamedURI;
import io.confluent.rest.RestConfig;
@@ -658,6 +659,10 @@
schema = new Schema(subject, version, schema.getId(), newSchema);
}

String schemaHash = MD5.ofSchema(schema).toHexString();
String principal = PrincipalContext.getPrincipal();
logResourceAssociation(schemaHash, principal);

return register(subject, schema, normalize, request.doPropagateSchemaTags());
} catch (IllegalArgumentException e) {
throw new InvalidSchemaException(e);
@@ -677,7 +682,7 @@
* it is set in the return object.
*/
@Override
public Schema register(String subject,
                       Schema schema,
                       boolean normalize,
                       boolean propagateSchemaTags)

SonarQube-Confluent check failure on core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java#L685: Refactor this method to reduce its Cognitive Complexity from 80 to the 15 allowed.
@@ -1145,6 +1150,9 @@
} else {
kafkaStore.put(key, null);
}
String schemaHash = MD5.ofSchema(schema).toHexString();
String principal = PrincipalContext.getPrincipal();
logResourceAssociation(schemaHash, principal);
} catch (StoreTimeoutException te) {
throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
} catch (StoreException e) {
@@ -1715,6 +1723,11 @@
if (schemaValue != null && (!schemaValue.isDeleted() || returnDeletedSchema)) {
schema = toSchemaEntity(schemaValue);
}
if (schema != null) {
String schemaHash = MD5.ofSchema(schema).toHexString();
String principal = PrincipalContext.getPrincipal();
logResourceAssociation(schemaHash, principal);
}
return schema;
}
}
@@ -1887,6 +1900,9 @@
if (schema == null) {
return null;
}
String schemaHash = MD5.ofSchema(schema).toHexString();
String principal = PrincipalContext.getPrincipal();
logResourceAssociation(schemaHash, principal);

SchemaIdAndSubjects schemaIdAndSubjects =
this.lookupCache.schemaIdAndSubjects(toSchemaEntity(schema));
@@ -2626,6 +2642,10 @@
}
}

private void logResourceAssociation(String schemaHash, String principal) {
log.info("Resource association log - (Principal, schemaHash): ({}, {})", principal, schemaHash);
}

@Override
public SchemaRegistryConfig config() {
return config;

MD5.java
@@ -81,4 +81,16 @@ public boolean equals(Object o) {
MD5 otherMd5 = (MD5) o;
return Arrays.equals(this.md5, otherMd5.md5);
}

public String toHexString() {
  StringBuilder hexString = new StringBuilder();
  for (byte b : md5) {
    String hex = Integer.toHexString(0xFF & b); // mask to avoid sign extension of negative bytes
    if (hex.length() == 1) {
      hexString.append('0'); // zero-pad each byte to two hex digits
    }
    hexString.append(hex);
  }
  return hexString.toString();
}
}
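
As a quick sanity check on the helper above, here is a minimal standalone sketch (the HexDemo class name is hypothetical, not part of the PR). The 0xFF mask matters because bytes are signed in Java: a byte of 0xFF widens to the int -1, and Integer.toHexString(-1) would otherwise return "ffffffff". The manual mask-and-pad loop produces the same output as String.format's %02x specifier:

public class HexDemo {
  public static void main(String[] args) {
    byte[] bytes = {0x00, 0x7F, (byte) 0xFF};
    StringBuilder manual = new StringBuilder();
    StringBuilder formatted = new StringBuilder();
    for (byte b : bytes) {
      String hex = Integer.toHexString(0xFF & b); // mask avoids sign extension
      if (hex.length() == 1) {
        manual.append('0'); // zero-pad to two digits per byte
      }
      manual.append(hex);
      formatted.append(String.format("%02x", b)); // same result in one call
    }
    System.out.println(manual);    // prints "007fff"
    System.out.println(formatted); // prints "007fff"
  }
}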

core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java
@@ -0,0 +1,32 @@
/*
* Copyright 2025 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafka.schemaregistry.utils;

public class PrincipalContext {

SonarQube-Confluent check warning on core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java#L18: Add a private constructor to hide the implicit public one.

  private PrincipalContext() {
    // Utility class with only static members; no instances needed.
  }
private static final ThreadLocal<String> principalHolder = new ThreadLocal<>();

public static void setPrincipal(String principal) {
  principalHolder.set(principal);
}

public static String getPrincipal() {
  return principalHolder.get();
}

public static void clear() {
  principalHolder.remove();
}
}
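
To make the lifecycle concrete, a minimal sketch follows (the PrincipalContextDemo class and the User:alice value are illustrative). Because principalHolder is a ThreadLocal and Jetty serves requests from a pool of reused threads, the clear() call in PrincipalLoggingFilter's finally block is what prevents one request's principal from bleeding into the next request handled by the same thread:

import io.confluent.kafka.schemaregistry.utils.PrincipalContext;

public class PrincipalContextDemo {
  public static void main(String[] args) {
    // Filter entry: bind the authenticated principal to the current thread.
    PrincipalContext.setPrincipal("User:alice");

    // Service layer, same thread: the principal is readable without being
    // plumbed through every method signature.
    System.out.println(PrincipalContext.getPrincipal()); // prints "User:alice"

    // Filter exit (finally block): unbind so a pooled thread starts clean.
    PrincipalContext.clear();
    System.out.println(PrincipalContext.getPrincipal()); // prints "null"
  }
}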