1 // DART database build on DARTFile including CRUD commands and synchronization
2 module tagion.dart.DART;
3 @safe:
4 import core.exception : RangeError;
5 import core.thread : Fiber;
6 import std.conv : ConvException;
7 import std.range : empty;
8 import std.stdio;
9 
10 //import std.stdio;
11 import std.algorithm.iteration : filter, map;
12 import std.format : format;
13 import std.range;
14 import std.traits : EnumMembers;
15 import tagion.basic.Debug : __format, debugwrite = __write;
16 import tagion.basic.Types : Buffer;
17 import tagion.basic.basic : FUNCTION_NAME;
18 import tagion.basic.basic : EnumText, isinit;
19 import tagion.basic.tagionexceptions : Check;
20 import tagion.communication.HiRPC : Callers, HiRPC, HiRPCMethod;
21 import tagion.crypto.SecureInterfaceNet : HashNet, SecureNet;
22 import tagion.dart.BlockFile : BlockFile;
23 import tagion.dart.BlockFile : Index;
24 import tagion.dart.DARTBasic : DARTIndex;
25 import tagion.dart.DARTFile;
26 import tagion.dart.DARTRim;
27 import CRUD = tagion.dart.DARTcrud;
28 import tagion.dart.Recorder : Archive, RecordFactory;
29 import tagion.dart.synchronizer : JournalSynchronizer, Synchronizer;
30 import tagion.hibon.Document : Document;
31 import tagion.hibon.HiBON : HiBON;
32 import tagion.hibon.HiBONJSON;
33 import tagion.hibon.HiBONRecord : GetLabel, HiBONRecord, label, recordType;
34 
/**
 * Computes the end position of an arc on the sector circle so that the
 * result is always strictly greater than the start position.
 *
 * When to_sector lies at or before from_sector the arc wraps around the
 * circle, so one full turn (SECTOR_MAX_SIZE) is added to to_sector.
 * Params:
 *   from_sector = sector where the arc starts
 *   to_sector = sector where the arc ends
 * Returns:
 *   to_sector, unwrapped by SECTOR_MAX_SIZE when from_sector >= to_sector
 */
uint calc_to_value(const ushort from_sector, const ushort to_sector) pure nothrow @nogc {
    if (from_sector >= to_sector) {
        // Equal sectors or a wrap-around: extend by one full turn
        return to_sector + SECTOR_MAX_SIZE;
    }
    return to_sector;
}
46 
unittest { // check calc_to_value
    enum ushort sector = 0x46A6;
    enum ushort next_sector = 0x46A7;
    // One sector forward: no wrap, the to-value is returned unchanged
    assert(calc_to_value(sector, next_sector) == next_sector);
    // Same from and to: a full round is added
    assert(calc_to_value(sector, sector) == sector + SECTOR_MAX_SIZE);
    // To-sector behind the from-sector: wraps around the circle
    assert(calc_to_value(next_sector, sector) == sector + SECTOR_MAX_SIZE);
}
56 
/**
 * Computes the number of sectors covered by the arc running from
 * from_sector to to_sector.
 *
 * The end of the arc is unwrapped with calc_to_value, so an arc whose
 * start and end are equal covers the whole circle.
 * Params:
 *   from_sector = sector where the arc starts
 *   to_sector = sector where the arc ends
 * Returns:
 *   the arc length in sectors
 */
uint calc_sector_size(const ushort from_sector, const ushort to_sector) pure nothrow @nogc {
    return calc_to_value(from_sector, to_sector) - from_sector;
}
70 
unittest { // check calc_sector_size
    enum ushort sector = 0x543A;
    enum ushort next_sector = 0x543B;
    // Same from and to covers the full circle
    assert(calc_sector_size(sector, sector) == SECTOR_MAX_SIZE);
    // Neighbouring sectors are exactly one sector apart
    assert(calc_sector_size(sector, next_sector) == 1);
    // Arc which does not pass the zero sector
    assert(calc_sector_size(0x1000, 0xF000) == 0xE000);
    // Arc which wraps around the zero sector
    assert(calc_sector_size(0xF000, 0x1000) == 0x2000);
}
81 /** 
82  * DART support for HiRPC(dartRead,dartRim,dartBullseye and dartModify)
83  * DART include support for synchronization
84  * Examples: [tagion.testbench.dart]
85  */
86 class DART : DARTFile {
87     immutable ushort from_sector;
88     immutable ushort to_sector;
89     const HiRPC hirpc;
90 
91     /** Creates DART with given net and by given file path
92     * Params: 
93     *   net = Represent SecureNet for initializing DART
94     *   filename = Represent path to DART file to open
95     *   from_sector = Represents from angle for DART sharding. In development.
96     *   to_sector = Represents to angle for DART sharding. In development.
97     */
98     this(const SecureNet net,
99             string filename,
100             Flag!"read_only" read_only = No.read_only,
101             const ushort from_sector = 0,
102             const ushort to_sector = 0) @safe {
103         super(net, filename, read_only);
104         this.from_sector = from_sector;
105         this.to_sector = to_sector;
106         this.hirpc = HiRPC(net);
107     }
108 
109     /** 
110     * Creates DART with given net and by given file path safely with catching possible exceptions
111     * Params:
112     *       net  = Represent SecureNet for initializing DART
113     *       filename = Represent path to DART file to open
114     *       exception = Field used for returning exception in case when something gone wrong
115     *       from_sector = Represents from angle for DART sharding. In development.
116     *       to_sector = Represents to angle for DART sharding. In development.
117     */
118     this(const SecureNet net,
119             string filename,
120             out Exception exception,
121             Flag!"read_only" read_only = No.read_only,
122             const ushort from_sector = 0,
123             const ushort to_sector = 0) @safe {
124         try {
125             this(net, filename, read_only, from_sector, to_sector);
126         }
127         catch (Exception e) {
128             exception = e;
129         }
130     }
131 
132     /** 
133      * Check if the sector is within the DART angle
134      * Params:
135      *   sector = the sector in the DART
136      * Returns: true of the sector is within the DART range
137      */
138     bool inRange(const ushort sector) pure nothrow {
139         return SectorRange.sectorInRange(sector, from_sector, to_sector);
140     }
141 
142     /** 
143      * Creates a SectorRange for the DART
144      * Returns: range of sectors
145      */
146     SectorRange sectors() pure nothrow {
147         return SectorRange(from_sector, to_sector);
148     }
149 
150     mixin(EnumText!(q{Queries}, Callers!DART));
151     /**
152      * The dartBullseye method is called from opCall function
153      * This function return current database bullseye.
154      * Params:
155 received = the HiRPC received package
156      * @param read_only - !Because this function is a read only the read_only parameter has no effect 
157      * @return HiRPC result that contains current database bullseye
158      */
159     @HiRPCMethod private const(HiRPC.Sender) dartBullseye(
160             ref const(HiRPC.Receiver) received,
161             const bool read_only)
162     in {
163         mixin FUNCTION_NAME;
164         assert(received.method.name == __FUNCTION_NAME__);
165     }
166     do {
167         auto hibon_params = new HiBON;
168         hibon_params[Params.bullseye] = bullseye;
169         return hirpc.result(received, hibon_params);
170     }
171     /**
172      * The dartRead method is called from opCall function
173      * This function reads list of archive specified in the list of fingerprints.
174 
175      * The result is returned as a Recorder object
176      * read from the DART
177 
178      * Note:
179      * Because this function is a read only the read_only parameter has no effect
180 
181      * params: received is the HiRPC package
182      * Example:
183      * ---
184      * // HiRPC metode
185      * {
186      *  ....
187      *    message : {
188      *        method : "dartRead"
189      *            params : {
190      *                fingerprints : [
191      *                     <GENERIC>,
192      *                     <GENERIC>,
193      *                     .....
194      *                          ]
195      *                     }
196      *                 ...
197      *                }
198      *             }
199 
200      * // HiRPC Result
201      *   {
202      *   ....
203      *       message : {
204      *           result : {
205      *               recoder : <DOCUMENT> // Recorder
206      *                   limit   : <UINT32> // Optional
207      *       // This parameter is set if dart_indices list exceeds the limit
208      *                    }
209      *               }
210      *   }
211      * ---
212      */
213     @HiRPCMethod private const(HiRPC.Sender) dartRead(
214             ref const(HiRPC.Receiver) received,
215             const bool read_only)
216     in {
217         mixin FUNCTION_NAME;
218         assert(received.method.name == __FUNCTION_NAME__);
219     }
220     do {
221         const doc_dart_indices = received.method.params[Params.dart_indices].get!(Document);
222         auto dart_indices = doc_dart_indices.range!(DARTIndex[]);
223         const recorder = loads(dart_indices, Archive.Type.ADD);
224         return hirpc.result(received, recorder.toDoc);
225     }
226 
227     @HiRPCMethod private const(HiRPC.Sender) dartCheckRead(
228             ref const(HiRPC.Receiver) received,
229             const bool read_only)
230     in {
231         mixin FUNCTION_NAME;
232         assert(received.method.name == __FUNCTION_NAME__);
233     }
234     do {
235         auto doc_dart_indices = received.method.params[Params.dart_indices].get!(Document);
236         auto dart_indices = doc_dart_indices.range!(DARTIndex[]);
237         auto not_in_dart = checkload(dart_indices);
238 
239         auto params = new HiBON;
240         auto params_dart_indices = new HiBON;
241         params_dart_indices = not_in_dart.map!(f => cast(Buffer) f);
242         params[Params.dart_indices] = params_dart_indices;
243         return hirpc.result(received, params);
244     }
245 
246     /**
247      *  The dartRim method is called from opCall function
248      *
249      *  This method reads the Branches object at the specified rim
250      *
251      *  Note:
252      *  Because this function is a read only the read_only parameter has no effect
253      *
254      *  Params:
255      *      received is the HiRPC package
256      *  Example:
257      *  ---
258      *  // HiRPC format
259      *
260      *  {
261      *      ....
262      *      message : {
263      *        method : "dartRim",
264      *            params : {
265      *                rims : <GENERIC>
266      *            }
267      *        }
268      *  }
269      *
270      *  // HiRPC Result
271      *  {
272      *    ....
273      *    message : {
274      *        result : {
275      *            branches : <DOCUMENT> // Branches
276      *            limit    : <UINT32> // Optional
277      *                // This parameter is set if fingerprints list exceeds the limit
278      *            }
279      *        }
280      *  }
281      *
282      * ----
283      */
284     @HiRPCMethod private const(HiRPC.Sender) dartRim(
285             ref const(HiRPC.Receiver) received,
286             const bool read_only)
287     in {
288         mixin FUNCTION_NAME;
289         assert(received.method.name == __FUNCTION_NAME__);
290     }
291     do {
292         immutable params = received.params!Rims;
293 
294         const rim_branches = branches(params.path);
295         HiBON hibon_params;
296         
297         if (!params.key_leaves.empty) {
298             if (!rim_branches.empty) {
299                 auto super_recorder = recorder;
300                 foreach (key; params.key_leaves) {
301                     const index=rim_branches.indices[key];  
302                     if (index.isinit) {
303                         HiRPC.Error not_found;
304                         not_found.message=format("No archive found on %(%02x %):%02x", params.path, key); 
305                         super_recorder.add(not_found);
306                         continue;
307                     }
308                     immutable data = blockfile.load(index);
309                     const doc = Document(data);
310                     super_recorder.add(doc);
311 
312                 }
313                 return hirpc.result(received, super_recorder);
314             }
315         }
316         else if (!rim_branches.empty) {
317             hibon_params = rim_branches.toHiBON(true);
318         }
319         else if (params.path.length > ushort.sizeof || !params.key_leaves.empty) {
320             hibon_params = new HiBON;
321             // It not branches so maybe it is an archive
322             immutable key = params.path[$ - 1];
323             const super_branches = branches(params.path[0 .. $ - 1]);
324             if (!super_branches.empty) {
325                 const index = super_branches.indices[key];
326                 if (index != Index.init) {
327                     // The archive is added to a recorder
328                     immutable data = blockfile.load(index);
329                     const doc = Document(data);
330                     auto super_recorder = recorder;
331                     super_recorder.add(doc);
332                     return hirpc.result(received, super_recorder);
333                 }
334             }
335 
336         }
337         return hirpc.result(received, hibon_params);
338     }
339 
340     /**
341      *  The dartModify method is called from opCall function
342      *
343      *  This function execute and modify function according to the recorder parameter
344      *
345      *  Note:
346      *  This function will fail if read only the read_only is true
347      *
348      *
349      *   Example:
350      *  ---
351      *     // HiRPC format
352      *   {
353      *       ....
354      *       message : {
355      *           method : "dartModify"
356      *           params : {
357      *               recorder : <DOCUNENT> // Recorder object
358      *           }
359      *       }
360      *   }
361      *
362      *  // HiRPC Result
363      *  {
364      *       ....
365      *       message : {
366      *           result   : {
367      *           bullseye : <GENERIC> // Returns the update bullseye of the DART
368      *           }
369      *       }
370      *  }
371      *
372      * ---
373      * Params: received is the HiRPC package
374      * Returns: HiBON Sender 
375      */
376 
377     @HiRPCMethod private const(HiRPC.Sender) dartModify(
378             ref const(HiRPC.Receiver) received,
379             const bool read_only)
380     in {
381         mixin FUNCTION_NAME;
382         assert(received.method.name == __FUNCTION_NAME__);
383     }
384     do {
385         HiRPC.check(!read_only, "The DART is read only");
386         const recorder = manufactor.recorder(received.method.params);
387         immutable bullseye = modify(recorder);
388         auto hibon_params = new HiBON;
389         hibon_params[Params.bullseye] = bullseye;
390         return hirpc.result(received, hibon_params);
391     }
392 
393     /**
394      * This function handels HPRC Queries to the DART
395      * Params:
396      *     received = Request HiRPC object
397      * If read_only is true deleting and erasing data in the DART will return an error
398      * Note.
399      * When the DART is accessed from an external HiRPC this flag should be kept false.
400      *
401      * Returns:
402      *     The response from HPRC if the method is supported
403      *     else the response return is marked empty
404      */
405     const(HiRPC.Sender) opCall(
406             ref const(HiRPC.Receiver) received,
407             const bool read_only = true) {
408         import std.conv : to;
409 
410         const method = received.method;
411         switch (method.name) {
412             static foreach (call; Callers!DART) {
413         case call:
414                 enum code = format(q{return %s(received, read_only);}, call);
415                 mixin(code);
416             }
417         default:
418             // Empty
419         }
420         immutable message = format("Method '%s' not supported", method.name);
421         return hirpc.error(received, message, 22);
422     }
423 
424     /** 
425  * Recorder journal
426  */
427     @recordType("Journal") struct Journal {
428         Index index;
429         RecordFactory.Recorder recorder;
430         enum indexName = GetLabel!(index).name;
431         enum recorderName = GetLabel!(recorder).name;
432         /**
433          * Creator of the Journal recorder
434          * Params:
435          *   manufactor = Recorder factory
436          *   doc = Journal document
437          */
438         this(RecordFactory manufactor, const Document doc) {
439             import tagion.logger.Logger;
440 
441             
442 
443             .check(isRecord(doc), format("Document is not a %s", ThisType.stringof));
444             index = doc[indexName].get!Index;
445             const recorder_doc = doc[recorderName].get!Document;
446             recorder = manufactor.recorder(recorder_doc);
447         }
448         /** 
449          * Ditto
450          * Params:
451          *   recorder = DART recorder
452          *   index = index number
453          */
454         this(const RecordFactory.Recorder recorder, const Index index) const pure nothrow @nogc {
455             this.recorder = recorder;
456             this.index = index;
457         }
458 
459         mixin HiBONRecord!"{}";
460     }
461 
462     /**
463      * Creates a synchronization fiber from a synchroizer 
464      * Params:
465      *   synchonizer = synchronizer to be used
466      *   rims = selected rim path
467      * Returns: 
468      *  synchronization fiber
469      */
470     SynchronizationFiber synchronizer(Synchronizer synchonizer, const Rims rims) {
471         return new SynchronizationFiber(rims, synchonizer);
472     }
473 
474     /**
475      * Synchronizer which supports synchronization from multiplet DART's 
476      */
477     @safe
478     class SynchronizationFiber : Fiber {
479         protected Synchronizer sync;
480 
481         immutable(Rims) root_rims;
482 
483         this(const Rims root_rims, Synchronizer sync) @trusted {
484             this.root_rims = root_rims;
485             this.sync = sync;
486             sync.set(this.outer, this, this.outer.hirpc);
487             super(&run);
488         }
489 
490         protected uint _id;
491         /* 
492          * Id for the HiRPC
493          * Returns: HiRPC id
494         */
495         @property uint id() {
496             if (_id == 0) {
497                 _id = hirpc.generateId();
498             }
499             return _id;
500         }
501 
502         /**
503          * Function to hanle syncronization stage for the DART
504          */
505         final void run()
506         in {
507             assert(sync);
508             assert(blockfile);
509         }
510         do {
511             void iterate(const Rims params) @safe {
512                 //
513                 // Request Branches or Recorder at rims from the foreign DART.
514                 //
515                 const local_branches = branches(params.path);
516                 const request_branches = CRUD.dartRim(rims : params, hirpc:
517                         hirpc, id:
518                         id);
519                 const result_branches = sync.query(request_branches);
520                 if (Branches.isRecord(result_branches.response.result)) {
521                     const foreign_branches = result_branches.result!Branches;
522                     //
523                     // Read all the archives from the foreign DART
524                     //
525                     const request_archives = CRUD.dartRead(
526                             foreign_branches
527                             .dart_indices, hirpc, id);
528                     const result_archives = sync.query(request_archives);
529                     auto foreign_recoder = manufactor.recorder(result_archives.response.result);
530                     //
531                     // The rest of the fingerprints which are not in the foreign_branches must be sub-branches
532                     // 
533 
534                     auto local_recorder = recorder;
535                     scope (success) {
536                         sync.record(local_recorder);
537                     }
538                     foreach (const ubyte key; 0 .. KEY_SPAN) {
539                         const sub_rims = Rims(params.path ~ key);
540                         const local_print = local_branches.dart_index(key);
541                         const foreign_print = foreign_branches.dart_index(key);
542                         auto foreign_archive = foreign_recoder.find(foreign_print);
543                         if (foreign_archive) {
544                             if (local_print != foreign_print) {
545                                 local_recorder.insert(foreign_archive);
546                                 sync.removeRecursive(sub_rims);
547                             }
548                         }
549                         else if (!foreign_print.isinit) {
550                             // Foreign is poits to branches
551                             if (!local_print.empty) {
552                                 const possible_branches_data = load(local_branches, key);
553                                 if (!Branches.isRecord(Document(possible_branches_data))) {
554                                     // If branch is an archive then it is removed because if it exists in foreign DART
555                                     // this archive will be added later
556                                     local_recorder.remove(local_print);
557                                 }
558                             }
559                             iterate(sub_rims);
560                         }
561                         else if (!local_print.empty) {
562                             sync.removeRecursive(sub_rims);
563                         }
564                     }
565                 }
566                 else {
567                     if (result_branches.isRecord!(RecordFactory.Recorder)) {
568                         auto foreign_recoder = manufactor.recorder(result_branches.response.result);
569                         sync.record(foreign_recoder);
570                     }
571                     //
572                     // The foreign DART does not contain data at the rims
573                     //
574                     sync.removeRecursive(params);
575                 }
576             }
577 
578             iterate(root_rims);
579             sync.finish;
580         }
581         /**
582          * Checks if the synchronized  has reached the end 
583          * Returns: true if empty
584          */
585         final bool empty() const pure nothrow {
586             return sync.empty;
587         }
588     }
589 
590     /**
591      * Replays the journal file to update the DART
592      * The update blockfile can be generated from the synchroning process from an foreign dart
593      *
594      * If the process is broken for some reason this the resumed by running the replay function again
595      * on the same block file
596      *
597      * Params:
598      *     journal_filename = Name of the BlockFile to be replaied
599      *
600      * Throws:
601      *     The function will throw an exception if something went wrong in the process.
602      */
603     void replay(const(string) journal_filename) {
604         auto journalfile = BlockFile(journal_filename, Yes.read_only);
605         scope (exit) {
606             journalfile.close;
607         }
608         // Adding and Removing archives
609 
610         for (Index index = journalfile.masterBlock.root_index; index != Index.init;) {
611             immutable data = journalfile.load(index);
612             const doc = Document(data);
613 
614             auto journal_replay = Journal(manufactor, doc);
615             index = journal_replay.index;
616             auto action_recorder = recorder;
617             action_recorder.insert(journal_replay.recorder.archives[]);
618             modify(action_recorder);
619         }
620 
621     }
622 
623     version (unittest) {
624         static class TestSynchronizer : JournalSynchronizer {
625             protected DART foreign_dart;
626             protected DART owner;
627             this(BlockFile journalfile, DART owner, DART foreign_dart) {
628                 this.foreign_dart = foreign_dart;
629                 this.owner = owner;
630                 super(journalfile);
631             }
632 
633             //
634             // This function emulates the connection between two DART's
635             // in a single thread
636             //
637             const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request) {
638                 Document send_request_to_foreign_dart(const Document foreign_doc) {
639                     //
640                     // Remote excution
641                     // Receive on the foreign end
642                     const foreign_receiver = foreign_dart.hirpc.receive(foreign_doc);
643                     // Make query in to the foreign DART
644                     const foreign_response = foreign_dart(foreign_receiver);
645 
646                     return foreign_response.toDoc;
647                 }
648 
649                 immutable foreign_doc = request.toDoc;
650                 (() @trusted { fiber.yield; })();
651                 // Here a yield loop should be implement to poll for response from the foriegn DART
652                 // A timeout should also be implemented in this poll loop
653                 const response_doc = send_request_to_foreign_dart(foreign_doc);
654                 //
655                 // Process the response returned for the foreign DART
656                 //
657                 const received = owner.hirpc.receive(response_doc);
658                 return received;
659             }
660         }
661 
662     }
663 
664     ///Examples: how use the DART
665     unittest {
666         import tagion.basic.basic : assumeTrusted, tempfile;
667         import tagion.dart.BlockFile;
668         import tagion.dart.DARTFakeNet : DARTFakeNet;
669         import tagion.dart.Recorder;
670         import tagion.utils.Random;
671 
672         enum TEST_BLOCK_SIZE = 0x80;
673 
674         auto net = new DARTFakeNet("very_secret");
675 
676         immutable filename = fileId!DART.fullpath;
677         immutable filename_A = fileId!DART("A_").fullpath;
678         immutable filename_B = fileId!DART("B_").fullpath;
679         immutable filename_C = fileId!DART("C_").fullpath;
680 
681         { // Remote Synchronization test
682 
683             import std.file : remove;
684 
685             auto rand = Random!ulong(1234_5678_9012_345UL);
686             enum N = 1000;
687             auto random_tabel = new ulong[N];
688             foreach (ref r; random_tabel) {
689                 immutable sector = rand.value(0x0000_0000_0000_ABBAUL, 0x0000_0000_0000_ABBDUL) << (
690                         8 * 6);
691                 r = rand.value(0x0000_1234_5678_0000UL | sector, 0x0000_1334_FFFF_0000UL | sector);
692             }
693 
694             //
695             // The the following unittest dart A and B covers the same range angle
696             //
697             enum from = 0xABB9;
698             enum to = 0xABBD;
699 
700             //            import std.stdio;
701             { // Single element same sector sectors
702                 const ulong[] same_sector_tabel = [
703                     0xABB9_13ab_cdef_1234,
704                     0xABB9_14ab_cdef_1234,
705                     0xABB9_15ab_cdef_1234
706 
707                 ];
708                 // writefln("Test 0.0");
709                 foreach (test_no; 0 .. 3) {
710                     DARTFile.create(filename_A, net);
711                     DARTFile.create(filename_B, net);
712                     RecordFactory.Recorder recorder_B;
713                     RecordFactory.Recorder recorder_A;
714                     // Recorder recorder_B;
715                     auto dart_A = new DART(net, filename_A, No.read_only, from, to);
716                     auto dart_B = new DART(net, filename_B, No.read_only, from, to);
717                     string[] journal_filenames;
718                     scope (success) {
719                         // writefln("Exit scope");
720                         dart_A.close;
721                         dart_B.close;
722                         filename_A.remove;
723                         filename_B.remove;
724                         foreach (journal_filename; journal_filenames) {
725                             journal_filename.remove;
726                         }
727                     }
728 
729                     switch (test_no) {
730                     case 0:
731                         write(dart_A, same_sector_tabel[0 .. 1], recorder_A);
732                         write(dart_B, same_sector_tabel[0 .. 0], recorder_B);
733                         break;
734                     case 1:
735                         write(dart_A, same_sector_tabel[0 .. 1], recorder_A);
736                         write(dart_B, same_sector_tabel[1 .. 2], recorder_B);
737                         break;
738                     case 2:
739                         write(dart_A, same_sector_tabel[0 .. 2], recorder_A);
740                         write(dart_B, same_sector_tabel[1 .. 3], recorder_B);
741                         break;
742                     default:
743                         assert(0);
744                     }
745                     //writefln("\n------ %d ------", test_no);
746                     //writefln("dart_A.dump");
747                     //dart_A.dump;
748                     //writefln("dart_B.dump");
749                     //dart_B.dump;
750                     //writefln("dart_A.fingerprint=%s", dart_A.fingerprint.cutHex);
751                     //writefln("dart_B.fingerprint=%s", dart_B.fingerprint.cutHex);
752 
753                     foreach (sector; dart_A.sectors) {
754                         immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
755                         journal_filenames ~= journal_filename;
756                         BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
757                         auto journalfile = BlockFile(journal_filename);
758                         auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
759                         auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
760                         // D!(sector, "%x");
761                         while (!dart_A_synchronizer.empty) {
762                             (() @trusted => dart_A_synchronizer.call)();
763                         }
764                     }
765                     foreach (journal_filename; journal_filenames) {
766                         dart_A.replay(journal_filename);
767                     }
768                     //writefln("dart_A.dump");
769                     //dart_A.dump;
770                     //writefln("dart_B.dump");
771                     //dart_B.dump;
772                     //writefln("dart_A.fingerprint=%s", dart_A.fingerprint.cutHex);
773                     //writefln("dart_B.fingerprint=%s", dart_B.fingerprint.cutHex);
774 
775                     assert(dart_A.fingerprint == dart_B.fingerprint);
776                     if (test_no == 0) {
777                         assert(dart_A.fingerprint.isinit);
778                     }
779                     else {
780                         assert(!dart_A.fingerprint.isinit);
781                     }
782                 }
783             }
784 
785             { // Single element different sectors
786                 //
787                 // writefln("Test 0.1");
788                 DARTFile.create(filename_A, net);
789                 DARTFile.create(filename_B, net);
790                 RecordFactory.Recorder recorder_B;
791                 RecordFactory.Recorder recorder_A;
792                 // Recorder recorder_B;
793                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
794                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
795                 string[] journal_filenames;
796                 scope (success) {
797                     // writefln("Exit scope");
798                     dart_A.close;
799                     dart_B.close;
800                     filename_A.remove;
801                     filename_B.remove;
802                     foreach (journal_filename; journal_filenames) {
803                         journal_filename.remove;
804                     }
805                 }
806 
807                 write(dart_B, random_tabel[0 .. 1], recorder_B);
808                 write(dart_A, random_tabel[1 .. 2], recorder_A);
809                 // writefln("dart_A.dump");
810                 // dart_A.dump;
811                 // writefln("dart_B.dump");
812                 // dart_B.dump;
813 
814                 foreach (sector; dart_A.sectors) {
815                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
816                     journal_filenames ~= journal_filename;
817                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
818                     auto journalfile = BlockFile(journal_filename);
819                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
820                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
821                     // D!(sector, "%x");
822                     while (!dart_A_synchronizer.empty) {
823                         (() @trusted => dart_A_synchronizer.call)();
824                     }
825                 }
826                 foreach (journal_filename; journal_filenames) {
827                     dart_A.replay(journal_filename);
828                 }
829                 // writefln("dart_A.dump");
830                 // dart_A.dump;
831                 // writefln("dart_B.dump");
832                 // dart_B.dump;
833                 assert(!dart_A.fingerprint.isinit);
834                 assert(dart_A.fingerprint == dart_B.fingerprint);
835             }
836             { // Synchronization of an empty DART 
837                 // from DART A against DART B with ONE archive when DART A is empty
838                 DARTFile.create(filename_A, net);
839                 DARTFile.create(filename_B, net);
840                 RecordFactory.Recorder recorder_B;
841                 // Recorder recorder_B;
842                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
843                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
844                 //
845                 string[] journal_filenames;
846                 scope (success) {
847                     // writefln("Exit scope");
848                     dart_A.close;
849                     dart_B.close;
850                     filename_A.remove;
851                     filename_B.remove;
852                     foreach (journal_filename; journal_filenames) {
853                         journal_filename.remove;
854                     }
855                 }
856 
857                 const ulong[] single_archive = [0xABB9_13ab_11ef_0923];
858 
859                 write(dart_B, single_archive, recorder_B);
860                 // dart_B.dump;
861 
862                 //
863                 // Synchronize DART_B -> DART_A
864                 //
865                 // Collecting the journal file
866 
867                 foreach (sector; dart_A.sectors) {
868                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
869                     journal_filenames ~= journal_filename;
870                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
871                     auto journalfile = BlockFile(journal_filename);
872                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
873                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
874                     // D!(sector, "%x");
875                     while (!dart_A_synchronizer.empty) {
876                         (() @trusted => dart_A_synchronizer.call)();
877                     }
878                 }
879                 foreach (journal_filename; journal_filenames) {
880                     dart_A.replay(journal_filename);
881                 }
882                 // writefln("dart_A.dump");
883                 // dart_A.dump;
884                 // writefln("dart_B.dump");
885                 // dart_B.dump;
886                 assert(!dart_A.fingerprint.isinit);
887                 assert(dart_A.fingerprint == dart_B.fingerprint);
888 
889             }
890             { // Synchronization of an empty DART
891                 // from DART A against DART B when DART A is empty
892                 // writefln("Test 1");
893 
894                 DARTFile.create(filename_A, net);
895                 DARTFile.create(filename_B, net);
896                 RecordFactory.Recorder recorder_B;
897                 // Recorder recorder_B;
898                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
899                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
900                 //
901                 string[] journal_filenames;
902                 scope (success) {
903                     // writefln("Exit scope");
904                     dart_A.close;
905                     dart_B.close;
906                     filename_A.remove;
907                     filename_B.remove;
908                     foreach (journal_filename; journal_filenames) {
909                         journal_filename.remove;
910                     }
911                 }
912 
913                 write(dart_B, random_tabel[0 .. 17], recorder_B);
914                 // writefln("dart_A.dump");
915                 // dart_A.dump;
916                 // writefln("dart_B.dump");
917                 // dart_B.dump;
918 
919                 //
920                 // Synchronize DART_B -> DART_A
921                 //
922                 // Collecting the journal file
923 
924                 foreach (sector; dart_A.sectors) {
925                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
926                     journal_filenames ~= journal_filename;
927                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
928                     auto journalfile = BlockFile(journal_filename);
929                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
930                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
931                     // D!(sector, "%x");
932                     while (!dart_A_synchronizer.empty) {
933                         (() @trusted => dart_A_synchronizer.call)();
934                     }
935                 }
936                 foreach (journal_filename; journal_filenames) {
937                     dart_A.replay(journal_filename);
938                 }
939                 // writefln("dart_A.dump");
940                 // dart_A.dump;
941                 // writefln("dart_B.dump");
942                 // dart_B.dump;
943                 assert(!dart_A.fingerprint.isinit);
944                 assert(dart_A.fingerprint == dart_B.fingerprint);
945 
946             }
947 
948             { // Synchronization of a DART A which is a subset of DART B
949                 // writefln("Test 2");
950                 DARTFile.create(filename_A, net);
951                 DARTFile.create(filename_B, net);
952                 RecordFactory.Recorder recorder_A;
953                 RecordFactory.Recorder recorder_B;
954                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
955                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
956                 //
957                 string[] journal_filenames;
958                 scope (success) {
959                     // writefln("Exit scope");
960                     dart_A.close;
961                     dart_B.close;
962                     filename_A.remove;
963                     filename_B.remove;
964                 }
965 
966                 write(dart_A, random_tabel[0 .. 17], recorder_A);
967                 write(dart_B, random_tabel[0 .. 27], recorder_B);
968                 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
969                 // writefln("dart_A.dump");
970                 // dart_A.dump;
971                 // writefln("dart_B.dump");
972                 // dart_B.dump;
973                 assert(!dart_A.fingerprint.isinit);
974                 assert(dart_A.fingerprint != dart_B.fingerprint);
975 
976                 foreach (sector; dart_A.sectors) {
977                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
978                     journal_filenames ~= journal_filename;
979                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
980                     auto journalfile = BlockFile(journal_filename);
981                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
982                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
983                     // D!(sector, "%x");
984                     while (!dart_A_synchronizer.empty) {
985                         (() @trusted { dart_A_synchronizer.call; })();
986                     }
987                 }
988 
989                 foreach (journal_filename; journal_filenames) {
990                     dart_A.replay(journal_filename);
991                 }
992                 // writefln("dart_A.dump");
993                 // dart_A.dump;
994                 // writefln("dart_B.dump");
995                 // dart_B.dump;
996                 assert(!dart_A.fingerprint.isinit);
997                 assert(dart_A.fingerprint == dart_B.fingerprint);
998 
999             }
1000 
1001             { // Synchronization of a DART A where DART A is a superset of DART B
1002                 // writefln("Test 3");
1003                 DARTFile.create(filename_A, net);
1004                 DARTFile.create(filename_B, net);
1005                 RecordFactory.Recorder recorder_A;
1006                 RecordFactory.Recorder recorder_B;
1007                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
1008                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
1009                 //
1010                 string[] journal_filenames;
1011                 scope (success) {
1012                     // writefln("Exit scope");
1013                     dart_A.close;
1014                     dart_B.close;
1015                     filename_A.remove;
1016                     filename_B.remove;
1017                 }
1018 
1019                 write(dart_A, random_tabel[0 .. 27], recorder_A);
1020                 write(dart_B, random_tabel[0 .. 17], recorder_B);
1021                 //                write(dart_B, random_table[0..17], recorder_B);
1022                 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
1023                 // writefln("dart_A.dump");
1024                 // dart_A.dump;
1025                 // writefln("dart_B.dump");
1026                 // dart_B.dump;
1027                 assert(!dart_A.fingerprint.isinit);
1028                 assert(dart_A.fingerprint != dart_B.fingerprint);
1029 
1030                 foreach (sector; dart_A.sectors) {
1031                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
1032                     journal_filenames ~= journal_filename;
1033                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
1034                     auto journalfile = BlockFile(journal_filename);
1035                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
1036                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
1037                     // D!(sector, "%x");
1038                     while (!dart_A_synchronizer.empty) {
1039                         (() @trusted { dart_A_synchronizer.call; })();
1040                     }
1041                 }
1042 
1043                 foreach (journal_filename; journal_filenames) {
1044                     dart_A.replay(journal_filename);
1045                 }
1046                 // writefln("dart_A.dump");
1047                 // dart_A.dump;
1048                 // writefln("dart_B.dump");
1049                 // dart_B.dump;
1050                 assert(!dart_A.fingerprint.isinit);
1051                 assert(dart_A.fingerprint == dart_B.fingerprint);
1052 
1053             }
1054 
1055             { // Synchronization of a DART A where DART A is complementary of DART B
1056                 // writefln("Test 4");
1057                 DARTFile.create(filename_A, net);
1058                 DARTFile.create(filename_B, net);
1059                 RecordFactory.Recorder recorder_A;
1060                 RecordFactory.Recorder recorder_B;
1061                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
1062                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
1063                 //
1064                 string[] journal_filenames;
1065                 scope (success) {
1066                     // writefln("Exit scope");
1067                     dart_A.close;
1068                     dart_B.close;
1069                     filename_A.remove;
1070                     filename_B.remove;
1071                 }
1072 
1073                 write(dart_A, random_tabel[0 .. 27], recorder_A);
1074                 write(dart_B, random_tabel[28 .. 54], recorder_B);
1075                 //                write(dart_B, random_table[0..17], recorder_B);
1076                 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
1077                 // writefln("dart_A.dump");
1078                 // dart_A.dump;
1079                 // writefln("dart_B.dump");
1080                 // dart_B.dump;
1081                 assert(!dart_A.fingerprint.isinit);
1082                 assert(dart_A.fingerprint != dart_B.fingerprint);
1083 
1084                 foreach (sector; dart_A.sectors) {
1085                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
1086                     journal_filenames ~= journal_filename;
1087                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
1088                     auto journalfile = BlockFile(journal_filename);
1089                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
1090                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
1091                     // D!(sector, "%x");
1092                     while (!dart_A_synchronizer.empty) {
1093                         (() @trusted { dart_A_synchronizer.call; })();
1094                     }
1095                 }
1096 
1097                 foreach (journal_filename; journal_filenames) {
1098                     // writefln("JOURNAL_FILENAME=%s", journal_filename);
1099                     dart_A.replay(journal_filename);
1100                 }
1101                 // writefln("dart_A.dump");
1102                 // dart_A.dump;
1103                 // writefln("dart_B.dump");
1104                 // dart_B.dump;
1105                 assert(!dart_A.fingerprint.isinit);
1106                 assert(dart_A.fingerprint == dart_B.fingerprint);
1107             }
1108 
1109             { // Synchronization of a DART A where DART A of DART B has common data
1110                 // writefln("Test 5");
1111                 DARTFile.create(filename_A, net);
1112                 DARTFile.create(filename_B, net);
1113                 RecordFactory.Recorder recorder_A;
1114                 RecordFactory.Recorder recorder_B;
1115                 auto dart_A = new DART(net, filename_A, No.read_only, from, to);
1116                 auto dart_B = new DART(net, filename_B, No.read_only, from, to);
1117                 //
1118                 string[] journal_filenames;
1119                 scope (success) {
1120                     // writefln("Exit scope");
1121                     dart_A.close;
1122                     dart_B.close;
1123                     filename_A.remove;
1124                     filename_B.remove;
1125                 }
1126 
1127                 write(dart_A, random_tabel[0 .. 54], recorder_A);
1128                 write(dart_B, random_tabel[28 .. 81], recorder_B);
1129                 //                write(dart_B, random_table[0..17], recorder_B);
1130                 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
1131                 // writefln("dart_A.dump");
1132                 // dart_A.dump;
1133                 // writefln("dart_B.dump");
1134                 // dart_B.dump;
1135                 assert(!dart_A.fingerprint.isinit);
1136                 assert(dart_A.fingerprint != dart_B.fingerprint);
1137 
1138                 foreach (sector; dart_A.sectors) {
1139                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
1140                     journal_filenames ~= journal_filename;
1141                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
1142                     auto journalfile = BlockFile(journal_filename);
1143                     auto synch = new TestSynchronizer(journalfile, dart_A, dart_B);
1144                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
1145                     while (!dart_A_synchronizer.empty) {
1146                         (() @trusted { dart_A_synchronizer.call; })();
1147                     }
1148                 }
1149 
1150                 foreach (journal_filename; journal_filenames) {
1151                     dart_A.replay(journal_filename);
1152                 }
1153                 // writefln("dart_A.dump");
1154                 // dart_A.dump;
1155                 // writefln("dart_B.dump");
1156                 // dart_B.dump;
1157                 assert(!dart_A.fingerprint.isinit);
1158                 assert(dart_A.fingerprint == dart_B.fingerprint);
1159 
1160             }
1161             pragma(msg, "fixme(pr) Test disabled because it takes a long time");
1162             version (none) { // Synchronization of a Large DART A where DART A of DART B has common data
1163                 // writefln("Test 6");
1164                 DARTFile.create(filename_A, net);
1165                 DARTFile.create(filename_B, net);
1166                 RecordFactory.Recorder recorder_A;
1167                 RecordFactory.Recorder recorder_B;
1168                 auto dart_A = new DART(net, filename_A, from, to);
1169                 auto dart_B = new DART(net, filename_B, from, to);
1170                 //
1171                 string[] journal_filenames;
1172                 scope (success) {
1173                     // writefln("Exit scope");
1174                     dart_A.close;
1175                     dart_B.close;
1176                     filename_A.remove;
1177                     filename_B.remove;
1178                 }
1179 
1180                 write(dart_A, random_tabel[0 .. 544], recorder_A);
1181                 write(dart_B, random_tabel[288 .. 811], recorder_B);
1182                 //                write(dart_B, random_table[0..17], recorder_B);
1183                 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex,  dart_B.fingerprint.cutHex);
1184                 // writefln("dart_A.dump");
1185                 // dart_A.dump;
1186                 // writefln("dart_B.dump");
1187                 // dart_B.dump;
1188                 assert(!dart_A.fingerprint.isinit);
1189                 assert(dart_A.fingerprint != dart_B.fingerprint);
1190 
1191                 foreach (sector; dart_A.sectors) {
1192                     immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector);
1193                     journal_filenames ~= journal_filename;
1194                     BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE);
1195                     auto synch = new TestSynchronizer(journal_filename, dart_A, dart_B);
1196                     auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector));
1197                     while (!dart_A_synchronizer.empty) {
1198                         (() @trusted { dart_A_synchronizer.call; })();
1199                     }
1200                 }
1201 
1202                 foreach (journal_filename; journal_filenames) {
1203                     dart_A.replay(journal_filename);
1204                 }
1205                 // writefln("dart_A.dump");
1206                 // dart_A.dump;
1207                 // writefln("dart_B.dump");
1208                 //dart_B.dump;
1209                 assert(!dart_A.fingerprint.isinit);
1210                 assert(dart_A.fingerprint == dart_B.fingerprint);
1211             }
1212 
1213         }
1214     }
1215 
1216 }