1 module tagion.tools.dartutil.synchronize;
2 
3 import std.file : remove;
4 import std.format;
5 import tagion.communication.HiRPC;
6 import tagion.dart.BlockFile;
7 import tagion.dart.DART;
8 import tagion.dart.DARTRim;
9 import tagion.dart.synchronizer;
10 import tagion.hibon.Document;
11 import tagion.tools.Basic : nobose, noboseln, verbose;
12 import tagion.utils.Term;
13 
/**
 * Journal synchronizer that pairs two DART objects held in the same process.
 *
 * Requests produced while synchronizing `destination` are answered directly
 * by `source`, so no network transport is involved.
 */
@safe
class DARTUtilSynchronizer : JournalSynchronizer {
    /// DART that answers the queries
    protected DART source;
    /// DART whose HiRPC decodes the answers
    protected DART destination;

    /**
     * Params:
     *   journalfile = block file forwarded to the JournalSynchronizer constructor
     *   destination = DART used to receive the responses
     *   source = DART that the requests are executed against
     */
    this(BlockFile journalfile, DART destination, DART source) {
        this.destination = destination;
        this.source = source;
        super(journalfile);
    }

    /**
     * Emulates, in a single thread, the connection between two DARTs.
     *
     * The request is serialized to a Document, the fiber yields once, the
     * Document is executed on `source`, and the answer is decoded through
     * the HiRPC of `destination`.
     *
     * Params:
     *   request = HiRPC request to run against the source DART
     * Returns: the receiver decoded from the response of the source DART
     */
    const(HiRPC.Receiver) query(ref const(HiRPC.Sender) request) {
        // Stands in for the remote end: decode the request with the HiRPC
        // of the source DART, run it there and return the answer as a Document.
        Document ask_source(const Document outgoing) {
            // Receive on the foreign end
            const source_receiver = source.hirpc.receive(outgoing);
            // Execute the query in the foreign DART
            const source_reply = source(source_receiver);
            return source_reply.toDoc;
        }

        immutable request_doc = request.toDoc;
        // `fiber` is not declared in this class (presumably inherited from
        // JournalSynchronizer); control is handed back to the caller once
        // before the request is executed.
        (() @trusted { fiber.yield; })();
        // TODO: a yield loop should be implemented here to poll for the
        // response from the foreign DART, including a timeout.
        const reply_doc = ask_source(request_doc);
        // Decode the response that came back from the foreign DART
        return destination.hirpc.receive(reply_doc);
    }

    /**
     * Marks the synchronization as finished.
     *
     * The journal file is deliberately not closed here (the close call was
     * disabled); the `synchronize` function in this module closes the file
     * it created itself.
     */
    override void finish() {
        _finished = true;
    }

}
58 
/**
 * Synchronizes `destination` against `source` one rim value at a time and
 * then replays the recorded journals into `destination`.
 *
 * For each of the 256 first-rim values (0x00 .. 0xFF) a journal block file
 * named `<journal_basename>.<sector as 4 hex digits>.dart_journal` is
 * created, a DARTUtilSynchronizer is run over that rim until it reports
 * empty, and the journal file is closed. Journals that are not empty are
 * collected and afterwards replayed, in order, with `destination.replay`.
 *
 * Progress is reported through `verbose`, `nobose` and `noboseln` from
 * tagion.tools.Basic (presumably verbose-mode and non-verbose-mode output
 * respectively - confirm against that module).
 *
 * Params:
 *   destination = DART that is synchronized and receives the replayed journals
 *   source = DART that answers the synchronization queries
 *   journal_basename = path prefix used to build the journal file names
 * Returns: the names of the journal files that were not empty
 */
@safe
string[] synchronize(DART destination, DART source, string journal_basename) {
    string[] journal_filenames;
    uint count;
    // Number of progress markers printed before the line is terminated
    enum line_width = 32;
    //    foreach (sector; destination.sectors) {
    foreach (ushort _rim; 0 .. ubyte.max + 1) {
        // The rim value is placed in the high byte of the 16-bit sector number
        ushort sector = cast(ushort)(_rim << 8);
        verbose("Sector %04x", sector);
        immutable journal_filename = format("%s.%04x.dart_journal", journal_basename, sector);
        BlockFile.create(journal_filename, DART.stringof, BLOCK_SIZE);

        auto journalfile = BlockFile(journal_filename);
        // Runs when this iteration is left, also if the synchronization throws:
        // register the journal when it holds data, print a progress marker
        // and close the file.
        scope (exit) {
            if (!journalfile.empty) {
                journal_filenames ~= journal_filename;
                verbose("Journalfile %s", journal_filename);
                nobose("%s#%s", YELLOW, RESET);

            }
            else {
                // NOTE(review): an empty journal file is left on disk and is
                // not part of the returned list - confirm whether it should
                // be deleted.
                nobose("%sX%s", BLUE, RESET);
            }
            count++;
            if (count % line_width == 0) {
                noboseln("!");
            }
            journalfile.close;
        }
        auto synch = new DARTUtilSynchronizer(journalfile, destination, source);

        // Restrict the synchronization to the single rim of this iteration
        auto destination_synchronizer = destination.synchronizer(synch, Rims([cast(ubyte) _rim]));
        // Drive the synchronizer until it reports that it is empty
        while (!destination_synchronizer.empty) {
            (() @trusted { destination_synchronizer.call; })();
        }

    }
    noboseln("Replay journal filenames");
    verbose("Replay journal filenames");
    // Restart the counter so that it counts the replayed journals
    count = 0;
    foreach (journal_filename; journal_filenames) {
        destination.replay(journal_filename);
        verbose("Replay %s", journal_filename);
        nobose("%s*%s", GREEN, RESET);
        count++;
        if (count % line_width == 0) {
            noboseln("!");
        }
    }
    noboseln("\n%d journal files has been syncronized", count);
    return journal_filenames;
}