环形缓冲区

服务器接收数据可看做生产者消费者模式,客户端数据传入可看做生产者,服务器读取数据可看做消费者。

这就出现两种情况:

  • 生产者生产的速度 < 消费者消费的速度:这种情况最好,客户端的数据来了就能立即处理完成。
  • 生产者生产的速度 > 消费者消费的速度:这种情况最糟,客户端发来的数据,服务器处理的慢,这时候客户端的体验就很差,如果缓冲区写满还要另行处理。

服务器接收客户端数据需要缓冲区,缓冲区的主要作用有粘包和拆包。

如果出现了半包的情况,缓冲区内已经处理过的数据不需要了再处理了,但未被解析的数据需要一次array.copy使未读数据前移,等待下次接收形成完整数据。

环形缓冲区相比于普通缓冲区可以去掉未读数据前移这一步。

环形缓冲区的实现

环形缓冲区有很多实现,其中我觉得最好的一种就是:记录读总数和写总数,写总数减去读总数就是未读消息长度。写下标和读下标可通过取余得到。

环形缓冲区的写入

向环形缓冲区写入可分为几种情况:

正常情况

  • (写下标 + 未写入数据长度) % 数组长度 < 数组长度。这种情况直接写就行
  • (写下标 + 未写入数据长度) % 数组长度 > 数组长度。超出的部分,从零开始写,直接覆盖掉之前写过的数据就行。

不正常情况(生产者速度大于消费者速度)

  • (写下标 + 未写入数据长度) % 数组长度 > 读下标。这种情况下,客户端发送的请求,服务器还没来得及处理,缓冲区就被写满了。这种情况可能服务器代码写的有问题,也可能是假的客户端一直在发送恶意请求。

环形缓冲区的读取

从读下标读取所需长度即可,基本和写入的正常情况一致。

public class RingBuffer
{
    // 缓冲区大小,必须为2的幂
    private int BufferSize { get; set; }
    
    //缓冲区最大长度
    private int MaxBufferSize { get; set; }
    
    // 缓冲区本区
    private byte[] Bytes { get; set; }
    
    // 读总数
    private uint _readCount;
    
    // 写总数
    private uint _writeCount;
    
    // 写下标
    private int WriteIdx => (int) (_writeCount & (uint) (BufferSize - 1));
    
    // 读下标
    private int ReadIdx => (int) (_readCount & (uint) (BufferSize - 1));
    
    // 缓冲区未读数据长度
    public int DataLength => (int) (_writeCount - _readCount);
    
    private readonly object _lockObj = new();

    /// <summary>
    /// 构造函数,初始化缓冲区,设置缓冲区大小
    /// </summary>
    /// <param name="size">缓冲区大小,必须为2的幂</param>
    /// <param name="maxBufferSize"></param>
    public RingBuffer(int size = 2048, int maxBufferSize = 8192)
    {
        Bytes = new byte[size];
        BufferSize = size;
        MaxBufferSize = maxBufferSize;
        _readCount = 0;
        _writeCount = 0;
    }
    
    /// <summary>
    /// 向环形缓冲区中写给定数组
    /// </summary>
    /// <param name="srcBytes">要写入缓冲区的数组</param>
    /// <param name="srcOffset">给定数组的偏移量</param>
    /// <param name="count">写入长度</param>
    /// <returns>是否写入完毕</returns>
    public bool Write(byte[] srcBytes, int srcOffset, int count)
    {
        if (srcBytes == null || srcBytes.Length == 0) return false;
        if (srcOffset > srcBytes.Length) return false;
        // lock (_lockObj)//只有一个写和一个读线程的情况下,这里锁就可以注释掉
        {
            if (BufferSize - DataLength <= count)
            {
                //扩展缓冲区
                var reSize = ReSize(count);
                if (!reSize)
                {
                    return false;
                }
            }

            var remainingLength = BufferSize - WriteIdx;
            if (remainingLength > count)
            {
                Buffer.BlockCopy(srcBytes, srcOffset, Bytes, WriteIdx, count);
            }
            else
            {
                Buffer.BlockCopy(srcBytes, srcOffset, Bytes, WriteIdx, remainingLength);
                Buffer.BlockCopy(srcBytes, remainingLength, Bytes, 0, count - remainingLength);
            }

            Interlocked.Add(ref _writeCount, (uint) count);
        }

        return true;
    }

    public byte[] GetRead(int count)
    {
        var destination = new byte[count];
        lock (_lockObj)
        {
            if (DataLength < count) return null;
            
            var remainingLength = BufferSize - ReadIdx;
            if (remainingLength > count)
            {
                Buffer.BlockCopy(Bytes, ReadIdx, destination, 0, count);
            }
            else
            {
                Buffer.BlockCopy(Bytes, ReadIdx, destination, 0, remainingLength);
                Buffer.BlockCopy(Bytes, 0, destination, remainingLength, count - remainingLength);
            }

            Interlocked.Add(ref _readCount, (uint) count);
            return destination;
        }
    }
    
    /// <summary>
    /// 扩展缓冲区,必须以2的幂扩展
    /// </summary>
    /// <param name="count"></param>
    private bool ReSize(int count)
    {
        if (BufferSize >= MaxBufferSize)
        {
            return false;
        }

        lock (_lockObj)
        {
            if (count < BufferSize - DataLength)
            {
                return false;
            }

            var size = BufferSize;
            while (count >= size - DataLength)
            {
                size = BufferSize * 2;
            }

            var newBytes = new byte[size];
            if (WriteIdx > ReadIdx || DataLength == 0)
            {
                Buffer.BlockCopy(Bytes, ReadIdx, newBytes, 0, DataLength);
            }
            else
            {
                var remainingLength = BufferSize - ReadIdx;
                Buffer.BlockCopy(Bytes, ReadIdx, newBytes, 0, remainingLength);
                Buffer.BlockCopy(Bytes, 0, newBytes, remainingLength, DataLength - remainingLength);
            }

            Bytes = newBytes;
            BufferSize = size;
            return true;
        }
    }
}

大致实现就是这样,每次有消息过来就写在缓冲区里,增加写总数,读取就增加读总数,写总数减去读总数就是未读消息大小。

其他

用uint声明写总数和读总数为了使写总数与读总数超过uint范围时,此时相减还能得到未读消息大小。

缓冲区大小必须为2的幂是因为,写下标和读下标可直接用与运算得到下标值,避免了使用%取余。

单生产者单消费者的情况,也就是只有一个写和一个读线程的情况下,写缓冲区方法里那个锁可以去掉。

返回顶部