1 module tagion.testbench.hirpcserver;
2 
3 import std.algorithm : remove;
4 import std.conv : to;
5 import std.format;
6 import std.socket : InternetAddress, Socket, SocketException, SocketSet, TcpSocket;
7 
8 // import std.stdio : writeln, writefln;
9 import std.stdio;
10 import tagion.Keywords;
11 import tagion.basic.Types : Buffer;
12 import tagion.communication.HiRPC;
13 import tagion.gossip.GossipNet;
14 import tagion.hibon.HiBON;
15 import tagion.tools.Basic;
16 
/++
 Secure-net stub used by the test server; currently compiled out by `version (none)`.

 A key pair is generated from a passphrase when the object is constructed.
 `request` is deliberately left unimplemented and halts if it is ever called.
 +/
version (none) class HRPCNet : StdSecureNet {
    import tagion.hashgraph.HashGraph;

    /++
     Creates the secp256k1 backend, generates the key pair from the passphrase
     and prints the resulting public key (hex) together with its length.

     Params: passphrase = secret the key pair is generated from
     +/
    this(string passphrase) {
        import tagion.crypto.secp256k1.NativeSecp256k1;

        super(new NativeSecp256k1(NativeSecp256k1.Format.AUTO, NativeSecp256k1.Format.COMPACT));
        generateKeyPair(passphrase);

        const key_hex = (cast(Buffer) pubkey).toHexString!true;
        writefln("Pubkey %s:%d", key_hex, pubkey.length);
    }

    /// Not implemented: always fails with an assertion naming this function.
    override void request(HashGraph hashgraph, immutable(ubyte[]) fingerprint) {
        assert(0, format("Not implemented %s", __PRETTY_FUNCTION__));
    }
}
32 
33 import file = std.file;
34 import std.file : exists, tempDir;
35 import std.path;
36 import std.stdio;
37 
/++
 Minimal file-backed "bank" that executes HRPC requests; currently compiled
 out by `version (none)`.

 Each account is stored as one serialized document in the temp directory,
 in a file named after the hex form of the account buffer. `opCall` is the
 entry point: it dispatches on the method name of the received message to
 the member function of the same name.
 +/
version (none) struct Bank {
    /// Method names the bank dispatches on; a member function with the same name exists for each.
    protected enum _transactions = [
            "accountCreate",
            "accountGetHistoryExchange",
            "accountMakeTransfer",
            "accountMakeExchange"
        ];

    import std.traits : EnumMembers;
    import tagion.Base : EnumText;

    // Declares `Transactions`, whose members are used as `case` labels in opCall.
    mixin(EnumText!("Transactions", _transactions));

    /// Parameter names looked up in the `params` object of a request.
    protected enum _params = [
            "account",
            "amount"
        ];

    // Declares `Params` (used as `Params.account` / `Params.amount` below).
    mixin(EnumText!("Params", _params));

    alias HRPCSender = HRPC.HRPCSender;
    alias HRPCReceiver = HRPC.HRPCReceiver;
    /// HRPC instance used to build result and error replies.
    HRPC hrpc;

    /++
     Extracts the account buffer from the request parameters.

     Params: received = request whose `params` hold the `account` element
     Returns: the `account` parameter as a Buffer
     +/
    static immutable(Buffer) getAccount(ref const(HRPCReceiver) received) {
        // presumably rejects a missing or wrongly typed element - confirm against HRPC.check_element
        HRPC.check_element!Buffer(received.params, Params.account);
        return received.params[Params.account].get!Buffer;
    }

    /// Extension of the per-account files.
    enum EXT = "bson";

    /++
     Builds the path of the file that stores an account.

     Params: account = account identifier
     Returns: `<tempDir>/<hex of account>.bson`
     +/
    string filename(immutable(Buffer) account) {
        return tempDir.buildPath(setExtension(account.toHexString, EXT));
    }

    // const(HRPCSender) opDispatch(string method)(ref const(HRPCReceiver) received) isValid!Transactions, method)  {
    //     enum code=format("%s(%s)", method, read_only);
    //     mixin(code);
    // }

    /++
     Creates the account named in the request.

     Params: received = request carrying the `account` parameter
     Returns: an error reply (code -17, with the account attached as data) when
              the account file already exists; otherwise a result reply holding
              the new account document, which is also written to the account file
     +/
    const(HRPCSender) accountCreate(ref const(HRPCReceiver) received) {
        immutable account = getAccount(received);
        // Named differently from the `filename` member so that the call
        // unambiguously refers to the member function.
        immutable account_file = filename(account);
        if (account_file.exists) {
            auto bson_data = new HBSON;
            bson_data[Params.account] = received.params[Params.account].get!Buffer;
            return hrpc.error(received, format("Account %s already exists", account.toHexString), -17, bson_data);
        }
        auto bson_account = new HBSON;
        // FixMe: CBR amount must be a uint NOT int
        // Standard BSON does not support uint
        //        bson_account[Params.amount]=0u;
        bson_account[Params.amount] = 0;
        file.write(account_file, bson_account.serialize);
        return hrpc.result(received, bson_account);
    }

    /// Not implemented yet: reads the account parameter, then replies with error -17.
    const(HRPCSender) accountGetHistoryExchange(ref const(HRPCReceiver) received) {
        immutable account = getAccount(received);
        return hrpc.error(received, format("Not implemented yet %s", __PRETTY_FUNCTION__), -17);
    }

    /// Not implemented yet: reads the account parameter, then replies with error -17.
    const(HRPCSender) accountMakeTransfer(ref const(HRPCReceiver) received) {
        immutable account = getAccount(received);
        return hrpc.error(received, format("Not implemented yet %s", __PRETTY_FUNCTION__), -17);
    }

    /// Not implemented yet: reads the account parameter, then replies with error -17.
    const(HRPCSender) accountMakeExchange(ref const(HRPCReceiver) received) {
        immutable account = getAccount(received);
        return hrpc.error(received, format("Not implemented yet %s", __PRETTY_FUNCTION__), -17);
    }

    /++
     Executes the transaction method

     A request whose params contain an element called 'id' is rejected with
     error 42; a method that is not one of the `Transactions` members is
     rejected with error 22.

     Params: received contains valid method and a params object
     Returns: Returns a HRPC either an result or an error
     +/
    const(HRPCSender) opCall(ref const(HRPCReceiver) received) {
        if (!received.params.empty && received.params.hasElement("id")) {
            // Plain string on purpose: the text has no format specifier, and
            // std.format.format throws FormatException ("Orphan format arguments")
            // when it is handed an argument it cannot place.
            immutable message = "The parameter 'id' should be called 'account' instead";
            return hrpc.error(received, message, 42);
        }
        switch (received.message.method) {
            // One generated `case` per transaction, forwarding to the member of the same name.
            static foreach (method; EnumMembers!Transactions) {
                mixin(format("case Transactions.%s: return %s(received);", method, method));
            }
        default:
            immutable message = format("Method '%s' not supported", received.message.method);
            return hrpc.error(received, message, 22);
        }
        assert(0);
    }
}
134 
mixin Main!_main;

/++
 Entry point handed to the `Main` mixin.

 The complete server implementation is wrapped in `version (none)`, so at
 present it is compiled out and the function only returns 0.

 The disabled code is a single-threaded TCP server multiplexed with
 `Socket.select`. It accepts up to MAX_CONNECTIONS clients, reads at most
 BUFFER_SIZE bytes per readable socket, decodes them as an HRPC document,
 lets `Bank` execute the request and sends the serialized reply (or an
 error reply) back on the same socket.

 Params:
     args = command line; in the disabled code `args[1]`, when present, is
            the port to listen on (4444 otherwise)
 Returns: 0
 +/
int _main(string[] args) {
    version (none) {
        enum BUFFER_SIZE = 1024;
        enum MAX_CONNECTIONS = 60;
        enum ushort DEFAULT_PORT = 4444;

        immutable ushort port = (args.length >= 2) ? to!ushort(args[1]) : DEFAULT_PORT;

        auto listener = new TcpSocket();
        assert(listener.isAlive);
        listener.blocking = false;
        listener.bind(new InternetAddress(port));
        listener.listen(10);

        writefln("Listening on port %d.", port);
        stdout.flush;
        // Room for listener.
        auto socketSet = new SocketSet(MAX_CONNECTIONS + 1);
        Socket[] reads;

        HRPC hrpc;
        immutable passphrase = "Very secret password for the server";
        hrpc.net = new HRPCNet(passphrase);
        Bank bank;
        bank.hrpc = hrpc;

        // Sends an already serialized error reply to `peer` and echoes it on stdout.
        void sendError(Socket peer, Buffer error_buffer) {
            peer.send(error_buffer);
            writeln("\nError:");
            auto error_doc = Document(error_buffer);
            writeln(error_doc.toText);
            stdout.flush;
        }

        // Decodes the request in `data`, lets the bank execute it and writes
        // the reply, or an error reply when anything throws, back to `peer`.
        void serve(Socket peer, Buffer data) {
            auto doc = Document(data);
            writeln(doc.toText);
            stdout.flush;

            // Set once hrpc.receive has returned, so the outer handler can
            // tell a decode failure from an exception raised while replying.
            bool decoded;
            try {
                auto received = hrpc.receive(doc);
                decoded = true;
                // Inner try: a failure from here on is answered in the context
                // of `received`, while that variable is still in scope.
                try {
                    if (received.verified) {
                        writeln("Message is verified and signed");
                    }
                    else {
                        writeln("Message is not signed");
                    }
                    version (none) {

                        auto bson_result = new HBSON;
                        bson_result[Keywords.method] = received.message.method;
                        bson_result[Keywords.params] = received.params;
                        bson_result["signed"] = received.verified;
                        auto message_doc = doc[Keywords.message].get!Document;
                        immutable hash = hrpc.net.calcHash(message_doc.data);
                        bson_result["hash"] = hash;
                    }
                    auto sender = bank(received);
                    immutable buffer = hrpc.toBSON(sender).serialize;

                    peer.send(buffer);
                    writeln("\nResonse:");
                    auto sender_doc = Document(buffer);
                    // writeln, not writefln: the text is derived from peer
                    // data and must not be interpreted as a format string.
                    writeln(sender_doc.toText);
                    stdout.flush;
                }
                catch (Exception e) {
                    if (!received.empty) {
                        auto bson_data = new HBSON;
                        bson_data["stack"] = e.msg;
                        auto error_sender = hrpc.error(received, e.msg, 666, bson_data);
                        sendError(peer, hrpc.toBSON(error_sender).serialize);
                    }
                    else {
                        auto error_sender = hrpc.error(e.msg, 42);
                        sendError(peer, hrpc.toBSON(error_sender).serialize);
                    }
                }
            }
            catch (Exception e) {
                if (decoded) {
                    // Raised while sending an error reply: do not answer twice.
                    throw e;
                }
                // The request could not be decoded, so there is nothing to refer back to.
                auto error_sender = hrpc.error(e.msg, 42);
                sendError(peer, hrpc.toBSON(error_sender).serialize);
            }
        }

        while (true) {
            socketSet.add(listener);

            foreach (sock; reads) {
                socketSet.add(sock);
            }

            Socket.select(socketSet, null, null);

            // The index only advances when the socket at `i` is kept; after a
            // removal the next socket has already moved into slot `i`.
            size_t i = 0;
            while (i < reads.length) {
                auto client = reads[i];
                if (!socketSet.isSet(client)) {
                    i++;
                    continue;
                }

                ubyte[BUFFER_SIZE] buf;
                immutable datLength = client.receive(buf[]);

                if (datLength != Socket.ERROR && datLength != 0) {
                    writefln("\nReceived %d bytes from %s", datLength, client.remoteAddress()
                            .toString());
                    serve(client, buf[0 .. datLength].idup);
                    i++;
                    continue;
                }

                if (datLength == Socket.ERROR) {
                    writeln("Connection error.");
                }
                else {
                    try {
                        // if the connection closed due to an error, remoteAddress() could fail
                        writefln("Connection from %s closed.", client.remoteAddress().toString());
                    }
                    catch (SocketException) {
                        writeln("Connection closed.");
                    }
                }

                // release socket resources now
                client.close();
                reads = reads.remove(i);

                writefln("\tTotal connections: %d", reads.length);
                stdout.flush;
            }

            if (socketSet.isSet(listener)) { // connection request
                Socket sn = null;
                scope (failure) {
                    writefln("Error accepting");
                    if (sn) {
                        sn.close();
                    }
                }
                sn = listener.accept();
                assert(sn.isAlive);
                assert(listener.isAlive);

                if (reads.length < MAX_CONNECTIONS) {
                    writefln("Connection from %s established.", sn.remoteAddress().toString());
                    reads ~= sn;
                    writefln("\tTotal connections: %d", reads.length);
                }
                else {
                    writefln("Rejected connection from %s; too many connections.", sn.remoteAddress()
                            .toString());
                    sn.close();
                    assert(!sn.isAlive);
                    assert(listener.isAlive);
                }
            }

            socketSet.reset();
        }
        // The loop above never terminates, so nothing is placed after it.
    }
    return 0;
}