1 Star 2 Fork 3

林中白狼 / Infrastructure

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
AsynQueue.cs 5.06 KB
一键复制 编辑 原始数据 按行查看 历史
林中白狼 提交于 2019-01-07 21:25 . No commit message
public class AsynQueue<T>
{
//队列是否正在处理数据
private int isProcessing;
//有线程正在处理数据
private const int Processing = 1;
//没有线程处理数据
private const int UnProcessing = 0;
//队列是否可用
private volatile bool enabled = true;
private Task currentTask;
public event Action<T> ProcessItemFunction;
public event EventHandler<EventArgs<Exception>> ProcessException;
private ConcurrentQueue<T> queue;
public AsynQueue()
{
queue = new ConcurrentQueue<T>();
Start();
}
public int Count
{
get
{
return queue.Count;
}
}
private void Start()
{
Thread process_Thread = new Thread(PorcessItem);
process_Thread.IsBackground = true;
process_Thread.Start();
}
public void Enqueue(T items)
{
if (items == null)
{
throw new ArgumentException("items");
}
queue.Enqueue(items);
DataAdded();
}
//数据添加完成后通知消费者线程处理
private void DataAdded()
{
if (enabled)
{
if (!IsProcessingItem())
{
currentTask = Task.Factory.StartNew(ProcessItemLoop);
}
}
}
//判断是否队列有线程正在处理
private bool IsProcessingItem()
{
return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
}
private void ProcessItemLoop()
{
if (!enabled && queue.IsEmpty)
{
Interlocked.Exchange(ref isProcessing, 0);
return;
}
//处理的线程数 是否小于当前最大任务数
//if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount)
//{
T publishFrame;
if (queue.TryDequeue(out publishFrame))
{
try
{
ProcessItemFunction(publishFrame);
}
catch (Exception ex)
{
OnProcessException(ex);
}
}
if (enabled && !queue.IsEmpty)
{
currentTask = Task.Factory.StartNew(ProcessItemLoop);
}
else
{
Interlocked.Exchange(ref isProcessing, UnProcessing);
}
}
/// <summary>
///定时处理线程调用函数
///主要是监视入队的时候线程 没有来的及处理的情况
/// </summary>
private void PorcessItem(object state)
{
int sleepCount = 0;
int sleepTime = 1000;
while (enabled)
{
//如果队列为空则根据循环的次数确定睡眠的时间
if (queue.IsEmpty)
{
if (sleepCount == 0)
{
sleepTime = 1000;
}
else if (sleepCount <= 3)
{
sleepTime = 1000 * 3;
}
else
{
sleepTime = 1000 * 50;
}
sleepCount++;
Thread.Sleep(sleepTime);
}
else
{
//判断是否队列有线程正在处理
if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
{
if (!queue.IsEmpty)
{
currentTask = Task.Factory.StartNew(ProcessItemLoop);
}
else
{
Interlocked.Exchange(ref isProcessing, 0);
}
sleepCount = 0;
sleepTime = 1000;
}
}
}
}
public void Flsuh()
{
Stop();
if (currentTask != null)
{
currentTask.Wait();
}
while (!queue.IsEmpty)
{
try
{
T publishFrame;
if (queue.TryDequeue(out publishFrame))
{
ProcessItemFunction(publishFrame);
}
}
catch (Exception ex)
{
OnProcessException(ex);
}
}
currentTask = null;
}
public void Stop()
{
this.enabled = false;
}
private void OnProcessException(System.Exception ex)
{
var tempException = ProcessException;
Interlocked.CompareExchange(ref ProcessException, null, null);
if (tempException != null)
{
ProcessException(ex, new EventArgs<Exception>(ex));
}
}
}
[Serializable]
public class EventArgs<T> : System.EventArgs
{
public T Argument;
public EventArgs()
: this(default(T))
{
}
public EventArgs(T argument)
{
Argument = argument;
}
}
C#
1
https://gitee.com/fq_chenzhen/Infrastructure.git
git@gitee.com:fq_chenzhen/Infrastructure.git
fq_chenzhen
Infrastructure
Infrastructure
master

搜索帮助

53164aa7 5694891 3bd8fe86 5694891