1 /// Tagion DART actor service 2 module tagion.services.DART; 3 4 import std.algorithm : map, filter; 5 import std.array; 6 import std.exception; 7 import std.file; 8 import std.format : format; 9 import std.path : isValidPath; 10 import std.path; 11 import std.stdio; 12 import tagion.actor; 13 import tagion.basic.Types; 14 import tagion.communication.HiRPC; 15 import tagion.crypto.SecureInterfaceNet; 16 import tagion.crypto.SecureNet; 17 import tagion.crypto.Types; 18 import tagion.dart.DART; 19 import tagion.dart.DARTBasic : DARTIndex, dartIndex; 20 import tagion.dart.DARTException; 21 import tagion.dart.Recorder; 22 import tagion.hibon.Document; 23 import tagion.hibon.HiBONRecord : isRecord; 24 import tagion.logger.Logger; 25 import tagion.services.messages; 26 import tagion.services.options : TaskNames; 27 import tagion.services.replicator; 28 import tagion.utils.JSONCommon; 29 import tagion.utils.pretend_safe_concurrency; 30 31 @safe 32 struct DARTOptions { 33 string folder_path = buildPath("."); 34 string dart_filename = "dart".setExtension(FileExtension.dart); 35 string dart_path; 36 37 this(string folder_path, string dart_filename) { 38 this.folder_path = folder_path; 39 this.dart_filename = dart_filename; 40 dart_path = buildPath(folder_path, dart_filename); 41 } 42 43 void setPrefix(string prefix) nothrow { 44 dart_filename = prefix ~ dart_filename; 45 dart_path = buildPath(folder_path, dart_filename); 46 } 47 48 mixin JSONCommon; 49 } 50 51 @safe 52 struct DARTService { 53 void task(immutable(DARTOptions) opts, 54 immutable(TaskNames) task_names, 55 shared(StdSecureNet) shared_net, 56 bool trt_enable) { 57 58 DART db; 59 Exception dart_exception; 60 const net = new StdSecureNet(shared_net); 61 db = new DART(net, opts.dart_path); 62 if (dart_exception !is null) { 63 throw dart_exception; 64 } 65 66 ActorHandle replicator_handle = ActorHandle(task_names.replicator); 67 ActorHandle trt_handle = ActorHandle(task_names.trt); 68 69 scope (exit) { 70 db.close(); 71 } 72 73 void read(dartReadRR req, immutable(DARTIndex)[] fingerprints) @safe { 74 import std.algorithm; 75 import tagion.hibon.HiBONtoText; 76 import tagion.utils.Miscellaneous; 77 78 RecordFactory.Recorder read_recorder = db.loads(fingerprints); 79 req.respond(RecordFactory.uniqueRecorder(read_recorder)); 80 } 81 82 void checkRead(dartCheckReadRR req, immutable(DARTIndex)[] fingerprints) @safe { 83 immutable(DARTIndex)[] check_read = (() @trusted => cast(immutable) db.checkload(fingerprints))(); 84 log("after checkread response"); 85 86 req.respond(check_read); 87 } 88 89 log("Starting dart with %(%02x%)", db.bullseye); 90 91 auto hirpc = HiRPC(net); 92 93 void dartHiRPC(dartHiRPCRR req, Document doc) { 94 import tagion.hibon.HiBONJSON; 95 96 log("Received HiRPC request"); 97 98 if (!doc.isRecord!(HiRPC.Sender)) { 99 log("wrong request sent to dartservice. Expected HiRPC.Sender got %s", doc.toPretty); 100 return; 101 } 102 103 immutable receiver = hirpc.receive(doc); 104 105 if (receiver.method.name == "search") { 106 log("SEARCH REQUEST"); 107 108 auto owner_doc = receiver.method.params; 109 Buffer[] owner_pkeys; 110 foreach (owner; owner_doc[]) { 111 owner_pkeys ~= owner.get!Buffer; 112 } 113 auto res = db.search(owner_pkeys, net); 114 115 Document response = hirpc.result(receiver, Document(res)).toDoc; 116 req.respond(response); 117 return; 118 } 119 if (!(receiver.method.name == DART.Queries.dartRead 120 || receiver.method.name == DART.Queries.dartRim 121 || receiver.method.name == DART.Queries.dartBullseye 122 || receiver.method.name == DART.Queries.dartCheckRead)) { 123 log("unsupported request"); 124 return; 125 } 126 127 Document result = db(receiver, false).toDoc; 128 log("darthirpc response: %s", result.toPretty); 129 req.respond(result); 130 } 131 132 void modify(dartModifyRR req, immutable(RecordFactory.Recorder) recorder, immutable(long) epoch_number) @trusted { 133 134 log("Received modify request with length=%s", recorder.length); 135 136 immutable fingerprint_before = Fingerprint(db.bullseye); 137 import core.exception : AssertError; 138 139 try { 140 141 auto eye = db.modify(recorder); 142 log("New bullseye is %(%02x%)", eye); 143 144 req.respond(eye); 145 replicator_handle.send(SendRecorder(), recorder, eye, epoch_number); 146 if (trt_enable) { 147 trt_handle.send(trtModify(), recorder); 148 } 149 } 150 catch (AssertError e) { 151 log("Received ASSERT ERROR bullseye before %(%02x%), %s archives that were tried to be added \n%s", fingerprint_before, e, recorder 152 .toPretty); 153 fail(e); 154 } 155 catch (Error e) { 156 log.error("DART Error %s", e); 157 } 158 159 } 160 161 void bullseye(dartBullseyeRR req) @safe { 162 auto eye = Fingerprint(db.bullseye); 163 req.respond(eye); 164 } 165 166 run(&modify, &read, &checkRead, &bullseye, &dartHiRPC); 167 168 } 169 }