1 Star 0 Fork 0

秋之夜 / wrk

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
wrk.c 17.47 KB
一键复制 编辑 原始数据 按行查看 历史
秋之夜 提交于 2018-04-14 21:27 . 笔记
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
// Copyright (C) 2012 - Will Glozer. All rights reserved.
#include "wrk.h"
#include "script.h"
#include "main.h"
// Run-time configuration. parse_args() zeroes it and fills in the command-line
// values; main() then sets pipeline, delay, dynamic, host and ctx.
static struct config {
uint64_t connections; // -c: total connections; main() divides them evenly among the threads
uint64_t duration; // -d: main() sleeps for this many seconds before stopping the workers
uint64_t threads; // -t: number of worker threads created in main()
uint64_t timeout; // --timeout; defaults to SOCKET_TIMEOUT_MS
uint64_t pipeline; // result of script_verify_request(); copied into c->pending when a write cycle starts
bool delay; // result of script_has_delay(); initial value of each connection's delayed flag
bool dynamic; // !script_is_static(); when set, script_request() is called before every send
bool latency; // --latency: also print the latency distribution
char *host; // host part of the target URL; passed to sock.connect()
char *script; // -s: script argument as given on the command line (NULL when absent)
SSL_CTX *ctx; // set from ssl_init() for https URLs, otherwise NULL
} cfg;
// Sample collections allocated in main() and recorded into by the worker threads.
static struct {
stats *latency; // response_complete() records (now - c->start) here
stats *requests; // record_rate() records each thread's computed request rate here
} statistics;
// Socket operation table used by the event callbacks. It starts out pointing at
// the plain sock_* functions; main() repoints every entry to the ssl_* variants
// when the URL schema begins with "https".
static struct sock sock = {
.connect = sock_connect,
.close = sock_close,
.read = sock_read,
.write = sock_write,
.readable = sock_readable
};
// HTTP parser callbacks. Only message completion is hooked by default; main()
// additionally installs the header and body callbacks when
// script_want_response() returns true.
static struct http_parser_settings parser_settings = {
.on_message_complete = response_complete
};
/* Stop flag: raised by the SIGINT handler (and by main() once the test
 * duration has elapsed); record_rate() polls it to stop each event loop. */
static volatile sig_atomic_t stop = 0;

/* Signal handler: requests shutdown by raising the stop flag. */
static void handler(int sig) {
    (void) sig; /* the signal number is not needed */
    stop = 1;
}
/* Print the command-line help text to standard output. */
static void usage() {
    static const char help_text[] =
        "Usage: wrk <options> <url> \n"
        " Options: \n"
        " -c, --connections <N> Connections to keep open \n"
        " -d, --duration <T> Duration of test \n"
        " -t, --threads <N> Number of threads to use \n"
        " \n"
        " -s, --script <S> Load Lua script file \n"
        " -H, --header <H> Add header to request \n"
        " --latency Print latency statistics \n"
        " --timeout <T> Socket/request timeout \n"
        " -v, --version Print version details \n"
        " \n"
        " Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
        " Time arguments may include a time unit (2s, 2m, 2h)\n";

    /* The text contains no conversion specifiers, so it is written verbatim. */
    fputs(help_text, stdout);
}
// Program entry point: parses the command line, sets up SSL for https URLs,
// starts cfg.threads worker threads running thread_main(), sleeps for
// cfg.duration seconds, then joins the workers and prints the aggregated results.
// Returns 0 on completion. Exits with status 1 when argument parsing, SSL
// initialisation or script_resolve() fails, and with status 2 when an event loop
// or thread cannot be created.
int main(int argc, char **argv) {
// headers has argc slots, which is enough for every -H value plus the NULL terminator
char *url, **headers = zmalloc(argc * sizeof(char *));
struct http_parser_url parts = {};
if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
usage();
exit(1);
}
char *schema = copy_url_part(url, &parts, UF_SCHEMA);
char *host = copy_url_part(url, &parts, UF_HOST);
char *port = copy_url_part(url, &parts, UF_PORT);
// fall back to the schema as the service name when the URL has no explicit port
char *service = port ? port : schema;
// switch the socket operation table to the SSL implementations for https URLs
if (!strncmp("https", schema, 5)) {
if ((cfg.ctx = ssl_init()) == NULL) {
fprintf(stderr, "unable to initialize SSL\n");
ERR_print_errors_fp(stderr);
exit(1);
}
sock.connect = ssl_connect;
sock.close = ssl_close;
sock.read = ssl_read;
sock.write = ssl_write;
sock.readable = ssl_readable;
}
// SIGINT is ignored here; the real handler is installed after the threads are created
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, SIG_IGN);
statistics.latency = stats_alloc(cfg.timeout * 1000);
statistics.requests = stats_alloc(MAX_THREAD_RATE_S);
thread *threads = zcalloc(cfg.threads * sizeof(thread)); // allocate one thread structure per requested thread
lua_State *L = script_create(cfg.script, url, headers);
if (!script_resolve(L, host, service)) {
char *msg = strerror(errno);
fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
exit(1);
}
cfg.host = host;
for (uint64_t i = 0; i < cfg.threads; i++) {
thread *t = &threads[i];
t->loop = aeCreateEventLoop(10 + cfg.connections * 3); // one event loop per thread (original note: backed by epoll/kqueue/poll/select)
t->connections = cfg.connections / cfg.threads; // number of connections this thread drives
t->L = script_create(cfg.script, url, headers);
script_init(L, t, argc - optind, &argv[optind]);
// the first thread's script state decides the settings shared by all threads
if (i == 0) {
cfg.pipeline = script_verify_request(t->L);
cfg.dynamic = !script_is_static(t->L);
cfg.delay = script_has_delay(t->L);
if (script_want_response(t->L)) {
parser_settings.on_header_field = header_field;
parser_settings.on_header_value = header_value;
parser_settings.on_body = response_body;
}
}
if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) { // start one OS thread per thread structure, running thread_main
char *msg = strerror(errno);
fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
exit(2);
}
}
// install the SIGINT handler that raises the stop flag; all signals are blocked while it runs
struct sigaction sa = {
.sa_handler = handler,
.sa_flags = 0,
};
sigfillset(&sa.sa_mask);
sigaction(SIGINT, &sa, NULL);
char *time = format_time_s(cfg.duration);
printf("Running %s test @ %s\n", time, url);
printf(" %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);
uint64_t start = time_us();
uint64_t complete = 0;
uint64_t bytes = 0;
errors errors = { 0 };
sleep(cfg.duration); // main thread sleeps for the test duration (seconds)
stop = 1; // tell the worker threads to stop
for (uint64_t i = 0; i < cfg.threads; i++) { // wait for each thread and collect its results
thread *t = &threads[i];
pthread_join(t->thread, NULL);
complete += t->complete; // completed responses
bytes += t->bytes; // bytes read
errors.connect += t->errors.connect; // accumulate connect/read/write/timeout/status error counts
errors.read += t->errors.read;
errors.write += t->errors.write;
errors.timeout += t->errors.timeout;
errors.status += t->errors.status;
}
uint64_t runtime_us = time_us() - start;
long double runtime_s = runtime_us / 1000000.0;
long double req_per_s = complete / runtime_s;
long double bytes_per_s = bytes / runtime_s;
// when at least one request per connection completed, hand the average
// per-connection request interval (in microseconds) to stats_correct()
if (complete / cfg.connections > 0) {
int64_t interval = runtime_us / (complete / cfg.connections);
stats_correct(statistics.latency, interval);
}
print_stats_header();
print_stats("Latency", statistics.latency, format_time_us);
print_stats("Req/Sec", statistics.requests, format_metric);
if (cfg.latency) print_stats_latency(statistics.latency);
char *runtime_msg = format_time_us(runtime_us);
printf(" %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
if (errors.connect || errors.read || errors.write || errors.timeout) {
printf(" Socket errors: connect %d, read %d, write %d, timeout %d\n",
errors.connect, errors.read, errors.write, errors.timeout);
}
if (errors.status) {
printf(" Non-2xx or 3xx responses: %d\n", errors.status);
}
printf("Requests/sec: %9.2Lf\n", req_per_s);
printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));
// hand the totals to the script only when script_has_done() reports true
if (script_has_done(L)) {
script_summary(L, runtime_us, complete, bytes);
script_errors(L, &errors);
script_done(L, statistics.latency, statistics.requests);
}
return 0;
}
void *thread_main(void *arg) {
thread *thread = arg;
char *request = NULL;
size_t length = 0;
if (!cfg.dynamic) {
script_request(thread->L, &request, &length);
}
thread->cs = zcalloc(thread->connections * sizeof(connection)); //给当前线程的请求数分配请求对象
connection *c = thread->cs; //
for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->thread = thread;
c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
c->request = request;
c->length = length;
c->delayed = cfg.delay;
connect_socket(thread, c); //根据thread对象和connection对象创建socket
}
aeEventLoop *loop = thread->loop;
aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL);
thread->start = time_us(); //给thread对象添加开始时间
aeMain(loop);
aeDeleteEventLoop(loop);
zfree(thread->cs);
return NULL;
}
//
/*
 * Create a non-blocking socket for connection `c`, start connecting it to
 * thread->addr and register it with the thread's event loop so that
 * socket_connected() runs once the socket becomes readable or writable.
 *
 * Returns the new file descriptor on success. On any failure the thread's
 * connect error counter is incremented and -1 is returned.
 */
static int connect_socket(thread *thread, connection *c) {
    struct addrinfo *addr = thread->addr;
    struct aeEventLoop *loop = thread->loop;
    int fd, flags;

    fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
    if (fd == -1) {
        /* No descriptor was created: count the failure and bail out without
         * issuing fcntl()/connect()/close() on an invalid descriptor. */
        thread->errors.connect++;
        return -1;
    }

    /* Switch the socket to non-blocking mode. */
    flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);

    /* A non-blocking connect normally reports EINPROGRESS; anything else is fatal. */
    if (connect(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
        if (errno != EINPROGRESS) goto error;
    }

    /* Disable Nagle's algorithm so small requests are sent immediately. */
    flags = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));

    /* Watch for both readability and writability to learn when the connect finishes. */
    flags = AE_READABLE | AE_WRITABLE;
    if (aeCreateFileEvent(loop, fd, flags, socket_connected, c) == AE_OK) {
        c->parser.data = c;
        c->fd = fd;
        return fd;
    }

error:
    thread->errors.connect++;
    close(fd);
    return -1;
}
/*
 * Tear down connection `c` (event registrations, transport state and file
 * descriptor) and open a fresh socket for it.
 * Returns whatever connect_socket() returns.
 */
static int reconnect_socket(thread *thread, connection *c) {
    const int all_events = AE_WRITABLE | AE_READABLE;

    aeDeleteFileEvent(thread->loop, c->fd, all_events);
    sock.close(c);
    close(c->fd);

    return connect_socket(thread, c);
}
/*
 * Periodic timer callback: `data` is the owning thread structure.
 *
 * If any requests completed since the last sample, converts the count to a
 * per-second rate, records it and starts a new measurement window. Stops the
 * event loop once the stop flag is raised. Returns RECORD_INTERVAL_MS as the
 * timer's next interval.
 */
static int record_rate(aeEventLoop *loop, long long id, void *data) {
    thread *t = data;

    if (t->requests > 0) {
        uint64_t window_ms = (time_us() - t->start) / 1000;
        uint64_t per_second = (t->requests / (double) window_ms) * 1000;

        stats_record(statistics.requests, per_second);

        /* Begin the next measurement window. */
        t->requests = 0;
        t->start = time_us();
    }

    if (stop) {
        aeStop(loop);
    }

    return RECORD_INTERVAL_MS;
}
/*
 * One-shot timer callback fired when a scripted delay expires: clears the
 * connection's delayed flag and re-arms the write event so the request is sent.
 * Returns AE_NOMORE so the timer is not rescheduled.
 */
static int delay_request(aeEventLoop *loop, long long id, void *data) {
    connection *conn = data;

    conn->delayed = false;
    aeCreateFileEvent(loop, conn->fd, AE_WRITABLE, socket_writeable, conn);

    return AE_NOMORE;
}
/*
 * Parser callback for a chunk of a header name. When the previous chunk
 * belonged to a header value, that value is NUL-terminated first. The chunk
 * is then appended to the connection's header buffer. Always returns 0.
 */
static int header_field(http_parser *parser, const char *at, size_t len) {
    connection *conn = parser->data;

    if (conn->state == VALUE) {
        /* Terminate the value that just ended. */
        *conn->headers.cursor = '\0';
        conn->headers.cursor++;
        conn->state = FIELD;
    }

    buffer_append(&conn->headers, at, len);
    return 0;
}
/*
 * Parser callback for a chunk of a header value. When the previous chunk
 * belonged to a header name, that name is NUL-terminated first. The chunk is
 * then appended to the connection's header buffer. Always returns 0.
 */
static int header_value(http_parser *parser, const char *at, size_t len) {
    connection *conn = parser->data;

    if (conn->state == FIELD) {
        /* Terminate the name that just ended. */
        *conn->headers.cursor = '\0';
        conn->headers.cursor++;
        conn->state = VALUE;
    }

    buffer_append(&conn->headers, at, len);
    return 0;
}
/*
 * Parser callback for a chunk of the response body: appends it to the
 * connection's body buffer. Always returns 0.
 */
static int response_body(http_parser *parser, const char *at, size_t len) {
    connection *conn = parser->data;

    buffer_append(&conn->body, at, len);

    return 0;
}
/*
 * Parser callback invoked when a full response has been parsed.
 *
 * Updates the thread's counters, hands captured headers/body to the script,
 * records latency once every pending response of the current write cycle has
 * arrived, and then either resets the parser for the next response or
 * reconnects when the connection must not be kept alive. Always returns 0.
 */
static int response_complete(http_parser *parser) {
    connection *c = parser->data;
    thread *owner = c->thread;
    uint64_t now = time_us();
    int status = parser->status_code;

    owner->complete++;
    owner->requests++;

    /* Status codes of 400 and above are counted as status errors. */
    if (status > 399) {
        owner->errors.status++;
    }

    /* A header buffer is only present when response capture is in use. */
    if (c->headers.buffer) {
        *c->headers.cursor = '\0';
        c->headers.cursor++;
        script_response(owner->L, status, &c->headers, &c->body);
        c->state = FIELD;
    }

    c->pending--;
    if (c->pending == 0) {
        /* Last outstanding response: record the latency and schedule the next send. */
        if (!stats_record(statistics.latency, now - c->start)) {
            owner->errors.timeout++;
        }
        c->delayed = cfg.delay;
        aeCreateFileEvent(owner->loop, c->fd, AE_WRITABLE, socket_writeable, c);
    }

    if (http_should_keep_alive(parser)) {
        http_parser_init(parser, HTTP_RESPONSE);
    } else {
        reconnect_socket(owner, c);
    }

    return 0;
}
/*
 * Event callback fired while a connection attempt is in progress.
 *
 * Asks the transport to complete the connect. On RETRY it simply waits for the
 * next event; on ERROR it counts a connect error and reconnects; otherwise it
 * resets the parser and installs the regular read and write handlers.
 */
static void socket_connected(aeEventLoop *loop, int fd, void *data, int mask) {
    connection *c = data;
    int rc = sock.connect(c, cfg.host);

    if (rc == RETRY) {
        return;
    }

    if (rc == ERROR) {
        c->thread->errors.connect++;
        reconnect_socket(c->thread, c);
        return;
    }

    http_parser_init(&c->parser, HTTP_RESPONSE);
    c->written = 0;

    aeCreateFileEvent(c->thread->loop, fd, AE_READABLE, socket_readable, c);
    aeCreateFileEvent(c->thread->loop, fd, AE_WRITABLE, socket_writeable, c);
}
/*
 * Event callback fired when the socket can be written.
 *
 * If the connection is marked delayed, the write event is removed and a timer
 * for the scripted delay is scheduled instead. Otherwise the unsent remainder
 * of the request is written; once the whole request is out the write event is
 * removed. A write error is counted and triggers a reconnect.
 */
static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
    connection *c = data;
    thread *t = c->thread;

    if (c->delayed) {
        uint64_t delay = script_delay(t->L);
        aeDeleteFileEvent(loop, fd, AE_WRITABLE);
        aeCreateTimeEvent(loop, delay, delay_request, c, NULL);
        return;
    }

    /* Nothing written yet: this is the start of a new request cycle. */
    if (!c->written) {
        if (cfg.dynamic) {
            script_request(t->L, &c->request, &c->length);
        }
        c->start = time_us();
        c->pending = cfg.pipeline;
    }

    char *remaining = c->request + c->written;
    size_t remaining_len = c->length - c->written;
    size_t sent;

    int rc = sock.write(c, remaining, remaining_len, &sent);
    if (rc == RETRY) {
        return;
    }
    if (rc == ERROR) {
        t->errors.write++;
        reconnect_socket(t, c);
        return;
    }

    c->written += sent;
    if (c->written == c->length) {
        /* Entire request sent: reset the offset and stop watching for writability. */
        c->written = 0;
        aeDeleteFileEvent(loop, fd, AE_WRITABLE);
    }
}
/*
 * Event callback fired when the socket has data to read.
 *
 * Reads and parses data repeatedly for as long as a read fills the whole
 * receive buffer and the transport reports more data available. A read error,
 * a parse error, or a zero-length read before the body is final is counted as
 * a read error and triggers a reconnect.
 */
static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
    connection *c = data;
    size_t n;

    for (;;) {
        int rc = sock.read(c, &n);
        if (rc == RETRY) {
            return;
        }
        if (rc == ERROR) {
            goto fail;
        }

        /* The parser must consume everything that was read. */
        if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) {
            goto fail;
        }
        /* A zero-length read is only acceptable once the body is final. */
        if (n == 0 && !http_body_is_final(&c->parser)) {
            goto fail;
        }

        c->thread->bytes += n;

        if (!(n == RECVBUF && sock.readable(c) > 0)) {
            return;
        }
    }

fail:
    c->thread->errors.read++;
    reconnect_socket(c->thread, c);
}
/*
 * Return the current wall-clock time from gettimeofday() in microseconds.
 *
 * The seconds field is widened to uint64_t before multiplying so the
 * arithmetic cannot overflow on platforms where time_t is a 32-bit type.
 */
static uint64_t time_us() {
    struct timeval now;
    gettimeofday(&now, NULL);
    return (uint64_t) now.tv_sec * 1000000u + (uint64_t) now.tv_usec;
}
/*
 * Return a newly allocated, NUL-terminated copy of one component of `url`.
 *
 * `parts` is the parse result for `url` and `field` selects the component.
 * Returns NULL when that component is not set in `parts`. The copy is
 * allocated with zcalloc().
 */
static char *copy_url_part(char *url, struct http_parser_url *parts, enum http_parser_url_fields field) {
    if (!(parts->field_set & (1 << field))) {
        return NULL;
    }

    uint16_t off = parts->field_data[field].off;
    uint16_t len = parts->field_data[field].len;

    /* len + 1 elements: the zeroed extra byte is the terminator. The original
     * `len + 1 * sizeof(char)` only worked because sizeof(char) is 1. */
    char *part = zcalloc((len + 1) * sizeof(char));
    memcpy(part, &url[off], len);

    return part;
}
// Long-option table for getopt_long() in parse_args(). Each entry maps a long
// name to the character handled by the switch in parse_args().
static struct option longopts[] = {
{ "connections", required_argument, NULL, 'c' },
{ "duration", required_argument, NULL, 'd' },
{ "threads", required_argument, NULL, 't' },
{ "script", required_argument, NULL, 's' },
{ "header", required_argument, NULL, 'H' },
{ "latency", no_argument, NULL, 'L' },
{ "timeout", required_argument, NULL, 'T' },
// 'h' makes parse_args() return -1, which leads main() to print the usage text
{ "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'v' },
// all-zero terminator entry
{ NULL, 0, NULL, 0 }
};
/*
 * Parse the command line into `cfg`.
 *
 * cfg     - zeroed, given defaults, then filled from the options
 * url     - receives the first non-option argument on success
 * parts   - receives the parsed components of that URL
 * headers - receives each -H value, followed by a NULL terminator on success
 *
 * Returns 0 on success and -1 on any invalid or missing argument.
 */
static int parse_args(struct config *cfg, char **url, struct http_parser_url *parts, char **headers, int argc, char **argv) {
    char **next_header = headers;
    int opt;

    /* Start from a clean slate, then apply the defaults. */
    memset(cfg, 0, sizeof(struct config));
    cfg->threads = 2;
    cfg->connections = 10;
    cfg->duration = 10;
    cfg->timeout = SOCKET_TIMEOUT_MS;

    for (;;) {
        opt = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL);
        if (opt == -1) {
            break;
        }

        switch (opt) {
            case 't':
                if (scan_metric(optarg, &cfg->threads)) return -1;
                break;
            case 'c':
                if (scan_metric(optarg, &cfg->connections)) return -1;
                break;
            case 'd':
                if (scan_time(optarg, &cfg->duration)) return -1;
                break;
            case 's':
                cfg->script = optarg;
                break;
            case 'H':
                *next_header = optarg;
                next_header++;
                break;
            case 'L':
                cfg->latency = true;
                break;
            case 'T':
                if (scan_time(optarg, &cfg->timeout)) return -1;
                /* The scanned value is multiplied by 1000 before being stored. */
                cfg->timeout *= 1000;
                break;
            case 'v':
                printf("wrk %s [%s] ", VERSION, aeGetApiName());
                printf("Copyright (C) 2012 Will Glozer\n");
                break;
            case 'h':
            case '?':
            case ':':
            default:
                /* Help request, unknown option, or an option without a handler. */
                return -1;
        }
    }

    /* A URL argument is mandatory. */
    if (optind == argc) {
        return -1;
    }
    if (!cfg->threads || !cfg->duration) {
        return -1;
    }

    if (!script_parse_url(argv[optind], parts)) {
        fprintf(stderr, "invalid URL: %s\n", argv[optind]);
        return -1;
    }

    if (!cfg->connections || cfg->connections < cfg->threads) {
        fprintf(stderr, "number of connections must be >= threads\n");
        return -1;
    }

    *url = argv[optind];
    *next_header = NULL;

    return 0;
}
/* Print the column headings of the per-thread statistics table. */
static void print_stats_header() {
    const char *columns[] = { "Avg", "Stdev", "Max", "+/- Stdev" };

    printf(" Thread Stats%6s%11s%8s%12s\n",
           columns[0], columns[1], columns[2], columns[3]);
}
/*
 * Print `n`, formatted by `fmt`, in a column `width` characters wide.
 *
 * n     - value to print
 * fmt   - formatter returning a heap-allocated string, which is freed here
 * width - total column width, including the unit-alignment padding
 *
 * Up to two trailing alphabetic characters of the formatted string are
 * treated as a unit suffix. Strings with a shorter suffix are followed by one
 * or two spaces so the numeric parts line up across rows.
 */
static void print_units(long double n, char *(*fmt)(long double), int width) {
    char *msg = fmt(n);
    size_t len = strlen(msg);
    int pad = 2;

    /* Check the length before indexing from the end, and pass isalpha() an
     * unsigned char value as <ctype.h> requires. */
    if (len >= 1 && isalpha((unsigned char) msg[len - 1])) pad--;
    if (len >= 2 && isalpha((unsigned char) msg[len - 2])) pad--;
    width -= pad;

    /* "%*s" with an empty string emits exactly `pad` spaces (0, 1 or 2), so
     * the column is always `width + pad` characters wide. */
    printf("%*.*s%*s", width, width, msg, pad, "");

    free(msg);
}
/*
 * Print one row of the statistics table: the row label followed by the mean,
 * standard deviation and maximum (each rendered through `fmt`), and finally
 * the value returned by stats_within_stdev() shown as a percentage.
 */
static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
    uint64_t peak = stats->max;
    long double average = stats_mean(stats);
    long double deviation = stats_stdev(stats, average);

    printf(" %-10s", name);

    print_units(average, fmt, 8);
    print_units(deviation, fmt, 10);
    print_units(peak, fmt, 9);

    long double within = stats_within_stdev(stats, average, deviation, 1);
    printf("%8.2Lf%%\n", within);
}
/*
 * Print the latency distribution: one line each for the 50th, 75th, 90th and
 * 99th percentile, with the value formatted as a time.
 */
static void print_stats_latency(stats *stats) {
    long double percentiles[] = { 50.0, 75.0, 90.0, 99.0 };
    const size_t count = sizeof(percentiles) / sizeof(percentiles[0]);

    printf(" Latency Distribution\n");

    for (size_t idx = 0; idx < count; idx++) {
        long double pct = percentiles[idx];
        uint64_t value = stats_percentile(stats, pct);

        printf("%7.0Lf%%", pct);
        print_units(value, format_time_us, 10);
        printf("\n");
    }
}
C
1
https://gitee.com/yt39/wrk.git
git@gitee.com:yt39/wrk.git
yt39
wrk
wrk
master

搜索帮助