module eventy.engine;

import eventy.queues : Queue;
import eventy.signal : Signal;
import eventy.event : Event;

import std.container.dlist;
import core.atomic : atomicLoad, atomicStore;
import core.sync.mutex : Mutex;
import core.thread : Thread, dur, Duration;

import eventy.exceptions;


import std.stdio;

/* TODO: Move elsewhere, this thing thinks it's a delegate in the unit test, idk why */
/**
 * Example handler that prints the id of the
 * event it is given
 *
 * @param e the Event to report
 */
void runner(Event e)
{
    import std.stdio;
    writeln("Running event", e.id);
}

unittest
{
    Engine engine = new Engine();
    engine.start();

    /**
     * Always stop the event loop when the test ends,
     * otherwise the engine thread keeps running and the
     * runtime waits on it forever at exit
     */
    scope(exit) engine.shutdown();

    /**
     * Let the event engine know what typeIDs are
     * allowed to be queued
     */
    engine.addQueue(1);
    engine.addQueue(2);

    /**
     * Create a new Signal Handler that will handle
     * event types `1` and `2` with the given `handler()`
     * function
     */
    class SignalHandler1 : Signal
    {
        this()
        {
            super([1,2]);
        }

        public override void handler(Event e)
        {
            import std.stdio;
            writeln("Running event", e.id);
        }
    }

    /**
     * Tell the event engine that I want to register
     * the following handler for its queues `1` and `2`
     */
    Signal j = new SignalHandler1();
    engine.addSignalHandler(j);

    Event eTest = new Event(1);
    engine.push(eTest);

    eTest = new Event(2);
    engine.push(eTest);

    Thread.sleep(dur!("seconds")(2));
    engine.push(eTest);

    writeln("naai");

    /* Give the event loop a moment to pick up the last event before shutting down */
    Thread.sleep(dur!("msecs")(100));
}

/**
 * Engine
 *
 * An instance of this represents an engine that
 * can, at any time, handle the delivery of new
 * events, trigger the correct signal handlers
 * for the respective events, remove signal
 * handlers, add signal handlers, among many
 * other things
 *
 * The engine is itself a Thread whose body is
 * the event loop, `run()`; call `start()` to begin
 * processing and `shutdown()` to stop it
 */
public final class Engine : Thread
{
    /* TODO: Or use a queue data structure */
    private DList!(Queue) queues;
    private Mutex queueLock;

    /* TODO: Or use a queue data structure */
    private DList!(Signal) handlers;
    private Mutex handlerLock;

    /* Pause taken at the end of every event loop iteration (zero means none) */
    private Duration sleepTime;

    /* Event loop keeps iterating for as long as this is true */
    private shared bool running;

    this()
    {
        super(&run);
        queueLock = new Mutex();
        handlerLock = new Mutex();

        /* Set before start() so the loop runs as soon as the thread begins */
        atomicStore(running, true);
    }

    /**
     * Set the event loop sleep time
     *
     * The load average will sky rocket if it is 0,
     * which is just because it is calculated on how
     * full the run queue is, length but also over time
     * and even just one task continuously in it will
     * make the average high
     *
     * Reason why it's always runnable is the process
     * (the "thread") is a tight loop with no sleeps
     * that would dequeue it from the run queue and/or
     * no I/O system calls that would put it into the
     * waiting queue
     *
     * @param time how long the event loop sleeps after
     * each iteration; a zero (default) or negative
     * duration disables the sleep
     */
    public void setSleep(Duration time)
    {
        sleepTime = time;
    }

    /**
     * Adds the given Signal handler
     *
     * @param e the Signal handler to add
     */
    public void addSignalHandler(Signal e)
    {
        /* Lock the signal-set, releasing it however we leave */
        handlerLock.lock();
        scope(exit) handlerLock.unlock();

        /* Add the new handler */
        handlers ~= e;
    }

    /**
     * Event loop
     *
     * On every iteration each queue is visited once and,
     * if it has events, its head event is popped and
     * dispatched to all matching signal handlers. The
     * loop ends once `shutdown()` has been called.
     */
    public void run()
    {
        while(atomicLoad(running))
        {
            /**
             * Lock the queue-set
             *
             * Additionally:
             * Don't waste time spinning on mutex,
             * if it is not lockable then yield
             */
            while(!queueLock.tryLock_nothrow())
            {
                yield();
            }

            {
                /* Unlock the queue set even if dispatching throws */
                scope(exit) queueLock.unlock();

                foreach(Queue queue; queues)
                {
                    /* If the queue has events queued */
                    if(queue.hasEvents())
                    {
                        /* TODO: Add different dequeuing techniques */

                        /* Pop the first Event */
                        Event headEvent = queue.popEvent();

                        /* Get all signal-handlers for this event type */
                        Signal[] handlersMatched = getSignalsForEvent(headEvent);

                        /* Dispatch the signal handlers */
                        dispatch(handlersMatched, headEvent);
                    }
                }
            }

            /* Yield to stop mutex starvation */
            yield();

            /* Sleep the thread if a sleep time was configured via setSleep() */
            if(sleepTime > Duration.zero)
            {
                sleep(sleepTime);
            }
        }
    }

    /**
     * Requests that the event loop stop
     *
     * The loop exits at the start of its next iteration;
     * events still sitting in the queues at that point
     * are not dispatched. Handler threads that were
     * already started are not affected.
     */
    public void shutdown()
    {
        atomicStore(running, false);
    }

    /**
     * Dispatch(Signal[] set, Event e)
     *
     * Creates a new thread per signal and dispatches the event to them
     *
     * TODO: Add ability to dispatch on this thread
     *
     * @param signalSet the signal handlers to run
     * @param e the Event handed to each handler
     */
    private void dispatch(Signal[] signalSet, Event e)
    {
        foreach(Signal signal; signalSet)
        {
            /* Create a new Thread and start it */
            getThread(signal, e).start();
        }
    }

    /**
     * Builds (but does not start) a thread that calls
     * the given signal's `handler()` with the given event
     *
     * @param signal the Signal whose handler is to be run
     * @param e the Event to pass to the handler
     * @returns Thread the not-yet-started handler thread
     */
    private Thread getThread(Signal signal, Event e)
    {
        return new Thread({ signal.handler(e); });
    }

    /**
     * returns all signal(s) responsible for
     * handling the type of Event provided
     *
     * @param e the Event type to match to
     * @returns Signal[] the list of signal
     * handlers that handle event e
     */
    public Signal[] getSignalsForEvent(Event e)
    {
        /* Matched handlers */
        Signal[] matchedHandlers;

        /* Lock the signal-set, releasing it however we leave */
        handlerLock.lock();
        scope(exit) handlerLock.unlock();

        /* Find all handlers matching */
        foreach(Signal signal; handlers)
        {
            if(signal.handles(e.id))
            {
                matchedHandlers ~= signal;
            }
        }

        return matchedHandlers;
    }

    /**
     * push(Event e)
     *
     * Provided an Event, `e`, this will enqueue the event
     * to the queue whose id equals the event's id. If no
     * such queue has been added then the event is dropped.
     *
     * @param e the Event to enqueue
     */
    public void push(Event e)
    {
        Queue matchedQueue = findQueue(e.id);

        if(matchedQueue)
        {
            /* Append to the queue */
            matchedQueue.add(e);
        }
    }

    /**
     * Creates a new queue with the given id
     * and then adds it
     *
     * @param id the id of the new queue to add
     * @throws EventyException if a queue with
     * the given id already exists
     */
    public void addQueue(ulong id)
    {
        /**
         * Lock the queue collection; the scope guard makes
         * sure it is released on the exception path too
         */
        queueLock.lock();
        scope(exit) queueLock.unlock();

        /* Refuse duplicate ids (findQueue re-locks, recursive mutex used) */
        if(findQueue(id) !is null)
        {
            throw new EventyException("Failure to add queue with ID already in use");
        }

        /* Create a new queue with the given id and add it */
        queues ~= new Queue(id);
    }

    /**
     * Given an id, this will return
     * the Queue associated with said
     * id
     *
     * @param id the id of the Queue
     * @returns The Queue if found but
     * null otherwise
     */
    public Queue findQueue(ulong id)
    {
        /* Lock the queue collection, releasing it however we leave */
        queueLock.lock();
        scope(exit) queueLock.unlock();

        /* Find the matching queue */
        foreach(Queue queue; queues)
        {
            if(queue.id == id)
            {
                return queue;
            }
        }

        return null;
    }

    /**
     * Not implemented yet, currently always
     * returns null
     *
     * TODO: Implement me
     */
    public ulong[] getTypes()
    {
        return null;
    }
}