1 /// Helper functions for interfacing with the DART. 
2 module tagion.testbench.dart.dart_helper_functions;
3 
4 import std.algorithm : filter, map;
5 import std.algorithm : each;
6 import std.format;
7 import std.random : MinstdRand0, randomShuffle;
8 import std.range;
9 import std.stdio : writefln, writeln;
10 import tagion.Keywords;
11 import tagion.basic.basic : isinit;
12 import tagion.basic.basic : tempfile;
13 import tagion.communication.HiRPC;
14 import tagion.dart.BlockFile : BlockFile;
15 import tagion.dart.DART : DART;
16 import tagion.dart.DARTBasic : DARTIndex;
17 import tagion.dart.DARTFakeNet;
18 import tagion.dart.DARTFile : DARTFile;
19 import tagion.dart.DARTRim;
20 import tagion.dart.DARTcrud : dartRead, dartRim;
21 import tagion.dart.Recorder : Archive, RecordFactory;
22 import tagion.hibon.Document;
23 import tagion.hibon.HiBONJSON : toPretty;
24 import tagion.utils.Random;
25 
/**
 * Asks the DART for a rim by sending a `dartRim` request through HiRPC.
 * Params:
 *   rim = The rim to look up
 *   hirpc = The HiRPC instance used to build and to receive the request
 *   db = The DART that answers the request
 * Returns: The Document stored under `Keywords.result` in the response message
 *   (branches and/or records).
 */
Document getRim(Rims rim, HiRPC hirpc, DART db) @safe {
    // Build the request and run it through receive(), as the receiving side would see it.
    const request = dartRim(rim, hirpc);
    auto incoming = hirpc.receive(request.toDoc);
    // NOTE(review): the meaning of the `false` argument is defined by DART's call operator - confirm.
    return db(incoming, false).message[Keywords.result].get!Document;
}
40 
/**
 * Reads a list of DARTIndexes from the DART and returns the resulting document.
 * Params:
 *   fingerprints = The list of DART indices to read
 *   hirpc = The HiRPC instance used to build and to receive the request
 *   db = The DART that answers the request
 * Returns: The Document stored under `Keywords.result` in the response message.
 */
Document getRead(const DARTIndex[] fingerprints, HiRPC hirpc, DART db) @safe {
    // Build the request and run it through receive(), as the receiving side would see it.
    const request = dartRead(fingerprints, hirpc);
    auto incoming = hirpc.receive(request.toDoc);
    // NOTE(review): the meaning of the `false` argument is defined by DART's call operator - confirm.
    return db(incoming, false).message[Keywords.result].get!Document;
}
55 
/**
 * Walks down the DART from `rim` until a split occurs, i.e. until a branch
 * record with more than one occupied slot is reached.
 * Params:
 *   rim = The rim where the traversal starts
 *   hirpc = The HiRPC instance used to query the DART
 *   db = The DART to traverse
 * Returns: The Document holding the split, or the last Document that could be
 *   retrieved when no split exists. This includes a branch record without any
 *   occupied slot, which previously hit `front` on an empty array.
 */
Document goToSplit(const Rims rim, const HiRPC hirpc, DART db) @safe {
    const rim_doc = getRim(rim, hirpc, db);

    // Anything that is not a branch record ends the walk.
    if (!DARTFile.Branches.isRecord(rim_doc)) {
        return rim_doc;
    }

    // Keep each slot position together with its fingerprint, dropping the empty slots.
    auto occupied = DARTFile.Branches(rim_doc)
        .fingerprints
        .enumerate
        .filter!(f => !f.value.empty)
        .array;

    // More than one child is the split we are looking for; no child at all means
    // there is nothing left to descend into. Both cases end the walk here.
    if (occupied.length != 1) {
        return rim_doc;
    }

    // Exactly one child: follow it one rim deeper.
    return goToSplit(Rims(rim, occupied.front.index), hirpc, db);
}
82 
/**
 * Extracts the DART indices contained in a Document.
 *
 * A recorder Document yields the fingerprint of every archive in it (this path
 * needs `db`); any other Document is read as a branch record and yields its
 * non-init DART indices.
 * Params:
 *   doc = The Document with fingerprints
 *   db = The DART used to rebuild the recorder; must be non-null when `doc` is a recorder
 * Returns: The list of DART indices found in the Document
 */
DARTIndex[] getFingerprints(const Document doc, DART db = null) @safe {

    pragma(msg, "fixme(cbr): Check the that we use the dartIndex and Fingerprint in this test correctetly");
    // Branch record: keep only the slots that actually hold an index.
    if (!RecordFactory.Recorder.isRecord(doc)) {
        return DARTFile.Branches(doc).dart_indices
            .filter!(index => !index.isinit)
            .array;
    }

    // Recorder: the archives can only be rebuilt through the DART.
    assert(db !is null, "DART needed for this use case");
    auto recorder = db.recorder(doc);
    return recorder[].map!(archive => cast(DARTIndex)(archive.fingerprint)).array;
}
104 
/**
 * Adds archives in a shuffled random order based on the sequence states.
 *
 * The states are shuffled on a copy; every state becomes one recorder that is
 * committed with a single `db.modify` call.
 * Params:
 *   states = The random sequences; each element of `state.list` becomes one fake document
 *   rnd = The random number generator used for the shuffle (taken by value)
 *   db = The DART to add the archives to
 * Returns: The DART index of every added document, in insertion order.
 */
DARTIndex[] randomAdd(const Sequence!ulong[] states, MinstdRand0 rnd, DART db) @safe {
    DARTIndex[] dart_indexs;

    foreach (state; states.dup.randomShuffle(rnd)) {
        auto recorder = db.recorder();

        const(Document[]) docs = state.list.map!(r => DARTFakeNet.fake_doc(r)).array;
        foreach (doc; docs) {
            recorder.add(doc);
            // `recorder[].front` is the first archive of the whole batch, not
            // necessarily the one that was just added. Take the index from a
            // recorder holding only this document so every document reports
            // its own index.
            auto single = db.recorder();
            single.add(doc);
            dart_indexs ~= DARTIndex(single[].front.dart_index);
        }
        db.modify(recorder);
    }
    return dart_indexs;
}
129 
/**
 * Adds archives in a shuffled random order; range-of-ranges variant.
 *
 * Every inner range becomes one recorder that is committed with a single
 * `db.modify` call. Note that `ranges` itself is shuffled in place.
 * Params:
 *   ranges = Random-access range of input ranges of values convertible to `const(ulong)`;
 *            each value becomes one fake document
 *   rnd = The random number generator used for the shuffle (taken by value)
 *   db = The DART to add the archives to
 * Returns: The DART index of every added document, in insertion order.
 */
DARTIndex[] randomAdd(T)(T ranges, MinstdRand0 rnd, DART db) @safe
        if (isRandomAccessRange!T && isInputRange!(ElementType!T) && is(
            ElementType!(ElementType!T) : const(ulong))) {
    DARTIndex[] dart_indexs;
    foreach (range; ranges.randomShuffle(rnd)) {
        auto recorder = db.recorder();
        auto docs = range.map!(r => DARTFakeNet.fake_doc(r));
        foreach (doc; docs) {
            recorder.add(doc);
            // `recorder[].front` is the first archive of the whole batch, not
            // necessarily the one that was just added. Take the index from a
            // recorder holding only this document so every document reports
            // its own index.
            auto single = db.recorder();
            single.add(doc);
            dart_indexs ~= DARTIndex(single[].front.dart_index);
        }
        db.modify(recorder);
    }
    return dart_indexs;
}
145 
/**
 * Removes archives from the DART in a random order.
 *
 * All removals are collected in one recorder and committed with a single
 * `db.modify` call. Each removed index is printed to stdout.
 * Params:
 *   dart_indexs = The DART indices to remove
 *   rnd = The random number generator used for the shuffle (taken by value)
 *   db = The DART to remove the archives from
 */
void randomRemove(const DARTIndex[] dart_indexs, MinstdRand0 rnd, DART db) @safe {
    // Shuffle a copy so the caller's list keeps its order.
    const shuffled = dart_indexs.dup.randomShuffle(rnd);

    auto removals = db.recorder();
    foreach (index; shuffled) {
        writefln("removing %(%02x%)", index);
        removals.remove(index);
    }
    db.modify(removals);
}
163 
/**
 * Moves an archive value into a chosen sector range. This is for testing only
 * an angle of the database.
 *
 * The sector is the most significant 16 bits of the value. The current sector
 * is mapped into `[angle, angle + size)` (wrapping at 16 bits); a value that is
 * already inside that range keeps its sector. The lower 48 bits are untouched.
 *
 * The previous implementation shifted the computed sector to the top of the
 * word twice, which shifted it out entirely and always produced sector 0. It
 * also used `archive >> size_none_sector - angle`, which parses as
 * `archive >> (size_none_sector - angle)` and is an out-of-range shift count
 * for any `angle` above 48.
 * Params:
 *   archive = The archive value to change
 *   angle = The first sector of the target range
 *   size = The number of sectors in the target range; must be greater than zero
 * Returns: `archive` with its sector replaced by one in `[angle, angle + size)`.
 */
ulong putInSector(ulong archive, const ushort angle, const ushort size) @safe {
    assert(size > 0, "size must be greater than zero");

    // Number of bits below the 16-bit sector field.
    enum size_none_sector = (ulong.sizeof - ushort.sizeof) * 8;
    enum ulong sector_mask = ulong(ushort.max) << size_none_sector;

    const ulong current_sector = archive >> size_none_sector;
    // The unsigned subtraction may wrap around; the modulo still lands in [0, size).
    const ulong new_sector = ((current_sector - angle) % size + angle) & ushort.max;

    return (archive & ~sector_mask) | (new_sector << size_none_sector);
}
182 
183 // same as in unittests.
184 import tagion.dart.synchronizer;
185 
/**
 * Journal synchronizer for tests that connects two DARTs living in the same
 * thread: every request is answered directly by `foreign_dart` and the answer
 * is received through `owner`'s HiRPC.
 */
static class TestSynchronizer : JournalSynchronizer {
    protected DART foreign_dart;
    protected DART owner;

    /**
     * Params:
     *   journal_filename = The block file the journal is opened from
     *   owner = The DART that receives the responses
     *   foreign_dart = The DART that answers the requests
     */
    this(string journal_filename, DART owner, DART foreign_dart) @safe {
        this.owner = owner;
        this.foreign_dart = foreign_dart;
        auto journal = BlockFile(journal_filename);
        super(journal);
    }

    /**
     * Emulates the connection between two DARTs in a single thread.
     * Params:
     *   request = The request to send to the foreign DART
     * Returns: The foreign DART's response as received by the owner's HiRPC.
     */
    const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request) {
        // Stand-in for the remote execution: receive the request on the
        // foreign end, run the query there and hand back the serialized reply.
        Document ask_foreign_dart(const Document incoming) {
            const remote_receiver = foreign_dart.hirpc.receive(incoming);
            const remote_reply = foreign_dart(remote_receiver);
            return remote_reply.toDoc;
        }

        immutable outgoing = request.toDoc;
        (() @trusted { fiber.yield; })();
        // A yield loop that polls for the response from the foreign DART
        // should be implemented here, together with a timeout for that poll.
        const reply_doc = ask_foreign_dart(outgoing);

        // Process the response returned by the foreign DART.
        return owner.hirpc.receive(reply_doc);
    }
}
224 
/**
 * Synchronizes two DARTs sector by sector.
 *
 * For every sector in `SectorRange(from, to)` a journal block file is created,
 * a synchronizer is run to completion against it, and afterwards all journals
 * are replayed into `db2`.
 * Params:
 *   db1 = The DART to sync from
 *   db2 = The DART to sync to
 *   from = Start angle, passed to `SectorRange`
 *   to = End angle, passed to `SectorRange`
 */
void syncDarts(DART db1, DART db2, const ushort from, const ushort to) @safe {

    enum TEST_BLOCK_SIZE = 0x80;
    string[] journals;

    foreach (sector; SectorRange(from, to)) {
        // One journal file per sector.
        immutable journal_path = format("%s.%04x.dart_journal", tempfile, sector);
        journals ~= journal_path;
        BlockFile.create(journal_path, DART.stringof, TEST_BLOCK_SIZE);

        // db2 owns the synchronizer, db1 plays the foreign side.
        auto link = new TestSynchronizer(journal_path, db2, db1);
        auto sync_fiber = db2.synchronizer(link, Rims(sector));

        // Drive the synchronizer until it reports that it is done.
        while (!sync_fiber.empty) {
            (() @trusted => sync_fiber.call)();
        }
    }

    // Apply the collected journals in the order they were created.
    foreach (journal_path; journals) {
        db2.replay(journal_path);
    }

}
255 
/**
 * A reproducible batch of pseudo-random archive values.
 *
 * The batch is fully described by `seed`: the seed fixes both the number of
 * values and the values themselves, so two instances built from the same
 * arguments produce the same sequence.
 */
struct RandomArchives {
    import std.random;
    import std.random : Random;

    /// Seed that determines the batch.
    uint seed;
    /// Bookkeeping flag for the user of the struct; never touched by this struct.
    bool in_dart;
    /// Number of values produced by `values`.
    uint number_of_archives;

    /**
     * Params:
     *   _seed = Seed for both the batch size and the values
     *   from = Smallest possible batch size
     *   to = Upper bound of the batch size (exclusive, as `uniform` defaults to "[)")
     */
    this(const uint _seed, const uint from = 1, const uint to = 10) pure const @safe {
        seed = _seed;
        auto rnd = Random(seed);
        number_of_archives = uniform(from, to, rnd);
    }

    /**
     * Only reads the fields, so it is `const` and can be called on const and
     * immutable instances as well.
     * Returns: A range of `number_of_archives` values taken from a `Mt19937_64`
     *   generator seeded with `seed`.
     */
    auto values() const pure nothrow @nogc @safe {
        auto gen = Mt19937_64(seed);
        return gen.take(number_of_archives);
    }
}
275 
/// Two RandomArchives built from the same arguments yield the same values.
unittest {
    const seed = 12345UL;

    auto first = RandomArchives(seed, 1, 10);
    auto second = RandomArchives(seed, 1, 10);

    assert(first.values == second.values);
}