1 /// Service for validating inputs sent via socket
2 /// [Documentation documents/architecture/InputValidator](https://docs.tagion.org/#/documents/architecture/InputValidator)
3 module tagion.services.inputvalidator;
4 
5 @safe:
6 
7 import core.time;
8 import nngd;
9 import std.algorithm : remove;
10 import std.conv : to;
11 import std.format;
12 import std.socket;
13 import std.stdio;
14 import tagion.actor;
15 import tagion.basic.Debug : __write;
16 import tagion.basic.Types;
17 import tagion.communication.HiRPC;
18 import tagion.crypto.SecureInterfaceNet;
19 import tagion.hibon.Document;
20 import tagion.hibon.HiBONException;
21 import tagion.hibon.HiBONJSON;
22 import tagion.hibon.HiBONRecord;
23 import tagion.logger.Logger;
24 import tagion.network.ReceiveBuffer;
25 import tagion.script.namerecords;
26 import tagion.services.messages;
27 import tagion.services.options : TaskNames;
28 import tagion.utils.JSONCommon;
29 import tagion.utils.pretend_safe_concurrency;
30 
/// Socket settings consumed by the input validator service.
struct InputValidatorOptions {
    import tagion.services.options : contract_sock_addr;

    /// Address the service listens on; empty until setDefault/setPrefix is called.
    string sock_addr;
    /// Receive timeout; the service applies it to the socket via `.msecs`.
    uint sock_recv_timeout = 1000;
    /// Receive buffer size; the service also uses it as the ReceiveBuffer `max_size`.
    uint sock_recv_buf = 0x4000;
    /// Send timeout; the service applies it to the socket via `.msecs`.
    uint sock_send_timeout = 200;
    /// Send buffer size handed to the socket.
    uint sock_send_buf = 1024;

    /// Derives `sock_addr` from the plain "CONTRACT_" name (no extra prefix).
    void setDefault() nothrow {
        // An empty prefix leaves exactly "CONTRACT_" as the lookup name.
        setPrefix("");
    }

    /// Derives `sock_addr` from `prefix` followed by "CONTRACT_".
    void setPrefix(string prefix) nothrow {
        const prefixed_name = prefix ~ "CONTRACT_";
        sock_addr = contract_sock_addr(prefixed_name);
    }

    mixin JSONCommon;
}
50 
/// Error codes placed in the HiRPC error reply when an input is rejected.
enum ResponseError {
    /// An exception escaped while a request was being processed.
    Internal = 0,
    /// The received buffer was empty or did not hold an in-order document.
    InvalidBuf = 1,
    /// A HiBONException was raised while handling the document.
    InvalidDoc = 2,
    /// The document is not a HiRPC.Sender record.
    NotHiRPCSender = 3,
    /// The receive timed out after part of a message had arrived.
    Timeout = 4,
}
58 
/**
 *  InputValidator actor
 *  Examples: [tagion.testbench.services.inputvalidator]
 *  Sends: (inputDoc, Document) to hirpc_verifier;
 *
 *  Receives buffers on an NNG reply socket, checks that each one is an
 *  in-order Document holding a HiRPC.Sender record, forwards accepted
 *  documents to the hirpc_verifier task and answers the client with either a
 *  ResultOk result or a HiRPC error carrying a ResponseError code.
**/
struct InputValidatorService {
    const SecureNet net;
    static Topic rejected = Topic("reject/inputvalidator");

    pragma(msg, "TODO: Make inputvalidator safe when nng is");
    /**
     * Actor task: listens on `opts.sock_addr` and validates incoming documents
     * until the actor is asked to stop.
     * Params:
     *   opts = socket address, timeouts and buffer sizes
     *   task_names = used to locate the hirpc_verifier task
     * Throws: ServiceException when the socket cannot listen on `opts.sock_addr`
     */
    void task(immutable(InputValidatorOptions) opts, immutable(TaskNames) task_names) @trusted {
        HiRPC hirpc = HiRPC(net);

        NNGSocket sock = NNGSocket(nng_socket_type.NNG_SOCKET_REP);
        sock.sendtimeout = opts.sock_send_timeout.msecs;
        sock.recvtimeout = opts.sock_recv_timeout.msecs;
        sock.recvbuf = opts.sock_recv_buf;
        sock.sendbuf = opts.sock_send_buf;

        // Replies to the client with a HiRPC error and logs the rejection.
        // Declared after `sock` because it captures the socket to send the reply.
        // Never throws: a failure to deliver the rejection is only logged.
        void reject(T)(ResponseError err_type, T data = Document()) const nothrow {
            try {
                hirpc.Error message;
                message.code = err_type;
                debug {
                    // Although it's a bit excessive, we send back the invalid data we received in debug mode.
                    message.message = err_type.to!string;
                    message.data = data;
                }
                const sender = hirpc.Sender(net, message);
                int rc = sock.send(sender.toDoc.serialize);
                if (rc != 0) {
                    log.error("Failed to responsd with rejection %s, because %s", err_type, nng_errstr(
                            rc));
                }
                log(rejected, err_type.to!string, data);
            }
            catch (Exception e) {
                log.error("Failed to deliver rejection %s", err_type.to!string);
            }
        }

        ReceiveBuffer buf;
        buf.max_size = opts.sock_recv_buf;

        const listening = sock.listen(opts.sock_addr, nonblock:
                true);
        scope (exit) {
            sock.close();
        }

        if (listening == 0) {
            log("listening on addr: %s", opts.sock_addr);
        }
        else {
            import tagion.services.exception;

            throw new ServiceException(
                    format("Failed to listen on addr: %s, %s", opts.sock_addr, nng_errstr(listening))
            );
        }
        const recv = (scope void[] b) @trusted {
            // Adapter handed to ReceiveBuffer: fills `b` from the socket.
            // NOTE(review): meaning of the trailing `0, false` arguments should be confirmed against nngd.
            return cast(ptrdiff_t) sock.receivebuf(cast(ubyte[]) b, 0, false);
        };
        setState(Ctrl.ALIVE);
        while (!thisActor.stop) {
            // Check for control signal
            const received = receiveTimeout(
                    Duration.zero,
                    &signal,
                    &ownerTerminated,
                    &unknown
            );
            if (received) {
                continue;
            }

            version (BLOCKING) {
                scope (failure) {
                    reject(ResponseError.Internal);
                }
                auto result_buf = sock.receive!Buffer;
                // The timeout case must be examined before the generic error check,
                // otherwise a partially received message is never answered with Timeout.
                if (sock.m_errno == nng_errno.NNG_ETIMEDOUT) {
                    if (result_buf.length > 0) {
                        reject(ResponseError.Timeout);
                    }
                    else {
                        continue;
                    }
                }
                // Any non-OK state (including the timeout handled above) ends this iteration.
                if (sock.m_errno != nng_errno.NNG_OK) {
                    log(rejected, "NNG_ERRNO", cast(int) sock.m_errno);
                    continue;
                }
                if (result_buf.length <= 0) {
                    reject(ResponseError.InvalidBuf);
                    continue;
                }

                Document doc = result_buf;
            }
            else {
                scope (failure) {
                    reject(ResponseError.Internal);
                }
                auto result_buf = buf(recv);
                // A timeout with partial data is answered with Timeout; an idle timeout is just retried.
                if (sock.m_errno == nng_errno.NNG_ETIMEDOUT) {
                    if (result_buf.data.length > 0) {
                        reject(ResponseError.Timeout);
                    }
                    else {
                        continue;
                    }
                }
                // Any non-OK state (including the timeout handled above) ends this iteration.
                if (sock.m_errno != nng_errno.NNG_OK) {
                    log(rejected, "NNG_ERRNO", cast(int) sock.m_errno);
                    continue;
                }

                // Fixme ReceiveBuffer .size doesn't always return correct length,
                // so the emptiness check looks at the received data itself.
                if (result_buf.data.length == 0) {
                    reject(ResponseError.InvalidBuf);
                    continue;
                }

                Document doc = result_buf.data.idup;
            }

            if (!doc.isInorder) {
                reject(ResponseError.InvalidBuf);
                continue;
            }

            if (!doc.isRecord!(HiRPC.Sender)) {
                reject(ResponseError.NotHiRPCSender, doc);
                continue;
            }
            try {
                log("Sending contract to hirpc_verifier");
                locate(task_names.hirpc_verifier).send(inputDoc(), doc);

                auto receiver = hirpc.receive(doc);
                auto response_ok = hirpc.result(receiver, ResultOk());
                // Report a failed acknowledgement instead of dropping the return code silently.
                const rc = sock.send(response_ok.toDoc.serialize);
                if (rc != 0) {
                    log.error("Failed to respond with result ok, because %s", nng_errstr(rc));
                }
            }
            catch (HiBONException _) {
                reject(ResponseError.InvalidDoc, doc);
                continue;
            }
        }
    }
}