1 /// Service for validating inputs sent via socket 2 /// [Documentation documents/architecture/InputValidator](https://docs.tagion.org/#/documents/architecture/InputValidator) 3 module tagion.services.inputvalidator; 4 5 @safe: 6 7 import core.time; 8 import nngd; 9 import std.algorithm : remove; 10 import std.conv : to; 11 import std.format; 12 import std.socket; 13 import std.stdio; 14 import tagion.actor; 15 import tagion.basic.Debug : __write; 16 import tagion.basic.Types; 17 import tagion.communication.HiRPC; 18 import tagion.crypto.SecureInterfaceNet; 19 import tagion.hibon.Document; 20 import tagion.hibon.HiBONException; 21 import tagion.hibon.HiBONJSON; 22 import tagion.hibon.HiBONRecord; 23 import tagion.logger.Logger; 24 import tagion.network.ReceiveBuffer; 25 import tagion.script.namerecords; 26 import tagion.services.messages; 27 import tagion.services.options : TaskNames; 28 import tagion.utils.JSONCommon; 29 import tagion.utils.pretend_safe_concurrency; 30 31 struct InputValidatorOptions { 32 string sock_addr; 33 uint sock_recv_timeout = 1000; 34 uint sock_recv_buf = 0x4000; 35 uint sock_send_timeout = 200; 36 uint sock_send_buf = 1024; 37 38 import tagion.services.options : contract_sock_addr; 39 40 void setDefault() nothrow { 41 sock_addr = contract_sock_addr("CONTRACT_"); 42 } 43 44 void setPrefix(string prefix) nothrow { 45 sock_addr = contract_sock_addr(prefix ~ "CONTRACT_"); 46 } 47 48 mixin JSONCommon; 49 } 50 51 enum ResponseError { 52 Internal, 53 InvalidBuf, 54 InvalidDoc, 55 NotHiRPCSender, 56 Timeout, 57 } 58 59 /** 60 * InputValidator actor 61 * Examples: [tagion.testbench.services.inputvalidator] 62 * Sends: (inputDoc, Document) to hirpc_verifier; 63 **/ 64 struct InputValidatorService { 65 const SecureNet net; 66 static Topic rejected = Topic("reject/inputvalidator"); 67 68 pragma(msg, "TODO: Make inputvalidator safe when nng is"); 69 void task(immutable(InputValidatorOptions) opts, immutable(TaskNames) task_names) @trusted { 70 HiRPC hirpc = HiRPC(net); 71 72 void reject(T)(ResponseError err_type, T data = Document()) const nothrow { 73 try { 74 hirpc.Error message; 75 message.code = err_type; 76 debug { 77 // Altough it's a bit excessive, we send back the invalid data we received in debug mode. 78 message.message = err_type.to!string; 79 message.data = data; 80 } 81 const sender = hirpc.Sender(net, message); 82 int rc = sock.send(sender.toDoc.serialize); 83 if (rc != 0) { 84 log.error("Failed to responsd with rejection %s, because %s", err_type, nng_errstr( 85 rc)); 86 } 87 log(rejected, err_type.to!string, data); 88 } 89 catch (Exception e) { 90 log.error("Failed to deliver rejection %s", err_type.to!string); 91 } 92 } 93 94 NNGSocket sock = NNGSocket(nng_socket_type.NNG_SOCKET_REP); 95 sock.sendtimeout = opts.sock_send_timeout.msecs; 96 sock.recvtimeout = opts.sock_recv_timeout.msecs; 97 sock.recvbuf = opts.sock_recv_buf; 98 sock.sendbuf = opts.sock_send_buf; 99 100 ReceiveBuffer buf; 101 buf.max_size = opts.sock_recv_buf; 102 103 const listening = sock.listen(opts.sock_addr, nonblock: 104 true); 105 scope (exit) { 106 sock.close(); 107 } 108 109 if (listening == 0) { 110 log("listening on addr: %s", opts.sock_addr); 111 } 112 else { 113 import tagion.services.exception; 114 115 throw new ServiceException( 116 format("Failed to listen on addr: %s, %s", opts.sock_addr, nng_errstr(listening)) 117 ); 118 } 119 const recv = (scope void[] b) @trusted { 120 // 121 return cast(ptrdiff_t) sock.receivebuf(cast(ubyte[]) b, 0, false); 122 }; 123 setState(Ctrl.ALIVE); 124 while (!thisActor.stop) { 125 // Check for control signal 126 const received = receiveTimeout( 127 Duration.zero, 128 &signal, 129 &ownerTerminated, 130 &unknown 131 ); 132 if (received) { 133 continue; 134 } 135 136 version (BLOCKING) { 137 scope (failure) { 138 reject(ResponseError.Internal); 139 } 140 auto result_buf = sock.receive!Buffer; 141 if (sock.m_errno != nng_errno.NNG_OK) { 142 log(rejected, "NNG_ERRNO", cast(int) sock.m_errno); 143 continue; 144 } 145 if (sock.m_errno == nng_errno.NNG_ETIMEDOUT) { 146 if (result_buf.length > 0) { 147 reject(ResponseError.Timeout); 148 } 149 else { 150 continue; 151 } 152 } 153 if (sock.m_errno != nng_errno.NNG_OK) { 154 log(rejected, "NNG_ERRNO", cast(int) sock.m_errno); 155 continue; 156 } 157 if (result_buf.length <= 0) { 158 reject(ResponseError.InvalidBuf); 159 continue; 160 } 161 162 Document doc = result_buf; 163 } 164 else { 165 scope (failure) { 166 reject(ResponseError.Internal); 167 } 168 auto result_buf = buf(recv); 169 if (sock.m_errno == nng_errno.NNG_ETIMEDOUT) { 170 if (result_buf.data.length > 0) { 171 reject(ResponseError.Timeout); 172 } 173 else { 174 continue; 175 } 176 } 177 if (sock.m_errno != nng_errno.NNG_OK) { 178 log(rejected, "NNG_ERRNO", cast(int) sock.m_errno); 179 continue; 180 } 181 182 // Fixme ReceiveBuffer .size doesn't always return correct lenght 183 if (result_buf.data.size <= 0) { 184 reject(ResponseError.InvalidBuf); 185 continue; 186 } 187 188 Document doc = result_buf.data.idup; 189 } 190 191 if (!doc.isInorder) { 192 reject(ResponseError.InvalidBuf); 193 continue; 194 } 195 196 if (!doc.isRecord!(HiRPC.Sender)) { 197 reject(ResponseError.NotHiRPCSender, doc); 198 continue; 199 } 200 try { 201 log("Sending contract to hirpc_verifier"); 202 locate(task_names.hirpc_verifier).send(inputDoc(), doc); 203 204 auto receiver = hirpc.receive(doc); 205 auto response_ok = hirpc.result(receiver, ResultOk()); 206 sock.send(response_ok.toDoc.serialize); 207 } 208 catch (HiBONException _) { 209 reject(ResponseError.InvalidDoc, doc); 210 continue; 211 } 212 } 213 } 214 }