1 /// Service which exposes dart reads over a socket
2 module tagion.services.DARTInterface;
3 
4 @safe:
5 
6 import tagion.utils.JSONCommon;
7 
/// Settings for the DART interface socket service.
struct DARTInterfaceOptions {
    import tagion.services.options : contract_sock_addr;

    /// Address the service socket listens on; filled in by setDefault / setPrefix.
    string sock_addr;
    /// Name fragment handed to contract_sock_addr when sock_addr is built.
    string dart_prefix = "DART_";
    /// Socket send timeout in milliseconds (also copied into the worker context as reply timeout).
    int sendtimeout = 10_000;
    /// Socket receive timeout in milliseconds.
    int receivetimeout = 1000;
    /// Number of workers requested from the NNG pool.
    uint pool_size = 12;
    /// Value assigned to the socket's sendbuf option.
    uint sendbuf = 0x2_0000;

    /// Builds sock_addr from dart_prefix alone.
    void setDefault() nothrow {
        // An empty prefix leaves just dart_prefix as the name.
        setPrefix("");
    }

    /// Builds sock_addr from `prefix` followed by dart_prefix.
    void setPrefix(string prefix) nothrow {
        const full_name = prefix ~ dart_prefix;
        sock_addr = contract_sock_addr(full_name);
    }

    mixin JSONCommon;

}
29 
30 import core.time;
31 import core.thread;
32 import nngd;
33 import std.stdio;
34 import std.format;
35 import tagion.actor;
36 import tagion.communication.HiRPC;
37 import tagion.hibon.Document;
38 import tagion.hibon.HiBONRecord : isRecord;
39 import tagion.logger.Logger;
40 import tagion.services.messages;
41 import tagion.services.options;
42 import tagion.utils.pretend_safe_concurrency;
43 import tagion.services.TRTService : TRTOptions;
44 
/// Per-pool data handed to dartHiRPCCallback through its `ctx` pointer.
struct DartWorkerContext {
    /// Task name used to locate the DART task.
    string dart_task_name = null;
    /// Time to wait for a reply from the located task, in milliseconds.
    int worker_timeout = 0;
    /// When true, "search" requests are routed to the TRT task instead of the DART task.
    bool trt_enable = false;
    /// Task name used to locate the TRT task.
    string trt_task_name = null;
}
51 
/// Error kinds reported to the client; the numeric value is used as the HiRPC error code.
enum InterfaceError {
    /// No reply arrived from the target task within the worker timeout.
    Timeout = 0,
    /// The request was not a valid, in-order HiRPC sender document or was not a method call.
    InvalidDoc = 1,
    /// The DART task could not be located.
    DARTLocate = 2,
    /// The TRT task could not be located.
    TRTLocate = 3,
}
58 
/**
 * NNG pool worker callback: handles one HiRPC request taken from `msg`.
 *
 * The request body is read as a Document and validated. A "search" method is
 * forwarded to the TRT task when `trt_enable` is set in the context; every other
 * method goes to the DART task. The reply document, or a HiRPC error built from
 * an InterfaceError, is written back into `msg`.
 *
 * Params:
 *   msg = message holding the request; its body is replaced with the reply
 *   ctx = pointer to the DartWorkerContext that was given to the pool
 */
void dartHiRPCCallback(NNGMessage* msg, void* ctx) @trusted {

    // Register the calling thread with the D runtime before doing any D-side work.
    thread_attachThis();

    // HiRPC created with a null argument; used to parse the request and to build error replies.
    HiRPC hirpc = HiRPC(null);

    // Writes `doc` into the message as the reply body.
    void send_doc(Document doc) @trusted {
        // NOTE(review): the length is set to full_size and the bytes are then prepended;
        // confirm against nngd that this leaves no padding after the document.
        msg.length = doc.full_size;
        msg.body_prepend(doc.serialize);
    }

    // Builds a HiRPC error for `err_type` (optionally with a detail text) and writes it as the reply.
    void send_error(InterfaceError err_type, string extra_msg = "") @trusted {
        import std.conv : to;

        // Keep the error name and the detail apart so the text stays readable.
        string text = err_type.to!string;
        if (extra_msg.length != 0) {
            text = format("%s: %s", text, extra_msg);
        }

        hirpc.Error message;
        message.code = err_type;
        message.message = text;
        const sender = hirpc.Sender(null, message);
        writefln("INTERFACE ERROR: %s", text);
        send_doc(sender.toDoc);
    }

    // Receive handler for a reply from the DART task.
    void dartHiRPCResponse(dartHiRPCRR.Response res, Document doc) @trusted {
        writeln("Interface successful response");
        send_doc(doc);
    }

    // Receive handler for a reply from the TRT task.
    void trtHiRPCResponse(trtHiRPCRR.Response res, Document doc) @trusted {
        writeln("TRT Interface successful response");
        send_doc(doc);
    }

    if (msg is null) {
        writeln("no message received");
        return;
    }
    if (msg.length < 1) {
        writeln("received empty msg");
        return;
    }

    thisActor.task_name = format("%s", thisTid);
    auto cnt = cast(DartWorkerContext*) ctx;
    if (cnt is null) {
        writeln("the context was nil");
        return;
    }

    // Take the whole body out of the message, then empty it so it can carry the reply.
    Document doc = msg.body_trim!(immutable(ubyte[]))(msg.length);
    msg.clear();

    if (!doc.isInorder || !doc.isRecord!(HiRPC.Sender)) {
        send_error(InterfaceError.InvalidDoc);
        writeln("Non-valid request received");
        return;
    }
    writeln("Kernel received a document");

    immutable receiver = hirpc.receive(doc);
    if (!receiver.isMethod) {
        send_error(InterfaceError.InvalidDoc);
        return;
    }

    // NOTE(review): a reply that arrives after receiveTimeout has expired is not consumed here;
    // presumably it stays queued for this thread and could be matched by a later request.
    // Verify whether replies need to be checked against the request id.
    if (receiver.method.name == "search" && cnt.trt_enable) {
        writeln("TRT SEARCH REQUEST");
        auto trt_tid = locate(cnt.trt_task_name);
        if (trt_tid is Tid.init) {
            send_error(InterfaceError.TRTLocate, cnt.trt_task_name);
            return;
        }
        trt_tid.send(trtHiRPCRR(), doc);
        auto trt_resp = receiveTimeout(cnt.worker_timeout.msecs, &trtHiRPCResponse);
        if (!trt_resp) {
            send_error(InterfaceError.Timeout);
            writeln("Timeout on trt request");
            return;
        }

    }
    else {
        auto dart_tid = locate(cnt.dart_task_name);
        if (dart_tid is Tid.init) {
            send_error(InterfaceError.DARTLocate, cnt.dart_task_name);
            return;
        }
        dart_tid.send(dartHiRPCRR(), doc);
        auto dart_resp = receiveTimeout(cnt.worker_timeout.msecs, &dartHiRPCResponse);
        if (!dart_resp) {
            send_error(InterfaceError.Timeout);
            writeln("Timeout on dart request");
            return;
        }

    }
}
160 
161 import tagion.services.exception;
162 
/**
 * Throws if an NNG socket call reported a failure.
 *
 * Params:
 *   rc = return code of the socket call; 0 is treated as success
 * Throws: ServiceException containing the code and its nng_errstr text when rc is non-zero
 */
void checkSocketError(int rc) {
    import std.format : format;

    if (rc == 0) {
        return;
    }
    // Neutral wording: this module applies the check to the result of listen(), not to a dial.
    throw new ServiceException(format("NNG socket error (%d): %s", rc, nng_errstr(rc)));
}
170 
/// Actor that answers DART (and, when enabled, TRT) HiRPC requests arriving on an NNG REP socket.
struct DARTInterfaceService {
    immutable DARTInterfaceOptions opts;
    immutable TRTOptions trt_opts;
    immutable TaskNames task_names;

    pragma(msg, "FIXME: make dart interface @safe when nng is");
    /**
     * Actor entry point: opens the REP socket, starts the worker pool with
     * dartHiRPCCallback, listens on opts.sock_addr and then enters the actor loop.
     * The pool is shut down when this function is left.
     */
    void task() @trusted {
        setState(Ctrl.STARTING);

        // Reply socket configured from the service options.
        NNGSocket rep_sock = NNGSocket(nng_socket_type.NNG_SOCKET_REP);
        rep_sock.sendtimeout = opts.sendtimeout.msecs;
        rep_sock.recvtimeout = opts.receivetimeout.msecs;
        rep_sock.sendbuf = opts.sendbuf;

        // Data every worker callback receives through its ctx pointer.
        DartWorkerContext worker_ctx;
        worker_ctx.dart_task_name = task_names.dart;
        worker_ctx.trt_task_name = task_names.trt;
        worker_ctx.trt_enable = trt_opts.enable;
        worker_ctx.worker_timeout = opts.sendtimeout;

        NNGPool worker_pool = NNGPool(&rep_sock, &dartHiRPCCallback, opts.pool_size, &worker_ctx);
        scope (exit)
            worker_pool.shutdown();
        worker_pool.init();

        // A non-zero listen result is turned into an exception.
        checkSocketError(rep_sock.listen(opts.sock_addr));

        // Receive actor signals
        run();
    }

}