1 module tagion.testbench.services.DARTService; 2 3 import core.time; 4 import std.algorithm; 5 import std.array; 6 import std.file : exists, remove; 7 import std.path; 8 import std.stdio; 9 import std.typecons : Tuple; 10 import tagion.actor; 11 import tagion.behaviour; 12 import tagion.dart.DARTBasic : DARTIndex; 13 import tagion.dart.Recorder; 14 import tagion.hibon.Document; 15 import tagion.services.DART; 16 import tagion.services.messages; 17 import tagion.testbench.dart.dart_helper_functions; 18 import tagion.testbench.tools.Environment; 19 import tagion.utils.pretend_safe_concurrency : receiveOnly, receiveTimeout, register, thisTid; 20 21 // import tagion.crypto.SecureNet; 22 import std.random; 23 import tagion.Keywords; 24 import tagion.basic.Types; 25 import tagion.communication.HiRPC; 26 import tagion.crypto.SecureInterfaceNet; 27 import tagion.crypto.SecureNet : StdHashNet, StdSecureNet; 28 import tagion.crypto.Types; 29 import tagion.dart.DART; 30 import tagion.dart.DARTBasic; 31 import tagion.dart.DARTFile : DARTFile; 32 import tagion.dart.DARTcrud : dartBullseye, dartCheckRead, dartRead; 33 import tagion.hibon.HiBONJSON; 34 import tagion.hibon.HiBONRecord; 35 import tagion.logger.LogRecords : LogInfo; 36 import tagion.logger.Logger; 37 import tagion.services.DARTInterface; 38 import tagion.services.TRTService; 39 import tagion.services.replicator; 40 import tagion.services.replicator : modify_log; 41 import tagion.testbench.actor.util; 42 import std.format; 43 44 enum feature = Feature( 45 "see if we can read and write trough the dartservice", 46 []); 47 48 alias FeatureContext = Tuple!( 49 WriteAndReadFromDartDb, "WriteAndReadFromDartDb", 50 FeatureGroup*, "result" 51 ); 52 53 @safe 54 struct DARTWorker { 55 void task(string sock_addr, Document doc, bool shouldError) @trusted { 56 import nngd; 57 58 int rc; 59 NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ); 60 s.recvtimeout = 1000.msecs; 61 62 setState(Ctrl.ALIVE); 63 while (!thisActor.stop) { 64 const received = receiveTimeout( 65 Duration.zero, 66 &signal, 67 &ownerTerminated, 68 &unknown 69 ); 70 71 writefln("REQ %s to dial...", doc.toPretty); 72 rc = s.dial(sock_addr); 73 if (rc == 0) 74 break; 75 writefln("REQ dial error %s", rc); 76 if (rc == nng_errno.NNG_ECONNREFUSED) { 77 nng_sleep(100.msecs); 78 } 79 check(rc == 0, "NNG error"); 80 } 81 while (!thisActor.stop) { 82 const received = receiveTimeout( 83 Duration.zero, 84 &signal, 85 &ownerTerminated, 86 &unknown 87 ); 88 if (received) { 89 continue; 90 } 91 rc = s.send!(immutable(ubyte[]))(doc.serialize); 92 check(rc == 0, "NNG error"); 93 writefln("sent req"); 94 Document received_doc = s.receive!(immutable(ubyte[]))(); 95 thisActor.stop = true; 96 check(s.errno == 0, format("Received not valid response from nng", s.errno)); 97 98 HiRPC hirpc = HiRPC(null); 99 auto received_hirpc = hirpc.receive(received_doc); 100 if (!shouldError) { 101 check(!received_hirpc.isError, format("received hirpc error: %s", received_doc.toPretty)); 102 } 103 else { 104 check(received_hirpc.isError, format("Should have thrown error got: %s", received_doc.toPretty)); 105 } 106 107 108 109 } 110 } 111 } 112 113 @safe @Scenario("write and read from dart db", 114 []) 115 class WriteAndReadFromDartDb { 116 117 ActorHandle handle; 118 ActorHandle dart_interface_handle; 119 ActorHandle replicator_handle; 120 DARTInterfaceOptions interface_opts; 121 TRTOptions trt_options; 122 123 SecureNet supervisor_net; 124 DARTOptions opts; 125 ReplicatorOptions replicator_opts; 126 Mt19937 gen; 127 RandomArchives random_archives; 128 Document[] docs; 129 RecordFactory.Recorder insert_recorder; 130 RecordFactory record_factory; 131 HiRPC hirpc; 132 133 struct SimpleDoc { 134 ulong n; 135 mixin HiBONRecord!(q{ 136 this(ulong n) { 137 this.n = n; 138 } 139 }); 140 } 141 142 this(DARTOptions opts, ReplicatorOptions replicator_opts, TRTOptions trt_options) { 143 144 this.opts = opts; 145 this.replicator_opts = replicator_opts; 146 this.trt_options = trt_options; 147 supervisor_net = new StdSecureNet(); 148 supervisor_net.generateKeyPair("supervisor very secret"); 149 150 record_factory = RecordFactory(supervisor_net); 151 hirpc = HiRPC(supervisor_net); 152 153 gen = Mt19937(1234); 154 155 } 156 157 @Given("I have a dart db") 158 Document dartDb() { 159 if (opts.dart_path.exists) { 160 opts.dart_path.remove; 161 } 162 163 auto hash_net = new StdHashNet; 164 DART.create(opts.dart_path, hash_net); 165 return result_ok; 166 } 167 168 @Given("I have an dart actor with said db") 169 Document saidDb() { 170 thisActor.task_name = "dart_supervisor"; 171 register(thisActor.task_name, thisTid); 172 173 import tagion.services.options : TaskNames; 174 175 writeln("DART task name", TaskNames().dart); 176 177 auto net = new StdSecureNet(); 178 net.generateKeyPair("dartnet very secret"); 179 180 handle = (() @trusted => spawn!DARTService(TaskNames().dart, cast(immutable) opts, TaskNames(), cast( 181 shared) net, false))(); 182 183 replicator_handle = (() @trusted => spawn!ReplicatorService( 184 TaskNames().replicator, 185 cast(immutable) replicator_opts))(); 186 187 interface_opts.setDefault; 188 writeln(interface_opts.sock_addr); 189 190 dart_interface_handle = (() @trusted => spawn(immutable(DARTInterfaceService)(cast(immutable) interface_opts, cast(immutable) trt_options, TaskNames()), "DartInterfaceService"))(); 191 192 waitforChildren(Ctrl.ALIVE, 3.seconds); 193 194 return result_ok; 195 } 196 197 @When("I send a dartModify command with a recorder containing changes to add") 198 Document toAdd() { 199 log.registerSubscriptionTask(thisActor.task_name); 200 submask.subscribe(modify_log); 201 202 foreach (i; 0 .. 100) { 203 gen.popFront; 204 random_archives = RandomArchives(gen.front, 4, 10); 205 insert_recorder = record_factory.recorder; 206 docs = (() @trusted => cast(Document[]) random_archives.values.map!(a => SimpleDoc(a).toDoc).array)(); 207 208 insert_recorder.insert(docs, Archive.Type.ADD); 209 auto modify_send = dartModifyRR(); 210 (() @trusted => handle.send(modify_send, cast(immutable) insert_recorder, immutable long(i)))(); 211 212 auto modify = receiveOnlyTimeout!(dartModifyRR.Response, Fingerprint); 213 214 auto modify_log_result = receiveOnlyTimeout!(LogInfo, const(Document)); 215 check(modify_log_result[1].isRecord!(RecordFactory.Recorder), "Did not receive recorder"); 216 217 handle.send(dartBullseyeRR()); 218 const bullseye_res = receiveOnly!(dartBullseyeRR.Response, Fingerprint); 219 check(bullseye_res[1]!is Fingerprint.init, "bullseyes not the same"); 220 221 Document bullseye_sender = dartBullseye(hirpc).toDoc; 222 223 handle.send(dartHiRPCRR(), bullseye_sender); 224 // writefln("SENDER: %s", bullseye_sender.toPretty); 225 auto hirpc_bullseye_res = receiveOnly!(dartHiRPCRR.Response, Document); 226 // writefln("RECEIVER %s", hirpc_bullseye_res[1].toPretty); 227 228 auto hirpc_bullseye_receiver = hirpc.receive(hirpc_bullseye_res[1]); 229 auto hirpc_message = hirpc_bullseye_receiver.message[Keywords.result].get!Document; 230 auto hirpc_bullseye = hirpc_message[DARTFile.Params.bullseye].get!DARTIndex; 231 check(bullseye_res[1] == hirpc_bullseye, "hirpc bullseye not the same"); 232 233 /// read the archives 234 auto dart_indices = docs 235 .map!(d => supervisor_net.dartIndex(d)) 236 .array; 237 238 auto read_request = dartReadRR(); 239 handle.send(read_request, dart_indices); 240 auto read_tuple = receiveOnly!(dartReadRR.Response, immutable(RecordFactory.Recorder)); 241 auto read_recorder = read_tuple[1]; 242 243 check(equal(read_recorder[].map!(a => a.filed), insert_recorder[].map!(a => a.filed)), "Data not the same"); 244 245 Document read_sender = dartRead(dart_indices, hirpc).toDoc; 246 247 handle.send(dartHiRPCRR(), read_sender); 248 249 auto read_hirpc = receiveOnly!(dartHiRPCRR.Response, Document); 250 auto read_hirpc_recorder = hirpc.receive(read_hirpc[1]); 251 auto hirpc_recorder_message = read_hirpc_recorder.message[Keywords.result].get!Document; 252 253 const hirpc_recorder = record_factory.recorder(hirpc_recorder_message); 254 255 check(equal(hirpc_recorder[].map!(a => a.filed), insert_recorder[].map!(a => a.filed)), "hirpc data not the same as insertion"); 256 257 Document check_read_sender = dartCheckRead(dart_indices, hirpc).toDoc; 258 handle.send(dartHiRPCRR(), check_read_sender); 259 auto read_check_tuple = receiveOnly!(dartHiRPCRR.Response, Document); 260 auto read_check = hirpc.receive(read_check_tuple[1]); 261 262 auto check_dart_indices = read_check.response.result[DART.Params.dart_indices].get!Document[].map!( 263 d => d.get!DARTIndex).array; 264 265 check(check_dart_indices.length == 0, "should be empty"); 266 267 } 268 submask.unsubscribe(modify_log); 269 270 auto dummy_indexes = [DARTIndex([1, 2, 3, 4]), DARTIndex([2, 3, 4, 5])]; 271 Document check_read_sender = dartCheckRead(dummy_indexes, hirpc).toDoc; 272 writefln("read_sender %s", check_read_sender.toPretty); 273 handle.send(dartHiRPCRR(), check_read_sender); 274 auto read_check_tuple = receiveOnly!(dartHiRPCRR.Response, Document); 275 auto read_check = hirpc.receive(read_check_tuple[1]); 276 277 auto check_dart_indices = read_check.response.result[DART.Params.dart_indices].get!Document[].map!(d => d.get!DARTIndex) 278 .array; 279 280 check(equal(check_dart_indices, dummy_indexes), "error in hirpc checkread"); 281 282 auto t1 = spawn!DARTWorker("dartworker1", interface_opts.sock_addr, check_read_sender, false); 283 auto t2 = spawn!DARTWorker("dartworker2", interface_opts.sock_addr, check_read_sender, false); 284 auto t3 = spawn!DARTWorker("dartworker3", interface_opts.sock_addr, check_read_sender, false); 285 286 // send a message that should fail 287 auto t4 = spawn!DARTWorker("dartworker4", interface_opts.sock_addr, read_check_tuple[1], true); 288 289 import core.thread; 290 291 (() @trusted => Thread.sleep(3000.msecs))(); 292 293 return result_ok; 294 } 295 296 @When("I send a dartRead command to see if it has the changed") 297 Document theChanged() @trusted { 298 // checked above 299 300 return result_ok; 301 } 302 303 @Then("the read recorder should be the same as the dartModify recorder") 304 Document dartModifyRecorder() { 305 // checked above 306 307 handle.send(Sig.STOP); 308 waitforChildren(Ctrl.END); 309 310 return result_ok; 311 } 312 313 }