/[svn]/branches/newio/src/ioengine_kqueue.c
ViewVC logotype

Contents of /branches/newio/src/ioengine_kqueue.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2385 - (show annotations)
Sat Jul 6 20:31:15 2013 UTC (9 years ago) by michael
File MIME type: text/x-chdr
File size: 13497 byte(s)
- Add various socket engines

1 /*
2 * ircd-hybrid: an advanced Internet Relay Chat Daemon(ircd).
3 *
4 * Copyright (C) 2001 Kevin L. Mitchell <klmitch@mit.edu>
5 * Copyright (C) 2013 by the Hybrid Development Team.
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 * USA
21 */
22
23 /*! \file ioengine_kqueue.c
24 * \brief FreeBSD kqueue()/kevent() event engine.
25 * \version $Id: ioengine_kqueue.c 2297 2013-06-19 11:57:38Z michael $
26 */
27
28 #include "stdinc.h"
29 #include "ioengine.h"
30 #include "ircd.h"
31 #include "memory.h"
32 #include "log.h"
33 #include "restart.h"
34
35 #include <errno.h>
36 #include <signal.h>
37 #include <sys/types.h>
38 #include <sys/event.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <time.h>
42 #include <unistd.h>
43
44 #define KQUEUE_ERROR_THRESHOLD 20 /**< after 20 kqueue errors, restart */
45 #define ERROR_EXPIRE_TIME 3600 /**< expire errors after an hour */
46
47 /** Array of active Socket structures, indexed by file descriptor. */
48 static struct Socket** sockList;
49 /** Maximum file descriptor supported, plus one. */
50 static int kqueue_max;
51 /** File descriptor for kqueue pseudo-file. */
52 static int kqueue_id;
53
54 /** Number of recent errors from kqueue. */
55 static int errors;
56 /** Periodic timer to forget errors. */
57 static struct Timer clear_error;
58
59 /** Decrement the error count (once per hour).
60 * @param[in] ev Expired timer event (ignored).
61 */
62 static void
63 error_clear(struct Event* ev)
64 {
65 if (!--errors) /* remove timer when error count reaches 0 */
66 timer_del(ev_timer(ev));
67 }
68
/** Set up the kqueue engine.
 *
 * Creates the kqueue descriptor and allocates the descriptor-indexed
 * socket table with every slot empty.
 * @param[in] max_sockets Maximum number of file descriptors to support.
 * @return Non-zero on success, or zero if kqueue() failed.
 */
static int
engine_init(int max_sockets)
{
  int slot = 0;

  kqueue_id = kqueue();

  if (kqueue_id < 0)
  {
    /* no kqueue available; tell the caller this engine is unusable */
    log_write(LS_SYSTEM, L_WARNING, 0,
              "kqueue() engine cannot initialize: %m");
    return 0;
  }

  /* one table slot per possible descriptor */
  sockList = (struct Socket **)MyMalloc(sizeof(struct Socket *) * max_sockets);

  /* start with every slot empty */
  while (slot < max_sockets)
    sockList[slot++] = 0;

  /* remember the table size for later bounds checks */
  kqueue_max = max_sockets;

  return 1;
}
97
/** Add a signal to the event engine.
 *
 * Registers an EVFILT_SIGNAL filter for the signal with \a sig stored as
 * the kevent user data, then sets the signal's disposition to SIG_IGN.
 * If the filter cannot be registered the failure is logged and the
 * disposition is left untouched.
 * @param[in] sig Signal to add to engine.
 */
static void
engine_signal(struct Signal *sig)
{
  struct kevent sigevent;
  struct sigaction act;

  /* Check the argument itself; testing `signal` would only test the
   * address of the libc signal() function, which is never null.
   */
  assert(0 != sig);

  ilog(LOG_TYPE_DEBUG, "kqueue: Adding filter for signal %d [%p]",
       sig_signal(sig), (void *)sig);

  /* EV_SET fills in every member of the kevent, including ones that
   * only exist on some platforms.
   */
  EV_SET(&sigevent, sig_signal(sig), EVFILT_SIGNAL, EV_ADD | EV_ENABLE,
         0, 0, sig);

  if (kevent(kqueue_id, &sigevent, 1, 0, 0, 0) < 0)
  {
    ilog(LOG_TYPE_DEBUG, "Unable to trap signal %d",
         sig_signal(sig));
    return;
  }

  act.sa_handler = SIG_IGN; /* ignore the signal */
  act.sa_flags = 0;
  sigemptyset(&act.sa_mask);
  sigaction(sig_signal(sig), &act, 0);
}
132
133 /** Figure out what events go with a given state.
134 * @param[in] state %Socket state to consider.
135 * @param[in] events User-specified preferred event set.
136 * @return Actual set of preferred events.
137 */
138 static unsigned int
139 state_to_events(enum SocketState state, unsigned int events)
140 {
141 switch (state)
142 {
143 case SS_CONNECTING: /* connecting socket */
144 return SOCK_EVENT_WRITABLE;
145 break;
146
147 case SS_LISTENING: /* listening socket */
148 case SS_NOTSOCK: /* our signal socket--just in case */
149 return SOCK_EVENT_READABLE;
150 break;
151
152 case SS_CONNECTED:
153 case SS_DATAGRAM:
154 case SS_CONNECTDG:
155 return events; /* ordinary socket */
156 break;
157 }
158
159 /*NOTREACHED*/
160 return 0;
161 }
162
/** Activate or deactivate kqueue filters as appropriate.
 *
 * Only event bits that differ between \a clear and \a set are touched.
 * A changed bit is submitted with EV_ADD plus EV_ENABLE when it is
 * present in \a set and EV_ADD plus EV_DISABLE otherwise.  When nothing
 * changed, no kevent() call is made at all.  A kevent() failure other
 * than EBADF is reported as an ET_ERROR event on the socket.
 * @param[in] sock Socket structure to operate on.
 * @param[in] clear Set of interest events to clear from socket.
 * @param[in] set Set of interest events to set on socket.
 */
static void
set_or_clear(struct Socket *sock, unsigned int clear, unsigned int set)
{
  struct kevent chglist[2];
  const unsigned int changed = clear ^ set;
  int nchanges = 0;

  assert(0 != sock);
  assert(-1 < s_fd(sock));

  if (changed & SOCK_EVENT_READABLE) /* readable has changed */
  {
    /* udata is left empty; the socket is looked up by descriptor instead */
    EV_SET(&chglist[nchanges], s_fd(sock), EVFILT_READ,
           EV_ADD | ((set & SOCK_EVENT_READABLE) ? EV_ENABLE : EV_DISABLE),
           0, 0, 0);
    nchanges++;
  }

  if (changed & SOCK_EVENT_WRITABLE) /* writable has changed */
  {
    EV_SET(&chglist[nchanges], s_fd(sock), EVFILT_WRITE,
           EV_ADD | ((set & SOCK_EVENT_WRITABLE) ? EV_ENABLE : EV_DISABLE),
           0, 0, 0);
    nchanges++;
  }

  if (nchanges == 0) /* nothing to change; skip the system call */
    return;

  if (kevent(kqueue_id, chglist, nchanges, 0, 0, 0) < 0 && errno != EBADF)
    event_generate(ET_ERROR, sock, errno); /* report error */
}
212
213 /** Add a socket to the event engine.
214 * @param[in] sock Socket to add to engine.
215 * @return Non-zero on success, or zero on error.
216 */
217 static int
218 engine_add(struct Socket *sock)
219 {
220 assert(0 != sock);
221 assert(0 == sockList[s_fd(sock)]);
222
223 /* bounds-check... */
224 if (sock->s_fd >= kqueue_max)
225 {
226 ilog(LOG_TYPE_DEBUG,
227 "Attempt to add socket %d (> %d) to event engine",
228 s_fd(sock), kqueue_max);
229 return 0;
230 }
231
232 sockList[s_fd(sock)] = sock; /* add to list */
233
234 ilog(LOG_TYPE_DEBUG, "kqueue: Adding socket %d [%p], state %s, to engine",
235 s_fd(sock), sock, state_to_name(s_state(sock)));
236
237 /* Add socket to queue */
238 set_or_clear(sock, 0, state_to_events(s_state(sock), s_events(sock)));
239
240 return 1; /* success */
241 }
242
243 /** Handle state transition for a socket.
244 * @param[in] sock Socket changing state.
245 * @param[in] new_state New state for socket.
246 */
247 static void
248 engine_state(struct Socket *sock, enum SocketState new_state)
249 {
250 assert(0 != sock);
251 assert(sock == sockList[s_fd(sock)]);
252
253 ilog(LOG_TYPE_DEBUG, "kqueue: Changing state for socket %p to %s",
254 sock, state_to_name(new_state)));
255
256 /* set the correct events */
257 set_or_clear(sock,
258 state_to_events(s_state(sock), s_events(sock)), /* old state */
259 state_to_events(new_state, s_events(sock))); /* new state */
260
261 }
262
263 /** Handle change to preferred socket events.
264 * @param[in] sock Socket getting new interest list.
265 * @param[in] new_events New set of interesting events for socket.
266 */
267 static void
268 engine_events(struct Socket *sock, unsigned int new_events)
269 {
270 assert(0 != sock);
271 assert(sock == sockList[s_fd(sock)]);
272
273 ilog(LOG_TYPE_DEBUG, "kqueue: Changing event mask for socket %p to [%s]",
274 sock, sock_flags(new_events));
275
276 /* set the correct events */
277 set_or_clear(sock,
278 state_to_events(s_state(sock), s_events(sock)), /* old events */
279 state_to_events(s_state(sock), new_events)); /* new events */
280 }
281
/** Remove a socket from the event engine.
 *
 * Deletes the read and write filters for the socket's descriptor from
 * the kqueue and empties the socket's slot in the socket table.  Each
 * filter is deleted with its own kevent() call so that a failure on one
 * does not prevent the other from being removed.  ENOENT (the filter was
 * never registered) and EBADF (the descriptor is no longer open) are
 * expected and ignored; any other failure is logged.
 * @param[in] sock Socket being destroyed.
 */
static void
engine_delete(struct Socket *sock)
{
  static const int filters[] = { EVFILT_READ, EVFILT_WRITE };
  struct kevent change;
  unsigned int i;

  assert(0 != sock);
  assert(sock == sockList[s_fd(sock)]);

  ilog(LOG_TYPE_DEBUG, "kqueue: Deleting socket %d [%p], state %s",
       s_fd(sock), (void *)sock, state_to_name(s_state(sock)));

  for (i = 0; i < sizeof(filters) / sizeof(filters[0]); i++)
  {
    EV_SET(&change, s_fd(sock), filters[i], EV_DELETE, 0, 0, 0);

    if (kevent(kqueue_id, &change, 1, 0, 0, 0) < 0 &&
        errno != ENOENT && errno != EBADF)
      ilog(LOG_TYPE_DEBUG, "Unable to delete kevent items for socket %d",
           s_fd(sock));
  }

  sockList[s_fd(sock)] = 0; /* slot is free again */
}
312
/** Run engine event loop.
 *
 * Repeats the following until the global \c running flag becomes false:
 * wait in kevent() for at most the time remaining until the next timer,
 * refresh CurrentTime, translate each returned kevent into an engine
 * event (signal, error, connect, accept, read, EOF or write), then run
 * pending timers.  kevent() failures other than EINTR are counted; the
 * first one starts the periodic error-expiry timer and exceeding
 * KQUEUE_ERROR_THRESHOLD triggers a restart.  The event buffer is
 * released when the loop ends.
 * @param[in] gen Lists of generators of various types.
 */
static void
engine_loop(struct Generators *gen)
{
  struct kevent *events;
  int events_count;
  struct Socket *sock;
  struct timespec wait;
  int nevs;
  int i;
  int errcode;
  socklen_t codesize; /* getsockopt() takes a socklen_t length, not size_t */

  if ((events_count = 64 /* XXX */) < 20)
    events_count = 20;
  events = MyMalloc(sizeof(struct kevent) * events_count);

  while (running)
  {
    /* resize the event buffer if the desired size has changed */
    if ((i = 64 /* XXX */) >= 20 && i != events_count)
    {
      events = MyRealloc(events, sizeof(struct kevent) * i);
      events_count = i;
    }

    /* set up the sleep time */
    wait.tv_nsec = 0;
    if (!timer_next(gen))
      wait.tv_sec = -1; /* no timer pending: block until an event arrives */
    else if (timer_next(gen) > CurrentTime)
      wait.tv_sec = timer_next(gen) - CurrentTime;
    else
      wait.tv_sec = 0;  /* timer already due: poll instead of blocking */

    ilog(LOG_TYPE_DEBUG, "kqueue: delay: %lu (%lu) %ld",
         (unsigned long)timer_next(gen), (unsigned long)CurrentTime,
         (long)wait.tv_sec);

    /* check for active events */
    nevs = kevent(kqueue_id, 0, 0, events, events_count,
                  wait.tv_sec < 0 ? 0 : &wait);

    CurrentTime = time(0); /* set current time... */

    if (nevs < 0)
    {
      if (errno != EINTR) /* ignore kevent interrupts */
      {
        /* Log the kqueue error */
        ilog(LOG_TYPE_DEBUG, "kevent() error: %m");
        if (!errors++)
          timer_add(timer_init(&clear_error), error_clear, 0, TT_PERIODIC,
                    ERROR_EXPIRE_TIME);
        else if (errors > KQUEUE_ERROR_THRESHOLD) /* too many errors... */
          restart("too many kevent errors");
      }
      /* old code did a sleep(1) here; with usage these days,
       * that may be too expensive
       */
      continue;
    }

    for (i = 0; i < nevs; i++)
    {
      if (events[i].filter == EVFILT_SIGNAL)
      {
        /* it's a signal; deal appropriately */
        event_generate(ET_SIGNAL, events[i].udata, events[i].ident);
        continue; /* skip socket processing */
      }

      assert(events[i].filter == EVFILT_READ ||
             events[i].filter == EVFILT_WRITE);

      sock = sockList[events[i].ident];
      if (!sock) /* slots may become empty while processing events */
        continue;

      assert(s_fd(sock) == events[i].ident);

      gen_ref_inc(sock); /* can't have it going away on us */

      ilog(LOG_TYPE_DEBUG, "kqueue: Checking socket %p (fd %d) state %s, "
           "events %s", (void *)sock, s_fd(sock),
           state_to_name(s_state(sock)), sock_flags(s_events(sock)));

      if (s_state(sock) != SS_NOTSOCK)
      {
        errcode = 0; /* check for errors on socket */
        codesize = sizeof(errcode);
        if (getsockopt(s_fd(sock), SOL_SOCKET, SO_ERROR, &errcode,
                       &codesize) < 0)
          errcode = errno; /* work around Solaris implementation */

        if (errcode) /* an error occurred; generate an event */
        {
          ilog(LOG_TYPE_DEBUG, "kqueue: Error %d on fd %d, socket %p", errcode,
               s_fd(sock), (void *)sock);
          event_generate(ET_ERROR, sock, errcode);
          gen_ref_dec(sock); /* careful not to leak reference counts */
          continue;
        }
      }

      switch (s_state(sock))
      {
        case SS_CONNECTING:
          if (events[i].filter == EVFILT_WRITE) /* connection completed */
          {
            ilog(LOG_TYPE_DEBUG, "kqueue: Connection completed");
            event_generate(ET_CONNECT, sock, 0);
          }
          break;

        case SS_LISTENING:
          if (events[i].filter == EVFILT_READ) /* connection to be accepted */
          {
            ilog(LOG_TYPE_DEBUG, "kqueue: Ready for accept");
            event_generate(ET_ACCEPT, sock, 0);
          }
          break;

        case SS_NOTSOCK: /* doing nothing socket-specific */
        case SS_CONNECTED:
          if (events[i].filter == EVFILT_READ) /* data on socket */
          {
            ilog(LOG_TYPE_DEBUG, "kqueue: EOF or data to be read");
            event_generate(events[i].flags & EV_EOF ? ET_EOF : ET_READ, sock, 0);
          }
          if (events[i].filter == EVFILT_WRITE) /* socket writable */
          {
            ilog(LOG_TYPE_DEBUG, "kqueue: Data can be written");
            event_generate(ET_WRITE, sock, 0);
          }
          break;

        case SS_DATAGRAM:
        case SS_CONNECTDG:
          if (events[i].filter == EVFILT_READ) /* socket readable */
          {
            ilog(LOG_TYPE_DEBUG, "kqueue: Datagram to be read");
            event_generate(ET_READ, sock, 0);
          }
          if (events[i].filter == EVFILT_WRITE) /* socket writable */
          {
            ilog(LOG_TYPE_DEBUG, "kqueue: Datagram can be written");
            event_generate(ET_WRITE, sock, 0);
          }
          break;
      }

      gen_ref_dec(sock); /* we're done with it */
    }

    timer_run(); /* execute any pending timers */
  }

  MyFree(events); /* loop is over; release the event buffer */
}
452
453 /** Descriptor for kqueue() event engine. */
454 struct Engine engine_kqueue =
455 {
456 "kqueue()", /* Engine name */
457 engine_init, /* Engine initialization function */
458 engine_signal, /* Engine signal registration function */
459 engine_add, /* Engine socket registration function */
460 engine_state, /* Engine socket state change function */
461 engine_events, /* Engine socket events mask function */
462 engine_delete, /* Engine socket deletion function */
463 engine_loop /* Core engine event loop */
464 };

svnadmin@ircd-hybrid.org
ViewVC Help
Powered by ViewVC 1.1.28