1 // Transaction reverse table service using a DART
2 module tagion.services.TRTService;
3 import tagion.services.options : TaskNames;
4 
5 import std.algorithm : map, filter;
6 import std.array;
7 import std.exception;
8 import std.file;
9 import std.format : format;
10 import std.path : isValidPath;
11 import std.path;
12 import std.stdio;
13 import std.range : enumerate;
14 import tagion.actor;
15 import tagion.basic.Types : FileExtension;
16 import tagion.communication.HiRPC;
17 import tagion.crypto.SecureInterfaceNet;
18 import tagion.crypto.SecureNet;
19 import tagion.crypto.Types;
20 import tagion.dart.DART;
21 import tagion.dart.DARTBasic : DARTIndex, dartIndex, dartKey;
22 import tagion.dart.DARTException;
23 import tagion.dart.Recorder;
24 import tagion.hibon.Document;
25 import tagion.hibon.HiBONRecord : isRecord;
26 import tagion.logger.Logger;
27 import tagion.services.messages;
28 import tagion.services.options : TaskNames;
29 import tagion.services.replicator;
30 import tagion.utils.JSONCommon;
31 import tagion.utils.pretend_safe_concurrency;
32 import tagion.basic.Types;
33 import tagion.trt.TRT;
34 import tagion.hibon.HiBON;
35 import tagion.script.standardnames;
36 import tagion.script.common : TagionBill;
37 
38 @safe
39 struct TRTOptions {
40     bool enable = false;
41     string folder_path = buildPath(".");
42     string trt_filename = "trt".setExtension(FileExtension.dart);
43     string trt_path;
44 
45     this(string folder_path, string trt_filename) {
46         this.folder_path = folder_path;
47         this.trt_filename = trt_filename;
48         trt_path = buildPath(folder_path, trt_filename);
49     }
50 
51     void setPrefix(string prefix) nothrow {
52         trt_filename = prefix ~ trt_filename;
53         trt_path = buildPath(folder_path, trt_filename);
54     }
55 
56     mixin JSONCommon;
57 }
58 
59 @safe
60 struct TRTService {
61     void task(immutable(TRTOptions) opts, immutable(TaskNames) task_names, shared(StdSecureNet) shared_net) {
62         DART trt_db;
63         Exception dart_exception;
64 
65         const net = new StdSecureNet(shared_net);
66         auto rec_factory = RecordFactory(net);
67         auto hirpc = HiRPC(net);
68         ActorHandle dart_handle = ActorHandle(task_names.dart);
69 
70         log("TRT PATH FOR DATABASE=%s", opts.trt_path);
71         trt_db = new DART(net, opts.trt_path);
72         if (dart_exception !is null) {
73             throw dart_exception;
74         }
75 
76         scope (exit) {
77             trt_db.close();
78         }
79 
80         struct TRTRequest {
81             trtHiRPCRR req;
82             Document doc;
83         }
84 
85         TRTRequest[uint] requests;
86 
87         log("%s, starting trt with %(%02x%)", opts.trt_path, trt_db.bullseye);
88 
89         void receive_recorder(dartReadRR.Response res, immutable(RecordFactory.Recorder) recorder) {
90             log("received recorder from dartread");
91             if (!(res.id in requests)) {
92                 return;
93             }
94             HiBON params = new HiBON;
95             foreach (i, bill; recorder[].enumerate) {
96                 params[i] = bill.filed;
97             }
98 
99             auto client_request = requests[res.id];
100             scope (exit) {
101                 requests.remove(res.id);
102             }
103 
104             immutable receiver = hirpc.receive(client_request.doc);
105 
106             Document response = hirpc.result(receiver, params).toDoc;
107             client_request.req.respond(response);
108         }
109 
110         void trt_read(trtHiRPCRR client_req, Document doc) {
111             log("received trt request");
112             if (!doc.isRecord!(HiRPC.Sender)) {
113                 return;
114             }
115             log("before hirpc");
116             immutable receiver = hirpc.receive(doc);
117             if (receiver.method.name != "search") {
118                 log("not a HIRPC");
119                 // return hirpc error instead;
120                 return;
121             }
122             log("before owner doc");
123             auto owner_doc = receiver.method.params;
124             if (owner_doc[].empty) {
125                 log("the owner doc was empty");
126                 // return hirpc error instead;
127                 return;
128             }
129 
130             log("before creating indices");
131             auto owner_indices = owner_doc[]
132                 .map!(owner => net.dartKey(TRTLabel, Pubkey(owner.get!Buffer)))
133                 .array;
134 
135             import std.algorithm;
136 
137             owner_indices.each!(o => writefln("%(%02x%)", o));
138 
139             auto trt_read_recorder = trt_db.loads(owner_indices);
140             immutable indices = trt_read_recorder[].map!(a => cast(immutable)(a.dart_index)).array;
141             if (indices.empty) {
142                 // return hirpc error instead;
143                 return;
144             }
145 
146             log("sending dartread request");
147             auto dart_req = dartReadRR();
148             requests[dart_req.id] = TRTRequest(client_req, doc);
149 
150             dart_handle.send(dart_req, indices);
151         }
152 
153         void modify(trtModify, immutable(RecordFactory.Recorder) dart_recorder) {
154             log("received modify request from dart");
155             auto trt_recorder = rec_factory.recorder;
156 
157             // get a recorder from all the dartkeys already in the db for the function
158             auto index_lookup = dart_recorder[]
159                 .filter!(a => a.filed.isRecord!TagionBill)
160                 .map!(a => TagionBill(a.filed))
161                 .map!(t => net.dartKey(TRTLabel, Pubkey(t.owner)));
162 
163             auto already_in_dart = trt_db.loads(index_lookup);
164 
165             createTRTUpdateRecorder(dart_recorder, already_in_dart, trt_recorder, net);
166             log("trt recorder modify %s", trt_recorder.toPretty);
167             trt_db.modify(trt_recorder);
168         }
169 
170         run(&modify, &trt_read, &receive_recorder);
171 
172     }
173 
174 }