环形缓冲区

服务器接收客户端数据需要缓冲区,缓冲区的主要作用有粘包和拆包。

如果出现了半包的情况,缓冲区内已经处理过的数据不需要了再处理了,但未被解析的数据需要一次array.copy使未读数据前移,等待下次接收形成完整数据。

环形缓冲区相比于普通缓冲区可以去掉未读数据前移这一步。

环形缓冲区的实现

upgraded/ringbuffer at main · thisred/upgraded (github.com)

环形缓冲区有很多实现,其中我觉得最好的一种就是:记录读总数和写总数,写总数减去读总数就是未读消息长度。写下标和读下标可通过取余得到。

环形缓冲区的写入

向环形缓冲区写入可分为两种情况:

  • (写下标 + 未写入数据长度) % 数组长度 < 数组长度。这种情况直接写就行
  • (写下标 + 未写入数据长度) % 数组长度 > 数组长度。超出的部分,从零开始写,直接覆盖掉之前写过的数据就行。

环形缓冲区的读取

从读下标读取所需长度即可,基本和写入的正常情况一致。

public class RingBuffer
{
    private readonly int _capacity;

    /// <summary>
    /// 读总数
    /// </summary>
    private uint _readCount;

    /// <summary>
    /// 写总数
    /// </summary>
    private uint _writeCount;

    /// <summary>
    /// 构造函数,初始化缓冲区,设置缓冲区大小
    /// </summary>
    /// <param name="size">缓冲区大小,必须为2的幂</param>
    public RingBuffer(int size = 8192) : this(new byte[size])
    {
    }


    private RingBuffer(byte[] buffer)
    {
        if ((buffer.Length & (buffer.Length - 1)) != 0)
            throw new Exception("数组长度必须为2的幂");

        Array = buffer;
        Capacity = buffer.Length;
        _readCount = 0;
        _writeCount = 0;
    }

    /// <summary>
    /// 缓冲区大小,必须为2的幂
    /// </summary>
    public int Capacity
    {
        get => _capacity;
        private init
        {
            _capacity = value;
            _bufferSizeMask = value - 1;
        }
    }

    private readonly int _bufferSizeMask;

    public byte[] Array { get; }

    /// <summary>
    /// 写下标
    /// </summary>
    public int WriteIndex => (int)(_writeCount & (uint)_bufferSizeMask);

    /// <summary>
    /// 读下标
    /// </summary>
    public int ReadIndex => (int)(_readCount & (uint)_bufferSizeMask);

    /// <summary>
    /// 缓冲区未读数据长度
    /// </summary>
    public int DataLength => (int)(_writeCount - _readCount);

    /// <summary>
    /// 剩余空间
    /// </summary>
    public int RemainingLength => Capacity - DataLength;

    /// <summary>
    /// 向环形缓冲区中写给定数组
    /// </summary>
    /// <param name="srcBytes">要写入缓冲区的数组</param>
    /// <param name="srcOffset">给定数组的偏移量</param>
    /// <param name="count">写入长度</param>
    /// <returns>是否写入完毕</returns>
    public void Write(byte[] srcBytes, int srcOffset, int count)
    {
        if (srcBytes == null || srcBytes.Length == 0) return;
        if (srcOffset > srcBytes.Length) return;


        // 剩余空间 = 数组长度 - 写下标
        var remainingLength = Capacity - WriteIndex;
        // 剩余空间 > 写入长度
        if (remainingLength > count)
        {
            System.Buffer.BlockCopy(srcBytes, srcOffset, Array, WriteIndex, count);
        }
        else
        {
            // 剩余空间 < 写入长度
            System.Buffer.BlockCopy(srcBytes, srcOffset, Array, WriteIndex, remainingLength);
            System.Buffer.BlockCopy(srcBytes, remainingLength, Array, 0, count - remainingLength);
        }

        _writeCount += (uint)count;
    }

    /// <summary>
    ///     读取数据到目标字节数组
    /// </summary>
    /// <param name="destination">目标字节数组</param>
    /// <param name="dstOffset">目标字节数组偏移量</param>
    /// <param name="count">读取长度</param>
    /// <returns>读取长度</returns>
    public void ReadBytes(byte[] destination, int dstOffset, int count)
    {
        if (destination == null) return;
        if (dstOffset > destination.Length) return;

        if (DataLength < count) return;
        var remainingLength = Capacity - ReadIndex;
        if (remainingLength > count)
        {
            System.Buffer.BlockCopy(Array, ReadIndex, destination, dstOffset, count);
        }
        else
        {
            System.Buffer.BlockCopy(Array, ReadIndex, destination, 0, remainingLength);
            System.Buffer.BlockCopy(Array, 0, destination, remainingLength, count - remainingLength);
        }

        _readCount += (uint)count;
    }
}

大致实现就是这样,每次有消息过来就写在缓冲区里,增加写总数,读取就增加读总数,写总数减去读总数就是未读消息大小。

其他

用uint声明写总数和读总数为了使写总数与读总数超过uint范围时,此时相减还能得到未读消息大小。

缓冲区大小必须为2的幂是因为,写下标和读下标可直接用与运算得到下标值,避免了使用%取余。

返回顶部