1 module tagion.dart.synchronizer;
2 
3 import tagion.communication.HiRPC;
4 
5 alias HiRPCSender = HiRPC.Sender;
6 alias HiRPCReceiver = HiRPC.Receiver;
7 import tagion.dart.BlockFile;
8 import tagion.dart.DART;
9 import tagion.dart.DARTFile;
10 import tagion.dart.DARTRim;
11 import tagion.dart.Recorder;
12 import tagion.hibon.Document;
13 
14 /**
15 * Interface to the DART synchronizer
16 */
17 @safe
18 interface Synchronizer {
19     /**
20         * Recommend to put a yield the SynchronizationFiber between send and receive between the DART's
21         */
22     const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request);
23     /**
24         * Stores the add and remove actions in the journal replay log file
25         * 
26         * Params:
27         *   recorder = DART recorder
28         */
29     void record(RecordFactory.Recorder recorder);
30     /**
31         * This function is called when whole branches doesn't exist in the foreign DART
32         * and need to be removed in the local DART
33         * Params:
34         *   rims = path to the selected rim
35         */
36     void removeRecursive(const Rims rims);
37     /**
38         * This function is called when the SynchronizationFiber run-function finishes
39         */
40     void finish();
41     /**
42         * Called in by the SynchronizationFiber constructor
43         * which enable the query function to yield the run function in SynchronizationFiber
44         *
45         * Params:
46         *     owner = is the dart to be modified
47         *     fiber = is the synchronizer fiber object
48         */
49     void set(DART owner, DART.SynchronizationFiber fiber, HiRPC hirpc);
50     /**
51         * Checks if the syncronizer is empty
52         * Returns:
53         *     If the SynchronizationFiber has finished then this function returns `true`
54         */
55     bool empty() const pure nothrow;
56 }
57 
58 /**
59     *  Standards DART Synchronization object
60     */
61 @safe
62 abstract class StdSynchronizer : Synchronizer {
63 
64     protected DART.SynchronizationFiber fiber; /// Contains the reference to SynchronizationFiber
65     immutable uint chunck_size; /// Max number of archives operates in one recorder action
66     protected {
67         //        BlockFile journalfile; /// The actives is stored in this journal file. Which late can be run via the replay function
68         bool _finished; /// Finish flag set when the Fiber function returns
69         bool _timeout; /// Set via the timeout method to indicate and network timeout
70         DART owner;
71         //        Index index; /// Current block index
72         HiRPC hirpc;
73     }
74     this(const uint chunck_size = 0x400) {
75         this.chunck_size = chunck_size;
76     }
77 
78     /** 
79         * Remove all archive at selected rim path
80         * Params:
81         *   selected_rims = selected rims to be removed
82         */
83     void removeRecursive(const Rims selected_rims) {
84 
85         auto rim_walker = owner.rimWalkerRange(selected_rims.path);
86         uint count = 0;
87         auto recorder_worker = owner.recorder;
88         foreach (archive_data; rim_walker) {
89             const archive_doc = Document(archive_data);
90             assert(!archive_doc.empty, "archive should not be empty");
91             recorder_worker.remove(archive_doc);
92             count++;
93             if (count > chunck_size) {
94                 record(recorder_worker);
95                 count = 0;
96                 recorder_worker.clear;
97             }
98         }
99         record(recorder_worker);
100     }
101 
102     /** 
103         * Should be called when the synchronization has finished
104         */
105     void finish() {
106         _finished = true;
107     }
108 
109     /**
110         * 
111         * Params:
112         *   owner = DART to be synchronized
113         *   fiber = syncronizer fiber
114         *   hirpc = remote credential used 
115         */
116     void set(
117             DART owner,
118             DART.SynchronizationFiber fiber,
119             HiRPC hirpc) nothrow @trusted {
120         import std.conv : emplace;
121 
122         this.fiber = fiber;
123         this.owner = owner;
124         emplace(&this.hirpc, hirpc);
125     }
126 
127     /**
128         * Should be call on timeout timeout
129         */
130     void timeout() {
131         //  journalfile.close;
132         _timeout = true;
133     }
134 
135     /**
136         * Checks if synchronization has ended
137         * Returns: true on empty
138         */
139     bool empty() const pure nothrow {
140         return (_finished || _timeout);
141     }
142     /* 
143         * Check the synchronization timeout
144         * Returns: true on timeout
145         */
146     bool timeout() const pure nothrow {
147         return _timeout;
148     }
149 }
150 
151 @safe
152 class JournalSynchronizer : StdSynchronizer {
153     protected {
154         BlockFile journalfile; /// The actives is stored in this journal file. Which later can be run via the replay function
155         Index index; /// Current block index
156     }
157     this(BlockFile journalfile, const uint chunck_size = 0x400) {
158         this.journalfile = journalfile;
159         super(chunck_size);
160     }
161 
162     /** 
163         * Update and adds the recorder to the journal and store it
164         * Params:
165         *   recorder = DART recorder
166         */
167     void record(const RecordFactory.Recorder recorder) @safe {
168         if (!recorder.empty) {
169             const journal = const(DART.Journal)(recorder, index);
170             const allocated = journalfile.save(journal.toDoc);
171             index = Index(allocated.index);
172             journalfile.root_index = index;
173             scope (exit) {
174                 journalfile.store;
175             }
176         }
177     }
178     /** 
179         * Should be called when the synchronization has finished
180         */
181     override void finish() {
182         journalfile.close;
183         super.finish;
184         _finished = true;
185     }
186 
187 }