From 31aa667c49980862148f88ad54239aff3ed4d289 Mon Sep 17 00:00:00 2001
From: djibodu
Date: Fri, 28 Mar 2025 16:21:35 -0400
Subject: [PATCH 1/6] add filter for logging principal of incoming requests

---
 .../rest/PrincipalLoggingFilter.java          | 61 +++++++++++++++++++
 .../rest/SchemaRegistryRestApplication.java   |  7 +++
 2 files changed, 68 insertions(+)
 create mode 100644 core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java

diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
new file mode 100644
index 00000000000..4fae185845d
--- /dev/null
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2025 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.kafka.schemaregistry.rest;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.security.Principal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+* This class is a servlet filter that logs the user principal for each incoming request to
+* Schema Registry. It is a necessary step to allow for building resource associations
+*/
+public class PrincipalLoggingFilter implements Filter {
+
+  private static final Logger log = LoggerFactory.getLogger(PrincipalLoggingFilter.class.getName());
+
+  @Override
+  public void init(FilterConfig filterConfig) throws ServletException {
+  }
+
+  @Override
+  public void doFilter(ServletRequest request, ServletResponse servletResponse,
+                       FilterChain filterChain) throws IOException, ServletException {
+    HttpServletRequest req = (HttpServletRequest) request;
+    Principal principal = req.getUserPrincipal();
+    String principalId = null;
+
+    if (principal != null) {
+      log.info("User Principal: {}", principal.getName());
+    } else {
+      log.info("No User Principal found for the request.");
+    }
+
+    filterChain.doFilter(request, servletResponse); // what does this line do?
+  }
+
+  @Override
+  public void destroy() {
+  }
+}
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
index 6ddf2d1d4dd..c4d58accbee 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
@@ -35,14 +35,17 @@
 import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
 import io.confluent.rest.Application;
 import io.confluent.rest.RestConfigException;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.util.resource.Resource;
 import org.eclipse.jetty.util.resource.ResourceCollection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.DispatcherType;
 import javax.ws.rs.core.Configurable;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -59,6 +62,10 @@ public SchemaRegistryRestApplication(Properties props) throws RestConfigExceptio
   @Override
   protected void configurePreResourceHandling(ServletContextHandler context) {
     super.configurePreResourceHandling(context);
+    PrincipalLoggingFilter principalLoggingFilter = new PrincipalLoggingFilter();
+    FilterHolder filterHolder = new FilterHolder(principalLoggingFilter);
+    filterHolder.setName("PrincipalLoggingFilter");
+    context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
     context.setErrorHandler(new JsonErrorHandler());
     // This handler runs before first Session, Security or ServletHandler
     context.insertHandler(new RequestHeaderHandler());
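Note: a quick way to sanity-check the new filter in isolation is a unit test that stubs the servlet types. The sketch below is not part of this patch series; it assumes JUnit 4 and Mockito are available on the test classpath and that the test lives in io.confluent.kafka.schemaregistry.rest next to the filter. java.security.Principal has a single abstract method, so a lambda can stand in for a real principal.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import javax.servlet.FilterChain;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import org.junit.Test;

public class PrincipalLoggingFilterTest {

  @Test
  public void logsPrincipalAndForwardsRequest() throws Exception {
    HttpServletRequest request = mock(HttpServletRequest.class);
    ServletResponse response = mock(ServletResponse.class);
    FilterChain chain = mock(FilterChain.class);
    // Principal is a single-method interface, so a lambda supplies the name.
    when(request.getUserPrincipal()).thenReturn(() -> "alice");

    new PrincipalLoggingFilter().doFilter(request, response, chain);

    // Whether or not a principal is present, the filter must hand the request on.
    verify(chain).doFilter(request, response);
  }
}

The same test with getUserPrincipal() stubbed to return null exercises the "no principal" branch.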
From 59d64f89bf00746575562cca47edd44911a4c9dd Mon Sep 17 00:00:00 2001
From: djibodu
Date: Mon, 31 Mar 2025 15:57:47 -0400
Subject: [PATCH 2/6] remove unused principalId variable

---
 .../kafka/schemaregistry/rest/PrincipalLoggingFilter.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
index 4fae185845d..a435e2884b7 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
@@ -44,7 +44,6 @@ public void doFilter(ServletRequest request, ServletResponse servletResponse,
                        FilterChain filterChain) throws IOException, ServletException {
     HttpServletRequest req = (HttpServletRequest) request;
     Principal principal = req.getUserPrincipal();
-    String principalId = null;
 
     if (principal != null) {
       log.info("User Principal: {}", principal.getName());
     } else {

From b4d68a57887c2f93c1bf59eed8898894137f3875 Mon Sep 17 00:00:00 2001
From: djibodu
Date: Tue, 1 Apr 2025 10:11:15 -0400
Subject: [PATCH 3/6] remove placeholder comments

---
 .../kafka/schemaregistry/rest/PrincipalLoggingFilter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
index a435e2884b7..f58ceb1595b 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
@@ -51,7 +51,7 @@ public void doFilter(ServletRequest request, ServletResponse servletResponse,
       log.info("No User Principal found for the request.");
     }
 
-    filterChain.doFilter(request, servletResponse); // what does this line do?
+    filterChain.doFilter(request, servletResponse);
   }
 
   @Override
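Note: the placeholder question removed in PATCH 3/6 has a short answer. filterChain.doFilter(request, servletResponse) hands the request to the next filter in the chain, or to the target servlet (here the JAX-RS resources) once the chain is exhausted; a filter that returns without calling it short-circuits the request. The class below is illustrative only and is not proposed behavior for this PR -- it just shows both paths, using the same javax.servlet API as PrincipalLoggingFilter.

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/** Hypothetical example only: rejects requests that carry no principal. */
public class RejectAnonymousFilter implements Filter {

  @Override
  public void init(FilterConfig filterConfig) throws ServletException {
  }

  @Override
  public void doFilter(ServletRequest request, ServletResponse response,
                       FilterChain chain) throws IOException, ServletException {
    HttpServletRequest req = (HttpServletRequest) request;

    if (req.getUserPrincipal() == null) {
      // Short-circuit: downstream filters and the servlet never see this request.
      ((HttpServletResponse) response).sendError(HttpServletResponse.SC_UNAUTHORIZED);
      return;
    }

    // Hand the request to the next filter in the chain, or to the servlet itself
    // when this is the last filter. PrincipalLoggingFilter always takes this path.
    chain.doFilter(request, response);
  }

  @Override
  public void destroy() {
  }
}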
From abe11935aa3abcbae4ab8912e028ece28d8b7293 Mon Sep 17 00:00:00 2001
From: djibodu
Date: Tue, 1 Apr 2025 15:08:06 -0400
Subject: [PATCH 4/6] log schema hashes - need to combine them into tuples

---
 .../schemaregistry/storage/KafkaSchemaRegistry.java | 11 +++++++++++
 .../confluent/kafka/schemaregistry/storage/MD5.java | 12 ++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
index d049034ba1a..bc3510e15da 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -658,6 +658,9 @@ public Schema register(String subject, RegisterSchemaRequest request, boolean no
         schema = new Schema(subject, version, schema.getId(), newSchema);
       }
 
+      String schemaHash = MD5.ofSchema(schema).toHexString();
+      log.info("Attempting to register schema with hash: {}", schemaHash);
+
       return register(subject, schema, normalize, request.doPropagateSchemaTags());
     } catch (IllegalArgumentException e) {
       throw new InvalidSchemaException(e);
@@ -1145,6 +1148,8 @@ public void deleteSchemaVersion(String subject,
       } else {
         kafkaStore.put(key, null);
       }
+      String schemaHash = MD5.ofSchema(schema).toHexString();
+      log.info("Deleting schema version of schema with hash: {}", schemaHash);
     } catch (StoreTimeoutException te) {
       throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
     } catch (StoreException e) {
@@ -1715,6 +1720,10 @@ public Schema get(String subject, int version, boolean returnDeletedSchema)
       if (schemaValue != null && (!schemaValue.isDeleted() || returnDeletedSchema)) {
         schema = toSchemaEntity(schemaValue);
       }
+      if (schema != null) {
+        String schemaHash = MD5.ofSchema(schema).toHexString();
+        log.info("Reading schema with hash: {}", schemaHash);
+      }
       return schema;
     }
   }
@@ -1887,6 +1896,8 @@ public List listVersionsForId(int id, String subject, boolean lo
     if (schema == null) {
       return null;
     }
+    String schemaHash = MD5.ofSchema(schema).toHexString();
+    log.info("Schema hash for schema being read: {}", schemaHash);
 
     SchemaIdAndSubjects schemaIdAndSubjects =
         this.lookupCache.schemaIdAndSubjects(toSchemaEntity(schema));
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java
index 1692ccf1fb0..b889c52a99a 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java
@@ -81,4 +81,16 @@ public boolean equals(Object o) {
     MD5 otherMd5 = (MD5) o;
     return Arrays.equals(this.md5, otherMd5.md5);
   }
+
+  public String toHexString() {
+    StringBuilder hexString = new StringBuilder();
+    for (byte b : md5) {
+      String hex = Integer.toHexString(0xFF & b);
+      if (hex.length() == 1) {
+        hexString.append('0');
+      }
+      hexString.append(hex);
+    }
+    return hexString.toString();
+  }
 }
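Note: toHexString() is a hand-rolled hex encoder. A self-contained way to convince yourself the loop is correct is to run the same logic against a digest with a well-known value -- the MD5 of empty input is d41d8cd98f00b204e9800998ecf8427e. The check below is a standalone sketch, not code from this PR.

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HexCheck {

  // Same encoding logic as MD5.toHexString(), applied to an arbitrary byte array.
  static String toHex(byte[] bytes) {
    StringBuilder hexString = new StringBuilder();
    for (byte b : bytes) {
      String hex = Integer.toHexString(0xFF & b);
      if (hex.length() == 1) {
        hexString.append('0');
      }
      hexString.append(hex);
    }
    return hexString.toString();
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    byte[] digest = MessageDigest.getInstance("MD5").digest(new byte[0]);
    // Prints d41d8cd98f00b204e9800998ecf8427e, the well-known MD5 of empty input.
    System.out.println(toHex(digest));
  }
}

String.format("%02x", b) per byte would read shorter, but the manual loop avoids creating a Formatter for every byte, which matters on paths hit for every register and lookup call.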
From 46d938d475e01130cde80e3888f7d9f7de07664c Mon Sep 17 00:00:00 2001
From: djibodu
Date: Tue, 1 Apr 2025 15:39:37 -0400
Subject: [PATCH 5/6] log associations as a tuple

---
 .../rest/PrincipalLoggingFilter.java          |  9 +++++-
 .../storage/KafkaSchemaRegistry.java          | 17 +++++++---
 .../utils/PrincipalContext.java               | 32 +++++++++++++++++++
 3 files changed, 53 insertions(+), 5 deletions(-)
 create mode 100644 core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java

diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
index f58ceb1595b..1e420713faa 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java
@@ -26,6 +26,7 @@
 import java.security.Principal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import io.confluent.kafka.schemaregistry.utils.PrincipalContext;
 
 /**
 * This class is a servlet filter that logs the user principal for each incoming request to
@@ -47,11 +48,17 @@ public void doFilter(ServletRequest request, ServletResponse servletResponse,
 
     if (principal != null) {
       log.info("User Principal: {}", principal.getName());
+      PrincipalContext.setPrincipal(principal.getName());
     } else {
       log.info("No User Principal found for the request.");
+      PrincipalContext.clear();
     }
 
-    filterChain.doFilter(request, servletResponse);
+    try {
+      filterChain.doFilter(request, servletResponse);
+    } finally {
+      PrincipalContext.clear(); // Clear the principal after the request is processed
+    }
   }
 
   @Override
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
index bc3510e15da..7aaa4c126d6 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -84,6 +84,7 @@
 import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
 import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
 import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
+import io.confluent.kafka.schemaregistry.utils.PrincipalContext;
 import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
 import io.confluent.rest.NamedURI;
 import io.confluent.rest.RestConfig;
@@ -659,7 +660,8 @@ public Schema register(String subject, RegisterSchemaRequest request, boolean no
       }
 
       String schemaHash = MD5.ofSchema(schema).toHexString();
-      log.info("Attempting to register schema with hash: {}", schemaHash);
+      String principal = PrincipalContext.getPrincipal();
+      logResourceAssociation(schemaHash, principal);
 
       return register(subject, schema, normalize, request.doPropagateSchemaTags());
     } catch (IllegalArgumentException e) {
@@ -1149,7 +1151,8 @@ public void deleteSchemaVersion(String subject,
         kafkaStore.put(key, null);
       }
       String schemaHash = MD5.ofSchema(schema).toHexString();
-      log.info("Deleting schema version of schema with hash: {}", schemaHash);
+      String principal = PrincipalContext.getPrincipal();
+      logResourceAssociation(schemaHash, principal);
     } catch (StoreTimeoutException te) {
       throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
     } catch (StoreException e) {
@@ -1722,7 +1725,8 @@ public Schema get(String subject, int version, boolean returnDeletedSchema)
       }
       if (schema != null) {
         String schemaHash = MD5.ofSchema(schema).toHexString();
-        log.info("Reading schema with hash: {}", schemaHash);
+        String principal = PrincipalContext.getPrincipal();
+        logResourceAssociation(schemaHash, principal);
       }
       return schema;
     }
@@ -1897,7 +1901,8 @@ public List listVersionsForId(int id, String subject, boolean lo
       return null;
     }
     String schemaHash = MD5.ofSchema(schema).toHexString();
-    log.info("Schema hash for schema being read: {}", schemaHash);
+    String principal = PrincipalContext.getPrincipal();
+    logResourceAssociation(schemaHash, principal);
 
     SchemaIdAndSubjects schemaIdAndSubjects =
         this.lookupCache.schemaIdAndSubjects(toSchemaEntity(schema));
@@ -2637,6 +2642,10 @@ private static boolean shouldInclude(boolean isDeleted, LookupFilter filter) {
     }
   }
 
+  private void logResourceAssociation(String schemaHash, String principal) {
+    log.info("Resource association log - (Principal, schemaHash): ({}, {})", principal, schemaHash);
+  }
+
   @Override
   public SchemaRegistryConfig config() {
     return config;
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java b/core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java
new file mode 100644
index 00000000000..458b21549f1
--- /dev/null
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2025 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.kafka.schemaregistry.utils;
+
+public class PrincipalContext {
+  private static final ThreadLocal<String> principalHolder = new ThreadLocal<>();
+
+  public static void setPrincipal(String principal) {
+    principalHolder.set(principal);
+  }
+
+  public static String getPrincipal() {
+    return principalHolder.get();
+  }
+
+  public static void clear() {
+    principalHolder.remove();
+  }
+}
\ No newline at end of file
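Note: PrincipalContext assumes each request is handled start to finish on a single thread, and it relies on the filter's finally block to reset the slot; Jetty pools and reuses threads, so a value that is never cleared would leak into the next request served by that thread. Requests completed asynchronously may also run on a different thread, where this ThreadLocal would be empty. The standalone sketch below demonstrates the leak and the fix; it is illustrative only, not part of the PR, and assumes PrincipalContext from this patch is on the classpath.

import io.confluent.kafka.schemaregistry.utils.PrincipalContext;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLeakDemo {

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Callable<String> readPrincipal = PrincipalContext::getPrincipal;

    // "Request" 1: sets the principal but forgets to clear it.
    pool.submit(() -> PrincipalContext.setPrincipal("alice")).get();

    // "Request" 2 runs on the same pooled thread and sees the stale value: prints alice.
    System.out.println("without clear(): " + pool.submit(readPrincipal).get());

    // With the try/finally pattern used by PrincipalLoggingFilter the slot is always reset.
    pool.submit(() -> {
      try {
        PrincipalContext.setPrincipal("bob");
      } finally {
        PrincipalContext.clear();
      }
    }).get();

    // Prints null: nothing leaks into the next task on this thread.
    System.out.println("with clear(): " + pool.submit(readPrincipal).get());

    pool.shutdown();
  }
}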
From 7fa76480b010d8061f66a19261f989693fd4c545 Mon Sep 17 00:00:00 2001
From: djibodu
Date: Wed, 2 Apr 2025 16:38:36 -0400
Subject: [PATCH 6/6] minor refactoring

---
 .../storage/KafkaSchemaRegistry.java          | 21 ++++++++-------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
index 7aaa4c126d6..89a12bf4dcf 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -659,9 +659,7 @@ public Schema register(String subject, RegisterSchemaRequest request, boolean no
         schema = new Schema(subject, version, schema.getId(), newSchema);
       }
 
-      String schemaHash = MD5.ofSchema(schema).toHexString();
-      String principal = PrincipalContext.getPrincipal();
-      logResourceAssociation(schemaHash, principal);
+      logResourceAssociation(schema);
 
       return register(subject, schema, normalize, request.doPropagateSchemaTags());
     } catch (IllegalArgumentException e) {
@@ -1150,9 +1148,7 @@ public void deleteSchemaVersion(String subject,
       } else {
         kafkaStore.put(key, null);
       }
-      String schemaHash = MD5.ofSchema(schema).toHexString();
-      String principal = PrincipalContext.getPrincipal();
-      logResourceAssociation(schemaHash, principal);
+      logResourceAssociation(schema);
     } catch (StoreTimeoutException te) {
       throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
     } catch (StoreException e) {
@@ -1306,6 +1302,7 @@ public Schema lookUpSchemaUnderSubject(
     if (schema == null) {
       return null;
     }
+    logResourceAssociation(schema);
     Config config = getConfigInScope(subject);
     Schema existingSchema = lookUpSchemaUnderSubject(
         config, subject, schema, normalize, lookupDeletedSchema, false);
@@ -1724,9 +1721,7 @@ public Schema get(String subject, int version, boolean returnDeletedSchema)
         schema = toSchemaEntity(schemaValue);
       }
       if (schema != null) {
-        String schemaHash = MD5.ofSchema(schema).toHexString();
-        String principal = PrincipalContext.getPrincipal();
-        logResourceAssociation(schemaHash, principal);
+        logResourceAssociation(schema);
       }
       return schema;
     }
@@ -1900,9 +1895,7 @@ public List listVersionsForId(int id, String subject, boolean lo
     if (schema == null) {
       return null;
     }
-    String schemaHash = MD5.ofSchema(schema).toHexString();
-    String principal = PrincipalContext.getPrincipal();
-    logResourceAssociation(schemaHash, principal);
+    logResourceAssociation(toSchemaEntity(schema));
 
     SchemaIdAndSubjects schemaIdAndSubjects =
         this.lookupCache.schemaIdAndSubjects(toSchemaEntity(schema));
@@ -2642,7 +2635,9 @@ private static boolean shouldInclude(boolean isDeleted, LookupFilter filter) {
     }
   }
 
-  private void logResourceAssociation(String schemaHash, String principal) {
+  private void logResourceAssociation(Schema schema) {
+    String schemaHash = MD5.ofSchema(schema).toHexString();
+    String principal = PrincipalContext.getPrincipal();
     log.info("Resource association log - (Principal, schemaHash): ({}, {})", principal, schemaHash);
   }
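Note: after this refactor every read and write path funnels through a single helper, so the association line has one greppable shape, for example "Resource association log - (Principal, schemaHash): (alice, d41d8cd98f00b204e9800998ecf8427e)" (values illustrative). When no authenticated principal reached the filter, or the call did not originate on a request thread, PrincipalContext.getPrincipal() returns null and the tuple is logged as (null, <hash>). A null-tolerant variant along the lines of the hypothetical fragment below -- not part of this PR -- would make that case explicit:

  private void logResourceAssociation(Schema schema) {
    String schemaHash = MD5.ofSchema(schema).toHexString();
    String principal = PrincipalContext.getPrincipal();
    // Hypothetical fallback: label unauthenticated or non-request callers explicitly.
    log.info("Resource association log - (Principal, schemaHash): ({}, {})",
        principal != null ? principal : "ANONYMOUS", schemaHash);
  }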