1 module tagion.network.ReceiveBuffer;
2 
3 import std.typecons : Tuple;
4 
5 import LEB128 = tagion.utils.LEB128;
6 
7 @safe:
8 
/**
 * Reusable buffer for receiving one length-prefixed package.
 *
 * A package starts with a LEB128 encoded length and is followed by that many
 * bytes; the complete package (prefix included) is collected in `buffer`.
 * The buffer is allocated on first use, kept between calls and only ever grown.
 */
struct ReceiveBuffer {
    ubyte[] buffer; /// Allocated buffer, reused between calls
    enum LEN_MAX = LEB128.calc_size(uint.max); /// Result of LEB128.calc_size for uint.max
    enum START_SIZE = 0x400; /// Size of the buffer allocated on the first call
    static size_t max_size = 0x4000; /// Packages larger than this are rejected
    /// Callback which fills `buf` and returns the number of bytes written,
    /// 0 when nothing more is delivered, or a negative value on error
    alias Receive = ptrdiff_t delegate(scope void[] buf) nothrow @safe;
    /// Result of a receive: `size` is the byte count or a negative error code,
    /// `data` is a slice of the internal buffer
    alias ResultBuffer = Tuple!(ptrdiff_t, "size", ubyte[], "data");

    /**
     * Receives one package by calling `receive` until the package is complete.
     *
     * Params:
     *   receive = delegate used to fetch the next bytes into the buffer
     * Returns:
     *   - size = number of bytes received and data = the package, on success
     *   - size = number of bytes received so far and data = those bytes,
     *     if `receive` returned 0 before the package was complete
     *   - size = the negative value returned by `receive` and data = the bytes
     *     received so far, if `receive` failed
     *   - size = -2 and data = null, if the announced package size exceeds `max_size`
     *
     *   `data` refers to the internal buffer and is overwritten by the next call.
     */
    const(ResultBuffer) opCall(const Receive receive) nothrow {
        if (buffer is null) {
            buffer = new ubyte[START_SIZE];
        }
        size_t pos;
        ptrdiff_t total_size = -1; // Negative until the length prefix has been decoded
        // Strictly less-than: when pos == total_size the package is complete and
        // another receive would block or eat bytes of the following package
        while (total_size < 0 || pos < total_size) {
            // As soon as the package size is known, never read past its end
            const end = (total_size < 0) ? buffer.length : cast(size_t) total_size;
            const len = receive(buffer[pos .. end]);
            if (len == 0) {
                return ResultBuffer(pos, buffer[0 .. pos]);
            }
            if (len < 0) {
                return ResultBuffer(len, buffer[0 .. pos]);
            }
            pos += len;

            if (total_size < 0 && LEB128.isCompleat(buffer[0 .. pos])) {
                // Decode only the bytes which have actually been received
                const leb128_len = LEB128.decode!size_t(buffer[0 .. pos]);
                // Check the payload length first so that the sum below cannot wrap around
                if (leb128_len.value > max_size) {
                    return ResultBuffer(-2, null);
                }
                const size_t wanted = leb128_len.value + leb128_len.size;
                if (wanted > max_size) {
                    return ResultBuffer(-2, null);
                }
                total_size = wanted;
                if (buffer.length < wanted) {
                    buffer.length = wanted;
                }
            }
        }
        return ResultBuffer(pos, buffer[0 .. total_size]);
    }

}
56 
version (unittest) {
    import tagion.hibon.Document;
    import std.algorithm;
    import tagion.hibon.HiBONRecord;
    import std.array;
    import std.range;
    import std.format;

    /// In-memory byte source which hands out its content in limited portions
    @safe
    struct TestStream {
        const(void)[] buffer; /// Bytes which have not been delivered yet
        size_t chunck; /// Upper limit of bytes per call, used from the fourth call on
        uint count; /// Number of times receive has been called
        this(const(ubyte[]) buf) {
            buffer = buf;
        }

        /**
         * Copies the next portion of the remaining bytes into `buf`.
         * The first three calls deliver at most one byte each; later calls
         * deliver at most `chunck` bytes.
         * Returns: the number of bytes copied (0 when nothing is left or `buf` is empty)
         */
        ptrdiff_t receive(scope void[] buf) nothrow {
            size_t limit = chunck;
            if (count < 3) {
                limit = 1;
            }
            count++;
            const amount = (() @trusted => cast(ptrdiff_t) min(limit, buf.length, buffer.length))();
            if (amount < 0) {
                return -1;
            }
            (() @trusted { buf[0 .. amount] = buffer[0 .. amount]; })();
            buffer = buffer[amount .. $];
            return amount;
        }
    }

    /// Record with a list of texts, serialized to produce the test packages
    @safe
    struct TestData {
        string[] texts;
        mixin HiBONRecord;
    }
}
// Receives two packages through the same ReceiveBuffer: one smaller than the
// initial buffer and one which is larger than START_SIZE.
unittest {
    static TestStream stream;
    TestData record;
    ReceiveBuffer receiver;

    // Package smaller than the initial buffer
    record.texts = iota(17).map!(i => format("Some text %d", i)).array;
    const small_package = record.serialize;
    stream = TestStream(small_package);
    stream.chunck = 0x100;
    {
        const received = receiver(&stream.receive);
        assert(received.data == small_package);
    }

    // Package larger than the initial buffer, received with the same ReceiveBuffer
    record.texts = iota(120).map!(i => format("Some text %d", i)).array;
    const large_package = record.serialize;
    stream = TestStream(large_package);
    stream.chunck = 0x100;
    assert(large_package.length > receiver.START_SIZE,
            "Test data should large than START_SIZE");
    {
        const received = receiver(&stream.receive);
        assert(received.data == large_package);
    }
}