代码拉取完成,页面将自动刷新
同步操作将从 wangsl_sl/InitQ 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
- .net core版本:2.1
- redis版本:3.0以上
1.通过注解的方式,订阅队列
2.可以设置消费消息的频次
3.支持消息广播
4.1.0.0.7版本新增延迟队列支持
分布式环境,redis消息队列
1.获取initQ包 方案A. install-package InitQ 方案B. nuget包管理工具搜索 InitQ
2.添加中间件(该中间件依赖StackExchange.Redis)
services.AddInitQ(m=>
{
m.SuspendTime = 1000;
m.ConnectionString = "47.104.247.70,password=admin";
m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) };
m.ShowLog = false;
});
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.ServiceProvider.GetService<ICacheService>();
//循环向 tibos_test_1 队列发送消息
for (int i = 0; i < 1000; i++)
{
await _redis.ListRightPushAsync("tibos_test_1", $"我是消息{i + 1}号");
}
}
public class RedisSubscribeA: IRedisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A类--->订阅者A消息消息:{msg}");
}
[Subscribe("tibos_test_1")]
private async Task SubRedisTest1(string msg)
{
Console.WriteLine($"A类--->订阅者A1消息消息:{msg}");
}
[Subscribe("tibos_test_1")]
private async Task SubRedisTest2(string msg)
{
Console.WriteLine($"A类--->订阅者A2消息消息:{msg}");
}
[Subscribe("tibos_test_1")]
private async Task SubRedisTest3(string msg)
{
Console.WriteLine($"A类--->订阅者A3消息消息:{msg}");
}
}
public class RedisSubscribeB : IRedisSubscribe
{
/// <summary>
/// 测试
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"B类--->订阅者B消费消息:{msg}");
}
}
public class ChannelSubscribeA : IHostedService, IDisposable
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider)
{
_logger = logger;
_provider = provider;
}
public void Dispose()
{
_logger.LogInformation("退出");
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(async () =>
{
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.ServiceProvider.GetService<ICacheService>();
await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) =>
{
Console.WriteLine("test_channel" + " 订阅服务A收到消息:" + message);
}));
}
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
}
public class ChannelSubscribeB : IHostedService, IDisposable
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider)
{
_logger = logger;
_provider = provider;
}
public void Dispose()
{
_logger.LogInformation("退出");
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(async () =>
{
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.ServiceProvider.GetService<ICacheService>();
await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) =>
{
Console.WriteLine("test_channel" + " 订阅服务B收到消息:" + message);
}));
}
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
}
services.AddHostedService<ChannelSubscribeA>();
services.AddHostedService<ChannelSubscribeB>();
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.ServiceProvider.GetService<ICacheService>();
for (int i = 0; i < 1000; i++)
{
await _redis.PublishAsync("test_channel", $"往通道发送第{i}条消息");
}
}
Task.Run(async () =>
{
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
//redis对象
var _redis = scope.ServiceProvider.GetService<ICacheService>();
for (int i = 0; i < 100; i++)
{
var dt = DateTime.Now.AddSeconds(3 * (i + 1));
//key:redis里的key,唯一
//msg:任务
//time:延时执行的时间
await _redis.SortedSetAddAsync("test_0625", $"延迟任务,第{i + 1}个元素,执行时间:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt);
}
}
});
//延迟队列
[SubscribeDelay("test_0625")]
private async Task SubRedisTest1(string msg)
{
Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者延迟队列消息开始--->{msg}");
//模拟任务执行耗时
var m = new Random().Next(1,10);
await Task.Delay(TimeSpan.FromSeconds(m));
Console.WriteLine($"A类--->{msg} 结束<---");
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。