1 module tagion.testbench.services.DARTService;
2 
3 import core.time;
4 import std.algorithm;
5 import std.array;
6 import std.file : exists, remove;
7 import std.path;
8 import std.stdio;
9 import std.typecons : Tuple;
10 import tagion.actor;
11 import tagion.behaviour;
12 import tagion.dart.DARTBasic : DARTIndex;
13 import tagion.dart.Recorder;
14 import tagion.hibon.Document;
15 import tagion.services.DART;
16 import tagion.services.messages;
17 import tagion.testbench.dart.dart_helper_functions;
18 import tagion.testbench.tools.Environment;
19 import tagion.utils.pretend_safe_concurrency : receiveOnly, receiveTimeout, register, thisTid;
20 
21 // import tagion.crypto.SecureNet;
22 import std.random;
23 import tagion.Keywords;
24 import tagion.basic.Types;
25 import tagion.communication.HiRPC;
26 import tagion.crypto.SecureInterfaceNet;
27 import tagion.crypto.SecureNet : StdHashNet, StdSecureNet;
28 import tagion.crypto.Types;
29 import tagion.dart.DART;
30 import tagion.dart.DARTBasic;
31 import tagion.dart.DARTFile : DARTFile;
32 import tagion.dart.DARTcrud : dartBullseye, dartCheckRead, dartRead;
33 import tagion.hibon.HiBONJSON;
34 import tagion.hibon.HiBONRecord;
35 import tagion.logger.LogRecords : LogInfo;
36 import tagion.logger.Logger;
37 import tagion.services.DARTInterface;
38 import tagion.services.TRTService;
39 import tagion.services.replicator;
40 import tagion.services.replicator : modify_log;
41 import tagion.testbench.actor.util;
42 import std.format;
43 
/// Feature descriptor for this testbench module: a description string and an
/// empty list as the second argument.
enum feature = Feature("see if we can read and write trough the dartservice", []);
47 
/// Named tuple pairing the scenario of this feature with a pointer to the
/// `FeatureGroup` stored under the name "result".
alias FeatureContext = Tuple!(WriteAndReadFromDartDb, "WriteAndReadFromDartDb", FeatureGroup*, "result");
52 
/// Test actor that dials an NNG REQ socket, sends a single document and
/// checks whether the HiRPC reply is (or is not) an error.
@safe
struct DARTWorker {
    /**
     * Actor task: connect to `sock_addr`, send `doc` once and validate the reply.
     *
     * Params:
     *   sock_addr = address passed to the socket's `dial`
     *   doc = document whose serialization is sent as the request
     *   shouldError = when `true` the reply must be a HiRPC error,
     *                 when `false` the reply must not be a HiRPC error
     *
     * Failures are reported through `check`.
     */
    void task(string sock_addr, Document doc, bool shouldError) @trusted {
        import nngd;

        int rc;
        NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
        s.recvtimeout = 1000.msecs;

        setState(Ctrl.ALIVE);

        // Phase 1: dial until the connection succeeds or the actor is stopped.
        while (!thisActor.stop) {
            // Zero-timeout poll so pending control messages are handled
            // between dial attempts; the result is not needed here.
            receiveTimeout(
                    Duration.zero,
                    &signal,
                    &ownerTerminated,
                    &unknown
            );

            writefln("REQ %s to dial...", doc.toPretty);
            rc = s.dial(sock_addr);
            if (rc == 0) {
                break;
            }
            writefln("REQ dial error %s", rc);
            if (rc == nng_errno.NNG_ECONNREFUSED) {
                // The listener is not up yet: back off and retry. Without the
                // `continue` the check below would fail right after the sleep
                // and the retry would never happen.
                nng_sleep(100.msecs);
                continue;
            }
            // Any other dial error is fatal for this worker.
            check(rc == 0, "NNG error");
        }

        // Phase 2: send the request once and validate the reply.
        while (!thisActor.stop) {
            const received = receiveTimeout(
                    Duration.zero,
                    &signal,
                    &ownerTerminated,
                    &unknown
            );
            if (received) {
                // A control message was handled; re-evaluate the stop flag first.
                continue;
            }
            rc = s.send!(immutable(ubyte[]))(doc.serialize);
            check(rc == 0, "NNG error");
            writefln("sent req");
            Document received_doc = s.receive!(immutable(ubyte[]))();
            // One request/reply round is all this worker does.
            thisActor.stop = true;
            // The message needs a format specifier: std.format.format rejects
            // arguments that no specifier consumes.
            check(s.errno == 0, format("Received not valid response from nng: %s", s.errno));

            HiRPC hirpc = HiRPC(null);
            auto received_hirpc = hirpc.receive(received_doc);
            if (shouldError) {
                check(received_hirpc.isError, format("Should have thrown error got: %s", received_doc.toPretty));
            }
            else {
                check(!received_hirpc.isError, format("received hirpc error: %s", received_doc.toPretty));
            }
        }
    }
}
112 
/// Scenario that creates a DART database file, spawns the DART, replicator and
/// DART-interface actors, and then verifies modify/bullseye/read/checkRead
/// requests both through direct actor messages and through HiRPC documents.
@safe @Scenario("write and read from dart db",
        [])
class WriteAndReadFromDartDb {

    /// Handle of the spawned `DARTService` actor.
    ActorHandle handle;
    /// Handle of the spawned `DARTInterfaceService` actor.
    ActorHandle dart_interface_handle;
    /// Handle of the spawned `ReplicatorService` actor.
    ActorHandle replicator_handle;
    /// Options for the DART interface; set to defaults in `saidDb`.
    DARTInterfaceOptions interface_opts;
    /// Options forwarded to the DART interface service.
    TRTOptions trt_options;

    /// Net used for the record factory, the HiRPC sender and DART indices.
    SecureNet supervisor_net;
    /// Options for the DART service (including `dart_path`).
    DARTOptions opts;
    /// Options for the replicator service.
    ReplicatorOptions replicator_opts;
    /// Random generator seeded with a fixed value in the constructor.
    Mt19937 gen;
    /// Archives generated for the current round in `toAdd`.
    RandomArchives random_archives;
    /// Documents inserted in the current round in `toAdd`.
    Document[] docs;
    /// Recorder holding the ADD archives of the current round.
    RecordFactory.Recorder insert_recorder;
    /// Factory used to build recorders.
    RecordFactory record_factory;
    /// HiRPC used to build requests and to parse replies.
    HiRPC hirpc;

    /// Minimal HiBON record wrapping a single number.
    struct SimpleDoc {
        ulong n;
        mixin HiBONRecord!(q{
            this(ulong n) {
                this.n = n;
            }
        });
    }

    /**
     * Stores the service options and sets up the supervisor net, the record
     * factory, the HiRPC instance and the random generator.
     *
     * Params:
     *   opts = options for the DART service
     *   replicator_opts = options for the replicator service
     *   trt_options = options passed on to the DART interface service
     */
    this(DARTOptions opts, ReplicatorOptions replicator_opts, TRTOptions trt_options) {

        this.opts = opts;
        this.replicator_opts = replicator_opts;
        this.trt_options = trt_options;
        supervisor_net = new StdSecureNet();
        supervisor_net.generateKeyPair("supervisor very secret");

        record_factory = RecordFactory(supervisor_net);
        hirpc = HiRPC(supervisor_net);

        // Fixed seed for the generator.
        gen = Mt19937(1234);

    }

    /// Removes any existing file at `opts.dart_path` and creates a new DART there.
    @Given("I have a dart db")
    Document dartDb() {
        if (opts.dart_path.exists) {
            opts.dart_path.remove;
        }

        auto hash_net = new StdHashNet;
        DART.create(opts.dart_path, hash_net);
        return result_ok;
    }

    /// Registers this thread as "dart_supervisor", spawns the DART, replicator
    /// and DART-interface actors and waits up to 3 seconds for `Ctrl.ALIVE`.
    @Given("I have an dart actor with said db")
    Document saidDb() {
        thisActor.task_name = "dart_supervisor";
        register(thisActor.task_name, thisTid);

        import tagion.services.options : TaskNames;

        // Formatted so the label and the task name are separated in the output.
        writefln("DART task name %s", TaskNames().dart);

        auto net = new StdSecureNet();
        net.generateKeyPair("dartnet very secret");

        handle = (() @trusted => spawn!DARTService(
                TaskNames().dart,
                cast(immutable) opts,
                TaskNames(),
                cast(shared) net,
                false))();

        replicator_handle = (() @trusted => spawn!ReplicatorService(
                TaskNames().replicator,
                cast(immutable) replicator_opts))();

        interface_opts.setDefault;
        writeln(interface_opts.sock_addr);

        dart_interface_handle = (() @trusted => spawn(
                immutable(DARTInterfaceService)(
                cast(immutable) interface_opts,
                cast(immutable) trt_options,
                TaskNames()),
                "DartInterfaceService"))();

        waitforChildren(Ctrl.ALIVE, 3.seconds);

        return result_ok;
    }

    /**
     * Runs 100 rounds of: insert random documents with a modify request, check
     * the replicator log, compare the bullseye obtained directly and via HiRPC,
     * read the archives back directly and via HiRPC, and check that checkRead
     * reports nothing missing. Afterwards a checkRead on unknown indices is
     * verified and four `DARTWorker` actors are spawned against the interface
     * socket.
     *
     * The order of sends and receives matters: each `receiveOnly` expects the
     * reply to the request sent immediately before it.
     */
    @When("I send a dartModify command with a recorder containing changes to add")
    Document toAdd() {
        log.registerSubscriptionTask(thisActor.task_name);
        submask.subscribe(modify_log);

        foreach (i; 0 .. 100) {
            gen.popFront;
            random_archives = RandomArchives(gen.front, 4, 10);
            insert_recorder = record_factory.recorder;
            docs = (() @trusted => cast(Document[]) random_archives.values.map!(a => SimpleDoc(a).toDoc).array)();

            insert_recorder.insert(docs, Archive.Type.ADD);
            auto modify_send = dartModifyRR();
            (() @trusted => handle.send(modify_send, cast(immutable) insert_recorder, immutable long(i)))();

            // The reply to the modify request must be consumed; its value is not inspected.
            receiveOnlyTimeout!(dartModifyRR.Response, Fingerprint);

            auto modify_log_result = receiveOnlyTimeout!(LogInfo, const(Document));
            check(modify_log_result[1].isRecord!(RecordFactory.Recorder), "Did not receive recorder");

            handle.send(dartBullseyeRR());
            const bullseye_res = receiveOnly!(dartBullseyeRR.Response, Fingerprint);
            // This only verifies that a bullseye was produced at all; the
            // comparison against the HiRPC bullseye follows below.
            check(bullseye_res[1] !is Fingerprint.init, "bullseye is Fingerprint.init after modify");

            Document bullseye_sender = dartBullseye(hirpc).toDoc;

            handle.send(dartHiRPCRR(), bullseye_sender);
            auto hirpc_bullseye_res = receiveOnly!(dartHiRPCRR.Response, Document);

            auto hirpc_bullseye_receiver = hirpc.receive(hirpc_bullseye_res[1]);
            auto hirpc_message = hirpc_bullseye_receiver.message[Keywords.result].get!Document;
            auto hirpc_bullseye = hirpc_message[DARTFile.Params.bullseye].get!DARTIndex;
            check(bullseye_res[1] == hirpc_bullseye, "hirpc bullseye not the same");

            /// read the archives
            auto dart_indices = docs
                .map!(d => supervisor_net.dartIndex(d))
                .array;

            auto read_request = dartReadRR();
            handle.send(read_request, dart_indices);
            auto read_tuple = receiveOnly!(dartReadRR.Response, immutable(RecordFactory.Recorder));
            auto read_recorder = read_tuple[1];

            check(equal(read_recorder[].map!(a => a.filed), insert_recorder[].map!(a => a.filed)), "Data not the same");

            Document read_sender = dartRead(dart_indices, hirpc).toDoc;

            handle.send(dartHiRPCRR(), read_sender);

            auto read_hirpc = receiveOnly!(dartHiRPCRR.Response, Document);
            auto read_hirpc_recorder = hirpc.receive(read_hirpc[1]);
            auto hirpc_recorder_message = read_hirpc_recorder.message[Keywords.result].get!Document;

            const hirpc_recorder = record_factory.recorder(hirpc_recorder_message);

            check(equal(hirpc_recorder[].map!(a => a.filed), insert_recorder[].map!(a => a.filed)), "hirpc data not the same as insertion");

            Document check_read_sender = dartCheckRead(dart_indices, hirpc).toDoc;
            handle.send(dartHiRPCRR(), check_read_sender);
            auto read_check_tuple = receiveOnly!(dartHiRPCRR.Response, Document);
            auto read_check = hirpc.receive(read_check_tuple[1]);

            auto check_dart_indices = read_check.response.result[DART.Params.dart_indices].get!Document[].map!(
                    d => d.get!DARTIndex).array;

            // Every index was just inserted, so the returned list must be empty.
            check(check_dart_indices.length == 0, "should be empty");

        }
        submask.unsubscribe(modify_log);

        // Indices that were never inserted: checkRead must return exactly these.
        auto dummy_indexes = [DARTIndex([1, 2, 3, 4]), DARTIndex([2, 3, 4, 5])];
        Document check_read_sender = dartCheckRead(dummy_indexes, hirpc).toDoc;
        writefln("read_sender %s", check_read_sender.toPretty);
        handle.send(dartHiRPCRR(), check_read_sender);
        auto read_check_tuple = receiveOnly!(dartHiRPCRR.Response, Document);
        auto read_check = hirpc.receive(read_check_tuple[1]);

        auto check_dart_indices = read_check.response.result[DART.Params.dart_indices].get!Document[].map!(d => d.get!DARTIndex)
            .array;

        check(equal(check_dart_indices, dummy_indexes), "error in hirpc checkread");

        // Three workers send the valid checkRead request over the interface socket.
        spawn!DARTWorker("dartworker1", interface_opts.sock_addr, check_read_sender, false);
        spawn!DARTWorker("dartworker2", interface_opts.sock_addr, check_read_sender, false);
        spawn!DARTWorker("dartworker3", interface_opts.sock_addr, check_read_sender, false);

        // send a message that should fail (a response document used as a request)
        spawn!DARTWorker("dartworker4", interface_opts.sock_addr, read_check_tuple[1], true);

        import core.thread;

        // Fixed delay before returning.
        // NOTE(review): presumably meant to let the workers finish; nothing here waits on them explicitly.
        (() @trusted => Thread.sleep(3000.msecs))();

        return result_ok;
    }

    /// No additional work: the read-back is already verified in `toAdd`.
    @When("I send a dartRead command to see if it has the changed")
    Document theChanged() @trusted {
        // checked above

        return result_ok;
    }

    /// Sends `Sig.STOP` to the DART service and waits for `Ctrl.END`.
    @Then("the read recorder should be the same as the dartModify recorder")
    Document dartModifyRecorder() {
        // checked above

        // NOTE(review): only `handle` is sent STOP here; `replicator_handle` and
        // `dart_interface_handle` are not signalled in this class. Confirm that
        // waitforChildren(Ctrl.END) does not depend on them ending.
        handle.send(Sig.STOP);
        waitforChildren(Ctrl.END);

        return result_ok;
    }

}