1 Star 0 Fork 0

souhoiryo / opensips

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
tcp_read.c 30.01 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181
/*
* $Id$
*
* Copyright (C) 2001-2003 FhG Fokus
*
* This file is part of opensips, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* History:
* --------
* 2002-12-?? created by andrei.
* 2003-02-10 zero term before calling receive_msg & undo afterward (andrei)
* 2003-05-13 l: (short form of Content-Length) is now recognized (andrei)
* 2003-07-01 tcp_read & friends now take a single tcp_connection
* parameter & they set c->state to S_CONN_EOF on eof (andrei)
* 2003-07-04 fixed tcp EOF handling (possible infinite loop) (andrei)
* 2005-07-05 migrated to the new io_wait code (andrei)
*/
/*!
* \file
* \brief TCP connections - read functions
*/
#ifdef USE_TCP
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h> /* for abort() */
#include "dprint.h"
#include "tcp_conn.h"
#include "pass_fd.h"
#include "globals.h"
#include "receive.h"
#include "timer.h"
#include "ut.h"
#ifdef USE_TLS
#include "tls/tls_server.h"
#endif
#define HANDLE_IO_INLINE
#include "io_wait.h"
#include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
#include "forward.h"
#include "pt.h"
/* kinds of entries registered with the io_wait handler; handle_io()
 * dispatches on this value (fm->type) */
enum fd_types { F_NONE=0, F_TCPMAIN=1, F_TCPCONN=2 }; /*!< types used in io_wait* */
static struct tcp_connection* tcp_conn_lst=0; /*!< list of tcp connections handled by this process */
static io_wait_h io_w; /* io_wait handler*/
/* unix socket received by tcp_receive_loop(); connections are handed back
 * over it through release_tcpconn() */
static int tcpmain_sock=-1;
/* buffer to be used for reading all TCP SIP messages,
 * detached from the actual connection - in order to improve
 * parallelism (process the SIP message while the connection
 * can be sent back to main to do more stuff) */
struct tcp_req current_req;
/*! \brief reads next available bytes
* \return number of bytes read, 0 on EOF or -1 on error,
* on EOF it also sets c->state to S_CONN_EOF
* (to distinguish from reads that would block which could return 0)
* sets also r->error
*/
int tcp_read(struct tcp_connection *c,struct tcp_req *r)
{
int bytes_free, bytes_read;
int fd;
fd=c->fd;
bytes_free=TCP_BUF_SIZE- (int)(r->pos - r->buf);
if (bytes_free==0){
LM_ERR("buffer overrun, dropping\n");
r->error=TCP_REQ_OVERRUN;
return -1;
}
again:
bytes_read=read(fd, r->pos, bytes_free);
if(bytes_read==-1){
if (errno == EWOULDBLOCK || errno == EAGAIN){
return 0; /* nothing has been read */
}else if (errno == EINTR) goto again;
else{
LM_ERR("error reading: %s\n",strerror(errno));
r->error=TCP_READ_ERROR;
return -1;
}
}else if (bytes_read==0){
c->state=S_CONN_EOF;
LM_DBG("EOF on %p, FD %d\n", c, fd);
}
#ifdef EXTRA_DEBUG
LM_DBG("read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
#endif
r->pos+=bytes_read;
return bytes_read;
}
/*! \brief
 * incrementally scans a TCP request: finds the end of the headers (double
 * crlf), parses the Content-Length header and counts the body bytes
 *
 * If everything buffered so far was already parsed (r->parsed >= r->pos),
 * more data is read first (tls_read() for PROTO_TLS connections when USE_TLS
 * is defined, tcp_read() otherwise); otherwise only the leftover bytes are
 * parsed and nothing is read. The bytes in [r->parsed, r->pos) are then fed,
 * one state at a time, to a state machine whose state lives in r->state, so
 * a later call resumes exactly where this one stopped.
 *
 * \note (WARNING: inefficient, tries to reuse receive_msg but will go through
 * the headers twice [once here looking for Content-Length and for the end
 * of the headers and once in receive_msg]; a more speed efficient version will
 * result in either major code duplication or major changes to the receive code)
 *
 * \param c connection to read from
 * \param r request buffer and parser state, updated in place
 * \return number of bytes read by this call (0 when only already buffered
 * data was parsed), or the <=0 result of the read function (< 0 on error,
 * 0 on EOF / nothing available). Sets r->state, r->body, r->parsed and, once
 * the whole message is in the buffer, r->complete; r->error becomes
 * TCP_REQ_BAD_LEN if the headers end without a Content-Length.
 * It should be called in a while loop.
 */
int tcp_read_headers(struct tcp_connection *c,struct tcp_req *r)
{
unsigned int remaining;
int bytes;
char *p;
/* shared tail of several switch(*p) blocks: LF => start of a new line,
 * anything else => ignore the rest of the current line */
#define crlf_default_skip_case \
case '\n': \
r->state=H_LF; \
break; \
default: \
r->state=H_SKIP
/* cases used at the beginning of a header line: start matching
 * "Content-Length" / its short form "l", unless one was already found */
#define content_len_beg_case \
case ' ': \
case '\t': \
if (!r->has_content_len) r->state=H_STARTWS; \
else r->state=H_SKIP; \
/* not interested if we already found one */ \
break; \
case 'C': \
case 'c': \
if(!r->has_content_len) r->state=H_CONT_LEN1; \
else r->state=H_SKIP; \
break; \
case 'l': \
case 'L': \
/* short form for Content-Length */ \
if (!r->has_content_len) r->state=H_L_COLON; \
else r->state=H_SKIP; \
break
/* moves to newstate if *p is one of the two given characters, otherwise
 * falls back to the LF / skip-line handling */
#define change_state(upper, lower, newstate)\
switch(*p){ \
case upper: \
case lower: \
r->state=(newstate); break; \
crlf_default_skip_case; \
}
/* a complete "case" of the main switch that matches a single character of
 * the header name and consumes it */
#define change_state_case(state0, upper, lower, newstate)\
case state0: \
change_state(upper, lower, newstate); \
p++; \
break
/* if we still have some unparsed part, parse it first, don't do the read*/
if (r->parsed<r->pos){
bytes=0;
}else{
#ifdef USE_TLS
if (c->type==PROTO_TLS)
bytes=tls_read(c,r);
else
#endif
bytes=tcp_read(c,r);
/* error, EOF or nothing available: nothing new to parse */
if (bytes<=0) return bytes;
}
p=r->parsed;
/* p is the parsing cursor; it is stored back into r->parsed on exit */
while(p<r->pos && r->error==TCP_REQ_OK){
switch((unsigned char)r->state){
case H_BODY: /* read the body*/
/* consume as many body bytes as are buffered, but never more
 * than what is still missing from the body */
remaining=r->pos-p;
if (remaining>r->bytes_to_go) remaining=r->bytes_to_go;
r->bytes_to_go-=remaining;
p+=remaining;
if (r->bytes_to_go==0){
r->complete=1;
goto skip;
}
break;
case H_SKIP:
/* find lf, we are in this state if we are not interested
* in anything till end of line*/
p=q_memchr(p, '\n', r->pos-p);
if (p){
p++;
r->state=H_LF;
}else{
/* no LF buffered yet: everything was consumed, stay in H_SKIP */
p=r->pos;
}
break;
case H_LF:
/* terminate on LF CR LF or LF LF */
switch (*p){
case '\r':
r->state=H_LFCR;
break;
case '\n':
/* found LF LF */
r->state=H_BODY;
if (r->has_content_len){
r->body=p+1;
r->bytes_to_go=r->content_len;
if (r->bytes_to_go==0){
/* no body: the message ends right after this LF */
r->complete=1;
p++;
goto skip;
}
}else{
LM_DBG("no clen, p=%X\n", *p);
r->error=TCP_REQ_BAD_LEN;
}
break;
content_len_beg_case;
default:
r->state=H_SKIP;
}
p++;
break;
case H_LFCR:
if (*p=='\n'){
/* found LF CR LF */
r->state=H_BODY;
if (r->has_content_len){
r->body=p+1;
r->bytes_to_go=r->content_len;
if (r->bytes_to_go==0){
/* no body: the message ends right after this LF */
r->complete=1;
p++;
goto skip;
}
}else{
LM_DBG("no clen, p=%X\n", *p);
r->error=TCP_REQ_BAD_LEN;
}
}else r->state=H_SKIP;
p++;
break;
case H_STARTWS:
/* a line that started with white space (see content_len_beg_case) */
switch (*p){
content_len_beg_case;
crlf_default_skip_case;
}
p++;
break;
case H_SKIP_EMPTY:
/* state in which tcp_read_req() leaves the parser when preparing for a
 * new request: LF and white space are ignored here and r->start is set
 * on the first character that leaves this state */
switch (*p){
case '\n':
break;
case '\r':
/* only when tcp_crlf_pingpong is set: possible start of a
 * CRLFCRLF ping */
if (tcp_crlf_pingpong) {
r->state=H_SKIP_EMPTY_CR_FOUND;
r->start=p;
}
break;
case ' ':
case '\t':
/* skip empty lines */
break;
case 'C':
case 'c':
r->state=H_CONT_LEN1;
r->start=p;
break;
case 'l':
case 'L':
/* short form for Content-Length */
r->state=H_L_COLON;
r->start=p;
break;
default:
r->state=H_SKIP;
r->start=p;
};
p++;
break;
/* the next three states match the remaining LF, CR, LF of a CRLFCRLF
 * ping; on a mismatch the character is not consumed and is looked at
 * again in H_SKIP_EMPTY */
case H_SKIP_EMPTY_CR_FOUND:
if (*p=='\n'){
r->state=H_SKIP_EMPTY_CRLF_FOUND;
p++;
}else{
r->state=H_SKIP_EMPTY;
}
break;
case H_SKIP_EMPTY_CRLF_FOUND:
if (*p=='\r'){
r->state = H_SKIP_EMPTY_CRLFCR_FOUND;
p++;
}else{
r->state = H_SKIP_EMPTY;
}
break;
case H_SKIP_EMPTY_CRLFCR_FOUND:
if (*p=='\n'){
/* full CRLFCRLF seen: report it as a complete "message" */
r->state = H_PING_CRLFCRLF;
r->complete = 1;
r->has_content_len = 1; /* hack to avoid error check */
p++;
goto skip;
}else{
r->state = H_SKIP_EMPTY;
}
break;
/* case-insensitive match of "ontent-length", one character per state
 * (the leading 'C' was matched when entering H_CONT_LEN1) */
change_state_case(H_CONT_LEN1, 'O', 'o', H_CONT_LEN2);
change_state_case(H_CONT_LEN2, 'N', 'n', H_CONT_LEN3);
change_state_case(H_CONT_LEN3, 'T', 't', H_CONT_LEN4);
change_state_case(H_CONT_LEN4, 'E', 'e', H_CONT_LEN5);
change_state_case(H_CONT_LEN5, 'N', 'n', H_CONT_LEN6);
change_state_case(H_CONT_LEN6, 'T', 't', H_CONT_LEN7);
/* NOTE(review): '_' is accepted in place of '-' here - confirm this is
 * intended */
change_state_case(H_CONT_LEN7, '-', '_', H_CONT_LEN8);
change_state_case(H_CONT_LEN8, 'L', 'l', H_CONT_LEN9);
change_state_case(H_CONT_LEN9, 'E', 'e', H_CONT_LEN10);
change_state_case(H_CONT_LEN10, 'N', 'n', H_CONT_LEN11);
change_state_case(H_CONT_LEN11, 'G', 'g', H_CONT_LEN12);
change_state_case(H_CONT_LEN12, 'T', 't', H_CONT_LEN13);
change_state_case(H_CONT_LEN13, 'H', 'h', H_L_COLON);
case H_L_COLON:
/* header name matched, expect optional white space and then ':' */
switch(*p){
case ' ':
case '\t':
break; /* skip space */
case ':':
r->state=H_CONT_LEN_BODY;
break;
crlf_default_skip_case;
};
p++;
break;
case H_CONT_LEN_BODY:
/* after the ':', waiting for the first digit of the value */
switch(*p){
case ' ':
case '\t':
break; /* eat space */
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
r->state=H_CONT_LEN_BODY_PARSE;
r->content_len=(*p-'0');
break;
/*FIXME: content length on different lines ! */
crlf_default_skip_case;
}
p++;
break;
case H_CONT_LEN_BODY_PARSE:
/* accumulating the decimal value */
switch(*p){
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
/* NOTE(review): no upper bound is enforced while accumulating, so
 * a very long digit string can overflow content_len - consider
 * rejecting such values */
r->content_len=r->content_len*10+(*p-'0');
break;
case '\r':
case ' ':
case '\t': /* FIXME: check if line contains only WS */
r->state=H_SKIP;
r->has_content_len=1;
break;
case '\n':
/* end of line, parse successful */
r->state=H_LF;
r->has_content_len=1;
break;
default:
LM_ERR("bad Content-Length header value, unexpected "
"char %c in state %d\n", *p, r->state);
r->state=H_SKIP; /* try to find another?*/
}
p++;
break;
default:
LM_CRIT("unexpected state %d\n", r->state);
abort();
}
}
skip:
/* remember how far we got, the next call continues from here */
r->parsed=p;
return bytes;
}
/*! \brief gives a connection back over the unix socket
 *
 * Frees the per-connection request buffer (if any), closes this process'
 * copy of the connection fd and then sends the {connection pointer, state}
 * pair through unix_sock.
 *
 * \param c connection being released
 * \param state value reported along with the connection (callers in this
 *        file pass CONN_RELEASE, CONN_EOF, CONN_ERROR or ASYNC_WRITE)
 * \param unix_sock socket the notification is written to
 */
void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
{
	long msg[2];

	LM_DBG(" releasing con %p, state %ld, fd=%d, id=%d\n",
			c, state, c->fd, c->id);
	LM_DBG(" extra_data %p\n", c->extra_data);

	/* the partially read request (if any) dies with the release */
	if (c->con_req != NULL) {
		pkg_free(c->con_req);
		c->con_req = NULL;
	}

	/* drop our descriptor before signaling the parent */
	if (c->fd != -1)
		close(c->fd);
	/* errno==EINTR, EWOULDBLOCK a.s.o todo */

	msg[0] = (long)c;
	msg[1] = state;
	if (send_all(unix_sock, msg, sizeof(msg)) > 0)
		return;

	LM_ERR("send_all failed\n");
}
/* Flushes the pending async write chunks of a connection - called under
 * the connection write lock.
 *
 * Chunks are sent in order, starting with async_chunks[0]; a fully written
 * chunk is freed and the remaining ones are shifted down. When the socket
 * would block, the connection is released with ASYNC_WRITE; when the last
 * chunk is written, it is released with CONN_RELEASE.
 *
 * 	* if returns >= 0 : it keeps the connection for further usage
 * 			or releases it manually
 * 	* if returns <  0 : the connection should be released by the
 * 			upper layer
 */
int tcp_write_async_req(struct tcp_connection* con)
{
	struct tcp_send_chunk *cur;
	int sent;
	int pending;

	if (con->async_chunks_no == 0) {
		LM_DBG("The connection has been triggered "
		" for a write event - but we have no pending write chunks\n");
		return 0;
	}

	/* one iteration per chunk */
	for (;;) {
		cur = con->async_chunks[0];

		/* one iteration per send() attempt on the current chunk */
		for (;;) {
			pending = (int)((cur->buf+cur->len)-cur->pos);
			LM_DBG("Trying to send %d bytes from chunk %p in conn %p - %d %d \n",
				pending,cur,con,cur->ticks,get_ticks());

			sent = send(con->fd, cur->pos, pending,
#ifdef HAVE_MSG_NOSIGNAL
					MSG_NOSIGNAL
#else
					0
#endif
					);

			if (sent < 0) {
				if (errno == EINTR)
					continue; /* interrupted, simply try again */
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					LM_DBG("Can't finish to write chunk %p on conn %p\n",
						cur,con);
					release_tcpconn(con, ASYNC_WRITE, tcpmain_sock);
					return 0;
				}
				LM_ERR("Error occured while sending async chunk %d (%s)\n",
					errno,strerror(errno));
				return CONN_ERROR;
			}

			if (sent >= pending)
				break; /* the whole chunk went out */

			/* partial write */
			cur->pos += sent;
		}

		/* written a full chunk - move to the next one, if any */
		shm_free(cur);
		con->async_chunks_no--;

		if (con->async_chunks_no == 0) {
			LM_DBG("We have finished writing all our async chunks in %p\n",con);
			con->oldest_chunk=0;
			release_tcpconn(con, CONN_RELEASE, tcpmain_sock);
			return 0;
		}

		LM_DBG("We still have %d chunks pending on %p\n",
			con->async_chunks_no,con);
		memmove(&con->async_chunks[0],&con->async_chunks[1],
			con->async_chunks_no * sizeof(struct tcp_send_chunk*));
		con->oldest_chunk = con->async_chunks[0]->ticks;
	}
}
/* Responsible for reading the request(s) available on a connection.
 *
 * The per-connection buffer (con->con_req) is used when a previous call
 * left a partial message there, otherwise the per-process buffer
 * (current_req). Every complete message is passed to receive_msg(); a
 * CRLFCRLF ping (state H_PING_CRLFCRLF) is answered with a CRLF through
 * tcp_send(). When a message is complete and nothing else was read after
 * it, the connection is removed from this process' list and handed back
 * with release_tcpconn(). If the message is still incomplete, what was
 * gathered in current_req is moved into a newly allocated con->con_req.
 *
 * con        - connection to read from
 * bytes_read - if not NULL, receives the total number of bytes read
 *
 * 	* if returns >= 0 : it keeps the connection for further usage
 * 			or releases it manually
 * 	* if returns <  0 : the connection should be released by the
 * 			upper layer
 */
int tcp_read_req(struct tcp_connection* con, int* bytes_read)
{
	int bytes;
	int total_bytes;
	int resp;
	int req_gone;
	long size;
	struct tcp_req* req;
	struct tcp_req* saved;
	char c;
	struct receive_info local_rcv;
	char *msg_buf;
	int msg_len;

	bytes=-1;
	total_bytes=0;
	resp=CONN_RELEASE;

	if (con->con_req) {
		req=con->con_req;
		LM_DBG("Using the per connection buff \n");
	} else {
		LM_DBG("Using the global ( per process ) buff \n");
		req=&current_req;
	}

#ifdef USE_TLS
	if (con->type==PROTO_TLS){
		if (tls_fix_read_conn(con)!=0){
			resp=CONN_ERROR;
			goto end_req;
		}
		if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
	}
#endif

again:
	if(req->error==TCP_REQ_OK){
		bytes=tcp_read_headers(con,req);
//#ifdef EXTRA_DEBUG
		/* if timeout state=0; goto end__req; */
		LM_DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n",
				bytes, (int)(req->parsed-req->start), req->state,
				req->error );
		/* nothing may have been parsed yet (the read returned 0 or failed),
		 * so only look at the last parsed char when there is one - reading
		 * parsed[-1] would otherwise fall before the buffer */
		LM_DBG("last char=0x%02X, parsed msg=\n%.*s\n",
				(unsigned int)((req->parsed>req->buf)?
					(unsigned char)*(req->parsed-1):0),
				(int)(req->parsed-req->start), req->start);
//#endif
		if (bytes==-1){
			LM_ERR("failed to read \n");
			resp=CONN_ERROR;
			goto end_req;
		}
		total_bytes+=bytes;
		/* eof check:
		 * is EOF if eof on fd and req.  not complete yet,
		 * if req. is complete we might have a second unparsed
		 * request after it, so postpone release_with_eof
		 */
		if ((con->state==S_CONN_EOF) && (req->complete==0)) {
			LM_DBG("EOF\n");
			resp=CONN_EOF;
			goto end_req;
		}
	}

	if (req->error!=TCP_REQ_OK){
		LM_ERR("bad request, state=%d, error=%d "
				  "buf:\n%.*s\nparsed:\n%.*s\n", req->state, req->error,
				  (int)(req->pos-req->buf), req->buf,
				  (int)(req->parsed-req->start), req->start);
		LM_DBG("- received from: port %d\n", con->rcv.src_port);
		print_ip("- received from: ip ",&con->rcv.src_ip, "\n");
		resp=CONN_ERROR;
		goto end_req;
	}

	if (req->complete){
#ifdef EXTRA_DEBUG
		LM_DBG("end of header part\n");
		LM_DBG("- received from: port %d\n", con->rcv.src_port);
		print_ip("- received from: ip ", &con->rcv.src_ip, "\n");
		LM_DBG("headers:\n%.*s.\n",(int)(req->body-req->start), req->start);
#endif
		if (req->has_content_len){
			LM_DBG("content-length= %d\n", req->content_len);
#ifdef EXTRA_DEBUG
			LM_DBG("body:\n%.*s\n", req->content_len,req->body);
#endif
		}else{
			req->error=TCP_REQ_BAD_LEN;
			LM_ERR("content length not present or unparsable\n");
			resp=CONN_ERROR;
			goto end_req;
		}

		/* update the timeout - we succesfully read the request */
		con->timeout=get_ticks()+tcp_max_msg_time;

		/* if we are here everything is nice and ok*/
		update_stat( pt[process_no].load, +1 );
		resp=CONN_RELEASE;
#ifdef EXTRA_DEBUG
		LM_DBG("calling receive_msg(%p, %d, )\n",
				req->start, (int)(req->parsed-req->start));
#endif
		/* rcv.bind_address should always be !=0 */
		bind_address=con->rcv.bind_address;
		con->rcv.proto_reserved1=con->id; /* copy the id */
		c=*req->parsed; /* ugly hack: zero term the msg & save the
						   previous char, req->parsed should be ok
						   because we always alloc BUF_SIZE+1 */
		*req->parsed=0;

		/* prepare for next request */
		size=req->pos-req->parsed;

		/* a full message was read: restart the attempts counter now, while
		 * the connection still belongs to this process (it may be handed
		 * back below and must not be written to after that) */
		con->msg_attempts = 0;

		/* becomes 1 once the memory behind req has been freed */
		req_gone=0;

		if (req->state==H_PING_CRLFCRLF) {
			/* we send the reply */
			if (tcp_send( con->rcv.bind_address, con->rcv.proto,CRLF,
			CRLF_LEN, &(con->rcv.src_su), con->rcv.proto_reserved1) < 0) {
				LM_ERR("CRLF pong - tcp_send() failed\n");
			}

			if (!size) {
				/* we can release the connection */
				io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING,IO_WATCH_READ);
				tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
				/* release_tcpconn() frees con->con_req, which is the
				 * buffer we are working on unless we use current_req */
				if (req != &current_req)
					req_gone=1;
				if (con->state==S_CONN_EOF)
					release_tcpconn(con, CONN_EOF, tcpmain_sock);
				else
					release_tcpconn(con, CONN_RELEASE, tcpmain_sock);
			}
		} else {
			msg_buf = req->start;
			msg_len = req->parsed-req->start;
			local_rcv = con->rcv;

			if (!size) {
				/* did not read any more things - we can release the connection */
				LM_DBG("We're releasing the connection in state %d \n",con->state);
				if (req != &current_req) {
					/* we have the buffer in the connection tied buff -
					 *	detach it , release the conn and free it afterwards */
					con->con_req = NULL;
				}
				io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING,IO_WATCH_READ);
				tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);

				/* if we have EOF, signal that to MAIN as well
				 * otherwise - just pass it back */
				if (con->state==S_CONN_EOF)
					release_tcpconn(con, CONN_EOF, tcpmain_sock);
				else
					release_tcpconn(con, CONN_RELEASE, tcpmain_sock);
			} else {
				LM_DBG("We still have things on the pipe - keeping connection \n");
			}

			if (receive_msg(msg_buf, msg_len,
				&local_rcv) <0)
					LM_ERR("receive_msg failed \n");

			if (!size && req != &current_req) {
				/* if we no longer need this tcp_req
				 * we can free it now */
				pkg_free(req);
				req_gone=1;
			}
		}

		update_stat( pt[process_no].load, -1 );

		/* re-arm the buffer for the next message - but only if it still
		 * exists; touching it after it was freed is a use-after-free */
		if (!req_gone) {
			*req->parsed=c;

			if (size) memmove(req->buf, req->parsed, size);
#ifdef EXTRA_DEBUG
			LM_DBG("preparing for new request, kept %ld bytes\n", size);
#endif
			req->pos=req->buf+size;
			req->parsed=req->buf;
			req->start=req->buf;
			req->body=0;
			req->error=TCP_REQ_OK;
			req->state=H_SKIP_EMPTY;
			req->complete=req->content_len=req->has_content_len=0;
			req->bytes_to_go=0;
		}

		/* if we still have some unparsed bytes, try to parse them too
		 * (the buffer is never freed when size != 0) */
		if (size) goto again;
	} else {
		/* request not complete - check the if the thresholds are exceeded */

		con->msg_attempts ++;
		if (con->msg_attempts == tcp_max_msg_chunks) {
			LM_ERR("Made %u read attempts but message is not complete yet - "
				   "closing connection \n",con->msg_attempts);
			resp = CONN_ERROR;
			goto end_req;
		}

		if (req == &current_req) {
			/* let's duplicate this - most likely another conn will come in */

			LM_DBG("We didn't manage to read a full request. Back to child poll\n");
			/* FIXME - PKG or SHM ? */
			saved = pkg_malloc(sizeof(struct tcp_req));
			con->con_req = saved;
			if (saved == NULL) {
				LM_ERR("No more mem for dynamic con request buffer\n");
				resp = CONN_ERROR;
				goto end_req;
			}

			/* copy the bytes read so far and rebase every pointer onto
			 * the new buffer (all of them are offsets into req->buf) */
			memcpy(saved->buf,req->buf,req->pos-req->buf);
			saved->pos = saved->buf + (req->pos-req->buf);
			saved->start = saved->buf + (req->start-req->buf);
			saved->parsed = saved->buf + (req->parsed-req->buf);
			if (req->body != 0)
				saved->body = saved->buf + (req->body-req->buf);
			else
				saved->body = 0;

			saved->complete=req->complete;
			saved->has_content_len=req->has_content_len;
			saved->content_len=req->content_len;
			saved->bytes_to_go=req->bytes_to_go;
			saved->error = req->error;
			saved->state = req->state;
			/* zero out the per process req for the future SIP msg */
			init_tcp_req(&current_req);
		}
	}

	LM_DBG("tcp_read_req end\n");
end_req:
	if (bytes_read) *bytes_read=total_bytes;
	return resp;
}
#ifdef DEBUG_TCP_RECEIVE
/* old select() based receive loop, kept around for debugging; only built
 * when DEBUG_TCP_RECEIVE is defined.
 *
 * Waits on unix_sock for connections, keeps them in a local list and, for
 * every connection whose fd is readable, calls tcp_read_req(); connections
 * that fail, are in a bad state or expire are sent back with
 * release_tcpconn().
 *
 * NOTE(review): this variant looks out of sync with the rest of the file -
 * tcp_read_req() is defined above with two parameters but is called here
 * with one, and it works on the file-level tcp_conn_lst / io_w / tcpmain_sock
 * which this loop never sets up. Confirm it still builds before relying
 * on it.
 */
void tcp_receive_loop(int unix_sock)
{
struct tcp_connection* list; /* list with connections in use */
struct tcp_connection* con;
struct tcp_connection* c_next;
int n;
int nfds;
int s;
long resp;
fd_set master_set;
fd_set sel_set;
int maxfd;
struct timeval timeout;
int ticks;
/* init */
list=con=0;
FD_ZERO(&master_set);
FD_SET(unix_sock, &master_set);
maxfd=unix_sock;
/* listen on the unix socket for the fd */
for(;;){
timeout.tv_sec=TCP_CHILD_SELECT_TIMEOUT;
timeout.tv_usec=0;
/* select() modifies its set, so work on a copy of the master set */
sel_set=master_set;
nfds=select(maxfd+1, &sel_set, 0 , 0 , &timeout);
#ifdef EXTRA_DEBUG
/* NOTE(review): the loop stops before maxfd, although maxfd itself was
 * passed to select() (maxfd+1) - that fd is never reported here */
for (n=0; n<maxfd; n++){
if (FD_ISSET(n, &sel_set))
LM_DBG("fd %d is set\n", n);
}
#endif
if (nfds<0){
if (errno==EINTR) continue; /* just a signal */
/* errors */
LM_ERR("select:(%d) %s\n", errno,
strerror(errno));
continue;
}
if (FD_ISSET(unix_sock, &sel_set)){
nfds--;
/* a new conn from "main" */
n=receive_fd(unix_sock, &con, sizeof(con), &s, 0);
if (n<0){
if (errno == EWOULDBLOCK || errno == EAGAIN ||
errno == EINTR){
goto skip;
}else{
LM_CRIT("read_fd: %s\n",strerror(errno));
abort(); /* big error*/
}
}
LM_DBG("received n=%d con=%p, fd=%d\n", n, con, s);
if (n==0){
LM_WARN("0 bytes read\n");
goto skip;
}
if (con==0){
LM_CRIT("null pointer\n");
goto skip;
}
con->fd=s;
if (s==-1) {
/* the connection pointer arrived without a descriptor */
LM_ERR("read_fd: no fd read\n");
resp=CONN_ERROR;
con->state=S_CONN_BAD;
release_tcpconn(con, resp, unix_sock);
goto skip;
}
con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
FD_SET(s, &master_set);
if (maxfd<s) maxfd=s;
/* only the head of the list is compared against */
if (con==list){
LM_CRIT("duplicate"
" connection received: %p, id %d, fd %d, refcnt %d"
" state %d (n=%d)\n", con, con->id, con->fd,
con->refcnt, con->state, n);
resp=CONN_ERROR;
release_tcpconn(con, resp, unix_sock);
goto skip; /* try to recover */
}
tcpconn_listadd(list, con, c_next, c_prev);
}
skip:
/* walk all the connections: drop the bad ones, read from the ready ones,
 * expire the idle ones */
ticks=get_ticks();
for (con=list; con ; con=c_next){
c_next=con->c_next; /* safe for removing*/
#ifdef EXTRA_DEBUG
LM_DBG("list fd=%d, id=%d, timeout=%d, refcnt=%d\n",
con->fd, con->id, con->timeout, con->refcnt);
#endif
if (con->state<0){
/* S_CONN_BAD or S_CONN_ERROR, remove it */
resp=CONN_ERROR;
FD_CLR(con->fd, &master_set);
tcpconn_listrm(list, con, c_next, c_prev);
con->state=S_CONN_BAD;
release_tcpconn(con, resp, unix_sock);
continue;
}
/* nfds counts the ready fds not handled yet */
if (nfds && FD_ISSET(con->fd, &sel_set)){
#ifdef EXTRA_DEBUG
LM_DBG("match, fd:isset\n");
#endif
nfds--;
resp=tcp_read_req(con);
if (resp<0){
FD_CLR(con->fd, &master_set);
tcpconn_listrm(list, con, c_next, c_prev);
con->state=S_CONN_BAD;
release_tcpconn(con, resp, unix_sock);
}else{
/* update timeout */
con->timeout=ticks+TCP_CHILD_TIMEOUT;
}
}else{
/* timeout */
if (con->timeout<=ticks){
/* expired, return to "tcp main" */
LM_DBG("%p expired (%d, %d)\n",
con, con->timeout, ticks);
resp=CONN_RELEASE;
FD_CLR(con->fd, &master_set);
tcpconn_listrm(list, con, c_next, c_prev);
release_tcpconn(con, resp, unix_sock);
}
}
}
}
}
#else /* DEBUG_TCP_RECEIVE */
/*! \brief
 * handle io routine, based on the fd_map type
 * (it will be called from io_wait_loop* )
 *
 * F_TCPMAIN entries: a {connection pointer, rw flags} pair and a descriptor
 * are received with receive_fd(); depending on the flags the connection is
 * either added to this process' list and watched for reads, or its pending
 * async chunks are written with tcp_write_async_req().
 * F_TCPCONN entries: on a read event tcp_read_req() is called for the
 * connection stored in fm->data; other events are ignored.
 *
 * params: fm - pointer to a fd hash entry
 * idx - index in the fd_array (or -1 if not known)
 * event_type - IO_WATCH_* flags of the event that triggered the call
 * return: -1 on error, or when we are not interested any more on reads
 * from this fd (e.g.: we are closing it )
 * 0 on EAGAIN or when by some other way it is known that no more
 * io events are queued on the fd (the receive buffer is empty).
 * Useful to detect when there are no more io events queued for
 * sigio_rt, epoll_et, kqueue.
 * >0 on successful read from the fd (when there might be more io
 * queued -- the receive buffer might still be non-empty)
 */
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
{
int ret=0;
int n;
struct tcp_connection* con;
int s,rw;
long resp;
long response[2];
switch(fm->type){
case F_TCPMAIN:
again:
/* response[0] carries the connection pointer, response[1] the
 * IO_WATCH_* flags; s receives the passed descriptor */
ret=n=receive_fd(fm->fd, response, sizeof(response), &s, 0);
if (n<0){
if (errno == EWOULDBLOCK || errno == EAGAIN){
ret=0;
break;
}else if (errno == EINTR) goto again;
else{
LM_CRIT("read_fd: %s \n", strerror(errno));
abort(); /* big error*/
}
}
if (n==0){
LM_WARN("0 bytes read\n");
break;
}
con = (struct tcp_connection *)response[0];
rw = (int)response[1];
if (con==0){
LM_CRIT("null pointer\n");
break;
}
con->fd=s;
if (s==-1) {
/* the connection pointer arrived without a descriptor */
LM_ERR("read_fd:no fd read\n");
goto con_error;
}
/* only the head of the list is compared against */
if (con==tcp_conn_lst){
LM_CRIT("duplicate"
" connection received: %p, id %d, fd %d, refcnt %d"
" state %d (n=%d)\n", con, con->id, con->fd,
con->refcnt, con->state, n);
release_tcpconn(con, CONN_ERROR, tcpmain_sock);
break; /* try to recover */
}
LM_DBG("We have received conn %p with rw %d\n",con,rw);
if (rw & IO_WATCH_READ) {
/* reset the per process TCP req struct */
init_tcp_req(&current_req);
/* 0 attempts so far for this SIP MSG */
con->msg_attempts = 0;
/* must be before io_watch_add, io_watch_add might catch some
* already existing events => might call handle_io and
* handle_io might decide to del. the new connection =>
* must be in the list */
tcpconn_listadd(tcp_conn_lst, con, c_next, c_prev);
con->timeout=get_ticks()+tcp_max_msg_time;
if (io_watch_add(&io_w, s, F_TCPCONN, con,IO_WATCH_READ)<0){
LM_CRIT("failed to add new socket to the fd list\n");
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
goto con_error;
}
} else if (rw & IO_WATCH_WRITE) {
/* write request: flush the pending chunks under the write lock; the
 * connection is not added to the list nor watched */
LM_DBG("Received con %p ref = %d\n",con,con->refcnt);
lock_get(&con->write_lock);
resp=tcp_write_async_req(con);
if (resp<0) {
lock_release(&con->write_lock);
ret=-1; /* some error occured */
con->state=S_CONN_BAD;
release_tcpconn(con, resp, tcpmain_sock);
break;
}
lock_release(&con->write_lock);
ret = 0;
}
break;
case F_TCPCONN:
if (event_type & IO_WATCH_READ) {
con=(struct tcp_connection*)fm->data;
/* ret receives the number of bytes read */
resp=tcp_read_req(con, &ret);
if (resp<0) {
/* tcp_read_req() left the release to us */
ret=-1; /* some error occured */
io_watch_del(&io_w, con->fd, idx, IO_FD_CLOSING,
IO_WATCH_READ|IO_WATCH_WRITE);
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
con->state=S_CONN_BAD;
release_tcpconn(con, resp, tcpmain_sock);
break;
}
}
break;
case F_NONE:
LM_CRIT("empty fd map %p (%d): "
"{%d, %d, %p}\n", fm, (int)(fm-io_w.fd_hash),
fm->fd, fm->type, fm->data);
goto error;
default:
LM_CRIT("uknown fd type %d\n", fm->type);
goto error;
}
return ret;
con_error:
/* only reached from the F_TCPMAIN case: the connection is sent back over
 * the socket it was received on (fm->fd) and ret still holds the
 * receive_fd() result */
con->state=S_CONN_BAD;
release_tcpconn(con, CONN_ERROR, fm->fd);
return ret;
error:
return -1;
}
/*! \brief releases expired connections and cleans up bad ones (state<0) */
static inline void tcp_receive_timeout(void)
{
struct tcp_connection* con;
struct tcp_connection* next;
unsigned int ticks;
ticks=get_ticks();
for (con=tcp_conn_lst; con; con=next) {
next=con->c_next; /* safe for removing */
if (con->state<0){ /* kill bad connections */
/* S_CONN_BAD or S_CONN_ERROR, remove it */
/* fd will be closed in release_tcpconn */
io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING,IO_WATCH_READ);
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
con->state=S_CONN_BAD;
release_tcpconn(con, CONN_ERROR, tcpmain_sock);
continue;
}
if (con->timeout<=ticks){
LM_DBG("%p expired - (%d, %d) lt=%d\n",
con, con->timeout, ticks,con->lifetime);
/* fd will be closed in release_tcpconn */
io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING,IO_WATCH_READ);
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
if (con->msg_attempts)
release_tcpconn(con, CONN_ERROR, tcpmain_sock);
else
release_tcpconn(con, CONN_RELEASE, tcpmain_sock);
}
}
}
/*! \brief main loop of a process serving TCP connections
 *
 * Stores unix_sock as the communication socket, initializes the io_wait
 * handler, registers the socket as F_TCPMAIN and then loops forever over
 * the wait routine matching the selected poll method, running
 * tcp_receive_timeout() after each round.
 * The function does not return: if the setup fails or the poll method is
 * not supported, the process exits with exit(-1).
 *
 * \param unix_sock socket on which connections are received and given back
 */
void tcp_receive_loop(int unix_sock)
{
	/* communication socket, used by the other functions of this file */
	tcpmain_sock = unix_sock;

	if (init_io_wait(&io_w, tcp_max_fd_no, tcp_poll_method) < 0)
		goto bail_out;

	/* the unix socket is watched like any other fd */
	if (io_watch_add(&io_w, tcpmain_sock, F_TCPMAIN, 0,IO_WATCH_READ) < 0) {
		LM_CRIT("failed to add socket to the fd list\n");
		goto bail_out;
	}

	/* one endless loop per supported poll method */
	switch (io_w.poll_method) {
		case POLL_POLL:
			for (;;) {
				io_wait_loop_poll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
				tcp_receive_timeout();
			}
			break;
#ifdef HAVE_SELECT
		case POLL_SELECT:
			for (;;) {
				io_wait_loop_select(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
				tcp_receive_timeout();
			}
			break;
#endif
#ifdef HAVE_SIGIO_RT
		case POLL_SIGIO_RT:
			for (;;) {
				io_wait_loop_sigio_rt(&io_w, TCP_CHILD_SELECT_TIMEOUT);
				tcp_receive_timeout();
			}
			break;
#endif
#ifdef HAVE_EPOLL
		case POLL_EPOLL_LT:
			for (;;) {
				io_wait_loop_epoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
				tcp_receive_timeout();
			}
			break;
		case POLL_EPOLL_ET:
			/* same routine, last argument set to 1 for the ET method */
			for (;;) {
				io_wait_loop_epoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 1);
				tcp_receive_timeout();
			}
			break;
#endif
#ifdef HAVE_KQUEUE
		case POLL_KQUEUE:
			for (;;) {
				io_wait_loop_kqueue(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
				tcp_receive_timeout();
			}
			break;
#endif
#ifdef HAVE_DEVPOLL
		case POLL_DEVPOLL:
			for (;;) {
				io_wait_loop_devpoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
				tcp_receive_timeout();
			}
			break;
#endif
		default:
			LM_CRIT("no support for poll method %s (%d)\n",
					poll_method_name(io_w.poll_method), io_w.poll_method);
			break;
	}

bail_out:
	destroy_io_wait(&io_w);
	LM_CRIT("exiting...");
	exit(-1);
}
#endif /* DEBUG_TCP_RECEIVE */
#endif /* USE_TCP */
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/maplerain/opensips.git
git@gitee.com:maplerain/opensips.git
maplerain
opensips
opensips
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891