module tagion.dart.synchronizer;

import tagion.communication.HiRPC;

alias HiRPCSender = HiRPC.Sender;
alias HiRPCReceiver = HiRPC.Receiver;
import tagion.dart.BlockFile;
import tagion.dart.DART;
import tagion.dart.DARTFile;
import tagion.dart.DARTRim;
import tagion.dart.Recorder;
import tagion.hibon.Document;

/**
 * Interface to the DART synchronizer
 */
@safe
interface Synchronizer {
    /**
     * Sends a request to the foreign DART and returns its response.
     *
     * It is recommended that implementations yield the SynchronizationFiber
     * between sending the request and receiving the response.
     *
     * Params:
     *   request = request to be sent
     * Returns:
     *   the received response
     */
    const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request);

    /**
     * Stores the add and remove actions in the journal replay log file
     *
     * Params:
     *   recorder = DART recorder
     */
    void record(RecordFactory.Recorder recorder);

    /**
     * This function is called when whole branches don't exist in the foreign DART
     * and need to be removed in the local DART
     *
     * Params:
     *   rims = path to the selected rim
     */
    void removeRecursive(const Rims rims);

    /**
     * This function is called when the SynchronizationFiber run-function finishes
     */
    void finish();

    /**
     * Called by the SynchronizationFiber constructor,
     * which enables the query function to yield the run function in SynchronizationFiber
     *
     * Params:
     *   owner = is the dart to be modified
     *   fiber = is the synchronizer fiber object
     *   hirpc = is the HiRPC object handed to the synchronizer
     */
    void set(DART owner, DART.SynchronizationFiber fiber, HiRPC hirpc);

    /**
     * Checks if the synchronizer is empty
     *
     * Returns:
     *   If the SynchronizationFiber has finished then this function returns `true`
     */
    bool empty() const pure nothrow;
}

/**
 * Standard DART synchronization object.
 *
 * Implements everything of `Synchronizer` except `query` and `record`,
 * which are left to the derived classes.
 */
@safe
abstract class StdSynchronizer : Synchronizer {

    protected DART.SynchronizationFiber fiber; /// Contains the reference to SynchronizationFiber
    immutable uint chunck_size; /// Max number of archives operated on in one recorder action
    protected {
        // The journal file and the current block index are declared in JournalSynchronizer.
        bool _finished; /// Finish flag set when the Fiber function returns
        bool _timeout; /// Set via the timeout method to indicate a network timeout
        DART owner; /// The DART which is modified by the synchronization
        HiRPC hirpc; /// HiRPC object received via `set`
    }

    /**
     * Params:
     *   chunck_size = max number of archives handled in one recorder action
     */
    this(const uint chunck_size = 0x400) {
        this.chunck_size = chunck_size;
    }

    /**
     * Removes all archives at the selected rim path.
     *
     * The removals are collected in a recorder which is handed to `record`
     * every time it holds `chunck_size` removals, and once more after the
     * last archive has been visited.
     *
     * Params:
     *   selected_rims = selected rims to be removed
     */
    void removeRecursive(const Rims selected_rims) {
        auto rim_walker = owner.rimWalkerRange(selected_rims.path);
        auto recorder_worker = owner.recorder;
        uint count = 0;
        foreach (archive_data; rim_walker) {
            const archive_doc = Document(archive_data);
            assert(!archive_doc.empty, "archive should not be empty");
            recorder_worker.remove(archive_doc);
            count++;
            // Flush as soon as the chunk is full, so a single record call
            // never carries more than chunck_size removals.
            if (count >= chunck_size) {
                record(recorder_worker);
                count = 0;
                recorder_worker.clear;
            }
        }
        // Hand over whatever is left from the last (partial) chunk.
        record(recorder_worker);
    }

    /**
     * Should be called when the synchronization has finished
     */
    void finish() {
        _finished = true;
    }

    /**
     * Connects the synchronizer to the DART, the fiber and the HiRPC object.
     *
     * Params:
     *   owner = DART to be synchronized
     *   fiber = synchronizer fiber
     *   hirpc = remote credential used
     */
    void set(
            DART owner,
            DART.SynchronizationFiber fiber,
            HiRPC hirpc) nothrow @trusted {
        import std.conv : emplace;

        this.fiber = fiber;
        this.owner = owner;
        // NOTE(review): presumably emplace is used because HiRPC can't be
        // plainly assigned here -- confirm against the HiRPC declaration.
        emplace(&this.hirpc, hirpc);
    }

    /**
     * Should be called on a timeout; marks the synchronization as timed out
     */
    void timeout() {
        _timeout = true;
    }

    /**
     * Checks if synchronization has ended
     *
     * Returns: true if the synchronization has finished or timed out
     */
    bool empty() const pure nothrow {
        return (_finished || _timeout);
    }

    /**
     * Check the synchronization timeout
     *
     * Returns: true on timeout
     */
    bool timeout() const pure nothrow {
        return _timeout;
    }
}

/**
 * Synchronizer which stores the recorded actions in a journal BlockFile
 */
@safe
class JournalSynchronizer : StdSynchronizer {
    protected {
        BlockFile journalfile; /// The actions are stored in this journal file, which later can be run via the replay function
        Index index; /// Current block index
    }

    /**
     * Params:
     *   journalfile = block file in which the journal is stored
     *   chunck_size = max number of archives handled in one recorder action
     */
    this(BlockFile journalfile, const uint chunck_size = 0x400) {
        this.journalfile = journalfile;
        super(chunck_size);
    }

    /**
     * Adds the recorder to the journal and stores it.
     *
     * A `DART.Journal` is built from the recorder and the current `index`,
     * saved in the journal file, and the index of the saved block becomes
     * both the new `index` and the root index of the journal file.
     * An empty recorder is ignored.
     *
     * Params:
     *   recorder = DART recorder
     */
    void record(const RecordFactory.Recorder recorder) @safe {
        if (recorder.empty) {
            return;
        }
        const journal = const(DART.Journal)(recorder, index);
        const allocated = journalfile.save(journal.toDoc);
        index = Index(allocated.index);
        journalfile.root_index = index;
        journalfile.store;
    }

    /**
     * Should be called when the synchronization has finished.
     * Closes the journal file and marks the synchronizer as finished.
     */
    override void finish() {
        journalfile.close;
        super.finish; // sets the finished flag
    }

}