1 Star 0 Fork 22

冲击 / gnet

forked from Gitee 极速下载 / gnet 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
engine_windows.go 3.86 KB
一键复制 编辑 原始数据 按行查看 历史
andypan 提交于 2023-05-16 18:42 . chore: update copyright info
// Copyright (c) 2023 The Gnet Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gnet
import (
"context"
"runtime"
"sync/atomic"
"golang.org/x/sync/errgroup"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)
type engine struct {
ln *listener
lb loadBalancer // event-loops for handling events
opts *Options // options with engine
ticker struct {
ctx context.Context
cancel context.CancelFunc
}
inShutdown int32 // whether the engine is in shutdown
workerPool struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}
eventHandler EventHandler // user eventHandler
}
func (eng *engine) isInShutdown() bool {
return atomic.LoadInt32(&eng.inShutdown) == 1
}
// shutdown signals the engine to shut down.
func (eng *engine) shutdown(err error) {
if err != nil && err != errorx.ErrEngineShutdown {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}
eng.workerPool.shutdown()
}
func (eng *engine) start(numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
el := eventloop{
ch: make(chan interface{}, 1024),
idx: i,
eng: eng,
connections: make(map[*conn]struct{}),
eventHandler: eng.eventHandler,
}
eng.lb.register(&el)
eng.workerPool.Go(el.run)
if i == 0 && eng.opts.Ticker {
eng.workerPool.Go(func() error {
el.ticker(eng.ticker.ctx)
return nil
})
}
}
eng.workerPool.Go(eng.listen)
return nil
}
func (eng *engine) stop(engine Engine) error {
<-eng.workerPool.shutdownCtx.Done()
eng.opts.Logger.Infof("engine is being shutdown...")
eng.eventHandler.OnShutdown(engine)
eng.ln.close()
eng.lb.iterate(func(i int, el *eventloop) bool {
el.ch <- errorx.ErrEngineShutdown
return true
})
if eng.ticker.cancel != nil {
eng.ticker.cancel()
}
if err := eng.workerPool.Wait(); err != nil {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}
atomic.StoreInt32(&eng.inShutdown, 1)
return nil
}
func run(eventHandler EventHandler, listener *listener, options *Options, protoAddr string) error {
// Figure out the proper number of event-loops/goroutines to run.
numEventLoop := 1
if options.Multicore {
numEventLoop = runtime.NumCPU()
}
if options.NumEventLoop > 0 {
numEventLoop = options.NumEventLoop
}
shutdownCtx, shutdown := context.WithCancel(context.Background())
eng := engine{
opts: options,
eventHandler: eventHandler,
ln: listener,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
}
switch options.LB {
case RoundRobin:
eng.lb = new(roundRobinLoadBalancer)
case LeastConnections:
eng.lb = new(leastConnectionsLoadBalancer)
case SourceAddrHash:
eng.lb = new(sourceAddrHashLoadBalancer)
}
if options.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
}
engine := Engine{eng: &eng}
switch eventHandler.OnBoot(engine) {
case None:
case Shutdown:
return nil
}
if err := eng.start(numEventLoop); err != nil {
eng.opts.Logger.Errorf("gnet engine is stopping with error: %v", err)
return err
}
defer eng.stop(engine) //nolint:errcheck
allEngines.Store(protoAddr, &eng)
return nil
}
/*
func (eng *engine) sendCmd(_ *asyncCmd, _ bool) error {
return errorx.ErrUnsupportedOp
}
*/
Shell
1
https://gitee.com/1273640670/gnet.git
git@gitee.com:1273640670/gnet.git
1273640670
gnet
gnet
dev

搜索帮助