ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/svn/vendor/libpeak-0.1.2/peak/engine_mod_epoll.c
Revision: 3251
Committed: Wed Apr 2 16:58:30 2014 UTC (10 years ago) by michael
Content type: text/x-csrc
File size: 11581 byte(s)
Log Message:
- Imported libpeak-0.1.2

File Contents

# Content
1 /* PEAK Library
2 *
3 * Copyright (c) 2003, 2004
4 * Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * $Id: engine_mod_epoll.c,v 1.5 2007/03/06 21:02:43 mbuna Exp $
30 */
31 #define RCSID "$Id: engine_mod_epoll.c,v 1.5 2007/03/06 21:02:43 mbuna Exp $"
32
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36
37 #include "engine.h"
38
39 #include <assert.h>
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <sys/types.h>
44 #include <sys/epoll.h>
45 #include <sys/socket.h>
46 #include <time.h>
47 #include <unistd.h>
48 #ifdef HAVE_SIGNAL_H
49 #include <signal.h>
50 #endif
51
52 #include "internal.h"
53 #include "socket.h"
54 #include "spinlock.h"
55 #include "task_private.h"
56 #include "utilities.h"
57
58 static peak_spinlock_t epollSignalLock = PEAK_SPINLOCK_INITIALIZER;
59
60 static void __peak_engine_init(peak_engine e, va_list vp);
61 static void __peak_engine_finalize(peak_engine e);
62 static void __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
63 uint32_t set, uint32_t clear);
64 static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
65 static void __peak_engine_signal_trap(int signum);
66 static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
67 int event, int info);
68
69 #ifndef MAX_SIGNUM
70 #ifdef NSIG
71 #define MAX_SIGNUM (NSIG-1)
72 #else
73 #define MAX_SIGNUM 31
74 #endif
75 #endif
76
77 struct __peak_engine
78 {
79 PEAK_STRUCT_RT_HEADER;
80 peak_task _task;
81 int _epfd;
82 int _maxfds;
83 peak_engine_client _signals[MAX_SIGNUM + 1];
84 int _signal_status;
85 int _ne;
86 volatile int _running;
87 };
88
89 static int interrupt_read_fd, interrupt_write_fd;
90
91 PEAK_CLASS_BASE_DECLARE(engine);
92
93 __private_extern__ const char *
94 _peak_engine_get_name(peak_engine e)
95 {
96 return "epoll";
97 }
98
99 __private_extern__ peak_engine
100 _peak_engine_create(peak_task task)
101 {
102 return PEAK_CLASS_CONSTRUCT1(engine, task);
103 }
104
105 static void
106 __peak_engine_init(peak_engine e, va_list vp)
107 {
108 int i, p[2];
109
110 e->_task = va_arg(vp, peak_task);
111
112 e->_maxfds = peak_set_fdlimit(PEAK_DEFAULT_FLAVOR_MAXFDS);
113 e->_epfd = epoll_create(e->_maxfds);
114 if (e->_epfd == -1)
115 PEAK_FATAL("epoll_create failure", errno);
116
117 for (i = 0; i <= MAX_SIGNUM; i++)
118 e->_signals[i] = NULL;
119
120 e->_signal_status = -1;
121
122 if (interrupt_read_fd == 0)
123 {
124 if (pipe(p) == -1)
125 PEAK_HALT;
126 interrupt_read_fd = p[0];
127 interrupt_write_fd = p[1];
128 }
129
130 e->_running = 0;
131 }
132
133 static void
134 __peak_engine_finalize(peak_engine e)
135 {
136 close(e->_epfd);
137 }
138
139 __private_extern__ int
140 _peak_engine_get_maxfds(peak_engine e)
141 {
142 return e->_maxfds;
143 }
144
145 __private_extern__ int
146 _peak_engine_set_maxfds(peak_engine e, int maxfds)
147 {
148 if (maxfds <= 0)
149 return -1;
150
151 maxfds += 2; /* for signal fds */
152
153 e->_maxfds = peak_set_fdlimit(maxfds);
154
155 return maxfds == e->_maxfds ? 0 : -1;
156 }
157
158 static void
159 __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
160 uint32_t set, uint32_t clear)
161 {
162 struct epoll_event ev;
163
164 ev.events = c->_events;
165 ev.data.ptr = (void*)c;
166
167 if ((clear ^ set) & (CS_ACCEPTING|CS_READING)) /* readable has changed */
168 {
169 if (set & (CS_ACCEPTING|CS_READING)) /* it's set */
170 ev.events |= EPOLLIN;
171 else /* clear it */
172 ev.events &= ~EPOLLIN;
173 }
174
175 if ((clear ^ set) & (CS_CONNECTING|CS_WRITING)) /* writable has changed */
176 {
177 if (set & (CS_CONNECTING|CS_WRITING)) /* it's set */
178 ev.events |= EPOLLOUT;
179 else /* clear it */
180 ev.events &= ~EPOLLOUT;
181 }
182
183 if (ev.events != c->_events)
184 {
185 if (epoll_ctl(e->_epfd, EPOLL_CTL_MOD, c->_ident, &ev) != 0)
186 PEAK_FATAL("epoll_ctl failure", errno);
187 c->_events = ev.events;
188 }
189 }
190
191 static void
192 __peak_engine_add_signal(peak_engine e, peak_engine_client c)
193 {
194 struct sigaction action;
195 sigset_t stop_signal;
196
197 sigemptyset(&stop_signal);
198 sigaddset(&stop_signal, c->_ident);
199
200 action.sa_handler = __peak_engine_signal_trap;
201 action.sa_mask = stop_signal;
202 action.sa_flags = 0;
203
204 if (c->_ident > MAX_SIGNUM)
205 PEAK_HALT;
206
207 if (sigaction(c->_ident, &action, NULL) == 0)
208 {
209 e->_signals[c->_ident] = c;
210
211 _peak_spinlock_lock(&epollSignalLock);
212
213 /* Register one real client for all signals, the first signal client is
214 * effectively used for convenience.
215 */
216 if (e->_signal_status == -1)
217 {
218 struct epoll_event ev;
219
220 c->_events = EPOLLIN;
221 ev.events = c->_events;
222 ev.data.ptr = (void*)c;
223
224 c->_state |= CS_READING;
225 if (epoll_ctl(e->_epfd, EPOLL_CTL_ADD, interrupt_read_fd, &ev) != 0)
226 PEAK_FATAL("epoll_ctl failure", errno);
227
228 e->_signal_status = 0;
229 c->_engine = e;
230 }
231 _peak_spinlock_unlock(&epollSignalLock);
232 }
233 else
234 PEAK_HALT;
235 }
236
237 static void
238 __peak_engine_signal_trap(int signum)
239 {
240 assert (interrupt_write_fd >= 0);
241 write(interrupt_write_fd, &signum, sizeof(interrupt_write_fd));
242 }
243
244 __private_extern__ void
245 _peak_engine_add_client(peak_engine e, peak_engine_client c)
246 {
247 struct epoll_event ev;
248
249 if (c->_state & CS_SIGNAL)
250 {
251 __peak_engine_add_signal(e, c);
252 return;
253 }
254
255 /* Bound checking? */
256
257 c->_events = 0;
258 ev.events = c->_events;
259 ev.data.ptr = (void*)c;
260 if (epoll_ctl(e->_epfd, EPOLL_CTL_ADD, c->_ident, &ev) != 0)
261 PEAK_FATAL("epoll_ctl failure", errno);
262
263 __peak_engine_set_or_clear(e, c, c->_state, 0);
264
265 c->_engine = e;
266 }
267
268 __private_extern__ void
269 _peak_engine_remove_client(peak_engine e, peak_engine_client c)
270 {
271 assert(c != NULL);
272
273 if (c->_state & CS_SIGNAL)
274 {
275 struct sigaction action;
276
277 /* Remove a signal: restore default action. */
278 action.sa_handler = SIG_DFL;
279 sigemptyset(&action.sa_mask);
280 action.sa_flags = 0;
281
282 sigaction(c->_ident, &action, NULL);
283 e->_signals[c->_ident] = NULL;
284 }
285 else if (epoll_ctl(e->_epfd, EPOLL_CTL_DEL, c->_ident, NULL) != 0)
286 PEAK_FATAL("epoll_ctl failure", errno);
287
288 c->_engine = NULL;
289 }
290
291 /* Precondition: Always called under the protection of c->_lock.
292 */
293 __private_extern__ void
294 _peak_engine_edit_client(peak_engine e, peak_engine_client c)
295 {
296 assert(!(c->_state & CS_HANDLED));
297
298 if (c->_sstate != c->_state)
299 {
300 c->_sstate = c->_state;
301 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
302 }
303 }
304
305 __private_extern__ void
306 _peak_engine_loop(peak_engine e)
307 {
308 peak_engine_client c;
309 struct epoll_event events[24];
310 int maxevents = 24;
311 int i, nfds;
312 int err;
313
314 e->_running = 1;
315
316 do {
317 nfds = epoll_wait(e->_epfd, events, maxevents,
318 _peak_task_timer_mswait(e->_task));
319 if (nfds < 0)
320 {
321 fprintf(stderr, "epoll_wait failure\n");
322 continue;
323 }
324 e->_ne = 0;
325
326 for (i = 0; i < nfds; i++)
327 {
328 if ((c = (peak_engine_client)events[i].data.ptr) == NULL)
329 continue;
330
331 if (c->_state & CS_SIGNAL)
332 {
333 if (events[i].events & EPOLLIN)
334 {
335 int signum;
336
337 if (read(interrupt_read_fd, &signum, sizeof(signum))
338 == sizeof(signum))
339 {
340 if (signum > 0 && signum <= MAX_SIGNUM)
341 __peak_engine_ioevent_generate(e, e->_signals[signum],
342 IOEVENT_SIGNAL, signum);
343 }
344 }
345 continue;
346 }
347
348 assert(!(c->_state & CS_SIGNAL));
349
350 if (events[i].events & EPOLLERR &&
351 (err = peak_socket_get_error(c->_ident)) != 0)
352 {
353 __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
354 continue;
355 }
356
357 if (events[i].events & EPOLLIN)
358 {
359 if (c->_state & CS_ACCEPTING) /* ready for accept */
360 __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
361 else
362 {
363 assert(c->_state & CS_READING);
364
365 if (events[i].events & EPOLLHUP)
366 __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
367 else
368 __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
369 }
370 }
371 else if (events[i].events & EPOLLOUT)
372 {
373 if (c->_state & CS_CONNECTING)
374 __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
375 else
376 {
377 assert (c->_state & CS_WRITING);
378
379 __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
380 }
381 }
382 }
383
384 /* Prepare to fire any pending timers
385 */
386 e->_ne += _peak_task_timer_schedule_fire(e->_task);
387
388 /* Process events...
389 */
390 _peak_task_process_pending_events(e->_task, e->_ne);
391
392 } while (e->_running);
393 }
394
395 void
396 _peak_engine_break(peak_engine e)
397 {
398 e->_running = 0;
399 }
400
401 static void
402 __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
403 int ioevent, int info)
404 {
405 uint16_t mclear = 0, mset = 0;
406
407 assert(c != NULL);
408
409 switch (ioevent)
410 {
411 case IOEVENT_CONNECT:
412 mclear = CS_CONNECTING;
413 mset = CS_CONNECTED|CS_READING|CS_WRITING;
414 break;
415 case IOEVENT_ACCEPT:
416 mclear = CS_ACCEPTING;
417 break;
418 case IOEVENT_READ:
419 mclear = CS_READING;
420 break;
421 case IOEVENT_WRITE:
422 mclear = CS_WRITING;
423 break;
424 case IOEVENT_EOF:
425 case IOEVENT_ERROR:
426 mclear = CS_CONNECTED|CS_READING|CS_WRITING;
427 break;
428 case IOEVENT_SIGNAL:
429 break;
430 default:
431 PEAK_HALT;
432 break;
433 }
434
435 /* Set "event handled" bit */
436 c->_state |= CS_HANDLED;
437
438 /* Cache state */
439 c->_sstate = c->_state;
440
441 /* Prepare */
442 c->_state &= ~mclear;
443 c->_state |= mset;
444
445 /* Schedule for processing */
446 _peak_task_op_ioevent_schedule(e->_task, c, ioevent, info);
447
448 e->_ne++;
449 }
450
451 __private_extern__ void
452 _peak_engine_event_postprocess(peak_engine_client c)
453 {
454 peak_engine e = c->_engine;
455
456 /* Commit changes if necessary, restore stuffs.
457 */
458 _peak_engine_client_lock(c);
459
460 if (c->_sstate != c->_state && e != NULL)
461 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
462
463 c->_sstate = 0; /* invalidate cache */
464 c->_state &= ~CS_HANDLED; /* we don't handle it anymore */
465
466 _peak_engine_client_unlock(c);
467 }
468