1 /// Helper functions for interfacing with the DART. 2 module tagion.testbench.dart.dart_helper_functions; 3 4 import std.algorithm : filter, map; 5 import std.algorithm : each; 6 import std.format; 7 import std.random : MinstdRand0, randomShuffle; 8 import std.range; 9 import std.stdio : writefln, writeln; 10 import tagion.Keywords; 11 import tagion.basic.basic : isinit; 12 import tagion.basic.basic : tempfile; 13 import tagion.communication.HiRPC; 14 import tagion.dart.BlockFile : BlockFile; 15 import tagion.dart.DART : DART; 16 import tagion.dart.DARTBasic : DARTIndex; 17 import tagion.dart.DARTFakeNet; 18 import tagion.dart.DARTFile : DARTFile; 19 import tagion.dart.DARTRim; 20 import tagion.dart.DARTcrud : dartRead, dartRim; 21 import tagion.dart.Recorder : Archive, RecordFactory; 22 import tagion.hibon.Document; 23 import tagion.hibon.HiBONJSON : toPretty; 24 import tagion.utils.Random; 25 26 /** 27 * Takes a Rim and returns the document. 28 * Params: 29 * rim = The rim to check 30 * hirpc = The hirpc used 31 * db = The dart 32 * Returns: Result Document with branches and or records. 33 */ 34 Document getRim(Rims rim, HiRPC hirpc, DART db) @safe { 35 const rim_sender = dartRim(rim, hirpc); 36 auto rim_receiver = hirpc.receive(rim_sender.toDoc); 37 auto rim_result = db(rim_receiver, false); 38 return rim_result.message[Keywords.result].get!Document; 39 } 40 41 /** 42 * Reads a list of DARTIndexes and returnts the document 43 * Params: 44 * fingerprints = list of fingerprints to read 45 * hirpc = the hirpc used 46 * db = The dart 47 * Returns: Result Document. 48 */ 49 Document getRead(const DARTIndex[] fingerprints, HiRPC hirpc, DART db) @safe { 50 const sender = dartRead(fingerprints, hirpc); 51 auto receiver = hirpc.receive(sender.toDoc); 52 auto result = db(receiver, false); 53 return result.message[Keywords.result].get!Document; 54 } 55 56 /** 57 * Traverses dart until a split occurs. 58 * Params: 59 * rim = 60 * hirpc = 61 * db = 62 * Returns: Document with split, or the last document able to be retrieved if no splits. 63 */ 64 Document goToSplit(const Rims rim, const HiRPC hirpc, DART db) @safe { 65 const rim_doc = getRim(rim, hirpc, db); 66 67 if (DARTFile.Branches.isRecord(rim_doc)) { 68 auto rim_fingerprints = DARTFile.Branches(rim_doc) 69 .fingerprints 70 .enumerate 71 .filter!(f => !f.value.empty) 72 .array; 73 74 if (rim_fingerprints.length > 1) { 75 return rim_doc; 76 } 77 return goToSplit(Rims(rim, rim_fingerprints.front.index), hirpc, db); 78 } 79 80 return rim_doc; 81 } 82 83 /** 84 * Helper method to retrieve fingerprints from a Document 85 * Params: 86 * doc = The Document with fingerprints 87 * db = 88 * Returns: Returns a list of fingerprints from a Document 89 */ 90 DARTIndex[] getFingerprints(const Document doc, DART db = null) @safe { 91 92 pragma(msg, "fixme(cbr): Check the that we use the dartIndex and Fingerprint in this test correctetly"); 93 if (RecordFactory.Recorder.isRecord(doc)) { 94 assert(db !is null, "DART needed for this use case"); 95 auto recorder = db.recorder(doc); 96 return recorder[].map!(a => cast(DARTIndex)(a.fingerprint)).array; 97 98 } 99 100 return DARTFile.Branches(doc).dart_indices 101 .filter!(f => !f.isinit) 102 .array; 103 } 104 105 /** 106 * Adds archive in a shuffled random order based on the sequence states. 107 * Params: 108 * states = the random sequence. 109 * rnd = seed for random number generator. 110 * db = The dart 111 * Returns: list of fingerprints added to the db. 112 */ 113 114 DARTIndex[] randomAdd(const Sequence!ulong[] states, MinstdRand0 rnd, DART db) @safe { 115 DARTIndex[] dart_indexs; 116 117 foreach (state; states.dup.randomShuffle(rnd)) { 118 auto recorder = db.recorder(); 119 120 const(Document[]) docs = state.list.map!(r => DARTFakeNet.fake_doc(r)).array; 121 foreach (doc; docs) { 122 recorder.add(doc); 123 dart_indexs ~= DARTIndex(recorder[].front.dart_index); 124 } 125 db.modify(recorder); 126 } 127 return dart_indexs; 128 } 129 130 DARTIndex[] randomAdd(T)(T ranges, MinstdRand0 rnd, DART db) @safe 131 if (isRandomAccessRange!T && isInputRange!(ElementType!T) && is( 132 ElementType!(ElementType!T) : const(ulong))) { 133 DARTIndex[] dart_indexs; 134 foreach (range; ranges.randomShuffle(rnd)) { 135 auto recorder = db.recorder(); 136 auto docs = range.map!(r => DARTFakeNet.fake_doc(r)); 137 foreach (doc; docs) { 138 recorder.add(doc); 139 dart_indexs ~= DARTIndex(recorder[].front.dart_index); 140 } 141 db.modify(recorder); 142 } 143 return dart_indexs; 144 } 145 146 /** 147 * Removes archive in a random order. 148 * Params: 149 * dart_indexs = The dart_indexs to remove 150 * rnd = the random seed 151 * db = the database 152 */ 153 void randomRemove(const DARTIndex[] dart_indexs, MinstdRand0 rnd, DART db) @safe { 154 auto recorder = db.recorder(); 155 156 const random_order_dart_indexs = dart_indexs.dup.randomShuffle(rnd); 157 foreach (dart_index; random_order_dart_indexs) { 158 writefln("removing %(%02x%)", dart_index); 159 recorder.remove(dart_index); 160 } 161 db.modify(recorder); 162 } 163 164 /** 165 * Changes the sector in which the archive is created in. This is for testing only an angle of the database. 166 * Params: 167 * archive = the archive to change 168 * angle = The angle / sector 169 * size = The size from the angle so that it is possible to have more than one. 170 * Returns: new ulong where the sector has been changed. 171 */ 172 ulong putInSector(ulong archive, const ushort angle, const ushort size) @safe { 173 174 enum size_none_sector = (ulong.sizeof - ushort.sizeof) * 8; 175 const ulong sector = ((archive >> size_none_sector - angle) % size + angle) << size_none_sector; 176 177 const(ulong) new_archive = archive & ~( 178 ulong(ushort.max) << size_none_sector) | ulong(sector) << size_none_sector; 179 180 return new_archive; 181 } 182 183 // same as in unittests. 184 import tagion.dart.synchronizer; 185 186 static class TestSynchronizer : JournalSynchronizer { 187 protected DART foreign_dart; 188 protected DART owner; 189 this(string journal_filename, DART owner, DART foreign_dart) @safe { 190 this.foreign_dart = foreign_dart; 191 this.owner = owner; 192 auto _journalfile = BlockFile(journal_filename); 193 super(_journalfile); 194 } 195 196 // 197 // This function emulates the connection between two DART's 198 // in a single thread 199 // 200 const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request) { 201 Document send_request_to_foreign_dart(const Document foreign_doc) { 202 // 203 // Remote excution 204 // Receive on the foreign end 205 const foreign_receiver = foreign_dart.hirpc.receive(foreign_doc); 206 // Make query in to the foreign DART 207 const foreign_response = foreign_dart(foreign_receiver); 208 209 return foreign_response.toDoc; 210 } 211 212 immutable foreign_doc = request.toDoc; 213 (() @trusted { fiber.yield; })(); 214 // Here a yield loop should be implement to poll for response from the foriegn DART 215 // A timeout should also be implemented in this poll loop 216 const response_doc = send_request_to_foreign_dart(foreign_doc); 217 // 218 // Process the response returned for the foreign DART 219 // 220 const received = owner.hirpc.receive(response_doc); 221 return received; 222 } 223 } 224 225 /** 226 * Syncs to darts 227 * Params: 228 * db1 = Dart to sync From 229 * db2 = Dart to sync TO 230 * from = angle start 231 * to = angle end 232 */ 233 void syncDarts(DART db1, DART db2, const ushort from, const ushort to) @safe { 234 235 enum TEST_BLOCK_SIZE = 0x80; 236 string[] journal_filenames; 237 238 foreach (sector; SectorRange(from, to)) { 239 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 240 journal_filenames ~= journal_filename; 241 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 242 auto synch = new TestSynchronizer(journal_filename, db2, db1); 243 244 auto db2_synchronizer = db2.synchronizer(synch, Rims(sector)); 245 // D!(sector, "%x"); 246 while (!db2_synchronizer.empty) { 247 (() @trusted => db2_synchronizer.call)(); 248 } 249 } 250 foreach (journal_filename; journal_filenames) { 251 db2.replay(journal_filename); 252 } 253 254 } 255 256 struct RandomArchives { 257 import std.random; 258 import std.random : Random; 259 260 uint seed; 261 bool in_dart; 262 uint number_of_archives; 263 264 this(const uint _seed, const uint from = 1, const uint to = 10) pure const @safe { 265 seed = _seed; 266 auto rnd = Random(seed); 267 number_of_archives = uniform(from, to, rnd); 268 } 269 270 auto values() pure nothrow @nogc @safe { 271 auto gen = Mt19937_64(seed); 272 return gen.take(number_of_archives); 273 } 274 } 275 276 unittest { 277 import std.algorithm; 278 import std.stdio; 279 280 const seed = 12345UL; 281 auto r = RandomArchives(seed, 1, 10); 282 auto t = RandomArchives(seed, 1, 10); 283 284 assert(r.values == t.values); 285 }