module tagion.services.monitor;

import core.thread;
import std.format;
import std.socket;
import tagion.actor;
import tagion.actor.exceptions;
import tagion.basic.Types : FileExtension;
import tagion.basic.tagionexceptions : TagionException;
import tagion.hibon.Document;
import tagion.logger.Logger;
import tagion.network.ListenerSocket;
import tagion.utils.JSONCommon;
import tagion.utils.pretend_safe_concurrency;

/// Settings of the monitor service.
@safe struct MonitorOptions {
    /// When enabled the Monitor is started
    bool enable = false;
    /// Monitor port
    ushort port = 10900;
    /// Timeout in msecs; handed to the ListenerSocket by monitorServiceTask
    uint timeout = 500;
    FileExtension dataformat = FileExtension.json;
    string url = "127.0.0.1";
    string taskname = "monitor";
    mixin JSONCommon;
}

/**
 * Task entry point of the monitor service.
 *
 * Creates and starts a ListenerSocket for `opts.url`/`opts.port`, then keeps
 * polling the task mailbox until `thisActor.stop` is set. Every received
 * `string`, `immutable(ubyte)[]` or `Document` is handed to the listener's
 * `broadcast`; a received `TaskFailure` is sent on to the owner task.
 * The listener is stopped when the task body is left, and anything thrown
 * inside the body is handed to `fail`.
 *
 * Params:
 *   opts = monitor settings (address, port, listener timeout and task name)
 */
void monitorServiceTask(immutable(MonitorOptions) opts) @trusted nothrow {
    try {
        // Disabled state reporting, kept from the original:
        // setState(Ctrl.STARTING);
        // scope (exit) {
        //     setState(Ctrl.END);
        // }

        log.task_name = opts.taskname;
        log("SockectThread port=%d addresss=%s", opts.port, opts.url);

        auto monitor_socket = ListenerSocket(opts.url, opts.port, opts.timeout, opts.taskname);
        // NOTE(review): the value returned by start is never used below -- confirm
        // whether it has to be kept or joined.
        auto monitor_thread = monitor_socket.start;
        scope (exit) {
            monitor_socket.stop;
        }

        // Mailbox handlers; they are passed to receiveTimeout in the same
        // order as before.
        void forwardJson(string json) @trusted {
            monitor_socket.broadcast(json);
        }

        void forwardBytes(immutable(ubyte)[] hibon_bytes) @trusted {
            monitor_socket.broadcast(hibon_bytes);
        }

        void forwardDocument(Document doc) @trusted {
            monitor_socket.broadcast(doc);
        }

        void forwardFailure(immutable(TaskFailure) failure) @safe {
            ownerTid.send(failure);
        }

        // Upper bound on how long one mailbox poll blocks before the stop
        // flag is checked again.
        enum poll_interval = 500.msecs;

        setState(Ctrl.ALIVE);
        while (!thisActor.stop) {
            pragma(msg, "REV: Should the 500.msecs be an opts");
            receiveTimeout(
                    poll_interval,
                    &signal, // Control the thread
                    &ownerTerminated,
                    &forwardJson,
                    &forwardBytes,
                    &forwardDocument,
                    &forwardFailure
            );
        }
    }
    catch (Throwable t) {
        // NOTE(review): nothing from std.stdio appears to be used here -- confirm
        // before removing the import.
        import std.stdio;

        fail(t);
    }
}