/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#include <stdlib.h>
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>
#define PAGE_BUCKET_DEFAULT 0
#define PAGE_BUCKET_COMPACT 1
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL 3
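// Write buckets group items with similar lifetimes onto the same extstore
// pages: chunked items, soon-to-expire (low TTL) items, and items rewritten
// by the compactor each get their own bucket.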
/*** WRITE FLUSH THREAD ***/
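/* storage_write() below pulls one item off the tail of a class's COLD LRU,
 * writes the full object to extstore, and replaces it in the hash table with
 * a small ITEM_HDR stub (an item_hdr recording page_id/page_version/offset).
 * The item's hash value is stashed in the buffered copy's time field and a
 * crc32c of the body in its exptime field, so reads can validate the object
 * without recomputing either. Returns 1 if an item was moved, else 0. */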
static int storage_write(void *storage, const int clsid, const int item_age) {
    int did_moves = 0;
    struct lru_pull_tail_return it_info;
    it_info.it = NULL;
    lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
    /* Item is locked, and we have a reference to it. */
    if (it_info.it == NULL) {
        return did_moves;
    }
    obj_io io;
    item *it = it_info.it;
    /* First, storage for the header object */
    size_t orig_ntotal = ITEM_ntotal(it);
    uint32_t flags;
    if ((it->it_flags & ITEM_HDR) == 0 &&
            (item_age == 0 || current_time - it->time > item_age)) {
        FLAGS_CONV(it, flags);
        item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
        /* Run the storage write understanding the start of the item is dirty.
         * We will fill it (time/exptime/etc) from the header item on read.
         */
        if (hdr_it != NULL) {
            int bucket = (it->it_flags & ITEM_CHUNKED) ?
                PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
            // Compress soon-to-expire items into similar pages.
            if (it->exptime - current_time < settings.ext_low_ttl) {
                bucket = PAGE_BUCKET_LOWTTL;
            }
            hdr_it->it_flags |= ITEM_HDR;
            io.len = orig_ntotal;
            io.mode = OBJ_IO_WRITE;
            // NOTE: when the item is read back in, the slab mover
            // may see it. Important to have refcount>=2 or ~ITEM_LINKED
            assert(it->refcount >= 2);
            // NOTE: write bucket vs free page bucket will disambiguate once
            // the lowttl feature is better understood.
            if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
                // cuddle the hash value into the time field so we don't have
                // to recalculate it.
                item *buf_it = (item *) io.buf;
                buf_it->time = it_info.hv;
                // copy from past the headers + time headers.
                // TODO: should be in items.c
                if (it->it_flags & ITEM_CHUNKED) {
                    // Need to loop through the item and copy
                    item_chunk *sch = (item_chunk *) ITEM_schunk(it);
                    int remain = orig_ntotal;
                    int copied = 0;
                    // copy original header
                    int hdrtotal = ITEM_ntotal(it) - it->nbytes;
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
                    copied = hdrtotal;
                    // copy the chunked data in as if it were one large object.
                    while (sch && remain) {
                        assert(remain >= sch->used);
                        memcpy((char *)io.buf+copied, sch->data, sch->used);
                        // FIXME: use one variable?
                        remain -= sch->used;
                        copied += sch->used;
                        sch = sch->next;
                    }
                } else {
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
                }
                // crc what we copied so we can compute it in one sequential pass.
                buf_it->it_flags &= ~ITEM_LINKED;
                buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
                extstore_write(storage, &io);
                item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
                hdr->page_version = io.page_version;
                hdr->page_id = io.page_id;
                hdr->offset = io.offset;
                // overload nbytes for the header it
                hdr_it->nbytes = it->nbytes;
                /* success! Now we need to fill relevant data into the new
                 * header and replace. Most of this requires the item lock.
                 */
                /* CAS gets set while linking. Copy post-replace */
                item_replace(it, hdr_it, it_info.hv);
                ITEM_set_cas(hdr_it, ITEM_get_cas(it));
                do_item_remove(hdr_it);
                did_moves = 1;
                LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
            } else {
                /* Failed to write for some reason; can't continue. */
                slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
            }
        }
    }
    do_item_remove(it);
    item_unlock(it_info.hv);
    return did_moves;
}

static pthread_t storage_write_tid;
static pthread_mutex_t storage_write_plock;
#define WRITE_SLEEP_MAX 1000000
#define WRITE_SLEEP_MIN 500
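/* The flusher paces itself: to_sleep doubles (up to WRITE_SLEEP_MAX) on idle
 * passes and halves (down to WRITE_SLEEP_MIN) while items are moving, and a
 * per-class backoff counter skips classes that repeatedly yield no work
 * (see the counter % backoff[x] check below). */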
static void *storage_write_thread(void *arg) {
    void *storage = arg;
    // NOTE: ignoring overflow since that would take years of uptime in a
    // specific load pattern of never going to sleep.
    unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
    unsigned int counter = 0;
    useconds_t to_sleep = WRITE_SLEEP_MIN;
    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage write thread\n");
        abort();
    }

    pthread_mutex_lock(&storage_write_plock);

    while (1) {
        // cache per-loop to avoid calls to the slabs_clsid() search loop
        int min_class = slabs_clsid(settings.ext_item_size);
        bool do_sleep = true;
        counter++;
        if (to_sleep > WRITE_SLEEP_MAX)
            to_sleep = WRITE_SLEEP_MAX;

        for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
            bool did_move = false;
            bool mem_limit_reached = false;
            unsigned int chunks_free;
            int item_age;
            int target = settings.ext_free_memchunks[x];
            if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
                // Long sleeps mean we should retry classes sooner.
                if (to_sleep > WRITE_SLEEP_MIN * 10)
                    backoff[x] /= 2;
                continue;
            }

            // Avoid extra slab lock calls during heavy writing.
            chunks_free = slabs_available_chunks(x, &mem_limit_reached,
                    NULL);

            // storage_write() will fail and cut the loop after filling the write buffer.
            while (1) {
                // if we are low on chunks and have no spare, push out early.
                if (chunks_free < target && mem_limit_reached) {
                    item_age = 0;
                } else {
                    item_age = settings.ext_item_age;
                }
                if (storage_write(storage, x, item_age)) {
                    chunks_free++; // Allow stopping if we've done enough this loop
                    did_move = true;
                    do_sleep = false;
                    if (to_sleep > WRITE_SLEEP_MIN)
                        to_sleep /= 2;
                } else {
                    break;
                }
            }

            if (!did_move) {
                backoff[x]++;
            } else if (backoff[x]) {
                backoff[x] /= 2;
            }
        }

        // flip the lock so we can be paused or stopped
        pthread_mutex_unlock(&storage_write_plock);
        if (do_sleep) {
            usleep(to_sleep);
            to_sleep *= 2;
        }
        pthread_mutex_lock(&storage_write_plock);
    }
    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_write_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    // WAKEUP SIGNAL
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

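/* Pausing works by holding storage_write_plock: the write thread drops and
 * re-acquires it once per loop iteration (the "flip the lock" step above),
 * so a paused caller blocks the thread there until resume is called. */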
void storage_write_pause(void) {
    pthread_mutex_lock(&storage_write_plock);
}

void storage_write_resume(void) {
    pthread_mutex_unlock(&storage_write_plock);
}

int start_storage_write_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_write_plock, NULL);
    if ((ret = pthread_create(&storage_write_tid, NULL,
        storage_write_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_write thread: %s\n",
            strerror(ret));
        return -1;
    }
    return 0;
}

/*** COMPACTOR ***/
/* Fetch stats from the external storage system and decide to compact.
 * If we're more than half full, start skewing how aggressively to run
 * compaction, up to a desired target when all pages are full.
 */
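/* Worked example with illustrative numbers (not defaults): 100 pages of
 * 64MB, 25 free, and ext_max_frag = 0.8 give rate = (1.0 - 25/100) * 0.8 =
 * 0.6 and frag_limit = 0.6 * 64MB = 38.4MB, so any used page holding less
 * than 38.4MB of live data is a compaction candidate; fewer free pages
 * raise the limit and make compaction more aggressive. */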
static int storage_compact_check(void *storage, logger *l,
        uint32_t *page_id, uint64_t *page_version,
        uint64_t *page_size, bool *drop_unread) {
    struct extstore_stats st;
    int x;
    double rate;
    uint64_t frag_limit;
    uint64_t low_version = ULLONG_MAX;
    uint64_t lowest_version = ULLONG_MAX;
    unsigned int low_page = 0;
    unsigned int lowest_page = 0;
    extstore_get_stats(storage, &st);
    if (st.pages_used == 0)
        return 0;

    // let's pick a target "wasted" value and slew.
    if (st.pages_free > settings.ext_compact_under)
        return 0;
    *drop_unread = false;

    // the number of free pages reduces the configured frag limit.
    // this allows us to defrag early if pages are very empty.
    rate = 1.0 - ((double)st.pages_free / st.page_count);
    rate *= settings.ext_max_frag;
    frag_limit = st.page_size * rate;
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
            NULL, rate, frag_limit);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    extstore_get_page_data(storage, &st);

    // find the oldest page by version that violates the constraint
    for (x = 0; x < st.page_count; x++) {
        if (st.page_data[x].version == 0 ||
            st.page_data[x].bucket == PAGE_BUCKET_LOWTTL)
            continue;
        if (st.page_data[x].version < lowest_version) {
            lowest_page = x;
            lowest_version = st.page_data[x].version;
        }
        if (st.page_data[x].bytes_used < frag_limit) {
            if (st.page_data[x].version < low_version) {
                low_page = x;
                low_version = st.page_data[x].version;
            }
        }
    }
    *page_size = st.page_size;
    free(st.page_data);

    // we have a page + version to attempt to reclaim.
    if (low_version != ULLONG_MAX) {
        *page_id = low_page;
        *page_version = low_version;
        return 1;
    } else if (lowest_version != ULLONG_MAX && settings.ext_drop_unread
            && st.pages_free <= settings.ext_drop_under) {
        // nothing matched the frag rate barrier, so pick the absolute oldest
        // version if we're configured to drop items.
        *page_id = lowest_page;
        *page_version = lowest_version;
        *drop_unread = true;
        return 1;
    }

    return 0;
}

static pthread_t storage_compact_tid;
static pthread_mutex_t storage_compact_plock;
#define MIN_STORAGE_COMPACT_SLEEP 10000
#define MAX_STORAGE_COMPACT_SLEEP 2000000
struct storage_compact_wrap {
    obj_io io;
    pthread_mutex_t lock; // gates the bools.
    bool done;
    bool submitted;
    bool miss; // version flipped out from under us
};
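/* One read of up to ext_wbuf_size bytes is in flight at a time. As
 * implemented in storage_compact_thread below: submit a read of the victim
 * page (submitted = true), the extstore callback sets done (or miss if the
 * read fails), then still-live items in the buffer are rewritten via
 * storage_compact_readback() and the offset advances; a miss aborts the
 * whole compaction pass. */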
static void storage_compact_readback(void *storage, logger *l,
        bool drop_unread, char *readback_buf,
        uint32_t page_id, uint64_t page_version, uint64_t read_size) {
    uint64_t offset = 0;
    unsigned int rescues = 0;
    unsigned int lost = 0;
    unsigned int skipped = 0;

    while (offset < read_size) {
        item *hdr_it = NULL;
        item_hdr *hdr = NULL;
        item *it = (item *)(readback_buf+offset);
        unsigned int ntotal;
        // probably zeroed-out junk at the end of the wbuf
        if (it->nkey == 0) {
            break;
        }

        ntotal = ITEM_ntotal(it);
        uint32_t hv = (uint32_t)it->time;
        item_lock(hv);

        // We don't have a conn and don't need to do most of do_item_get
        hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
        if (hdr_it != NULL) {
            bool do_write = false;
            refcount_incr(hdr_it);

            // Check validity but don't bother removing it.
            if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
                    (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
                hdr = (item_hdr *)ITEM_data(hdr_it);
                if (hdr->page_id == page_id && hdr->page_version == page_version) {
                    // Item header is still completely valid.
                    extstore_delete(storage, page_id, page_version, 1, ntotal);
                    // drop inactive items.
                    if (drop_unread && GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
                        do_write = false;
                        skipped++;
                    } else {
                        do_write = true;
                    }
                }
            }
            if (do_write) {
                bool do_update = false;
                int tries;
                obj_io io;
                io.len = ntotal;
                io.mode = OBJ_IO_WRITE;
                for (tries = 10; tries > 0; tries--) {
                    if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, PAGE_BUCKET_COMPACT, &io) == 0) {
                        memcpy(io.buf, it, io.len);
                        extstore_write(storage, &io);
                        do_update = true;
                        break;
                    } else {
                        usleep(1000);
                    }
                }

                if (do_update) {
                    // check the live header's refcount, not the stale copy in
                    // the read buffer: 2 means only the hash table and this
                    // thread hold references, so the header is safe to edit.
                    if (hdr_it->refcount == 2) {
                        hdr->page_version = io.page_version;
                        hdr->page_id = io.page_id;
                        hdr->offset = io.offset;
                        rescues++;
                    } else {
                        lost++;
                        // TODO: re-alloc and replace header.
                    }
                } else {
                    lost++;
                }
            }
            do_item_remove(hdr_it);
        }

        item_unlock(hv);
        offset += ntotal;
        if (read_size - offset < sizeof(struct _stritem))
            break;
    }

    STATS_LOCK();
    stats.extstore_compact_lost += lost;
    stats.extstore_compact_rescues += rescues;
    stats.extstore_compact_skipped += skipped;
    STATS_UNLOCK();
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
            NULL, page_id, offset, rescues, lost, skipped);
}

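/* Completion callback, invoked by extstore when the page read finishes.
 * ret < 1 reports a failed read; per the wrap struct comment above, that
 * means the page version flipped out from under us, and the miss flag makes
 * the compaction loop abort this pass. */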
static void _storage_compact_cb(void *e, obj_io *io, int ret) {
    struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
    assert(wrap->submitted == true);

    pthread_mutex_lock(&wrap->lock);

    if (ret < 1) {
        wrap->miss = true;
    }
    wrap->done = true;

    pthread_mutex_unlock(&wrap->lock);
}

// TODO: hoist the storage bits from lru_maintainer_thread in here.
// would be nice if they could avoid hammering the same locks though?
// I guess it's only COLD. that's probably fine.
static void *storage_compact_thread(void *arg) {
    void *storage = arg;
    useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP;
    bool compacting = false;
    uint64_t page_version = 0;
    uint64_t page_size = 0;
    uint64_t page_offset = 0;
    uint32_t page_id = 0;
    bool drop_unread = false;
    char *readback_buf = NULL;
    struct storage_compact_wrap wrap;

    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
        abort();
    }

    readback_buf = malloc(settings.ext_wbuf_size);
    if (readback_buf == NULL) {
        fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
        abort();
    }

    pthread_mutex_init(&wrap.lock, NULL);
    wrap.done = false;
    wrap.submitted = false;
    wrap.io.data = &wrap;
    wrap.io.iov = NULL;
    wrap.io.buf = (void *)readback_buf;
    wrap.io.len = settings.ext_wbuf_size;
    wrap.io.mode = OBJ_IO_READ;
    wrap.io.cb = _storage_compact_cb;

    pthread_mutex_lock(&storage_compact_plock);
    while (1) {
        pthread_mutex_unlock(&storage_compact_plock);
        if (to_sleep) {
            extstore_run_maint(storage);
            usleep(to_sleep);
        }
        pthread_mutex_lock(&storage_compact_plock);

        if (!compacting && storage_compact_check(storage, l,
                    &page_id, &page_version, &page_size, &drop_unread)) {
            page_offset = 0;
            compacting = true;
            LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
                    NULL, page_id, page_version);
        }

        if (compacting) {
            pthread_mutex_lock(&wrap.lock);
            if (page_offset < page_size && !wrap.done && !wrap.submitted) {
                wrap.io.page_version = page_version;
                wrap.io.page_id = page_id;
                wrap.io.offset = page_offset;
                // FIXME: should be smarter about io->next (unlink at use?)
                wrap.io.next = NULL;
                wrap.submitted = true;
                wrap.miss = false;

                extstore_submit(storage, &wrap.io);
            } else if (wrap.miss) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
                        NULL, page_id);
                wrap.done = false;
                wrap.submitted = false;
                compacting = false;
            } else if (wrap.submitted && wrap.done) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
                        NULL, page_id, page_offset);
                storage_compact_readback(storage, l, drop_unread,
                        readback_buf, page_id, page_version, settings.ext_wbuf_size);
                page_offset += settings.ext_wbuf_size;
                wrap.done = false;
                wrap.submitted = false;
            } else if (page_offset >= page_size) {
                compacting = false;
                wrap.done = false;
                wrap.submitted = false;
                extstore_close_page(storage, page_id, page_version);
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
                        NULL, page_id);
            }
            pthread_mutex_unlock(&wrap.lock);

            if (to_sleep > MIN_STORAGE_COMPACT_SLEEP)
                to_sleep /= 2;
        } else {
            if (to_sleep < MAX_STORAGE_COMPACT_SLEEP)
                to_sleep += MIN_STORAGE_COMPACT_SLEEP;
        }
    }
    free(readback_buf);

    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_compact_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_compact_pause(void) {
    pthread_mutex_lock(&storage_compact_plock);
}

void storage_compact_resume(void) {
    pthread_mutex_unlock(&storage_compact_plock);
}

int start_storage_compact_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_compact_plock, NULL);
    if ((ret = pthread_create(&storage_compact_tid, NULL,
        storage_compact_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_compact thread: %s\n",
            strerror(ret));
        return -1;
    }
    return 0;
}

/*** UTILITY ***/
// /path/to/file:100G:bucket1
// FIXME: Modifies argument. copy instead?
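/* Usage sketch with hypothetical values: parsing an ext_path option like
 * "/data/extstore:64m" against a 4MB page size:
 *
 *   char arg[] = "/data/extstore:64m"; // must be writable; strtok_r modifies it
 *   struct extstore_conf_file *cf = storage_conf_parse(arg, 1024 * 1024 * 4);
 *   // on success: cf->file is "/data/extstore", cf->page_count is 16, and
 *   // cf->free_bucket is PAGE_BUCKET_DEFAULT; NULL is returned on error.
 */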
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
    struct extstore_conf_file *cf = NULL;
    char *b = NULL;
    char *p = strtok_r(arg, ":", &b);
    char unit = 0;
    uint64_t multiplier = 0;
    int base_size = 0;
    if (p == NULL)
        goto error;
    // First arg is the filepath.
    cf = calloc(1, sizeof(struct extstore_conf_file));
    if (cf == NULL)
        goto error;
    cf->file = strdup(p);

    p = strtok_r(NULL, ":", &b);
    if (p == NULL) {
        fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
        goto error;
    }
    unit = tolower(p[strlen(p)-1]);
    p[strlen(p)-1] = '\0';
    // sigh.
    switch (unit) {
        case 'm':
            multiplier = 1024 * 1024;
            break;
        case 'g':
            multiplier = 1024 * 1024 * 1024;
            break;
        case 't':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024;
            break;
        case 'p':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024 * 1024;
            break;
    }
    base_size = atoi(p);
    multiplier *= base_size;
    // page_count is nearest-but-not-larger-than pages * psize
    cf->page_count = multiplier / page_size;
    assert(page_size * cf->page_count <= multiplier);

    // final token would be a default free bucket
    p = strtok_r(NULL, ",", &b);
    // TODO: We reuse the original DEFINES for now,
    // but if lowttl gets split up this needs to be its own set.
    if (p != NULL) {
        if (strcmp(p, "compact") == 0) {
            cf->free_bucket = PAGE_BUCKET_COMPACT;
        } else if (strcmp(p, "lowttl") == 0) {
            cf->free_bucket = PAGE_BUCKET_LOWTTL;
        } else if (strcmp(p, "chunked") == 0) {
            cf->free_bucket = PAGE_BUCKET_CHUNKED;
        } else if (strcmp(p, "default") == 0) {
            cf->free_bucket = PAGE_BUCKET_DEFAULT;
        } else {
            fprintf(stderr, "Unknown extstore bucket: %s\n", p);
            goto error;
        }
    } else {
        // TODO: is this necessary?
        cf->free_bucket = PAGE_BUCKET_DEFAULT;
    }

    // TODO: disabling until the compact algorithm is improved.
    if (cf->free_bucket != PAGE_BUCKET_DEFAULT) {
        fprintf(stderr, "ext_path only presently supports the default bucket\n");
        goto error;
    }

    return cf;
error:
    if (cf) {
        if (cf->file)
            free(cf->file);
        free(cf);
    }
    return NULL;
}
#endif