1 /// Tagion DART actor service
2 module tagion.services.DART;
3 
4 import std.algorithm : map, filter;
5 import std.array;
6 import std.exception;
7 import std.file;
8 import std.format : format;
9 import std.path : isValidPath;
10 import std.path;
11 import std.stdio;
12 import tagion.actor;
13 import tagion.basic.Types;
14 import tagion.communication.HiRPC;
15 import tagion.crypto.SecureInterfaceNet;
16 import tagion.crypto.SecureNet;
17 import tagion.crypto.Types;
18 import tagion.dart.DART;
19 import tagion.dart.DARTBasic : DARTIndex, dartIndex;
20 import tagion.dart.DARTException;
21 import tagion.dart.Recorder;
22 import tagion.hibon.Document;
23 import tagion.hibon.HiBONRecord : isRecord;
24 import tagion.logger.Logger;
25 import tagion.services.messages;
26 import tagion.services.options : TaskNames;
27 import tagion.services.replicator;
28 import tagion.utils.JSONCommon;
29 import tagion.utils.pretend_safe_concurrency;
30 
/**
 * Settings describing where the DART database file is located.
 *
 * `dart_path` is derived from `folder_path` and `dart_filename`; it is only
 * filled in by the two-argument constructor or by `setPrefix` and is left
 * empty on a default-initialised instance.
 */
@safe
struct DARTOptions {
    /// Directory that holds the DART file (defaults to the current directory).
    string folder_path = buildPath(".");
    /// File name of the DART file, including its extension.
    string dart_filename = setExtension("dart", FileExtension.dart);
    /// `folder_path` joined with `dart_filename`.
    string dart_path;

    /**
     * Builds the options from an explicit folder and file name and derives
     * `dart_path` from them.
     *
     * Params:
     *   folder_path = directory that holds the DART file
     *   dart_filename = file name of the DART file
     */
    this(string folder_path, string dart_filename) {
        this.folder_path = folder_path;
        this.dart_filename = dart_filename;
        this.dart_path = folder_path.buildPath(dart_filename);
    }

    /**
     * Prepends `prefix` to the current file name and recomputes `dart_path`.
     *
     * Params:
     *   prefix = text placed in front of `dart_filename`
     */
    void setPrefix(string prefix) nothrow {
        this.dart_filename = prefix ~ this.dart_filename;
        this.dart_path = this.folder_path.buildPath(this.dart_filename);
    }

    mixin JSONCommon;
}
50 
/**
 * Actor service that opens a DART database and answers read, check-read,
 * bullseye, modify and HiRPC requests against it.
 */
@safe
struct DARTService {
    /**
     * Service entry point: opens the DART file, registers the request
     * handlers and hands control to `run`.
     *
     * Any exception thrown while opening the database propagates out of this
     * function. The database is closed when the function is left, whether
     * normally or through an exception.
     *
     * Params:
     *   opts = options; `opts.dart_path` is the file passed to the DART constructor
     *   task_names = supplies the `replicator` and `trt` task names used for the actor handles
     *   shared_net = shared secure net from which the service-local `StdSecureNet` is built
     *   trt_enable = when true, every applied recorder is also sent to the TRT task
     */
    void task(immutable(DARTOptions) opts,
            immutable(TaskNames) task_names,
            shared(StdSecureNet) shared_net,
            bool trt_enable) {

        const net = new StdSecureNet(shared_net);
        DART db = new DART(net, opts.dart_path);
        // Register the close immediately after a successful open so that the
        // database is released even if a later setup step throws.
        scope (exit) {
            db.close();
        }

        ActorHandle replicator_handle = ActorHandle(task_names.replicator);
        ActorHandle trt_handle = ActorHandle(task_names.trt);

        /// Loads the archives for `fingerprints` and responds with a
        /// recorder produced by `RecordFactory.uniqueRecorder`.
        void read(dartReadRR req, immutable(DARTIndex)[] fingerprints) @safe {
            RecordFactory.Recorder read_recorder = db.loads(fingerprints);
            req.respond(RecordFactory.uniqueRecorder(read_recorder));
        }

        /// Responds with the result of `db.checkload(fingerprints)`.
        void checkRead(dartCheckReadRR req, immutable(DARTIndex)[] fingerprints) @safe {
            // The result of checkload is cast to immutable inside a @trusted
            // lambda so it can be passed on in the response.
            immutable(DARTIndex)[] check_read = (() @trusted => cast(immutable) db.checkload(fingerprints))();
            log("after checkread response");

            req.respond(check_read);
        }

        log("Starting dart with %(%02x%)", db.bullseye);

        auto hirpc = HiRPC(net);

        /// Handles a HiRPC document: serves "search" directly and forwards
        /// the supported DART queries to the database.
        void dartHiRPC(dartHiRPCRR req, Document doc) {
            import tagion.hibon.HiBONJSON;

            log("Received HiRPC request");

            if (!doc.isRecord!(HiRPC.Sender)) {
                // NOTE(review): no response is sent on this path.
                log("wrong request sent to dartservice. Expected HiRPC.Sender got %s", doc.toPretty);
                return;
            }

            immutable receiver = hirpc.receive(doc);

            if (receiver.method.name == "search") {
                log("SEARCH REQUEST");

                // Every element of the params document is read as a Buffer
                // and passed to db.search.
                auto owner_doc = receiver.method.params;
                Buffer[] owner_pkeys;
                foreach (owner; owner_doc[]) {
                    owner_pkeys ~= owner.get!Buffer;
                }
                auto res = db.search(owner_pkeys, net);

                Document response = hirpc.result(receiver, Document(res)).toDoc;
                req.respond(response);
                return;
            }
            // Only these query methods are passed on to the database.
            if (!(receiver.method.name == DART.Queries.dartRead
                    || receiver.method.name == DART.Queries.dartRim
                    || receiver.method.name == DART.Queries.dartBullseye
                    || receiver.method.name == DART.Queries.dartCheckRead)) {
                // NOTE(review): no response is sent on this path.
                log("unsupported request");
                return;
            }

            Document result = db(receiver, false).toDoc;
            log("darthirpc response: %s", result.toPretty);
            req.respond(result);
        }

        /// Applies `recorder` to the database, responds with the new
        /// bullseye and forwards the recorder to the replicator (and to the
        /// TRT task when `trt_enable` is set).
        void modify(dartModifyRR req, immutable(RecordFactory.Recorder) recorder, immutable(long) epoch_number) @trusted {
            import core.exception : AssertError;

            log("Received modify request with length=%s", recorder.length);

            // Captured before the modification; only used in the assert-error log below.
            immutable fingerprint_before = Fingerprint(db.bullseye);

            try {

                auto eye = db.modify(recorder);
                log("New bullseye is %(%02x%)", eye);

                req.respond(eye);
                replicator_handle.send(SendRecorder(), recorder, eye, epoch_number);
                if (trt_enable) {
                    trt_handle.send(trtModify(), recorder);
                }
            }
            catch (AssertError e) {
                // Assertion failures are logged with the previous bullseye
                // and the attempted archives, then reported through fail().
                log("Received ASSERT ERROR bullseye before %(%02x%), %s archives that were tried to be added \n%s", fingerprint_before, e, recorder
                        .toPretty);
                fail(e);
            }
            catch (Error e) {
                // Other Errors are only logged here.
                // NOTE(review): if the Error was raised before req.respond, no response is sent.
                log.error("DART Error %s", e);
            }

        }

        /// Responds with the current bullseye wrapped in a `Fingerprint`.
        void bullseye(dartBullseyeRR req) @safe {
            auto eye = Fingerprint(db.bullseye);
            req.respond(eye);
        }

        run(&modify, &read, &checkRead, &bullseye, &dartHiRPC);

    }
}