1 module tagion.services.subscription;
2 @safe:
3 
4 import core.time : msecs;
5 import nngd;
6 import std.array;
7 import std.format;
8 import std.variant;
9 import tagion.actor;
10 import tagion.communication.HiRPC;
11 import tagion.hibon.Document;
12 import tagion.hibon.HiBON;
13 import tagion.hibon.HiBONRecord;
14 import tagion.logger;
15 import tagion.logger.LogRecords;
16 import tagion.services.exception;
17 
/// Settings consumed by `SubscriptionService.task`.
struct SubscriptionServiceOptions {
    import tagion.services.options : contract_sock_addr;
    import tagion.utils.JSONCommon;

    /// Comma-separated list of tags; the service splits it on ',' and
    /// subscribes to each entry.
    string tags;

    /// Address handed to the publishing socket's `listen` call.
    string address;

    /// Send timeout for the socket; the service converts it with `msecs`.
    uint sendtimeout = 1000;

    /// Value assigned to the socket's `sendbuf` setting.
    uint sendbufsize = 4096;

    /// Fills `address` with the socket address derived from the
    /// "SUBSCRIPTION_" prefix. `tags` is left untouched.
    void setDefault() nothrow {
        address = contract_sock_addr("SUBSCRIPTION_");
    }

    mixin JSONCommon;
}
34 
/// HiBON record wrapping one subscribed log event together with the
/// identifying fields copied from its `LogInfo`.
@recordType("sub_payload")
struct SubscriptionPayload {
    /// Stored under the "topic" label.
    @label("topic")
    string topic_name;

    /// Stored under the "task" label.
    @label("task")
    string task_name;

    /// Stored under the "symbol" label.
    @label("symbol")
    string symbol_name;

    /// Stored under the "data" label.
    @label("data")
    Document data;

    mixin HiBONRecord!(q{
            // Copies the three name fields from `info` and attaches `data`.
            this(LogInfo info, const(Document) data) {
                topic_name = info.topic_name;
                task_name = info.task_name;
                symbol_name = info.symbol_name;
                // `data` is shadowed by the parameter, so qualify the field.
                this.data = data;
            }
    });
}
51 
/**
 * Actor that forwards subscribed log events to an NNG publish socket.
 */
struct SubscriptionService {
    /**
     * Entry point of the actor.
     *
     * Subscribes to every tag listed in `opts.tags`, opens a PUB socket
     * listening on `opts.address`, and then runs the actor loop with a
     * handler that publishes each received event. Each published message
     * is the topic name followed by a NUL byte and the serialized HiRPC
     * document carrying a `SubscriptionPayload`.
     *
     * Params:
     *   opts = tags to subscribe to, listen address and socket send settings
     *
     * Throws: via `check` when the socket cannot listen on `opts.address`.
     */
    void task(immutable(SubscriptionServiceOptions) opts) @trusted {
        log.registerSubscriptionTask(thisActor.task_name);

        // Split once so subscribe and unsubscribe walk the very same list.
        auto tags = opts.tags.split(',');

        log("Subscribing to tags");
        foreach (tag; tags) {
            submask.subscribe(tag);
        }
        scope (exit) {
            foreach (tag; tags) {
                submask.unsubscribe(tag);
            }
        }
        log("Subscribed to tags");

        NNGSocket sock = NNGSocket(nng_socket_type.NNG_SOCKET_PUB);
        // Release the socket on every exit path, including a failed listen.
        // NOTE(review): assumes NNGSocket has no closing destructor — confirm.
        scope (exit) {
            sock.close();
        }
        sock.sendtimeout = opts.sendtimeout.msecs;
        sock.sendbuf = opts.sendbufsize;

        HiRPC hirpc;

        int rc = sock.listen(opts.address);
        check(rc == 0, format("Could not listen to url %s: %s", opts.address, rc.nng_errstr));

        log("Publishing on %s", opts.address);

        // Handler invoked by the actor loop for each subscribed event.
        void receiveSubscription(LogInfo info, const(Document) data) @trusted {
            // The message starts with the NUL-terminated topic name.
            immutable(ubyte)[] payload = cast(immutable(ubyte)[])(info.topic_name ~ '\0');

            auto hibon = SubscriptionPayload(info, data);
            auto sender = hirpc.log(hibon);
            payload ~= sender.toDoc.serialize;

            // Keep the result local and report failures instead of silently
            // dropping them; a failed publish must not stop the service.
            const send_rc = sock.send(payload);
            if (send_rc != 0) {
                log("Failed to publish topic %s: %s", info.topic_name, send_rc.nng_errstr);
            }
        }

        run(&receiveSubscription);
    }
}