1 /// Service which exposes dart reads over a socket 2 module tagion.services.DARTInterface; 3 4 @safe: 5 6 import tagion.utils.JSONCommon; 7 8 struct DARTInterfaceOptions { 9 import tagion.services.options : contract_sock_addr; 10 11 string sock_addr; 12 string dart_prefix = "DART_"; 13 int sendtimeout = 10_000; 14 int receivetimeout = 1000; 15 uint pool_size = 12; 16 uint sendbuf = 0x2_0000; 17 18 void setDefault() nothrow { 19 sock_addr = contract_sock_addr(dart_prefix); 20 } 21 22 void setPrefix(string prefix) nothrow { 23 sock_addr = contract_sock_addr(prefix ~ dart_prefix); 24 } 25 26 mixin JSONCommon; 27 28 } 29 30 import core.time; 31 import core.thread; 32 import nngd; 33 import std.stdio; 34 import std.format; 35 import tagion.actor; 36 import tagion.communication.HiRPC; 37 import tagion.hibon.Document; 38 import tagion.hibon.HiBONRecord : isRecord; 39 import tagion.logger.Logger; 40 import tagion.services.messages; 41 import tagion.services.options; 42 import tagion.utils.pretend_safe_concurrency; 43 import tagion.services.TRTService : TRTOptions; 44 45 struct DartWorkerContext { 46 string dart_task_name; 47 int worker_timeout; 48 bool trt_enable; 49 string trt_task_name; 50 } 51 52 enum InterfaceError { 53 Timeout, 54 InvalidDoc, 55 DARTLocate, 56 TRTLocate, 57 } 58 59 void dartHiRPCCallback(NNGMessage* msg, void* ctx) @trusted { 60 61 thread_attachThis(); 62 63 HiRPC hirpc = HiRPC(null); 64 65 void send_doc(Document doc) @trusted { 66 msg.length = doc.full_size; 67 msg.body_prepend(doc.serialize); 68 } 69 70 void send_error(InterfaceError err_type, string extra_msg = "") @trusted { 71 import std.conv; 72 73 hirpc.Error message; 74 message.code = err_type; 75 message.message = err_type.to!string ~ extra_msg; 76 const sender = hirpc.Sender(null, message); 77 writefln("INTERFACE ERROR: %s", err_type.to!string ~ extra_msg); 78 send_doc(sender.toDoc); 79 // msg.body_append(sender.toDoc.serialize); 80 } 81 82 void dartHiRPCResponse(dartHiRPCRR.Response res, Document doc) @trusted { 83 writeln("Interface successful response"); 84 send_doc(doc); 85 // msg.body_append(doc.serialize); 86 } 87 88 void trtHiRPCResponse(trtHiRPCRR.Response res, Document doc) @trusted { 89 writeln("TRT Inteface succesful response"); 90 send_doc(doc); 91 } 92 93 if (msg is null) { 94 writeln("no message received"); 95 return; 96 } 97 if (msg.length < 1) { 98 writeln("received empty msg"); 99 return; 100 } 101 102 thisActor.task_name = format("%s", thisTid); 103 auto cnt = cast(DartWorkerContext*) ctx; 104 if (cnt is null) { 105 writeln("the context was nil"); 106 return; 107 } 108 // we use an empty hirpc only for sending errors. 109 110 Document doc = msg.body_trim!(immutable(ubyte[]))(msg.length); 111 msg.clear(); 112 113 if (!doc.isInorder || !doc.isRecord!(HiRPC.Sender)) { 114 send_error(InterfaceError.InvalidDoc); 115 writeln("Non-valid request received"); 116 return; 117 } 118 writeln("Kernel received a document"); 119 120 const empty_hirpc = HiRPC(null); 121 122 immutable receiver = empty_hirpc.receive(doc); 123 if (!receiver.isMethod) { 124 send_error(InterfaceError.InvalidDoc); 125 return; 126 } 127 128 if (receiver.method.name == "search" && cnt.trt_enable) { 129 writeln("TRT SEARCH REQUEST"); 130 auto trt_tid = locate(cnt.trt_task_name); 131 if (trt_tid is Tid.init) { 132 send_error(InterfaceError.TRTLocate, cnt.trt_task_name); 133 return; 134 } 135 trt_tid.send(trtHiRPCRR(), doc); 136 auto trt_resp = receiveTimeout(cnt.worker_timeout.msecs, &trtHiRPCResponse); 137 if (!trt_resp) { 138 send_error(InterfaceError.Timeout); 139 writeln("Timeout on trt request"); 140 return; 141 } 142 143 } 144 else { 145 auto dart_tid = locate(cnt.dart_task_name); 146 if (dart_tid is Tid.init) { 147 send_error(InterfaceError.DARTLocate, cnt.dart_task_name); 148 return; 149 } 150 dart_tid.send(dartHiRPCRR(), doc); 151 auto dart_resp = receiveTimeout(cnt.worker_timeout.msecs, &dartHiRPCResponse); 152 if (!dart_resp) { 153 send_error(InterfaceError.Timeout); 154 writeln("Timeout on dart request"); 155 return; 156 } 157 158 } 159 } 160 161 import tagion.services.exception; 162 163 void checkSocketError(int rc) { 164 if (rc != 0) { 165 import std.format; 166 167 throw new ServiceException(format("Failed to dial %s", nng_errstr(rc))); 168 } 169 } 170 171 struct DARTInterfaceService { 172 immutable(DARTInterfaceOptions) opts; 173 immutable(TRTOptions) trt_opts; 174 immutable(TaskNames) task_names; 175 176 pragma(msg, "FIXME: make dart interface @safe when nng is"); 177 void task() @trusted { 178 setState(Ctrl.STARTING); 179 180 DartWorkerContext ctx; 181 ctx.dart_task_name = task_names.dart; 182 ctx.worker_timeout = opts.sendtimeout; 183 ctx.trt_task_name = task_names.trt; 184 ctx.trt_enable = trt_opts.enable; 185 186 NNGSocket sock = NNGSocket(nng_socket_type.NNG_SOCKET_REP); 187 sock.sendtimeout = opts.sendtimeout.msecs; 188 sock.recvtimeout = opts.receivetimeout.msecs; 189 sock.sendbuf = opts.sendbuf; 190 191 NNGPool pool = NNGPool(&sock, &dartHiRPCCallback, opts.pool_size, &ctx); 192 scope (exit) { 193 pool.shutdown(); 194 } 195 pool.init(); 196 auto rc = sock.listen(opts.sock_addr); 197 checkSocketError(rc); 198 199 // Receive actor signals 200 run(); 201 } 202 203 }