/// Consensus HashGraph main object
module tagion.hashgraph.HashGraph;

import std.algorithm;
import std.array : array;
import std.conv;
import std.exception : assumeWontThrow;
import std.format;
import std.range;
import std.stdio;
import std.typecons : Flag, No, Yes;
import tagion.basic.Debug : __format;
import tagion.basic.Types : Buffer;
import tagion.communication.HiRPC;
import tagion.crypto.SecureInterfaceNet;
import tagion.crypto.Types : Privkey, Pubkey, Signature;
import tagion.gossip.InterfaceNet;
import tagion.hashgraph.Event;
import tagion.hashgraph.HashGraphBasic;
import tagion.hashgraph.RefinementInterface;
import tagion.hashgraph.Round;
import tagion.hibon.Document : Document;
import tagion.hibon.HiBON : HiBON;
import tagion.hibon.HiBONRecord : isHiBONRecord;
import tagion.logger.Logger;
import tagion.utils.BitMask;
import tagion.utils.StdTime;

// debug
import tagion.basic.Debug;
import tagion.hibon.HiBONJSON;
import tagion.utils.Miscellaneous : cutHex;

/**
 * The hashgraph of one node.
 * Holds the per-channel nodes, the cache of connected events and the rounds,
 * and implements the wavefront exchange used to synchronize with other nodes.
 */
@safe
class HashGraph {
    enum default_scrap_depth = 10;
    //bool print_flag;
    /// When larger than zero, `epoch` updates the live statistics and calls `_rounds.dustman`
    int scrap_depth = default_scrap_depth;
    import tagion.basic.ConsensusExceptions;

    bool __debug_print;

    protected alias check = Check!HashGraphConsensusException;
    // protected alias consensus=consensusCheckArguments!(HashGraphConsensusException);
    import tagion.logger.Statistic;

    immutable size_t node_size; /// Number of active nodes in the graph
    immutable(string) name; // Only used for debugging
    Statistic!uint witness_search_statistic;
    Statistic!uint strong_seeing_statistic;
    Statistic!uint received_order_statistic;
    Statistic!uint mark_received_statistic;
    Statistic!uint order_compare_statistic;
    Statistic!uint rare_order_compare_statistic;
    Statistic!uint epoch_events_statistic;
    Statistic!uint wavefront_event_package_statistic;
    Statistic!uint wavefront_event_package_used_statistic;
    Statistic!uint live_events_statistic;
    Statistic!uint live_witness_statistic;
    Statistic!long epoch_delay_statistic;
    BitMask _excluded_nodes_mask;
    private {
        Node[Pubkey] _nodes; // List of participating nodes
        uint event_id;
        sdt_t last_epoch_time;
        Flag!"joining" _joining;
    }
    Refinement refinement;
    protected Node _owner_node;

    /// Returns: the node which represents the owner of this graph
    const(Node) owner_node() const pure nothrow @nogc {
        return _owner_node;
    }

    /// Returns: the joining flag given at construction (cleared when a COHERENT wave initializes the graph)
    Flag!"joining" joining() const pure nothrow @nogc {
        return _joining;
    }

    /**
     * Get a map of all the nodes currently handled by the graph
     * Returns: the nodes indexed by their channel (public key)
     */
    const(Node[Pubkey]) nodes() const pure nothrow @nogc {
        return _nodes;
    }

    const HiRPC hirpc;

    /// Returns: the current mask of excluded nodes
    @nogc
    const(BitMask) excluded_nodes_mask() const pure nothrow {
        return _excluded_nodes_mask;
    }

    /// Replaces the mask of excluded nodes
    void excluded_nodes_mask(const(BitMask) mask) pure nothrow {
        _excluded_nodes_mask = mask;
    }

    package Round.Rounder _rounds; /// The rounder holds the rounds in the queue, both decided and undecided rounds

    alias ValidChannel = bool delegate(const Pubkey channel);
    const ValidChannel valid_channel; /// Validates if the node at a channel is valid
    /**
     * Creates a graph with node_size nodes
     * Params:
     *   node_size = number of nodes handled by the graph (at least 4)
     *   net = Security element, handles hash function, signing and signature validation
     *   refinement = object which is given this graph as owner and which receives
     *                the epoch and event-package notifications
     *   valid_channel = call-back to check if a node is valid
     *   joining = flag stored as the initial joining state of the graph
     *   name = used for debugging, labels the node name
     */
    this(const size_t node_size,
            const SecureNet net,
            Refinement refinement,
            const ValidChannel valid_channel,
            const Flag!"joining" joining,
            string name = null)
    in (node_size >= 4)
    do {
        hirpc = HiRPC(net);
        this.node_size = node_size;
        this._owner_node = getNode(hirpc.net.pubkey);
        this.refinement = refinement;
        this.refinement.setOwner(this);
        this.valid_channel = valid_channel;

        this._joining = joining;
        this.name = name;
        _rounds = Round.Rounder(this);
    }

    /**
     * Rebuilds the graph from a set of witness event packages.
     * The current node table is set aside, the channels of the packages are
     * registered and, if no check fails, the rounds and the event cache are reset
     * and every package is turned into a witness event of the last round.
     * If a check throws, the previous node table is restored.
     * Params:
     *   epacks = event packages used as the initial witnesses
     * Throws: via `check` if epacks is not a majority or contains a duplicated channel
     */
    void initialize_witness(const(immutable(EventPackage)*[]) epacks)
    in {
        assert(_nodes.length > 0 && (channel in _nodes),
                "Owen Eva event needs to be create before witness can be initialized");
        assert(_owner_node !is null);
    }
    do {
        version (EPOCH_LOG) {
            log("INITTING WITNESSES %s", _owner_node.channel.cutHex);
        }
        Node[Pubkey] recovered_nodes;
        // Runs only when the validation below finished without throwing
        scope (success) {
            void init_event(immutable(EventPackage*) epack) {
                auto event = new Event(epack, this);
                _event_cache[event.fingerprint] = event;
                event.witness_event(node_size);
                version (EPOCH_LOG) {
                    log("init_event time %s", event.event_body.time);
                }
                _rounds.last_round.add(event);
                front_seat(event);
                event._round_received = _rounds.last_round;
            }

            _rounds.erase;
            _rounds = Round.Rounder(this);
            _rounds.last_decided_round = _rounds.last_round;
            (() @trusted { _event_cache.clear; })();
            init_event(_owner_node.event.event_package);
            // front_seat(owen_event);
            foreach (epack; epacks) {
                if (epack.pubkey != channel) {
                    init_event(epack);
                }
            }
            // Nodes known before the reset which did not get a witness from epacks
            foreach (channel, recovered_node; recovered_nodes) {
                if (!(channel in _nodes)) {
                    if (recovered_node.event) {
                        init_event(recovered_node.event.event_package);
                    }
                }
            }

            _nodes.byValue
                .map!(n => n.event)
                .each!(e => e.initializeOrder);
        }
        // Roll back the node table if any check below throws
        scope (failure) {
            _nodes = recovered_nodes;
        }
        recovered_nodes = _nodes;
        _nodes = null;
        check(isMajority(cast(uint) epacks.length), ConsensusFailCode.HASHGRAPH_EVENT_INITIALIZE);
        // consensus(epacks.length)
        //     .check(epacks.length <= node_size, ConsensusFailCode.HASHGRAPH_EVENT_INITIALIZE);
        // getNode(channel); // Make sure that node_id == 0 is owner node
        foreach (epack; epacks) {
            if (epack.pubkey != channel) {
                check(!(epack.pubkey in _nodes), ConsensusFailCode.HASHGRAPH_DUBLICATE_WITNESS);
                auto node = getNode(epack.pubkey);
            }
        }
    }

    /// Returns: the rounder of this graph
    @nogc
    const(Round.Rounder) rounds() const pure nothrow {
        return _rounds;
    }

    /// Returns: true if the rounder has a last decided round
    bool areWeInGraph() const pure nothrow {
        return _rounds.last_decided_round !is null;
    }

    /// Returns: the public key of the owner (taken from the net of hirpc)
    final Pubkey channel() const pure nothrow {
        return hirpc.net.pubkey;
    }

    /// Returns: the channels of all nodes currently in the node table
    const(Pubkey[]) channels() const pure nothrow {
        return _nodes.keys;
    }

    /**
     * Channel filter used when selecting a node to gossip with.
     * Params:
     *   selected_channel = candidate channel
     * Returns: false for the owner channel, true for an unknown channel, otherwise
     *          true only if the node of the channel is in exchange state NONE
     */
    bool not_used_channels(const(Pubkey) selected_channel) {
        if (selected_channel == channel) {
            return false;
        }
        const node = _nodes.get(selected_channel, null);
        if (node) {
            return node.state is ExchangeState.NONE;
        }
        return true;
    }

    alias GraphResonse = const(Pubkey) delegate(
            GossipNet.ChannelFilter channel_filter,
            GossipNet.SenderCallBack sender) @safe;
    alias GraphPayload = const(Document) delegate() @safe;

    /**
     * Starts an exchange with another node.
     * If the graph is running a tidal wave is offered (after a new event carrying the
     * payload has been registered), otherwise a sharp wave is offered.
     * Params:
     *   responde = call-back which selects a channel and sends the wave produced by the sender call-back
     *   payload = call-back which produces the payload of the new event
     *   time = time stamp used for the new event
     */
    void init_tide(
            const(GraphResonse) responde,
            const(GraphPayload) payload,
            lazy const sdt_t time) {
        const(HiRPC.Sender) payload_sender() @safe {
            const doc = payload();
            immutable epack = event_pack(time, null, doc);

            const registrated = registerEventPackage(epack);

            assert(registrated, "Should not fail here");
            const sender = hirpc.wavefront(tidalWave);
            return sender;
        }

        const(HiRPC.Sender) sharp_sender() @safe {
            version (EPOCH_LOG) {
                log("SENDING sharp sender: %s", owner_node.channel.cutHex);
            }

            const sharp_wavefront = sharpWave();
            const sender = hirpc.wavefront(sharp_wavefront);
            return sender;
        }

        if (areWeInGraph) {
            const send_channel = responde(
                    &not_used_channels,
                    &payload_sender);
            // Pubkey.init is treated as "no channel was selected"
            if (send_channel !is Pubkey.init) {
                getNode(send_channel).state = ExchangeState.INIT_TIDE;
            }
        }
        else {
            // The selected channel is not needed for a sharp wave
            responde(
                    &not_used_channels,
                    &sharp_sender);
        }
    }

    /**
     * Creates a signed event package with the owner's latest event as mother.
     * Params:
     *   time = time stamp of the event
     *   father_event = father of the new event (can be null)
     *   doc = payload of the event
     * Returns: the new event package
     */
    immutable(EventPackage)* event_pack(
            lazy const sdt_t time,
            const(Event) father_event,
            const Document doc) {

        const mother_event = getNode(channel).event;

        immutable ebody = EventBody(doc, mother_event, father_event, time);

        immutable result = new immutable(EventPackage)(hirpc.net, ebody);
        return result;
    }

    /**
     * Creates the event package of the owner's eva event (an event without mother and father).
     * Params:
     *   time = time stamp of the event
     *   nonce = nonce stored in the eva payload
     */
    immutable(EventPackage*) eva_pack(lazy const sdt_t time, const Buffer nonce) {
        const payload = EvaPayload(channel, nonce);
        immutable eva_event_body = EventBody(payload.toDoc, null, null, time);
        immutable epack = new immutable(EventPackage)(hirpc.net, eva_event_body);
        return epack;
    }

    /**
     * Creates the owner's eva event, adds it to the event cache and
     * places it as the front event of the owner's node.
     * Returns: the eva event
     */
    Event createEvaEvent(lazy const sdt_t time, const Buffer nonce) {
        immutable eva_epack = eva_pack(time, nonce);
        auto eva_event = new Event(eva_epack, this);

        _event_cache[eva_event.fingerprint] = eva_event;
        front_seat(eva_event);
        // set_strongly_seen_mask(eva_event);
        return eva_event;
    }

    alias EventPackageCache = immutable(EventPackage)*[Buffer]; /// EventPackages received from another node in the hashgraph.
    alias EventCache = Event[Buffer]; /// Events already connected to this hashgraph.

    protected {
        EventCache _event_cache;
    }

    /// Removes the event with the fingerprint from the event cache
    void eliminate(scope const(Buffer) fingerprint) pure nothrow {
        _event_cache.remove(fingerprint);
    }

    /// Returns: number of events in the event cache
    @nogc
    size_t number_of_registered_event() const pure nothrow {
        return _event_cache.length;
    }

    // function not used
    /// Returns: true if an event with the fingerprint is in the event cache
    @nogc
    bool isRegistered(scope const(ubyte[]) fingerprint) const pure nothrow {
        return (fingerprint in _event_cache) !is null;
    }

    Topic topic = Topic("hashgraph_event");

    /**
     * Forwards a decided epoch to the refinement and, when scrapping is
     * enabled, logs the live event/witness counts and cleans up the rounds.
     */
    package void epoch(Event[] event_collection, const Round decided_round) {
        refinement.epoch(event_collection, decided_round);
        if (scrap_depth > 0) {
            live_events_statistic(Event.count);
            log(topic, live_events_statistic.stringof, live_events_statistic);
            live_witness_statistic(Event.Witness.count);
            // The witness entry must carry the witness statistic (it carried the events statistic before)
            log(topic, live_witness_statistic.stringof, live_witness_statistic);
            _rounds.dustman;
        }
    }

    /++
     Creates, caches and connects an event for an event package.
     Returns: the new event, or null if the channel of the package is not valid
     +/
    Event registerEventPackage(
            immutable(EventPackage*) event_pack)
    in (event_pack.fingerprint !in _event_cache,
        format("Event %(%02x%) has already been registerd",
            event_pack.fingerprint))
    do {
        if (valid_channel(event_pack.pubkey)) {
            auto event = new Event(event_pack, this);
            _event_cache[event.fingerprint] = event;
            refinement.epack(event_pack);
            event.connect(this);
            return event;
        }
        return null;
    }

    /**
     * Temporary cache of the event packages of one received wavefront.
     * Only packages which are not already in the event cache are kept.
     */
    class Register {
        private EventPackageCache event_package_cache;

        this(const Wavefront received_wave) pure nothrow {
            uint count_events;
            scope (exit) {
                wavefront_event_package_statistic(count_events);
                wavefront_event_package_used_statistic(cast(uint) event_package_cache.length);
            }
            foreach (e; received_wave.epacks) {
                count_events++;
                if (!(e.fingerprint in event_package_cache || e.fingerprint in _event_cache)) {
                    event_package_cache[e.fingerprint] = e;
                }
            }
        }

        /**
         * Finds the event of a fingerprint, first in the event cache and then in the
         * package cache (in which case the event is created and added to the event cache).
         * Returns: the event, or null if it is unknown or its channel is not valid
         */
        final Event lookup(const(Buffer) fingerprint) {
            if (fingerprint in _event_cache) {
                return _event_cache[fingerprint];
            }

            if (fingerprint in event_package_cache) {
                immutable event_pack = event_package_cache[fingerprint];
                if (valid_channel(event_pack.pubkey)) {
                    auto event = new Event(event_pack, this.outer);
                    _event_cache[fingerprint] = event;
                    return event;
                }
            }
            return null;
        }

        // function not used
        final bool isCached(scope const(Buffer) fingerprint) const pure nothrow {
            return (fingerprint in event_package_cache) !is null;
        }

        /**
         * Looks up and connects the event of a fingerprint.
         * Returns: the event; null for an empty fingerprint, or for a missing event
         *          while the graph is joining
         * Throws: via Event.check if the event is missing and the graph is not joining
         */
        final Event register(const(Buffer) fingerprint) {
            Event event;

            if (!fingerprint) {
                return event;
            }

            // event either from event_package_cache or event_cache.
            event = lookup(fingerprint);
            Event.check(_joining || event !is null, ConsensusFailCode.EVENT_MISSING_IN_CACHE);
            if (event !is null) {
                event.connect(this.outer);
            }
            return event;
        }
    }

    protected Register _register;

    /// Registers via the active wavefront register if there is one, otherwise reads the event cache
    package final Event register(const(Buffer) fingerprint) {
        if (_register) {
            return _register.register(fingerprint);
        }

        return _event_cache.get(fingerprint, null);
    }

    /++
     Registers all the event packages of a received wavefront.
     Returns:
     The front event of the send channel (null if the wave had no event from it)
     +/
    const(Event) register_wavefront(const Wavefront received_wave, const Pubkey from_channel) {
        _register = new Register(received_wave);

        scope (exit) {
            log(topic, wavefront_event_package_statistic.stringof, wavefront_event_package_statistic);
            // The "used" entry must carry the used statistic (it carried the total statistic before)
            log(topic, wavefront_event_package_used_statistic.stringof, wavefront_event_package_used_statistic);
            _register = null;
        }

        Event front_seat_event;
        foreach (fingerprint; _register.event_package_cache.byKey) {
            auto registered_event = register(fingerprint);
            // register returns null while joining when the event could not be
            // created (e.g. the channel is not valid); such packages are skipped
            if (registered_event is null) {
                continue;
            }
            if (registered_event.channel == from_channel) {
                if (front_seat_event is null
                        || higher(registered_event.altitude, front_seat_event.altitude)) {
                    front_seat_event = registered_event;
                }
            }
        }

        return front_seat_event;
    }

    /// Wraps a wavefront in a HiRPC sender
    @HiRPCMethod const(HiRPC.Sender) wavefront(
            const Wavefront wave,
            const uint id = 0) {
        return hirpc.wavefront(wave, id);
    }

    /++ To synchronize two nodes A and B
     + 1)
     + Node A sends its wave front to B
     + This is done via the waveFront function
     + 2)
     + B collects all the events it has which are in front of the
     + wave front of A.
     + This is done via the waveFront function
     + B sends all the collected events to A including B's wave front of all
     + the nodes which B knows it leads in,
     + The wave front is collected via the waveFront function by adding the remaining tides
     + 3)
     + A sends the rest of the events which are in front of B's wave-front
     +
     + Returns: a wavefront holding the altitude of every online node
     +/
    const(Wavefront) tidalWave() pure {
        Tides tides;
        foreach (pkey, n; _nodes) {
            if (n.isOnline) {
                tides[pkey] = n.altitude;
                assert(n._event.isFront);
            }
        }
        return Wavefront(tides);
    }

    /**
     * Builds the wavefront which answers a set of tides.
     * For every node listed in tides the events higher than the listed altitude are collected.
     * Params:
     *   state = exchange state of the produced wavefront
     *   tides = altitudes known by the other node
     * Returns: an empty wavefront for NONE/BREAKING_WAVE or if nothing was collected
     */
    const(Wavefront) buildWavefront(const ExchangeState state, const Tides tides = null) {
        if (state is ExchangeState.NONE || state is ExchangeState.BREAKING_WAVE) {
            return Wavefront(null, null, state);
        }

        immutable(EventPackage)*[] result;
        Tides owner_tides;
        foreach (n; _nodes) {
            if (n.channel in tides) {
                const other_altitude = tides[n.channel];
                foreach (e; n[]) {
                    // Stop at the first event which is not in front of the other node
                    if (!higher(e.altitude, other_altitude)) {
                        owner_tides[n.channel] = e.altitude;
                        break;
                    }
                    result ~= e.event_package;

                }
            }
        }
        if (result.length == 0) {
            return Wavefront(null, null, state);
        }
        return Wavefront(result, owner_tides, state);
    }

    /**
     * Answers a sharp wave.
     * Params:
     *   received_wave = The sharp received wave
     * Returns: a COHERENT wave with the events of the last decided round if in graph,
     *          otherwise a RIPPLE wave with the own packages which the sender did not list
     */
    const(Wavefront) sharpResponse(const Wavefront received_wave)
    in {
        assert(received_wave.state is ExchangeState.SHARP);
    }
    do {
        if (areWeInGraph) {
            // writefln("sharp response ingraph:true");
            immutable(EventPackage)*[] result = _rounds.last_decided_round
                .events
                .filter!((e) => (e !is null))
                .map!((e) => cast(immutable(EventPackage)*) e.event_package)
                .array;
            return Wavefront(result, null, ExchangeState.COHERENT);
        }

        // if we are not in graph ourselves, we put the delta information
        // in and return a RIPPLE.
        auto received_epacks = received_wave
            .epacks
            .map!((e) => cast(immutable(EventPackage)*) e)
            .array
            .sort!((a, b) => a.fingerprint < b.fingerprint);
        auto own_epacks = _nodes.byValue
            .map!((n) => n[])
            .joiner
            .map!((e) => cast(immutable(EventPackage)*) e.event_package)
            .array
            .sort!((a, b) => a.fingerprint < b.fingerprint);
        import std.algorithm.setops : setDifference;

        auto changes = setDifference!((a, b) => a.fingerprint < b.fingerprint)(received_epacks, own_epacks);

        version (EPOCH_LOG) {
            log("owner_epacks %s", own_epacks.length);
        }
        if (!changes.empty) {
            // delta received from sharp should be added to our own node.
            version (EPOCH_LOG) {
                log("changes found");
            }
            foreach (epack; changes) {
                const epack_node = getNode(epack.pubkey);
                auto first_event = new Event(epack, this);
                if (epack_node.event is null) {
                    check(first_event.isEva, ConsensusFailCode.GOSSIPNET_FIRST_EVENT_MUST_BE_EVA);
                }
                _event_cache[first_event.fingerprint] = first_event;
                front_seat(first_event);
            }
        }

        auto result = setDifference!((a, b) => a.fingerprint < b.fingerprint)(own_epacks, received_epacks).array;

        const state = ExchangeState.RIPPLE;
        return Wavefront(result, Tides.init, state);

    }

    /**
     * First time it is called we only send our own eva since this is all we know.
     * Later we send the front event of every node we know.
     * Returns: the wavefront for a node that either wants to join or is booting.
     */
    const(Wavefront) sharpWave() {
        auto result = _nodes.byValue
            .filter!((n) => (n._event !is null))
            .map!((n) => cast(immutable(EventPackage)*) n._event.event_package)
            .array;

        return Wavefront(result, null, ExchangeState.SHARP);
    }

    /**
     * Handles a received wavefront and sends the answer (if any) via response.
     * Params:
     *   received = the received HiRPC message holding the wavefront
     *   time = time stamp used for events created while answering
     *   response = call-back used to send the answering wavefront
     *   payload = call-back which produces the payload of events created while answering
     * Throws: via `check` if the sending channel is not valid or the wave is illegal
     */
    void wavefront(
            const HiRPC.Receiver received,
            lazy const(sdt_t) time,
            void delegate(const(HiRPC.Sender) send_wave) @safe response,
            const(Document) delegate() @safe payload) {

        alias consensus = consensusCheckArguments!(GossipConsensusException);
        immutable from_channel = received.pubkey;
        const received_wave = received.params!(Wavefront)(hirpc.net);
        check(valid_channel(from_channel), ConsensusFailCode.GOSSIPNET_ILLEGAL_CHANNEL);
        auto received_node = getNode(from_channel);

        if (Event.callbacks) {
            Event.callbacks.receive(received_wave);
        }
        version (EPOCH_LOG) {
            log.trace("received_wave(%s <- %s)", received_wave.state, received_node.state);
        }
        scope (exit) {
            version (EPOCH_LOG) {
                log.trace("next <- %s", received_node.state);
            }
        }
        // Produces the answering wave; a wave in state NONE means "no answer"
        const(Wavefront) wavefront_response() @safe {
            with (ExchangeState) {
                final switch (received_wave.state) {
                case NONE:
                case INIT_TIDE:
                    consensus(received_wave.state)
                        .check(false, ConsensusFailCode.GOSSIPNET_ILLEGAL_EXCHANGE_STATE);
                    break;

                case SHARP:
                    received_node.state = NONE;
                    received_node.sticky_state = SHARP;
                    version (EPOCH_LOG) {
                        log("received sharp %s", received_node.channel.cutHex);
                    }
                    const sharp_response = sharpResponse(received_wave);
                    return sharp_response;
                case RIPPLE:
                    received_node.state = RIPPLE;
                    received_node.sticky_state = RIPPLE;

                    if (areWeInGraph) {
                        break;
                    }

                    // if we receive a ripplewave, we must add the eva events to our own graph.
                    const received_epacks = received_wave.epacks;
                    foreach (epack; received_epacks) {
                        const epack_node = getNode(epack.pubkey);
                        auto first_event = new Event(epack, this);
                        if (epack_node.event is null) {
                            check(first_event.isEva, ConsensusFailCode.GOSSIPNET_FIRST_EVENT_MUST_BE_EVA);
                        }
                        _event_cache[first_event.fingerprint] = first_event;
                        front_seat(first_event);
                    }

                    const contain_all =
                        _nodes
                            .byValue
                            .all!((n) => n._event !is null);

                    // Every expected node is known and has an event
                    if (contain_all && node_size == _nodes.length) {
                        const own_epacks = _nodes
                            .byValue
                            .map!((n) => n[])
                            .joiner
                            .map!((e) => e.event_package)
                            .array;
                        version (EPOCH_LOG) {
                            log("%s going to init witnesses, areweingraph %s", _owner_node.channel.cutHex, areWeInGraph);
                        }
                        initialize_witness(own_epacks);
                    }
                    break;
                case COHERENT:
                    received_node.state = NONE;
                    received_node.sticky_state = COHERENT;
                    version (EPOCH_LOG) {
                        log("received coherent from: %s, self %s", received_node.channel.cutHex, _owner_node.channel
                                .cutHex);
                    }
                    if (!areWeInGraph) {
                        try {
                            version (EPOCH_LOG) {
                                log("GOING to init");
                            }
                            initialize_witness(received_wave.epacks);
                            _owner_node.sticky_state = COHERENT;
                            _joining = No.joining;
                        }
                        catch (ConsensusException e) {
                            // initialized witness not correct
                            // Deliberately ignored: the graph stays outside and the wave is dropped
                        }
                    }
                    break;
                case TIDAL_WAVE:
                    if (received_node.state !is NONE || !areWeInGraph || joining) {
                        received_node.state = NONE;
                        return buildWavefront(BREAKING_WAVE);
                    }
                    check(received_wave.epacks.length is 0, ConsensusFailCode
                            .GOSSIPNET_TIDAL_WAVE_CONTAINS_EVENTS);
                    received_node.state = received_wave.state;
                    immutable epack = event_pack(time, null, payload());
                    const registered = registerEventPackage(epack);
                    assert(registered);

                    const wave = buildWavefront(FIRST_WAVE, received_wave.tides);

                    return wave;
                case BREAKING_WAVE:
                    received_node.state = NONE;
                    break;
                case FIRST_WAVE:
                    if (received_node.state !is INIT_TIDE || !areWeInGraph) {
                        received_node.state = NONE;
                        return buildWavefront(BREAKING_WAVE);
                    }
                    received_node.state = NONE;

                    const from_front_seat = register_wavefront(received_wave, from_channel);
                    immutable epack = event_pack(time, from_front_seat, payload());
                    const registreted = registerEventPackage(epack);
                    assert(registreted, "The event package has not been registered correct (The wave should be dumped)");
                    return buildWavefront(SECOND_WAVE, received_wave.tides);
                case SECOND_WAVE:
                    if (received_node.state !is TIDAL_WAVE || !areWeInGraph || joining) {
                        received_node.state = NONE;
                        return buildWavefront(BREAKING_WAVE);
                    }
                    received_node.state = NONE;
                    const from_front_seat = register_wavefront(received_wave, from_channel);
                    immutable epack = event_pack(time, from_front_seat, payload());
                    const registrated = registerEventPackage(epack);
                    assert(registrated, "The event package has not been registered correct (The wave should be dumped)");
                }
                return buildWavefront(NONE);
            }
        }

        const return_wavefront = wavefront_response;
        if (return_wavefront.state !is ExchangeState.NONE) {
            const sender = hirpc.wavefront(return_wavefront);
            response(sender);
        }
    }

    /// Offers the event as the front event of the node of its channel (the node is created if needed)
    void front_seat(Event event)
    in {
        assert(event, "event must be defined");
    }
    do {
        getNode(event.channel).front_seat(event);
    }

    /**
     * One participant of the graph: its channel, id, exchange state and latest event.
     */
    @safe
    class Node {
        ExchangeState state;
        immutable size_t node_id;
        immutable(Pubkey) channel;
        private bool _offline;
        private this(const Pubkey channel, const size_t node_id) pure nothrow {
            this.node_id = node_id;
            this.channel = channel;
        }

        protected ExchangeState _sticky_state = ExchangeState.RIPPLE;

        /// Raises the sticky state; a state which is not greater than the current one is ignored
        void sticky_state(const(ExchangeState) state) pure nothrow @nogc {

            if (state > _sticky_state) {
                _sticky_state = state;
            }
        }

        /// Returns: true if the node has been marked offline
        final bool offline() const pure nothrow @nogc {
            return _offline;
        }

        /// Returns: the highest sticky state set so far
        const(ExchangeState) sticky_state() const pure nothrow @nogc {
            return _sticky_state;
        }
        /++
         Register first event, or replace the current event by one with a higher altitude
         +/
        private void front_seat(Event event)
        in {
            assert(event.channel == channel, "Wrong channel");
        }
        do {
            if (_event is null) {
                _event = event;
            }
            else if (higher(event.altitude, _event.altitude)) {
                // Event.check(event.mother !is null, ConsensusFailCode.EVENT_MOTHER_LESS);
                _event = event;
            }
        }

        private Event _event; /// This is the last event in this Node

        /// Returns: the last event of this node (null if the node has no events)
        @nogc
        const(Event) event() const pure nothrow {
            return _event;
        }

        @nogc pure nothrow {
            package final Event event() {
                return _event;
            }

            /// Returns: true if the node has an event
            final bool isOnline() const {
                return (_event !is null);
            }

            /// Returns: the altitude of the last event of this node
            final int altitude() const
            in {
                assert(_event !is null, "This node has no events so the altitude is not set yet");
            }
            out {
                assert(_event.isFront);
            }
            do {
                return _event.altitude;
            }

            /// Returns: the range of events starting at the last event (empty if there is none)
            package Event.Range!false opSlice() {
                if (_event) {
                    return _event[];
                }
                return Event.Range!false(null);
            }

            /// ditto
            Event.Range!true opSlice() const {
                if (_event) {
                    return _event[];
                }
                return Event.Range!true(null);
            }
        }
    }

    import std.traits : fullyQualifiedName;

    alias NodeRange = typeof((cast(const) _nodes).byValue);

    /// Returns: a range over all nodes
    @nogc
    NodeRange opSlice() const pure nothrow {
        return _nodes.byValue;
    }

    /// Returns: number of nodes in the node table
    @nogc
    size_t active_nodes() const pure nothrow {
        return _nodes.length;
    }

    /// Returns: the net of hirpc
    @nogc
    const(SecureNet) net() const pure nothrow {
        return hirpc.net;
    }

    /// Returns: the node of the channel; it is created with the next free node id if it does not exist
    package Node getNode(Pubkey channel) pure {
        const next_id = next_node_id;
        return _nodes.require(channel, new Node(channel, next_id));
    }

    /// Returns: the result of the module-level isMajority for voting out of node_size
    @nogc
    bool isMajority(const size_t voting) const pure nothrow {
        return .isMajority(voting, node_size);
    }

    private void remove_node(Node n) nothrow
    in {
        assert(n !is null);
        assert(n.channel in _nodes, __format("Node id %d is not removable because it does not exist", n
                .node_id));
    }
    do {
        _nodes.remove(n.channel);
    }

    /**
     * Removes the node of a channel.
     * Returns: true if the node existed
     */
    bool remove_node(const Pubkey pkey) nothrow {
        if (pkey in _nodes) {
            _nodes.remove(pkey);
            return true;
        }
        return false;
    }

    /// Marks the first node with node_id which is not already offline as offline
    void mark_offline(const(size_t) node_id) nothrow {

        auto mark_node = _nodes.byKeyValue
            .filter!((pair) => !pair.value._offline)
            .filter!((pair) => pair.value.node_id == node_id)
            .map!(pair => pair.value);
        if (mark_node.empty) {
            return;
        }
        mark_node.front._offline = true;
    }

    /**
     * Produces the next event id.
     * The value event_id.init (0) is never returned; when the counter wraps around
     * it continues from 1 (before, the counter stayed at 0 and 1 was returned twice).
     */
    @nogc
    uint next_event_id() pure nothrow {
        event_id++;
        if (event_id is event_id.init) {
            // Wrapped around: skip the init value and keep the counter in step
            event_id++;
        }
        return event_id;
    }

    /// Returns: 0 for an empty node table, otherwise the first id which is not set in the mask of used node ids
    size_t next_node_id() const pure nothrow {
        if (_nodes.length is 0) {
            return 0;
        }
        BitMask used_nodes;
        _nodes.byValue
            .map!(a => a.node_id)
            .each!((n) { used_nodes[n] = true; });
        return (~used_nodes)[].front;
    }

    //bool disable_scrapping;

    enum max_package_size = 0x1000;
    enum round_clean_limit = 10;

    /++
     Dumps all events in the Hashgraph to a file
     Params:
     filename = name of the file to write
     node_labels = optional labels; the nodes are numbered by the sorted order of the labels.
     A node which has no label gets the id size_t.max (the same id as when no labels are given)
     +/
    void fwrite(string filename, Pubkey[string] node_labels = null) {
        import tagion.hashgraphview.EventView;
        import tagion.hibon.HiBONFile : fwrite;

        size_t[Pubkey] node_id_relocation;
        if (node_labels.length) {
            // assert(node_labels.length is _nodes.length);
            auto names = node_labels.keys;
            names.sort;
            foreach (i, name; names) {
                node_id_relocation[node_labels[name]] = i;
            }

        }
        auto events = new HiBON;
        (() @trusted {
            foreach (n; _nodes) {
                // An unlabelled node must not abort the dump with a RangeError
                const node_id = node_id_relocation.get(n.channel, size_t.max);
                n[]
                    .filter!((e) => !e.isGrounded)
                    .each!((e) => events[e.id] = EventView(e, node_id));
            }
        })();
        auto h = new HiBON;
        h[Params.size] = node_size;
        h[Params.events] = events;
        filename.fwrite(h);
    }

}