module tagion.network.ReceiveBuffer;

import std.typecons : Tuple;

import LEB128 = tagion.utils.LEB128;

@safe:

/**
 * Collects one length-prefixed message from a chunked receive function.
 *
 * A message starts with a LEB128 encoded length, followed by that many
 * payload bytes. The receive delegate is called repeatedly until the whole
 * message (prefix + payload) is in `buffer`.
 *
 * The internal buffer is reused between calls, so the `data` slice returned
 * by `opCall` aliases `buffer` and is only valid until the next call.
 */
struct ReceiveBuffer {
    ubyte[] buffer; /// Allocated buffer (lazily created, grown on demand, reused between calls)
    /// Result of `LEB128.calc_size(uint.max)`; presumably the longest prefix of a 32-bit length (not used in this struct)
    enum LEN_MAX = LEB128.calc_size(uint.max);
    enum START_SIZE = 0x400; /// Initial size of `buffer`
    static size_t max_size = 0x4000; /// Largest accepted message (length prefix included)
    /// Receive function: fills `buf` and returns the number of bytes written,
    /// 0 when no more data is available, or a negative value on error.
    alias Receive = ptrdiff_t delegate(scope void[] buf) nothrow @safe;
    /// `size` is the number of bytes received (negative on error), `data` the received bytes.
    alias ResultBuffer = Tuple!(ptrdiff_t, "size", ubyte[], "data");

    /**
     * Receives one message.
     *
     * Params:
     *   receive = delegate used to fetch the next chunk of bytes
     * Returns:
     *   - `(pos, buffer[0 .. total_size])` when a complete message was received
     *   - `(pos, buffer[0 .. pos])` when `receive` returned 0 before the message was complete
     *   - `(len, buffer[0 .. pos])` when `receive` returned the negative value `len`
     *   - `(-2, null)` when the announced message size exceeds `max_size`
     *
     * NOTE(review): until the length prefix has been decoded the receiver is
     * offered the whole free part of the buffer, so a receiver which delivers
     * more than one message per call can still over-read at that stage.
     */
    const(ResultBuffer) opCall(const Receive receive) nothrow {
        if (buffer is null) {
            buffer = new ubyte[START_SIZE];
        }
        size_t pos;
        ptrdiff_t total_size = -1; // negative until the length prefix has been decoded
        // `pos < total_size` (not `<=`): stop as soon as the last byte has arrived
        // instead of asking the receiver for data beyond the end of the message.
        while (total_size < 0 || pos < total_size) {
            // When the size is known never offer room past the end of the message,
            // otherwise a reused (larger) buffer would swallow the following message.
            auto window = (total_size < 0) ? buffer[pos .. $] : buffer[pos .. total_size];
            const len = receive(window);
            if (len == 0) {
                return ResultBuffer(pos, buffer[0 .. pos]);
            }
            if (len < 0) {
                return ResultBuffer(len, buffer[0 .. pos]);
            }
            pos += len;

            if (total_size < 0 && LEB128.isCompleat(buffer[0 .. pos])) {
                const leb128_len = LEB128.decode!size_t(buffer[0 .. pos]);
                // Check before adding so a hostile length can't wrap around
                // and sneak past the size limit.
                if (leb128_len.value > max_size
                        || leb128_len.size > max_size - leb128_len.value) {
                    return ResultBuffer(-2, null);
                }
                total_size = leb128_len.value + leb128_len.size;
                if (buffer.length < total_size) {
                    buffer.length = total_size;
                }
            }
        }
        return ResultBuffer(pos, buffer[0 .. total_size]);
    }

}

version (unittest) {
    import tagion.hibon.Document;
    import std.algorithm;
    import tagion.hibon.HiBONRecord;
    import std.array;
    import std.range;
    import std.format;

    /// Test receiver which hands out `buffer` in small pieces
    @safe
    struct TestStream {
        const(void)[] buffer; /// Data not yet delivered
        size_t chunck; /// Maximum bytes per call after the first three calls
        uint count; /// Number of calls made to `receive`
        this(const(ubyte[]) buf) {
            buffer = buf;
        }

        /// Copies the next piece into `buf`; the first three calls deliver at most one byte each
        ptrdiff_t receive(scope void[] buf) nothrow {
            const _chunck = (count < 3) ? 1 : chunck;
            count++;
            const len = (() @trusted => cast(ptrdiff_t) min(_chunck, buf.length, buffer.length))();
            if (len >= 0) {
                (() @trusted { buf[0 .. len] = buffer[0 .. len]; })();
                buffer = buffer[len .. $];
                return len;
            }
            return -1;
        }
    }

    /// Simple record used to produce serialized test messages
    @safe
    struct TestData {
        string[] texts;
        mixin HiBONRecord;
    }
}
unittest {
    static TestStream teststream;
    TestData testdata;
    testdata.texts = iota(17).map!((i) => format("Some text %d", i)).array;

    teststream = TestStream(testdata.serialize);
    teststream.chunck = 0x100;
    ReceiveBuffer receive_buffer;
    { // Message which fits in the initial buffer
        const result_buffer = receive_buffer(&teststream.receive);
        assert(result_buffer.data == testdata.serialize);
    }

    testdata.texts = iota(120).map!((i) => format("Some text %d", i)).array;
    teststream = TestStream(testdata.serialize);
    teststream.chunck = 0x100;
    assert(testdata.serialize.length > receive_buffer.START_SIZE,
            "Test data should large than START_SIZE");
    { // Message which forces the buffer to grow
        const result_buffer = receive_buffer(&teststream.receive);
        assert(result_buffer.data == testdata.serialize);
    }

    { // Two messages back to back: only the first one may be consumed
        testdata.texts = iota(17).map!((i) => format("Some text %d", i)).array;
        const message = testdata.serialize;
        teststream = TestStream(message ~ message);
        teststream.chunck = 0x100;
        const result_buffer = receive_buffer(&teststream.receive);
        assert(result_buffer.size == message.length);
        assert(result_buffer.data == message);
        assert(teststream.buffer.length == message.length);
    }
}