为什么Netty要自己写一个ByteBuf
# 15.2 ByteBuf源码分析
继承关系
从内存分配的角度看,byteBuf分为两类:
- 堆内存缓冲区:优点是内存的分配和回收快。缺点是进行IO读写时需要一次内存复制,用户空间和内核空间的复制。
- 直接内存缓冲区:优缺点和堆内存缓冲区整好相反。 经验表明ByteBuf的最佳实践是在IO通信线程的读写缓冲区使用DirectByteBuf,后端业务的编解码模块使用HeapByteBuf,这样的组合可以达到性能最优。 从内存回收的角度看、ByteBuf分两类,基于对象池的ByteBuf和普通ByteBuf。使用内存池后的Netty在高并发和高负载环境下内存和GC更加平稳。 15.2.1 AbstractByteBuf源码分析 实现ByteBuf的一些公共属性和功能。 主要成员变量
static final ResourceLeakDetector<ByteBuf> leakDetector =
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);//用于对象是否泄漏,定义为static,意味着所有byteBuf共享
int readerIndex;//读索引
int writerIndex;//写索引
private int markedReaderIndex;//读mark
private int markedWriterIndex;//写mark
private int maxCapacity;//最大容量
2
3
4
5
6
7
8
Byte数组不在这里,因为AbstractByteBuf无法确定使用直接内存还是堆内存。 readBytes(ByteBuf dst, int dstIndex, int length)
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);//校验可读性
getBytes(readerIndex, dst, dstIndex, length);//读取。从readerIndex开始读取length个字节到目标数组中
readerIndex += length;//移动读指针
return this;
}
2
3
4
5
6
再看一下checkReadableBytes():
/**
* Throws an {@link IndexOutOfBoundsException} if the current
* {@linkplain #readableBytes() readable bytes} of this buffer is less
* than the specified value.
*/
protected final void checkReadableBytes(int minimumReadableBytes) {
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
checkReadableBytes0(minimumReadableBytes);
}
private void checkReadableBytes0(int minimumReadableBytes) {
ensureAccessible();
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
writeBytes(byte[] src, int srcIndex, int length)
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);//可写校验和扩容
setBytes(writerIndex, src, srcIndex, length);//从writerIndex开始写length长度
writerIndex += length;//移动写指针
return this;
}
2
3
4
5
6
ensureWritable(int minWritableBytes)
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
ensureWritable0(minWritableBytes);
return this;
}
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();//检查这个buf是否还有引用(如果已经没有引用那就没必要在写了)
if (minWritableBytes <= writableBytes()) {//写入的字节小于可写字节,校验通过
return;
}
if (minWritableBytes > maxCapacity - writerIndex) {//写入的字节大于最大可写入字节,抛异常
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
重用缓冲区
public ByteBuf discardReadBytes() {
ensureAccessible();
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);//复制缓冲区
writerIndex -= readerIndex;//重置写指针
adjustMarkers(readerIndex);//调整mark指针
readerIndex = 0;//重置读指针
} else {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
15.2.2 AbstractReferenceCountedByteBuf源码分析
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");//通过原子的方式对成员变量进行更新,消除锁
private volatile int refCnt;//跟踪对象的引用次数,采用CAS对其自增1,默认值为1
}
2
3
4
5
6
7
15.2.2 UnPooledHeapByteBuf源码分析 非池化的基于堆内存,频繁的大块内存分配和回收会对性能造成影响,但是相比对外内存的申请和释放,成本还是低一些。 相比HeapByteBuf, UnPooledHeapByteBuf的实现更加加单,也不容易出现内存管理方面的问题,因此在满足性能的条件下,推荐使用UnPooledHeapByteBuf。
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
byte[] array;//这里直接使用JDK的ByteBuffer也可以,之所以使用Byte数组是因为性能和便捷的位操作
private ByteBuffer tmpNioBuf;//用于实现将netty的byteBuf转换为JDK的ByteBuffer
}
2
3
4
5
转换JDK Buffer netty基于byte数组实现 jdk的nio buf提供wrap方法,可直接实现 转换 看下转换方法
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
public ByteBuffer slice() {
return new HeapByteBuffer(hb,//仍然使用的是原buffer的全局数组,只是改变了position和limit的位置,所以新buf和原buf内容是相互影响的
-1,
0,
this.remaining(),
this.remaining(),
this.position() + offset);
}
2
3
4
5
6
7
8
9
10
11
12
slice方法的作用:copy position到limit之间的内容, 15.2.3 pooledByteBuf内存池原理分析 PoolArena是netty的内存池显现类。 为了集中管理内存,提供内存申请是释放的效率,很多框架会申请一大块内存,提供相应的接口分配和释放内存,这样就不再频繁的使用系统调用来使用内存,可以提高性能。预先申请的那块内存就叫Memory Arena。PoolArena是netty对Memory Arena的实现。 Netty的PoolArena由多个chunk组成,每个chunk由多个Page组成。 PoolArena源码:
abstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
PoolChunk的实现
PoolChunk负责多个Page的内存管理,PoolChunk将其负责的多个Page构建成一棵二叉树。
假设一个chunk由16个page组成,则其组织形式:
Page大小是4字节,chunk大小是64字节。
每个节点都记录自己在整个Memory Arena中的偏移地址,一旦被分配,则该节点及其子节点在接下来的内存分配过程中会被忽略。
举例来说,我们申请16个字节空间,则第三层的某个节点会被标记为已分配,则再次分配内存的时候会从其他三个节点中分配。
分配内存时对树采用深度优先算法,但是从哪棵子树开始深度遍历是随机的。
PoolSubPage的实现
申请内存小于一个page,则内存分配在page中完成,每个page会被分为大小相等的多个块。
被分的单位块大小等于第一次申请的内存大小,例如一个Page8字节,第一次申请2字节,则该page被切分成4块,每块2字节。而且这个page以后只能用于分配2字节的内存申请,如果再来一个4字节的内存申请,只能在另一个Page中申请。
Page使用标识位来表示内存块是否可用。维护一个long数组,每个位表示一个块的使用情况。
例如page为128字节,第一次申请内存为1字节,则该page被分为128块,则long数组中有2个元素,(每个long64位,两个long可以表示128位)。0、1表示该块是否可用。
15.2.4 PooledDirectByteBuf内存池原理分析 创建字节缓冲区
由于采用内存池实现,所以创建PooledDirectByteBuf对象不能new一个实例,而是从内存池获取。然后设置引用计数器。
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);
setIndex0(0, 0);
discardMarks();
}
2
3
4
5
6
7
8
9
10
11
复制字节缓冲区 会从内存池中获取一个新的buffer而不是new一个。
# 15.3 ByteBuffer相关辅助类
15.3.1 ByteBufHolder
对消息体进行包装和抽象,不同的子类有不同的实现。
实现ByteBufHolder的子类可以自己实现一些实用的方法。
Netty也有一些子类继承自ByteBufHolder。
15.3.2 ByteBufAllocator
字节缓冲区分配器,其实现类有两种:基于池的和普通的。
其API:
15.3.3 CompositeByteBuf
允许将多个ByteBuf组装到一起。
使用场景:如某协议包含消息头和消息体,当对消息进行编码的时候需要进行整合。
这种场景有两种实现方式:
- 将一个buf复制到另一个buf。或者创建一个新的buf将两个buf都放到新的buf。
- 通过List或其他集合容器,将两个buf都放入容器统一维护和处理。
看下源码:
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();
private final ByteBufAllocator alloc;
private final boolean direct;
private final ComponentList components;//维护buf的容器
private final int maxNumComponents;
private boolean freed;
}
2
3
4
5
6
7
8
9
10
11
12
再看下ComponentList:
private static final class ComponentList extends ArrayList<Component> {
ComponentList(int initialCapacity) {
super(initialCapacity);
}
// Expose this methods so we not need to create a new subList just to remove a range of elements.
@Override
public void removeRange(int fromIndex, int toIndex) {
super.removeRange(fromIndex, toIndex);
}
}
private static final class Component {
final ByteBuf buf;
final int length;
int offset;//在集合中的位置偏移
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
buf.release(); // We should not get a NPE here. If so, it must be a bug.
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
15.3.4 ByteBufUtil 几个常用的工具方法:
- encodeString
- decodeString
- hexDump