1 module tagion.testbench.hirpcserver; 2 3 import std.algorithm : remove; 4 import std.conv : to; 5 import std.format; 6 import std.socket : InternetAddress, Socket, SocketException, SocketSet, TcpSocket; 7 8 // import std.stdio : writeln, writefln; 9 import std.stdio; 10 import tagion.Keywords; 11 import tagion.basic.Types : Buffer; 12 import tagion.communication.HiRPC; 13 import tagion.gossip.GossipNet; 14 import tagion.hibon.HiBON; 15 import tagion.tools.Basic; 16 17 version (none) class HRPCNet : StdSecureNet { 18 import tagion.hashgraph.HashGraph; 19 20 override void request(HashGraph hashgraph, immutable(ubyte[]) fingerprint) { 21 assert(0, format("Not implemented %s", __PRETTY_FUNCTION__)); 22 } 23 24 this(string passphrase) { 25 import tagion.crypto.secp256k1.NativeSecp256k1; 26 27 super(new NativeSecp256k1(NativeSecp256k1.Format.AUTO, NativeSecp256k1.Format.COMPACT)); 28 generateKeyPair(passphrase); 29 writefln("Pubkey %s:%d", (cast(Buffer) pubkey).toHexString!true, pubkey.length); 30 } 31 } 32 33 import file = std.file; 34 import std.file : exists, tempDir; 35 import std.path; 36 import std.stdio; 37 38 version (none) struct Bank { 39 protected enum _transactions = [ 40 "accountCreate", 41 "accountGetHistoryExchange", 42 "accountMakeTransfer", 43 "accountMakeExchange" 44 ]; 45 46 import std.traits : EnumMembers; 47 import tagion.Base : EnumText; 48 49 mixin(EnumText!("Transactions", _transactions)); 50 51 protected enum _params = [ 52 "account", 53 "amount" 54 ]; 55 56 mixin(EnumText!("Params", _params)); 57 58 alias HRPCSender = HRPC.HRPCSender; 59 alias HRPCReceiver = HRPC.HRPCReceiver; 60 HRPC hrpc; 61 62 static immutable(Buffer) getAccount(ref const(HRPCReceiver) received) { 63 HRPC.check_element!Buffer(received.params, Params.account); 64 return received.params[Params.account].get!Buffer; 65 } 66 67 enum EXT = "bson"; 68 string filename(immutable(Buffer) account) { 69 return tempDir.buildPath(setExtension(account.toHexString, EXT)); 70 } 71 72 // const(HRPCSender) opDispatch(string method)(ref const(HRPCReceiver) received) isValid!Transactions, method) { 73 // enum code=format("%s(%s)", method, read_only); 74 // mixin(code); 75 // } 76 77 const(HRPCSender) accountCreate(ref const(HRPCReceiver) received) { 78 immutable account = getAccount(received); 79 immutable filename = filename(account); 80 if (filename.exists) { 81 auto bson_data = new HBSON; 82 bson_data[Params.account] = received.params[Params.account].get!Buffer; 83 return hrpc.error(received, format("Account %s already exists", account.toHexString), -17, bson_data); 84 } 85 auto bson_account = new HBSON; 86 // FixMe: CBR amount must be a uint NOT int 87 // Standard BSON does not support uint 88 // bson_account[Params.amount]=0u; 89 bson_account[Params.amount] = 0; 90 file.write(filename, bson_account.serialize); 91 return hrpc.result(received, bson_account); 92 } 93 94 const(HRPCSender) accountGetHistoryExchange(ref const(HRPCReceiver) received) { 95 immutable account = getAccount(received); 96 return hrpc.error(received, format("Not implemented yet %s", __PRETTY_FUNCTION__), -17); 97 } 98 99 const(HRPCSender) accountMakeTransfer(ref const(HRPCReceiver) received) { 100 immutable account = getAccount(received); 101 return hrpc.error(received, format("Not implemented yet %s", __PRETTY_FUNCTION__), -17); 102 } 103 104 const(HRPCSender) accountMakeExchange(ref const(HRPCReceiver) received) { 105 immutable account = getAccount(received); 106 return hrpc.error(received, format("Not implemented yet %s", __PRETTY_FUNCTION__), -17); 107 } 108 109 /++ 110 Executes the transaction method 111 112 Params: received contains valid method and a params object 113 Returns: Returns a HRPC either an result or an error 114 +/ 115 const(HRPCSender) opCall(ref const(HRPCReceiver) received) { 116 if (!received.params.empty) { 117 if (received.params.hasElement("id")) { 118 immutable message = format("The parameter 'id' should be called 'account' instead", received 119 .message.method); 120 return hrpc.error(received, message, 42); 121 } 122 } 123 switch (received.message.method) { 124 static foreach (method; EnumMembers!Transactions) { 125 mixin(format("case Transactions.%s: return %s(received);", method, method)); 126 } 127 default: 128 immutable message = format("Method '%s' not supported", received.message.method); 129 return hrpc.error(received, message, 22); 130 } 131 assert(0); 132 } 133 } 134 135 mixin Main!_main; 136 137 int _main(string[] args) { 138 version (none) { 139 ushort port; 140 enum BUFFER_SIZE = 1024; 141 if (args.length >= 2) { 142 port = to!ushort(args[1]); 143 } 144 else { 145 port = 4444; 146 } 147 148 auto listener = new TcpSocket(); 149 assert(listener.isAlive); 150 listener.blocking = false; 151 listener.bind(new InternetAddress(port)); 152 listener.listen(10); 153 154 writefln("Listening on port %d.", port); 155 stdout.flush; 156 enum MAX_CONNECTIONS = 60; 157 // Room for listener. 158 auto socketSet = new SocketSet(MAX_CONNECTIONS + 1); 159 Socket[] reads; 160 161 HRPC hrpc; 162 immutable passphrase = "Very secret password for the server"; 163 hrpc.net = new HRPCNet(passphrase); 164 Bank bank; 165 bank.hrpc = hrpc; 166 while (true) { 167 socketSet.add(listener); 168 169 foreach (sock; reads) { 170 socketSet.add(sock); 171 } 172 173 Socket.select(socketSet, null, null); 174 175 for (size_t i = 0; i < reads.length; i++) { 176 if (socketSet.isSet(reads[i])) { 177 ubyte[BUFFER_SIZE] buf; 178 auto datLength = reads[i].receive(buf[]); 179 180 if (datLength == Socket.ERROR) { 181 writeln("Connection error."); 182 } 183 else if (datLength != 0) { 184 185 writefln("\nReceived %d bytes from %s", datLength, reads[i].remoteAddress() 186 .toString()); 187 const(HRPC.HRPCReceiver)* ref_received; 188 auto doc = Document(buf[0 .. datLength].idup); 189 writeln(doc.toText); 190 stdout.flush; 191 try { 192 auto received = hrpc.receive(doc); 193 194 ref_received = &received; 195 if (received.verified) { 196 writeln("Message is verified and signed"); 197 } 198 else { 199 writeln("Message is not signed"); 200 } 201 version (none) { 202 203 auto bson_result = new HBSON; 204 bson_result[Keywords.method] = received.message.method; 205 bson_result[Keywords.params] = received.params; 206 bson_result["signed"] = received.verified; 207 auto message_doc = doc[Keywords.message].get!Document; 208 immutable hash = hrpc.net.calcHash(message_doc.data); 209 bson_result["hash"] = hash; 210 } 211 auto sender = bank(received); 212 immutable buffer = hrpc.toBSON(sender).serialize; 213 214 reads[i].send(buffer); 215 writeln("\nResonse:"); 216 auto sender_doc = Document(buffer); 217 writefln(sender_doc.toText); 218 stdout.flush; 219 220 } 221 catch (Exception e) { 222 auto bson_data = new HBSON; 223 bson_data["stack"] = e.msg; 224 if ((ref_received) && !ref_received.empty) { 225 auto error_sender = hrpc.error(*ref_received, e.msg, 666, bson_data); 226 immutable error_buffer = hrpc.toBSON(error_sender).serialize; 227 reads[i].send(error_buffer); 228 writeln("\nError:"); 229 auto error_doc = Document(error_buffer); 230 writeln(error_doc.toText); 231 stdout.flush; 232 } 233 else { 234 auto error_sender = hrpc.error(e.msg, 42); 235 immutable error_buffer = hrpc.toBSON(error_sender).serialize; 236 reads[i].send(error_buffer); 237 writeln("\nError:"); 238 auto error_doc = Document(error_buffer); 239 writeln(error_doc.toText); 240 stdout.flush; 241 } 242 } 243 244 continue; 245 } 246 else { 247 try { 248 // if the connection closed due to an error, remoteAddress() could fail 249 writefln("Connection from %s closed.", reads[i].remoteAddress().toString()); 250 } 251 catch (SocketException) { 252 writeln("Connection closed."); 253 } 254 } 255 256 // release socket resources now 257 reads[i].close(); 258 259 reads = reads.remove(i); 260 // i will be incremented by the for, we don't want it to be. 261 i--; 262 263 writefln("\tTotal connections: %d", reads.length); 264 stdout.flush; 265 } 266 } 267 268 if (socketSet.isSet(listener)) { // connection request 269 Socket sn = null; 270 scope (failure) { 271 writefln("Error accepting"); 272 if (sn) { 273 sn.close(); 274 } 275 } 276 sn = listener.accept(); 277 assert(sn.isAlive); 278 assert(listener.isAlive); 279 280 if (reads.length < MAX_CONNECTIONS) { 281 writefln("Connection from %s established.", sn.remoteAddress().toString()); 282 reads ~= sn; 283 writefln("\tTotal connections: %d", reads.length); 284 } 285 else { 286 writefln("Rejected connection from %s; too many connections.", sn.remoteAddress() 287 .toString()); 288 sn.close(); 289 assert(!sn.isAlive); 290 assert(listener.isAlive); 291 } 292 } 293 294 socketSet.reset(); 295 } 296 stdout.flush; 297 } 298 return 0; 299 }