6 Star 12 Fork 6

覃攀 / cothread2

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
cothread_demo.c 13.45 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
/* 简介:cothread 是一个轻量级协程调度器,由纯C语言实现,易于移植到各种单片机。
* 同时,由于该调度器仅仅运行在一个实际线程中,所以它也适用于服务器高并发场景。
*
* 版本: 1.0.0 2019/02/25
*
* 作者: 覃攀 <qinpan1003@qq.com>
*
*/
#include "rtos.h"
/* 通过 switch-case 来模拟 OS 的线程调度*/
static coresult_t main_thread_1(ccb_t *ccb)
{
switch (ccb->ret_addr)
{
case 0:
LOG("%s hello world 1!\n", __FUNCTION__);
ccb->ret_addr = __LINE__ + 3;
cothread_yeild(ccb);
return STATUS_CONTINUE;
case __LINE__:
LOG("%s hello world 2!\n", __FUNCTION__);
ccb->ret_addr = __LINE__ + 3;
cothread_sleep(ccb, 10);
return STATUS_CONTINUE;
case __LINE__:
LOG("%s hello world 3!\n", __FUNCTION__);
ccb->ret_addr = __LINE__ + 3;
cothread_wait(ccb, 1, 10);
return STATUS_CONTINUE;
case __LINE__:
return STATUS_DONE;
default:
return STATUS_DONE;
}
}
/* 通过 switch-case 宏定义来模拟 OS 的线程调度 */
static coresult_t main_thread_2(ccb_t *ccb)
{
thread_start();
LOG("%s hello world 1!\n", __FUNCTION__);
thread_yeild();
LOG("%s hello world 2!\n", __FUNCTION__);
thread_sleep(10);
LOG("%s hello world 3!\n", __FUNCTION__);
thread_wait(1, 10);
thread_end();
}
ccb_t *ccb5 = NULL;
struct main_thread_3_param {
int i;
int j;
char *name;
}param_3, param_3_1;
struct user_msg {
unsigned char *buff;
int count;
};
static mqd_t mq = NULL;
static coresult_t main_thread_3(ccb_t *ccb)
{
struct user_msg msg;
struct main_thread_3_param *param = ccb->arg;
thread_start();
param->i = 0;
while (param->i < 5)
{
LOG("%s %s while yeild %d!\n", __FUNCTION__, param->name, param->i);
thread_yeild();
param->i++;
}
for (param->i = 0; param->i < 5; param->i++)
{
LOG("%s %s for yeild %d!\n", __FUNCTION__, param->name, param->i);
if (!strcmp(param->name, "thread3-1"))
thread_yeild();
}
if (!strcmp(param->name, "thread3"))
thread_delete();
for (param->j = 0; param->j < 5; param->j++)
{
LOG("%s %s delay %d!\n", __FUNCTION__, param->name, param->j);
thread_sleep(10);
}
LOG("%s %s wait 3!\n", __FUNCTION__, param->name);
thread_wait(1, 10);
mq = mq_open("mq_3_5", 20, sizeof(struct user_msg), 200);
while (1)
{
if (ccb5 != NULL)
{
LOG("%s %s send signal to ccb5\n", __FUNCTION__, param->name);
thread_signal(ccb5, 1 << 29);
}
thread_sleep(1000);
msg.count = strlen("hello world") + 1;
msg.buff = malloc(msg.count);
if (msg.buff == NULL)
{
LOG_ERR("[%s] malloc failed\n", __FUNCTION__);
return -1;
}
memcpy(msg.buff, "hello world", msg.count);
mq_send(mq, (const char *)&msg, sizeof(msg), 0);
}
thread_end();
}
struct main_thread_4_param {
int i;
}param_4;
/* 线程子函数内部可以使用 switch-case */
/* 线程子函数内部可以使用 signal,create 函数 */
static int main_thread_4_sub_fun(ccb_t *ccb)
{
struct main_thread_4_param *param = ccb->arg;
switch (param->i)
{
case 1:
LOG("%s hello world 1!\n", __FUNCTION__);
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
/* 线程内部子函数可以创建其他线程 */
param_3_1.name = "thread3-1";
thread_create(main_thread_3, &param_3_1, THREAD_PRIO_HIGH);
break;
case 2:
LOG("%s hello world 2!\n", __FUNCTION__);
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
break;
case 3:
LOG("%s hello world 3!\n", __FUNCTION__);
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
break;
case 4:
LOG("%s hello world 4!\n", __FUNCTION__);
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
break;
case 5:
LOG("%s hello world 5!\n", __FUNCTION__);
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
break;
case 6:
LOG("%s hello world 6!\n", __FUNCTION__);
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
break;
default:
if (ccb5 != NULL)
thread_signal(ccb5, 1 << param->i);
return -1;
}
param->i++;
return 0;
}
static void timer_test(void *arg)
{
LOG("[%s][%s][%d]\n", __FUNCTION__, (char *)arg, get_system_tick());
}
static coresult_t main_thread_4(ccb_t *ccb)
{
thread_start();
struct main_thread_4_param *param = ccb->arg;
param->i = 1;
/* 线程内部可以创建其他线程 */
param_3.name = "thread3";
thread_create(main_thread_3, &param_3, THREAD_PRIO_HIGH);
timer_set(1000, 2000, timer_test, "hello timer test.");
while (1)
{
int ret = main_thread_4_sub_fun(ccb);
if (ret != 0)
break;
thread_yeild();
}
LOG("%s exit\n", __FUNCTION__);
thread_end();
}
static coresult_t main_thread_5(ccb_t *ccb)
{
thread_start();
/* 线程内部可以创建其他线程 */
thread_create(main_thread_4, &param_4, THREAD_PRIO_HIGH);
while (1)
{
thread_wait(~0, 0);
if (ccb->event_wakeup)
{
unsigned int wakeup_event = ccb->event_mask & ccb->event_signaled;
LOG("%s event wakeup 0x%08x\n", __FUNCTION__, wakeup_event);
ccb->event_signaled &= ~wakeup_event;
}
else
{
LOG("%s wait timeout.\n", __FUNCTION__);
}
if (mq != NULL)
{
struct user_msg msg;
mq_receive(mq, (char *)&msg, sizeof(msg), NULL);
LOG("%s receive msg:%d, %s.\n", __FUNCTION__, msg.count, msg.buff);
free(msg.buff);
}
}
thread_end();
}
static coresult_t main_thread_6_1(ccb_t *ccb)
{
long i = (long)ccb->arg;
thread_start();
thread_sleep(i * 2 + 10);
LOG("-------------------------wakeup %ld\n", i);
thread_end();
}
static coresult_t main_thread_6(ccb_t *ccb)
{
unsigned int tick;
thread_start();
while (1)
{
if (mq != NULL)
{
struct user_msg msg;
mq_receive(mq, (char *)&msg, sizeof(msg), NULL);
LOG("%s receive msg:%d, %s.\n", __FUNCTION__, msg.count, msg.buff);
free(msg.buff);
}
/* 构造 30% CPU使用率,并且不释放CPU,可以被调度器检测到 */
tick = get_system_tick() + 300;
while (!time_before(tick, get_system_tick()));
os_show_stat();
thread_sleep(1700);
long i = 0;
while (thread_create(main_thread_6_1, (void *)i, THREAD_PRIO_HIGH) != NULL)
i++;
}
thread_end();
}
#define INTERRUPT_BUFF_SIZE (128)
static int interrupt_buff[INTERRUPT_BUFF_SIZE];
static int interrupt_buff_head = 0;
static int interrupt_buff_tail = 0;
static int interrupt_data = 0;
static void interrupt_income_data(void)
{
int i = 0;
int head_next = interrupt_buff_head + 1;;
for (i = 0; i < 80; i++)
{
if (head_next >= INTERRUPT_BUFF_SIZE)
head_next = 0;
if (head_next == interrupt_buff_tail)
break;
interrupt_buff[interrupt_buff_head] = interrupt_data++;
interrupt_buff_head = head_next++;
}
}
static void interrupt_process_data(void)
{
int i = 0;
int tail_tmp = interrupt_buff_tail;
LOG("interrupt_process_data.\n");
while (interrupt_buff_head != tail_tmp)
{
LOG("%5x ", interrupt_buff[tail_tmp]);
if (++tail_tmp >= INTERRUPT_BUFF_SIZE)
tail_tmp = 0;
interrupt_buff_tail = tail_tmp;
i++;
if (i == 16)
{
i = 0;
LOG("\n");
}
else if (i == 8)
{
LOG(" ");
}
}
LOG("\n");
}
/* 这个线程是中断处理线程,在中断函数中建立 */
static coresult_t main_thread_7(ccb_t *ccb)
{
thread_start();
interrupt_process_data();
thread_end();
}
/* 这个线程在调度器1上面运行 */
static coresult_t main_thread_8(ccb_t *ccb)
{
thread_start();
while (1)
{
thread_sleep(3000);
LOG("[%s] run on scheduler %p.\n", __FUNCTION__, ccb->scheduler);
}
thread_end();
}
int thread_test(int argc,char **argv)
{
thread_create(main_thread_1, NULL, THREAD_PRIO_HIGH);
thread_create(main_thread_2, NULL, THREAD_PRIO_HIGH);
ccb5 = thread_create(main_thread_5, NULL, THREAD_PRIO_HIGH);
thread_create(main_thread_6, NULL, THREAD_PRIO_HIGH);
/* 建立新的调度器 */
struct cothread_scheduler *scheduler = alloc_scheduler(5, 3, NULL);
thread_create_on_scheduler(scheduler, main_thread_8, NULL, THREAD_PRIO_HIGH);
return 0;
}
/* demo 需要提供给 rtos 的接口函数 */
/*-----------------------------------------------------------------------*/
static int io_inited = 0;
static int input_arrived = 0;
static char ch;
static irq_state_t interrupt_disabled = 0;
static int tx_inprogress = 0;
static char write_buff[1024];
static int write_head = 0;
static int write_tail = 0;
static void uart_irq_handler(void)
{
if (read_flag())
wakeup_shell_thread();
if (write_completed())
wakeup_log_thread();
}
static void ll_read(void)
{
if (input_arrived)
return;
input_arrived = fread(&ch, 1, 1, stdin);
if (input_arrived)
uart_irq_handler();
}
static void ll_write(void)
{
int count = 0;
int write_head_tmp = write_head;
if (write_tail == write_head_tmp)
return;
tx_inprogress = 1;
/* 控制每次发送字节数,模拟串口发送阻塞,波特率 = bytes * 8 * 1000 */
int bytes = 10;
while (write_tail != write_head_tmp && count < bytes)
{
printf("%c", write_buff[write_tail]);
fflush(stdout);
if (++write_tail >= sizeof(write_buff))
write_tail = 0;
count++;
}
if (write_tail != write_head_tmp)
return;
tx_inprogress = 0;
uart_irq_handler();
}
static void io_init(void)
{
int flags = fcntl(0, F_GETFL, 0);
fcntl(0, F_SETFL, flags | O_NONBLOCK);
io_inited = 1;
}
int irq_log_enable = 0;
int enable_irq_log(int argc, char **argv)
{
if (argc < 2)
return - 1;
if (strcmp(argv[1], "on") == 0)
irq_log_enable = 1;
else if (strcmp(argv[1], "off") == 0)
irq_log_enable = 0;
return 0;
}
static int timer_count = 0;
static void timer_thread(int signo)
{
signal(SIGALRM, timer_thread);
if (interrupt_disabled)
return;
timer_count++;
/* 模拟串口输入中断 */
ll_read();
/* 模拟串口发送 */
ll_write();
system_tick();
if (irq_log_enable)
LOG("log from interrupt.\n");
/* 模拟中断数据到来,建立处理线程 */
if (timer_count % 5000 == 1)
{
interrupt_income_data();
thread_create(main_thread_7, NULL, THREAD_PRIO_HIGH);
}
}
static void hardware_init(void)
{
io_init();
}
int read_flag(void)
{
if (!io_inited)
return 0;
return input_arrived;
}
int read_data(void)
{
int ch_tmp = ch;
input_arrived = 0;
return ch_tmp;
}
int write_completed(void)
{
if (!io_inited)
return 0;
return !tx_inprogress;
}
void write_ch(char ch)
{
int write_head_tmp;
if (!io_inited)
return;
write_head_tmp = write_head + 1;
if (write_head_tmp >= sizeof(write_buff))
write_head_tmp = 0;
/* 满 */
if (write_tail == write_head_tmp)
return;
write_buff[write_head] = ch;
write_head = write_head_tmp;
}
irq_state_t irq_save_disable(void)
{
irq_state_t stat = interrupt_disabled;
interrupt_disabled = 1;
return stat;
}
void irq_restore(irq_state_t stat)
{
interrupt_disabled = stat;
}
void cothread_scheduler_wakeup(void)
{
}
void system_timer_start(void)
{
struct itimerval value, ovalue;
signal(SIGALRM, timer_thread);
value.it_value.tv_sec = 0;
value.it_value.tv_usec = 1000;
value.it_interval.tv_sec = 0;
value.it_interval.tv_usec = 1000;
setitimer(ITIMER_REAL, &value, &ovalue);
}
/*-----------------------------------------------------------------------*/
int main(void)
{
os_init();
hardware_init();
create_shell_thread();
create_log_thread();
os_start();
return 0;
}
C
1
https://gitee.com/qinpan/cothread2.git
git@gitee.com:qinpan/cothread2.git
qinpan
cothread2
cothread2
master

搜索帮助