// DART database build on DARTFile including CRUD commands and synchronization
module tagion.dart.DART;
@safe:
import core.exception : RangeError;
import core.thread : Fiber;
import std.conv : ConvException;
import std.range : empty;
import std.stdio;

//import std.stdio;
import std.algorithm.iteration : filter, map;
import std.format : format;
import std.range;
import std.traits : EnumMembers;
import tagion.basic.Debug : __format, debugwrite = __write;
import tagion.basic.Types : Buffer;
import tagion.basic.basic : FUNCTION_NAME;
import tagion.basic.basic : EnumText, isinit;
import tagion.basic.tagionexceptions : Check;
import tagion.communication.HiRPC : Callers, HiRPC, HiRPCMethod;
import tagion.crypto.SecureInterfaceNet : HashNet, SecureNet;
import tagion.dart.BlockFile : BlockFile;
import tagion.dart.BlockFile : Index;
import tagion.dart.DARTBasic : DARTIndex;
import tagion.dart.DARTFile;
import tagion.dart.DARTRim;
import CRUD = tagion.dart.DARTcrud;
import tagion.dart.Recorder : Archive, RecordFactory;
import tagion.dart.synchronizer : JournalSynchronizer, Synchronizer;
import tagion.hibon.Document : Document;
import tagion.hibon.HiBON : HiBON;
import tagion.hibon.HiBONJSON;
import tagion.hibon.HiBONRecord : GetLabel, HiBONRecord, label, recordType;

/**
 * Calculates the unwrapped to-angle on the angle circle.
 *
 * When to_sector is not strictly greater than from_sector the arc wraps
 * around the circle, so SECTOR_MAX_SIZE is added to the to-angle.
 * Params:
 *   from_sector = angle from
 *   to_sector = angle to
 * Returns:
 *   to_sector, plus SECTOR_MAX_SIZE if from_sector >= to_sector
 */
uint calc_to_value(const ushort from_sector, const ushort to_sector) pure nothrow @nogc {
    if (from_sector >= to_sector) {
        // Equal sectors count as a full round, not as an empty arc
        return to_sector + SECTOR_MAX_SIZE;
    }
    return to_sector;
}

unittest {
    // One sector
    assert(calc_to_value(0x46A6, 0x46A7) == 0x46A7);
    // Full round
    assert(calc_to_value(0x46A6, 0x46A6) == 0x46A6 + SECTOR_MAX_SIZE);
    // Round around
    assert(calc_to_value(0x46A7, 0x46A6) == 0x46A6 + SECTOR_MAX_SIZE);

}

/**
 * Calculates the size of the angle arc going from from_sector to to_sector.
 * Params:
 *   from_sector = angle from
 *   to_sector = angle to
 * Returns:
 *   number of sectors in the arc (SECTOR_MAX_SIZE when both angles are equal)
 */
uint calc_sector_size(const ushort from_sector, const ushort to_sector) pure nothrow @nogc {
    return calc_to_value(from_sector, to_sector) - from_sector;
}

unittest { // check calc_sector_size
    // Full round
    assert(calc_sector_size(0x543A, 0x543A) == SECTOR_MAX_SIZE);
    // One sector
    assert(calc_sector_size(0x543A, 0x543B) == 1);
    // Part angle
    assert(calc_sector_size(0x1000, 0xF000) == 0xE000);
    // Wrap around angle
    assert(calc_sector_size(0xF000, 0x1000) == 0x2000);
}
/**
 * DART support for HiRPC (dartRead, dartRim, dartBullseye and dartModify).
 * DART includes support for synchronization.
 * Examples: [tagion.testbench.dart]
 */
class DART : DARTFile {
    immutable ushort from_sector;
    immutable ushort to_sector;
    const HiRPC hirpc;

    /** Creates a DART with the given net on the given file path
     * Params:
     *   net = SecureNet used for initializing the DART
     *   filename = path to the DART file to open
     *   read_only = flag forwarded to the DARTFile constructor
     *   from_sector = from angle for DART sharding. In development.
     *   to_sector = to angle for DART sharding. In development.
97 */ 98 this(const SecureNet net, 99 string filename, 100 Flag!"read_only" read_only = No.read_only, 101 const ushort from_sector = 0, 102 const ushort to_sector = 0) @safe { 103 super(net, filename, read_only); 104 this.from_sector = from_sector; 105 this.to_sector = to_sector; 106 this.hirpc = HiRPC(net); 107 } 108 109 /** 110 * Creates DART with given net and by given file path safely with catching possible exceptions 111 * Params: 112 * net = Represent SecureNet for initializing DART 113 * filename = Represent path to DART file to open 114 * exception = Field used for returning exception in case when something gone wrong 115 * from_sector = Represents from angle for DART sharding. In development. 116 * to_sector = Represents to angle for DART sharding. In development. 117 */ 118 this(const SecureNet net, 119 string filename, 120 out Exception exception, 121 Flag!"read_only" read_only = No.read_only, 122 const ushort from_sector = 0, 123 const ushort to_sector = 0) @safe { 124 try { 125 this(net, filename, read_only, from_sector, to_sector); 126 } 127 catch (Exception e) { 128 exception = e; 129 } 130 } 131 132 /** 133 * Check if the sector is within the DART angle 134 * Params: 135 * sector = the sector in the DART 136 * Returns: true of the sector is within the DART range 137 */ 138 bool inRange(const ushort sector) pure nothrow { 139 return SectorRange.sectorInRange(sector, from_sector, to_sector); 140 } 141 142 /** 143 * Creates a SectorRange for the DART 144 * Returns: range of sectors 145 */ 146 SectorRange sectors() pure nothrow { 147 return SectorRange(from_sector, to_sector); 148 } 149 150 mixin(EnumText!(q{Queries}, Callers!DART)); 151 /** 152 * The dartBullseye method is called from opCall function 153 * This function return current database bullseye. 
154 * Params: 155 received = the HiRPC received package 156 * @param read_only - !Because this function is a read only the read_only parameter has no effect 157 * @return HiRPC result that contains current database bullseye 158 */ 159 @HiRPCMethod private const(HiRPC.Sender) dartBullseye( 160 ref const(HiRPC.Receiver) received, 161 const bool read_only) 162 in { 163 mixin FUNCTION_NAME; 164 assert(received.method.name == __FUNCTION_NAME__); 165 } 166 do { 167 auto hibon_params = new HiBON; 168 hibon_params[Params.bullseye] = bullseye; 169 return hirpc.result(received, hibon_params); 170 } 171 /** 172 * The dartRead method is called from opCall function 173 * This function reads list of archive specified in the list of fingerprints. 174 175 * The result is returned as a Recorder object 176 * read from the DART 177 178 * Note: 179 * Because this function is a read only the read_only parameter has no effect 180 181 * params: received is the HiRPC package 182 * Example: 183 * --- 184 * // HiRPC metode 185 * { 186 * .... 187 * message : { 188 * method : "dartRead" 189 * params : { 190 * fingerprints : [ 191 * <GENERIC>, 192 * <GENERIC>, 193 * ..... 194 * ] 195 * } 196 * ... 197 * } 198 * } 199 200 * // HiRPC Result 201 * { 202 * .... 
203 * message : { 204 * result : { 205 * recoder : <DOCUMENT> // Recorder 206 * limit : <UINT32> // Optional 207 * // This parameter is set if dart_indices list exceeds the limit 208 * } 209 * } 210 * } 211 * --- 212 */ 213 @HiRPCMethod private const(HiRPC.Sender) dartRead( 214 ref const(HiRPC.Receiver) received, 215 const bool read_only) 216 in { 217 mixin FUNCTION_NAME; 218 assert(received.method.name == __FUNCTION_NAME__); 219 } 220 do { 221 const doc_dart_indices = received.method.params[Params.dart_indices].get!(Document); 222 auto dart_indices = doc_dart_indices.range!(DARTIndex[]); 223 const recorder = loads(dart_indices, Archive.Type.ADD); 224 return hirpc.result(received, recorder.toDoc); 225 } 226 227 @HiRPCMethod private const(HiRPC.Sender) dartCheckRead( 228 ref const(HiRPC.Receiver) received, 229 const bool read_only) 230 in { 231 mixin FUNCTION_NAME; 232 assert(received.method.name == __FUNCTION_NAME__); 233 } 234 do { 235 auto doc_dart_indices = received.method.params[Params.dart_indices].get!(Document); 236 auto dart_indices = doc_dart_indices.range!(DARTIndex[]); 237 auto not_in_dart = checkload(dart_indices); 238 239 auto params = new HiBON; 240 auto params_dart_indices = new HiBON; 241 params_dart_indices = not_in_dart.map!(f => cast(Buffer) f); 242 params[Params.dart_indices] = params_dart_indices; 243 return hirpc.result(received, params); 244 } 245 246 /** 247 * The dartRim method is called from opCall function 248 * 249 * This method reads the Branches object at the specified rim 250 * 251 * Note: 252 * Because this function is a read only the read_only parameter has no effect 253 * 254 * Params: 255 * received is the HiRPC package 256 * Example: 257 * --- 258 * // HiRPC format 259 * 260 * { 261 * .... 262 * message : { 263 * method : "dartRim", 264 * params : { 265 * rims : <GENERIC> 266 * } 267 * } 268 * } 269 * 270 * // HiRPC Result 271 * { 272 * .... 
273 * message : { 274 * result : { 275 * branches : <DOCUMENT> // Branches 276 * limit : <UINT32> // Optional 277 * // This parameter is set if fingerprints list exceeds the limit 278 * } 279 * } 280 * } 281 * 282 * ---- 283 */ 284 @HiRPCMethod private const(HiRPC.Sender) dartRim( 285 ref const(HiRPC.Receiver) received, 286 const bool read_only) 287 in { 288 mixin FUNCTION_NAME; 289 assert(received.method.name == __FUNCTION_NAME__); 290 } 291 do { 292 immutable params = received.params!Rims; 293 294 const rim_branches = branches(params.path); 295 HiBON hibon_params; 296 297 if (!params.key_leaves.empty) { 298 if (!rim_branches.empty) { 299 auto super_recorder = recorder; 300 foreach (key; params.key_leaves) { 301 const index=rim_branches.indices[key]; 302 if (index.isinit) { 303 HiRPC.Error not_found; 304 not_found.message=format("No archive found on %(%02x %):%02x", params.path, key); 305 super_recorder.add(not_found); 306 continue; 307 } 308 immutable data = blockfile.load(index); 309 const doc = Document(data); 310 super_recorder.add(doc); 311 312 } 313 return hirpc.result(received, super_recorder); 314 } 315 } 316 else if (!rim_branches.empty) { 317 hibon_params = rim_branches.toHiBON(true); 318 } 319 else if (params.path.length > ushort.sizeof || !params.key_leaves.empty) { 320 hibon_params = new HiBON; 321 // It not branches so maybe it is an archive 322 immutable key = params.path[$ - 1]; 323 const super_branches = branches(params.path[0 .. 
$ - 1]); 324 if (!super_branches.empty) { 325 const index = super_branches.indices[key]; 326 if (index != Index.init) { 327 // The archive is added to a recorder 328 immutable data = blockfile.load(index); 329 const doc = Document(data); 330 auto super_recorder = recorder; 331 super_recorder.add(doc); 332 return hirpc.result(received, super_recorder); 333 } 334 } 335 336 } 337 return hirpc.result(received, hibon_params); 338 } 339 340 /** 341 * The dartModify method is called from opCall function 342 * 343 * This function execute and modify function according to the recorder parameter 344 * 345 * Note: 346 * This function will fail if read only the read_only is true 347 * 348 * 349 * Example: 350 * --- 351 * // HiRPC format 352 * { 353 * .... 354 * message : { 355 * method : "dartModify" 356 * params : { 357 * recorder : <DOCUNENT> // Recorder object 358 * } 359 * } 360 * } 361 * 362 * // HiRPC Result 363 * { 364 * .... 365 * message : { 366 * result : { 367 * bullseye : <GENERIC> // Returns the update bullseye of the DART 368 * } 369 * } 370 * } 371 * 372 * --- 373 * Params: received is the HiRPC package 374 * Returns: HiBON Sender 375 */ 376 377 @HiRPCMethod private const(HiRPC.Sender) dartModify( 378 ref const(HiRPC.Receiver) received, 379 const bool read_only) 380 in { 381 mixin FUNCTION_NAME; 382 assert(received.method.name == __FUNCTION_NAME__); 383 } 384 do { 385 HiRPC.check(!read_only, "The DART is read only"); 386 const recorder = manufactor.recorder(received.method.params); 387 immutable bullseye = modify(recorder); 388 auto hibon_params = new HiBON; 389 hibon_params[Params.bullseye] = bullseye; 390 return hirpc.result(received, hibon_params); 391 } 392 393 /** 394 * This function handels HPRC Queries to the DART 395 * Params: 396 * received = Request HiRPC object 397 * If read_only is true deleting and erasing data in the DART will return an error 398 * Note. 399 * When the DART is accessed from an external HiRPC this flag should be kept false. 
400 * 401 * Returns: 402 * The response from HPRC if the method is supported 403 * else the response return is marked empty 404 */ 405 const(HiRPC.Sender) opCall( 406 ref const(HiRPC.Receiver) received, 407 const bool read_only = true) { 408 import std.conv : to; 409 410 const method = received.method; 411 switch (method.name) { 412 static foreach (call; Callers!DART) { 413 case call: 414 enum code = format(q{return %s(received, read_only);}, call); 415 mixin(code); 416 } 417 default: 418 // Empty 419 } 420 immutable message = format("Method '%s' not supported", method.name); 421 return hirpc.error(received, message, 22); 422 } 423 424 /** 425 * Recorder journal 426 */ 427 @recordType("Journal") struct Journal { 428 Index index; 429 RecordFactory.Recorder recorder; 430 enum indexName = GetLabel!(index).name; 431 enum recorderName = GetLabel!(recorder).name; 432 /** 433 * Creator of the Journal recorder 434 * Params: 435 * manufactor = Recorder factory 436 * doc = Journal document 437 */ 438 this(RecordFactory manufactor, const Document doc) { 439 import tagion.logger.Logger; 440 441 442 443 .check(isRecord(doc), format("Document is not a %s", ThisType.stringof)); 444 index = doc[indexName].get!Index; 445 const recorder_doc = doc[recorderName].get!Document; 446 recorder = manufactor.recorder(recorder_doc); 447 } 448 /** 449 * Ditto 450 * Params: 451 * recorder = DART recorder 452 * index = index number 453 */ 454 this(const RecordFactory.Recorder recorder, const Index index) const pure nothrow @nogc { 455 this.recorder = recorder; 456 this.index = index; 457 } 458 459 mixin HiBONRecord!"{}"; 460 } 461 462 /** 463 * Creates a synchronization fiber from a synchroizer 464 * Params: 465 * synchonizer = synchronizer to be used 466 * rims = selected rim path 467 * Returns: 468 * synchronization fiber 469 */ 470 SynchronizationFiber synchronizer(Synchronizer synchonizer, const Rims rims) { 471 return new SynchronizationFiber(rims, synchonizer); 472 } 473 474 /** 475 * 
Synchronizer which supports synchronization from multiplet DART's 476 */ 477 @safe 478 class SynchronizationFiber : Fiber { 479 protected Synchronizer sync; 480 481 immutable(Rims) root_rims; 482 483 this(const Rims root_rims, Synchronizer sync) @trusted { 484 this.root_rims = root_rims; 485 this.sync = sync; 486 sync.set(this.outer, this, this.outer.hirpc); 487 super(&run); 488 } 489 490 protected uint _id; 491 /* 492 * Id for the HiRPC 493 * Returns: HiRPC id 494 */ 495 @property uint id() { 496 if (_id == 0) { 497 _id = hirpc.generateId(); 498 } 499 return _id; 500 } 501 502 /** 503 * Function to hanle syncronization stage for the DART 504 */ 505 final void run() 506 in { 507 assert(sync); 508 assert(blockfile); 509 } 510 do { 511 void iterate(const Rims params) @safe { 512 // 513 // Request Branches or Recorder at rims from the foreign DART. 514 // 515 const local_branches = branches(params.path); 516 const request_branches = CRUD.dartRim(rims : params, hirpc: 517 hirpc, id: 518 id); 519 const result_branches = sync.query(request_branches); 520 if (Branches.isRecord(result_branches.response.result)) { 521 const foreign_branches = result_branches.result!Branches; 522 // 523 // Read all the archives from the foreign DART 524 // 525 const request_archives = CRUD.dartRead( 526 foreign_branches 527 .dart_indices, hirpc, id); 528 const result_archives = sync.query(request_archives); 529 auto foreign_recoder = manufactor.recorder(result_archives.response.result); 530 // 531 // The rest of the fingerprints which are not in the foreign_branches must be sub-branches 532 // 533 534 auto local_recorder = recorder; 535 scope (success) { 536 sync.record(local_recorder); 537 } 538 foreach (const ubyte key; 0 .. 
KEY_SPAN) { 539 const sub_rims = Rims(params.path ~ key); 540 const local_print = local_branches.dart_index(key); 541 const foreign_print = foreign_branches.dart_index(key); 542 auto foreign_archive = foreign_recoder.find(foreign_print); 543 if (foreign_archive) { 544 if (local_print != foreign_print) { 545 local_recorder.insert(foreign_archive); 546 sync.removeRecursive(sub_rims); 547 } 548 } 549 else if (!foreign_print.isinit) { 550 // Foreign is poits to branches 551 if (!local_print.empty) { 552 const possible_branches_data = load(local_branches, key); 553 if (!Branches.isRecord(Document(possible_branches_data))) { 554 // If branch is an archive then it is removed because if it exists in foreign DART 555 // this archive will be added later 556 local_recorder.remove(local_print); 557 } 558 } 559 iterate(sub_rims); 560 } 561 else if (!local_print.empty) { 562 sync.removeRecursive(sub_rims); 563 } 564 } 565 } 566 else { 567 if (result_branches.isRecord!(RecordFactory.Recorder)) { 568 auto foreign_recoder = manufactor.recorder(result_branches.response.result); 569 sync.record(foreign_recoder); 570 } 571 // 572 // The foreign DART does not contain data at the rims 573 // 574 sync.removeRecursive(params); 575 } 576 } 577 578 iterate(root_rims); 579 sync.finish; 580 } 581 /** 582 * Checks if the synchronized has reached the end 583 * Returns: true if empty 584 */ 585 final bool empty() const pure nothrow { 586 return sync.empty; 587 } 588 } 589 590 /** 591 * Replays the journal file to update the DART 592 * The update blockfile can be generated from the synchroning process from an foreign dart 593 * 594 * If the process is broken for some reason this the resumed by running the replay function again 595 * on the same block file 596 * 597 * Params: 598 * journal_filename = Name of the BlockFile to be replaied 599 * 600 * Throws: 601 * The function will throw an exception if something went wrong in the process. 
602 */ 603 void replay(const(string) journal_filename) { 604 auto journalfile = BlockFile(journal_filename, Yes.read_only); 605 scope (exit) { 606 journalfile.close; 607 } 608 // Adding and Removing archives 609 610 for (Index index = journalfile.masterBlock.root_index; index != Index.init;) { 611 immutable data = journalfile.load(index); 612 const doc = Document(data); 613 614 auto journal_replay = Journal(manufactor, doc); 615 index = journal_replay.index; 616 auto action_recorder = recorder; 617 action_recorder.insert(journal_replay.recorder.archives[]); 618 modify(action_recorder); 619 } 620 621 } 622 623 version (unittest) { 624 static class TestSynchronizer : JournalSynchronizer { 625 protected DART foreign_dart; 626 protected DART owner; 627 this(BlockFile journalfile, DART owner, DART foreign_dart) { 628 this.foreign_dart = foreign_dart; 629 this.owner = owner; 630 super(journalfile); 631 } 632 633 // 634 // This function emulates the connection between two DART's 635 // in a single thread 636 // 637 const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request) { 638 Document send_request_to_foreign_dart(const Document foreign_doc) { 639 // 640 // Remote excution 641 // Receive on the foreign end 642 const foreign_receiver = foreign_dart.hirpc.receive(foreign_doc); 643 // Make query in to the foreign DART 644 const foreign_response = foreign_dart(foreign_receiver); 645 646 return foreign_response.toDoc; 647 } 648 649 immutable foreign_doc = request.toDoc; 650 (() @trusted { fiber.yield; })(); 651 // Here a yield loop should be implement to poll for response from the foriegn DART 652 // A timeout should also be implemented in this poll loop 653 const response_doc = send_request_to_foreign_dart(foreign_doc); 654 // 655 // Process the response returned for the foreign DART 656 // 657 const received = owner.hirpc.receive(response_doc); 658 return received; 659 } 660 } 661 662 } 663 664 ///Examples: how use the DART 665 unittest { 666 import tagion.basic.basic : 
assumeTrusted, tempfile; 667 import tagion.dart.BlockFile; 668 import tagion.dart.DARTFakeNet : DARTFakeNet; 669 import tagion.dart.Recorder; 670 import tagion.utils.Random; 671 672 enum TEST_BLOCK_SIZE = 0x80; 673 674 auto net = new DARTFakeNet("very_secret"); 675 676 immutable filename = fileId!DART.fullpath; 677 immutable filename_A = fileId!DART("A_").fullpath; 678 immutable filename_B = fileId!DART("B_").fullpath; 679 immutable filename_C = fileId!DART("C_").fullpath; 680 681 { // Remote Synchronization test 682 683 import std.file : remove; 684 685 auto rand = Random!ulong(1234_5678_9012_345UL); 686 enum N = 1000; 687 auto random_tabel = new ulong[N]; 688 foreach (ref r; random_tabel) { 689 immutable sector = rand.value(0x0000_0000_0000_ABBAUL, 0x0000_0000_0000_ABBDUL) << ( 690 8 * 6); 691 r = rand.value(0x0000_1234_5678_0000UL | sector, 0x0000_1334_FFFF_0000UL | sector); 692 } 693 694 // 695 // The the following unittest dart A and B covers the same range angle 696 // 697 enum from = 0xABB9; 698 enum to = 0xABBD; 699 700 // import std.stdio; 701 { // Single element same sector sectors 702 const ulong[] same_sector_tabel = [ 703 0xABB9_13ab_cdef_1234, 704 0xABB9_14ab_cdef_1234, 705 0xABB9_15ab_cdef_1234 706 707 ]; 708 // writefln("Test 0.0"); 709 foreach (test_no; 0 .. 3) { 710 DARTFile.create(filename_A, net); 711 DARTFile.create(filename_B, net); 712 RecordFactory.Recorder recorder_B; 713 RecordFactory.Recorder recorder_A; 714 // Recorder recorder_B; 715 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 716 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 717 string[] journal_filenames; 718 scope (success) { 719 // writefln("Exit scope"); 720 dart_A.close; 721 dart_B.close; 722 filename_A.remove; 723 filename_B.remove; 724 foreach (journal_filename; journal_filenames) { 725 journal_filename.remove; 726 } 727 } 728 729 switch (test_no) { 730 case 0: 731 write(dart_A, same_sector_tabel[0 .. 
1], recorder_A); 732 write(dart_B, same_sector_tabel[0 .. 0], recorder_B); 733 break; 734 case 1: 735 write(dart_A, same_sector_tabel[0 .. 1], recorder_A); 736 write(dart_B, same_sector_tabel[1 .. 2], recorder_B); 737 break; 738 case 2: 739 write(dart_A, same_sector_tabel[0 .. 2], recorder_A); 740 write(dart_B, same_sector_tabel[1 .. 3], recorder_B); 741 break; 742 default: 743 assert(0); 744 } 745 //writefln("\n------ %d ------", test_no); 746 //writefln("dart_A.dump"); 747 //dart_A.dump; 748 //writefln("dart_B.dump"); 749 //dart_B.dump; 750 //writefln("dart_A.fingerprint=%s", dart_A.fingerprint.cutHex); 751 //writefln("dart_B.fingerprint=%s", dart_B.fingerprint.cutHex); 752 753 foreach (sector; dart_A.sectors) { 754 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 755 journal_filenames ~= journal_filename; 756 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 757 auto journalfile = BlockFile(journal_filename); 758 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 759 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 760 // D!(sector, "%x"); 761 while (!dart_A_synchronizer.empty) { 762 (() @trusted => dart_A_synchronizer.call)(); 763 } 764 } 765 foreach (journal_filename; journal_filenames) { 766 dart_A.replay(journal_filename); 767 } 768 //writefln("dart_A.dump"); 769 //dart_A.dump; 770 //writefln("dart_B.dump"); 771 //dart_B.dump; 772 //writefln("dart_A.fingerprint=%s", dart_A.fingerprint.cutHex); 773 //writefln("dart_B.fingerprint=%s", dart_B.fingerprint.cutHex); 774 775 assert(dart_A.fingerprint == dart_B.fingerprint); 776 if (test_no == 0) { 777 assert(dart_A.fingerprint.isinit); 778 } 779 else { 780 assert(!dart_A.fingerprint.isinit); 781 } 782 } 783 } 784 785 { // Single element different sectors 786 // 787 // writefln("Test 0.1"); 788 DARTFile.create(filename_A, net); 789 DARTFile.create(filename_B, net); 790 RecordFactory.Recorder recorder_B; 791 RecordFactory.Recorder 
recorder_A; 792 // Recorder recorder_B; 793 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 794 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 795 string[] journal_filenames; 796 scope (success) { 797 // writefln("Exit scope"); 798 dart_A.close; 799 dart_B.close; 800 filename_A.remove; 801 filename_B.remove; 802 foreach (journal_filename; journal_filenames) { 803 journal_filename.remove; 804 } 805 } 806 807 write(dart_B, random_tabel[0 .. 1], recorder_B); 808 write(dart_A, random_tabel[1 .. 2], recorder_A); 809 // writefln("dart_A.dump"); 810 // dart_A.dump; 811 // writefln("dart_B.dump"); 812 // dart_B.dump; 813 814 foreach (sector; dart_A.sectors) { 815 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 816 journal_filenames ~= journal_filename; 817 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 818 auto journalfile = BlockFile(journal_filename); 819 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 820 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 821 // D!(sector, "%x"); 822 while (!dart_A_synchronizer.empty) { 823 (() @trusted => dart_A_synchronizer.call)(); 824 } 825 } 826 foreach (journal_filename; journal_filenames) { 827 dart_A.replay(journal_filename); 828 } 829 // writefln("dart_A.dump"); 830 // dart_A.dump; 831 // writefln("dart_B.dump"); 832 // dart_B.dump; 833 assert(!dart_A.fingerprint.isinit); 834 assert(dart_A.fingerprint == dart_B.fingerprint); 835 } 836 { // Synchronization of an empty DART 837 // from DART A against DART B with ONE archive when DART A is empty 838 DARTFile.create(filename_A, net); 839 DARTFile.create(filename_B, net); 840 RecordFactory.Recorder recorder_B; 841 // Recorder recorder_B; 842 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 843 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 844 // 845 string[] journal_filenames; 846 scope (success) { 847 // writefln("Exit scope"); 
848 dart_A.close; 849 dart_B.close; 850 filename_A.remove; 851 filename_B.remove; 852 foreach (journal_filename; journal_filenames) { 853 journal_filename.remove; 854 } 855 } 856 857 const ulong[] single_archive = [0xABB9_13ab_11ef_0923]; 858 859 write(dart_B, single_archive, recorder_B); 860 // dart_B.dump; 861 862 // 863 // Synchronize DART_B -> DART_A 864 // 865 // Collecting the journal file 866 867 foreach (sector; dart_A.sectors) { 868 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 869 journal_filenames ~= journal_filename; 870 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 871 auto journalfile = BlockFile(journal_filename); 872 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 873 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 874 // D!(sector, "%x"); 875 while (!dart_A_synchronizer.empty) { 876 (() @trusted => dart_A_synchronizer.call)(); 877 } 878 } 879 foreach (journal_filename; journal_filenames) { 880 dart_A.replay(journal_filename); 881 } 882 // writefln("dart_A.dump"); 883 // dart_A.dump; 884 // writefln("dart_B.dump"); 885 // dart_B.dump; 886 assert(!dart_A.fingerprint.isinit); 887 assert(dart_A.fingerprint == dart_B.fingerprint); 888 889 } 890 { // Synchronization of an empty DART 891 // from DART A against DART B when DART A is empty 892 // writefln("Test 1"); 893 894 DARTFile.create(filename_A, net); 895 DARTFile.create(filename_B, net); 896 RecordFactory.Recorder recorder_B; 897 // Recorder recorder_B; 898 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 899 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 900 // 901 string[] journal_filenames; 902 scope (success) { 903 // writefln("Exit scope"); 904 dart_A.close; 905 dart_B.close; 906 filename_A.remove; 907 filename_B.remove; 908 foreach (journal_filename; journal_filenames) { 909 journal_filename.remove; 910 } 911 } 912 913 write(dart_B, random_tabel[0 .. 
17], recorder_B); 914 // writefln("dart_A.dump"); 915 // dart_A.dump; 916 // writefln("dart_B.dump"); 917 // dart_B.dump; 918 919 // 920 // Synchronize DART_B -> DART_A 921 // 922 // Collecting the journal file 923 924 foreach (sector; dart_A.sectors) { 925 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 926 journal_filenames ~= journal_filename; 927 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 928 auto journalfile = BlockFile(journal_filename); 929 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 930 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 931 // D!(sector, "%x"); 932 while (!dart_A_synchronizer.empty) { 933 (() @trusted => dart_A_synchronizer.call)(); 934 } 935 } 936 foreach (journal_filename; journal_filenames) { 937 dart_A.replay(journal_filename); 938 } 939 // writefln("dart_A.dump"); 940 // dart_A.dump; 941 // writefln("dart_B.dump"); 942 // dart_B.dump; 943 assert(!dart_A.fingerprint.isinit); 944 assert(dart_A.fingerprint == dart_B.fingerprint); 945 946 } 947 948 { // Synchronization of a DART A which is a subset of DART B 949 // writefln("Test 2"); 950 DARTFile.create(filename_A, net); 951 DARTFile.create(filename_B, net); 952 RecordFactory.Recorder recorder_A; 953 RecordFactory.Recorder recorder_B; 954 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 955 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 956 // 957 string[] journal_filenames; 958 scope (success) { 959 // writefln("Exit scope"); 960 dart_A.close; 961 dart_B.close; 962 filename_A.remove; 963 filename_B.remove; 964 } 965 966 write(dart_A, random_tabel[0 .. 17], recorder_A); 967 write(dart_B, random_tabel[0 .. 
27], recorder_B); 968 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex, dart_B.fingerprint.cutHex); 969 // writefln("dart_A.dump"); 970 // dart_A.dump; 971 // writefln("dart_B.dump"); 972 // dart_B.dump; 973 assert(!dart_A.fingerprint.isinit); 974 assert(dart_A.fingerprint != dart_B.fingerprint); 975 976 foreach (sector; dart_A.sectors) { 977 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 978 journal_filenames ~= journal_filename; 979 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 980 auto journalfile = BlockFile(journal_filename); 981 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 982 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 983 // D!(sector, "%x"); 984 while (!dart_A_synchronizer.empty) { 985 (() @trusted { dart_A_synchronizer.call; })(); 986 } 987 } 988 989 foreach (journal_filename; journal_filenames) { 990 dart_A.replay(journal_filename); 991 } 992 // writefln("dart_A.dump"); 993 // dart_A.dump; 994 // writefln("dart_B.dump"); 995 // dart_B.dump; 996 assert(!dart_A.fingerprint.isinit); 997 assert(dart_A.fingerprint == dart_B.fingerprint); 998 999 } 1000 1001 { // Synchronization of a DART A where DART A is a superset of DART B 1002 // writefln("Test 3"); 1003 DARTFile.create(filename_A, net); 1004 DARTFile.create(filename_B, net); 1005 RecordFactory.Recorder recorder_A; 1006 RecordFactory.Recorder recorder_B; 1007 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 1008 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 1009 // 1010 string[] journal_filenames; 1011 scope (success) { 1012 // writefln("Exit scope"); 1013 dart_A.close; 1014 dart_B.close; 1015 filename_A.remove; 1016 filename_B.remove; 1017 } 1018 1019 write(dart_A, random_tabel[0 .. 27], recorder_A); 1020 write(dart_B, random_tabel[0 .. 
17], recorder_B); 1021 // write(dart_B, random_table[0..17], recorder_B); 1022 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex, dart_B.fingerprint.cutHex); 1023 // writefln("dart_A.dump"); 1024 // dart_A.dump; 1025 // writefln("dart_B.dump"); 1026 // dart_B.dump; 1027 assert(!dart_A.fingerprint.isinit); 1028 assert(dart_A.fingerprint != dart_B.fingerprint); 1029 1030 foreach (sector; dart_A.sectors) { 1031 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 1032 journal_filenames ~= journal_filename; 1033 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 1034 auto journalfile = BlockFile(journal_filename); 1035 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 1036 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 1037 // D!(sector, "%x"); 1038 while (!dart_A_synchronizer.empty) { 1039 (() @trusted { dart_A_synchronizer.call; })(); 1040 } 1041 } 1042 1043 foreach (journal_filename; journal_filenames) { 1044 dart_A.replay(journal_filename); 1045 } 1046 // writefln("dart_A.dump"); 1047 // dart_A.dump; 1048 // writefln("dart_B.dump"); 1049 // dart_B.dump; 1050 assert(!dart_A.fingerprint.isinit); 1051 assert(dart_A.fingerprint == dart_B.fingerprint); 1052 1053 } 1054 1055 { // Synchronization of a DART A where DART A is complementary of DART B 1056 // writefln("Test 4"); 1057 DARTFile.create(filename_A, net); 1058 DARTFile.create(filename_B, net); 1059 RecordFactory.Recorder recorder_A; 1060 RecordFactory.Recorder recorder_B; 1061 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 1062 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 1063 // 1064 string[] journal_filenames; 1065 scope (success) { 1066 // writefln("Exit scope"); 1067 dart_A.close; 1068 dart_B.close; 1069 filename_A.remove; 1070 filename_B.remove; 1071 } 1072 1073 write(dart_A, random_tabel[0 .. 27], recorder_A); 1074 write(dart_B, random_tabel[28 .. 
54], recorder_B); 1075 // write(dart_B, random_table[0..17], recorder_B); 1076 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex, dart_B.fingerprint.cutHex); 1077 // writefln("dart_A.dump"); 1078 // dart_A.dump; 1079 // writefln("dart_B.dump"); 1080 // dart_B.dump; 1081 assert(!dart_A.fingerprint.isinit); 1082 assert(dart_A.fingerprint != dart_B.fingerprint); 1083 1084 foreach (sector; dart_A.sectors) { 1085 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 1086 journal_filenames ~= journal_filename; 1087 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 1088 auto journalfile = BlockFile(journal_filename); 1089 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 1090 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 1091 // D!(sector, "%x"); 1092 while (!dart_A_synchronizer.empty) { 1093 (() @trusted { dart_A_synchronizer.call; })(); 1094 } 1095 } 1096 1097 foreach (journal_filename; journal_filenames) { 1098 // writefln("JOURNAL_FILENAME=%s", journal_filename); 1099 dart_A.replay(journal_filename); 1100 } 1101 // writefln("dart_A.dump"); 1102 // dart_A.dump; 1103 // writefln("dart_B.dump"); 1104 // dart_B.dump; 1105 assert(!dart_A.fingerprint.isinit); 1106 assert(dart_A.fingerprint == dart_B.fingerprint); 1107 } 1108 1109 { // Synchronization of a DART A where DART A of DART B has common data 1110 // writefln("Test 5"); 1111 DARTFile.create(filename_A, net); 1112 DARTFile.create(filename_B, net); 1113 RecordFactory.Recorder recorder_A; 1114 RecordFactory.Recorder recorder_B; 1115 auto dart_A = new DART(net, filename_A, No.read_only, from, to); 1116 auto dart_B = new DART(net, filename_B, No.read_only, from, to); 1117 // 1118 string[] journal_filenames; 1119 scope (success) { 1120 // writefln("Exit scope"); 1121 dart_A.close; 1122 dart_B.close; 1123 filename_A.remove; 1124 filename_B.remove; 1125 } 1126 1127 write(dart_A, random_tabel[0 .. 
54], recorder_A); 1128 write(dart_B, random_tabel[28 .. 81], recorder_B); 1129 // write(dart_B, random_table[0..17], recorder_B); 1130 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex, dart_B.fingerprint.cutHex); 1131 // writefln("dart_A.dump"); 1132 // dart_A.dump; 1133 // writefln("dart_B.dump"); 1134 // dart_B.dump; 1135 assert(!dart_A.fingerprint.isinit); 1136 assert(dart_A.fingerprint != dart_B.fingerprint); 1137 1138 foreach (sector; dart_A.sectors) { 1139 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 1140 journal_filenames ~= journal_filename; 1141 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 1142 auto journalfile = BlockFile(journal_filename); 1143 auto synch = new TestSynchronizer(journalfile, dart_A, dart_B); 1144 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 1145 while (!dart_A_synchronizer.empty) { 1146 (() @trusted { dart_A_synchronizer.call; })(); 1147 } 1148 } 1149 1150 foreach (journal_filename; journal_filenames) { 1151 dart_A.replay(journal_filename); 1152 } 1153 // writefln("dart_A.dump"); 1154 // dart_A.dump; 1155 // writefln("dart_B.dump"); 1156 // dart_B.dump; 1157 assert(!dart_A.fingerprint.isinit); 1158 assert(dart_A.fingerprint == dart_B.fingerprint); 1159 1160 } 1161 pragma(msg, "fixme(pr) Test disabled because it takes a long time"); 1162 version (none) { // Synchronization of a Large DART A where DART A of DART B has common data 1163 // writefln("Test 6"); 1164 DARTFile.create(filename_A, net); 1165 DARTFile.create(filename_B, net); 1166 RecordFactory.Recorder recorder_A; 1167 RecordFactory.Recorder recorder_B; 1168 auto dart_A = new DART(net, filename_A, from, to); 1169 auto dart_B = new DART(net, filename_B, from, to); 1170 // 1171 string[] journal_filenames; 1172 scope (success) { 1173 // writefln("Exit scope"); 1174 dart_A.close; 1175 dart_B.close; 1176 filename_A.remove; 1177 filename_B.remove; 1178 } 1179 1180 write(dart_A, 
random_tabel[0 .. 544], recorder_A); 1181 write(dart_B, random_tabel[288 .. 811], recorder_B); 1182 // write(dart_B, random_table[0..17], recorder_B); 1183 // writefln("bulleye_A=%s bulleye_B=%s", dart_A.fingerprint.cutHex, dart_B.fingerprint.cutHex); 1184 // writefln("dart_A.dump"); 1185 // dart_A.dump; 1186 // writefln("dart_B.dump"); 1187 // dart_B.dump; 1188 assert(!dart_A.fingerprint.isinit); 1189 assert(dart_A.fingerprint != dart_B.fingerprint); 1190 1191 foreach (sector; dart_A.sectors) { 1192 immutable journal_filename = format("%s.%04x.dart_journal", tempfile, sector); 1193 journal_filenames ~= journal_filename; 1194 BlockFile.create(journal_filename, DART.stringof, TEST_BLOCK_SIZE); 1195 auto synch = new TestSynchronizer(journal_filename, dart_A, dart_B); 1196 auto dart_A_synchronizer = dart_A.synchronizer(synch, Rims(sector)); 1197 while (!dart_A_synchronizer.empty) { 1198 (() @trusted { dart_A_synchronizer.call; })(); 1199 } 1200 } 1201 1202 foreach (journal_filename; journal_filenames) { 1203 dart_A.replay(journal_filename); 1204 } 1205 // writefln("dart_A.dump"); 1206 // dart_A.dump; 1207 // writefln("dart_B.dump"); 1208 //dart_B.dump; 1209 assert(!dart_A.fingerprint.isinit); 1210 assert(dart_A.fingerprint == dart_B.fingerprint); 1211 } 1212 1213 } 1214 } 1215 1216 }