/[cvs]/eggdrop1.9/testcode/sockbuf.c
ViewVC logotype

Contents of /eggdrop1.9/testcode/sockbuf.c

Parent Directory Parent Directory | Revision Log Revision Log | View Revision Graph Revision Graph


Revision 1.9 - (show annotations) (download) (as text)
Sun Aug 11 20:50:35 2002 UTC (16 years, 7 months ago) by stdarg
Branch: MAIN
CVS Tags: HEAD
Changes since 1.8: +1 -0 lines
File MIME type: text/x-chdr
*** empty log message ***

1 #define HAVE_POLL
2
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <string.h>
7 #include <sys/types.h>
8 #include <sys/socket.h>
9 #include <sys/time.h>
10 #ifdef HAVE_POLL
11 #include <sys/poll.h>
12 #else
13 #include "my_poll.h"
14 #define poll my_poll
15 #endif
16
17 #include <errno.h>
18
19 #include "sockbuf.h"
20
21 typedef struct {
22 int sock; /* Underlying socket descriptor */
23 int flags; /* Keep track of blocked status, client/server */
24
25 char *peer_ip; /* Who we're connected to. */
26 int peer_port;
27
28 char *data; /* Output buffer. */
29 int len; /* Length of buffer. */
30
31 sockbuf_filter_t **filters; /* Line-mode, gzip, ssl... */
32 void **filter_client_data; /* Client data for filters */
33 int nfilters; /* Number of filters */
34
35 sockbuf_handler_t *handler; /* User's event handlers */
36 void *client_data; /* User's client data */
37 } sockbuf_t;
38
39 static sockbuf_t *sockbufs = NULL;
40 static int nsockbufs = 0;
41 static int ndeleted_sockbufs = 0;
42
43 /* 'idx_array' and 'pollfds' are parallel arrays. */
44 static int *idx_array = NULL;
45 static struct pollfd *pollfds = NULL;
46 static int npollfds = 0;
47
48 /* Listeners attach to the end of the pollfds array so that we can listen
49 for events on sockets that don't have sockbufs. */
50 static int nlisteners = 0;
51
52 /* An idle event handler that does nothing. */
53 static sockbuf_handler_t sockbuf_idler = {
54 "idle",
55 NULL, NULL, NULL,
56 NULL, NULL
57 };
58
59 static void sockbuf_got_eof(int idx, int err);
60
61 /* Mark a sockbuf as blocked and put it on the POLLOUT list. */
62 static void sockbuf_block(int idx)
63 {
64 int i;
65 sockbufs[idx].flags |= SOCKBUF_BLOCK;
66 for (i = 0; i < npollfds; i++) {
67 if (idx_array[i] == idx) {
68 pollfds[i].events |= POLLOUT;
69 break;
70 }
71 }
72 }
73
74 /* Mark a sockbuf as unblocked and remove it from the POLLOUT list. */
75 static void sockbuf_unblock(int idx)
76 {
77 int i;
78 sockbufs[idx].flags &= (~SOCKBUF_BLOCK);
79 for (i = 0; i < npollfds; i++) {
80 if (idx_array[i] == idx) {
81 pollfds[i].events &= (~POLLOUT);
82 break;
83 }
84 }
85 }
86
87 /* Try to write data to the underlying socket. If we don't write it all,
88 save the data in the output buffer and start monitoring for POLLOUT. */
89 static int sockbuf_real_write(int idx, const char *data, int len)
90 {
91 int nbytes;
92 sockbuf_t *sbuf = &sockbufs[idx];
93
94 /* If it's not blocked already, write as much as we can. */
95 if (!(sbuf->flags & SOCKBUF_BLOCK)) {
96 nbytes = write(sbuf->sock, data, len);
97 if (nbytes == len) return(nbytes);
98 if (nbytes < 0) {
99 if (errno != EAGAIN) {
100 sockbuf_got_eof(idx, errno);
101 return(nbytes);
102 }
103 nbytes = 0;
104 }
105 sockbuf_block(idx);
106 data += nbytes;
107 len -= nbytes;
108 }
109
110 /* Add the remaining data to the buffer. */
111 sbuf->data = (char *)realloc(sbuf->data, sbuf->len + len);
112 memcpy(sbuf->data + sbuf->len, data, len);
113 sbuf->len += len;
114 return(nbytes);
115 }
116
117 /* Eof occurs on a socket. */
118 int sockbuf_on_eof(int idx, int level, int err, char *errmsg)
119 {
120 int i;
121 sockbuf_t *sbuf = &sockbufs[idx];
122
123 for (i = 0; i < sbuf->nfilters; i++) {
124 if (sbuf->filters[i]->on_eof && sbuf->filters[i]->level > level) {
125 return sbuf->filters[i]->on_eof(sbuf->filter_client_data[i], idx, err, errmsg);
126 }
127 }
128
129 /* If we didn't branch to a filter, try the user handler. */
130 if (sbuf->handler->on_eof) {
131 sbuf->handler->on_eof(sbuf->client_data, idx, err, errmsg);
132 }
133 return(0);
134 }
135
136 /* This is called when a client sock connects successfully. */
137 int sockbuf_on_connect(int idx, int level, const char *peer_ip, int peer_port)
138 {
139 int i;
140 sockbuf_t *sbuf = &sockbufs[idx];
141
142 for (i = 0; i < sbuf->nfilters; i++) {
143 if (sbuf->filters[i]->on_connect && sbuf->filters[i]->level > level) {
144 return sbuf->filters[i]->on_connect(sbuf->filter_client_data[i], idx, sbuf->peer_ip, sbuf->peer_port);
145 }
146 }
147
148 sbuf->peer_ip = strdup(peer_ip);
149 sbuf->peer_port = peer_port;
150 if (sbuf->handler->on_connect) {
151 sbuf->handler->on_connect(sbuf->client_data, idx, peer_ip, peer_port);
152 }
153 }
154
155 /* When an incoming connection is accepted. */
156 int sockbuf_on_newclient(int idx, int level, int newidx, const char *peer_ip, int peer_port)
157 {
158 int i;
159 sockbuf_t *sbuf = &sockbufs[idx];
160
161 for (i = 0; i < sbuf->nfilters; i++) {
162 if (sbuf->filters[i]->on_connect && sbuf->filters[i]->level > level) {
163 return sbuf->filters[i]->on_newclient(sbuf->filter_client_data[i], idx, newidx, peer_ip, peer_port);
164 }
165 }
166
167 if (sbuf->handler->on_newclient) {
168 sbuf->handler->on_newclient(sbuf->client_data, idx, newidx, peer_ip, peer_port);
169 }
170 return(0);
171 }
172
173 /* We read some data from the sock. */
174 int sockbuf_on_read(int idx, int level, char *data, int len)
175 {
176 int i;
177 sockbuf_t *sbuf = &sockbufs[idx];
178
179 for (i = 0; i < sbuf->nfilters; i++) {
180 if (sbuf->filters[i]->on_read && sbuf->filters[i]->level > level) {
181 return sbuf->filters[i]->on_read(sbuf->filter_client_data[i], idx, data, len);
182 }
183 }
184
185 if (sbuf->handler->on_read ){
186 sbuf->handler->on_read(sbuf->client_data, idx, data, len);
187 }
188 return(0);
189 }
190
191 /* We're writing some data to the sock. */
192 int sockbuf_on_write(int idx, int level, const char *data, int len)
193 {
194 int i;
195 sockbuf_t *sbuf = &sockbufs[idx];
196
197 for (i = sbuf->nfilters-1; i >= 0; i--) {
198 if (sbuf->filters[i]->on_write && sbuf->filters[i]->level < level) {
199 return sbuf->filters[i]->on_write(sbuf->filter_client_data[i], idx, data, len);
200 }
201 }
202 /* There's no user handler for on_write (they wrote it). */
203 return sockbuf_real_write(idx, data, len);
204 }
205
206 /* We wrote some data to the sock. */
207 int sockbuf_on_written(int idx, int level, int len, int remaining)
208 {
209 int i;
210 sockbuf_t *sbuf = &sockbufs[idx];
211
212 for (i = 0; i < sbuf->nfilters; i++) {
213 if (sbuf->filters[i]->on_written && sbuf->filters[i]->level > level) {
214 return sbuf->filters[i]->on_written(sbuf->filter_client_data[i], idx, len, remaining);
215 }
216 }
217
218 if (sbuf->handler->on_written) {
219 sbuf->handler->on_written(sbuf->client_data, idx, len, remaining);
220 }
221 return(0);
222 }
223
224 /* When eof or an error is detected. */
225 static void sockbuf_got_eof(int idx, int err)
226 {
227 char *errmsg;
228 sockbuf_t *sbuf = &sockbufs[idx];
229
230 /* If there's no error given, check for a socket-level error. */
231 if (!err) err = socket_get_error(sbuf->sock);
232
233 /* Get the associated error message. */
234 errmsg = strerror(err);
235
236 close(sbuf->sock);
237 sockbuf_on_eof(idx, SOCKBUF_LEVEL_INTERNAL, err, errmsg);
238 }
239
240 /* When a client sock is writable, that means it's connected. Unless there's
241 a socket level error, anyway. So see if there's an error, then get
242 the peer we're connected to, then call the on_connect event. */
243 static void sockbuf_got_writable_client(int idx)
244 {
245 int err, peer_port;
246 char *peer_ip;
247 sockbuf_t *sbuf = &sockbufs[idx];
248
249 err = socket_get_error(sbuf->sock);
250 if (err) {
251 sockbuf_got_eof(idx, err);
252 return;
253 }
254
255 sbuf->flags &= ~SOCKBUF_CLIENT;
256 sockbuf_unblock(idx);
257 socket_get_peer_name(sbuf->sock, &peer_ip, &peer_port);
258
259 sockbuf_on_connect(idx, SOCKBUF_LEVEL_INTERNAL, peer_ip, peer_port);
260 if (peer_ip) free(peer_ip);
261 }
262
263 /* When a server sock is readable, that means there's a connection waiting
264 to be accepted. So we'll accept the sock, get the peer name, and
265 call the on_newclient event. */
266 static void sockbuf_got_readable_server(int idx)
267 {
268 int newsock, newidx, peer_port;
269 char *peer_ip = NULL;
270 sockbuf_t *sbuf = &sockbufs[idx];
271
272 newsock = socket_accept(sbuf->sock, &peer_ip, &peer_port);
273 if (newsock < 0) {
274 if (peer_ip) free(peer_ip);
275 return;
276 }
277 socket_set_nonblock(newsock, 1);
278
279 newidx = sockbuf_new();
280 sockbuf_set_sock(newidx, newsock, 0);
281 sockbuf_on_newclient(idx, SOCKBUF_LEVEL_INTERNAL, newidx, peer_ip, peer_port);
282 }
283
284 /* This is called when the POLLOUT condition is true for already-connected
285 socks. We write as much data as we can and call the on_written
286 event. */
287 static void sockbuf_got_writable(int idx)
288 {
289 int nbytes;
290 sockbuf_t *sbuf = &sockbufs[idx];
291
292 /* Try to write any buffered data. */
293 errno = 0;
294 nbytes = write(sbuf->sock, sbuf->data, sbuf->len);
295 if (nbytes > 0) {
296 sbuf->len -= nbytes;
297 if (!sbuf->len) sockbuf_unblock(idx);
298 else memmove(sbuf->data, sbuf->data+nbytes, sbuf->len);
299 sockbuf_on_written(idx, SOCKBUF_LEVEL_INTERNAL, nbytes, sbuf->len);
300 }
301 else if (nbytes < 0) {
302 /* If there's an error writing to a socket that's marked as
303 writable, then there's probably a socket-level error. */
304 sockbuf_got_eof(idx, errno);
305 }
306 }
307
308 /* When a sock is readable we read some from it and pass it to the on_read
309 handlers. We don't want to read more than once here, because fast
310 sockets on slow computers can get stuck in the read loop. */
311 static void sockbuf_got_readable(int idx)
312 {
313 sockbuf_t *sbuf = &sockbufs[idx];
314 char buf[4097];
315 int nbytes;
316
317 errno = 0;
318 nbytes = read(sbuf->sock, buf, sizeof(buf)-1);
319 if (nbytes > 0) {
320 buf[nbytes] = 0;
321 sockbuf_on_read(idx, SOCKBUF_LEVEL_INTERNAL, buf, nbytes);
322 }
323 else {
324 sockbuf_got_eof(idx, errno);
325 }
326 }
327
328 int sockbuf_new()
329 {
330 sockbuf_t *sbuf;
331 int idx;
332
333 for (idx = 0; idx < nsockbufs; idx++) {
334 if (sockbufs[idx].flags & SOCKBUF_AVAIL) break;
335 }
336 if (idx == nsockbufs) {
337 int i;
338
339 sockbufs = (sockbuf_t *)realloc(sockbufs, (nsockbufs+5) * sizeof(*sockbufs));
340 memset(sockbufs+nsockbufs, 0, 5 * sizeof(*sockbufs));
341 for (i = 0; i < 5; i++) {
342 sockbufs[nsockbufs+i].sock = -1;
343 sockbufs[nsockbufs+i].flags = SOCKBUF_AVAIL;
344 }
345 nsockbufs += 5;
346 }
347
348 sbuf = &sockbufs[idx];
349 memset(sbuf, 0, sizeof(*sbuf));
350 sbuf->flags = SOCKBUF_BLOCK;
351 sbuf->sock = -1;
352 sbuf->handler = &sockbuf_idler;
353
354 return(idx);
355 }
356
357 int sockbuf_set_sock(int idx, int sock, int flags)
358 {
359 int i;
360
361 if (!sockbuf_isvalid(idx)) return(-1);
362
363 sockbufs[idx].sock = sock;
364 sockbufs[idx].flags &= ~(SOCKBUF_CLIENT|SOCKBUF_SERVER|SOCKBUF_BLOCK);
365 sockbufs[idx].flags |= flags;
366
367 /* pollfds = [socks][socks][socks][listeners][listeners][end] */
368 /* idx_array = [ idx ][ idx ][ idx ][end]*/
369 /* So when we grow pollfds, we shift the listeners at the end. */
370
371 /* Find the entry in the pollfds array. */
372 for (i = 0; i < npollfds; i++) {
373 if (idx_array[i] == idx) break;
374 }
375
376 if (sock == -1) {
377 if (i == npollfds) return(1);
378
379 /* If they set the sock to -1, then we remove the entry. */
380 memmove(idx_array+i, idx_array+i+1, sizeof(int) * (npollfds-i-1));
381 memmove(pollfds+i, pollfds+i+1, sizeof(*pollfds) * (nlisteners + npollfds-i-1));
382 npollfds--;
383 return(0);
384 }
385
386 /* Add it to the end if it's not found. */
387 if (i == npollfds) {
388 /* Add the new idx to the idx_array. */
389 idx_array = (int *)realloc(idx_array, sizeof(int) * (i+1));
390 idx_array[i] = idx;
391
392 /* Add corresponding pollfd to pollfds. */
393 pollfds = (struct pollfd *)realloc(pollfds, sizeof(*pollfds) * (i+nlisteners+1));
394 memmove(pollfds+i+1, pollfds+i, sizeof(*pollfds) * nlisteners);
395
396 npollfds++;
397 }
398
399 pollfds[i].fd = sock;
400 pollfds[i].events = 0;
401 if (flags & (SOCKBUF_BLOCK|SOCKBUF_CLIENT)) pollfds[i].events |= POLLOUT;
402 if (!(flags & SOCKBUF_NOREAD)) pollfds[i].events |= POLLIN;
403
404 return(idx);
405 }
406
407 int sockbuf_isvalid(int idx)
408 {
409 if (idx >= 0 && idx < nsockbufs && !(sockbufs[idx].flags & (SOCKBUF_AVAIL | SOCKBUF_DELETED))) return(1);
410 return(0);
411 }
412
413 int sockbuf_close(int idx)
414 {
415 sockbuf_t *sbuf;
416
417 if (!sockbuf_isvalid(idx)) return(-1);
418 sbuf = &sockbufs[idx];
419 if (sbuf->sock >= 0) {
420 close(sbuf->sock);
421 sockbuf_set_sock(idx, -1, 0);
422 }
423 return(0);
424 }
425
426 int sockbuf_delete(int idx)
427 {
428 sockbuf_t *sbuf;
429 int i;
430
431 if (!sockbuf_isvalid(idx)) return(-1);
432 sbuf = &sockbufs[idx];
433
434 /* Call the on_delete handler for all filters. */
435 for (i = 0; i < sbuf->nfilters; i++) {
436 if (sbuf->filters[i]->on_delete) {
437 sbuf->filters[i]->on_delete(sbuf->filter_client_data[i], idx);
438 }
439 }
440
441 /* Close the file descriptor. */
442 if (sbuf->sock >= 0) close(sbuf->sock);
443
444 /* Free the peer ip. */
445 if (sbuf->peer_ip) free(sbuf->peer_ip);
446
447 /* Free its output buffer. */
448 if (sbuf->data) free(sbuf->data);
449
450 /* Mark it as deleted. */
451 memset(sbuf, 0, sizeof(*sbuf));
452 sbuf->sock = -1;
453 sbuf->flags = SOCKBUF_DELETED;
454 ndeleted_sockbufs++;
455
456 /* Find it in the pollfds/idx_array and delete it. */
457 for (i = 0; i < npollfds; i++) if (idx_array[i] == idx) break;
458 if (i == npollfds) return(0);
459
460 memmove(pollfds+i, pollfds+i+1, sizeof(*pollfds) * (npollfds+nlisteners-i-1));
461 memmove(idx_array+i, idx_array+i+1, sizeof(int) * (npollfds-i-1));
462 npollfds--;
463
464 return(0);
465 }
466
467 int sockbuf_write(int idx, const char *data, int len)
468 {
469 return sockbuf_on_write(idx, SOCKBUF_LEVEL_WRITE_INTERNAL, data, len);
470 }
471
472 int sockbuf_set_handler(int idx, sockbuf_handler_t *handler, void *client_data)
473 {
474 if (!sockbuf_isvalid(idx)) return(-1);
475 sockbufs[idx].handler = handler;
476 sockbufs[idx].client_data = client_data;
477
478 return(0);
479 }
480
481 /* Listeners are sockets that you want to be included in the event loop, but
482 do not have sockbufs associated with them. This is useful for stuff
483 like Tcl scripts who want to use async Tcl channels. All you have to
484 do is attach the channel's file descriptor with this function and it
485 will be monitored for activity (but not acted upon).
486 */
487 int sockbuf_attach_listener(int fd)
488 {
489 pollfds = (struct pollfd *)realloc(pollfds, sizeof(*pollfds) * (npollfds + nlisteners + 1));
490 pollfds[npollfds+nlisteners].fd = fd;
491 pollfds[npollfds+nlisteners].events = POLLIN;
492 pollfds[npollfds+nlisteners].revents = 0;
493 nlisteners++;
494 return(0);
495 }
496
497 int sockbuf_detach_listener(int fd)
498 {
499 int i;
500
501 /* Search for it so we can clear its event field. */
502 for (i = 0; i < nlisteners; i++) {
503 if (pollfds[npollfds+i].fd == fd) break;
504 }
505 if (i < nlisteners) {
506 memmove(pollfds+npollfds+i, pollfds+npollfds+i+1, sizeof(*pollfds) * (nlisteners-i-1));
507 nlisteners--;
508 }
509 return(0);
510 }
511
512 /* A filter is something you can write to intercept events that happen on/to
513 a sockbuf. When something happens, like data arrives on the socket,
514 we pass the event to the earliest filter in the chain. It chooses to
515 halt the event or continue it (maybe modifying it too). Some events,
516 like writing to the sockbuf (sockbuf_write) have to get called
517 backwards.
518 */
519 int sockbuf_attach_filter(int idx, sockbuf_filter_t *filter, void *client_data)
520 {
521 sockbuf_t *sbuf;
522 int i;
523
524 if (!sockbuf_isvalid(idx)) return(-1);
525 sbuf = &sockbufs[idx];
526
527 sbuf->filters = (sockbuf_filter_t **)realloc(sbuf->filters, sizeof(filter) * (sbuf->nfilters+1));
528
529 sbuf->filter_client_data = (void **)realloc(sbuf->filter_client_data, sizeof(void *) * (sbuf->nfilters+1));
530
531 /* Filters are ordered according to levels. The lower the level, the
532 earlier the filter comes. This allows filters to be stacked
533 in different orders but still function intelligently (e.g.
534 compression should always be above encryption).
535 */
536 for (i = 0; i < sbuf->nfilters; i++) {
537 if (filter->level < sbuf->filters[i]->level) break;
538 }
539
540 /* Move up the higher-level filters. */
541 memmove(sbuf->filters+i+1, sbuf->filters+i, sizeof(filter) * (sbuf->nfilters-i));
542 memmove(sbuf->filter_client_data+i+1, sbuf->filter_client_data+i, sizeof(void *) * (sbuf->nfilters-i));
543
544 /* Put this filter in the empty spot. */
545 sbuf->filters[i] = filter;
546 sbuf->filter_client_data[i] = client_data;
547
548 sbuf->nfilters++;
549 return(0);
550 }
551
552 /* Detach the specified filter, and return the filter's client data in the
553 client_data pointer (it should be a pointer to a pointer). */
554 int sockbuf_detach_filter(int idx, sockbuf_filter_t *filter, void *client_data)
555 {
556 int i;
557 sockbuf_t *sbuf;
558
559 if (!sockbuf_isvalid(idx)) return(-1);
560 sbuf = &sockbufs[idx];
561
562 for (i = 0; i < sbuf->nfilters; i++) if (sbuf->filters[i] == filter) break;
563 if (i == sbuf->nfilters) {
564 if (client_data) *(void **)client_data = NULL;
565 return(0);
566 }
567
568 if (client_data) *(void **)client_data = sbuf->filter_client_data[i];
569 memmove(sbuf->filter_client_data+i, sbuf->filter_client_data+i+1, sizeof(void *) * sbuf->nfilters-i-1);
570 memmove(sbuf->filters+i, sbuf->filters+i+1, sizeof(void *) * (sbuf->nfilters-i-1));
571 sbuf->nfilters--;
572 return(0);
573 }
574
575 /* This bit waits for something to happen on one of the sockets, with an
576 optional timeout (pass -1 to wait forever). Then, all of the sockbufs
577 are processed, callbacks made, etc, before control returns to the
578 caller.
579 */
580 int sockbuf_update_all(int timeout)
581 {
582 int i, n, flags, revents, idx;
583 static int depth = 0;
584
585 /* Increment the depth counter when we enter the proc. */
586 depth++;
587
588 n = poll(pollfds, npollfds, timeout);
589 if (n < 0) n = 0;
590
591 /* If a sockbuf gets deleted during its event handler, the pollfds array
592 gets shifted down and we will miss the events of the next
593 socket. That's ok, because we'll pick up those events next
594 time.
595 */
596 for (i = 0; n && i < npollfds; i++) {
597 /* Common case: no activity. */
598 revents = pollfds[i].revents;
599 if (!revents) continue;
600
601 idx = idx_array[i];
602 flags = sockbufs[idx].flags;
603 if (revents & POLLOUT) {
604 if (flags & SOCKBUF_CLIENT) sockbuf_got_writable_client(idx);
605 else sockbuf_got_writable(idx);
606 }
607 if (revents & POLLIN) {
608 if (flags & SOCKBUF_SERVER) sockbuf_got_readable_server(idx);
609 else sockbuf_got_readable(idx);
610 }
611 if (revents & (POLLHUP|POLLNVAL|POLLERR)) sockbuf_got_eof(idx, 0);
612 n--;
613 }
614
615 /* Now that we're done manipulating stuff, back out of the depth. */
616 depth--;
617
618 /* If this is the topmost level, check for deleted sockbufs. */
619 if (ndeleted_sockbufs && !depth) {
620 for (i = 0; ndeleted_sockbufs && i < nsockbufs; i++) {
621 if (sockbufs[i].flags & SOCKBUF_DELETED) {
622 sockbufs[i].flags = SOCKBUF_AVAIL;
623 ndeleted_sockbufs--;
624 }
625 }
626 /* If ndeleted_sockbufs isn't 0, then we somehow lost track of
627 an idx. That can't happen, but we might as well be
628 safe. */
629 ndeleted_sockbufs = 0;
630 }
631
632 return(0);
633 }

webmaster@eggheads.org
ViewVC Help
Powered by ViewVC 1.1.23