ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/svn/vendor/libpeak-0.1.2/peak/engine_mod_poll.c
Revision: 3251
Committed: Wed Apr 2 16:58:30 2014 UTC (11 years, 4 months ago) by michael
Content type: text/x-csrc
File size: 16802 byte(s)
Log Message:
- Imported libpeak-0.1.2

File Contents

# Content
1 /* PEAK Library
2 *
3 * Copyright (c) 2003, 2004, 2005
4 * Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * $Id: engine_mod_poll.c,v 1.7 2005/02/28 15:25:37 mbuna Exp $
30 */
31 #define RCSID "$Id: engine_mod_poll.c,v 1.7 2005/02/28 15:25:37 mbuna Exp $"
32
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36
37 #include "engine.h"
38
39 #include <assert.h>
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <sys/types.h>
44 #include <sys/socket.h>
45
46 #ifdef HAVE_POLL_H
47 /* The POSIX standard suggests that poll.h be installed as <poll.h>. */
48 #include <poll.h>
49 #elif defined(HAVE_SYS_POLL_H)
50 /* But the standard is not portable! */
51 #include <sys/poll.h>
52 #endif
53
54 #include <time.h>
55 #include <unistd.h>
56 #ifdef HAVE_SIGNAL_H
57 #include <signal.h>
58 #endif
59
60 #include "internal.h"
61 #include "socket.h"
62 #include "spinlock.h"
63 #include "task_private.h"
64 #include "utilities.h"
65
/* Event mask requested for "readable" interest: the union of whichever of
 * POLLMSG, POLLIN and POLLRDNORM the platform's poll header provides.
 */
#if defined(POLLMSG) && defined(POLLIN) && defined(POLLRDNORM)
#define POLLREADFLAGS (POLLMSG|POLLIN|POLLRDNORM)
#elif defined(POLLIN) && defined(POLLRDNORM)
#define POLLREADFLAGS (POLLIN|POLLRDNORM)
#elif defined(POLLIN)
#define POLLREADFLAGS POLLIN
#elif defined(POLLRDNORM)
#define POLLREADFLAGS POLLRDNORM
#endif

/* Event mask requested for "writable" interest: POLLOUT and/or POLLWRNORM,
 * again depending on what the platform defines.
 */
#if defined(POLLOUT) && defined(POLLWRNORM)
#define POLLWRITEFLAGS (POLLOUT|POLLWRNORM)
#elif defined(POLLOUT)
#define POLLWRITEFLAGS POLLOUT
#elif defined(POLLWRNORM)
#define POLLWRITEFLAGS POLLWRNORM
#endif
85
86 static peak_spinlock_t pollfdLock = PEAK_SPINLOCK_INITIALIZER;
87
88 static void __peak_engine_init(peak_engine e, va_list vp, void *ctcx);
89 static void __peak_engine_finalize(peak_engine e);
90 static void __peak_engine_allocate_pollfds(peak_engine e);
91 static void __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
92 uint32_t set, uint32_t clear);
93 static void __peak_engine_add_signal(peak_engine e, peak_engine_client c);
94 static void __peak_engine_signal_trap(int signum);
95 static void __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
96 int event, int info);
97
/* Highest signal number the engine keeps a slot for.  A definition supplied
 * by the build wins; otherwise it is derived from NSIG when available and
 * falls back to 31.
 */
#if !defined(MAX_SIGNUM)
#if defined(NSIG)
#define MAX_SIGNUM (NSIG-1)
#else
#define MAX_SIGNUM 31
#endif
#endif
105
106 struct __peak_engine
107 {
108 PEAK_STRUCT_RT_HEADER;
109 peak_task _task;
110 peak_engine_client* _clients;
111 struct pollfd* _pollfds;
112 int _allocfds;
113 int _maxfds;
114 int _poll_count;
115 int _poll_min;
116 peak_engine_client _signals[MAX_SIGNUM + 1];
117 int _signal_index;
118 int _ne;
119 volatile int _running;
120 };
121
/* Ends of the process-wide pipe used to forward trapped signals to the loop:
 * the signal handler writes the signal number, the loop reads it back.
 */
static int interrupt_read_fd;
static int interrupt_write_fd;

PEAK_CLASS_BASE_DECLARE(engine);
125
126 __private_extern__ const char *
127 _peak_engine_get_name(peak_engine e)
128 {
129 return "poll";
130 }
131
132 __private_extern__ peak_engine
133 _peak_engine_create(peak_task task)
134 {
135 return PEAK_CLASS_CONSTRUCT1(engine, task);
136 }
137
138 static void
139 __peak_engine_init(peak_engine e, va_list vp, void *ctcx)
140 {
141 int i, p[2];
142
143 e->_task = va_arg(vp, peak_task);
144 e->_allocfds = 0;
145 e->_maxfds = peak_set_fdlimit(PEAK_DEFAULT_FLAVOR_MAXFDS);
146 e->_clients = NULL;
147 e->_pollfds = NULL;
148 e->_poll_count = 0;
149 e->_poll_min = 0;
150
151 for (i = 0; i <= MAX_SIGNUM; i++)
152 e->_signals[i] = NULL;
153
154 e->_signal_index = -1;
155
156 if (interrupt_read_fd == 0)
157 {
158 if (pipe(p) == -1)
159 PEAK_HALT;
160 interrupt_read_fd = p[0];
161 interrupt_write_fd = p[1];
162 }
163
164 e->_running = 0;
165 }
166
167 static void
168 __peak_engine_finalize(peak_engine e)
169 {
170 if (e->_pollfds)
171 peak_deallocate(e->_pollfds);
172 if (e->_clients)
173 peak_deallocate(e->_clients);
174 }
175
176 __private_extern__ int
177 _peak_engine_get_maxfds(peak_engine e)
178 {
179 return e->_maxfds;
180 }
181
182 __private_extern__ int
183 _peak_engine_set_maxfds(peak_engine e, int maxfds)
184 {
185 if (maxfds <= 0) /* doh */
186 return -1;
187
188 maxfds += 2; /* for signal fds */
189
190 if (e->_allocfds > 0 && maxfds > e->_allocfds)
191 {
192 peak_engine_client * n_clients;
193 struct pollfd * n_pollfds;
194 int i;
195
196 n_clients = (peak_engine_client *) peak_allocate(sizeof(peak_engine_client)
197 * (size_t)maxfds);
198 n_pollfds = (struct pollfd *) peak_allocate(sizeof(struct pollfd)
199 * (size_t)maxfds);
200 for (i = 0; i < e->_maxfds; i++)
201 {
202 n_clients[i] = e->_clients[i];
203 n_pollfds[i] = e->_pollfds[i];
204 }
205 for (i = e->_maxfds; i < maxfds; i++)
206 {
207 n_clients[i] = NULL;
208 n_pollfds[i].fd = -1;
209 n_pollfds[i].events = 0;
210 n_pollfds[i].revents = 0;
211 }
212 _peak_spinlock_lock(&pollfdLock);
213 e->_allocfds = maxfds;
214 e->_maxfds = maxfds;
215 peak_deallocate(e->_pollfds);
216 peak_deallocate(e->_clients);
217 e->_clients = n_clients;
218 e->_pollfds = n_pollfds;
219 _peak_spinlock_unlock(&pollfdLock);
220 }
221 else
222 {
223 _peak_spinlock_lock(&pollfdLock);
224 e->_maxfds = peak_set_fdlimit(maxfds);
225 _peak_spinlock_unlock(&pollfdLock);
226 }
227 return maxfds == e->_maxfds ? 0 : -1;
228 }
229
230 /* Should be called under the protection of pollfdLock for eventual
231 * concurrent allocation (eg. 2 timers are fired at the same time to create
232 * the first engine's clients on different threads).
233 */
234 static void
235 __peak_engine_allocate_pollfds(peak_engine e)
236 {
237 int i;
238
239 assert(e->_clients == NULL);
240 assert(e->_pollfds == NULL);
241
242 peak_deallocate(e->_pollfds); /* "deallocate workaround" */
243
244 e->_clients = (peak_engine_client *) peak_allocate(sizeof(peak_engine_client)
245 * (size_t)e->_maxfds);
246 e->_pollfds = (struct pollfd *) peak_allocate(sizeof(struct pollfd)
247 * (size_t)e->_maxfds);
248 for (i = 0; i < e->_maxfds; i++)
249 {
250 e->_clients[i] = NULL;
251 e->_pollfds[i].fd = -1;
252 e->_pollfds[i].events = 0;
253 e->_pollfds[i].revents = 0;
254 }
255 e->_poll_count = 0;
256 e->_poll_min = 0;
257 e->_allocfds = e->_maxfds;
258 }
259
260 static void
261 __peak_engine_set_or_clear(peak_engine e, peak_engine_client c,
262 uint32_t set, uint32_t clear)
263 {
264 assert(c->_index < e->_poll_count);
265
266 if ((clear ^ set) & (CS_ACCEPTING|CS_READING)) /* readable has changed */
267 {
268 if (set & (CS_ACCEPTING|CS_READING)) /* it's set */
269 e->_pollfds[c->_index].events |= POLLREADFLAGS;
270 else /* clear it */
271 e->_pollfds[c->_index].events &= ~POLLREADFLAGS;
272 }
273
274 if ((clear ^ set) & (CS_CONNECTING|CS_WRITING)) /* writable has changed */
275 {
276 if (set & (CS_CONNECTING|CS_WRITING)) /* it's set */
277 e->_pollfds[c->_index].events |= POLLWRITEFLAGS;
278 else /* clear it */
279 e->_pollfds[c->_index].events &= ~POLLWRITEFLAGS;
280 }
281 }
282
283 static void
284 __peak_engine_add_signal(peak_engine e, peak_engine_client c)
285 {
286 struct sigaction action;
287 sigset_t stop_signal;
288
289 sigemptyset(&stop_signal);
290 sigaddset(&stop_signal, c->_ident);
291
292 action.sa_handler = __peak_engine_signal_trap;
293 action.sa_mask = stop_signal;
294 action.sa_flags = 0;
295
296 if (c->_ident > MAX_SIGNUM)
297 PEAK_HALT;
298
299 if (sigaction(c->_ident, &action, NULL) == 0)
300 {
301 e->_signals[c->_ident] = c;
302
303 /* Register one real client for all signals, the first signal client is
304 * effectively used for convenience.
305 */
306 if (e->_signal_index == -1)
307 {
308 int i;
309
310 _peak_spinlock_lock(&pollfdLock);
311
312 if (!e->_allocfds)
313 __peak_engine_allocate_pollfds(e);
314
315 for (i = e->_poll_min; e->_clients[i] && i < e->_poll_count; i++)
316 ;
317 if (i >= e->_poll_count)
318 {
319 if (e->_poll_count >= e->_maxfds)
320 PEAK_HALT;
321
322 i = e->_poll_count++;
323 }
324
325 e->_poll_min = i + 1;
326 e->_signal_index = c->_index = i;
327 e->_clients[c->_index] = c;
328 e->_pollfds[c->_index].fd = interrupt_read_fd;
329
330 c->_state |= CS_READING;
331 __peak_engine_set_or_clear(e, c, c->_state, 0);
332 c->_engine = e;
333
334 _peak_spinlock_unlock(&pollfdLock);
335 }
336 }
337 }
338
339 static void
340 __peak_engine_signal_trap(int signum)
341 {
342 assert (interrupt_write_fd >= 0);
343 write(interrupt_write_fd, &signum, sizeof(interrupt_write_fd));
344 }
345
346 __private_extern__ void
347 _peak_engine_add_client(peak_engine e, peak_engine_client c)
348 {
349 int i;
350
351 if (c->_state & CS_SIGNAL)
352 {
353 __peak_engine_add_signal(e, c);
354 return;
355 }
356
357 _peak_spinlock_lock(&pollfdLock);
358
359 if (!e->_allocfds)
360 __peak_engine_allocate_pollfds(e);
361
362 /* Find an empty slot */
363 for (i = e->_poll_min; e->_clients[i] && i < e->_poll_count; i++)
364 ;
365
366 if (i >= e->_poll_count)
367 {
368 if (e->_poll_count >= e->_maxfds)
369 PEAK_HALT;
370
371 i = e->_poll_count++;
372 }
373 e->_poll_min = i + 1;
374 c->_index = i;
375 e->_clients[c->_index] = c;
376 e->_pollfds[c->_index].fd = c->_ident;
377
378 __peak_engine_set_or_clear(e, c, c->_state, 0);
379
380 c->_engine = e;
381
382 _peak_spinlock_unlock(&pollfdLock);
383 }
384
385 __private_extern__ void
386 _peak_engine_remove_client(peak_engine e, peak_engine_client c)
387 {
388 _peak_spinlock_lock(&pollfdLock);
389 assert(c != NULL && c == e->_clients[c->_index]);
390
391 if (c->_state & CS_SIGNAL)
392 {
393 struct sigaction action;
394
395 /* Remove a signal: restore default action. */
396 action.sa_handler = SIG_DFL;
397 sigemptyset(&action.sa_mask);
398 action.sa_flags = 0;
399
400 sigaction(c->_ident, &action, NULL);
401 e->_signals[c->_ident] = NULL;
402 }
403 else
404 {
405 e->_clients[c->_index] = NULL;
406 e->_pollfds[c->_index].fd = -1;
407 e->_pollfds[c->_index].events = 0;
408
409 while (e->_poll_count > 0 && e->_clients[e->_poll_count - 1] == NULL)
410 e->_poll_count--;
411
412 if (e->_poll_count < e->_poll_min)
413 e->_poll_min = e->_poll_count;
414 else if (c->_index < e->_poll_min)
415 e->_poll_min = c->_index;
416 }
417 c->_engine = NULL;
418 _peak_spinlock_unlock(&pollfdLock);
419 }
420
421 /* Precondition: Always called under the protection of c->_lock.
422 */
423 __private_extern__ void
424 _peak_engine_edit_client(peak_engine e, peak_engine_client c)
425 {
426 assert(!(c->_state & CS_HANDLED));
427
428 if (c->_sstate != c->_state)
429 {
430 c->_sstate = c->_state;
431 _peak_spinlock_lock(&pollfdLock);
432 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
433 _peak_spinlock_unlock(&pollfdLock);
434 }
435 }
436
437 __private_extern__ void
438 _peak_engine_loop(peak_engine e)
439 {
440 peak_engine_client c;
441 int i, nfds;
442 int err;
443 e->_running = 1;
444
445 /* Workaround for some poll implementations that don't accept a NULL fds
446 * parameter: allocate one fake client (this happens if no engine's client
447 * has been registered yet (eg. only timers).
448 */
449 if (e->_pollfds == NULL)
450 e->_pollfds = (struct pollfd *) peak_allocate(sizeof(struct pollfd));
451
452 do {
453 nfds = poll(e->_pollfds, (unsigned long)e->_poll_count,
454 _peak_task_timer_mswait(e->_task));
455 if (nfds < 0)
456 {
457 fprintf(stderr, "poll error\n");
458 continue;
459 }
460 e->_ne = 0;
461
462 for (i = 0; nfds > 0 && i < e->_poll_count; i++)
463 {
464 if ((c = e->_clients[i]) == NULL)
465 continue;
466
467 if (c->_state & CS_SIGNAL)
468 {
469 if (e->_pollfds[i].revents & POLLREADFLAGS)
470 {
471 int signum;
472
473 if (read(interrupt_read_fd, &signum, sizeof(signum))
474 == sizeof(signum))
475 {
476 if (signum > 0 && signum <= MAX_SIGNUM)
477 __peak_engine_ioevent_generate(e, e->_signals[signum],
478 IOEVENT_SIGNAL, signum);
479 }
480 nfds--;
481 }
482 continue;
483 }
484
485 assert(!(c->_state & CS_SIGNAL));
486 assert(e->_pollfds[i].fd == c->_ident);
487
488 if ((err = peak_socket_get_error(e->_pollfds[i].fd)) != 0)
489 {
490 __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, err);
491 nfds--;
492 continue;
493 }
494
495 #ifdef POLLHUP
496 if (e->_pollfds[i].revents & POLLHUP)
497 {
498 __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
499 nfds--;
500 }
501 else
502 #endif
503 if (e->_pollfds[i].revents & POLLREADFLAGS)
504 {
505 if (c->_state & CS_ACCEPTING) /* ready for accept */
506 __peak_engine_ioevent_generate(e, c, IOEVENT_ACCEPT, 0);
507 else
508 {
509 assert(c->_state & CS_READING);
510
511 #ifndef POLLHUP
512 /* PEEK TEST */
513 if (c->_state & CS_PEEKABLE)
514 {
515 switch (peak_socket_peek(e->_pollfds[i].fd))
516 {
517 case -1:
518 if (errno == EAGAIN)
519 {
520 PEAK_WARN("peak_socket_peek triggered EAGAIN");
521 continue; /* Resource temporarily unavailable */
522 }
523 __peak_engine_ioevent_generate(e, c, IOEVENT_ERROR, errno);
524 PEAK_FATAL("peak_socket_peek failed", errno);
525 break;
526 case 0: /* EOF */
527 __peak_engine_ioevent_generate(e, c, IOEVENT_EOF, 0);
528 break;
529 default:
530 __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
531 break;
532 }
533 }
534 else
535 #endif /* !POLLHUP */
536 __peak_engine_ioevent_generate(e, c, IOEVENT_READ, 0);
537 }
538 nfds--;
539 }
540 else if (e->_pollfds[i].revents & POLLWRITEFLAGS)
541 {
542 if (c->_state & CS_CONNECTING)
543 __peak_engine_ioevent_generate(e, c, IOEVENT_CONNECT, 0);
544 else
545 {
546 assert (c->_state & CS_WRITING);
547
548 __peak_engine_ioevent_generate(e, c, IOEVENT_WRITE, 0);
549 }
550 nfds--;
551 }
552 }
553
554 /* Prepare to fire any pending timers
555 */
556 e->_ne += _peak_task_timer_schedule_fire(e->_task);
557
558 /* Process events...
559 */
560 _peak_task_process_pending_events(e->_task, e->_ne);
561
562 } while (e->_running);
563 }
564
565 __private_extern__ void
566 _peak_engine_break(peak_engine e)
567 {
568 e->_running = 0;
569 }
570
571 static void
572 __peak_engine_ioevent_generate(peak_engine e, peak_engine_client c,
573 int event, int info)
574 {
575 uint16_t mclear = 0, mset = 0;
576
577 switch (event)
578 {
579 case IOEVENT_CONNECT:
580 mclear = CS_CONNECTING;
581 mset = CS_CONNECTED|CS_READING|CS_WRITING;
582 break;
583 case IOEVENT_ACCEPT:
584 mclear = CS_ACCEPTING;
585 break;
586 case IOEVENT_READ:
587 mclear = CS_READING;
588 break;
589 case IOEVENT_WRITE:
590 mclear = CS_WRITING;
591 break;
592 case IOEVENT_EOF:
593 case IOEVENT_ERROR:
594 mclear = CS_CONNECTED|CS_READING|CS_WRITING;
595 break;
596 case IOEVENT_SIGNAL:
597 break;
598 default:
599 PEAK_HALT;
600 break;
601 }
602
603 /* Set "event handled" bit */
604 c->_state |= CS_HANDLED;
605
606 /* Cache state */
607 c->_sstate = c->_state;
608
609 /* Prepare */
610 c->_state &= ~mclear;
611 c->_state |= mset;
612
613 /* Schedule for processing */
614 _peak_task_op_ioevent_schedule(e->_task, c, event, info);
615
616 e->_ne++;
617 }
618
619 __private_extern__ void
620 _peak_engine_event_postprocess(peak_engine_client c)
621 {
622 peak_engine e = c->_engine;
623
624 /* Commit changes if necessary, restore stuffs.
625 */
626 _peak_engine_client_lock(c);
627
628 if (c->_sstate != c->_state && e != NULL)
629 __peak_engine_set_or_clear(e, c, c->_state, CS_ANY);
630
631 c->_sstate = 0; /* invalidate cache */
632 c->_state &= ~CS_HANDLED; /* we don't handle it anymore */
633
634 _peak_engine_client_unlock(c);
635 }
636