module tagion.gossip.EmulatorGossipNet;

import std.array : join;
import std.concurrency;
import std.conv : to;
import std.format;
import std.stdio;
import tagion.basic.Types : Buffer, isBufferType;
import tagion.basic.basic : EnumText, basename, buf_idup;
import tagion.crypto.Types : Pubkey;

import tagion.utils.Miscellaneous : cutHex;
import tagion.utils.Queue;

import tagion.gossip.InterfaceNet;
import tagion.hibon.Document : Document;
import tagion.hibon.HiBON : HiBON;

import tagion.basic.ConsensusExceptions;
import tagion.communication.HiRPC;
import tagion.hashgraph.Event;
import tagion.logger.Logger;
import tagion.options.CommonOptions;
import tagion.options.ServiceNames : get_node_name;
import tagion.utils.StdTime;
import tagion.crypto.secp256k1.NativeSecp256k1;
import core.atomic;
import core.thread;
import core.time;
import std.datetime;
import std.random : Random, uniform, unpredictableSeed;
import tagion.services.messages;

/**
 * GossipNet implementation that delivers wavefronts to other tasks through
 * std.concurrency message passing, sleeping for a fixed `duration` before
 * every send.
 *
 * Channels are tracked in two containers that are kept in step:
 * `_pkeys` (indexable list used for random selection) and `task_names`
 * (public key -> task name used to locate the receiving task).
 */
@safe
class EmulatorGossipNet : GossipNet {
    /// Time slept at the start of every `send`.
    private Duration duration;

    /// Task name registered for each known channel.
    private string[immutable(Pubkey)] task_names;
    /// Known channels in insertion order; one entry per key in `task_names`.
    private immutable(Pubkey)[] _pkeys;
    protected uint _send_node_id;
    /// Value returned by `time`; it is not assigned anywhere in this class.
    protected sdt_t _current_time;
    /// Public key of this node; never returned by `select_channel`.
    immutable(Pubkey) mypk;
    /// Random generator used to pick a channel in `select_channel`.
    Random random;

    /**
     * Params:
     *   mypk = public key of this node
     *   duration = time slept before each message is sent
     */
    this(const Pubkey mypk, Duration duration) {
        this.random = Random(unpredictableSeed);
        this.duration = duration;
        this.mypk = mypk;
    }

    /**
     * Registers a channel using the task name stored in the address book.
     *
     * Adding a channel that is already registered only refreshes its task
     * name; it does not create a second entry in the selection list.
     *
     * Params:
     *   channel = public key of the node to add
     */
    void add_channel(const Pubkey channel) {
        import tagion.gossip.AddressBook;
        import tagion.services.locator;

        const task_name = addressbook.getAddress(channel);

        // Called for its side effect only: it throws if the task has not
        // been started yet, so a channel is never registered too early.
        tryLocate(task_name);

        // Keep _pkeys free of duplicates so it stays in step with task_names;
        // a duplicate would survive remove_channel as a stale entry.
        if (channel !in task_names) {
            _pkeys ~= channel;
        }
        task_names[channel] = task_name;

        log.trace("Add channel: %s tid: %s", channel.cutHex, task_names[channel]);
    }

    /**
     * Unregisters a channel. Removing a channel that is not registered
     * is a no-op.
     *
     * Params:
     *   channel = public key of the node to remove
     */
    void remove_channel(const Pubkey channel) {
        import std.algorithm.searching : countUntil;

        const channel_index = countUntil(_pkeys, channel);
        // countUntil yields -1 when the channel is absent; slicing with that
        // index would be a range violation.
        if (channel_index >= 0) {
            _pkeys = _pkeys[0 .. channel_index] ~ _pkeys[channel_index + 1 .. $];
        }
        task_names.remove(channel);
    }

    /// Nothing to release for this implementation.
    @safe
    void close() {

    }

    /// Returns: the stored `_current_time`.
    @property
    const(sdt_t) time() pure const {
        return _current_time;
    }

    /// Returns: true if `channel` has a registered task name.
    bool isValidChannel(const(Pubkey) channel) const pure nothrow {
        return (channel in task_names) !is null;
    }

    /**
     * Picks a random registered channel, other than `mypk`, that is
     * accepted by `channel_filter`.
     *
     * At most twice as many random draws as there are registered channels
     * are made.
     *
     * Params:
     *   channel_filter = predicate a candidate channel must satisfy
     * Returns: the selected channel, or `Pubkey()` if no draw produced an
     *   acceptable channel.
     */
    const(Pubkey) select_channel(const(ChannelFilter) channel_filter) {
        // Bound the draw by the array that is actually indexed.
        const channel_count = _pkeys.length;
        foreach (attempt; 0 .. channel_count * 2) {
            const node_index = uniform(0, cast(uint) channel_count, random);
            const send_channel = _pkeys[node_index];
            if ((send_channel != mypk) && channel_filter(send_channel)) {
                return send_channel;
            }
        }
        return Pubkey();
    }

    /**
     * Selects a channel and, if one was found, sends it the message
     * produced by `sender`.
     *
     * Params:
     *   channel_filter = predicate passed on to `select_channel`
     *   sender = callback producing the message; only called when a channel
     *            was selected
     * Returns: the selected channel (empty when none was found).
     */
    const(Pubkey) gossip(
            const(ChannelFilter) channel_filter,
            const(SenderCallBack) sender) {
        const send_channel = select_channel(channel_filter);
        version (EPOCH_LOG) {
            log.trace("Selected channel: %s", send_channel.cutHex);
        }
        if (send_channel.length) {
            send(send_channel, sender());
        }
        return send_channel;
    }

    /**
     * Sleeps for `duration`, then sends the sender's document to the task
     * registered for `channel`, tagged with `ReceivedWavefront`.
     *
     * Nothing is sent if the channel is not registered or its task cannot
     * be located.
     *
     * Params:
     *   channel = public key of the receiving node
     *   sender = message to deliver
     */
    @trusted
    void send(const Pubkey channel, const(HiRPC.Sender) sender) {
        import std.algorithm.searching : countUntil;
        import tagion.hibon.HiBONJSON;

        Thread.sleep(duration);

        // Look the task name up with `in`; indexing an unregistered channel
        // would raise a range error.
        auto task_name = channel in task_names;
        if (task_name is null) {
            log.trace("No task registered for channel: %s", channel.cutHex);
            return;
        }

        auto node_tid = locate(*task_name);
        if (node_tid is Tid.init) {
            return;
        }

        node_tid.send(ReceivedWavefront(), sender.toDoc);
        version (EPOCH_LOG) {
            log.trace("Successfully sent to %s (Node_%s) %d bytes", channel.cutHex, _pkeys.countUntil(channel), sender
                    .toDoc.serialize.length);
        }
    }

    /// No implementation needed for this network.
    void start_listening() {
        // NO IMPLEMENTATION NEEDED
    }
}