module tagion.tools.dartutil.synchronize;

import std.file : remove;
import std.format;
import tagion.communication.HiRPC;
import tagion.dart.BlockFile;
import tagion.dart.DART;
import tagion.dart.DARTRim;
import tagion.dart.synchronizer;
import tagion.hibon.Document;
import tagion.tools.Basic : nobose, noboseln, verbose;
import tagion.utils.Term;

/**
 * Journal synchronizer which emulates the connection between two DARTs
 * inside a single thread.
 *
 * Requests are answered directly by the `source` DART and the answers are
 * decoded through the HiRPC of the `destination` DART.
 */
@safe
class DARTUtilSynchronizer : JournalSynchronizer {
    protected DART source;
    protected DART destination;

    /**
     * Params:
     *   journalfile = block file handed on to the JournalSynchronizer base class
     *   destination = DART whose HiRPC decodes the responses
     *   source = DART which answers the requests
     */
    this(BlockFile journalfile, DART destination, DART source) {
        this.source = source;
        this.destination = destination;
        super(journalfile);
    }

    /**
     * Sends a request to the source DART and returns its response as
     * received by the destination DART.
     *
     * The fiber yields once between serializing the request and executing
     * it on the source.
     *
     * Params:
     *   request = HiRPC request to be executed on the source DART
     * Returns:
     *   The response of the source DART, received via `destination.hirpc`
     */
    const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request) {
        Document send_request_to_source(const Document foreign_doc) {
            //
            // Remote execution
            // Receive on the foreign end
            const foreign_receiver = source.hirpc.receive(foreign_doc);
            // Make the query into the foreign DART
            const foreign_response = source(foreign_receiver);

            return foreign_response.toDoc;
        }

        immutable foreign_doc = request.toDoc;
        (() @trusted { fiber.yield; })();
        // Here a yield loop should be implemented to poll for the response from the foreign DART
        // A timeout should also be implemented in this poll loop
        const response_doc = send_request_to_source(foreign_doc);
        //
        // Process the response returned from the foreign DART
        //
        const received = destination.hirpc.receive(response_doc);
        return received;
    }

    /**
     * Marks the synchronizer as finished by setting `_finished`
     * (a member declared outside of this file).
     *
     * The journal file is not closed here; `synchronize` closes it in its
     * scope(exit) guard.
     */
    override void finish() {
        // journalfile.close;
        _finished = true;
    }

}

/**
 * Synchronizes `destination` against `source`, one first-rim value at a time.
 *
 * For each rim value in 0 .. 255 a journal file named
 * `<journal_basename>.<sector as 4 hex digits>.dart_journal` is created and
 * a synchronizer for that rim is run until it is empty. Afterwards every
 * journal file which is not empty is replayed on `destination`.
 *
 * Progress is reported via `nobose`: '#' for a non-empty journal, 'X' for
 * an empty one and '*' for each replayed journal, with a line break after
 * every `line_width` marks.
 *
 * Note: journal files which stay empty are not deleted by this function.
 *
 * Params:
 *   destination = DART which is updated
 *   source = DART which is read from
 *   journal_basename = filename prefix of the journal files
 * Returns:
 *   The filenames of the non-empty journal files, in the order they were replayed
 */
@safe
string[] synchronize(DART destination, DART source, string journal_basename) {
    string[] journal_filenames;
    uint count;
    enum line_width = 32;
    // foreach (sector; destination.sectors) {
    foreach (ushort _rim; 0 .. ubyte.max + 1) {
        // The rim value is placed in the upper byte of the 16-bit sector number
        ushort sector = cast(ushort)(_rim << 8);
        verbose("Sector %04x", sector);
        immutable journal_filename = format("%s.%04x.dart_journal", journal_basename, sector);
        BlockFile.create(journal_filename, DART.stringof, BLOCK_SIZE);

        auto journalfile = BlockFile(journal_filename);
        // Runs when the iteration is left, also if the synchronization throws
        scope (exit) {
            if (!journalfile.empty) {
                journal_filenames ~= journal_filename;
                verbose("Journalfile %s", journal_filename);
                nobose("%s#%s", YELLOW, RESET);

            }
            else {
                nobose("%sX%s", BLUE, RESET);
            }
            count++;
            if (count % line_width == 0) {
                noboseln("!");
            }
            journalfile.close;
        }
        auto synch = new DARTUtilSynchronizer(journalfile, destination, source);

        auto destination_synchronizer = destination.synchronizer(synch, Rims([cast(ubyte) _rim]));
        // Drive the synchronizer until it reports empty
        while (!destination_synchronizer.empty) {
            (() @trusted { destination_synchronizer.call; })();
        }

    }
    noboseln("Replay journal filenames");
    verbose("Replay journal filenames");
    // The counter is reused for the replay progress marks
    count = 0;
    foreach (journal_filename; journal_filenames) {
        destination.replay(journal_filename);
        verbose("Replay %s", journal_filename);
        nobose("%s*%s", GREEN, RESET);
        count++;
        if (count % line_width == 0) {
            noboseln("!");
        }
    }
    noboseln("\n%d journal files has been syncronized", count);
    return journal_filenames;
}