JavaDriver JavaDriver
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

YoungAnn

西二旗Java老司机一枚 致力于社会主义添砖Java
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Netty

    • Netty
    • TCP粘包拆包问题的解决之道
    • Netty中有哪些解码器?
    • 为什么Netty要自己写一个ByteBuf
      • 15.2 ByteBuf源码分析
      • 15.3 ByteBuffer相关辅助类
    • ChannelPipeline和ChannelHandler
    • EventLoop和EventLoopGroup
    • Java NIO了解吗?
  • Kafka

  • 中间件
  • Netty
YoungAnn
2022-05-21
目录

为什么Netty要自己写一个ByteBuf

# 15.2 ByteBuf源码分析

继承关系 从内存分配的角度看,byteBuf分为两类:

  • 堆内存缓冲区:优点是内存的分配和回收快。缺点是进行IO读写时需要一次内存复制,用户空间和内核空间的复制。
  • 直接内存缓冲区:优缺点和堆内存缓冲区整好相反。 经验表明ByteBuf的最佳实践是在IO通信线程的读写缓冲区使用DirectByteBuf,后端业务的编解码模块使用HeapByteBuf,这样的组合可以达到性能最优。 从内存回收的角度看、ByteBuf分两类,基于对象池的ByteBuf和普通ByteBuf。使用内存池后的Netty在高并发和高负载环境下内存和GC更加平稳。 15.2.1 AbstractByteBuf源码分析 实现ByteBuf的一些公共属性和功能。 主要成员变量
    static final ResourceLeakDetector<ByteBuf> leakDetector =
            ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);//用于对象是否泄漏,定义为static,意味着所有byteBuf共享

    int readerIndex;//读索引
    int writerIndex;//写索引
    private int markedReaderIndex;//读mark
    private int markedWriterIndex;//写mark
    private int maxCapacity;//最大容量
1
2
3
4
5
6
7
8

Byte数组不在这里,因为AbstractByteBuf无法确定使用直接内存还是堆内存。 readBytes(ByteBuf dst, int dstIndex, int length)

public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
       checkReadableBytes(length);//校验可读性
       getBytes(readerIndex, dst, dstIndex, length);//读取。从readerIndex开始读取length个字节到目标数组中
       readerIndex += length;//移动读指针
       return this;
   }
1
2
3
4
5
6

再看一下checkReadableBytes():

/**
     * Throws an {@link IndexOutOfBoundsException} if the current
     * {@linkplain #readableBytes() readable bytes} of this buffer is less
     * than the specified value.
     */
    protected final void checkReadableBytes(int minimumReadableBytes) {
        if (minimumReadableBytes < 0) {
            throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
        }
        checkReadableBytes0(minimumReadableBytes);
    }
    
private void checkReadableBytes0(int minimumReadableBytes) {
        ensureAccessible();
        if (readerIndex > writerIndex - minimumReadableBytes) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                    readerIndex, minimumReadableBytes, writerIndex, this));
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

writeBytes(byte[] src, int srcIndex, int length)

public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureWritable(length);//可写校验和扩容
        setBytes(writerIndex, src, srcIndex, length);//从writerIndex开始写length长度
        writerIndex += length;//移动写指针
        return this;
    }
1
2
3
4
5
6

ensureWritable(int minWritableBytes)

public ByteBuf ensureWritable(int minWritableBytes) {
        if (minWritableBytes < 0) {
            throw new IllegalArgumentException(String.format(
                    "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
        }
        ensureWritable0(minWritableBytes);
        return this;
    }
    
final void ensureWritable0(int minWritableBytes) {
        ensureAccessible();//检查这个buf是否还有引用(如果已经没有引用那就没必要在写了)
        if (minWritableBytes <= writableBytes()) {//写入的字节小于可写字节,校验通过
            return;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {//写入的字节大于最大可写入字节,抛异常
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

        // Adjust to the new capacity.
        capacity(newCapacity);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

重用缓冲区

public ByteBuf discardReadBytes() {
        ensureAccessible();
        if (readerIndex == 0) {
            return this;
        }

        if (readerIndex != writerIndex) {
            setBytes(0, this, readerIndex, writerIndex - readerIndex);//复制缓冲区
            writerIndex -= readerIndex;//重置写指针
            adjustMarkers(readerIndex);//调整mark指针
            readerIndex = 0;//重置读指针
        } else {
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
        }
        return this;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

15.2.2 AbstractReferenceCountedByteBuf源码分析

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");//通过原子的方式对成员变量进行更新,消除锁

    private volatile int refCnt;//跟踪对象的引用次数,采用CAS对其自增1,默认值为1
}
1
2
3
4
5
6
7

15.2.2 UnPooledHeapByteBuf源码分析 非池化的基于堆内存,频繁的大块内存分配和回收会对性能造成影响,但是相比对外内存的申请和释放,成本还是低一些。 相比HeapByteBuf, UnPooledHeapByteBuf的实现更加加单,也不容易出现内存管理方面的问题,因此在满足性能的条件下,推荐使用UnPooledHeapByteBuf。

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
    byte[] array;//这里直接使用JDK的ByteBuffer也可以,之所以使用Byte数组是因为性能和便捷的位操作
    private ByteBuffer tmpNioBuf;//用于实现将netty的byteBuf转换为JDK的ByteBuffer
 }
1
2
3
4
5

转换JDK Buffer netty基于byte数组实现 jdk的nio buf提供wrap方法,可直接实现 转换 看下转换方法

public ByteBuffer nioBuffer(int index, int length) {
        ensureAccessible();
        return ByteBuffer.wrap(array, index, length).slice();
    }
public ByteBuffer slice() {
        return new HeapByteBuffer(hb,//仍然使用的是原buffer的全局数组,只是改变了position和limit的位置,所以新buf和原buf内容是相互影响的
                                        -1,
                                        0,
                                        this.remaining(),
                                        this.remaining(),
                                        this.position() + offset);
    }
1
2
3
4
5
6
7
8
9
10
11
12

slice方法的作用:copy position到limit之间的内容, 15.2.3 pooledByteBuf内存池原理分析 PoolArena是netty的内存池显现类。 为了集中管理内存,提供内存申请是释放的效率,很多框架会申请一大块内存,提供相应的接口分配和释放内存,这样就不再频繁的使用系统调用来使用内存,可以提高性能。预先申请的那块内存就叫Memory Arena。PoolArena是netty对Memory Arena的实现。 Netty的PoolArena由多个chunk组成,每个chunk由多个Page组成。 PoolArena源码:

abstract class PoolArena<T> implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    enum SizeClass {
        Tiny,
        Small,
        Normal
    }

    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    final int directMemoryCacheAlignment;
    final int directMemoryCacheAlignmentMask;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

PoolChunk的实现 PoolChunk负责多个Page的内存管理,PoolChunk将其负责的多个Page构建成一棵二叉树。 假设一个chunk由16个page组成,则其组织形式: Page大小是4字节,chunk大小是64字节。 每个节点都记录自己在整个Memory Arena中的偏移地址,一旦被分配,则该节点及其子节点在接下来的内存分配过程中会被忽略。 举例来说,我们申请16个字节空间,则第三层的某个节点会被标记为已分配,则再次分配内存的时候会从其他三个节点中分配。 分配内存时对树采用深度优先算法,但是从哪棵子树开始深度遍历是随机的。 PoolSubPage的实现 申请内存小于一个page,则内存分配在page中完成,每个page会被分为大小相等的多个块。 被分的单位块大小等于第一次申请的内存大小,例如一个Page8字节,第一次申请2字节,则该page被切分成4块,每块2字节。而且这个page以后只能用于分配2字节的内存申请,如果再来一个4字节的内存申请,只能在另一个Page中申请。 Page使用标识位来表示内存块是否可用。维护一个long数组,每个位表示一个块的使用情况。 例如page为128字节,第一次申请内存为1字节,则该page被分为128块,则long数组中有2个元素,(每个long64位,两个long可以表示128位)。0、1表示该块是否可用。 15.2.4 PooledDirectByteBuf内存池原理分析 创建字节缓冲区 由于采用内存池实现,所以创建PooledDirectByteBuf对象不能new一个实例,而是从内存池获取。然后设置引用计数器。

static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
final void reuse(int maxCapacity) {
        maxCapacity(maxCapacity);
        setRefCnt(1);
        setIndex0(0, 0);
        discardMarks();
    }
1
2
3
4
5
6
7
8
9
10
11

复制字节缓冲区 会从内存池中获取一个新的buffer而不是new一个。

# 15.3 ByteBuffer相关辅助类

15.3.1 ByteBufHolder 对消息体进行包装和抽象,不同的子类有不同的实现。 实现ByteBufHolder的子类可以自己实现一些实用的方法。 Netty也有一些子类继承自ByteBufHolder。 15.3.2 ByteBufAllocator 字节缓冲区分配器,其实现类有两种:基于池的和普通的。 其API: 15.3.3 CompositeByteBuf 允许将多个ByteBuf组装到一起。 使用场景:如某协议包含消息头和消息体,当对消息进行编码的时候需要进行整合。 这种场景有两种实现方式:

  • 将一个buf复制到另一个buf。或者创建一个新的buf将两个buf都放到新的buf。
  • 通过List或其他集合容器,将两个buf都放入容器统一维护和处理。

看下源码:

public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {

   private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
   private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();

   private final ByteBufAllocator alloc;
   private final boolean direct;
   private final ComponentList components;//维护buf的容器
   private final int maxNumComponents;

   private boolean freed;
   }
1
2
3
4
5
6
7
8
9
10
11
12

再看下ComponentList:

private static final class ComponentList extends ArrayList<Component> {

        ComponentList(int initialCapacity) {
            super(initialCapacity);
        }

        // Expose this methods so we not need to create a new subList just to remove a range of elements.
        @Override
        public void removeRange(int fromIndex, int toIndex) {
            super.removeRange(fromIndex, toIndex);
        }
    }

private static final class Component {
        final ByteBuf buf;
        final int length;
        int offset;//在集合中的位置偏移
        int endOffset;

        Component(ByteBuf buf) {
            this.buf = buf;
            length = buf.readableBytes();
        }

        void freeIfNecessary() {
            buf.release(); // We should not get a NPE here. If so, it must be a bug.
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

15.3.4 ByteBufUtil 几个常用的工具方法:

  • encodeString
  • decodeString
  • hexDump
编辑 (opens new window)
上次更新: 2022/05/22, 00:01:01
Netty中有哪些解码器?
ChannelPipeline和ChannelHandler

← Netty中有哪些解码器? ChannelPipeline和ChannelHandler→

最近更新
01
电商-商品系统设计
12-17
02
关于如何写OKR
12-09
03
对事不对人 vs 对人不对事
12-09
更多文章>
Theme by Vdoing | Copyright © 2022-2023 YoungAnnn | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式