1 module tagion.tools.tagionshell; 2 3 import core.time; 4 import std.algorithm; 5 import std.array; 6 import std.concurrency; 7 import std.conv; 8 import std.exception; 9 import std.file : exists; 10 import std.format; 11 import std.getopt; 12 import std.json; 13 import std.stdio : File, stderr, stdout, writefln, writeln; 14 import tagion.basic.Types : Buffer, FileExtension, hasExtension; 15 import tagion.basic.range : doFront; 16 import tagion.communication.HiRPC; 17 import tagion.crypto.SecureInterfaceNet; 18 import tagion.crypto.SecureNet; 19 import tagion.hibon.Document; 20 import tagion.hibon.HiBON; 21 import tagion.hibon.HiBONFile : fread, fwrite; 22 import tagion.hibon.HiBONRecord : isRecord; 23 import tagion.script.TagionCurrency; 24 import tagion.script.common; 25 import tagion.tools.Basic; 26 import tagion.tools.revision; 27 import tagion.tools.shell.shelloptions; 28 import tagion.tools.wallet.WalletInterface; 29 import tagion.tools.wallet.WalletOptions; 30 import tagion.utils.StdTime : currentTime; 31 import tagion.wallet.AccountDetails; 32 import tagion.wallet.SecureWallet; 33 import tagion.utils.LRUT; 34 35 import core.thread; 36 import nngd.nngd; 37 38 mixin Main!(_main, "shell"); 39 40 alias DartCache = LRUT!(Buffer, TagionBill); 41 42 long getmemstatus() { 43 long sz = -1; 44 auto f = File("/proc/self/status", "rt"); 45 foreach (line; f.byLine) { 46 if (line.startsWith("VmRSS")) { 47 sz = to!long(line.split()[1]); 48 break; 49 } 50 } 51 f.close(); 52 return sz; 53 } 54 55 void writeit(A...)(A a) { 56 writeln(a); 57 stdout.flush(); 58 } 59 60 void dart_worker(ShellOptions opt) { 61 int rc; 62 NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_SUB); 63 s.recvtimeout = msecs(1000); 64 s.subscribe(""); 65 writeit("DS: subscribed"); 66 while (true) { 67 rc = s.dial(opt.tagion_subscription_addr); 68 if (rc == 0) 69 break; 70 } 71 scope (exit) { 72 s.close; 73 } 74 writeit("DS: connected"); 75 while (true) { 76 Document received_doc = s.receive!(immutable(ubyte[]))(); 77 writeit(format("DS: received %d bytes", received_doc.length)); 78 } 79 } 80 81 void contract_handler(WebData* req, WebData* rep, void* ctx) { 82 83 thread_attachThis(); 84 85 int rc; 86 ShellOptions* opt = cast(ShellOptions*) ctx; 87 if (req.type != "application/octet-stream") { 88 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 89 rep.msg = "invalid data type"; 90 return; 91 } 92 93 const contract_addr = opt.node_contract_addr; 94 95 writeit(format("WH: contract: with %d bytes for %s", req.rawdata.length, contract_addr)); 96 NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ); 97 s.recvtimeout = msecs(10000); 98 writeit(format("WH: contract: trying to dial %s", contract_addr)); 99 while (true) { 100 rc = s.dial(contract_addr); 101 if (rc == 0) 102 break; 103 } 104 scope (exit) { 105 s.close(); 106 } 107 rc = s.send(req.rawdata); 108 if (rc != 0) { 109 writeit("contract_handler: send: ", nng_errstr(s.errno)); 110 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 111 rep.msg = "socket error"; 112 return; 113 } 114 ubyte[4096] buf; 115 size_t len = s.receivebuf(buf, 4096); 116 if (len == size_t.max && s.errno != 0) { 117 writeit("contract_handler: recv: ", nng_errstr(s.errno)); 118 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 119 rep.msg = "socket error"; 120 return; 121 } 122 writeit(format("WH: dart: received %d bytes", len)); 123 rep.status = (len > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status.NNG_HTTP_STATUS_NO_CONTENT; 124 rep.type = "applicaion/octet-stream"; 125 rep.rawdata = (len > 0) ? buf[0 .. len] : null; 126 } 127 128 import crud = tagion.dart.DARTcrud; 129 130 static void bullseye_handler(WebData* req, WebData* rep, void* ctx) { 131 132 thread_attachThis(); 133 134 NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ); 135 136 int rc; 137 ShellOptions* opt = cast(ShellOptions*) ctx; 138 while (true) { 139 rc = s.dial(opt.node_dart_addr); 140 if (rc == 0) 141 break; 142 } 143 scope (exit) { 144 s.close(); 145 } 146 147 rc = s.send(crud.dartBullseye.toDoc.serialize); 148 ubyte[192] buf; 149 size_t len = s.receivebuf(buf, buf.length); 150 if (len == size_t.max && s.errno != 0) { 151 writeit("bullseye_handler: recv: ", nng_errstr(s.errno)); 152 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 153 rep.msg = "socket error"; 154 return; 155 } 156 157 const receiver = HiRPC(null).receive(Document(buf.idup)); 158 if (!receiver.isResponse) { 159 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 160 rep.msg = "response error"; 161 return; 162 } 163 164 const dartindex = parseJSON(receiver.response.toPretty); 165 166 rep.status = (len > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status.NNG_HTTP_STATUS_NO_CONTENT; 167 rep.type = "application/json"; 168 rep.json = dartindex; 169 } 170 171 static void dartcache_handler(WebData* req, WebData* rep, void* ctx) { 172 173 thread_attachThis(); 174 175 int rc; 176 const size_t buflen = 1048576; 177 ubyte[1048576] buf; 178 immutable(ubyte)[] docbuf; 179 180 ShellOptions* opt = cast(ShellOptions*) ctx; 181 if (req.type != "application/octet-stream") { 182 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 183 rep.msg = "invalid data type"; 184 return; 185 } 186 187 SecureNet net = new StdSecureNet(); 188 net.generateKeyPair("very_secret"); 189 HiRPC hirpc = HiRPC(net); 190 Document doc = Document(cast(immutable(ubyte[])) req.rawdata); 191 immutable receiver = hirpc.receive(doc); 192 auto pkey_doc = receiver.method.params; 193 Buffer[] owner_pkeys; 194 foreach (owner; pkey_doc[]) { 195 owner_pkeys ~= owner.get!Buffer; 196 } 197 198 TagionBill[] found_bills; 199 200 // to chache 201 202 if (!found_bills.empty) { 203 foreach (bill; found_bills) { 204 remove!(x => x == bill.owner)(owner_pkeys); 205 } 206 } 207 208 if (!owner_pkeys.empty) { 209 auto dreq = new HiBON; 210 dreq = owner_pkeys; 211 212 NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ); 213 s.recvtimeout = 60_000.msecs; 214 while (true) { 215 rc = s.dial(opt.node_dart_addr); 216 if (rc == 0) 217 break; 218 } 219 scope (exit) { 220 s.close(); 221 } 222 223 rc = s.send(cast(ubyte[])(hirpc.search(dreq).toDoc.serialize)); 224 225 if (rc != 0) { 226 writeit("dart_handler: send: ", nng_errstr(rc)); 227 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 228 rep.msg = "socket error"; 229 return; 230 } 231 232 size_t len = 0, doclen = 0; 233 do { 234 len = s.receivebuf(buf, buflen); 235 if (len == size_t.max && s.errno != 0) { 236 writeit("dart_handler: recv: ", nng_errstr(s.errno)); 237 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 238 rep.msg = "socket error"; 239 return; 240 } 241 if (len > buflen) { 242 writeit("dart_handler: recv wrong size: ", len); 243 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 244 rep.msg = "socket error"; 245 return; 246 } 247 writeit(format("WH: dart: received %d bytes", len)); 248 docbuf ~= buf[0 .. len]; 249 doclen += len; 250 } 251 while (len > buflen - 1); 252 253 const repdoc = Document(docbuf); 254 immutable repreceiver = hirpc.receive(repdoc); 255 found_bills ~= repreceiver.response.result[] 256 .map!(e => TagionBill(e.get!Document)) 257 .array; 258 259 } 260 261 HiBON params = new HiBON; 262 263 foreach (i, bill; found_bills) { 264 params[i] = bill.toHiBON; 265 } 266 267 Document response = hirpc.result(receiver, params).toDoc; 268 269 rep.status = (found_bills.length > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status 270 .NNG_HTTP_STATUS_NO_CONTENT; 271 rep.type = "applicaion/octet-stream"; 272 rep.rawdata = (found_bills.length > 0) ? cast(ubyte[])(response.serialize) : null; 273 274 writeit("WH: dart: res ", response.toPretty); 275 } 276 277 static void dart_handler(WebData* req, WebData* rep, void* ctx) { 278 279 thread_attachThis(); 280 281 int rc; 282 const size_t buflen = 1048576; 283 ubyte[1048576] buf; 284 ubyte[] docbuf; 285 ShellOptions* opt = cast(ShellOptions*) ctx; 286 if (req.type != "application/octet-stream") { 287 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 288 rep.msg = "invalid data type"; 289 return; 290 } 291 292 const dart_addr = opt.node_dart_addr; 293 294 writeit(format("WH: dart: with %d bytes for %s", req.rawdata.length, dart_addr)); 295 NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ); 296 s.recvtimeout = 60_000.msecs; 297 while (true) { 298 rc = s.dial(dart_addr); 299 if (rc == 0) 300 break; 301 } 302 scope (exit) { 303 s.close(); 304 } 305 rc = s.send(req.rawdata); 306 if (rc != 0) { 307 writeit("dart_handler: error on send: ", nng_errstr(rc)); 308 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 309 rep.msg = "socket error"; 310 return; 311 } 312 writeit(format("WH: dart: sent %d bytes", req.rawdata.length)); 313 size_t len = 0, doclen = 0; 314 do { 315 len = s.receivebuf(buf, buflen); 316 if (len == size_t.max && s.errno != 0) { 317 writeit("dart_handler: error on recv: ", nng_errstr(s.errno)); 318 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 319 rep.msg = "socket error"; 320 return; 321 } 322 if (len > buflen) { 323 writeit("dart_handler: recv wrong size: ", len); 324 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 325 rep.msg = "socket error"; 326 return; 327 } 328 writeit(format("WH: dart: received %d bytes", len)); 329 docbuf ~= buf[0 .. len]; 330 doclen += len; 331 } 332 while (len > buflen - 1); 333 rep.status = (doclen > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status.NNG_HTTP_STATUS_NO_CONTENT; 334 rep.type = "applicaion/octet-stream"; 335 rep.rawdata = (doclen > 0) ? docbuf[0 .. doclen] : null; 336 } 337 338 static void i2p_handler(WebData* req, WebData* rep, void* ctx) { 339 340 thread_attachThis(); 341 rt_moduleTlsCtor(); 342 343 int rc; 344 ShellOptions* opt = cast(ShellOptions*) ctx; 345 if (req.type != "application/octet-stream") { 346 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 347 rep.msg = "invalid data type"; 348 return; 349 } 350 writeit(format("WH: invoice2pay: with %d bytes", req.rawdata.length)); 351 352 WalletOptions options; 353 auto wallet_config_file = opt.default_i2p_wallet; 354 if (wallet_config_file.exists) { 355 options.load(wallet_config_file); 356 } 357 else { 358 writeit("i2p: invalid wallet config: " ~ opt.default_i2p_wallet); 359 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 360 rep.msg = "invalid wallet config"; 361 return; 362 } 363 auto wallet_interface = WalletInterface(options); 364 365 if (!wallet_interface.load) { 366 writeit("i2p: Wallet does not exist"); 367 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 368 rep.msg = "wallet does not exist"; 369 return; 370 } 371 const flag = wallet_interface.secure_wallet.login(opt.default_i2p_wallet_pin); 372 if (!flag) { 373 writeit("i2p: Wallet wrong pincode"); 374 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 375 rep.msg = "Faucet invalid pin code"; 376 return; 377 } 378 379 if (!wallet_interface.secure_wallet.isLoggedin) { 380 writeit("i2p: invalid wallet login"); 381 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 382 rep.msg = "invalid wallet login"; 383 return; 384 } 385 386 writeit("Before creating of invoices"); 387 388 Document[] requests_to_pay; 389 requests_to_pay ~= Document(cast(immutable(ubyte[])) req.rawdata); 390 TagionBill[] to_pay; 391 import tagion.hibon.HiBONRecord; 392 393 foreach (doc; requests_to_pay) { 394 if (doc.valid != Document.Element.ErrorCode.NONE) { 395 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 396 rep.msg = "invalid document: "; 397 writeln("i2p: invalid document"); 398 return; 399 } 400 if (doc.isRecord!TagionBill) { 401 to_pay ~= TagionBill(doc); 402 } 403 else if (doc.isRecord!Invoice) { 404 import tagion.utils.StdTime : currentTime; 405 406 auto read_invoice = Invoice(doc); 407 to_pay ~= TagionBill(read_invoice.amount, currentTime, read_invoice.pkey, Buffer.init); 408 } 409 else { 410 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 411 rep.msg = "invalid faucet request"; 412 return; 413 } 414 } 415 416 writeit(to_pay[0].toPretty); 417 418 SignedContract signed_contract; 419 TagionCurrency fees; 420 const payment_status = wallet_interface.secure_wallet.createPayment(to_pay, signed_contract, fees); 421 if (!payment_status.value) { 422 writeit("i2p: faucet is empty"); 423 rep.status = nng_http_status.NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR; 424 rep.msg = format("faucet createPayment error: %s", payment_status.msg); 425 return; 426 } 427 428 writeit(signed_contract.toPretty); 429 430 const message = wallet_interface.secure_wallet.net.calcHash(signed_contract); 431 const contract_net = wallet_interface.secure_wallet.net.derive(message); 432 const hirpc = HiRPC(contract_net); 433 const hirpc_submit = hirpc.submit(signed_contract); 434 wallet_interface.secure_wallet.account.hirpcs ~= hirpc_submit.toDoc; 435 436 auto receiver = sendSubmitHiRPC(options.contract_address, hirpc_submit, contract_net); 437 wallet_interface.save(false); 438 439 writeit("i2p: payment sent"); 440 rep.status = nng_http_status.NNG_HTTP_STATUS_OK; 441 rep.type = "applicaion/octet-stream"; 442 rep.rawdata = cast(ubyte[])(receiver.toDoc.serialize); 443 } 444 445 static void sysinfo_handler(WebData* req, WebData* rep, void* ctx) { 446 thread_attachThis(); 447 JSONValue data = parseJSON("{}"); 448 data["memsize"] = getmemstatus(); 449 rep.status = nng_http_status.NNG_HTTP_STATUS_OK; 450 rep.type = "application/json"; 451 rep.json = data; 452 } 453 454 int _main(string[] args) { 455 immutable program = args[0]; 456 bool version_switch; 457 GetoptResult main_args; 458 459 ShellOptions options; 460 461 long sz, isz; 462 463 auto config_file = "shell.json"; 464 if (config_file.exists) { 465 options.load(config_file); 466 } 467 else { 468 options.setDefault; 469 } 470 string address; 471 472 try { 473 main_args = getopt(args, std.getopt.config.caseSensitive, 474 std.getopt.config.bundling, 475 "version", "display the version", &version_switch, 476 ); 477 } 478 catch (GetOptException e) { 479 stderr.writeit(e.msg); 480 return 1; 481 } 482 483 // if (address !is address.init) { 484 // options.shell_uri = address; 485 486 // } 487 488 if (version_switch) { 489 revision_text.writeit; 490 return 0; 491 } 492 if (main_args.helpWanted) { 493 const option_info = format("%s [<option>...] <config.json> <files>", program); 494 495 defaultGetoptPrinter( 496 [ 497 // format("%s version %s", program, REVNO), 498 "Documentation: https://tagion.org/", 499 "", 500 "Usage:", 501 format("%s [<option>...] <config.json> <files>", program), 502 "", 503 "<option>:", 504 505 ].join("\n"), 506 main_args.options); 507 return 0; 508 } 509 510 //auto ds_tid = spawn(&dart_worker, options); 511 512 writeit("\nTagionShell web service\nListening at " 513 ~ options.shell_uri ~ "\n\t" 514 ~ options.shell_api_prefix 515 ~ options.contract_endpoint 516 ~ "\t= POST contract hibon\n\t" 517 ~ options.shell_api_prefix 518 ~ options.dart_endpoint 519 ~ "\t\t= POST dart request hibon\n\t" 520 ~ options.shell_api_prefix 521 ~ options.i2p_endpoint 522 ~ "\t= POST invoice-to-pay hibon\n\t" 523 ~ options 524 .shell_api_prefix 525 ~ options.bullseye_endpoint 526 ~ "\t= GET dart bullseye hibon\n\t" 527 ~ options.shell_api_prefix 528 ~ options.sysinfo_endpoint 529 ~ "\t\t= GET system info\n\t" 530 531 ); 532 533 isz = getmemstatus(); 534 535 appoint: 536 537 WebApp app = WebApp("ShellApp", options.shell_uri, parseJSON(`{"root_path":"/tmp/webapp","static_path":"static"}`), &options); 538 539 app.route(options.shell_api_prefix ~ options.sysinfo_endpoint, &sysinfo_handler, ["GET"]); 540 app.route(options.shell_api_prefix ~ options.bullseye_endpoint, &bullseye_handler, ["GET"]); 541 app.route(options.shell_api_prefix ~ options.contract_endpoint, &contract_handler, ["POST"]); 542 app.route(options.shell_api_prefix ~ options.dart_endpoint, &dart_handler, ["POST"]); 543 app.route(options.shell_api_prefix ~ options.dartcache_endpoint, &dartcache_handler, ["POST"]); 544 app.route(options.shell_api_prefix ~ options.i2p_endpoint, &i2p_handler, ["POST"]); 545 546 app.start(); 547 548 while (true) { 549 nng_sleep(2000.msecs); 550 version (none) { 551 sz = getmemstatus(); 552 writeln("mem: ", sz); 553 if (sz > isz * 2) { 554 writeln("Reset app!"); 555 app.stop; 556 destroy(app); 557 goto appoint; 558 } 559 } 560 561 } 562 563 return 0; 564 }