Skip to content

Commit a8f71db

Browse files
committed
Implement ByteArrayInputStreamCodec
1 parent bae7e03 commit a8f71db

File tree

2 files changed

+155
-1
lines changed

2 files changed

+155
-1
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2023 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql.codec;
18+
19+
import io.asyncer.r2dbc.mysql.MySqlParameter;
20+
import io.asyncer.r2dbc.mysql.ParameterWriter;
21+
import io.asyncer.r2dbc.mysql.api.MySqlReadableMetadata;
22+
import io.asyncer.r2dbc.mysql.constant.MySqlType;
23+
import io.asyncer.r2dbc.mysql.internal.util.VarIntUtils;
24+
import io.netty.buffer.ByteBuf;
25+
import io.netty.buffer.ByteBufAllocator;
26+
import reactor.core.publisher.Mono;
27+
28+
import java.io.ByteArrayInputStream;
29+
import java.io.InputStream;
30+
31+
import static io.asyncer.r2dbc.mysql.internal.util.InternalArrays.EMPTY_BYTES;
32+
33+
/**
34+
* Codec for {@link InputStream}.
35+
*/
36+
final class ByteArrayInputStreamCodec extends AbstractClassedCodec<ByteArrayInputStream> {
37+
38+
static final ByteArrayInputStreamCodec INSTANCE = new ByteArrayInputStreamCodec();
39+
40+
private ByteArrayInputStreamCodec() {
41+
super(ByteArrayInputStream.class);
42+
}
43+
44+
@Override
45+
public ByteArrayInputStream decode(ByteBuf value, MySqlReadableMetadata metadata, Class<?> target, boolean binary,
46+
CodecContext context) {
47+
if (!value.isReadable()) {
48+
return new ByteArrayInputStream(EMPTY_BYTES);
49+
}
50+
return new ByteArrayInputStream(value.array());
51+
}
52+
53+
@Override
54+
protected boolean doCanDecode(MySqlReadableMetadata metadata) {
55+
return metadata.getJavaType() == InputStream.class;
56+
}
57+
58+
@Override
59+
public boolean canEncode(Object value) {
60+
return value instanceof InputStream;
61+
}
62+
63+
@Override
64+
public MySqlParameter encode(Object value, CodecContext context) {
65+
return new ByteArrayInputStreamMysqlParameter((ByteArrayInputStream) value);
66+
}
67+
68+
private static final class ByteArrayInputStreamMysqlParameter extends AbstractMySqlParameter {
69+
70+
private final ByteArrayInputStream value;
71+
72+
private ByteArrayInputStreamMysqlParameter(ByteArrayInputStream value) {
73+
this.value = value;
74+
}
75+
76+
@Override
77+
public Mono<ByteBuf> publishBinary(ByteBufAllocator allocator) {
78+
return Mono.fromSupplier(() -> {
79+
int size = value.available();
80+
if (size == 0) {
81+
return allocator.buffer(Byte.BYTES).writeByte(0);
82+
}
83+
84+
int addedSize = VarIntUtils.varIntBytes(size);
85+
ByteBuf buf = allocator.buffer(addedSize + size);
86+
87+
try {
88+
VarIntUtils.writeVarInt(buf, size);
89+
90+
byte[] byteArray = new byte[size];
91+
int readBytes = value.read(byteArray);
92+
93+
if (readBytes != size) {
94+
buf.release();
95+
throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
96+
}
97+
98+
return buf.writeBytes(byteArray);
99+
} catch (Exception e) {
100+
buf.release();
101+
throw new RuntimeException(e);
102+
}
103+
});
104+
}
105+
106+
@Override
107+
public Mono<Void> publishText(ParameterWriter writer) {
108+
return Mono.fromRunnable(() -> {
109+
try {
110+
int size = value.available();
111+
byte[] byteArray = new byte[size];
112+
int readBytes = value.read(byteArray);
113+
114+
if (size != 0 && readBytes != size) {
115+
throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
116+
}
117+
118+
writer.writeHex(byteArray);
119+
} catch (Exception e) {
120+
throw new RuntimeException(e);
121+
}
122+
});
123+
}
124+
125+
@Override
126+
public String toString() {
127+
return value.toString();
128+
}
129+
130+
@Override
131+
public boolean equals(Object o) {
132+
if (this == o) {
133+
return true;
134+
}
135+
if (!(o instanceof ByteArrayInputStreamMysqlParameter)) {
136+
return false;
137+
}
138+
139+
ByteArrayInputStreamMysqlParameter that = (ByteArrayInputStreamMysqlParameter) o;
140+
return value.equals(that.value);
141+
}
142+
143+
@Override
144+
public int hashCode() {
145+
return value.hashCode();
146+
}
147+
148+
@Override
149+
public MySqlType getType() {
150+
return MySqlType.VARBINARY;
151+
}
152+
}
153+
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ final class DefaultCodecs implements Codecs {
8181
BlobCodec.INSTANCE,
8282

8383
ByteBufferCodec.INSTANCE,
84-
ByteArrayCodec.INSTANCE
84+
ByteArrayCodec.INSTANCE,
85+
ByteArrayInputStreamCodec.INSTANCE
8586
);
8687

8788
private final List<Codec<?>> codecs;

0 commit comments

Comments
 (0)