module tagion.services.replicator;

import tagion.actor;
import tagion.crypto.SecureInterfaceNet;
import tagion.crypto.SecureNet : StdHashNet;
import tagion.crypto.Types : Fingerprint;
import tagion.dart.Recorder : RecordFactory;
import tagion.logger.Logger;
import tagion.recorderchain.RecorderChain;
import tagion.recorderchain.RecorderChainBlock : RecorderChainBlock, RecorderBlock;
import tagion.services.messages;
import tagion.utils.Miscellaneous : cutHex;
import tagion.basic.Types : FileExtension;
import tagion.basic.tagionexceptions;
import std.path : buildPath, setExtension;
import std.stdio;
import std.format;
import std.file : append, exists, mkdirRecurse;
import tagion.hibon.HiBONFile;

/// Options for the replicator service.
@safe
struct ReplicatorOptions {
    import std.format;
    import tagion.utils.JSONCommon;

    /// Directory in which the recorder files are stored.
    string folder_path = "./recorder";
    /// In the NEW_REPLICATOR build a new file is started whenever the epoch
    /// number is a multiple of this value. A value of 0 disables that check.
    int new_file_interval = 10_000;

    /**
     * Appends `prefix` to `folder_path`.
     *
     * NOTE: the prefix is concatenated directly onto the existing string;
     * it is NOT joined as a separate path component (no separator is added).
     *
     * Params:
     *   prefix = text appended to the current folder path
     */
    void setPrefix(string prefix) nothrow {
        folder_path = folder_path ~ prefix;
    }

    mixin JSONCommon;
}

/// Name of the log topic on which modifying recorders are published.
enum modify_log = "modify/replicator";

version(NEW_REPLICATOR) {

    /// Replicator service that writes each received recorder as a
    /// `RecorderBlock` to a file below `ReplicatorOptions.folder_path`.
    @safe
    struct ReplicatorService {
        static Topic modify_recorder = Topic(modify_log);

        /**
         * Service entry point: sets up the state shared between messages and
         * passes the `SendRecorder` handler to `run`.
         *
         * Params:
         *   opts = replicator options (target folder and file interval)
         */
        void task(immutable(ReplicatorOptions) opts) {
            HashNet net = new StdHashNet;
            // Last block written; its fingerprint is passed on to the next block.
            RecorderBlock last_block;

            // Currently open output file (File.init until the first message).
            File file;
            scope(exit) {
                file.close;
            }

            /*
             * Handles one SendRecorder message.
             *
             * Opens a new output file when none has been opened yet or when
             * the epoch number is a multiple of opts.new_file_interval, then
             * builds a RecorderBlock from the recorder and writes it to the
             * file.
             *
             * Throws: TagionException if the file to be created already exists.
             */
            void receiveRecorder(SendRecorder, immutable(RecordFactory.Recorder) recorder, Fingerprint bullseye, immutable(long) epoch_number) {
                // A zero interval would make the modulo below divide by zero,
                // so it is treated as "never rotate on the epoch number".
                const interval_reached = opts.new_file_interval != 0
                    && epoch_number % opts.new_file_interval == 0;

                if (file is File.init || interval_reached) {
                    log("going to create new file");
                    if (file !is File.init) {
                        file.close;
                    }
                    const filename = format("%010d_epoch", epoch_number).setExtension(FileExtension.hibon);
                    const filepath = buildPath(opts.folder_path, filename);
                    log.trace("Creating new replicator file \n%s", filepath);

                    if (!opts.folder_path.exists) {
                        mkdirRecurse(opts.folder_path);
                    }

                    // Never overwrite an existing file.
                    if (filepath.exists) {
                        throw new TagionException(format("Error: File %s already exists", filepath));
                    }
                    file = File(filepath, "w");
                }

                RecorderBlock block;
                // The block is written and remembered only if the rest of this
                // handler completes without throwing.
                scope(success) {
                    file.fwrite(block);
                    file.flush;
                    last_block = block;
                }

                // The first block has no predecessor and receives Fingerprint.init.
                block = RecorderBlock(
                        recorder.toDoc,
                        last_block is RecorderBlock.init ? Fingerprint.init : last_block.fingerprint,
                        bullseye,
                        epoch_number,
                        net);

                log.trace("Added recorder chain block with hash '%(%02x%)'", block.fingerprint);
                log(modify_recorder, "modify", recorder);
            }

            run(&receiveRecorder);
        }
    }

} else {

    /// Replicator service that appends each received recorder as a
    /// `RecorderChainBlock` to a `RecorderChain` backed by file storage.
    @safe
    struct ReplicatorService {
        static Topic modify_recorder = Topic(modify_log);

        /**
         * Service entry point: creates the recorder chain on top of a
         * `RecorderChainFileStorage` rooted at `opts.folder_path` and passes
         * the `SendRecorder` handler to `run`.
         *
         * Params:
         *   opts = replicator options (only `folder_path` is read here)
         */
        void task(immutable(ReplicatorOptions) opts) {
            HashNet net = new StdHashNet;

            RecorderChainStorage storage = new RecorderChainFileStorage(opts.folder_path, net);
            RecorderChain recorder_chain = new RecorderChain(storage);

            /*
             * Handles one SendRecorder message: builds a new chain block that
             * carries the previous block's fingerprint (Fingerprint.init when
             * the chain has no last block) and appends it to the chain.
             */
            void receiveRecorder(SendRecorder, immutable(RecordFactory.Recorder) recorder, Fingerprint bullseye, immutable(long) epoch_number) {
                auto last_block = recorder_chain.getLastBlock;
                auto block = new RecorderChainBlock(
                        recorder.toDoc,
                        last_block ? last_block.fingerprint : Fingerprint.init,
                        bullseye,
                        epoch_number,
                        net);
                recorder_chain.append(block);
                log.trace("Added recorder chain block with hash '%s'", block.getHash.cutHex);
                log(modify_recorder, "modify", recorder);
            }

            run(&receiveRecorder);
        }
    }

}