module tagion.services.subscription;
@safe:

import core.time : msecs;
import nngd;
import std.array;
import std.format;
import std.variant;
import tagion.actor;
import tagion.communication.HiRPC;
import tagion.hibon.Document;
import tagion.hibon.HiBON;
import tagion.hibon.HiBONRecord;
import tagion.logger;
import tagion.logger.LogRecords;
import tagion.services.exception;

/// Configuration for the subscription service.
struct SubscriptionServiceOptions {
    import tagion.utils.JSONCommon;

    /// Comma-separated list of tags; each one is passed to `submask.subscribe`.
    string tags;
    /// Address the publishing socket listens on.
    string address;

    import tagion.services.options : contract_sock_addr;

    /// Sets `address` to the value returned by `contract_sock_addr("SUBSCRIPTION_")`.
    void setDefault() nothrow {
        address = contract_sock_addr("SUBSCRIPTION_");
    }

    /// Send timeout of the socket, in milliseconds.
    uint sendtimeout = 1000;
    /// Value assigned to the socket's `sendbuf` property.
    uint sendbufsize = 4096;
    mixin JSONCommon;
}

/// Record that is wrapped in a HiRPC message and published for every
/// received log event.
@recordType("sub_payload")
struct SubscriptionPayload {
    @label("topic") string topic_name;
    @label("task") string task_name;
    @label("symbol") string symbol_name;
    @label("data") Document data;

    mixin HiBONRecord!(q{
        this(LogInfo info, const(Document) data) {
            this.topic_name = info.topic_name;
            this.task_name = info.task_name;
            this.symbol_name = info.symbol_name;
            this.data = data;
        }
    });
}

/// Actor that forwards subscribed log events to an NNG publish socket.
struct SubscriptionService {
    /**
     * Entry point of the service.
     *
     * Registers this task as the logger's subscription task, subscribes to
     * every tag listed in `opts.tags`, opens a PUB socket listening on
     * `opts.address` and then hands `receiveSubscription` to the actor `run`.
     * The tags are unsubscribed again when this function is left.
     *
     * Params:
     *   opts = service configuration (tags, address and socket settings)
     *
     * Fails through `check` if the socket cannot listen on `opts.address`.
     */
    void task(immutable(SubscriptionServiceOptions) opts) @trusted {
        log.registerSubscriptionTask(thisActor.task_name);
        log("Subscribing to tags");
        // Split once; the same list is used for subscribing and unsubscribing.
        auto tags = opts.tags.split(',');
        foreach (tag; tags) {
            submask.subscribe(tag);
        }
        scope (exit) {
            foreach (tag; tags) {
                submask.unsubscribe(tag);
            }
        }
        log("Subscribed to tags");

        NNGSocket sock = NNGSocket(nng_socket_type.NNG_SOCKET_PUB);
        sock.sendtimeout = opts.sendtimeout.msecs;
        sock.sendbuf = opts.sendbufsize;

        HiRPC hirpc;

        int rc = sock.listen(opts.address);
        check(rc == 0, format("Could not listen to url %s: %s", opts.address, rc.nng_errstr));

        log("Publishing on %s", opts.address);

        // Handler for one log event: the published message is the topic name,
        // a NUL byte, and then the serialized HiRPC document that wraps the
        // SubscriptionPayload.
        void receiveSubscription(LogInfo info, const(Document) data) @trusted {
            immutable(ubyte)[] payload;

            payload = cast(immutable(ubyte)[])(info.topic_name ~ '\0');

            auto hibon = SubscriptionPayload(info, data);
            auto sender = hirpc.log(hibon);
            payload ~= sender.toDoc.serialize;

            rc = sock.send(payload);
            // Publishing is best-effort: report a failed send instead of
            // silently dropping it, but do not abort the service.
            if (rc != 0) {
                log("Could not publish topic %s: %s", info.topic_name, rc.nng_errstr);
            }
        }

        run(&receiveSubscription);
    }
}