带有 Kyro 的 Mule 3.7 提供 77% 的性能提升

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

目前,mule 依靠普通的旧 java 序列化来将对象存储到文件或通过 mule 集群复制它们。这对以下方面施加了限制:

  • 我们可以存储的对象类型:它们需要实现可序列化接口并提供适当的构造函数。此外,类是可序列化的还不够,它的所有组合对象也必须是可序列化的。
  • 的序列化程序远不是城里最快的。

如果你经常使用 batch 模块 ,那么你已经知道在 batch 中 我们已经 使用 kryo 框架 解决了这个问题,它允许以比 java 默认更快的速度序列化更广泛的对象。然而,在 mule 3.7.0 发布之前,kryo 功能是批处理模块所独有的。如果你想在持久对象存储上使用它怎么办?那么分布式虚拟机队列呢?甚至更多:为什么我不能选择使用我选择的任何框架进行序列化?我们在 mule 3.7.0 中通过以下方式解决了这个问题:

  • 创建一个序列化 api ,以允许任何 java 开发人员贡献和使用他自己的序列化程序。
  • 我们创建了一个基于 kryo 的序列化 api 实现,它随 ee 分发一起提供并且可以轻松配置。

序列化接口

我们从头开始解决这个问题,首先创建一个序列化 api,它将 mule 及其扩展与要使用的实际序列化机制分离,还允许用户配置他们想要使用的机制,甚至提供自己的机制。

mule 每次都依赖于序列化:

  • 您从持久对象库中读/写。
  • 您从持久性 vm 或 jms 队列读取/写入。
  • 对象通过 mule 集群分布。
  • 您从文件中读取/写入对象。

更改用于此类任务的序列化机制可以大大提高功能和性能。

serialization api主要定义了以下接口:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }

本合同的主要概念是:

  • 它与 byte[] 以及流进行序列化。
  • 它是线程安全的。
  • 序列化时,通过传递输出流来支持流式传输。
  • 反序列化时,通过允许输入流作为输入源来支持流式传输。
  • 反序列化时,您可以指定要使用的类加载器。默认情况下,使用当前执行的那个。
  • 反序列化时,如果获取的对象实现了 deserializationpostinitialisable 接口,则序列化器将负责在返回对象之前正确初始化该对象。

配置

默认情况下,mule 将像往常一样继续使用 java 序列化,没有开箱即用的变化。然而,每个应用程序都可以使用 mule 的 <configuration> 标签来配置它想要使用的对象序列化器:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }


注意: 唯一一个行为不会受此组件影响的组件是批处理模块,出于其自身的功能原因,无论如何都需要使用 kryo 来工作。

获取配置的对象序列化器

有很多方法可以获得对象序列化器。推荐的方法是通过依赖注入。下面显示了如何获取已配置为默认的 objectserializer:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }


相反,如果你想要一个特定的命名序列化器(无论它是否是默认的)你也可以这样做:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }


最后,你总是可以从 mulecontext 中提取它,尽管依赖注入是首选:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }


kryo 序列化器

对于 ee 用户,我们还将提供依赖于 kryo 框架的 objectserializer 的第二个实现。使用 kryo 提供:

  • 更好的性能。 kryo 比 java 序列化快得多
  • 支持更广泛的 java 类型。 kryo 不受 java 序列化强加的大多数限制的限制,例如要求实现可序列化接口、具有默认构造函数等(这 并不 意味着它可以序列化任何东西)
  • 支持压缩:您可以使用 deflate 或 gzip 压缩算法

添加了一个新的 kryo 命名空间来配置此序列化程序:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }


上面的配置将默认序列化器设置为基于 kryo 的序列化器。此外,您还可以像这样配置压缩:


 /*
 * copyright (c) mulesoft, inc.  all rights reserved.  http://www.mulesoft.com
 * the software in this package is published under the terms of the cpal v1.0
 * license, a copy of which has been included with this distribution in the
 * license.txt file.
 */
package org.mule.api.serialization;

import org.mule.util.store.deserializationpostinitialisable;

import java.io.inputstream; import java.io.outputstream; import java.io.serializable;

/**

  • defines a component capable to serialize/deserialize objects into/from an array of

  • {@link byte}s. unlike usual serializing components, this one doesn't enforce the

  • serialized object to implement {@link serializable}. however, some implementations

  • might require that condition and throw {@link illegalargumentexception} if not

  • met.

  • <p/>

  • implementations are also responsible for the correct initialization of classes

  • implementing the {@link deserializationpostinitialisable}

  • interface.

  • <p/>

  • <p/>

  • unexpected behavior can result of deserializing objects that were generated with

  • a different implementation of {@link objectserializer}.

  • <p/>

  • implementations are required to be thread-safe

  • @since 3.7.0 */ public interface objectserializer {

    /**

    • serializes the given object into a an array of {@link byte}s
    • @param object the object to be serialized. might be <code>null</code>
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ byte[] serialize(object object) throws serializationexception;

    /**

    • serializes the given object and writes the result into {@code out}
    • @param object the object to be serialized. might be <code>null</code>
    • @param out an {@link outputstream} where the result will be written
    • @return an array of {@link byte}
    • @throws serializationexception in case of unexpected exception */ void serialize(object object, outputstream out) throws serializationexception;

    /**

    • deserializes the given bytes. unexpected behavior can result of deserializing
    • a byte[] that was generated with another implementation.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • @param bytes an array of byte that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes) throws serializationexception;

    /**

    • deserializes the given bytes.
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param bytes an array of byte that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code bytes} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(byte[] bytes, classloader classloader) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • implementation will choose the {@link java.lang.classloader}
    • to use for deserialization.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream) throws serializationexception;

    /**

    • deserializes the given stream of bytes.
    • <p/>
    • even if deserialization fails, this method will close the
    • {@code inputstream}
    • <p/>
    • if the obtained object implements {@link deserializationpostinitialisable}
    • then this serializer will be responsible for properly initializing
    • the object before returning it.
    • @param inputstream a stream of bytes that an original object was serialized into
    • @param classloader the {@link java.lang.classloader} to deserialize with
    • @return the deserialized object
    • @throws illegalargumentexception if {@code inputstream} is {@code null}
    • @throws serializationexception in case of unexpected exception */ <t> t deserialize(inputstream inputstream, classloader classloader) throws serializationexception; }


使用 kryo 时的性能

使用 kryo 将在使用以下组件时提供性能改进:

  • 持久或集群对象存储
  • 持久或分布式虚拟机队列
  • jms传输

因为在您衡量之前没有任何改进,所以我们请性能团队的强大 luciano gandini 进行一些测试。他运行了一些测试来比较使用标准 java 序列化程序和 kryo 的不同设置时持久/分布式 vm 队列和对象存储的每秒事务数 (tps)。这是一些结果……

持久虚拟机队列

持久对象存储

分布式虚拟机队列 (ha)

分布式对象存储 (ha)

关于压缩

正如您在上面的图表中看到的,在所有情况下,没有压缩的 kryo 都比标准序列化程序快得多。然而,压缩模式只提供了对 ha 情况的实际改进。这是为什么?执行这些测试的环境具有非常好的 io 速度。为了使压缩有价值,cpu 用于压缩-解压缩的时间必须明显低于通过减少有效负载大小节省的 io 时间。因为网络操作比磁盘操作慢(至少在我们的环境中)并且因为 ha 集群需要节点复制(这转化为更多的流量),只有在 ha 情况下压缩才有回报。这是一个普遍的常数吗?绝对不!你可能在磁盘较慢或 io 需求较高的机器上运行 mule,在任何情况下压缩都值得。此外,这些测试是使用 1mb 的有效负载执行的,但数据流越大,压缩就越有价值。

业绩总结

简单地说,这些是性能的结果:

从上表得出的结论是:

  • 使用分布式对象存储时,性能最高可提高 77.13%,使用分布式虚拟机队列时,性能最高可提高 63.77%,使用本地持久性虚拟机队列时,性能最高可提高 64.71%。
  • 尽管本地对象存储没有显示出太大的改进(使用压缩时它们实际上更慢),但没有任何用例在使用 kryo 时没有获得一定程度的增益。

一如既往,请记住,绩效结果始终被视为指导方针,而不是绝对事实。根据您的应用程序、环境、负载大小等,每种情况下的实际输出可能会有所不同。

局限性和注意事项

尽管如此,这仍然不是灵丹妙药。请考虑以下因素:

更改序列化程序需要一个干净的平台

序列化程序不可互操作,也不可互换(至少我们发布 ootb 的序列化程序)。这意味着,如果您决定更改应用程序使用的序列化程序,则需要确保 vm/jms 队列中的所有消息都已被消耗,并且在新序列化程序启动时这些队列为空。这是因为 kryo 序列化程序将无法读取由 Java 编写的数据报,反之亦然。同样的事情也适用于持久对象存储。如果您尝试读取由不同的序列化程序生成的条目,您会发现自己运气不好。

共享虚拟机连接器中的序列化

mule esb 的 3.5.0 版引入了域的概念,作为应用程序之间共享资源的一种方式。例如,您可以在域上定义一个虚拟机 连接器 ,以允许通过虚拟机消息队列进行应用程序间通信。但是,序列化程序只能在应用程序级别进行配置,不能在域中进行配置。那么,如果两个应用程序(a 和 b)通过在两者所属的域上定义的 vm 连接器相互通信,但 a 使用 java 序列化,b 使用 kryo 序列化,会发生什么情况?答案是:它只是有效。每当任一应用程序尝试写入使用共享连接器的端点时,该特定消息将不会使用应用程序的序列化程序进行序列化,而是使用 vm 连接器正在使用的序列化程序。所以这很好,对吧?是的,从即插即用体验的角度来看,这很好。但请注意,您将无法告诉共享虚拟机连接器使用 kryo 并从中获得性能改进。

为什么本地持久对象存储案例几乎没有改进?

与其他情况不同,本地持久对象库没有显示出太大的改进。那是因为对象存储实现的激烈竞争几乎吸收了所有的收益。我们将在未来的版本中单独解决这个问题。

为什么没有图表显示 jms 的改进?

根据 jms api,队列不适用于原始负载对象。相反,您必须提供 javax.jms.message 类的实例。然后代理客户端负责序列化它,而不是骡子。因此,kryo 在这种情况下的影响是最小的。将 kryo 与 jms 一起使用的唯一性能提升是 mule 序列化 mulesession 并将其作为 base64 格式的标头。使用 kryo 序列化 mulesession 可以为您提供高达 10% 的性能速度,但我们不将其视为示例用例,因为序列化的大部分取决于 jms broker 而不是 mule。

有问题的类型

虽然 kryo 能够序列化未实现可序列化接口的对象,但将 kryo 设置为默认序列化器并不意味着 vm 传输、objectserializer 或集群等组件将能够处理未实现此类接口的对象.那是因为即使 kryo 可以处理这些对象,这些组件背后的 java api 仍然期望在它们的方法签名中有可序列化的实例。但是请注意,虽然标准序列化会因实现可序列化接口但包含另一个未实现可序列化接口的对象而失败,但 kryo 很可能(但不保证)成功。一个典型的例子是一个包含 org.apache.xerces.jaxp.datatype.xmlgregoriancalendarimpl 的 pojo,就像 netsuite ms 动态 云连接器 所做的那样

包起来

当我们着手进行这项改进时,我们知道这会产生很大的影响,但老实说,这大大超出了我们的预期。我个人并不期望在最好的情况下有超过 30% 的改进。看到最好的情况有 77% 的峰值改善,最常见的情况有 64% 的改善,这让我很高兴。我希望它也能让你开心。

一如既往,请不要拖延关于如何不断改进这一点的反馈。