ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/svn/ircd-hybrid/servlink/io.c
Revision: 32
Committed: Sun Oct 2 20:41:23 2005 UTC (18 years, 5 months ago) by knight
Content type: text/x-csrc
File size: 17343 byte(s)
Log Message:
- svn:keywords

File Contents

# Content
1 /************************************************************************
2 * IRC - Internet Relay Chat, servlink/io.c
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 1, or (at your option)
7 * any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17 *
18 * $Id$
19 */
20
21 #include "stdinc.h"
22
23 #ifdef HAVE_LIBCRYPTO
24 #include <openssl/evp.h>
25 #include <openssl/err.h>
26 #endif
27 #ifdef HAVE_LIBZ
28 #include <zlib.h>
29 #endif
30
31 #include "servlink.h"
32 #include "io.h"
33 #include "control.h"
34
/* Forward declaration: classifies the return value of a read/write/select. */
static int check_error(int ret, int io, int fd);
36
37 static const char *
38 fd_name(int fd)
39 {
40 if (fd == CONTROL_R.fd)
41 return "control read";
42 if (fd == CONTROL_W.fd)
43 return "control write";
44 if (fd == LOCAL_R.fd)
45 return "data read";
46 if (fd == LOCAL_W.fd)
47 return "data write";
48 if (fd == REMOTE_R.fd)
49 return "network";
50
51 /* uh oh... */
52 return("unknown");
53 }
54
/*
 * Scratch buffers used as intermediate stages when data has to pass
 * through encryption and/or compression before reaching its final buffer.
 */
#if defined(HAVE_LIBCRYPTO) || defined(HAVE_LIBZ)
static unsigned char tmp_buf[BUFLEN];
#endif
#ifdef HAVE_LIBZ
static unsigned char tmp2_buf[BUFLEN];
#endif

/*
 * Pending control-pipe reply: ctrl_len bytes starting at
 * ctrl_buf + ctrl_ofs are still waiting to be written.
 */
static unsigned char ctrl_buf[256] = { 0 };
static unsigned int ctrl_len;
static unsigned int ctrl_ofs;
65
66 void
67 io_loop(int nfds)
68 {
69 fd_set rfds;
70 fd_set wfds;
71 int i, ret;
72
73 /* loop forever */
74 for (;;)
75 {
76 FD_ZERO(&rfds);
77 FD_ZERO(&wfds);
78
79 for (i = 0; i < 5; i++)
80 {
81 if (fds[i].read_cb)
82 FD_SET(fds[i].fd, &rfds);
83 if (fds[i].write_cb)
84 FD_SET(fds[i].fd, &wfds);
85 }
86
87 /* we have <6 fds ever, so I don't think select is too painful */
88 ret = select(nfds, &rfds, &wfds, NULL, NULL);
89
90 if (ret < 0)
91 {
92 check_error(ret, IO_SELECT, -1); /* exit on fatal errors */
93 }
94 else if (ret > 0)
95 {
96 /* call any callbacks */
97 for (i = 0; i < 5; i++)
98 {
99 if (FD_ISSET(fds[i].fd, &rfds) && fds[i].read_cb)
100 (*fds[i].read_cb)();
101 if (FD_ISSET(fds[i].fd, &wfds) && fds[i].write_cb)
102 (*fds[i].write_cb)();
103 }
104 }
105 }
106 }
107
108 void
109 send_data_blocking(int fd, unsigned char *data, int datalen)
110 {
111 int ret;
112 fd_set wfds;
113
114 while (1)
115 {
116 ret = write(fd, data, datalen);
117
118 if (ret == datalen)
119 return;
120 else if ( ret > 0)
121 {
122 data += ret;
123 datalen -= ret;
124 }
125
126 ret = check_error(ret, IO_WRITE, fd);
127
128 FD_ZERO(&wfds);
129 FD_SET(fd, &wfds);
130
131 /* sleep until we can write to the fd */
132 while(1)
133 {
134 ret = select(fd+1, NULL, &wfds, NULL, NULL);
135
136 if (ret > 0) /* break out so we can write */
137 break;
138
139 if (ret < 0) /* error ? */
140 check_error(ret, IO_SELECT, fd); /* exit on fatal errors */
141
142 /* loop on non-fatal errors */
143 }
144 }
145 }
146
147 /*
148 * process_sendq:
149 *
150 * used before CMD_INIT to pass contents of SendQ from ircd
151 * to servlink. This data must _not_ be encrypted/compressed.
152 */
153 void
154 process_sendq(struct ctrl_command *cmd)
155 {
156 send_data_blocking(REMOTE_W.fd, cmd->data, cmd->datalen);
157 }
158
159 /*
160 * process_recvq:
161 *
162 * used before CMD_INIT to pass contents of RecvQ from ircd
163 * to servlink. This data must be decrypted/decopmressed before
164 * sending back to the ircd.
165 */
166 void
167 process_recvq(struct ctrl_command *cmd)
168 {
169 int ret;
170 unsigned char *buf;
171 unsigned int blen;
172 unsigned char *data = cmd->data;
173 unsigned int datalen = cmd->datalen;
174
175 buf = data;
176 blen = datalen;
177 ret = -1;
178 if (datalen > READLEN)
179 send_error("Error processing INJECT_RECVQ - buffer too long (%d > %d)",
180 datalen, READLEN);
181
182 #ifdef HAVE_LIBCRYPTO
183 if (in_state.crypt)
184 {
185 assert(EVP_DecryptUpdate(&in_state.crypt_state.ctx,
186 tmp_buf, &blen,
187 data, datalen));
188 assert(blen == datalen);
189 buf = tmp_buf;
190 }
191 #endif
192
193 #ifdef HAVE_LIBZ
194 if (in_state.zip)
195 {
196 /* decompress data */
197 in_state.zip_state.stream.next_in = buf;
198 in_state.zip_state.stream.avail_in = blen;
199 in_state.zip_state.stream.next_out = tmp2_buf;
200 in_state.zip_state.stream.avail_out = BUFLEN;
201
202 buf = tmp2_buf;
203 while (in_state.zip_state.stream.avail_in)
204 {
205 if ((ret = inflate(&in_state.zip_state.stream,
206 Z_NO_FLUSH)) != Z_OK)
207 send_error("Inflate failed: %d (%s)", ret, zError(ret));
208
209 blen = BUFLEN - in_state.zip_state.stream.avail_out;
210
211 if (in_state.zip_state.stream.avail_in)
212 {
213 send_data_blocking(LOCAL_W.fd, buf, blen);
214 blen = 0;
215 in_state.zip_state.stream.next_out = buf;
216 in_state.zip_state.stream.avail_out = BUFLEN;
217 }
218 }
219
220 if (!blen)
221 return;
222 }
223 #endif
224
225 send_data_blocking(LOCAL_W.fd, buf, blen);
226 }
227
/*
 * send_zipstats
 *      - answer a zipstats request on the control descriptor.
 *
 * The reply is RPL_ZIPSTATS, a two-byte length of 16, and four 32-bit
 * big-endian counters in this order: inbound total_out, inbound
 * total_in, outbound total_in, outbound total_out.  All four zlib
 * counters are reset to zero afterwards.  If the reply cannot be
 * written in one go, the remainder is left for write_ctrl() and
 * control reads are suspended until it has been flushed.
 *
 * The command argument is not used.
 */
void
send_zipstats(struct ctrl_command *unused)
{
#ifdef HAVE_LIBZ
  unsigned long counters[4];
  int pos = 0;
  int idx;
  int shift;
  int written;

  if (!(in_state.active && out_state.active))
    send_error("Error processing CMD_ZIPSTATS - link is not active!");
  if (!(in_state.zip && out_state.zip))
    send_error("Error processing CMD_ZIPSTATS - link is not compressed!");

  counters[0] = in_state.zip_state.stream.total_out;
  counters[1] = in_state.zip_state.stream.total_in;
  counters[2] = out_state.zip_state.stream.total_in;
  counters[3] = out_state.zip_state.stream.total_out;

  /* header: reply code followed by the payload length (16) */
  ctrl_buf[pos++] = RPL_ZIPSTATS;
  ctrl_buf[pos++] = 0;
  ctrl_buf[pos++] = 16;

  /* payload: each counter, most significant byte first */
  for (idx = 0; idx < 4; idx++)
  {
    for (shift = 24; shift >= 0; shift -= 8)
      ctrl_buf[pos++] = ((counters[idx] >> shift) & 0xFF);
  }

  in_state.zip_state.stream.total_in = 0;
  in_state.zip_state.stream.total_out = 0;
  out_state.zip_state.stream.total_in = 0;
  out_state.zip_state.stream.total_out = 0;

  written = check_error(write(CONTROL_W.fd, ctrl_buf, pos),
                        IO_WRITE, CONTROL_W.fd);
  if (written < pos)
  {
    /* write incomplete, register write cb */
    CONTROL_W.write_cb = write_ctrl;
    /* deregister read_cb */
    CONTROL_R.read_cb = NULL;
    ctrl_ofs = written;
    ctrl_len = pos - written;
    return;
  }
#else
  send_error("can't send_zipstats -- no zlib support!");
#endif
}
283
284 /* send_error
285 * - we ran into some problem, make a last ditch effort to
286 * flush the control fd sendq, then (blocking) send an
287 * error message over the control fd.
288 */
289 void
290 send_error(const char *message, ...)
291 {
292 va_list args;
293 static int sending_error = 0;
294 struct linger linger_opt = { 1, 30 }; /* wait 30 seconds */
295 int len;
296
297 if (sending_error)
298 exit(1); /* we did _try_ */
299
300 sending_error = 1;
301
302 if(ctrl_len) /* attempt to flush any data we have... */
303 {
304 send_data_blocking(CONTROL_W.fd, (ctrl_buf+ctrl_ofs), ctrl_len);
305 }
306
307 /* prepare the message, in in_buf, since we won't be using it again.. */
308 in_state.buf[0] = RPL_ERROR;
309 in_state.buf[1] = 0;
310 in_state.buf[2] = 0;
311
312 va_start(args, message);
313 len = vsprintf((char *)in_state.buf+3, message, args);
314 va_end(args);
315
316 in_state.buf[3+len++] = '\0';
317 in_state.buf[1] = len >> 8;
318 in_state.buf[2] = len & 0xFF;
319 len+=3;
320
321 send_data_blocking(CONTROL_W.fd, in_state.buf, len);
322
323 /* XXX - is this portable?
324 * this obviously will fail on a non socket.. */
325 setsockopt(CONTROL_W.fd, SOL_SOCKET, SO_LINGER, &linger_opt,
326 sizeof(struct linger));
327
328 /* well, we've tried... */
329 exit(1); /* now abort */
330 }
331
332 /* read_ctrl
333 * called when a command is waiting on the control pipe
334 */
335 void
336 read_ctrl(void)
337 {
338 int ret;
339 unsigned char tmp[2];
340 unsigned char *len;
341 struct command_def *cdef;
342 static struct ctrl_command cmd = {0, 0, 0, 0, NULL};
343
344 if (cmd.command == 0) /* we don't have a command yet */
345 {
346 cmd.gotdatalen = 0;
347 cmd.datalen = 0;
348 cmd.readdata = 0;
349 cmd.data = NULL;
350
351 /* read the command */
352 if (!(ret = check_error(read(CONTROL_R.fd, tmp, 1),
353 IO_READ, CONTROL_R.fd)))
354 return;
355
356 cmd.command = tmp[0];
357 }
358
359 for (cdef = command_table; cdef->commandid; cdef++)
360 {
361 if (cdef->commandid == cmd.command)
362 break;
363 }
364
365 if (!cdef->commandid)
366 {
367 send_error("Unsupported command (servlink/ircd out of sync?): %d",
368 cmd.command);
369 /* NOTREACHED */
370 }
371
372 /* read datalen for commands including data */
373 if (cdef->flags & COMMAND_FLAG_DATA)
374 {
375 if (cmd.gotdatalen < 2)
376 {
377 len = tmp;
378 if (!(ret = check_error(read(CONTROL_R.fd, len,
379 (2 - cmd.gotdatalen)),
380 IO_READ, CONTROL_R.fd)))
381 return;
382
383 if (cmd.gotdatalen == 0)
384 {
385 cmd.datalen = len[0] << 8;
386 cmd.gotdatalen++;
387 ret--;
388 len++;
389 }
390 if (ret && (cmd.gotdatalen == 1))
391 {
392 cmd.datalen |= len[0];
393 cmd.gotdatalen++;
394 if (cmd.datalen > 0)
395 cmd.data = calloc(cmd.datalen, 1);
396 }
397 }
398 }
399
400 if (cmd.readdata < cmd.datalen) /* try to get any remaining data */
401 {
402 if (!(ret = check_error(read(CONTROL_R.fd,
403 (cmd.data + cmd.readdata),
404 cmd.datalen - cmd.readdata),
405 IO_READ, CONTROL_R.fd)))
406 return;
407
408 cmd.readdata += ret;
409 if (cmd.readdata < cmd.datalen)
410 return;
411 }
412
413 /* we now have the command and any data */
414 (*cdef->handler)(&cmd);
415
416 if (cmd.datalen > 0)
417 free(cmd.data);
418 cmd.command = 0;
419 }
420
421 void
422 write_ctrl(void)
423 {
424 int ret;
425
426 assert(ctrl_len);
427
428 if (!(ret = check_error(write(CONTROL_W.fd, (ctrl_buf + ctrl_ofs),
429 ctrl_len),
430 IO_WRITE, CONTROL_W.fd)))
431 return; /* no data waiting */
432
433 ctrl_len -= ret;
434
435 if (!ctrl_len)
436 {
437 /* write completed, de-register write cb */
438 CONTROL_W.write_cb = NULL;
439 /* reregister read_cb */
440 CONTROL_R.read_cb = read_ctrl;
441 ctrl_ofs = 0;
442 }
443 else
444 ctrl_ofs += ret;
445 }
446
447 void
448 read_data(void)
449 {
450 int ret, ret2;
451 unsigned char *buf = out_state.buf;
452 int blen;
453 ret2 = -1;
454 assert(!out_state.len);
455
456 #if defined(HAVE_LIBZ) || defined(HAVE_LIBCRYPTO)
457 if (out_state.zip || out_state.crypt)
458 buf = tmp_buf;
459 #endif
460
461 while ((ret = check_error(read(LOCAL_R.fd, buf, READLEN),
462 IO_READ, LOCAL_R.fd)))
463 {
464 blen = ret;
465 #ifdef HAVE_LIBZ
466 if (out_state.zip)
467 {
468 out_state.zip_state.stream.next_in = buf;
469 out_state.zip_state.stream.avail_in = ret;
470
471 buf = out_state.buf;
472
473 #ifdef HAVE_LIBCRYPTO
474 if (out_state.crypt)
475 buf = tmp2_buf;
476 #endif
477
478 out_state.zip_state.stream.next_out = buf;
479 out_state.zip_state.stream.avail_out = BUFLEN;
480 if(!(ret2 = deflate(&out_state.zip_state.stream,
481 Z_PARTIAL_FLUSH)) == Z_OK)
482 send_error("error compressing outgoing data - deflate returned %d (%s)",
483 ret2, zError(ret2));
484
485 if (!out_state.zip_state.stream.avail_out)
486 send_error("error compressing outgoing data - avail_out == 0");
487 if (out_state.zip_state.stream.avail_in)
488 send_error("error compressing outgoing data - avail_in != 0");
489
490 blen = BUFLEN - out_state.zip_state.stream.avail_out;
491 }
492 #endif
493
494 #ifdef HAVE_LIBCRYPTO
495 if (out_state.crypt)
496 {
497 /* encrypt data */
498 ret = blen;
499 if (!EVP_EncryptUpdate(&out_state.crypt_state.ctx,
500 out_state.buf, &blen,
501 buf, ret))
502 send_error("error encrypting outgoing data: EncryptUpdate: %s",
503 ERR_error_string(ERR_get_error(), NULL));
504 assert(blen == ret);
505 }
506 #endif
507
508 ret = check_error(write(REMOTE_W.fd, out_state.buf, blen),
509 IO_WRITE, REMOTE_W.fd);
510 if (ret < blen)
511 {
512 /* write incomplete, register write cb */
513 REMOTE_W.write_cb = write_net;
514 /* deregister read_cb */
515 LOCAL_R.read_cb = NULL;
516 out_state.ofs = ret;
517 out_state.len = blen - ret;
518 return;
519 }
520 #if defined(HAVE_LIBZ) || defined(HAVE_LIBCRYPTO)
521 if (out_state.zip || out_state.crypt)
522 buf = tmp_buf;
523 #endif
524 }
525
526 }
527
528 void
529 write_net(void)
530 {
531 int ret;
532
533 assert(out_state.len);
534
535 if (!(ret = check_error(write(REMOTE_W.fd,
536 (out_state.buf + out_state.ofs),
537 out_state.len),
538 IO_WRITE, REMOTE_W.fd)))
539 return; /* no data waiting */
540
541 out_state.len -= ret;
542
543 if (!out_state.len)
544 {
545 /* write completed, de-register write cb */
546 REMOTE_W.write_cb = NULL;
547 /* reregister read_cb */
548 LOCAL_R.read_cb = read_data;
549 out_state.ofs = 0;
550 }
551 else
552 out_state.ofs += ret;
553 }
554
555 void
556 read_net(void)
557 {
558 int ret;
559 int ret2;
560 unsigned char *buf = in_state.buf;
561 int blen;
562 ret2 = -1;
563 assert(!in_state.len);
564
565 #if defined(HAVE_LIBCRYPTO) || defined(HAVE_LIBZ)
566 if (in_state.crypt || in_state.zip)
567 buf = tmp_buf;
568 #endif
569
570 while ((ret = check_error(read(REMOTE_R.fd, buf, READLEN),
571 IO_READ, REMOTE_R.fd)))
572 {
573 blen = ret;
574 #ifdef HAVE_LIBCRYPTO
575 if (in_state.crypt)
576 {
577 /* decrypt data */
578 buf = in_state.buf;
579 #ifdef HAVE_LIBZ
580 if (in_state.zip)
581 buf = tmp2_buf;
582 #endif
583 if (!EVP_DecryptUpdate(&in_state.crypt_state.ctx,
584 buf, &blen,
585 tmp_buf, ret))
586 send_error("error decompressing incoming data - DecryptUpdate: %s",
587 ERR_error_string(ERR_get_error(), NULL));
588 assert(blen == ret);
589 }
590 #endif
591
592 #ifdef HAVE_LIBZ
593 if (in_state.zip)
594 {
595 /* decompress data */
596 in_state.zip_state.stream.next_in = buf;
597 in_state.zip_state.stream.avail_in = ret;
598 in_state.zip_state.stream.next_out = in_state.buf;
599 in_state.zip_state.stream.avail_out = BUFLEN;
600
601 while (in_state.zip_state.stream.avail_in)
602 {
603 if ((ret2 = inflate(&in_state.zip_state.stream,
604 Z_NO_FLUSH)) != Z_OK)
605 send_error("inflate failed: %d (%s)", ret2, zError(ret2));
606
607 blen = BUFLEN - in_state.zip_state.stream.avail_out;
608
609 if (in_state.zip_state.stream.avail_in)
610 {
611 if (blen)
612 {
613 send_data_blocking(LOCAL_W.fd, in_state.buf, blen);
614 blen = 0;
615 }
616
617 in_state.zip_state.stream.next_out = in_state.buf;
618 in_state.zip_state.stream.avail_out = BUFLEN;
619 }
620 }
621
622 if (!blen)
623 return; /* that didn't generate any decompressed input.. */
624 }
625 #endif
626
627 ret = check_error(write(LOCAL_W.fd, in_state.buf, blen),
628 IO_WRITE, LOCAL_W.fd);
629
630 if (ret < blen)
631 {
632 in_state.ofs = ret;
633 in_state.len = blen - ret;
634 /* write incomplete, register write cb */
635 LOCAL_W.write_cb = write_data;
636 /* deregister read_cb */
637 REMOTE_R.read_cb = NULL;
638 return;
639 }
640 #if defined(HAVE_LIBCRYPTO) || defined(HAVE_LIBZ)
641 if (in_state.crypt || in_state.zip)
642 buf = tmp_buf;
643 #endif
644 }
645 }
646
647 void
648 write_data(void)
649 {
650 int ret;
651
652 assert(in_state.len);
653
654 if (!(ret = check_error(write(LOCAL_W.fd,
655 (in_state.buf + in_state.ofs),
656 in_state.len),
657 IO_WRITE, LOCAL_W.fd)))
658 return;
659
660 in_state.len -= ret;
661
662 if (!in_state.len)
663 {
664 /* write completed, de-register write cb */
665 LOCAL_W.write_cb = NULL;
666 /* reregister read_cb */
667 REMOTE_R.read_cb = read_net;
668 in_state.ofs = 0;
669 }
670 else
671 in_state.ofs += ret;
672 }
673
/*
 * check_error
 *      - classify the return value of a read/write/select call.
 *
 * ret  the value the call returned
 * io   operation identifier, only used (via IO_TYPE) in error messages
 * fd   descriptor involved, only used (via FD_NAME) in error messages
 *
 * Returns ret unchanged when it is positive, and 0 when the call
 * failed with one of the transient errno values listed below.  A
 * return value of 0 (EOF) or any other errno is reported through
 * send_error() and the process exits.
 */
int
check_error(int ret, int io, int fd)
{
  if (ret > 0) /* no error */
    return ret;

  if (ret < 0)
  {
    /* ret == -1.. */
    switch (errno)
    {
      case EINPROGRESS:
      case EWOULDBLOCK:
#if EAGAIN != EWOULDBLOCK
      case EAGAIN:
#endif
      case EALREADY:
      case EINTR:
#ifdef ERESTART
      case ERESTART:
#endif
        /* non-fatal error, 0 bytes read */
        return 0;
    }

    /* fatal error */
    send_error("%s failed on %s: %s",
               IO_TYPE(io),
               FD_NAME(fd),
               strerror(errno));
    exit(1); /* NOTREACHED */
  }

  /* ret == 0: EOF */
  send_error("%s failed on %s: EOF", IO_TYPE(io), FD_NAME(fd));
  exit(1); /* NOTREACHED */
}

Properties

Name Value
svn:eol-style native
svn:keywords Revision