1 module tagion.testbench.services.collector; 2 // Default import list for bdd 3 import core.time; 4 import std.algorithm.iteration : map; 5 import std.array; 6 import std.exception; 7 import std.file : exists, mkdirRecurse, remove, rmdirRecurse; 8 import std.format : format; 9 import std.path : buildPath, dirName, setExtension; 10 import std.range : iota, take, zip; 11 import std.typecons : Tuple; 12 import tagion.actor; 13 import tagion.basic.Types : Buffer, FileExtension; 14 import tagion.behaviour; 15 import tagion.communication.HiRPC; 16 import tagion.crypto.SecureInterfaceNet; 17 import tagion.crypto.SecureNet; 18 import tagion.crypto.Types; 19 import tagion.dart.DARTBasic; 20 import tagion.dart.Recorder; 21 import tagion.hibon.Document; 22 import tagion.logger.LogRecords : LogInfo; 23 import tagion.logger.Logger; 24 import tagion.script.TagionCurrency; 25 import tagion.script.common; 26 import tagion.script.execute; 27 import tagion.services.DART; 28 import tagion.services.collector; 29 import tagion.services.messages; 30 import tagion.services.options : TaskNames; 31 import tagion.services.replicator; 32 import tagion.testbench.actor.util; 33 import tagion.testbench.tools.Environment; 34 import tagion.utils.StdTime; 35 import tagion.utils.pretend_safe_concurrency : receive, receiveOnly, receiveTimeout; 36 37 enum feature = Feature( 38 "collector services", 39 []); 40 41 alias FeatureContext = Tuple!( 42 ItWork, "ItWork", 43 FeatureGroup*, "result" 44 ); 45 46 SecureNet[] createNets(uint count, string pass_prefix = "net") @safe { 47 return iota(0, count).map!((i) { 48 SecureNet net = new StdSecureNet(); 49 net.generateKeyPair(format("%s_%s", pass_prefix, i)); 50 return net; 51 }).array; 52 } 53 54 TagionBill[] createBills(const(SecureNet)[] bill_nets, uint amount) @safe { 55 return bill_nets.map!((net) => 56 TagionBill(TGN(amount), currentTime, net.pubkey, Buffer.init) 57 ).array; 58 } 59 60 const(DARTIndex)[] insertBills(TagionBill[] bills, ref RecordFactory.Recorder rec) @safe { 61 rec.insert(bills, Archive.Type.ADD); 62 return rec[].map!((a) => a.dart_index).array; 63 } 64 65 @safe @Scenario("it work", []) 66 class ItWork { 67 enum dart_service = "dart_service_task"; 68 ActorHandle dart_handle; 69 ActorHandle collector_handle; 70 ActorHandle replicator_handle; 71 72 TagionBill[] input_bills; 73 SecureNet[] input_nets; 74 75 immutable SecureNet node_net; 76 this() { 77 SecureNet _net = new StdSecureNet(); 78 _net.generateKeyPair("very secret"); 79 node_net = (() @trusted => cast(immutable) _net)(); 80 } 81 82 @Given("i have a collector service") 83 Document service() @safe { 84 thisActor.task_name = "collector_tester_task"; 85 log.registerSubscriptionTask(thisActor.task_name); 86 submask.subscribe(reject_collector); 87 88 immutable task_names = TaskNames(); 89 { // Start dart service 90 auto module_path = env.bdd_log.buildPath(__MODULE__); 91 immutable opts = DARTOptions( 92 module_path, 93 "dart".setExtension(FileExtension.dart), 94 ); 95 96 auto replicator_path = buildPath(module_path, "replicator"); 97 immutable replicator_opts = ReplicatorOptions(replicator_path); 98 99 mkdirRecurse(replicator_path); 100 if (replicator_path.exists) { 101 rmdirRecurse(replicator_path); 102 } 103 mkdirRecurse(opts.dart_filename.dirName); 104 105 if (opts.dart_path.exists) { 106 opts.dart_path.remove; 107 } 108 109 import tagion.dart.DART; 110 111 DART.create(opts.dart_path, node_net); 112 113 auto dart_net = new StdSecureNet; 114 dart_net.generateKeyPair("dartnet"); 115 dart_handle = (() @trusted => spawn!DARTService(task_names.dart, opts, task_names, cast(shared) dart_net, false))(); 116 replicator_handle = (() @trusted => spawn!ReplicatorService(task_names.replicator, replicator_opts))(); 117 check(waitforChildren(Ctrl.ALIVE), "dart service did not alive"); 118 } 119 120 auto record_factory = RecordFactory(node_net); 121 auto insert_recorder = record_factory.recorder; 122 123 input_nets = createNets(10, "input"); 124 input_bills = input_nets.createBills(100_000); 125 input_bills.insertBills(insert_recorder); 126 dart_handle.send(dartModifyRR(), RecordFactory.uniqueRecorder(insert_recorder), immutable long(0)); 127 receiveOnlyTimeout!(dartModifyRR.Response, Fingerprint); 128 129 { 130 import tagion.utils.pretend_safe_concurrency; 131 132 register(task_names.tvm, thisTid); 133 } 134 collector_handle = _spawn!CollectorService(task_names.collector, task_names); 135 check(waitforChildren(Ctrl.ALIVE), "CollectorService never alived"); 136 return result_ok; 137 } 138 139 @When("i send a contract") 140 Document contract() { 141 const script = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc; 142 const s_contract = sign(input_nets, input_bills.map!(a => a.toDoc).array, null, script); 143 144 import std.stdio; 145 import tagion.hibon.HiBONJSON; 146 147 writeln(s_contract.toPretty); 148 149 const hirpc = HiRPC(node_net); 150 immutable sender = hirpc.sendDaMonies(s_contract); 151 collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc)); 152 153 auto collected = receiveOnlyTimeout!(signedContract, immutable(CollectedSignedContract)*)[1]; 154 155 check(collected !is null, "The collected was null"); 156 // check(collected.inputs.length == inputs.length, "The lenght of inputs were not the same"); 157 // check(collected.inputs.map!(a => node_net.dartIndex(a)).array == inputs, "The collected archives did not match the index"); 158 return result_ok; 159 } 160 161 @When("i send an contract with no inputs") 162 Document noInputs() { 163 const outputs = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc; 164 const contract = Contract(DARTIndex[].init, DARTIndex[].init, outputs); 165 const s_contract = SignedContract(Signature[].init, contract); 166 167 const hirpc = HiRPC(node_net); 168 immutable sender = hirpc.sendDaMonies(s_contract); 169 //immutable sender = hirpc.sendDaMonies(contract); 170 collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc)); 171 172 auto result = receiveOnlyTimeout!(LogInfo, const(Document)); 173 check(result[0].symbol_name == "hirpc_invalid_signed_contract", "did not reject for the expected reason, got %s" 174 .format(result[0].symbol_name)); 175 176 return result_ok; 177 } 178 179 @When("i send an contract with invalid signatures inputs") 180 Document invalidSignatures() { 181 import std.random; 182 183 const script = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc; 184 const s_contract = sign(input_nets.randomShuffle, input_bills.map!(a => a.toDoc).array, null, script); 185 186 const hirpc = HiRPC(node_net); 187 immutable sender = hirpc.sendDaMonies(s_contract); 188 collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc)); 189 190 auto result = receiveOnlyTimeout!(LogInfo, const(Document)); 191 check(result[0].symbol_name == "contract_no_verify", "did not reject for the expected reason got, %s".format(result[0] 192 .symbol_name)); 193 194 return result_ok; 195 } 196 197 @When("i send a contract with input which are not in the dart") 198 Document inTheDart() { 199 const script = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc; 200 201 const invalid_inputs = createNets(10, "not_int_dart") 202 .createBills(100_000) 203 .map!(a => a.toDoc) 204 .array; 205 206 const s_contract = sign(input_nets, invalid_inputs, null, script); 207 208 const hirpc = HiRPC(node_net); 209 immutable sender = hirpc.sendDaMonies(s_contract); 210 collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc)); 211 212 auto result = receiveOnlyTimeout!(LogInfo, const(Document)); 213 check(result[0].symbol_name == "missing_archives", "did not reject for the expected reason"); 214 215 return result_ok; 216 } 217 218 @Then("i stop the services") 219 Document collectedSignedContract() { 220 dart_handle.send(Sig.STOP); 221 collector_handle.send(Sig.STOP); 222 waitforChildren(Ctrl.END); 223 224 return result_ok; 225 } 226 }