1 Star 1 Fork 0

GUANGYU WANG / amqp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
confirms_test.go 2.78 KB
一键复制 编辑 原始数据 按行查看 历史
Gerhard Lazu 提交于 2017-11-01 22:19 . Format go source with simplify flag
package amqp
import (
"testing"
"time"
)
// TestConfirmOneResequences publishes three messages, confirms tags 2 and 3
// through One before tag 1, and checks that the listener has nothing to read
// until tag 1 is confirmed, after which all three confirmations must arrive
// in fixture order.
func TestConfirmOneResequences(t *testing.T) {
	var (
		fixtures = []Confirmation{
			{1, true},
			{2, false},
			{3, true},
		}
		c = newConfirms()
		l = make(chan Confirmation, len(fixtures))
	)

	c.Listen(l)

	// Publish must hand out 1-based, monotonically increasing delivery tags.
	for i := range fixtures {
		if want, got := uint64(i+1), c.Publish(); want != got {
			t.Fatalf("expected publish to return the 1 based delivery tag published, want: %d, got: %d", want, got)
		}
	}

	// Confirm the later tags first; nothing should be readable on the
	// listener while the first tag is still outstanding.
	c.One(fixtures[1])
	c.One(fixtures[2])

	select {
	case confirm := <-l:
		t.Fatalf("expected to wait in order to properly resequence results, got: %+v", confirm)
	default:
	}

	c.One(fixtures[0])

	for i, want := range fixtures {
		var got Confirmation

		// Bound the receive so a lost confirmation fails the test with a
		// message instead of blocking until the global test timeout.
		select {
		case got = <-l:
		case <-time.After(1 * time.Second):
			t.Fatalf("timeout on reading confirmation %d, want: %+v", i, want)
		}

		if want != got {
			t.Fatalf("expected to return confirmations in sequence for %d, want: %+v, got: %+v", i, want, got)
		}
	}
}
// TestConfirmMixedResequences publishes three messages, confirms tags 1 and 3
// individually with One, then confirms tag 2 with Multiple, and expects the
// listener to receive the three fixtures in order. Each read is bounded by a
// one second timeout.
func TestConfirmMixedResequences(t *testing.T) {
	expected := []Confirmation{
		{1, true},
		{2, true},
		{3, true},
	}
	tracker := newConfirms()
	listener := make(chan Confirmation, len(expected))

	tracker.Listen(listener)

	// One publish per fixture so every tag below is outstanding.
	for n := 0; n < len(expected); n++ {
		tracker.Publish()
	}

	tracker.One(expected[0])
	tracker.One(expected[2])
	tracker.Multiple(expected[1])

	for idx := 0; idx < len(expected); idx++ {
		select {
		case received := <-listener:
			if received != expected[idx] {
				t.Fatalf("expected to confirm in sequence for %d, want: %+v, got: %+v", idx, expected[idx], received)
			}
		case <-time.After(1 * time.Second):
			t.Fatalf("timeout on reading confirmations")
		}
	}
}
// TestConfirmMultipleResequences publishes four messages, issues a single
// Multiple confirmation for the last fixture, and expects the listener to
// receive every fixture in order.
func TestConfirmMultipleResequences(t *testing.T) {
	var (
		fixtures = []Confirmation{
			{1, true},
			{2, true},
			{3, true},
			{4, true},
		}
		c = newConfirms()
		l = make(chan Confirmation, len(fixtures))
	)

	c.Listen(l)

	for range fixtures {
		c.Publish()
	}

	c.Multiple(fixtures[len(fixtures)-1])

	for i, want := range fixtures {
		var got Confirmation

		// Bound the receive so a missing confirmation fails the test with a
		// message instead of blocking until the global test timeout.
		select {
		case got = <-l:
		case <-time.After(1 * time.Second):
			t.Fatalf("timeout on reading confirmation %d, want: %+v", i, want)
		}

		if want != got {
			t.Fatalf("expected to confirm multiple in sequence for %d, want: %+v, got: %+v", i, want, got)
		}
	}
}
// BenchmarkSequentialBufferedConfirms measures a publish followed directly by
// its own One confirmation, delivered to a listener channel buffered to ten
// entries. Once the iteration index reaches the buffer capacity, one entry is
// read off the channel before each new confirmation.
func BenchmarkSequentialBufferedConfirms(b *testing.B) {
	tracker := newConfirms()
	listener := make(chan Confirmation, 10)
	tracker.Listen(listener)

	capacity := cap(listener)
	for iter := 0; iter < b.N; iter++ {
		// From this point on, drain one entry per iteration before
		// confirming again.
		if iter >= capacity {
			<-listener
		}

		tag := tracker.Publish()
		tracker.One(Confirmation{tag, true})
	}
}
// TestConfirmsIsThreadSafe starts three groups of 1000 goroutines: one group
// calls Publish and sends the resulting confirmation on pub, one group
// receives from pub and passes the value to One, and one group forwards
// whatever the listener receives to done. The test fails unless 1000 values
// arrive on done within five seconds.
func TestConfirmsIsThreadSafe(t *testing.T) {
	const (
		count   = 1000
		timeout = 5 * time.Second
	)

	tracker := newConfirms()
	listener := make(chan Confirmation)
	pub := make(chan Confirmation)
	done := make(chan Confirmation)
	// A single deadline shared by the whole collection loop below.
	late := time.After(timeout)

	tracker.Listen(listener)

	publish := func() { pub <- Confirmation{tracker.Publish(), true} }
	confirm := func() { tracker.One(<-pub) }
	forward := func() { done <- <-listener }

	for n := 0; n < count; n++ {
		go publish()
	}
	for n := 0; n < count; n++ {
		go confirm()
	}
	for n := 0; n < count; n++ {
		go forward()
	}

	for remaining := count; remaining > 0; remaining-- {
		select {
		case <-done:
		case <-late:
			t.Fatalf("expected all publish/confirms to finish after %s", timeout)
		}
	}
}
Go
1
https://gitee.com/macheals/amqp.git
git@gitee.com:macheals/amqp.git
macheals
amqp
amqp
master

搜索帮助