// Transaction reverse table service using a DART
module tagion.services.TRTService;

import tagion.services.options : TaskNames;

import std.algorithm : map, filter;
import std.array;
import std.exception;
import std.file;
import std.format : format;
import std.path : isValidPath;
import std.path;
import std.stdio;
import std.range : enumerate;
import tagion.actor;
import tagion.basic.Types : FileExtension;
import tagion.communication.HiRPC;
import tagion.crypto.SecureInterfaceNet;
import tagion.crypto.SecureNet;
import tagion.crypto.Types;
import tagion.dart.DART;
import tagion.dart.DARTBasic : DARTIndex, dartIndex, dartKey;
import tagion.dart.DARTException;
import tagion.dart.Recorder;
import tagion.hibon.Document;
import tagion.hibon.HiBONRecord : isRecord;
import tagion.logger.Logger;
import tagion.services.messages;
import tagion.services.options : TaskNames;
import tagion.services.replicator;
import tagion.utils.JSONCommon;
import tagion.utils.pretend_safe_concurrency;
import tagion.basic.Types;
import tagion.trt.TRT;
import tagion.hibon.HiBON;
import tagion.script.standardnames;
import tagion.script.common : TagionBill;

/**
 * Options for the TRT service.
 *
 * `trt_path` has no initializer of its own: it is only assigned by the
 * constructor or by `setPrefix`, both of which join `folder_path` and
 * `trt_filename`. A default-constructed value therefore has an empty
 * `trt_path` until one of them has run.
 */
@safe
struct TRTOptions {
    /// Whether the service is enabled (not read inside this file).
    bool enable = false;
    /// Folder holding the TRT database file.
    string folder_path = buildPath(".");
    /// File name of the TRT database, "trt" plus the DART extension by default.
    string trt_filename = "trt".setExtension(FileExtension.dart);
    /// Full path of the database file, derived from the two fields above.
    string trt_path;

    /**
     * Params:
     *   folder_path  = folder holding the database file
     *   trt_filename = file name of the database
     */
    this(string folder_path, string trt_filename) {
        this.folder_path = folder_path;
        this.trt_filename = trt_filename;
        trt_path = buildPath(folder_path, trt_filename);
    }

    /**
     * Prepends `prefix` to the file name and recomputes `trt_path`.
     *
     * Params:
     *   prefix = text put in front of `trt_filename`
     */
    void setPrefix(string prefix) nothrow {
        trt_filename = prefix ~ trt_filename;
        trt_path = buildPath(folder_path, trt_filename);
    }

    mixin JSONCommon;
}

/**
 * Actor that owns the TRT database and serves three kinds of messages:
 * `trtModify` updates, `trtHiRPCRR` search requests, and the
 * `dartReadRR.Response` replies to the reads those searches trigger.
 */
@safe
struct TRTService {
    /**
     * Opens the TRT database and hands the message handlers to `run`.
     *
     * Params:
     *   opts       = service options; `opts.trt_path` is the database opened
     *   task_names = task names; only `task_names.dart` is used here
     *   shared_net = shared net from which a task-local `StdSecureNet` is built
     *
     * Any exception raised while opening the database propagates to the caller.
     */
    void task(immutable(TRTOptions) opts, immutable(TaskNames) task_names, shared(StdSecureNet) shared_net) {
        const net = new StdSecureNet(shared_net);
        auto rec_factory = RecordFactory(net);
        auto hirpc = HiRPC(net);
        ActorHandle dart_handle = ActorHandle(task_names.dart);

        log("TRT PATH FOR DATABASE=%s", opts.trt_path);
        // The previous version checked a local `dart_exception` here that was
        // never assigned, so the check could not fire; it has been removed.
        DART trt_db = new DART(net, opts.trt_path);

        scope (exit) {
            trt_db.close();
        }

        // A client search that is waiting for the DART read it triggered.
        struct TRTRequest {
            trtHiRPCRR req; // handle used to respond to the client
            Document doc; // the client's original HiRPC document
        }

        // Pending searches keyed by the id of the dartReadRR request sent for them.
        TRTRequest[uint] requests;

        log("%s, starting trt with %(%02x%)", opts.trt_path, trt_db.bullseye);

        /**
         * Handles the DART read reply for a pending search: packs the `filed`
         * document of every archive in `recorder` into a HiRPC result and
         * responds to the waiting client. Replies with an unknown id are ignored.
         */
        void receive_recorder(dartReadRR.Response res, immutable(RecordFactory.Recorder) recorder) {
            log("received recorder from dartread");
            auto pending = res.id in requests;
            if (pending is null) {
                return;
            }
            auto client_request = *pending;
            // Registered before the reply is built so the entry is dropped
            // even when building or sending the reply throws.
            scope (exit) {
                requests.remove(res.id);
            }

            HiBON params = new HiBON;
            foreach (i, bill; recorder[].enumerate) {
                params[i] = bill.filed;
            }

            immutable receiver = hirpc.receive(client_request.doc);

            Document response = hirpc.result(receiver, params).toDoc;
            client_request.req.respond(response);
        }

        /**
         * Handles a client "search" request: turns every element of the
         * request params into a TRT key, loads those keys from the TRT
         * database and forwards the resulting indices to the DART task.
         *
         * NOTE(review): every early return below leaves the client without a
         * response; the TODOs ask for a HiRPC error to be sent instead.
         */
        void trt_read(trtHiRPCRR client_req, Document doc) {
            log("received trt request");
            if (!doc.isRecord!(HiRPC.Sender)) {
                return;
            }
            log("before hirpc");
            immutable receiver = hirpc.receive(doc);
            if (receiver.method.name != "search") {
                log("unsupported method %s", receiver.method.name);
                // TODO: return hirpc error instead
                return;
            }
            log("before owner doc");
            auto owner_doc = receiver.method.params;
            if (owner_doc[].empty) {
                log("the owner doc was empty");
                // TODO: return hirpc error instead
                return;
            }

            log("before creating indices");
            auto owner_indices = owner_doc[]
                .map!(owner => net.dartKey(TRTLabel, Pubkey(owner.get!Buffer)))
                .array;

            // Goes through the logger like the rest of the service rather
            // than being written to stdout.
            foreach (owner_index; owner_indices) {
                log("%(%02x%)", owner_index);
            }

            auto trt_read_recorder = trt_db.loads(owner_indices);
            immutable indices = trt_read_recorder[].map!(a => cast(immutable)(a.dart_index)).array;
            if (indices.empty) {
                // TODO: return hirpc error instead
                return;
            }

            log("sending dartread request");
            auto dart_req = dartReadRR();
            // Remember the client so receive_recorder can answer it once the
            // reply carrying this request id arrives.
            requests[dart_req.id] = TRTRequest(client_req, doc);

            dart_handle.send(dart_req, indices);
        }

        /**
         * Handles a modify notification: loads the TRT entries for the owners
         * of all `TagionBill` archives in `dart_recorder`, lets
         * `createTRTUpdateRecorder` fill `trt_recorder`, and applies it to the
         * TRT database.
         */
        void modify(trtModify, immutable(RecordFactory.Recorder) dart_recorder) {
            log("received modify request from dart");
            auto trt_recorder = rec_factory.recorder;

            // TRT keys of the owners of every bill in the incoming recorder.
            auto index_lookup = dart_recorder[]
                .filter!(a => a.filed.isRecord!TagionBill)
                .map!(a => TagionBill(a.filed))
                .map!(t => net.dartKey(TRTLabel, Pubkey(t.owner)));

            auto already_in_dart = trt_db.loads(index_lookup);

            createTRTUpdateRecorder(dart_recorder, already_in_dart, trt_recorder, net);
            log("trt recorder modify %s", trt_recorder.toPretty);
            trt_db.modify(trt_recorder);
        }

        run(&modify, &trt_read, &receive_recorder);

    }

}