1 module tagion.testbench.services.collector;
2 // Default import list for bdd
3 import core.time;
4 import std.algorithm.iteration : map;
5 import std.array;
6 import std.exception;
7 import std.file : exists, mkdirRecurse, remove, rmdirRecurse;
8 import std.format : format;
9 import std.path : buildPath, dirName, setExtension;
10 import std.range : iota, take, zip;
11 import std.typecons : Tuple;
12 import tagion.actor;
13 import tagion.basic.Types : Buffer, FileExtension;
14 import tagion.behaviour;
15 import tagion.communication.HiRPC;
16 import tagion.crypto.SecureInterfaceNet;
17 import tagion.crypto.SecureNet;
18 import tagion.crypto.Types;
19 import tagion.dart.DARTBasic;
20 import tagion.dart.Recorder;
21 import tagion.hibon.Document;
22 import tagion.logger.LogRecords : LogInfo;
23 import tagion.logger.Logger;
24 import tagion.script.TagionCurrency;
25 import tagion.script.common;
26 import tagion.script.execute;
27 import tagion.services.DART;
28 import tagion.services.collector;
29 import tagion.services.messages;
30 import tagion.services.options : TaskNames;
31 import tagion.services.replicator;
32 import tagion.testbench.actor.util;
33 import tagion.testbench.tools.Environment;
34 import tagion.utils.StdTime;
35 import tagion.utils.pretend_safe_concurrency : receive, receiveOnly, receiveTimeout;
36 
/// Feature descriptor for this testbench module: named "collector services",
/// constructed with an empty list as its second argument.
enum feature = Feature("collector services", []);
40 
/// Named tuple bundling the `ItWork` scenario (field "ItWork") with a
/// pointer to the feature group (field "result").
alias FeatureContext = Tuple!(ItWork, "ItWork", FeatureGroup*, "result");
45 
/**
 * Builds `count` secure nets, each with a key pair generated from its own passphrase.
 *
 * The passphrase of net number `i` is `"<pass_prefix>_<i>"`, so the same
 * prefix always yields the same passphrases.
 *
 * Params:
 *   count = number of nets to create
 *   pass_prefix = text placed before the index in every passphrase
 * Returns: the created nets, in index order
 */
SecureNet[] createNets(uint count, string pass_prefix = "net") @safe {
    SecureNet[] nets;
    foreach (idx; 0 .. count) {
        SecureNet current = new StdSecureNet();
        const passphrase = format("%s_%s", pass_prefix, idx);
        current.generateKeyPair(passphrase);
        nets ~= current;
    }
    return nets;
}
53 
/**
 * Creates one bill per net, each worth `amount` TGN and owned by that net's public key.
 *
 * Every bill is stamped with `currentTime` read at the moment the bill is
 * built and receives `Buffer.init` as its last constructor argument.
 *
 * Params:
 *   bill_nets = nets whose public keys become the bill owners
 *   amount = value of each bill, passed through `TGN`
 * Returns: the bills, in the same order as `bill_nets`
 */
TagionBill[] createBills(const(SecureNet)[] bill_nets, uint amount) @safe {
    TagionBill[] bills;
    foreach (owner_net; bill_nets) {
        bills ~= TagionBill(TGN(amount), currentTime, owner_net.pubkey, Buffer.init);
    }
    return bills;
}
59 
/**
 * Adds `bills` to `rec` as ADD archives and lists the recorder's DART indices.
 *
 * The returned indices come from iterating the whole recorder after the
 * insert, so they include any archives `rec` already held.
 *
 * Params:
 *   bills = bills to insert
 *   rec = recorder that receives the bills
 * Returns: the `dart_index` of every archive in `rec`, in iteration order
 */
const(DARTIndex)[] insertBills(TagionBill[] bills, ref RecordFactory.Recorder rec) @safe {
    rec.insert(bills, Archive.Type.ADD);

    const(DARTIndex)[] indices;
    foreach (archive; rec[]) {
        indices ~= archive.dart_index;
    }
    return indices;
}
64 
/**
 * BDD scenario exercising the collector service.
 *
 * The steps share state through the member fields: `service` spawns the
 * DART, replicator and collector services and seeds the DART with bills,
 * the `@When` steps send contracts to the collector and inspect what comes
 * back, and the `@Then` step stops the spawned services.
 */
@safe @Scenario("it work", [])
class ItWork {
    enum dart_service = "dart_service_task";
    ActorHandle dart_handle;
    ActorHandle collector_handle;
    ActorHandle replicator_handle;

    /// Bills inserted into the DART by `service`; used as contract inputs.
    TagionBill[] input_bills;
    /// Nets owning `input_bills`, index for index.
    SecureNet[] input_nets;

    /// Net used to create the DART, build recorders and construct HiRPC messages.
    immutable SecureNet node_net;

    /// Generates the node key pair and freezes the net as immutable.
    this() {
        SecureNet _net = new StdSecureNet();
        _net.generateKeyPair("very secret");
        // The mutable reference does not escape the constructor, so the cast is confined here.
        node_net = (() @trusted => cast(immutable) _net)();
    }

    /**
     * Spawns the DART and replicator services, inserts ten bills into the
     * DART, registers this task under the tvm task name and finally spawns
     * the collector service.
     *
     * Returns: `result_ok` once every spawned service has reported ALIVE.
     */
    @Given("i have a collector service")
    Document service() @safe {
        thisActor.task_name = "collector_tester_task";
        log.registerSubscriptionTask(thisActor.task_name);
        submask.subscribe(reject_collector);

        immutable task_names = TaskNames();
        { // Start dart service
            auto module_path = env.bdd_log.buildPath(__MODULE__);
            immutable opts = DARTOptions(
                    module_path,
                    "dart".setExtension(FileExtension.dart),
            );

            auto replicator_path = buildPath(module_path, "replicator");
            immutable replicator_opts = ReplicatorOptions(replicator_path);

            // Remove leftovers from an earlier run first, then create the
            // directory, so the replicator starts from an existing empty folder.
            if (replicator_path.exists) {
                rmdirRecurse(replicator_path);
            }
            mkdirRecurse(replicator_path);
            // The directory that must exist is the one holding the DART file
            // itself; the bare filename has no directory component.
            mkdirRecurse(opts.dart_path.dirName);

            if (opts.dart_path.exists) {
                opts.dart_path.remove;
            }

            import tagion.dart.DART;

            DART.create(opts.dart_path, node_net);

            auto dart_net = new StdSecureNet;
            dart_net.generateKeyPair("dartnet");
            dart_handle = (() @trusted => spawn!DARTService(task_names.dart, opts, task_names, cast(shared) dart_net, false))();
            replicator_handle = (() @trusted => spawn!ReplicatorService(task_names.replicator, replicator_opts))();
            check(waitforChildren(Ctrl.ALIVE), "dart service did not alive");
        }

        auto record_factory = RecordFactory(node_net);
        auto insert_recorder = record_factory.recorder;

        input_nets = createNets(10, "input");
        input_bills = input_nets.createBills(100_000);
        input_bills.insertBills(insert_recorder);
        dart_handle.send(dartModifyRR(), RecordFactory.uniqueRecorder(insert_recorder), immutable long(0));
        receiveOnlyTimeout!(dartModifyRR.Response, Fingerprint);

        {
            import tagion.utils.pretend_safe_concurrency;

            // Register this task under the tvm name so that messages addressed
            // to that task name arrive here (see `contract`).
            register(task_names.tvm, thisTid);
        }
        collector_handle = _spawn!CollectorService(task_names.collector, task_names);
        check(waitforChildren(Ctrl.ALIVE), "CollectorService never alived");
        return result_ok;
    }

    /**
     * Sends a contract signed by the owners of the inserted bills and
     * expects a non-null collected contract in return.
     */
    @When("i send a contract")
    Document contract() {
        const script = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc;
        const s_contract = sign(input_nets, input_bills.map!(a => a.toDoc).array, null, script);

        import std.stdio;
        import tagion.hibon.HiBONJSON;

        // Prints the signed contract to stdout for inspection in the test log.
        writeln(s_contract.toPretty);

        const hirpc = HiRPC(node_net);
        immutable sender = hirpc.sendDaMonies(s_contract);
        collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc));

        auto collected = receiveOnlyTimeout!(signedContract, immutable(CollectedSignedContract)*)[1];

        check(collected !is null, "The collected was null");
        // check(collected.inputs.length == inputs.length, "The lenght of inputs were not the same");
        // check(collected.inputs.map!(a => node_net.dartIndex(a)).array == inputs, "The collected archives did not match the index");
        return result_ok;
    }

    /**
     * Sends a contract without inputs or signatures and expects the
     * "hirpc_invalid_signed_contract" rejection log.
     */
    @When("i send an contract with no inputs")
    Document noInputs() {
        const outputs = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc;
        const contract = Contract(DARTIndex[].init, DARTIndex[].init, outputs);
        const s_contract = SignedContract(Signature[].init, contract);

        const hirpc = HiRPC(node_net);
        immutable sender = hirpc.sendDaMonies(s_contract);
        //immutable sender = hirpc.sendDaMonies(contract);
        collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc));

        auto result = receiveOnlyTimeout!(LogInfo, const(Document));
        check(result[0].symbol_name == "hirpc_invalid_signed_contract", "did not reject for the expected reason, got %s"
                .format(result[0].symbol_name));

        return result_ok;
    }

    /**
     * Signs the inserted bills with the wrong nets and expects the
     * "contract_no_verify" rejection log.
     *
     * The signers are a copy of `input_nets` rotated by one position, so
     * every input is signed by a net other than its owner and `input_nets`
     * itself keeps its order for the other steps.
     */
    @When("i send an contract with invalid signatures inputs")
    Document invalidSignatures() {
        check(input_nets.length > 1, "at least two input nets are needed to mismatch the signers");

        const script = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc;
        // Concatenation allocates a new array; the member is left untouched.
        SecureNet[] mismatched_nets = input_nets[1 .. $] ~ input_nets[0];
        const s_contract = sign(mismatched_nets, input_bills.map!(a => a.toDoc).array, null, script);

        const hirpc = HiRPC(node_net);
        immutable sender = hirpc.sendDaMonies(s_contract);
        collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc));

        auto result = receiveOnlyTimeout!(LogInfo, const(Document));
        check(result[0].symbol_name == "contract_no_verify", "did not reject for the expected reason got, %s".format(result[0]
                .symbol_name));

        return result_ok;
    }

    /**
     * Sends a contract whose inputs are freshly created bills that were
     * never inserted into the DART and expects the "missing_archives"
     * rejection log.
     */
    @When("i send a contract with input which are not in the dart")
    Document inTheDart() {
        const script = PayScript(iota(0, 10).map!(_ => TagionBill.init).array).toDoc;

        const invalid_inputs = createNets(10, "not_int_dart")
            .createBills(100_000)
            .map!(a => a.toDoc)
            .array;

        const s_contract = sign(input_nets, invalid_inputs, null, script);

        const hirpc = HiRPC(node_net);
        immutable sender = hirpc.sendDaMonies(s_contract);
        collector_handle.send(inputHiRPC(), hirpc.receive(sender.toDoc));

        auto result = receiveOnlyTimeout!(LogInfo, const(Document));
        // Report the received symbol on failure, as the sibling steps do.
        check(result[0].symbol_name == "missing_archives", "did not reject for the expected reason, got %s"
                .format(result[0].symbol_name));

        return result_ok;
    }

    /**
     * Sends STOP to every service spawned by `service` and waits for the
     * children to end.
     */
    @Then("i stop the services")
    Document collectedSignedContract() {
        dart_handle.send(Sig.STOP);
        collector_handle.send(Sig.STOP);
        // The replicator is spawned by this task as well, so it must be
        // stopped too before waiting for the children to end.
        replicator_handle.send(Sig.STOP);
        // NOTE(review): the wait result is deliberately left unchecked, as in
        // the original step — consider asserting it once shutdown is reliable.
        waitforChildren(Ctrl.END);

        return result_ok;
    }
}