1 module eventy.engine;
2 
3 import eventy.queues : Queue;
4 import eventy.signal : Signal;
5 import eventy.event : Event;
6 
7 import std.container.dlist;
8 import core.sync.mutex : Mutex;
9 import core.thread : Thread, dur, Duration;
10 
11 import eventy.exceptions;
12 
13 
14 import std.stdio;
15 
16 /* TODO: Move elsewhere, this thing thinks it's a delegate in the unit test, idk why */
17 void runner(Event e)
18 {
19     import std.stdio;
20     writeln("Running event", e.id);
21 }
22 
23 unittest
24 {
25     Engine engine = new Engine();
26     engine.start();
27 
28  
29     /**
30     * Let the event engine know what typeIDs are
31     * allowed to be queued
32     */
33     engine.addQueue(1);
34     engine.addQueue(2);
35 
36 
37     /**
38     * Create a new Signal Handler that will handles
39     * event types `1` and `2` with the given `handler()`
40     * function
41     */
42     class SignalHandler1 : Signal
43     {
44     	this()
45     	{
46     		super([1,2]);
47     	}
48 
49     	public override void handler(Event e)
50     	{
51     		import std.stdio;
52     		writeln("Running event", e.id);
53     	}
54     }
55 
56     /**
57     * Tell the event engine that I want to register
58     * the following handler for its queues `1` and `2`
59     */
60     Signal j = new SignalHandler1();
61     engine.addSignalHandler(j);
62 
63     Event eTest = new Event(1);
64     engine.push(eTest);
65 
66     eTest = new Event(2);
67     engine.push(eTest);
68     
69 
70     Thread.sleep(dur!("seconds")(2));
71     engine.push(eTest);
72 
73     writeln("naai");
74 }
75 
76 /**
77 * Engine
78 *
79 * An instance of this represents an engine that
80 * can, at any time, handle the delivery of new
81 * events, trigger the correct signal handlers
82 * for the respective events, remove signal
83 * handlers, add signal handlers, among many
84 * other things
85 */
86 public final class Engine : Thread
87 {
88     /* TODO: Or use a queue data structure */
89     private DList!(Queue) queues;
90     private Mutex queueLock;
91 
92     /* TODO: Or use a queue data structure */
93     private DList!(Signal) handlers;
94     private Mutex handlerLock;
95 
96     private Duration sleepTime;
97 
98     this()
99     {
100         super(&run);
101         queueLock = new Mutex();
102         handlerLock = new Mutex();
103     }
104 
105     /**
106     * Set the event loop sleep time
107     *
108     * The load average will sky rocket if it is 0,
109     * which is just because it is calculated on how
110     * full the run queue is, length but also over time
111     * and even just one task continousy in it will
112     * make the average high
113     *
114     * Reason why it's always runnable is the process
115     * (the "thread") is a tight loop with no sleeps
116     * that would dequeue it from the run queue and/or
117     * no I/O system calls that would put it into the
118     * waiting queue
119     */
120     public void setSleep(Duration time)
121     {
122         sleepTime = time;
123     }
124 
125     /**
126     * Adds the given Signal handler
127     *
128     * @param e the Signal handler to add
129     */
130     public void addSignalHandler(Signal e)
131     {
132          /* Lock the signal-set */
133         handlerLock.lock();
134 
135         /* Add the new handler */
136         handlers ~= e;
137 
138         /* Unlock the signal-set */
139         handlerLock.unlock();
140     }
141 
142     /**
143     * Event loop
144     */
145     public void run()
146     {
147         while(true)
148         {
149             /* TODO: Implement me */
150 
151             /**
152             * TODO: If lock fails, then yield
153             */
154 
155             /**
156             * Lock the queue-set
157             *
158             * Additionally:
159             * Don't waste time spinning on mutex,
160             * if it is not lockable then yield
161             */
162             while(!queueLock.tryLock_nothrow())
163             {
164                 yield();
165             }
166 
167 
168             foreach(Queue queue; queues)
169             {  
170                 /* If the queue has evenets queued */
171                 if(queue.hasEvents())
172                 {
173                     /* TODO: Add different dequeuing techniques */
174 
175                     /* Pop the first Event */
176                     Event headEvent = queue.popEvent();
177                    
178                     /* Get all signal-handlers for this event type */
179                     Signal[] handlersMatched = getSignalsForEvent(headEvent);
180 
181                     /* Dispatch the signal handlers */
182                     dispatch(handlersMatched, headEvent);
183                     
184                 }
185             }
186 
187             /* Unlock the queue set */
188             queueLock.unlock();
189 
190             /* Yield to stop mutex starvation */
191             yield();
192 
193             /* TODO: Add yield to stop mutex starvation on a single thread */
194 
195             /* Sleep the thread */
196             // sleepTime = dur!("seconds")(0);
197             // sleep(sleepTime);
198         }
199     }
200 
201     /**
202     * Dispatch(Signal[] set, Event e)
203     *
204     * Creates a new thread per signal and dispatches the event to them
205     *
206     * TODO: Add ability to dispatch on this thread
207     */
208     private void dispatch(Signal[] signalSet, Event e)
209     {
210         foreach(Signal signal; signalSet)
211         {
212             /* Create a new Thread */
213             Thread handlerThread = getThread(signal, e);
214 
215             /* Start the thread */
216             handlerThread.start();
217         }
218     }
219 
220     private Thread getThread(Signal signal, Event e)
221     {
222         Thread signalHandlerThread = new class Thread
223         {
224             this()
225             {
226                 super(&worker);
227             }
228 
229             public void worker()
230             {
231                 signal.handler(e);
232                 //handler(e);
233             }
234         };
235 
236         return signalHandlerThread;
237     }
238 
239     /**
240     * returns all signal(s) responsible for
241     * handling the type of Event provided
242     *
243     * @param e the Event type to match to
244     * @returns Signal[] the list of signal
245     * handlers that handle event e
246     */
247     public Signal[] getSignalsForEvent(Event e)
248     {
249         /* Matched handlers */
250         Signal[] matchedHandlers;
251 
252         /* Lock the signal-set */
253         handlerLock.lock();
254 
255         /* Find all handlers matching */
256         foreach(Signal signal; handlers)
257         {
258             if(signal.handles(e.id))
259             {
260                 matchedHandlers ~= signal;
261             }
262         }
263 
264         /* Unlock the signal-set */
265         handlerLock.unlock();
266 
267         return matchedHandlers;
268     }
269 
270     /**
271     * push(Event e)
272     *
273     * Provided an Event, `e`, this will enqueue the event
274     * to 
275     */
276     public void push(Event e)
277     {
278         Queue matchedQueue = findQueue(e.id);
279 
280         if(matchedQueue)
281         {
282             /* Append to the queue */
283             matchedQueue.add(e);
284         }
285     }
286 
287     /**
288     * Creates a new queue with the given id
289     * and then adds it
290     *
291     * @param id the id of the new queue to add
292     * @throws EventyException if a queue with
293     * the given id already exists
294     */
295     public void addQueue(ulong id)
296     {
297         /* Create a new queue with the given id */
298         Queue newQueue = new Queue(id);
299 
300         /* Lock the queue collection */
301         queueLock.lock();
302 
303         /* If no such queue exists then add it (recursive mutex used) */
304         if(!findQueue(id))
305         {
306             /* Add the queue */
307             queues ~= newQueue;
308         }
309         else
310         {
311             throw new EventyException("Failure to add queue with ID already in use");
312         }
313         
314 
315         /* Unlock the queue collection */
316         queueLock.unlock();
317     }
318 
319     /**
320     * Given an id, this will return
321     * the Queue associated with said
322     * id
323     *
324     * @param id the id of the Queue
325     * @returns The Queue if found but
326     * null otherwise
327     */
328     public Queue findQueue(ulong id)
329     {
330         /* Lock the queue collection */
331         queueLock.lock();
332 
333         /* Find the matching queue */
334         Queue matchedQueue;
335         foreach(Queue queue; queues)
336         {
337             if(queue.id == id)
338             {
339                 matchedQueue = queue;
340                 break;
341             }
342         }
343 
344         /* Unlock the queue collection */
345         queueLock.unlock();
346 
347         return matchedQueue;
348     }
349 
350     /* TODO: Add coumentation */
351     public ulong[] getTypes()
352     {
353         /* TODO: Implement me */
354         return null;
355     }
356 }