module tagion.communication.HandlerPool;

import core.time;
import std.datetime;
import std.stdio;
import tagion.basic.Types : Buffer;

/**
 * Contract for an object that can receive response buffers from a pool.
 *
 * Within this module the pool calls `setResponse` to deliver data, polls
 * `alive` to decide whether the handler should be dropped, and calls `close`
 * when the handler is removed from the pool.
 */
@safe
interface ResponseHandler {
    /// Delivers a response buffer to the handler.
    void setResponse(Buffer response);
    /// Returns false once the handler should be removed from its pool.
    bool alive();
    /// Invoked by the pool when the handler is removed.
    void close();

    /**
     * A response buffer tagged with the key of the handler it is meant for.
     *
     * Params:
     *   TKey = key type; must be a basic type, an array, or an immutable type.
     */
    @safe
    struct Response(TKey) {
        immutable(TKey) key;
        Buffer data;

        /**
         * Stores an immutable copy of `key` together with `data`.
         *
         * The key is handled at compile time according to its type:
         * basic types are copied, arrays whose elements are already immutable
         * are stored as they are, other arrays are duplicated with `idup`,
         * and any other type is accepted only when it is itself immutable.
         * Every other `TKey` is rejected with a `static assert`.
         */
        this(const TKey key, Buffer data) inout {
            import std.traits : ForeachType, isArray, isBasicType;

            // The whole chain must be `static if`: with a run-time `else if`
            // every branch (including the static assert) would be compiled.
            static if (isBasicType!TKey) {
                this.key = key;
            }
            else static if (isArray!TKey) {
                static if (is(ForeachType!TKey == immutable)) {
                    this.key = key;
                }
                else {
                    this.key = key.idup;
                }
            }
            else static if (is(TKey == immutable)) {
                this.key = key;
            }
            else {
                static assert(0, "TKey " ~ TKey.stringof ~ " not supported");
            }
            this.data = data;
        }
    }
}

/**
 * Interface of a keyed collection of response handlers.
 *
 * Params:
 *   TValue = concrete handler type, must implement `ResponseHandler`
 *   TKey   = type of the key a handler is registered under
 */
@safe
interface HandlerPool(TValue : ResponseHandler, TKey) {
    /**
     * Pool entry: a handler together with the time it was last considered
     * active.
     */
    protected final class ActiveHandler {
        protected TValue handler; //TODO: try immutable/const
        protected SysTime last_timestamp;

        protected const bool update_timestamp;

        /**
         * Params:
         *   value            = the handler to wrap
         *   update_timestamp = when true, every `setResponse` call refreshes
         *                      `last_timestamp` (for long-lived connections)
         */
        this(TValue value, const bool update_timestamp = false) {
            handler = value;
            this.update_timestamp = update_timestamp;
            this.last_timestamp = Clock.currTime();
        }

        /// Forwards `response` to the wrapped handler, refreshing the
        /// timestamp first when `update_timestamp` is set.
        void setResponse(Buffer response) {
            if (update_timestamp) {
                this.last_timestamp = Clock.currTime();
            }
            handler.setResponse(response);
        }

        /// Returns true when more than `dur` has elapsed since `last_timestamp`.
        bool isExpired(const Duration dur) {
            return (Clock.currTime - last_timestamp) > dur;
        }
    }

    ActiveHandler* get(const TKey key);
    bool contains(const TKey key);
    void add(const TKey key, ref TValue value, bool long_lived = false);
    void remove(const TKey key);
    ulong size();
    bool empty();
    void setResponse(immutable ResponseHandler.Response!TKey resp);
    void tick();
}

/**
 * `HandlerPool` implementation backed by a built-in associative array.
 *
 * A `timeout` of `Duration.zero` turns `tick` into a no-op, so with a zero
 * timeout handlers are only removed explicitly or by `setResponse`.
 */
@safe
class StdHandlerPool(TValue : ResponseHandler, TKey) : HandlerPool!(TValue, TKey) {
    protected ActiveHandler[TKey] handlers; //TODO: should be threadsafe?
    protected immutable Duration timeout;

    /**
     * Params:
     *   timeout = age after which `tick` removes an entry;
     *             `Duration.zero` disables `tick`
     */
    this(const Duration timeout = Duration.zero) {
        this.timeout = timeout;
    }

    /// Returns a pointer to the entry stored under `key`, or null if absent.
    ActiveHandler* get(const TKey key) {
        return key in handlers;
    }

    /// Returns true when a handler is registered under `key`.
    bool contains(const TKey key) {
        return get(key) !is null;
    }

    /**
     * Registers `value` under `key`.
     *
     * Params:
     *   key        = key to register under; must not already be present
     *   value      = handler to register
     *   long_lived = when true, the entry's timestamp is refreshed on every
     *                response delivered through the pool
     */
    void add(const TKey key, ref TValue value, bool long_lived = false)
    in {
        assert(!contains(key)); //TODO: special case
    }
    do {
        handlers[key] = new ActiveHandler(value, long_lived);
    }

    /// Removes the entry stored under `key` (if any) and closes its handler.
    void remove(const TKey key)
    out {
        assert(!contains(key));
    }
    do {
        auto value_ptr = get(key);
        if (value_ptr is null) {
            return;
        }
        // Copy the class reference out before removing the entry: the
        // pointer refers to storage owned by the associative array and must
        // not be dereferenced after the removal.
        auto active_handler = *value_ptr;
        handlers.remove(key);
        active_handler.handler.close();
    }

    /// Number of registered handlers.
    ulong size() {
        return handlers.length;
    }

    /// True when no handler is registered.
    bool empty() {
        return size == 0;
    }

    /**
     * Delivers `resp.data` to the handler registered under `resp.key`.
     *
     * The handler is removed when it reports it is no longer alive after
     * receiving the data. A message is written to stdout when no handler is
     * registered under the key. `tick` is run afterwards in either case.
     */
    void setResponse(immutable ResponseHandler.Response!TKey resp) { //TODO: scope(exit) destroy(resp.stream); ?
        auto active_connection_ptr = get(resp.key);
        if (active_connection_ptr !is null) {
            auto active_connection = *active_connection_ptr;
            active_connection.setResponse(resp.data);
            if (!active_connection.handler.alive) {
                remove(resp.key);
            }
        }
        else {
            writeln("no respponse handler found");
        }
        tick;
    }

    /**
     * Removes every entry that is older than `timeout` or whose handler is
     * no longer alive. Does nothing when `timeout` is `Duration.zero`.
     */
    void tick() {
        if (timeout == Duration.zero) {
            return;
        }
        // An associative array must not be modified while foreach iterates
        // over it, so gather the keys first and remove them afterwards.
        const(TKey)[] stale_keys;
        foreach (key, active_handler; handlers) {
            if (active_handler.isExpired(timeout) || !active_handler.handler.alive()) {
                stale_keys ~= key;
            }
        }
        foreach (key; stale_keys) {
            remove(key);
        }
    }
}

@safe
unittest {
    import core.thread;

    @safe
    class FakeResponseHandler : ResponseHandler {
        bool setResponseCalled = false;
        bool alived = true;
        bool closed = false;
        void setResponse(Buffer response) {
            setResponseCalled = true;
        }

        bool alive() {
            return alived;
        }

        void close() {
            closed = true;
        }
    }

    version (none) { //HandlerPool: remove handler on expired
        auto handler_pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fakeResponseHandler = new FakeResponseHandler();
        handler_pool.add(0, fakeResponseHandler);
        assert(!handler_pool.empty);
        handler_pool.tick;
        assert(!handler_pool.empty);
        Thread.sleep(1.msecs);
        handler_pool.tick;
        assert(!handler_pool.empty);
        Thread.sleep(10.msecs);
        handler_pool.tick;
        assert(handler_pool.empty);
    }

    version (none) { //HandlerPool: update timestamp on set response
        auto handler_pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fakeResponseHandler = new FakeResponseHandler();
        handler_pool.add(1, fakeResponseHandler, true);
        assert(!handler_pool.empty);
        handler_pool.tick;
        assert(!handler_pool.empty);
        Thread.sleep(7.msecs);
        handler_pool.tick;
        assert(!handler_pool.empty);
        immutable response = ResponseHandler.Response!uint(1, cast(Buffer)[]);
        handler_pool.setResponse(response);
        assert(fakeResponseHandler.setResponseCalled);
        Thread.sleep(7.msecs);
        handler_pool.tick;
        assert(!handler_pool.empty);
        Thread.sleep(5.msecs);
        handler_pool.tick;
        assert(handler_pool.empty);
    }

    { //HandlerPool: remove handler if not alive after set response
        auto handler_pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fakeResponseHandler = new FakeResponseHandler();
        handler_pool.add(0, fakeResponseHandler);
        assert(!handler_pool.empty);

        fakeResponseHandler.alived = false;
        immutable response = immutable(ResponseHandler.Response!uint)(0, null);
        handler_pool.setResponse(response);
        assert(fakeResponseHandler.closed);
        assert(fakeResponseHandler.setResponseCalled);
        assert(handler_pool.empty);
    }
    { //HandlerPool: remove handler if not alive after tick
        auto handler_pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fakeResponseHandler = new FakeResponseHandler();
        handler_pool.add(0, fakeResponseHandler);
        assert(!handler_pool.empty);
        handler_pool.tick;
        assert(!handler_pool.empty);

        fakeResponseHandler.alived = false;
        handler_pool.tick;
        assert(fakeResponseHandler.closed);
        assert(!fakeResponseHandler.setResponseCalled);
        assert(handler_pool.empty);
    }
    { //HandlerPool: tick removes every dead handler in a single pass
        auto handler_pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto first_handler = new FakeResponseHandler();
        auto second_handler = new FakeResponseHandler();
        handler_pool.add(0, first_handler);
        handler_pool.add(1, second_handler);
        assert(handler_pool.size == 2);

        first_handler.alived = false;
        second_handler.alived = false;
        handler_pool.tick;
        assert(first_handler.closed);
        assert(second_handler.closed);
        assert(handler_pool.empty);
    }
}