Skip to content

365.add support of custom codecs in topics #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: release_v2.4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions topic/src/main/java/tech/ydb/topic/TopicClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.ConsumerDescription;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.impl.GrpcTopicRpc;
Expand Down Expand Up @@ -164,6 +165,14 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
@Override
void close();

/**
* Register custom codec implementation to TopicClient *
*
* @param codec - custom implementation
*/
void registerCodec(Codec codec);


/**
* BUILDER
*/
Expand Down
72 changes: 66 additions & 6 deletions topic/src/main/java/tech/ydb/topic/description/Codec.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,72 @@
package tech.ydb.topic.description;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**

*
* Interface for custom codec implementation.
* <p>
*
* You can use custom codec as below
* 1. Implement interface methods
* Specify getId which return value more than 10000. This value identify codec across others
* 2. Use code below to write data
* Codec codecImpl = ....
* Topic client = TopicClient.newClient(ydbTransport).build();
* <p>
* client.registerCodec(codecImpl);
* WriterSettings settings = WriterSettings.newBuilder()
* .setTopicPath(topicName)
* .setCodec(codecId)
* .build();
* <p>
* SyncWriter writer = client.createSyncWriter(settings);
* <p>
* 3. Use to read data. Codec should be registered in {@link CodecRegistry}
* Codec codecImpl = ....
* Topic client = TopicClient.newClient(ydbTransport).build();
* <p>
* ReaderSettings readerSettings = ReaderSettings.newBuilder()
* .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build())
* .setConsumerName(TEST_CONSUMER1)
* .build();
* <p>
* SyncReader reader = client.createSyncReader(readerSettings);
*
* @author Nikolay Perfilov
*/
public enum Codec {
RAW,
GZIP,
LZOP,
ZSTD,
CUSTOM;
public interface Codec {
int RAW = 1;
int GZIP = 2;
int LZOP = 3;
int ZSTD = 4;

/**
* Get codec identifier
* @return codec identifier
*/
int getId();

/**
* Decode data
*
* @param byteArrayInputStream input stream
* @return output stream
* @throws IOException throws when error occurs
*/

InputStream decode(InputStream byteArrayInputStream) throws IOException;

/**
* Encode data
*
* @param byteArrayOutputStream output stream
* @return output stream
* @throws IOException throws when error occurs
*/
OutputStream encode(OutputStream byteArrayOutputStream) throws IOException;

}
63 changes: 63 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package tech.ydb.topic.description;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.impl.GzipCodec;
import tech.ydb.topic.impl.LzopCodec;
import tech.ydb.topic.impl.RawCodec;
import tech.ydb.topic.impl.ZstdCodec;

/**
* Register for custom topic codec. Local to TopicClient
*
* @author Evgeny Kuvardin
**/
public class CodecRegistry {

private static final Logger logger = LoggerFactory.getLogger(CodecRegistry.class);

final Map<Integer, Codec> customCodecMap;

public CodecRegistry() {
customCodecMap = new HashMap<>();
customCodecMap.put(Codec.RAW, RawCodec.getInstance());
customCodecMap.put(Codec.GZIP, GzipCodec.getInstance());
customCodecMap.put(Codec.LZOP, LzopCodec.getInstance());
customCodecMap.put(Codec.ZSTD, ZstdCodec.getInstance());
}

/**
* Register codec implementation
* @param codec codec implementation
* @return previous implementation with associated codec
*/
public Codec registerCodec(Codec codec) {
assert codec != null;
int codecId = codec.getId();

Codec result = customCodecMap.put(codecId, codec);

if (result != null) {
logger.info(
"Replace codec which have already associated with this id. CodecId: {} Codec: {}",
codecId,
result);
}

return result;
}

/**
* Get codec implementation by associated id
* @param codecId codec identifier
* @return codec implementation
*/
public Codec getCodec(int codecId) {
return customCodecMap.get(codecId);
}

}
13 changes: 5 additions & 8 deletions topic/src/main/java/tech/ydb/topic/description/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -14,7 +13,6 @@

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.utils.ProtoUtils;

/**
* @author Nikolay Perfilov
Expand All @@ -23,7 +21,7 @@ public class Consumer {
private final String name;
private final boolean important;
private final Instant readFrom;
private final List<Codec> supportedCodecs;
private final List<Integer> supportedCodecs;
private final Map<String, String> attributes;
private final ConsumerStats stats;

Expand All @@ -40,8 +38,7 @@ public Consumer(YdbTopic.Consumer consumer) {
this.name = consumer.getName();
this.important = consumer.getImportant();
this.readFrom = ProtobufUtils.protoToInstant(consumer.getReadFrom());
this.supportedCodecs = consumer.getSupportedCodecs().getCodecsList()
.stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList());
this.supportedCodecs = new ArrayList<>(consumer.getSupportedCodecs().getCodecsList());
this.attributes = consumer.getAttributesMap();
this.stats = new ConsumerStats(consumer.getConsumerStats());
}
Expand All @@ -68,7 +65,7 @@ public SupportedCodecs getSupportedCodecs() {
return new SupportedCodecs(supportedCodecs);
}

public List<Codec> getSupportedCodecsList() {
public List<Integer> getSupportedCodecsList() {
return supportedCodecs;
}

Expand All @@ -88,7 +85,7 @@ public static class Builder {
private String name;
private boolean important = false;
private Instant readFrom = null;
private List<Codec> supportedCodecs = new ArrayList<>();
private List<Integer> supportedCodecs = new ArrayList<>();
private Map<String, String> attributes = new HashMap<>();
private ConsumerStats stats = null;

Expand All @@ -107,7 +104,7 @@ public Builder setReadFrom(Instant readFrom) {
return this;
}

public Builder addSupportedCodec(Codec codec) {
public Builder addSupportedCodec(int codec) {
this.supportedCodecs.add(codec);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
* @author Nikolay Perfilov
*/
public class SupportedCodecs {
private final List<Codec> codecs;
private final List<Integer> codecs;

public SupportedCodecs(Builder builder) {
this.codecs = ImmutableList.copyOf(builder.codecs);
}

public SupportedCodecs(List<Codec> codecs) {
public SupportedCodecs(List<Integer> codecs) {
this.codecs = codecs;
}

public List<Codec> getCodecs() {
public List<Integer> getCodecs() {
return codecs;
}

Expand All @@ -31,14 +31,14 @@ public static Builder newBuilder() {
* BUILDER
*/
public static class Builder {
private List<Codec> codecs = new ArrayList<>();
private List<Integer> codecs = new ArrayList<>();

public Builder addCodec(Codec codec) {
public Builder addCodec(int codec) {
codecs.add(codec);
return this;
}

public Builder setCodecs(List<Codec> supportedCodecs) {
public Builder setCodecs(List<Integer> supportedCodecs) {
this.codecs = supportedCodecs;
return this;
}
Expand Down
43 changes: 43 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/GzipCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tech.ydb.topic.impl;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import tech.ydb.topic.description.Codec;

/**
* Compression codec which implements the GZIP algorithm
*/
public class GzipCodec implements Codec {

private static final GzipCodec INSTANCE = new GzipCodec();

private GzipCodec() {
}

/**
* Get single instance
* @return single instance of RawCodec
*/
public static GzipCodec getInstance() {
return INSTANCE;
}

@Override
public int getId() {
return Codec.GZIP;
}

@Override
public InputStream decode(InputStream byteArrayInputStream) throws IOException {
return new GZIPInputStream(byteArrayInputStream);
}

@Override
public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException {
return new GZIPOutputStream(byteArrayOutputStream);
}
}
48 changes: 48 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/LzopCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package tech.ydb.topic.impl;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.anarres.lzo.LzoAlgorithm;
import org.anarres.lzo.LzoCompressor;
import org.anarres.lzo.LzoLibrary;
import org.anarres.lzo.LzopInputStream;
import org.anarres.lzo.LzopOutputStream;

import tech.ydb.topic.description.Codec;

/**
* Compression codec which implements the LZO algorithm
*/
public class LzopCodec implements Codec {

private static final LzopCodec INSTANCE = new LzopCodec();

private LzopCodec() {
}

/**
* Get single instance
* @return single instance of RawCodec
*/
public static LzopCodec getInstance() {
return INSTANCE;
}

@Override
public int getId() {
return Codec.LZOP;
}

@Override
public InputStream decode(InputStream byteArrayInputStream) throws IOException {
return new LzopInputStream(byteArrayInputStream);
}

@Override
public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException {
LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null);
return new LzopOutputStream(byteArrayOutputStream, lzoCompressor);
}
}
Loading