1 module tagion.gossip.EmulatorGossipNet;
2 
3 import std.array : join;
4 import std.concurrency;
5 import std.conv : to;
6 import std.format;
7 import std.stdio;
8 import tagion.basic.Types : Buffer, isBufferType;
9 import tagion.basic.basic : EnumText, basename, buf_idup;
10 import tagion.crypto.Types : Pubkey;
11 
12 import tagion.utils.Miscellaneous : cutHex;
13 import tagion.utils.Queue;
14 
15 import tagion.gossip.InterfaceNet;
16 import tagion.hibon.Document : Document;
17 import tagion.hibon.HiBON : HiBON;
18 
19 import tagion.basic.ConsensusExceptions;
20 import tagion.communication.HiRPC;
21 import tagion.hashgraph.Event;
22 import tagion.logger.Logger;
23 import tagion.options.CommonOptions;
24 import tagion.options.ServiceNames : get_node_name;
25 import tagion.utils.StdTime;
26 import tagion.crypto.secp256k1.NativeSecp256k1;
27 import core.atomic;
28 import core.thread;
29 import core.time;
30 import std.datetime;
31 import std.random : Random, uniform, unpredictableSeed;
32 import tagion.services.messages;
33 
34 @safe
35 class EmulatorGossipNet : GossipNet {
36     private Duration duration;
37 
38     private string[immutable(Pubkey)] task_names;
39     private immutable(Pubkey)[] _pkeys;
40     protected uint _send_node_id;
41     protected sdt_t _current_time;
42     immutable(Pubkey) mypk;
43     Random random;
44 
45     this(const Pubkey mypk, Duration duration) {
46         this.random = Random(unpredictableSeed);
47         this.duration = duration;
48         this.mypk = mypk;
49     }
50 
51     void add_channel(const Pubkey channel) {
52         import core.thread;
53         import tagion.gossip.AddressBook;
54         import tagion.services.locator;
55 
56         const task_name = addressbook.getAddress(channel);
57 
58         // we do this command to make sure that everything has started since it will throw if it has not been started.
59         tryLocate(task_name);
60 
61         _pkeys ~= channel;
62         task_names[channel] = task_name;
63 
64         log.trace("Add channel: %s tid: %s", channel.cutHex, task_names[channel]);
65     }
66 
67     void remove_channel(const Pubkey channel) {
68         import std.algorithm.searching;
69 
70         const channel_index = countUntil(_pkeys, channel);
71         _pkeys = _pkeys[0 .. channel_index] ~ _pkeys[channel_index + 1 .. $];
72         task_names.remove(channel);
73     }
74 
75     @safe
76     void close() {
77 
78     }
79 
80     @property
81     const(sdt_t) time() pure const {
82         return _current_time;
83     }
84 
85     bool isValidChannel(const(Pubkey) channel) const pure nothrow {
86         return (channel in task_names) !is null;
87     }
88 
89     const(Pubkey) select_channel(const(ChannelFilter) channel_filter) {
90         import std.range : dropExactly;
91 
92         foreach (count; 0 .. task_names.length * 2) {
93             const node_index = uniform(0, cast(uint) task_names.length, random);
94             const send_channel = _pkeys[node_index];
95             if ((send_channel != mypk) && channel_filter(send_channel)) {
96                 return send_channel;
97             }
98         }
99         return Pubkey();
100     }
101 
102     const(Pubkey) gossip(
103             const(ChannelFilter) channel_filter,
104             const(SenderCallBack) sender) {
105         const send_channel = select_channel(channel_filter);
106         version (EPOCH_LOG) {
107             log.trace("Selected channel: %s", send_channel.cutHex);
108         }
109         if (send_channel.length) {
110             send(send_channel, sender());
111         }
112         return send_channel;
113     }
114 
115     @trusted
116     void send(const Pubkey channel, const(HiRPC.Sender) sender) {
117         import std.algorithm.searching : countUntil;
118         import tagion.hibon.HiBONJSON;
119 
120         Thread.sleep(duration);
121 
122         auto node_tid = locate(task_names[channel]);
123         if (node_tid is Tid.init) {
124             return;
125         }
126 
127         node_tid.send(ReceivedWavefront(), sender.toDoc);
128         version (EPOCH_LOG) {
129             log.trace("Successfully sent to %s (Node_%s) %d bytes", channel.cutHex, _pkeys.countUntil(channel), sender
130                     .toDoc.serialize.length);
131         }
132     }
133 
134     void start_listening() {
135         // NO IMPLEMENTATION NEEDED
136     }
137 }