1 module tagion.services.replicator;
2 
3 import tagion.actor;
4 import tagion.crypto.SecureInterfaceNet;
5 import tagion.crypto.SecureNet : StdHashNet;
6 import tagion.crypto.Types : Fingerprint;
7 import tagion.dart.Recorder : RecordFactory;
8 import tagion.logger.Logger;
9 import tagion.recorderchain.RecorderChain;
10 import tagion.recorderchain.RecorderChainBlock : RecorderChainBlock, RecorderBlock;
11 import tagion.services.messages;
12 import tagion.utils.Miscellaneous : cutHex;
13 import tagion.basic.Types : FileExtension;
14 import tagion.basic.tagionexceptions;
15 import std.path : buildPath, setExtension;
16 import std.stdio;
17 import std.format;
18 import std.file : append, exists, mkdirRecurse;
19 import tagion.hibon.HiBONFile;
20 
21 
22 @safe
23 struct ReplicatorOptions {
24     import std.format;
25     import tagion.utils.JSONCommon;
26 
27     string folder_path = "./recorder";
28     int new_file_interval = 10_000;
29 
30     void setPrefix(string prefix) nothrow {
31         import std.exception;
32         import std.path : buildPath;
33 
34         folder_path = folder_path ~ prefix;
35         // assumeWontThrow(buildPath(folder_path, prefix));
36     }
37 
38     mixin JSONCommon;
39 }
40 enum modify_log = "modify/replicator";
41 
42 
43 version(NEW_REPLICATOR) {
44 
45 @safe
46 struct ReplicatorService {
47     static Topic modify_recorder = Topic(modify_log);
48 
49     void task(immutable(ReplicatorOptions) opts) {
50         HashNet net = new StdHashNet;
51         RecorderBlock last_block;
52 
53 
54         
55         File file;
56         scope(exit) {
57             file.close;
58         }
59 
60         void receiveRecorder(SendRecorder, immutable(RecordFactory.Recorder) recorder, Fingerprint bullseye, immutable(long) epoch_number) {
61             if (file is File.init || epoch_number % opts.new_file_interval == 0) {
62                 log("going to create new file");
63                 if (file !is File.init) {
64                     file.close;
65                 }
66                 const filename = format("%010d_epoch", epoch_number).setExtension(FileExtension.hibon);
67                 const filepath = buildPath(opts.folder_path, filename);
68                 log.trace("Creating new replicator file %s", filepath);
69 
70                 if (!opts.folder_path.exists) {
71                     mkdirRecurse(opts.folder_path);
72                 }
73 
74                 if (filepath.exists) {
75                     throw new TagionException(format("Error: File %s already exists", filepath));
76                 }
77                 file = File(filepath, "w");
78             }
79             RecorderBlock block;
80             scope(success) {
81                 file.fwrite(block);
82                 file.flush;
83                 last_block = block;
84             }
85 
86             block = RecorderBlock(
87                 recorder.toDoc,
88                 last_block is RecorderBlock.init ? Fingerprint.init : last_block.fingerprint,
89                 bullseye,
90                 epoch_number,
91                 net); 
92 
93             log.trace("Added recorder chain block with hash '%(%02x%)'", block.fingerprint);
94             log(modify_recorder, "modify", recorder);
95         }
96 
97         run(&receiveRecorder);
98 
99 
100 
101     }
102 
103 
104 
105 }
106 
107 
108 
109 
110 
111 
112 } else {
113 
114 
115 @safe
116 struct ReplicatorService {
117     static Topic modify_recorder = Topic(modify_log);
118 
119     void task(immutable(ReplicatorOptions) opts) {
120         HashNet net = new StdHashNet;
121 
122         RecorderChainStorage storage = new RecorderChainFileStorage(opts.folder_path, net);
123         RecorderChain recorder_chain = new RecorderChain(storage);
124 
125         void receiveRecorder(SendRecorder, immutable(RecordFactory.Recorder) recorder, Fingerprint bullseye, immutable(long) epoch_number) {
126             auto last_block = recorder_chain.getLastBlock;
127             auto block = new RecorderChainBlock(
128                     recorder.toDoc,
129                     last_block ? last_block.fingerprint : Fingerprint.init,
130                     bullseye,
131                     epoch_number,
132                     net);
133             recorder_chain.append(block);
134             log.trace("Added recorder chain block with hash '%s'", block.getHash.cutHex);
135             log(modify_recorder, "modify", recorder);
136         }
137 
138         run(&receiveRecorder);
139     }
140 
141 }
142 
143 
144 }
145