ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/svn/vendor/libpeak-0.1.2/peak/engine_mod_kqueue.c
Revision: 3251
Committed: Wed Apr 2 16:58:30 2014 UTC (11 years, 4 months ago) by michael
Content type: text/x-csrc
File size: 10961 byte(s)
Log Message:
- Imported libpeak-0.1.2

File Contents

# Content
1 /* PEAK Library
2 *
3 * Copyright (c) 2003, 2004, 2005
4 * Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * $Id: engine_mod_kqueue.c,v 1.6 2005/01/27 16:31:50 mbuna Exp $
30 */
31 #define RCSID "$Id: engine_mod_kqueue.c,v 1.6 2005/01/27 16:31:50 mbuna Exp $"
32
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36
37 #include "engine.h"
38
39 #include <assert.h>
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <sys/time.h>
47 #include <time.h>
48 #include <unistd.h>
49 #include <sys/event.h>
50 #ifdef HAVE_SIGNAL_H
51 #include <signal.h>
52 #endif
53
54 #include "internal.h"
55 #include "socket.h"
56 #include "spinlock.h"
57 #include "task_private.h"
58 #include "utilities.h"
59
60 /* We don't systematically register both read and write events for a client,
61 * so we record which filters were actually registered. This lets us delete
62 * exactly those events later, without kevent() reporting an error.
63 */
64 #define CS_KEVENT_READ CS_CUSTOM1
65 #define CS_KEVENT_WRITE CS_CUSTOM2
66
67 static void __peak_engine_init(peak_engine e, va_list vp);
68 static void __peak_engine_finalize(peak_engine e);
69 static void __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
70 uint32_t set, uint32_t clear);
71 static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
72 static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
73 int event, int info);
74
75
76
77 struct __peak_engine
78 {
79 PEAK_STRUCT_RT_HEADER;
80 peak_task _task;
81 int _maxfds;
82 int _nfds;
83 int _kq;
84 int _ne;
85 volatile int _running;
86 };
87
88 PEAK_CLASS_BASE_DECLARE(engine);
89
90 __private_extern__ const char *
91 _peak_engine_get_name(peak_engine e)
92 {
93 return "kqueue";
94 }
95
96 __private_extern__ peak_engine
97 _peak_engine_create(peak_task task)
98 {
99 return PEAK_CLASS_CONSTRUCT1(engine, task);
100 }
101
/* Initialize a freshly constructed engine.
 *
 * e:  engine being initialized.
 * vp: construction arguments; exactly one peak_task is read from it.
 *
 * Halts (PEAK_HALT) if the kernel event queue cannot be created.
 */
static void
__peak_engine_init(peak_engine e, va_list vp)
{
  peak_task owner = va_arg(vp, peak_task);
  int kq;

  e->_task = owner;
  e->_maxfds = PEAK_DEFAULT_FLAVOR_MAXFDS;
  e->_nfds = 0;

  kq = kqueue();
  e->_kq = kq;
  if (kq == -1)
    PEAK_HALT;

  /* The loop is not running until _peak_engine_loop() is entered. */
  e->_running = 0;
}
114
115 static void
116 __peak_engine_finalize(peak_engine e)
117 {
118 }
119
120 __private_extern__ int
121 _peak_engine_get_maxfds(peak_engine e)
122 {
123 return e->_maxfds;
124 }
125
126 __private_extern__ int
127 _peak_engine_set_maxfds(peak_engine e, int maxfds)
128 {
129 if (maxfds <= 0)
130 return -1;
131
132 e->_maxfds = peak_set_fdlimit(maxfds);
133 return (e->_maxfds == maxfds) ? 0 : -1;
134 }
135
/* Push the client's read/write interest to the kernel event queue.
 *
 * e:     engine owning the kqueue.
 * c:     client whose filters are updated; also stored as the event udata.
 * set:   state bits that are currently wanted.
 * clear: state bits considered for clearing.
 *
 * A filter is touched only when "set" and "clear" differ on one of its
 * state bits. It is then (re)added and enabled if any of those bits is
 * present in "set", or disabled otherwise. Each touched filter is recorded
 * in c->_state (CS_KEVENT_READ / CS_KEVENT_WRITE) so that it can be deleted
 * later. Reports a fatal error if no filter is concerned or if kevent()
 * fails.
 */
static void
__peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
                           uint32_t set, uint32_t clear)
{
  const uint32_t read_mask = CS_ACCEPTING|CS_READING;
  const uint32_t write_mask = CS_CONNECTING|CS_WRITING;
  const uint32_t changed = clear ^ set;
  struct kevent changes[2];
  int nchanges = 0;

  if (changed & read_mask) /* readable has changed */
    {
      EV_SET(&changes[nchanges], c->_ident, EVFILT_READ,
             EV_ADD | ((set & read_mask) ? EV_ENABLE : EV_DISABLE),
             0, 0, c);
      c->_state |= CS_KEVENT_READ;
      nchanges++;
    }

  if (changed & write_mask) /* writable has changed */
    {
      EV_SET(&changes[nchanges], c->_ident, EVFILT_WRITE,
             EV_ADD | ((set & write_mask) ? EV_ENABLE : EV_DISABLE),
             0, 0, c);
      c->_state |= CS_KEVENT_WRITE;
      nchanges++;
    }

  if (nchanges == 0)
    PEAK_FATAL("State of engine's client cannot generate event", 0);

  if (kevent(e->_kq, changes, nchanges, 0, 0, 0) == -1)
    PEAK_FATAL("kevent failure", errno);
}
175
/* Register a signal client with the kernel event queue.
 *
 * c->_ident is used as the signal number and c itself is stored as the
 * event udata. After the EVFILT_SIGNAL filter is installed, the signal's
 * disposition is set to SIG_IGN. Reports a fatal error if kevent() fails;
 * the result of sigaction() is not checked.
 */
static void
__peak_engine_add_signal(peak_engine e, peak_engine_client c)
{
  struct kevent reg;
  struct sigaction ignore;

  assert(c->_state & CS_SIGNAL);

  EV_SET(&reg, c->_ident, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, c);
  if (kevent(e->_kq, &reg, 1, 0, 0, 0) == -1)
    PEAK_FATAL("kevent", errno);

  /* ignore the signal */
  sigemptyset(&ignore.sa_mask);
  ignore.sa_flags = 0;
  ignore.sa_handler = SIG_IGN;
  sigaction(c->_ident, &ignore, 0);
}
194
195 __private_extern__ void
196 _peak_engine_add_client(peak_engine e, peak_engine_client c)
197 {
198 if (c->_state & CS_SIGNAL)
199 {
200 __peak_engine_add_signal(e, c);
201 return;
202 }
203
204 if (++e->_nfds >= e->_maxfds)
205 PEAK_HALT;
206
207 __peak_engine_set_or_clear(e, c, c->_state, 0);
208
209 c->_engine = e;
210 }
211
/* Detach a client from the engine and delete its kernel events.
 *
 * e: engine owning the kqueue.
 * c: client to remove; must not be NULL.
 *
 * Signal clients have their EVFILT_SIGNAL filter deleted and the signal's
 * disposition restored to the default action (SIG_DFL), undoing the SIG_IGN
 * installed by __peak_engine_add_signal(). Other clients have only the
 * filters recorded in CS_KEVENT_READ / CS_KEVENT_WRITE deleted.
 *
 * Reports a fatal error if kevent() fails.
 */
__private_extern__ void
_peak_engine_remove_client(peak_engine e, peak_engine_client c)
{
  int n = 0;
  struct kevent dellist[2];

  assert(c != NULL);

  c->_engine = NULL;

  if (c->_state & CS_SIGNAL)
    {
      struct sigaction act;

      /* Signal clients were never counted in _nfds by
       * _peak_engine_add_client(), so the counter is left alone here.
       */
      EV_SET(&dellist[n], c->_ident, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
      n++;

      act.sa_handler = SIG_DFL; /* reset default */
      act.sa_flags = 0;
      sigemptyset(&act.sa_mask);
      sigaction(c->_ident, &act, 0);
    }
  else
    {
      e->_nfds--;

      if (c->_state & CS_KEVENT_READ)
        {
          EV_SET(&dellist[n], c->_ident, EVFILT_READ, EV_DELETE, 0, 0, 0);
          n++;
        }
      if (c->_state & CS_KEVENT_WRITE)
        {
          EV_SET(&dellist[n], c->_ident, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
          n++;
        }
    }

  if (n > 0 && kevent(e->_kq, dellist, n, 0, 0, 0) == -1)
    PEAK_FATAL("kevent", errno);
}
252
253 /* Precondition: Always called under the protection of c->_lock.
254 */
255 __private_extern__ void
256 _peak_engine_edit_client(peak_engine e, peak_engine_client c)
257 {
258 assert(!(c->_state & CS_HANDLED));
259
260 if (c->_sstate != c->_state)
261 {
262 c->_sstate = c->_state;
263 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
264 }
265 }
266
/* Run the engine's event loop until _peak_engine_break() clears _running.
 *
 * Each pass waits in kevent() for at most the timeout computed by
 * _peak_task_timer_tswait(), turns every returned kernel event into an I/O
 * event for its client, then fires pending timers and lets the task process
 * everything that was scheduled during the pass.
 *
 * A kevent() failure skips the rest of the pass. An interruption by a
 * signal (EINTR) is an expected condition and is retried silently; any
 * other failure is reported on stderr together with the errno description.
 */
__private_extern__ void
_peak_engine_loop(peak_engine e)
{
  enum { EVENTS_MAX = 24 }; /* kernel events fetched per kevent() call */
  struct kevent events[EVENTS_MAX];
  struct timespec ts;
  peak_engine_client c;
  int i, nevs, err;

  e->_running = 1;

  do {
    nevs = kevent(e->_kq, 0, 0, events, EVENTS_MAX,
                  _peak_task_timer_tswait(e->_task, &ts));
    if (nevs < 0)
      {
        if (errno != EINTR)
          fprintf(stderr, "kevent failure: %s\n", strerror(errno));
        continue;
      }

    e->_ne = 0;

    for (i = 0; i < nevs; i++)
      {
        if ((c = (peak_engine_client)events[i].udata) == NULL)
          PEAK_HALT;

        /* Although implementations of kqueue support it, the library's
         * design doesn't allow us to handle more than one event at a time
         * for the same client.
         */
        if (c->_state & CS_HANDLED)
          continue;

        switch (events[i].filter)
          {
          case EVFILT_SIGNAL:
            __peak_engine_ioevent_generate(e, c, IOEVENT_SIGNAL,
                                           (int)events[i].ident);
            break;

          case EVFILT_READ:
            /* A pending socket error takes precedence over the event. */
            if ((err = peak_socket_get_error(events[i].ident)) != 0)
              {
                __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
                break;
              }

            if (c->_state & CS_ACCEPTING)
              __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
            else if (c->_state & CS_READING)
              __peak_engine_ioevent_generate(e, c,
                  (events[i].flags & EV_EOF) ? IOEVENT_EOF : IOEVENT_READ,
                  0);
            break;

          case EVFILT_WRITE:
            if ((err = peak_socket_get_error(events[i].ident)) != 0)
              {
                __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
                break;
              }

            if (c->_state & CS_CONNECTING)
              __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
            else if (c->_state & CS_WRITING) /* CS_CONNECTED or accepted socket */
              __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
            break;

          default:
            PEAK_HALT;
          }
      }

    /* Prepare to fire any pending timers
     */
    e->_ne += _peak_task_timer_schedule_fire(e->_task);
    _peak_task_process_pending_events(e->_task, e->_ne);

  } while (e->_running);
}
351
352 __private_extern__ void
353 _peak_engine_break(peak_engine e)
354 {
355 e->_running = 0;
356 }
357
358 static void
359 __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
360 int event, int info)
361 {
362 uint16_t mclear = 0, mset = 0;
363
364 switch (event)
365 {
366 case IOEVENT_CONNECT:
367 mclear = CS_CONNECTING;
368 mset = CS_CONNECTED|CS_READING|CS_WRITING;
369 break;
370 case IOEVENT_ACCEPT:
371 mclear = CS_ACCEPTING;
372 break;
373 case IOEVENT_READ:
374 mclear = CS_READING;
375 break;
376 case IOEVENT_WRITE:
377 mclear = CS_WRITING;
378 break;
379 case IOEVENT_EOF:
380 case IOEVENT_ERROR:
381 mclear = CS_CONNECTED|CS_READING|CS_WRITING;
382 break;
383 case IOEVENT_SIGNAL:
384 break;
385 default:
386 PEAK_HALT;
387 break;
388 }
389
390 #if 0
391 printf("gen: c->_state=%x\n", c->_state);
392 if (c->_state & CS_HANDLED)
393 {
394 printf("gen: handling several events for the same object fd=%d\n",
395 c->_ident);
396 }
397 #endif
398
399 /* Set "event handled" bit */
400 c->_state |= CS_HANDLED;
401
402 /* Cache state */
403 c->_sstate = c->_state;
404
405 /* Prepare */
406 c->_state &= ~mclear;
407 c->_state |= mset;
408
409 /* Schedule for processing */
410 _peak_task_op_ioevent_schedule(e->_task, c, event, info);
411
412 e->_ne++;
413 }
414
415 __private_extern__ void
416 _peak_engine_event_postprocess(peak_engine_client c)
417 {
418 peak_engine e = c->_engine;
419
420 /* Commit changes if necessary, restore stuffs.
421 */
422 _peak_engine_client_lock(c);
423
424 if (c->_sstate != c->_state && e != NULL)
425 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
426
427 c->_sstate = 0; /* invalidate cache */
428 c->_state &= ~CS_HANDLED; /* we don't handle it anymore */
429
430 _peak_engine_client_unlock(c);
431 }