1 module tagion.tools.tagionshell;
2 
3 import core.time;
4 import std.algorithm;
5 import std.array;
6 import std.concurrency;
7 import std.conv;
8 import std.exception;
9 import std.file : exists;
10 import std.format;
11 import std.getopt;
12 import std.json;
13 import std.stdio : File, stderr, stdout, writefln, writeln;
14 import tagion.basic.Types : Buffer, FileExtension, hasExtension;
15 import tagion.basic.range : doFront;
16 import tagion.communication.HiRPC;
17 import tagion.crypto.SecureInterfaceNet;
18 import tagion.crypto.SecureNet;
19 import tagion.hibon.Document;
20 import tagion.hibon.HiBON;
21 import tagion.hibon.HiBONFile : fread, fwrite;
22 import tagion.hibon.HiBONRecord : isRecord;
23 import tagion.script.TagionCurrency;
24 import tagion.script.common;
25 import tagion.tools.Basic;
26 import tagion.tools.revision;
27 import tagion.tools.shell.shelloptions;
28 import tagion.tools.wallet.WalletInterface;
29 import tagion.tools.wallet.WalletOptions;
30 import tagion.utils.StdTime : currentTime;
31 import tagion.wallet.AccountDetails;
32 import tagion.wallet.SecureWallet;
33 import tagion.utils.LRUT;
34 
35 import core.thread;
36 import nngd.nngd;
37 
38 mixin Main!(_main, "shell");
39 
40 alias DartCache = LRUT!(Buffer, TagionBill);
41 
42 long getmemstatus() {
43     long sz = -1;
44     auto f = File("/proc/self/status", "rt");
45     foreach (line; f.byLine) {
46         if (line.startsWith("VmRSS")) {
47             sz = to!long(line.split()[1]);
48             break;
49         }
50     }
51     f.close();
52     return sz;
53 }
54 
55 void writeit(A...)(A a) {
56     writeln(a);
57     stdout.flush();
58 }
59 
60 void dart_worker(ShellOptions opt) {
61     int rc;
62     NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_SUB);
63     s.recvtimeout = msecs(1000);
64     s.subscribe("");
65     writeit("DS: subscribed");
66     while (true) {
67         rc = s.dial(opt.tagion_subscription_addr);
68         if (rc == 0)
69             break;
70     }
71     scope (exit) {
72         s.close;
73     }
74     writeit("DS: connected");
75     while (true) {
76         Document received_doc = s.receive!(immutable(ubyte[]))();
77         writeit(format("DS: received %d bytes", received_doc.length));
78     }
79 }
80 
81 void contract_handler(WebData* req, WebData* rep, void* ctx) {
82 
83     thread_attachThis();
84 
85     int rc;
86     ShellOptions* opt = cast(ShellOptions*) ctx;
87     if (req.type != "application/octet-stream") {
88         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
89         rep.msg = "invalid data type";
90         return;
91     }
92 
93     const contract_addr = opt.node_contract_addr;
94 
95     writeit(format("WH: contract: with %d bytes for %s", req.rawdata.length, contract_addr));
96     NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
97     s.recvtimeout = msecs(10000);
98     writeit(format("WH: contract: trying to dial %s", contract_addr));
99     while (true) {
100         rc = s.dial(contract_addr);
101         if (rc == 0)
102             break;
103     }
104     scope (exit) {
105         s.close();
106     }
107     rc = s.send(req.rawdata);
108     if (rc != 0) {
109         writeit("contract_handler: send: ", nng_errstr(s.errno));
110         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
111         rep.msg = "socket error";
112         return;
113     }
114     ubyte[4096] buf;
115     size_t len = s.receivebuf(buf, 4096);
116     if (len == size_t.max && s.errno != 0) {
117         writeit("contract_handler: recv: ", nng_errstr(s.errno));
118         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
119         rep.msg = "socket error";
120         return;
121     }
122     writeit(format("WH: dart: received %d bytes", len));
123     rep.status = (len > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status.NNG_HTTP_STATUS_NO_CONTENT;
124     rep.type = "applicaion/octet-stream";
125     rep.rawdata = (len > 0) ? buf[0 .. len] : null;
126 }
127 
128 import crud = tagion.dart.DARTcrud;
129 
130 static void bullseye_handler(WebData* req, WebData* rep, void* ctx) {
131 
132     thread_attachThis();
133 
134     NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
135 
136     int rc;
137     ShellOptions* opt = cast(ShellOptions*) ctx;
138     while (true) {
139         rc = s.dial(opt.node_dart_addr);
140         if (rc == 0)
141             break;
142     }
143     scope (exit) {
144         s.close();
145     }
146 
147     rc = s.send(crud.dartBullseye.toDoc.serialize);
148     ubyte[192] buf;
149     size_t len = s.receivebuf(buf, buf.length);
150     if (len == size_t.max && s.errno != 0) {
151         writeit("bullseye_handler: recv: ", nng_errstr(s.errno));
152         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
153         rep.msg = "socket error";
154         return;
155     }
156 
157     const receiver = HiRPC(null).receive(Document(buf.idup));
158     if (!receiver.isResponse) {
159         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
160         rep.msg = "response error";
161         return;
162     }
163 
164     const dartindex = parseJSON(receiver.response.toPretty);
165 
166     rep.status = (len > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status.NNG_HTTP_STATUS_NO_CONTENT;
167     rep.type = "application/json";
168     rep.json = dartindex;
169 }
170 
171 static void dartcache_handler(WebData* req, WebData* rep, void* ctx) {
172 
173     thread_attachThis();
174 
175     int rc;
176     const size_t buflen = 1048576;
177     ubyte[1048576] buf;
178     immutable(ubyte)[] docbuf;
179 
180     ShellOptions* opt = cast(ShellOptions*) ctx;
181     if (req.type != "application/octet-stream") {
182         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
183         rep.msg = "invalid data type";
184         return;
185     }
186 
187     SecureNet net = new StdSecureNet();
188     net.generateKeyPair("very_secret");
189     HiRPC hirpc = HiRPC(net);
190     Document doc = Document(cast(immutable(ubyte[])) req.rawdata);
191     immutable receiver = hirpc.receive(doc);
192     auto pkey_doc = receiver.method.params;
193     Buffer[] owner_pkeys;
194     foreach (owner; pkey_doc[]) {
195         owner_pkeys ~= owner.get!Buffer;
196     }
197 
198     TagionBill[] found_bills;
199 
200     // to chache
201 
202     if (!found_bills.empty) {
203         foreach (bill; found_bills) {
204             remove!(x => x == bill.owner)(owner_pkeys);
205         }
206     }
207 
208     if (!owner_pkeys.empty) {
209         auto dreq = new HiBON;
210         dreq = owner_pkeys;
211 
212         NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
213         s.recvtimeout = 60_000.msecs;
214         while (true) {
215             rc = s.dial(opt.node_dart_addr);
216             if (rc == 0)
217                 break;
218         }
219         scope (exit) {
220             s.close();
221         }
222 
223         rc = s.send(cast(ubyte[])(hirpc.search(dreq).toDoc.serialize));
224 
225         if (rc != 0) {
226             writeit("dart_handler: send: ", nng_errstr(rc));
227             rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
228             rep.msg = "socket error";
229             return;
230         }
231 
232         size_t len = 0, doclen = 0;
233         do {
234             len = s.receivebuf(buf, buflen);
235             if (len == size_t.max && s.errno != 0) {
236                 writeit("dart_handler: recv: ", nng_errstr(s.errno));
237                 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
238                 rep.msg = "socket error";
239                 return;
240             }
241             if (len > buflen) {
242                 writeit("dart_handler: recv wrong size: ", len);
243                 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
244                 rep.msg = "socket error";
245                 return;
246             }
247             writeit(format("WH: dart: received %d bytes", len));
248             docbuf ~= buf[0 .. len];
249             doclen += len;
250         }
251         while (len > buflen - 1);
252 
253         const repdoc = Document(docbuf);
254         immutable repreceiver = hirpc.receive(repdoc);
255         found_bills ~= repreceiver.response.result[]
256             .map!(e => TagionBill(e.get!Document))
257             .array;
258 
259     }
260 
261     HiBON params = new HiBON;
262 
263     foreach (i, bill; found_bills) {
264         params[i] = bill.toHiBON;
265     }
266 
267     Document response = hirpc.result(receiver, params).toDoc;
268 
269     rep.status = (found_bills.length > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status
270         .NNG_HTTP_STATUS_NO_CONTENT;
271     rep.type = "applicaion/octet-stream";
272     rep.rawdata = (found_bills.length > 0) ? cast(ubyte[])(response.serialize) : null;
273 
274     writeit("WH: dart: res ", response.toPretty);
275 }
276 
277 static void dart_handler(WebData* req, WebData* rep, void* ctx) {
278 
279     thread_attachThis();
280 
281     int rc;
282     const size_t buflen = 1048576;
283     ubyte[1048576] buf;
284     ubyte[] docbuf;
285     ShellOptions* opt = cast(ShellOptions*) ctx;
286     if (req.type != "application/octet-stream") {
287         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
288         rep.msg = "invalid data type";
289         return;
290     }
291 
292     const dart_addr = opt.node_dart_addr;
293 
294     writeit(format("WH: dart: with %d bytes for %s", req.rawdata.length, dart_addr));
295     NNGSocket s = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
296     s.recvtimeout = 60_000.msecs;
297     while (true) {
298         rc = s.dial(dart_addr);
299         if (rc == 0)
300             break;
301     }
302     scope (exit) {
303         s.close();
304     }
305     rc = s.send(req.rawdata);
306     if (rc != 0) {
307         writeit("dart_handler: error on send: ", nng_errstr(rc));
308         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
309         rep.msg = "socket error";
310         return;
311     }
312     writeit(format("WH: dart: sent %d bytes", req.rawdata.length));
313     size_t len = 0, doclen = 0;
314     do {
315         len = s.receivebuf(buf, buflen);
316         if (len == size_t.max && s.errno != 0) {
317             writeit("dart_handler: error on recv: ", nng_errstr(s.errno));
318             rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
319             rep.msg = "socket error";
320             return;
321         }
322         if (len > buflen) {
323             writeit("dart_handler: recv wrong size: ", len);
324             rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
325             rep.msg = "socket error";
326             return;
327         }
328         writeit(format("WH: dart: received %d bytes", len));
329         docbuf ~= buf[0 .. len];
330         doclen += len;
331     }
332     while (len > buflen - 1);
333     rep.status = (doclen > 0) ? nng_http_status.NNG_HTTP_STATUS_OK : nng_http_status.NNG_HTTP_STATUS_NO_CONTENT;
334     rep.type = "applicaion/octet-stream";
335     rep.rawdata = (doclen > 0) ? docbuf[0 .. doclen] : null;
336 }
337 
338 static void i2p_handler(WebData* req, WebData* rep, void* ctx) {
339 
340     thread_attachThis();
341     rt_moduleTlsCtor();
342 
343     int rc;
344     ShellOptions* opt = cast(ShellOptions*) ctx;
345     if (req.type != "application/octet-stream") {
346         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
347         rep.msg = "invalid data type";
348         return;
349     }
350     writeit(format("WH: invoice2pay: with %d bytes", req.rawdata.length));
351 
352     WalletOptions options;
353     auto wallet_config_file = opt.default_i2p_wallet;
354     if (wallet_config_file.exists) {
355         options.load(wallet_config_file);
356     }
357     else {
358         writeit("i2p: invalid wallet config: " ~ opt.default_i2p_wallet);
359         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
360         rep.msg = "invalid wallet config";
361         return;
362     }
363     auto wallet_interface = WalletInterface(options);
364 
365     if (!wallet_interface.load) {
366         writeit("i2p: Wallet does not exist");
367         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
368         rep.msg = "wallet does not exist";
369         return;
370     }
371     const flag = wallet_interface.secure_wallet.login(opt.default_i2p_wallet_pin);
372     if (!flag) {
373         writeit("i2p: Wallet wrong pincode");
374         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
375         rep.msg = "Faucet invalid pin code";
376         return;
377     }
378 
379     if (!wallet_interface.secure_wallet.isLoggedin) {
380         writeit("i2p: invalid wallet login");
381         rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
382         rep.msg = "invalid wallet login";
383         return;
384     }
385 
386     writeit("Before creating of invoices");
387 
388     Document[] requests_to_pay;
389     requests_to_pay ~= Document(cast(immutable(ubyte[])) req.rawdata);
390     TagionBill[] to_pay;
391     import tagion.hibon.HiBONRecord;
392 
393     foreach (doc; requests_to_pay) {
394         if (doc.valid != Document.Element.ErrorCode.NONE) {
395             rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
396             rep.msg = "invalid document: ";
397             writeln("i2p: invalid document");
398             return;
399         }
400         if (doc.isRecord!TagionBill) {
401             to_pay ~= TagionBill(doc);
402         }
403         else if (doc.isRecord!Invoice) {
404             import tagion.utils.StdTime : currentTime;
405 
406             auto read_invoice = Invoice(doc);
407             to_pay ~= TagionBill(read_invoice.amount, currentTime, read_invoice.pkey, Buffer.init);
408         }
409         else {
410             rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST;
411             rep.msg = "invalid faucet request";
412             return;
413         }
414     }
415 
416     writeit(to_pay[0].toPretty);
417 
418     SignedContract signed_contract;
419     TagionCurrency fees;
420     const payment_status = wallet_interface.secure_wallet.createPayment(to_pay, signed_contract, fees);
421     if (!payment_status.value) {
422         writeit("i2p: faucet is empty");
423         rep.status = nng_http_status.NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR;
424         rep.msg = format("faucet createPayment error: %s", payment_status.msg);
425         return;
426     }
427 
428     writeit(signed_contract.toPretty);
429 
430     const message = wallet_interface.secure_wallet.net.calcHash(signed_contract);
431     const contract_net = wallet_interface.secure_wallet.net.derive(message);
432     const hirpc = HiRPC(contract_net);
433     const hirpc_submit = hirpc.submit(signed_contract);
434     wallet_interface.secure_wallet.account.hirpcs ~= hirpc_submit.toDoc;
435 
436     auto receiver = sendSubmitHiRPC(options.contract_address, hirpc_submit, contract_net);
437     wallet_interface.save(false);
438 
439     writeit("i2p: payment sent");
440     rep.status = nng_http_status.NNG_HTTP_STATUS_OK;
441     rep.type = "applicaion/octet-stream";
442     rep.rawdata = cast(ubyte[])(receiver.toDoc.serialize);
443 }
444 
445 static void sysinfo_handler(WebData* req, WebData* rep, void* ctx) {
446     thread_attachThis();
447     JSONValue data = parseJSON("{}");
448     data["memsize"] = getmemstatus();
449     rep.status = nng_http_status.NNG_HTTP_STATUS_OK;
450     rep.type = "application/json";
451     rep.json = data;
452 }
453 
454 int _main(string[] args) {
455     immutable program = args[0];
456     bool version_switch;
457     GetoptResult main_args;
458 
459     ShellOptions options;
460 
461     long sz, isz;
462 
463     auto config_file = "shell.json";
464     if (config_file.exists) {
465         options.load(config_file);
466     }
467     else {
468         options.setDefault;
469     }
470     string address;
471 
472     try {
473         main_args = getopt(args, std.getopt.config.caseSensitive,
474                 std.getopt.config.bundling,
475                 "version", "display the version", &version_switch,
476         );
477     }
478     catch (GetOptException e) {
479         stderr.writeit(e.msg);
480         return 1;
481     }
482 
483     // if (address !is address.init) {
484     //     options.shell_uri = address;
485 
486     // }
487 
488     if (version_switch) {
489         revision_text.writeit;
490         return 0;
491     }
492     if (main_args.helpWanted) {
493         const option_info = format("%s [<option>...] <config.json> <files>", program);
494 
495         defaultGetoptPrinter(
496                 [
497             // format("%s version %s", program, REVNO),
498             "Documentation: https://tagion.org/",
499             "",
500             "Usage:",
501             format("%s [<option>...] <config.json> <files>", program),
502             "",
503             "<option>:",
504 
505         ].join("\n"),
506                 main_args.options);
507         return 0;
508     }
509 
510     //auto ds_tid = spawn(&dart_worker, options);
511 
512     writeit("\nTagionShell web service\nListening at "
513             ~ options.shell_uri ~ "\n\t"
514             ~ options.shell_api_prefix
515             ~ options.contract_endpoint
516             ~ "\t= POST contract hibon\n\t"
517             ~ options.shell_api_prefix
518             ~ options.dart_endpoint
519             ~ "\t\t= POST dart request hibon\n\t"
520             ~ options.shell_api_prefix
521             ~ options.i2p_endpoint
522             ~ "\t= POST invoice-to-pay hibon\n\t"
523             ~ options
524                 .shell_api_prefix
525                 ~ options.bullseye_endpoint
526                 ~ "\t= GET dart bullseye hibon\n\t"
527                 ~ options.shell_api_prefix
528                 ~ options.sysinfo_endpoint
529                 ~ "\t\t= GET system info\n\t"
530 
531     );
532 
533     isz = getmemstatus();
534 
535 appoint:
536 
537     WebApp app = WebApp("ShellApp", options.shell_uri, parseJSON(`{"root_path":"/tmp/webapp","static_path":"static"}`), &options);
538 
539     app.route(options.shell_api_prefix ~ options.sysinfo_endpoint, &sysinfo_handler, ["GET"]);
540     app.route(options.shell_api_prefix ~ options.bullseye_endpoint, &bullseye_handler, ["GET"]);
541     app.route(options.shell_api_prefix ~ options.contract_endpoint, &contract_handler, ["POST"]);
542     app.route(options.shell_api_prefix ~ options.dart_endpoint, &dart_handler, ["POST"]);
543     app.route(options.shell_api_prefix ~ options.dartcache_endpoint, &dartcache_handler, ["POST"]);
544     app.route(options.shell_api_prefix ~ options.i2p_endpoint, &i2p_handler, ["POST"]);
545 
546     app.start();
547 
548     while (true) {
549         nng_sleep(2000.msecs);
550         version (none) {
551             sz = getmemstatus();
552             writeln("mem: ", sz);
553             if (sz > isz * 2) {
554                 writeln("Reset app!");
555                 app.stop;
556                 destroy(app);
557                 goto appoint;
558             }
559         }
560 
561     }
562 
563     return 0;
564 }