需求描述
解决方案
Optional 支持
由于 Optional 并不支持序列化,因此无法直接将其作为方法返回值、托管给 JetCache。必须对其进行拓展,添加自定义的编码器以及解码器。
环境
依赖于 com.alicp.jetcache:jetcache-core:2.5.16。
配置
只要将自定义的编码器注册为 bean 即可。
```yaml
jetcache:
  remote:
    default:
      valueEncoder: bean:jetValueEncoder
```
编码器实现
- 在序列化之前,将 Optional 中的值取出,并打上自定义的标记(IDENTITY_NUMBER 的值);对于非 Optional 类型的数据,则沿用默认的编码器(JavaValueEncoder);
- 编码器需要注册成 spring bean,启用自定义标记之后,解码器则通过读取自定义标记进行匹配;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| @Component public class JetValueEncoder extends JavaValueEncoder {
private static final int INIT_BUF_SIZE = 256; private static final ThreadLocal<WeakReference<ByteArrayOutputStream>> THREAD_LOCAL = ThreadLocal.withInitial(() -> new WeakReference<>(new ByteArrayOutputStream(INIT_BUF_SIZE))); protected static int IDENTITY_NUMBER = 0x4A953A84;
public JetValueEncoder() { this(true); }
public JetValueEncoder(boolean useIdentityNumber) { super(useIdentityNumber); registerDecoder(); }
protected void registerDecoder() { SpringJavaValueDecoder decoder = new SpringJavaValueDecoder(true) { @Override @SuppressWarnings("unchecked") public Object doApply(byte[] buffer) throws Exception { Object obj = super.doApply(buffer); if (obj instanceof CacheValueHolder) { CacheValueHolder<Object> valueHolder = (CacheValueHolder<Object>) obj; valueHolder.setValue(Optional.ofNullable(valueHolder.getValue())); } return obj; } }; DecoderMap.register(JetValueEncoder.IDENTITY_NUMBER, decoder);
}
@Override @SuppressWarnings("unchecked") public byte[] apply(Object value) { if (value instanceof CacheValueHolder) { CacheValueHolder<Object> valueHolder = (CacheValueHolder<Object>) value; Object holderValue = ((CacheValueHolder<?>) value).getValue(); if (holderValue instanceof Optional) { Optional<?> valeOpt = (Optional<?>) holderValue; valeOpt.ifPresent(valueHolder::setValue); return doApply(value); } } return super.apply(value); }
private byte[] doApply(Object value) { try { WeakReference<ByteArrayOutputStream> ref = THREAD_LOCAL.get(); ByteArrayOutputStream bos = ref.get(); if (bos == null) { bos = new ByteArrayOutputStream(INIT_BUF_SIZE); THREAD_LOCAL.set(new WeakReference<>(bos)); }
try { if (useIdentityNumber) { bos.write((IDENTITY_NUMBER >> 24) & 0xFF); bos.write((IDENTITY_NUMBER >> 16) & 0xFF); bos.write((IDENTITY_NUMBER >> 8) & 0xFF); bos.write(IDENTITY_NUMBER & 0xFF); } ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(value); oos.flush(); return bos.toByteArray(); } finally { bos.reset(); } } catch (IOException e) { throw new CacheEncodeException("Java Encode error. " + "msg=" + e.getMessage(), e); } } }
|
分析
从源码 SpringConfigProvider#parseValueEncoder 中可以找到编码器的加载入口:当 valueEncoder 属性值以 “bean:” 为前缀时,则按前缀之后的名称从 Spring 容器中取出对应的 bean 作为编码器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) { super.parseGeneralConfig(builder, ct); ExternalCacheBuilder ecb = (ExternalCacheBuilder) builder; ecb.setKeyPrefix(ct.getProperty("keyPrefix")); ecb.setValueEncoder(configProvider.parseValueEncoder(ct.getProperty("valueEncoder", CacheConsts.DEFAULT_SERIAL_POLICY))); ecb.setValueDecoder(configProvider.parseValueDecoder(ct.getProperty("valueDecoder", CacheConsts.DEFAULT_SERIAL_POLICY))); }
/**
 * Resolves the value encoder named by a configuration string.
 *
 * <p>When the string does not designate a bean, resolution is left to the parent
 * implementation. Otherwise the bean is fetched from the application context: a bean that is
 * itself a {@link Function} is used directly, any other bean is cast to {@code SerialPolicy}
 * and its {@code encoder()} is returned.
 *
 * @param valueEncoder the configured encoder specification
 * @return the encoder function
 */
public Function<Object, byte[]> parseValueEncoder(String valueEncoder) {
    final String name = parseBeanName(valueEncoder);
    if (name == null) {
        return super.parseValueEncoder(valueEncoder);
    }

    final Object candidate = applicationContext.getBean(name);
    return candidate instanceof Function
            ? (Function<Object, byte[]>) candidate
            : ((SerialPolicy) candidate).encoder();
}
/**
 * Extracts the bean name from a specification of the form {@code bean:<name>}.
 *
 * @param str the specification, may be {@code null}
 * @return the text after the {@code bean:} prefix, or {@code null} when {@code str} is
 *         {@code null}, lacks the prefix, or has nothing after it
 */
private String parseBeanName(String str) {
    final String beanPrefix = "bean:";
    if (str == null || !str.startsWith(beanPrefix)) {
        return null;
    }
    final String name = str.substring(beanPrefix.length());
    return name.isEmpty() ? null : name;
}
/**
 * Decodes a cached byte array.
 *
 * <p>Without identity numbers the buffer goes straight to this decoder's {@code doApply}.
 * With identity numbers, the built-in decoders are registered, the header parsed from the
 * buffer selects a decoder from {@code DecoderMap}, and that decoder's {@code doApply}
 * produces the result.
 *
 * @param buffer the encoded bytes
 * @return the decoded object
 * @throws CacheEncodeException wrapping any exception raised while decoding, including the
 *         case where no decoder is registered for the parsed identity number
 */
public Object apply(byte[] buffer) {
    try {
        if (!useIdentityNumber) {
            return doApply(buffer);
        }

        DecoderMap.registerBuildInDecoder();
        final int header = parseHeader(buffer);
        final AbstractValueDecoder matched = DecoderMap.getDecoder(header);
        Objects.requireNonNull(matched, "no decoder for identity number:" + header);
        return matched.doApply(buffer);
    } catch (Exception e) {
        throw new CacheEncodeException("decode error", e);
    }
}
|