DART.TestSynchronizer

Undocumented in source.

Constructors

this
this(BlockFile journalfile, DART owner, DART foreign_dart)
Undocumented in source.

Members

Functions

query
const(HiRPC.Receiver) query(const(HiRPC.Sender) request)
Undocumented in source. Be warned that the author may not have intended to support it.

Variables

foreign_dart
DART foreign_dart;
Undocumented in source.
owner
DART owner;
Undocumented in source.

Inherited Members

From JournalSynchronizer

journalfile
BlockFile journalfile;

The actives is stored in this journal file. Which later can be run via the replay function

index
Index index;

Current block index

record
void record(RecordFactory.Recorder recorder)

Update and adds the recorder to the journal and store it

finish
void finish()

Should be called when the synchronization has finished

Examples

Examples: how use the DART

1 import tagion.basic.basic : assumeTrusted, tempfile;
2 import tagion.dart.BlockFile;
3 import tagion.dart.DARTFakeNet : DARTFakeNet;
4 import tagion.dart.Recorder;
5 import tagion.utils.Random;
6 
7 enum TEST_BLOCK_SIZE = 0x80;
8 
9 auto net = new DARTFakeNet("very_secret");
10 
11 immutable filename = fileId!DART.fullpath;
12 immutable filename_A = fileId!DART("A_").fullpath;
13 immutable filename_B = fileId!DART("B_").fullpath;
14 immutable filename_C = fileId!DART("C_").fullpath;
15 
16 { // Remote Synchronization test
17 
18     import std.file : remove;
19 
20     auto rand = Random!ulong(1234_5678_9012_345UL);
21     enum N = 1000;
22     auto random_tabel = new ulong[N];
23     foreach (ref r; random_tabel) {
24         immutable sector = rand.value(0x0000_0000_0000_ABBAUL, 0x0000_0000_0000_ABBDUL) << (
25                 8 * 6);
26         r = rand.value(0x0000_1234_5678_0000UL | sector, 0x0000_1334_FFFF_0000UL | sector);
27     }
28 
29     //
30     // The the following unittest dart A and B covers the same range angle
31     //
32     enum from = 0xABB9;
33     enum to = 0xABBD;
34 
35     //            import std.stdio;
36     { // Single element same sector sectors
37         const ulong[] same_sector_tabel = [
38             0xABB9_13ab_cdef_1234,
39             0xABB9_14ab_cdef_1234,
40             0xABB9_15ab_cdef_1234
41 
42         ];
43         // writefln("Test 0.0");
44         foreach (test_no; 0 .. 3) {
45             DARTFile.create(filename_A, net);
46             DARTFile.create(filename_B, net);
47             RecordFactory.Recorder recorder_B;
48             RecordFactory.Recorder recorder_A;
49             // Recorder recorder_B;
50             auto dart_A = new DART(net, filename_A, No.read_only, from, to);
51             auto dart_B = new DART(net, filename_B, No.read_only, from, to);
52             string[] journal_filenames;
53             scope (success) {
54                 // writefln("Exit scope");
55                 dart_A.close;
56                 dart_B.close;
57                 filename_A.remove;
58                 filename_B.remove;
59                 foreach (journal_filename; journal_filenames) {
60                     journal_filename.remove;
61                 }
62             }
63 
64             switch (test_no) {
65             case 0:
66                 write(dart_A, same_sector_tabel[0 .. 1], recorder_A);
67                 write(dart_B, same_sector_tabel[0 .. 0], recorder_B);
68                 break;
69             case 1:
70                 write(dart_A, same_sector_tabel[0 .. 1], recorder_A);
71                 write(dart_B, same_sector_tabel[1 .. 2], recorder_B);
72                 break;
73             case 2:
74                 write(dart_A, same_sector_tabel[0 .. 2], recorder_A);
75                 write(dart_B, same_sector_tabel[1 .. 3], recorder_B);
76                 break;
77             default:
78                 assert(0);
79             }
80             //writefln("\n------ %d ------", test_no);
81             //writefln("dart_A.dump");
82             //dart_A.dump;
83             //writefln("dart_B.dump");
84             //dart_B.dump;
85             //writefln("dart_A.fingerprint=%s", dart_A.fingerprint.cutHex);
86             //writefln("dart_B.fingerprint=%s", dart_B.fingerprint.cutHex);
87 
88             foreach (sector; dart_A.sectors) {
89                 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
90                 journal_filenames ~= journal_filename;
91                 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
92                 auto journalfile = BlockFile(journal_filename);
93                 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
94                 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
95                 // D!(sector, "%x");
96                 while (!dart_A_synchronizer.empty) {
97                     (() @trusted => dart_A_synchronizer.call)();
98                 }
99             }
100             foreach (journal_filename; journal_filenames) {
101                 dart_A.replay(journal_filename);
102             }
103             //writefln("dart_A.dump");
104             //dart_A.dump;
105             //writefln("dart_B.dump");
106             //dart_B.dump;
107             //writefln("dart_A.fingerprint=%s", dart_A.fingerprint.cutHex);
108             //writefln("dart_B.fingerprint=%s", dart_B.fingerprint.cutHex);
109 
110             assert(dart_A.fingerprint == dart_B.fingerprint);
111             if (test_no == 0) {
112                 assert(dart_A.fingerprint.isinit);
113             }
114             else {
115                 assert(!dart_A.fingerprint.isinit);
116             }
117         }
118     }
119 
120     { // Single element different sectors
121         //
122         // writefln("Test 0.1");
123         DARTFile.create(filename_A, net);
124         DARTFile.create(filename_B, net);
125         RecordFactory.Recorder recorder_B;
126         RecordFactory.Recorder recorder_A;
127         // Recorder recorder_B;
128         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
129         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
130         string[] journal_filenames;
131         scope (success) {
132             // writefln("Exit scope");
133             dart_A.close;
134             dart_B.close;
135             filename_A.remove;
136             filename_B.remove;
137             foreach (journal_filename; journal_filenames) {
138                 journal_filename.remove;
139             }
140         }
141 
142         write(dart_B, random_tabel[0 .. 1], recorder_B);
143         write(dart_A, random_tabel[1 .. 2], recorder_A);
144         // writefln("dart_A.dump");
145         // dart_A.dump;
146         // writefln("dart_B.dump");
147         // dart_B.dump;
148 
149         foreach (sector; dart_A.sectors) {
150             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
151             journal_filenames ~= journal_filename;
152             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
153             auto journalfile = BlockFile(journal_filename);
154             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
155             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
156             // D!(sector, "%x");
157             while (!dart_A_synchronizer.empty) {
158                 (() @trusted => dart_A_synchronizer.call)();
159             }
160         }
161         foreach (journal_filename; journal_filenames) {
162             dart_A.replay(journal_filename);
163         }
164         // writefln("dart_A.dump");
165         // dart_A.dump;
166         // writefln("dart_B.dump");
167         // dart_B.dump;
168         assert(!dart_A.fingerprint.isinit);
169         assert(dart_A.fingerprint == dart_B.fingerprint);
170     }
171     { // Synchronization of an empty DART 
172         // from DART A against DART B with ONE archive when DART A is empty
173         DARTFile.create(filename_A, net);
174         DARTFile.create(filename_B, net);
175         RecordFactory.Recorder recorder_B;
176         // Recorder recorder_B;
177         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
178         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
179         //
180         string[] journal_filenames;
181         scope (success) {
182             // writefln("Exit scope");
183             dart_A.close;
184             dart_B.close;
185             filename_A.remove;
186             filename_B.remove;
187             foreach (journal_filename; journal_filenames) {
188                 journal_filename.remove;
189             }
190         }
191 
192         const ulong[] single_archive = [0xABB9_13ab_11ef_0923];
193 
194         write(dart_B, single_archive, recorder_B);
195         // dart_B.dump;
196 
197         //
198         // Synchronize DART_B -> DART_A
199         //
200         // Collecting the journal file
201 
202         foreach (sector; dart_A.sectors) {
203             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
204             journal_filenames ~= journal_filename;
205             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
206             auto journalfile = BlockFile(journal_filename);
207             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
208             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
209             // D!(sector, "%x");
210             while (!dart_A_synchronizer.empty) {
211                 (() @trusted => dart_A_synchronizer.call)();
212             }
213         }
214         foreach (journal_filename; journal_filenames) {
215             dart_A.replay(journal_filename);
216         }
217         // writefln("dart_A.dump");
218         // dart_A.dump;
219         // writefln("dart_B.dump");
220         // dart_B.dump;
221         assert(!dart_A.fingerprint.isinit);
222         assert(dart_A.fingerprint == dart_B.fingerprint);
223 
224     }
225     { // Synchronization of an empty DART
226         // from DART A against DART B when DART A is empty
227         // writefln("Test 1");
228 
229         DARTFile.create(filename_A, net);
230         DARTFile.create(filename_B, net);
231         RecordFactory.Recorder recorder_B;
232         // Recorder recorder_B;
233         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
234         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
235         //
236         string[] journal_filenames;
237         scope (success) {
238             // writefln("Exit scope");
239             dart_A.close;
240             dart_B.close;
241             filename_A.remove;
242             filename_B.remove;
243             foreach (journal_filename; journal_filenames) {
244                 journal_filename.remove;
245             }
246         }
247 
248         write(dart_B, random_tabel[0 .. 17], recorder_B);
249         // writefln("dart_A.dump");
250         // dart_A.dump;
251         // writefln("dart_B.dump");
252         // dart_B.dump;
253 
254         //
255         // Synchronize DART_B -> DART_A
256         //
257         // Collecting the journal file
258 
259         foreach (sector; dart_A.sectors) {
260             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
261             journal_filenames ~= journal_filename;
262             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
263             auto journalfile = BlockFile(journal_filename);
264             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
265             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
266             // D!(sector, "%x");
267             while (!dart_A_synchronizer.empty) {
268                 (() @trusted => dart_A_synchronizer.call)();
269             }
270         }
271         foreach (journal_filename; journal_filenames) {
272             dart_A.replay(journal_filename);
273         }
274         // writefln("dart_A.dump");
275         // dart_A.dump;
276         // writefln("dart_B.dump");
277         // dart_B.dump;
278         assert(!dart_A.fingerprint.isinit);
279         assert(dart_A.fingerprint == dart_B.fingerprint);
280 
281     }
282 
283     { // Synchronization of a DART A which is a subset of DART B
284         // writefln("Test 2");
285         DARTFile.create(filename_A, net);
286         DARTFile.create(filename_B, net);
287         RecordFactory.Recorder recorder_A;
288         RecordFactory.Recorder recorder_B;
289         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
290         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
291         //
292         string[] journal_filenames;
293         scope (success) {
294             // writefln("Exit scope");
295             dart_A.close;
296             dart_B.close;
297             filename_A.remove;
298             filename_B.remove;
299         }
300 
301         write(dart_A, random_tabel[0 .. 17], recorder_A);
302         write(dart_B, random_tabel[0 .. 27], recorder_B);
303         // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
304         // writefln("dart_A.dump");
305         // dart_A.dump;
306         // writefln("dart_B.dump");
307         // dart_B.dump;
308         assert(!dart_A.fingerprint.isinit);
309         assert(dart_A.fingerprint != dart_B.fingerprint);
310 
311         foreach (sector; dart_A.sectors) {
312             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
313             journal_filenames ~= journal_filename;
314             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
315             auto journalfile = BlockFile(journal_filename);
316             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
317             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
318             // D!(sector, "%x");
319             while (!dart_A_synchronizer.empty) {
320                 (() @trusted { dart_A_synchronizer.call; })();
321             }
322         }
323 
324         foreach (journal_filename; journal_filenames) {
325             dart_A.replay(journal_filename);
326         }
327         // writefln("dart_A.dump");
328         // dart_A.dump;
329         // writefln("dart_B.dump");
330         // dart_B.dump;
331         assert(!dart_A.fingerprint.isinit);
332         assert(dart_A.fingerprint == dart_B.fingerprint);
333 
334     }
335 
336     { // Synchronization of a DART A where DART A is a superset of DART B
337         // writefln("Test 3");
338         DARTFile.create(filename_A, net);
339         DARTFile.create(filename_B, net);
340         RecordFactory.Recorder recorder_A;
341         RecordFactory.Recorder recorder_B;
342         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
343         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
344         //
345         string[] journal_filenames;
346         scope (success) {
347             // writefln("Exit scope");
348             dart_A.close;
349             dart_B.close;
350             filename_A.remove;
351             filename_B.remove;
352         }
353 
354         write(dart_A, random_tabel[0 .. 27], recorder_A);
355         write(dart_B, random_tabel[0 .. 17], recorder_B);
356         //                write(dart_B, random_table[0..17], recorder_B);
357         // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
358         // writefln("dart_A.dump");
359         // dart_A.dump;
360         // writefln("dart_B.dump");
361         // dart_B.dump;
362         assert(!dart_A.fingerprint.isinit);
363         assert(dart_A.fingerprint != dart_B.fingerprint);
364 
365         foreach (sector; dart_A.sectors) {
366             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
367             journal_filenames ~= journal_filename;
368             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
369             auto journalfile = BlockFile(journal_filename);
370             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
371             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
372             // D!(sector, "%x");
373             while (!dart_A_synchronizer.empty) {
374                 (() @trusted { dart_A_synchronizer.call; })();
375             }
376         }
377 
378         foreach (journal_filename; journal_filenames) {
379             dart_A.replay(journal_filename);
380         }
381         // writefln("dart_A.dump");
382         // dart_A.dump;
383         // writefln("dart_B.dump");
384         // dart_B.dump;
385         assert(!dart_A.fingerprint.isinit);
386         assert(dart_A.fingerprint == dart_B.fingerprint);
387 
388     }
389 
390     { // Synchronization of a DART A where DART A is complementary of DART B
391         // writefln("Test 4");
392         DARTFile.create(filename_A, net);
393         DARTFile.create(filename_B, net);
394         RecordFactory.Recorder recorder_A;
395         RecordFactory.Recorder recorder_B;
396         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
397         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
398         //
399         string[] journal_filenames;
400         scope (success) {
401             // writefln("Exit scope");
402             dart_A.close;
403             dart_B.close;
404             filename_A.remove;
405             filename_B.remove;
406         }
407 
408         write(dart_A, random_tabel[0 .. 27], recorder_A);
409         write(dart_B, random_tabel[28 .. 54], recorder_B);
410         //                write(dart_B, random_table[0..17], recorder_B);
411         // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
412         // writefln("dart_A.dump");
413         // dart_A.dump;
414         // writefln("dart_B.dump");
415         // dart_B.dump;
416         assert(!dart_A.fingerprint.isinit);
417         assert(dart_A.fingerprint != dart_B.fingerprint);
418 
419         foreach (sector; dart_A.sectors) {
420             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
421             journal_filenames ~= journal_filename;
422             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
423             auto journalfile = BlockFile(journal_filename);
424             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
425             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
426             // D!(sector, "%x");
427             while (!dart_A_synchronizer.empty) {
428                 (() @trusted { dart_A_synchronizer.call; })();
429             }
430         }
431 
432         foreach (journal_filename; journal_filenames) {
433             // writefln("JOURNAL_FILENAME=%s", journal_filename);
434             dart_A.replay(journal_filename);
435         }
436         // writefln("dart_A.dump");
437         // dart_A.dump;
438         // writefln("dart_B.dump");
439         // dart_B.dump;
440         assert(!dart_A.fingerprint.isinit);
441         assert(dart_A.fingerprint == dart_B.fingerprint);
442     }
443 
444     { // Synchronization of a DART A where DART A of DART B has common data
445         // writefln("Test 5");
446         DARTFile.create(filename_A, net);
447         DARTFile.create(filename_B, net);
448         RecordFactory.Recorder recorder_A;
449         RecordFactory.Recorder recorder_B;
450         auto dart_A = new DART(net, filename_A, No.read_only, from, to);
451         auto dart_B = new DART(net, filename_B, No.read_only, from, to);
452         //
453         string[] journal_filenames;
454         scope (success) {
455             // writefln("Exit scope");
456             dart_A.close;
457             dart_B.close;
458             filename_A.remove;
459             filename_B.remove;
460         }
461 
462         write(dart_A, random_tabel[0 .. 54], recorder_A);
463         write(dart_B, random_tabel[28 .. 81], recorder_B);
464         //                write(dart_B, random_table[0..17], recorder_B);
465         // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
466         // writefln("dart_A.dump");
467         // dart_A.dump;
468         // writefln("dart_B.dump");
469         // dart_B.dump;
470         assert(!dart_A.fingerprint.isinit);
471         assert(dart_A.fingerprint != dart_B.fingerprint);
472 
473         foreach (sector; dart_A.sectors) {
474             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
475             journal_filenames ~= journal_filename;
476             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
477             auto journalfile = BlockFile(journal_filename);
478             auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
479             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
480             while (!dart_A_synchronizer.empty) {
481                 (() @trusted { dart_A_synchronizer.call; })();
482             }
483         }
484 
485         foreach (journal_filename; journal_filenames) {
486             dart_A.replay(journal_filename);
487         }
488         // writefln("dart_A.dump");
489         // dart_A.dump;
490         // writefln("dart_B.dump");
491         // dart_B.dump;
492         assert(!dart_A.fingerprint.isinit);
493         assert(dart_A.fingerprint == dart_B.fingerprint);
494 
495     }
496     pragma(msg, "fixme(pr) Test disabled because it takes a long time");
497     version (none) { // Synchronization of a Large DART A where DART A of DART B has common data
498         // writefln("Test 6");
499         DARTFile.create(filename_A, net);
500         DARTFile.create(filename_B, net);
501         RecordFactory.Recorder recorder_A;
502         RecordFactory.Recorder recorder_B;
503         auto dart_A = new DART(net, filename_A, from, to);
504         auto dart_B = new DART(net, filename_B, from, to);
505         //
506         string[] journal_filenames;
507         scope (success) {
508             // writefln("Exit scope");
509             dart_A.close;
510             dart_B.close;
511             filename_A.remove;
512             filename_B.remove;
513         }
514 
515         write(dart_A, random_tabel[0 .. 544], recorder_A);
516         write(dart_B, random_tabel[288 .. 811], recorder_B);
517         //                write(dart_B, random_table[0..17], recorder_B);
518         // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
519         // writefln("dart_A.dump");
520         // dart_A.dump;
521         // writefln("dart_B.dump");
522         // dart_B.dump;
523         assert(!dart_A.fingerprint.isinit);
524         assert(dart_A.fingerprint != dart_B.fingerprint);
525 
526         foreach (sector; dart_A.sectors) {
527             immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
528             journal_filenames ~= journal_filename;
529             BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
530             auto synch = new TestSynchronizer(journal_filename, dart_A, dart_B);
531             auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
532             while (!dart_A_synchronizer.empty) {
533                 (() @trusted { dart_A_synchronizer.call; })();
534             }
535         }
536 
537         foreach (journal_filename; journal_filenames) {
538             dart_A.replay(journal_filename);
539         }
540         // writefln("dart_A.dump");
541         // dart_A.dump;
542         // writefln("dart_B.dump");
543         //dart_B.dump;
544         assert(!dart_A.fingerprint.isinit);
545         assert(dart_A.fingerprint == dart_B.fingerprint);
546     }
547 
548 }

Meta