1 Star 0 Fork 288

powerdream / go-fastdfs

forked from sjqzhang / go-fastdfs 
Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or download
fileserver.go 123.34 KB
Copy Edit Web IDE Raw Blame History
sjqzhang authored 2020-07-03 11:45 . add status check peer_id host
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242424342444245424642474248424942504251425242534254425542564257425842594260426142624263426442654266426742684269427042714272427342744275427642774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440
package main
import (
"bufio"
"bytes"
"errors"
"flag"
"fmt"
"image"
"image/jpeg"
"image/png"
"io"
"io/ioutil"
slog "log"
random "math/rand"
"mime/multipart"
"net"
"net/http"
_ "net/http/pprof"
"net/smtp"
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"regexp"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/astaxie/beego/httplib"
mapset "github.com/deckarep/golang-set"
_ "github.com/eventials/go-tus"
jsoniter "github.com/json-iterator/go"
"github.com/nfnt/resize"
"github.com/radovskyb/watcher"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/sjqzhang/googleAuthenticator"
"github.com/sjqzhang/goutil"
log "github.com/sjqzhang/seelog"
"github.com/sjqzhang/tusd"
"github.com/sjqzhang/tusd/filestore"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
var staticHandler http.Handler
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var server *Server = nil
var logacc log.LoggerInterface
var FOLDERS = []string{DATA_DIR, STORE_DIR, CONF_DIR, STATIC_DIR}
var CONST_QUEUE_SIZE = 10000
var (
VERSION string
BUILD_TIME string
GO_VERSION string
GIT_VERSION string
v = flag.Bool("v", false, "display version")
)
var (
FileName string
ptr unsafe.Pointer
DOCKER_DIR = ""
STORE_DIR = STORE_DIR_NAME
CONF_DIR = CONF_DIR_NAME
LOG_DIR = LOG_DIR_NAME
DATA_DIR = DATA_DIR_NAME
STATIC_DIR = STATIC_DIR_NAME
LARGE_DIR_NAME = "haystack"
LARGE_DIR = STORE_DIR + "/haystack"
CONST_LEVELDB_FILE_NAME = DATA_DIR + "/fileserver.db"
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
CONST_CONF_FILE_NAME = CONF_DIR + "/cfg.json"
CONST_SERVER_CRT_FILE_NAME = CONF_DIR + "/server.crt"
CONST_SERVER_KEY_FILE_NAME = CONF_DIR + "/server.key"
CONST_SEARCH_FILE_NAME = DATA_DIR + "/search.txt"
CONST_UPLOAD_COUNTER_KEY = "__CONST_UPLOAD_COUNTER_KEY__"
logConfigStr = `
<seelog type="asynctimer" asyncinterval="1000" minlevel="trace" maxlevel="error">
<outputs formatid="common">
<buffered formatid="common" size="1048576" flushperiod="1000">
<rollingfile type="size" filename="{DOCKER_DIR}log/fileserver.log" maxsize="104857600" maxrolls="10"/>
</buffered>
</outputs>
<formats>
<format id="common" format="%Date %Time [%LEV] [%File:%Line] [%Func] %Msg%n" />
</formats>
</seelog>
`
logAccessConfigStr = `
<seelog type="asynctimer" asyncinterval="1000" minlevel="trace" maxlevel="error">
<outputs formatid="common">
<buffered formatid="common" size="1048576" flushperiod="1000">
<rollingfile type="size" filename="{DOCKER_DIR}log/access.log" maxsize="104857600" maxrolls="10"/>
</buffered>
</outputs>
<formats>
<format id="common" format="%Date %Time [%LEV] [%File:%Line] [%Func] %Msg%n" />
</formats>
</seelog>
`
)
const (
STORE_DIR_NAME = "files"
LOG_DIR_NAME = "log"
DATA_DIR_NAME = "data"
CONF_DIR_NAME = "conf"
STATIC_DIR_NAME = "static"
CONST_STAT_FILE_COUNT_KEY = "fileCount"
CONST_BIG_UPLOAD_PATH_SUFFIX = "/big/upload/"
CONST_STAT_FILE_TOTAL_SIZE_KEY = "totalSize"
CONST_Md5_ERROR_FILE_NAME = "errors.md5"
CONST_Md5_QUEUE_FILE_NAME = "queue.md5"
CONST_FILE_Md5_FILE_NAME = "files.md5"
CONST_REMOME_Md5_FILE_NAME = "removes.md5"
CONST_SMALL_FILE_SIZE = 1024 * 1024
CONST_MESSAGE_CLUSTER_IP = "Can only be called by the cluster ip or 127.0.0.1 or admin_ips(cfg.json),current ip:%s"
cfgJson = `{
"绑定端号": "端口",
"addr": ":8080",
"是否开启https": "默认不开启,如需启开启,请在conf目录中增加证书文件 server.crt 私钥 文件 server.key",
"enable_https": false,
"PeerID": "集群内唯一,请使用0-9的单字符,默认自动生成",
"peer_id": "%s",
"本主机地址": "本机http地址,默认自动生成(注意端口必须与addr中的端口一致),必段为内网,自动生成不为内网请自行修改,下同",
"host": "%s",
"集群": "集群列表,注意为了高可用,IP必须不能是同一个,同一不会自动备份,且不能为127.0.0.1,且必须为内网IP,默认自动生成",
"peers": ["%s"],
"组号": "用于区别不同的集群(上传或下载)与support_group_manage配合使用,带在下载路径中",
"group": "group1",
"是否支持按组(集群)管理,主要用途是Nginx支持多集群": "默认支持,不支持时路径为http://10.1.5.4:8080/action,支持时为http://10.1.5.4:8080/group(配置中的group参数)/action,action为动作名,如status,delete,sync等",
"support_group_manage": true,
"是否合并小文件": "默认不合并,合并可以解决inode不够用的情况(当前对于小于1M文件)进行合并",
"enable_merge_small_file": false,
"允许后缀名": "允许可以上传的文件后缀名,如jpg,jpeg,png等。留空允许所有。",
"extensions": [],
"重试同步失败文件的时间": "单位秒",
"refresh_interval": 1800,
"是否自动重命名": "默认不自动重命名,使用原文件名",
"rename_file": false,
"是否支持web上传,方便调试": "默认支持web上传",
"enable_web_upload": true,
"是否支持非日期路径": "默认支持非日期路径,也即支持自定义路径,需要上传文件时指定path",
"enable_custom_path": true,
"下载域名": "用于外网下载文件的域名,不包含http://",
"download_domain": "",
"场景列表": "当设定后,用户指的场景必项在列表中,默认不做限制(注意:如果想开启场景认功能,格式如下:'场景名:googleauth_secret' 如 default:N7IET373HB2C5M6D ",
"scenes": [],
"默认场景": "默认default",
"default_scene": "default",
"是否显示目录": "默认显示,方便调试用,上线时请关闭",
"show_dir": true,
"邮件配置": "",
"mail": {
"user": "abc@163.com",
"password": "abc",
"host": "smtp.163.com:25"
},
"告警接收邮件列表": "接收人数组",
"alarm_receivers": [],
"告警接收URL": "方法post,参数:subject,message",
"alarm_url": "",
"下载是否需带token": "真假",
"download_use_token": false,
"下载token过期时间": "单位秒",
"download_token_expire": 600,
"是否自动修复": "在超过1亿文件时出现性能问题,取消此选项,请手动按天同步,请查看FAQ",
"auto_repair": true,
"文件去重算法md5可能存在冲突,默认md5": "sha1|md5",
"file_sum_arithmetic": "md5",
"管理ip列表": "用于管理集的ip白名单,",
"admin_ips": ["127.0.0.1"],
"是否启用迁移": "默认不启用",
"enable_migrate": false,
"文件是否去重": "默认去重",
"enable_distinct_file": true,
"是否开启跨站访问": "默认开启",
"enable_cross_origin": true,
"是否开启Google认证,实现安全的上传、下载": "默认不开启",
"enable_google_auth": false,
"认证url": "当url不为空时生效,注意:普通上传中使用http参数 auth_token 作为认证参数, 在断点续传中通过HTTP头Upload-Metadata中的auth_token作为认证参数,认证流程参考认证架构图",
"auth_url": "",
"下载是否认证": "默认不认证(注意此选项是在auth_url不为空的情况下生效)",
"enable_download_auth": false,
"默认是否下载": "默认下载",
"default_download": true,
"本机是否只读": "默认可读可写",
"read_only": false,
"是否开启断点续传": "默认开启",
"enable_tus": true,
"同步单一文件超时时间(单位秒)": "默认为0,程序自动计算,在特殊情况下,自已设定",
"sync_timeout": 0
}
`
)
type Server struct {
ldb *leveldb.DB
logDB *leveldb.DB
util *goutil.Common
statMap *goutil.CommonMap
sumMap *goutil.CommonMap
rtMap *goutil.CommonMap
queueToPeers chan FileInfo
queueFromPeers chan FileInfo
queueFileLog chan *FileLog
queueUpload chan WrapReqResp
lockMap *goutil.CommonMap
sceneMap *goutil.CommonMap
searchMap *goutil.CommonMap
curDate string
host string
}
type FileInfo struct {
Name string `json:"name"`
ReName string `json:"rename"`
Path string `json:"path"`
Md5 string `json:"md5"`
Size int64 `json:"size"`
Peers []string `json:"peers"`
Scene string `json:"scene"`
TimeStamp int64 `json:"timeStamp"`
OffSet int64 `json:"offset"`
retry int
op string
}
type FileLog struct {
FileInfo *FileInfo
FileName string
}
type WrapReqResp struct {
w *http.ResponseWriter
r *http.Request
done chan bool
}
type JsonResult struct {
Message string `json:"message"`
Status string `json:"status"`
Data interface{} `json:"data"`
}
type FileResult struct {
Url string `json:"url"`
Md5 string `json:"md5"`
Path string `json:"path"`
Domain string `json:"domain"`
Scene string `json:"scene"`
Size int64 `json:"size"`
ModTime int64 `json:"mtime"`
//Just for Compatibility
Scenes string `json:"scenes"`
Retmsg string `json:"retmsg"`
Retcode int `json:"retcode"`
Src string `json:"src"`
}
type Mail struct {
User string `json:"user"`
Password string `json:"password"`
Host string `json:"host"`
}
type StatDateFileInfo struct {
Date string `json:"date"`
TotalSize int64 `json:"totalSize"`
FileCount int64 `json:"fileCount"`
}
type GloablConfig struct {
Addr string `json:"addr"`
Peers []string `json:"peers"`
EnableHttps bool `json:"enable_https"`
Group string `json:"group"`
RenameFile bool `json:"rename_file"`
ShowDir bool `json:"show_dir"`
Extensions []string `json:"extensions"`
RefreshInterval int `json:"refresh_interval"`
EnableWebUpload bool `json:"enable_web_upload"`
DownloadDomain string `json:"download_domain"`
EnableCustomPath bool `json:"enable_custom_path"`
Scenes []string `json:"scenes"`
AlarmReceivers []string `json:"alarm_receivers"`
DefaultScene string `json:"default_scene"`
Mail Mail `json:"mail"`
AlarmUrl string `json:"alarm_url"`
DownloadUseToken bool `json:"download_use_token"`
DownloadTokenExpire int `json:"download_token_expire"`
QueueSize int `json:"queue_size"`
AutoRepair bool `json:"auto_repair"`
Host string `json:"host"`
FileSumArithmetic string `json:"file_sum_arithmetic"`
PeerId string `json:"peer_id"`
SupportGroupManage bool `json:"support_group_manage"`
AdminIps []string `json:"admin_ips"`
EnableMergeSmallFile bool `json:"enable_merge_small_file"`
EnableMigrate bool `json:"enable_migrate"`
EnableDistinctFile bool `json:"enable_distinct_file"`
ReadOnly bool `json:"read_only"`
EnableCrossOrigin bool `json:"enable_cross_origin"`
EnableGoogleAuth bool `json:"enable_google_auth"`
AuthUrl string `json:"auth_url"`
EnableDownloadAuth bool `json:"enable_download_auth"`
DefaultDownload bool `json:"default_download"`
EnableTus bool `json:"enable_tus"`
SyncTimeout int64 `json:"sync_timeout"`
EnableFsnotify bool `json:"enable_fsnotify"`
EnableDiskCache bool `json:"enable_disk_cache"`
ConnectTimeout bool `json:"connect_timeout"`
ReadTimeout int `json:"read_timeout"`
WriteTimeout int `json:"write_timeout"`
IdleTimeout int `json:"idle_timeout"`
ReadHeaderTimeout int `json:"read_header_timeout"`
SyncWorker int `json:"sync_worker"`
UploadWorker int `json:"upload_worker"`
UploadQueueSize int `json:"upload_queue_size"`
RetryCount int `json:"retry_count"`
SyncDelay int64 `json:"sync_delay"`
WatchChanSize int `json:"watch_chan_size"`
}
type FileInfoResult struct {
Name string `json:"name"`
Md5 string `json:"md5"`
Path string `json:"path"`
Size int64 `json:"size"`
ModTime int64 `json:"mtime"`
IsDir bool `json:"is_dir"`
}
func NewServer() *Server {
var (
//server *Server
err error
)
if server != nil {
return server
}
server = &Server{
util: &goutil.Common{},
statMap: goutil.NewCommonMap(0),
lockMap: goutil.NewCommonMap(0),
rtMap: goutil.NewCommonMap(0),
sceneMap: goutil.NewCommonMap(0),
searchMap: goutil.NewCommonMap(0),
queueToPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFromPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFileLog: make(chan *FileLog, CONST_QUEUE_SIZE),
queueUpload: make(chan WrapReqResp, 100),
sumMap: goutil.NewCommonMap(365 * 3),
}
defaultTransport := &http.Transport{
DisableKeepAlives: true,
Dial: httplib.TimeoutDialer(time.Second*15, time.Second*300),
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
}
settins := httplib.BeegoHTTPSettings{
UserAgent: "Go-FastDFS",
ConnectTimeout: 15 * time.Second,
ReadWriteTimeout: 15 * time.Second,
Gzip: true,
DumpBody: true,
Transport: defaultTransport,
}
httplib.SetDefaultSetting(settins)
server.statMap.Put(CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.curDate = server.util.GetToDay()
opts := &opt.Options{
CompactionTableSize: 1024 * 1024 * 20,
WriteBuffer: 1024 * 1024 * 20,
}
server.ldb, err = leveldb.OpenFile(CONST_LEVELDB_FILE_NAME, opts)
if err != nil {
fmt.Println(fmt.Sprintf("open db file %s fail,maybe has opening", CONST_LEVELDB_FILE_NAME))
log.Error(err)
panic(err)
}
server.logDB, err = leveldb.OpenFile(CONST_LOG_LEVELDB_FILE_NAME, opts)
if err != nil {
fmt.Println(fmt.Sprintf("open db file %s fail,maybe has opening", CONST_LOG_LEVELDB_FILE_NAME))
log.Error(err)
panic(err)
}
return server
}
func Config() *GloablConfig {
return (*GloablConfig)(atomic.LoadPointer(&ptr))
}
func ParseConfig(filePath string) {
var (
data []byte
)
if filePath == "" {
data = []byte(strings.TrimSpace(cfgJson))
} else {
file, err := os.Open(filePath)
if err != nil {
panic(fmt.Sprintln("open file path:", filePath, "error:", err))
}
defer file.Close()
FileName = filePath
data, err = ioutil.ReadAll(file)
if err != nil {
panic(fmt.Sprintln("file path:", filePath, " read all error:", err))
}
}
var c GloablConfig
if err := json.Unmarshal(data, &c); err != nil {
panic(fmt.Sprintln("file path:", filePath, "json unmarshal error:", err))
}
log.Info(c)
atomic.StorePointer(&ptr, unsafe.Pointer(&c))
log.Info("config parse success")
}
func (this *Server) BackUpMetaDataByDate(date string) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("BackUpMetaDataByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
msg string
name string
fileInfo FileInfo
logFileName string
fileLog *os.File
fileMeta *os.File
metaFileName string
fi os.FileInfo
)
logFileName = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
this.lockMap.LockKey(logFileName)
defer this.lockMap.UnLockKey(logFileName)
metaFileName = DATA_DIR + "/" + date + "/" + "meta.data"
os.MkdirAll(DATA_DIR+"/"+date, 0775)
if this.util.IsExist(logFileName) {
os.Remove(logFileName)
}
if this.util.IsExist(metaFileName) {
os.Remove(metaFileName)
}
fileLog, err = os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
if err != nil {
log.Error(err)
return
}
defer fileLog.Close()
fileMeta, err = os.OpenFile(metaFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
if err != nil {
log.Error(err)
return
}
defer fileMeta.Close()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, CONST_FILE_Md5_FILE_NAME)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
defer iter.Release()
for iter.Next() {
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
name = fileInfo.Name
if fileInfo.ReName != "" {
name = fileInfo.ReName
}
msg = fmt.Sprintf("%s\t%s\n", fileInfo.Md5, string(iter.Value()))
if _, err = fileMeta.WriteString(msg); err != nil {
log.Error(err)
}
msg = fmt.Sprintf("%s\t%s\n", this.util.MD5(fileInfo.Path+"/"+name), string(iter.Value()))
if _, err = fileMeta.WriteString(msg); err != nil {
log.Error(err)
}
msg = fmt.Sprintf("%s|%d|%d|%s\n", fileInfo.Md5, fileInfo.Size, fileInfo.TimeStamp, fileInfo.Path+"/"+name)
if _, err = fileLog.WriteString(msg); err != nil {
log.Error(err)
}
}
if fi, err = fileLog.Stat(); err != nil {
log.Error(err)
} else if fi.Size() == 0 {
fileLog.Close()
os.Remove(logFileName)
}
if fi, err = fileMeta.Stat(); err != nil {
log.Error(err)
} else if fi.Size() == 0 {
fileMeta.Close()
os.Remove(metaFileName)
}
}
func (this *Server) RepairFileInfoFromFile() {
var (
pathPrefix string
err error
fi os.FileInfo
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("RepairFileInfoFromFile")
log.Error(re)
log.Error(string(buffer))
}
}()
if this.lockMap.IsLock("RepairFileInfoFromFile") {
log.Warn("Lock RepairFileInfoFromFile")
return
}
this.lockMap.LockKey("RepairFileInfoFromFile")
defer this.lockMap.UnLockKey("RepairFileInfoFromFile")
handlefunc := func(file_path string, f os.FileInfo, err error) error {
var (
files []os.FileInfo
fi os.FileInfo
fileInfo FileInfo
sum string
pathMd5 string
)
if f.IsDir() {
files, err = ioutil.ReadDir(file_path)
if err != nil {
return err
}
for _, fi = range files {
if fi.IsDir() || fi.Size() == 0 {
continue
}
file_path = strings.Replace(file_path, "\\", "/", -1)
if DOCKER_DIR != "" {
file_path = strings.Replace(file_path, DOCKER_DIR, "", 1)
}
if pathPrefix != "" {
file_path = strings.Replace(file_path, pathPrefix, STORE_DIR_NAME, 1)
}
if strings.HasPrefix(file_path, STORE_DIR_NAME+"/"+LARGE_DIR_NAME) {
log.Info(fmt.Sprintf("ignore small file file %s", file_path+"/"+fi.Name()))
continue
}
pathMd5 = this.util.MD5(file_path + "/" + fi.Name())
//if finfo, _ := this.GetFileInfoFromLevelDB(pathMd5); finfo != nil && finfo.Md5 != "" {
// log.Info(fmt.Sprintf("exist ignore file %s", file_path+"/"+fi.Name()))
// continue
//}
//sum, err = this.util.GetFileSumByName(file_path+"/"+fi.Name(), Config().FileSumArithmetic)
sum = pathMd5
if err != nil {
log.Error(err)
continue
}
fileInfo = FileInfo{
Size: fi.Size(),
Name: fi.Name(),
Path: file_path,
Md5: sum,
TimeStamp: fi.ModTime().Unix(),
Peers: []string{this.host},
OffSet: -2,
}
//log.Info(fileInfo)
log.Info(file_path, "/", fi.Name())
this.AppendToQueue(&fileInfo)
//this.postFileToPeer(&fileInfo)
this.SaveFileInfoToLevelDB(fileInfo.Md5, &fileInfo, this.ldb)
//this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
return nil
}
pathname := STORE_DIR
pathPrefix, err = os.Readlink(pathname)
if err == nil {
//link
pathname = pathPrefix
if strings.HasSuffix(pathPrefix, "/") {
//bugfix fullpath
pathPrefix = pathPrefix[0 : len(pathPrefix)-1]
}
}
fi, err = os.Stat(pathname)
if err != nil {
log.Error(err)
}
if fi.IsDir() {
filepath.Walk(pathname, handlefunc)
}
log.Info("RepairFileInfoFromFile is finish.")
}
func (this *Server) WatchFilesChange() {
var (
w *watcher.Watcher
//fileInfo FileInfo
curDir string
err error
qchan chan *FileInfo
isLink bool
)
qchan = make(chan *FileInfo, Config().WatchChanSize)
w = watcher.New()
w.FilterOps(watcher.Create)
//w.FilterOps(watcher.Create, watcher.Remove)
curDir, err = filepath.Abs(filepath.Dir(STORE_DIR_NAME))
if err != nil {
log.Error(err)
}
go func() {
for {
select {
case event := <-w.Event:
if event.IsDir() {
continue
}
fpath := strings.Replace(event.Path, curDir+string(os.PathSeparator), "", 1)
if isLink {
fpath = strings.Replace(event.Path, curDir, STORE_DIR_NAME, 1)
}
fpath = strings.Replace(fpath, string(os.PathSeparator), "/", -1)
sum := this.util.MD5(fpath)
fileInfo := FileInfo{
Size: event.Size(),
Name: event.Name(),
Path: strings.TrimSuffix(fpath, "/"+event.Name()), // files/default/20190927/xxx
Md5: sum,
TimeStamp: event.ModTime().Unix(),
Peers: []string{this.host},
OffSet: -2,
op: event.Op.String(),
}
log.Info(fmt.Sprintf("WatchFilesChange op:%s path:%s", event.Op.String(), fpath))
qchan <- &fileInfo
//this.AppendToQueue(&fileInfo)
case err := <-w.Error:
log.Error(err)
case <-w.Closed:
return
}
}
}()
go func() {
for {
c := <-qchan
if time.Now().Unix()-c.TimeStamp < Config().SyncDelay {
qchan <- c
time.Sleep(time.Second * 1)
continue
} else {
//if c.op == watcher.Remove.String() {
// req := httplib.Post(fmt.Sprintf("%s%s?md5=%s", this.host, this.getRequestURI("delete"), c.Md5))
// req.Param("md5", c.Md5)
// req.SetTimeout(time.Second*5, time.Second*10)
// log.Infof(req.String())
//}
if c.op == watcher.Create.String() {
log.Info(fmt.Sprintf("Syncfile Add to Queue path:%s", c.Path+"/"+c.Name))
this.AppendToQueue(c)
this.SaveFileInfoToLevelDB(c.Md5, c, this.ldb)
}
}
}
}()
if dir, err := os.Readlink(STORE_DIR_NAME); err == nil {
if strings.HasSuffix(dir, string(os.PathSeparator)) {
dir = strings.TrimSuffix(dir, string(os.PathSeparator))
}
curDir = dir
isLink = true
if err := w.AddRecursive(dir); err != nil {
log.Error(err)
}
w.Ignore(dir + "/_tmp/")
w.Ignore(dir + "/" + LARGE_DIR_NAME + "/")
}
if err := w.AddRecursive("./" + STORE_DIR_NAME); err != nil {
log.Error(err)
}
w.Ignore("./" + STORE_DIR_NAME + "/_tmp/")
w.Ignore("./" + STORE_DIR_NAME + "/" + LARGE_DIR_NAME + "/")
if err := w.Start(time.Millisecond * 100); err != nil {
log.Error(err)
}
}
func (this *Server) RepairStatByDate(date string) StatDateFileInfo {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("RepairStatByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
fileInfo FileInfo
fileCount int64
fileSize int64
stat StatDateFileInfo
)
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, CONST_FILE_Md5_FILE_NAME)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
defer iter.Release()
for iter.Next() {
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
fileCount = fileCount + 1
fileSize = fileSize + fileInfo.Size
}
this.statMap.Put(date+"_"+CONST_STAT_FILE_COUNT_KEY, fileCount)
this.statMap.Put(date+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileSize)
this.SaveStat()
stat.Date = date
stat.FileCount = fileCount
stat.TotalSize = fileSize
return stat
}
func (this *Server) GetFilePathByInfo(fileInfo *FileInfo, withDocker bool) string {
var (
fn string
)
fn = fileInfo.Name
if fileInfo.ReName != "" {
fn = fileInfo.ReName
}
if withDocker {
return DOCKER_DIR + fileInfo.Path + "/" + fn
}
return fileInfo.Path + "/" + fn
}
func (this *Server) CheckFileExistByInfo(md5s string, fileInfo *FileInfo) bool {
var (
err error
fullpath string
fi os.FileInfo
info *FileInfo
)
if fileInfo == nil {
return false
}
if fileInfo.OffSet >= 0 {
//small file
if info, err = this.GetFileInfoFromLevelDB(fileInfo.Md5); err == nil && info.Md5 == fileInfo.Md5 {
return true
} else {
return false
}
}
fullpath = this.GetFilePathByInfo(fileInfo, true)
if fi, err = os.Stat(fullpath); err != nil {
return false
}
if fi.Size() == fileInfo.Size {
return true
} else {
return false
}
}
func (this *Server) ParseSmallFile(filename string) (string, int64, int, error) {
var (
err error
offset int64
length int
)
err = errors.New("unvalid small file")
if len(filename) < 3 {
return filename, -1, -1, err
}
if strings.Contains(filename, "/") {
filename = filename[strings.LastIndex(filename, "/")+1:]
}
pos := strings.Split(filename, ",")
if len(pos) < 3 {
return filename, -1, -1, err
}
offset, err = strconv.ParseInt(pos[1], 10, 64)
if err != nil {
return filename, -1, -1, err
}
if length, err = strconv.Atoi(pos[2]); err != nil {
return filename, offset, -1, err
}
if length > CONST_SMALL_FILE_SIZE || offset < 0 {
err = errors.New("invalid filesize or offset")
return filename, -1, -1, err
}
return pos[0], offset, length, nil
}
func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
var (
err error
filename string
fpath string
fpathTmp string
fi os.FileInfo
sum string
data []byte
downloadUrl string
)
if Config().ReadOnly {
log.Warn("ReadOnly", fileInfo)
return
}
if Config().RetryCount > 0 && fileInfo.retry >= Config().RetryCount {
log.Error("DownloadFromPeer Error ", fileInfo)
return
} else {
fileInfo.retry = fileInfo.retry + 1
}
filename = fileInfo.Name
if fileInfo.ReName != "" {
filename = fileInfo.ReName
}
if fileInfo.OffSet != -2 && Config().EnableDistinctFile && this.CheckFileExistByInfo(fileInfo.Md5, fileInfo) {
// ignore migrate file
log.Info(fmt.Sprintf("DownloadFromPeer file Exist, path:%s", fileInfo.Path+"/"+fileInfo.Name))
return
}
if (!Config().EnableDistinctFile || fileInfo.OffSet == -2) && this.util.FileExists(this.GetFilePathByInfo(fileInfo, true)) {
// ignore migrate file
if fi, err = os.Stat(this.GetFilePathByInfo(fileInfo, true)); err == nil {
if fi.ModTime().Unix() > fileInfo.TimeStamp {
log.Info(fmt.Sprintf("ignore file sync path:%s", this.GetFilePathByInfo(fileInfo, false)))
fileInfo.TimeStamp = fi.ModTime().Unix()
this.postFileToPeer(fileInfo) // keep newer
return
}
os.Remove(this.GetFilePathByInfo(fileInfo, true))
}
}
if _, err = os.Stat(fileInfo.Path); err != nil {
os.MkdirAll(DOCKER_DIR+fileInfo.Path, 0775)
}
//fmt.Println("downloadFromPeer",fileInfo)
p := strings.Replace(fileInfo.Path, STORE_DIR_NAME+"/", "", 1)
//filename=this.util.UrlEncode(filename)
if Config().SupportGroupManage {
downloadUrl = peer + "/" + Config().Group + "/" + p + "/" + filename
} else {
downloadUrl = peer + "/" + p + "/" + filename
}
log.Info("DownloadFromPeer: ", downloadUrl)
fpath = DOCKER_DIR + fileInfo.Path + "/" + filename
fpathTmp = DOCKER_DIR + fileInfo.Path + "/" + fmt.Sprintf("%s_%s", "tmp_", filename)
timeout := fileInfo.Size/1024/1024/1 + 30
if Config().SyncTimeout > 0 {
timeout = Config().SyncTimeout
}
this.lockMap.LockKey(fpath)
defer this.lockMap.UnLockKey(fpath)
download_key := fmt.Sprintf("downloading_%d_%s", time.Now().Unix(), fpath)
this.ldb.Put([]byte(download_key), []byte(""), nil)
defer func() {
this.ldb.Delete([]byte(download_key), nil)
}()
if fileInfo.OffSet == -2 {
//migrate file
if fi, err = os.Stat(fpath); err == nil && fi.Size() == fileInfo.Size {
//prevent double download
this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb)
//log.Info(fmt.Sprintf("file '%s' has download", fpath))
return
}
req := httplib.Get(downloadUrl)
req.SetTimeout(time.Second*30, time.Second*time.Duration(timeout))
if err = req.ToFile(fpathTmp); err != nil {
this.AppendToDownloadQueue(fileInfo) //retry
os.Remove(fpathTmp)
log.Error(err, fpathTmp)
return
}
if fi, err = os.Stat(fpathTmp); err != nil {
os.Remove(fpathTmp)
return
}
if fi.Size() != fileInfo.Size {
log.Error("file size check error")
os.Remove(fpathTmp)
}
if os.Rename(fpathTmp, fpath) == nil {
//this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb)
}
return
}
req := httplib.Get(downloadUrl)
req.SetTimeout(time.Second*30, time.Second*time.Duration(timeout))
if fileInfo.OffSet >= 0 {
//small file download
data, err = req.Bytes()
if err != nil {
this.AppendToDownloadQueue(fileInfo) //retry
log.Error(err)
return
}
data2 := make([]byte, len(data)+1)
data2[0] = '1'
for i, v := range data {
data2[i+1] = v
}
data = data2
if int64(len(data)) != fileInfo.Size {
log.Warn("file size is error")
return
}
fpath = strings.Split(fpath, ",")[0]
err = this.util.WriteFileByOffSet(fpath, fileInfo.OffSet, data)
if err != nil {
log.Warn(err)
return
}
this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
return
}
if err = req.ToFile(fpathTmp); err != nil {
this.AppendToDownloadQueue(fileInfo) //retry
os.Remove(fpathTmp)
log.Error(err)
return
}
if fi, err = os.Stat(fpathTmp); err != nil {
os.Remove(fpathTmp)
return
}
_ = sum
//if Config().EnableDistinctFile {
// //DistinctFile
// if sum, err = this.util.GetFileSumByName(fpathTmp, Config().FileSumArithmetic); err != nil {
// log.Error(err)
// return
// }
//} else {
// //DistinctFile By path
// sum = this.util.MD5(this.GetFilePathByInfo(fileInfo, false))
//}
if fi.Size() != fileInfo.Size { // maybe has bug remove || sum != fileInfo.Md5
log.Error("file sum check error")
os.Remove(fpathTmp)
return
}
if os.Rename(fpathTmp, fpath) == nil {
this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
func (this *Server) CrossOrigin(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, X-Requested-By, If-Modified-Since, X-File-Name, X-File-Type, Cache-Control, Origin")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Expose-Headers", "Authorization")
//https://blog.csdn.net/yanzisu_congcong/article/details/80552155
}
func (this *Server) SetDownloadHeader(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", "attachment")
}
func (this *Server) CheckAuth(w http.ResponseWriter, r *http.Request) bool {
var (
err error
req *httplib.BeegoHTTPRequest
result string
jsonResult JsonResult
)
if err = r.ParseForm(); err != nil {
log.Error(err)
return false
}
req = httplib.Post(Config().AuthUrl)
req.SetTimeout(time.Second*10, time.Second*10)
req.Param("__path__", r.URL.Path)
req.Param("__query__", r.URL.RawQuery)
for k, _ := range r.Form {
req.Param(k, r.FormValue(k))
}
for k, v := range r.Header {
req.Header(k, v[0])
}
result, err = req.String()
result = strings.TrimSpace(result)
if strings.HasPrefix(result, "{") && strings.HasSuffix(result, "}") {
if err = json.Unmarshal([]byte(result), &jsonResult); err != nil {
log.Error(err)
return false
}
if jsonResult.Data != "ok" {
log.Warn(result)
return false
}
} else {
if result != "ok" {
log.Warn(result)
return false
}
}
return true
}
func (this *Server) NotPermit(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(401)
}
func (this *Server) GetFilePathFromRequest(w http.ResponseWriter, r *http.Request) (string, string) {
var (
err error
fullpath string
smallPath string
prefix string
)
fullpath = r.RequestURI[1:]
if strings.HasPrefix(r.RequestURI, "/"+Config().Group+"/") {
fullpath = r.RequestURI[len(Config().Group)+2 : len(r.RequestURI)]
}
fullpath = strings.Split(fullpath, "?")[0] // just path
fullpath = DOCKER_DIR + STORE_DIR_NAME + "/" + fullpath
prefix = "/" + LARGE_DIR_NAME + "/"
if Config().SupportGroupManage {
prefix = "/" + Config().Group + "/" + LARGE_DIR_NAME + "/"
}
if strings.HasPrefix(r.RequestURI, prefix) {
smallPath = fullpath //notice order
fullpath = strings.Split(fullpath, ",")[0]
}
if fullpath, err = url.PathUnescape(fullpath); err != nil {
log.Error(err)
}
return fullpath, smallPath
}
func (this *Server) CheckDownloadAuth(w http.ResponseWriter, r *http.Request) (bool, error) {
var (
err error
maxTimestamp int64
minTimestamp int64
ts int64
token string
timestamp string
fullpath string
smallPath string
pathMd5 string
fileInfo *FileInfo
scene string
secret interface{}
code string
ok bool
)
CheckToken := func(token string, md5sum string, timestamp string) bool {
if this.util.MD5(md5sum+timestamp) != token {
return false
}
return true
}
if Config().EnableDownloadAuth && Config().AuthUrl != "" && !this.IsPeer(r) && !this.CheckAuth(w, r) {
return false, errors.New("auth fail")
}
if Config().DownloadUseToken && !this.IsPeer(r) {
token = r.FormValue("token")
timestamp = r.FormValue("timestamp")
if token == "" || timestamp == "" {
return false, errors.New("unvalid request")
}
maxTimestamp = time.Now().Add(time.Second *
time.Duration(Config().DownloadTokenExpire)).Unix()
minTimestamp = time.Now().Add(-time.Second *
time.Duration(Config().DownloadTokenExpire)).Unix()
if ts, err = strconv.ParseInt(timestamp, 10, 64); err != nil {
return false, errors.New("unvalid timestamp")
}
if ts > maxTimestamp || ts < minTimestamp {
return false, errors.New("timestamp expire")
}
fullpath, smallPath = this.GetFilePathFromRequest(w, r)
if smallPath != "" {
pathMd5 = this.util.MD5(smallPath)
} else {
pathMd5 = this.util.MD5(fullpath)
}
if fileInfo, err = this.GetFileInfoFromLevelDB(pathMd5); err != nil {
// TODO
} else {
ok := CheckToken(token, fileInfo.Md5, timestamp)
if !ok {
return ok, errors.New("unvalid token")
}
return ok, nil
}
}
if Config().EnableGoogleAuth && !this.IsPeer(r) {
fullpath = r.RequestURI[len(Config().Group)+2 : len(r.RequestURI)]
fullpath = strings.Split(fullpath, "?")[0] // just path
scene = strings.Split(fullpath, "/")[0]
code = r.FormValue("code")
if secret, ok = this.sceneMap.GetValue(scene); ok {
if !this.VerifyGoogleCode(secret.(string), code, int64(Config().DownloadTokenExpire/30)) {
return false, errors.New("invalid google code")
}
}
}
return true, nil
}
func (this *Server) GetSmallFileByURI(w http.ResponseWriter, r *http.Request) ([]byte, bool, error) {
var (
err error
data []byte
offset int64
length int
fullpath string
info os.FileInfo
)
fullpath, _ = this.GetFilePathFromRequest(w, r)
if _, offset, length, err = this.ParseSmallFile(r.RequestURI); err != nil {
return nil, false, err
}
if info, err = os.Stat(fullpath); err != nil {
return nil, false, err
}
if info.Size() < offset+int64(length) {
return nil, true, errors.New("noFound")
} else {
data, err = this.util.ReadFileByOffSet(fullpath, offset, length)
if err != nil {
return nil, false, err
}
return data, false, err
}
}
func (this *Server) DownloadSmallFileByURI(w http.ResponseWriter, r *http.Request) (bool, error) {
var (
err error
data []byte
isDownload bool
imgWidth int
imgHeight int
width string
height string
notFound bool
)
r.ParseForm()
isDownload = true
if r.FormValue("download") == "" {
isDownload = Config().DefaultDownload
}
if r.FormValue("download") == "0" {
isDownload = false
}
width = r.FormValue("width")
height = r.FormValue("height")
if width != "" {
imgWidth, err = strconv.Atoi(width)
if err != nil {
log.Error(err)
}
}
if height != "" {
imgHeight, err = strconv.Atoi(height)
if err != nil {
log.Error(err)
}
}
data, notFound, err = this.GetSmallFileByURI(w, r)
_ = notFound
if data != nil && string(data[0]) == "1" {
if isDownload {
this.SetDownloadHeader(w, r)
}
if imgWidth != 0 || imgHeight != 0 {
this.ResizeImageByBytes(w, data[1:], uint(imgWidth), uint(imgHeight))
return true, nil
}
w.Write(data[1:])
return true, nil
}
return false, errors.New("not found")
}
func (this *Server) DownloadNormalFileByURI(w http.ResponseWriter, r *http.Request) (bool, error) {
var (
err error
isDownload bool
imgWidth int
imgHeight int
width string
height string
)
r.ParseForm()
isDownload = true
if r.FormValue("download") == "" {
isDownload = Config().DefaultDownload
}
if r.FormValue("download") == "0" {
isDownload = false
}
width = r.FormValue("width")
height = r.FormValue("height")
if width != "" {
imgWidth, err = strconv.Atoi(width)
if err != nil {
log.Error(err)
}
}
if height != "" {
imgHeight, err = strconv.Atoi(height)
if err != nil {
log.Error(err)
}
}
if isDownload {
this.SetDownloadHeader(w, r)
}
fullpath, _ := this.GetFilePathFromRequest(w, r)
if imgWidth != 0 || imgHeight != 0 {
this.ResizeImage(w, fullpath, uint(imgWidth), uint(imgHeight))
return true, nil
}
staticHandler.ServeHTTP(w, r)
return true, nil
}
func (this *Server) DownloadNotFound(w http.ResponseWriter, r *http.Request) {
var (
err error
fullpath string
smallPath string
isDownload bool
pathMd5 string
peer string
fileInfo *FileInfo
)
fullpath, smallPath = this.GetFilePathFromRequest(w, r)
isDownload = true
if r.FormValue("download") == "" {
isDownload = Config().DefaultDownload
}
if r.FormValue("download") == "0" {
isDownload = false
}
if smallPath != "" {
pathMd5 = this.util.MD5(smallPath)
} else {
pathMd5 = this.util.MD5(fullpath)
}
for _, peer = range Config().Peers {
if fileInfo, err = this.checkPeerFileExist(peer, pathMd5, fullpath); err != nil {
log.Error(err)
continue
}
if fileInfo.Md5 != "" {
go this.DownloadFromPeer(peer, fileInfo)
//http.Redirect(w, r, peer+r.RequestURI, 302)
if isDownload {
this.SetDownloadHeader(w, r)
}
this.DownloadFileToResponse(peer+r.RequestURI, w, r)
return
}
}
w.WriteHeader(404)
return
}
func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
var (
err error
ok bool
fullpath string
smallPath string
fi os.FileInfo
)
// redirect to upload
if r.RequestURI == "/" || r.RequestURI == "" ||
r.RequestURI == "/"+Config().Group ||
r.RequestURI == "/"+Config().Group+"/" {
this.Index(w, r)
return
}
if ok, err = this.CheckDownloadAuth(w, r); !ok {
log.Error(err)
this.NotPermit(w, r)
return
}
if Config().EnableCrossOrigin {
this.CrossOrigin(w, r)
}
fullpath, smallPath = this.GetFilePathFromRequest(w, r)
if smallPath == "" {
if fi, err = os.Stat(fullpath); err != nil {
this.DownloadNotFound(w, r)
return
}
if !Config().ShowDir && fi.IsDir() {
w.Write([]byte("list dir deny"))
return
}
//staticHandler.ServeHTTP(w, r)
this.DownloadNormalFileByURI(w, r)
return
}
if smallPath != "" {
if ok, err = this.DownloadSmallFileByURI(w, r); !ok {
this.DownloadNotFound(w, r)
return
}
return
}
}
func (this *Server) DownloadFileToResponse(url string, w http.ResponseWriter, r *http.Request) {
var (
err error
req *httplib.BeegoHTTPRequest
resp *http.Response
)
req = httplib.Get(url)
req.SetTimeout(time.Second*20, time.Second*600)
resp, err = req.DoRequest()
if err != nil {
log.Error(err)
return
}
defer resp.Body.Close()
_, err = io.Copy(w, resp.Body)
if err != nil {
log.Error(err)
}
}
func (this *Server) ResizeImageByBytes(w http.ResponseWriter, data []byte, width, height uint) {
var (
img image.Image
err error
imgType string
)
reader := bytes.NewReader(data)
img, imgType, err = image.Decode(reader)
if err != nil {
log.Error(err)
return
}
img = resize.Resize(width, height, img, resize.Lanczos3)
if imgType == "jpg" || imgType == "jpeg" {
jpeg.Encode(w, img, nil)
} else if imgType == "png" {
png.Encode(w, img)
} else {
w.Write(data)
}
}
func (this *Server) ResizeImage(w http.ResponseWriter, fullpath string, width, height uint) {
var (
img image.Image
err error
imgType string
file *os.File
)
file, err = os.Open(fullpath)
if err != nil {
log.Error(err)
return
}
img, imgType, err = image.Decode(file)
if err != nil {
log.Error(err)
return
}
file.Close()
img = resize.Resize(width, height, img, resize.Lanczos3)
if imgType == "jpg" || imgType == "jpeg" {
jpeg.Encode(w, img, nil)
} else if imgType == "png" {
png.Encode(w, img)
} else {
file.Seek(0, 0)
io.Copy(w, file)
}
}
func (this *Server) GetServerURI(r *http.Request) string {
return fmt.Sprintf("http://%s/", r.Host)
}
func (this *Server) CheckFileAndSendToPeer(date string, filename string, isForceUpload bool) {
var (
md5set mapset.Set
err error
md5s []interface{}
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CheckFileAndSendToPeer")
log.Error(re)
log.Error(string(buffer))
}
}()
if md5set, err = this.GetMd5sByDate(date, filename); err != nil {
log.Error(err)
return
}
md5s = md5set.ToSlice()
for _, md := range md5s {
if md == nil {
continue
}
if fileInfo, _ := this.GetFileInfoFromLevelDB(md.(string)); fileInfo != nil && fileInfo.Md5 != "" {
if isForceUpload {
fileInfo.Peers = []string{}
}
if len(fileInfo.Peers) > len(Config().Peers) {
continue
}
if !this.util.Contains(this.host, fileInfo.Peers) {
fileInfo.Peers = append(fileInfo.Peers, this.host) // peer is null
}
if filename == CONST_Md5_QUEUE_FILE_NAME {
this.AppendToDownloadQueue(fileInfo)
} else {
this.AppendToQueue(fileInfo)
}
}
}
}
func (this *Server) postFileToPeer(fileInfo *FileInfo) {
var (
err error
peer string
filename string
info *FileInfo
postURL string
result string
fi os.FileInfo
i int
data []byte
fpath string
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("postFileToPeer")
log.Error(re)
log.Error(string(buffer))
}
}()
//fmt.Println("postFile",fileInfo)
for i, peer = range Config().Peers {
_ = i
if fileInfo.Peers == nil {
fileInfo.Peers = []string{}
}
if this.util.Contains(peer, fileInfo.Peers) {
continue
}
filename = fileInfo.Name
if fileInfo.ReName != "" {
filename = fileInfo.ReName
if fileInfo.OffSet != -1 {
filename = strings.Split(fileInfo.ReName, ",")[0]
}
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + filename
if !this.util.FileExists(fpath) {
log.Warn(fmt.Sprintf("file '%s' not found", fpath))
continue
} else {
if fileInfo.Size == 0 {
if fi, err = os.Stat(fpath); err != nil {
log.Error(err)
} else {
fileInfo.Size = fi.Size()
}
}
}
if fileInfo.OffSet != -2 && Config().EnableDistinctFile {
//not migrate file should check or update file
// where not EnableDistinctFile should check
if info, err = this.checkPeerFileExist(peer, fileInfo.Md5, ""); info.Md5 != "" {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error(err)
}
continue
}
}
postURL = fmt.Sprintf("%s%s", peer, this.getRequestURI("syncfile_info"))
b := httplib.Post(postURL)
b.SetTimeout(time.Second*30, time.Second*30)
if data, err = json.Marshal(fileInfo); err != nil {
log.Error(err)
return
}
b.Param("fileInfo", string(data))
result, err = b.String()
if err != nil {
if fileInfo.retry <= Config().RetryCount {
fileInfo.retry = fileInfo.retry + 1
this.AppendToQueue(fileInfo)
}
log.Error(err, fmt.Sprintf(" path:%s", fileInfo.Path+"/"+fileInfo.Name))
}
if !strings.HasPrefix(result, "http://") || err != nil {
this.SaveFileMd5Log(fileInfo, CONST_Md5_ERROR_FILE_NAME)
}
if strings.HasPrefix(result, "http://") {
log.Info(result)
if !this.util.Contains(peer, fileInfo.Peers) {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error(err)
}
}
}
if err != nil {
log.Error(err)
}
}
}
func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
var (
info FileInfo
)
for len(this.queueFileLog)+len(this.queueFileLog)/10 > CONST_QUEUE_SIZE {
time.Sleep(time.Second * 1)
}
info = *fileInfo
this.queueFileLog <- &FileLog{FileInfo: &info, FileName: filename}
}
func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
var (
err error
outname string
logDate string
ok bool
fullpath string
md5Path string
logKey string
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("saveFileMd5Log")
log.Error(re)
log.Error(string(buffer))
}
}()
if fileInfo == nil || fileInfo.Md5 == "" || filename == "" {
log.Warn("saveFileMd5Log", fileInfo, filename)
return
}
logDate = this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
outname = fileInfo.Name
if fileInfo.ReName != "" {
outname = fileInfo.ReName
}
fullpath = fileInfo.Path + "/" + outname
logKey = fmt.Sprintf("%s_%s_%s", logDate, filename, fileInfo.Md5)
if filename == CONST_FILE_Md5_FILE_NAME {
//this.searchMap.Put(fileInfo.Md5, fileInfo.Name)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); !ok {
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
this.SaveStat()
}
if _, err = this.SaveFileInfoToLevelDB(logKey, fileInfo, this.logDB); err != nil {
log.Error(err)
}
if _, err := this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(fullpath), fileInfo, this.ldb); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
return
}
if filename == CONST_REMOME_Md5_FILE_NAME {
//this.searchMap.Remove(fileInfo.Md5)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); ok {
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, -1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, -fileInfo.Size)
this.SaveStat()
}
this.RemoveKeyFromLevelDB(logKey, this.logDB)
md5Path = this.util.MD5(fullpath)
if err := this.RemoveKeyFromLevelDB(fileInfo.Md5, this.ldb); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
if err = this.RemoveKeyFromLevelDB(md5Path, this.ldb); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
// remove files.md5 for stat info(repair from logDB)
logKey = fmt.Sprintf("%s_%s_%s", logDate, CONST_FILE_Md5_FILE_NAME, fileInfo.Md5)
this.RemoveKeyFromLevelDB(logKey, this.logDB)
return
}
this.SaveFileInfoToLevelDB(logKey, fileInfo, this.logDB)
}
func (this *Server) checkPeerFileExist(peer string, md5sum string, fpath string) (*FileInfo, error) {
var (
err error
fileInfo FileInfo
)
req := httplib.Post(fmt.Sprintf("%s%s?md5=%s", peer, this.getRequestURI("check_file_exist"), md5sum))
req.Param("path", fpath)
req.Param("md5", md5sum)
req.SetTimeout(time.Second*5, time.Second*10)
if err = req.ToJSON(&fileInfo); err != nil {
return &FileInfo{}, err
}
if fileInfo.Md5 == "" {
return &fileInfo, errors.New("not found")
}
return &fileInfo, nil
}
func (this *Server) CheckFileExist(w http.ResponseWriter, r *http.Request) {
var (
data []byte
err error
fileInfo *FileInfo
fpath string
fi os.FileInfo
)
r.ParseForm()
md5sum := ""
md5sum = r.FormValue("md5")
fpath = r.FormValue("path")
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); fileInfo != nil {
if fileInfo.OffSet != -1 {
if data, err = json.Marshal(fileInfo); err != nil {
log.Error(err)
}
w.Write(data)
return
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.Name
if fileInfo.ReName != "" {
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.ReName
}
if this.util.IsExist(fpath) {
if data, err = json.Marshal(fileInfo); err == nil {
w.Write(data)
return
} else {
log.Error(err)
}
} else {
if fileInfo.OffSet == -1 {
this.RemoveKeyFromLevelDB(md5sum, this.ldb) // when file delete,delete from leveldb
}
}
} else {
if fpath != "" {
fi, err = os.Stat(fpath)
if err == nil {
sum := this.util.MD5(fpath)
//if Config().EnableDistinctFile {
// sum, err = this.util.GetFileSumByName(fpath, Config().FileSumArithmetic)
// if err != nil {
// log.Error(err)
// }
//}
fileInfo = &FileInfo{
Path: path.Dir(fpath),
Name: path.Base(fpath),
Size: fi.Size(),
Md5: sum,
Peers: []string{Config().Host},
OffSet: -1, //very important
TimeStamp: fi.ModTime().Unix(),
}
data, err = json.Marshal(fileInfo)
w.Write(data)
return
}
}
}
data, _ = json.Marshal(FileInfo{})
w.Write(data)
return
}
func (this *Server) CheckFilesExist(w http.ResponseWriter, r *http.Request) {
var (
data []byte
err error
fileInfo *FileInfo
fileInfos []*FileInfo
fpath string
result JsonResult
)
r.ParseForm()
md5sum := ""
md5sum = r.FormValue("md5s")
md5s := strings.Split(md5sum, ",")
for _, m := range md5s {
if fileInfo, err = this.GetFileInfoFromLevelDB(m); fileInfo != nil {
if fileInfo.OffSet != -1 {
if data, err = json.Marshal(fileInfo); err != nil {
log.Error(err)
}
//w.Write(data)
//return
fileInfos = append(fileInfos, fileInfo)
continue
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.Name
if fileInfo.ReName != "" {
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.ReName
}
if this.util.IsExist(fpath) {
if data, err = json.Marshal(fileInfo); err == nil {
fileInfos = append(fileInfos, fileInfo)
//w.Write(data)
//return
continue
} else {
log.Error(err)
}
} else {
if fileInfo.OffSet == -1 {
this.RemoveKeyFromLevelDB(md5sum, this.ldb) // when file delete,delete from leveldb
}
}
}
}
result.Data = fileInfos
data, _ = json.Marshal(result)
w.Write(data)
return
}
func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
)
r.ParseForm()
result.Status = "fail"
if !this.IsPeer(r) {
result.Message = "client must be in cluster"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
date := ""
force := ""
inner := ""
isForceUpload := false
force = r.FormValue("force")
date = r.FormValue("date")
inner = r.FormValue("inner")
if force == "1" {
isForceUpload = true
}
if inner != "1" {
for _, peer := range Config().Peers {
req := httplib.Post(peer + this.getRequestURI("sync"))
req.Param("force", force)
req.Param("inner", "1")
req.Param("date", date)
if _, err := req.String(); err != nil {
log.Error(err)
}
}
}
if date == "" {
result.Message = "require paramete date &force , ?date=20181230"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
date = strings.Replace(date, ".", "", -1)
if isForceUpload {
go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, isForceUpload)
} else {
go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, isForceUpload)
}
result.Status = "ok"
result.Message = "job is running"
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) IsExistFromLevelDB(key string, db *leveldb.DB) (bool, error) {
return db.Has([]byte(key), nil)
}
func (this *Server) GetFileInfoFromLevelDB(key string) (*FileInfo, error) {
var (
err error
data []byte
fileInfo FileInfo
)
if data, err = this.ldb.Get([]byte(key), nil); err != nil {
return nil, err
}
if err = json.Unmarshal(data, &fileInfo); err != nil {
return nil, err
}
return &fileInfo, nil
}
func (this *Server) SaveStat() {
SaveStatFunc := func() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("SaveStatFunc")
log.Error(re)
log.Error(string(buffer))
}
}()
stat := this.statMap.Get()
if v, ok := stat[CONST_STAT_FILE_COUNT_KEY]; ok {
switch v.(type) {
case int64, int32, int, float64, float32:
if v.(int64) >= 0 {
if data, err := json.Marshal(stat); err != nil {
log.Error(err)
} else {
this.util.WriteBinFile(CONST_STAT_FILE_NAME, data)
}
}
}
}
}
SaveStatFunc()
}
func (this *Server) RemoveKeyFromLevelDB(key string, db *leveldb.DB) error {
var (
err error
)
err = db.Delete([]byte(key), nil)
return err
}
func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo, db *leveldb.DB) (*FileInfo, error) {
var (
err error
data []byte
)
if fileInfo == nil || db == nil {
return nil, errors.New("fileInfo is null or db is null")
}
if data, err = json.Marshal(fileInfo); err != nil {
return fileInfo, err
}
if err = db.Put([]byte(key), data, nil); err != nil {
return fileInfo, err
}
if db == this.ldb { //search slow ,write fast, double write logDB
logDate := this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
logKey := fmt.Sprintf("%s_%s_%s", logDate, CONST_FILE_Md5_FILE_NAME, fileInfo.Md5)
this.logDB.Put([]byte(logKey), data, nil)
}
return fileInfo, nil
}
func (this *Server) IsPeer(r *http.Request) bool {
var (
ip string
peer string
bflag bool
cidr *net.IPNet
err error
)
IsPublicIP := func(IP net.IP) bool {
if IP.IsLoopback() || IP.IsLinkLocalMulticast() || IP.IsLinkLocalUnicast() {
return false
}
if ip4 := IP.To4(); ip4 != nil {
switch true {
case ip4[0] == 10:
return false
case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31:
return false
case ip4[0] == 192 && ip4[1] == 168:
return false
default:
return true
}
}
return false
}
//return true
ip = this.util.GetClientIp(r)
if this.util.Contains("0.0.0.0", Config().AdminIps) {
if IsPublicIP(net.ParseIP(ip)) {
return false
}
return true
}
if this.util.Contains(ip, Config().AdminIps) {
return true
}
for _, v := range Config().AdminIps {
if strings.Contains(v, "/") {
if _, cidr, err = net.ParseCIDR(v); err != nil {
log.Error(err)
return false
}
if cidr.Contains(net.ParseIP(ip)) {
return true
}
}
}
realIp := os.Getenv("GO_FASTDFS_IP")
if realIp == "" {
realIp = this.util.GetPulicIP()
}
if ip == "127.0.0.1" || ip == realIp {
return true
}
ip = "http://" + ip
bflag = false
for _, peer = range Config().Peers {
if strings.HasPrefix(peer, ip) {
bflag = true
break
}
}
return bflag
}
func (this *Server) ReceiveMd5s(w http.ResponseWriter, r *http.Request) {
var (
err error
md5str string
fileInfo *FileInfo
md5s []string
)
if !this.IsPeer(r) {
log.Warn(fmt.Sprintf("ReceiveMd5s %s", this.util.GetClientIp(r)))
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
r.ParseForm()
md5str = r.FormValue("md5s")
md5s = strings.Split(md5str, ",")
AppendFunc := func(md5s []string) {
for _, m := range md5s {
if m != "" {
if fileInfo, err = this.GetFileInfoFromLevelDB(m); err != nil {
log.Error(err)
continue
}
this.AppendToQueue(fileInfo)
}
}
}
go AppendFunc(md5s)
}
func (this *Server) GetClusterNotPermitMessage(r *http.Request) string {
var (
message string
)
message = fmt.Sprintf(CONST_MESSAGE_CLUSTER_IP, this.util.GetClientIp(r))
return message
}
func (this *Server) GetMd5sForWeb(w http.ResponseWriter, r *http.Request) {
var (
date string
err error
result mapset.Set
lines []string
md5s []interface{}
)
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
date = r.FormValue("date")
if result, err = this.GetMd5sByDate(date, CONST_FILE_Md5_FILE_NAME); err != nil {
log.Error(err)
return
}
md5s = result.ToSlice()
for _, line := range md5s {
if line != nil && line != "" {
lines = append(lines, line.(string))
}
}
w.Write([]byte(strings.Join(lines, ",")))
}
func (this *Server) GetMd5File(w http.ResponseWriter, r *http.Request) {
var (
date string
fpath string
data []byte
err error
)
if !this.IsPeer(r) {
return
}
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
if !this.util.FileExists(fpath) {
w.WriteHeader(404)
return
}
if data, err = ioutil.ReadFile(fpath); err != nil {
w.WriteHeader(500)
return
}
w.Write(data)
}
func (this *Server) GetMd5sMapByDate(date string, filename string) (*goutil.CommonMap, error) {
var (
err error
result *goutil.CommonMap
fpath string
content string
lines []string
line string
cols []string
data []byte
)
result = goutil.NewCommonMap(0)
if filename == "" {
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
} else {
fpath = DATA_DIR + "/" + date + "/" + filename
}
if !this.util.FileExists(fpath) {
return result, errors.New(fmt.Sprintf("fpath %s not found", fpath))
}
if data, err = ioutil.ReadFile(fpath); err != nil {
return result, err
}
content = string(data)
lines = strings.Split(content, "\n")
for _, line = range lines {
cols = strings.Split(line, "|")
if len(cols) > 2 {
if _, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
continue
}
result.Add(cols[0])
}
}
return result, nil
}
func (this *Server) GetMd5sByDate(date string, filename string) (mapset.Set, error) {
var (
keyPrefix string
md5set mapset.Set
keys []string
)
md5set = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys = strings.Split(string(iter.Key()), "_")
if len(keys) >= 3 {
md5set.Add(keys[2])
}
}
iter.Release()
return md5set, nil
}
func (this *Server) SyncFileInfo(w http.ResponseWriter, r *http.Request) {
var (
err error
fileInfo FileInfo
fileInfoStr string
filename string
)
r.ParseForm()
fileInfoStr = r.FormValue("fileInfo")
if !this.IsPeer(r) {
log.Info("isn't peer fileInfo:", fileInfo)
return
}
if err = json.Unmarshal([]byte(fileInfoStr), &fileInfo); err != nil {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
log.Error(err)
return
}
if fileInfo.OffSet == -2 {
// optimize migrate
this.SaveFileInfoToLevelDB(fileInfo.Md5, &fileInfo, this.ldb)
} else {
this.SaveFileMd5Log(&fileInfo, CONST_Md5_QUEUE_FILE_NAME)
}
this.AppendToDownloadQueue(&fileInfo)
filename = fileInfo.Name
if fileInfo.ReName != "" {
filename = fileInfo.ReName
}
p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1)
downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+filename)
log.Info("SyncFileInfo: ", downloadUrl)
w.Write([]byte(downloadUrl))
}
func (this *Server) CheckScene(scene string) (bool, error) {
var (
scenes []string
)
if len(Config().Scenes) == 0 {
return true, nil
}
for _, s := range Config().Scenes {
scenes = append(scenes, strings.Split(s, ":")[0])
}
if !this.util.Contains(scene, scenes) {
return false, errors.New("not valid scene")
}
return true, nil
}
func (this *Server) GetFileInfo(w http.ResponseWriter, r *http.Request) {
var (
fpath string
md5sum string
fileInfo *FileInfo
err error
result JsonResult
)
md5sum = r.FormValue("md5")
fpath = r.FormValue("path")
result.Status = "fail"
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
md5sum = r.FormValue("md5")
if fpath != "" {
fpath = strings.Replace(fpath, "/"+Config().Group+"/", STORE_DIR_NAME+"/", 1)
md5sum = this.util.MD5(fpath)
}
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
result.Status = "ok"
result.Data = fileInfo
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
var (
err error
md5sum string
fileInfo *FileInfo
fpath string
delUrl string
result JsonResult
inner string
name string
)
_ = delUrl
_ = inner
r.ParseForm()
md5sum = r.FormValue("md5")
fpath = r.FormValue("path")
inner = r.FormValue("inner")
result.Status = "fail"
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
if Config().AuthUrl != "" && !this.CheckAuth(w, r) {
this.NotPermit(w, r)
return
}
if fpath != "" && md5sum == "" {
fpath = strings.Replace(fpath, "/"+Config().Group+"/", STORE_DIR_NAME+"/", 1)
md5sum = this.util.MD5(fpath)
}
if inner != "1" {
for _, peer := range Config().Peers {
delFile := func(peer string, md5sum string, fileInfo *FileInfo) {
delUrl = fmt.Sprintf("%s%s", peer, this.getRequestURI("delete"))
req := httplib.Post(delUrl)
req.Param("md5", md5sum)
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*10)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
go delFile(peer, md5sum, fileInfo)
}
}
if len(md5sum) < 32 {
result.Message = "md5 unvalid"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if fileInfo.OffSet >= 0 {
result.Message = "small file delete not support"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
name = fileInfo.Name
if fileInfo.ReName != "" {
name = fileInfo.ReName
}
fpath = fileInfo.Path + "/" + name
if fileInfo.Path != "" && this.util.FileExists(DOCKER_DIR+fpath) {
this.SaveFileMd5Log(fileInfo, CONST_REMOME_Md5_FILE_NAME)
if err = os.Remove(DOCKER_DIR + fpath); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
} else {
result.Message = "remove success"
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
result.Message = "fail remove"
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) getRequestURI(action string) string {
var (
uri string
)
if Config().SupportGroupManage {
uri = "/" + Config().Group + "/" + action
} else {
uri = "/" + action
}
return uri
}
func (this *Server) BuildFileResult(fileInfo *FileInfo, r *http.Request) FileResult {
var (
outname string
fileResult FileResult
p string
downloadUrl string
domain string
host string
protocol string
)
if Config().EnableHttps {
protocol = "https"
} else {
protocol = "http"
}
host = strings.Replace(Config().Host, "http://", "", -1)
if r != nil {
host = r.Host
}
if !strings.HasPrefix(Config().DownloadDomain, "http") {
if Config().DownloadDomain == "" {
Config().DownloadDomain = fmt.Sprintf("%s://%s", protocol, host)
} else {
Config().DownloadDomain = fmt.Sprintf("%s://%s", protocol, Config().DownloadDomain)
}
}
if Config().DownloadDomain != "" {
domain = Config().DownloadDomain
} else {
domain = fmt.Sprintf("%s://%s", protocol, host)
}
outname = fileInfo.Name
if fileInfo.ReName != "" {
outname = fileInfo.ReName
}
p = strings.Replace(fileInfo.Path, STORE_DIR_NAME+"/", "", 1)
if Config().SupportGroupManage {
p = Config().Group + "/" + p + "/" + outname
} else {
p = p + "/" + outname
}
downloadUrl = fmt.Sprintf("%s://%s/%s", protocol, host, p)
if Config().DownloadDomain != "" {
downloadUrl = fmt.Sprintf("%s/%s", Config().DownloadDomain, p)
}
fileResult.Url = downloadUrl
fileResult.Md5 = fileInfo.Md5
fileResult.Path = "/" + p
fileResult.Domain = domain
fileResult.Scene = fileInfo.Scene
fileResult.Size = fileInfo.Size
fileResult.ModTime = fileInfo.TimeStamp
// Just for Compatibility
fileResult.Src = fileResult.Path
fileResult.Scenes = fileInfo.Scene
return fileResult
}
func (this *Server) SaveUploadFile(file multipart.File, header *multipart.FileHeader, fileInfo *FileInfo, r *http.Request) (*FileInfo, error) {
var (
err error
outFile *os.File
folder string
fi os.FileInfo
)
defer file.Close()
_, fileInfo.Name = filepath.Split(header.Filename)
// bugfix for ie upload file contain fullpath
if len(Config().Extensions) > 0 && !this.util.Contains(path.Ext(fileInfo.Name), Config().Extensions) {
return fileInfo, errors.New("(error)file extension mismatch")
}
if Config().RenameFile {
fileInfo.ReName = this.util.MD5(this.util.GetUUID()) + path.Ext(fileInfo.Name)
}
folder = time.Now().Format("20060102/15/04")
if Config().PeerId != "" {
folder = fmt.Sprintf(folder+"/%s", Config().PeerId)
}
if fileInfo.Scene != "" {
folder = fmt.Sprintf(STORE_DIR+"/%s/%s", fileInfo.Scene, folder)
} else {
folder = fmt.Sprintf(STORE_DIR+"/%s", folder)
}
if fileInfo.Path != "" {
if strings.HasPrefix(fileInfo.Path, STORE_DIR) {
folder = fileInfo.Path
} else {
folder = STORE_DIR + "/" + fileInfo.Path
}
}
if !this.util.FileExists(folder) {
if err = os.MkdirAll(folder, 0775); err != nil {
log.Error(err)
}
}
outPath := fmt.Sprintf(folder+"/%s", fileInfo.Name)
if fileInfo.ReName != "" {
outPath = fmt.Sprintf(folder+"/%s", fileInfo.ReName)
}
if this.util.FileExists(outPath) && Config().EnableDistinctFile {
for i := 0; i < 10000; i++ {
outPath = fmt.Sprintf(folder+"/%d_%s", i, filepath.Base(header.Filename))
fileInfo.Name = fmt.Sprintf("%d_%s", i, header.Filename)
if !this.util.FileExists(outPath) {
break
}
}
}
log.Info(fmt.Sprintf("upload: %s", outPath))
if outFile, err = os.Create(outPath); err != nil {
return fileInfo, err
}
defer outFile.Close()
if err != nil {
log.Error(err)
return fileInfo, errors.New("(error)fail," + err.Error())
}
if _, err = io.Copy(outFile, file); err != nil {
log.Error(err)
return fileInfo, errors.New("(error)fail," + err.Error())
}
if fi, err = outFile.Stat(); err != nil {
log.Error(err)
return fileInfo, errors.New("(error)fail," + err.Error())
} else {
fileInfo.Size = fi.Size()
}
if fi.Size() != header.Size {
return fileInfo, errors.New("(error)file uncomplete")
}
v := "" // this.util.GetFileSum(outFile, Config().FileSumArithmetic)
if Config().EnableDistinctFile {
v = this.util.GetFileSum(outFile, Config().FileSumArithmetic)
} else {
v = this.util.MD5(this.GetFilePathByInfo(fileInfo, false))
}
fileInfo.Md5 = v
//fileInfo.Path = folder //strings.Replace( folder,DOCKER_DIR,"",1)
fileInfo.Path = strings.Replace(folder, DOCKER_DIR, "", 1)
fileInfo.Peers = append(fileInfo.Peers, this.host)
//fmt.Println("upload",fileInfo)
return fileInfo, nil
}
func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
var (
err error
fn string
folder string
fpTmp *os.File
fpBody *os.File
)
if r.Method == http.MethodGet {
this.upload(w, r)
return
}
folder = STORE_DIR + "/_tmp/" + time.Now().Format("20060102")
if !this.util.FileExists(folder) {
if err = os.MkdirAll(folder, 0777); err != nil {
log.Error(err)
}
}
fn = folder + "/" + this.util.GetUUID()
fpTmp, err = os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
if _, err = io.Copy(fpTmp, r.Body); err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
fpTmp.Close()
if fpBody, err = os.Open(fn); err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
r.Body = fpBody
defer func() {
err = fpBody.Close()
if err != nil {
log.Error(err)
}
err = os.Remove(fn)
if err != nil {
log.Error(err)
}
}()
done := make(chan bool, 1)
this.queueUpload <- WrapReqResp{&w, r, done}
<-done
}
func (this *Server) upload(w http.ResponseWriter, r *http.Request) {
var (
err error
ok bool
// pathname string
md5sum string
fileName string
fileInfo FileInfo
uploadFile multipart.File
uploadHeader *multipart.FileHeader
scene string
output string
fileResult FileResult
result JsonResult
data []byte
code string
secret interface{}
msg string
)
output = r.FormValue("output")
if Config().EnableCrossOrigin {
this.CrossOrigin(w, r)
if r.Method == http.MethodOptions {
return
}
}
result.Status = "fail"
if Config().AuthUrl != "" {
if !this.CheckAuth(w, r) {
msg = "auth fail"
log.Warn(msg, r.Form)
this.NotPermit(w, r)
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
if r.Method == http.MethodPost {
md5sum = r.FormValue("md5")
fileName = r.FormValue("filename")
output = r.FormValue("output")
if Config().ReadOnly {
msg = "(error) readonly"
result.Message = msg
log.Warn(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if Config().EnableCustomPath {
fileInfo.Path = r.FormValue("path")
fileInfo.Path = strings.Trim(fileInfo.Path, "/")
}
scene = r.FormValue("scene")
code = r.FormValue("code")
if scene == "" {
//Just for Compatibility
scene = r.FormValue("scenes")
}
if Config().EnableGoogleAuth && scene != "" {
if secret, ok = this.sceneMap.GetValue(scene); ok {
if !this.VerifyGoogleCode(secret.(string), code, int64(Config().DownloadTokenExpire/30)) {
this.NotPermit(w, r)
msg = "invalid request,error google code"
result.Message = msg
log.Error(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
}
fileInfo.Md5 = md5sum
fileInfo.ReName = fileName
fileInfo.OffSet = -1
if uploadFile, uploadHeader, err = r.FormFile("file"); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
fileInfo.Peers = []string{}
fileInfo.TimeStamp = time.Now().Unix()
if scene == "" {
scene = Config().DefaultScene
}
if output == "" {
output = "text"
}
if !this.util.Contains(output, []string{"json", "text", "json2"}) {
msg = "output just support json or text or json2"
result.Message = msg
log.Warn(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
fileInfo.Scene = scene
if _, err = this.CheckScene(scene); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
log.Error(err)
return
}
if err != nil {
log.Error(err)
http.Redirect(w, r, "/", http.StatusMovedPermanently)
return
}
if _, err = this.SaveUploadFile(uploadFile, uploadHeader, &fileInfo, r); err != nil {
result.Message = err.Error()
log.Error(err)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if Config().EnableDistinctFile {
if v, _ := this.GetFileInfoFromLevelDB(fileInfo.Md5); v != nil && v.Md5 != "" {
fileResult = this.BuildFileResult(v, r)
if Config().RenameFile {
os.Remove(DOCKER_DIR + fileInfo.Path + "/" + fileInfo.ReName)
} else {
os.Remove(DOCKER_DIR + fileInfo.Path + "/" + fileInfo.Name)
}
if output == "json" || output == "json2" {
if output == "json2" {
result.Data = fileResult
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write([]byte(this.util.JsonEncodePretty(fileResult)))
} else {
w.Write([]byte(fileResult.Url))
}
return
}
}
if fileInfo.Md5 == "" {
msg = " fileInfo.Md5 is null"
log.Warn(msg)
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if md5sum != "" && fileInfo.Md5 != md5sum {
msg = " fileInfo.Md5 and md5sum !="
log.Warn(msg)
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if !Config().EnableDistinctFile {
// bugfix filecount stat
fileInfo.Md5 = this.util.MD5(this.GetFilePathByInfo(&fileInfo, false))
}
if Config().EnableMergeSmallFile && fileInfo.Size < CONST_SMALL_FILE_SIZE {
if err = this.SaveSmallFile(&fileInfo); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
this.saveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME) //maybe slow
go this.postFileToPeer(&fileInfo)
if fileInfo.Size <= 0 {
msg = "file size is zero"
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
log.Error(msg)
return
}
fileResult = this.BuildFileResult(&fileInfo, r)
if output == "json" || output == "json2" {
if output == "json2" {
result.Data = fileResult
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write([]byte(this.util.JsonEncodePretty(fileResult)))
} else {
w.Write([]byte(fileResult.Url))
}
return
} else {
md5sum = r.FormValue("md5")
output = r.FormValue("output")
if md5sum == "" {
msg = "(error) if you want to upload fast md5 is require" +
",and if you want to upload file,you must use post method "
result.Message = msg
log.Error(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if v, _ := this.GetFileInfoFromLevelDB(md5sum); v != nil && v.Md5 != "" {
fileResult = this.BuildFileResult(v, r)
result.Data = fileResult
result.Status = "ok"
}
if output == "json" || output == "json2" {
if data, err = json.Marshal(fileResult); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if output == "json2" {
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write(data)
} else {
w.Write([]byte(fileResult.Url))
}
}
}
func (this *Server) SaveSmallFile(fileInfo *FileInfo) error {
var (
err error
filename string
fpath string
srcFile *os.File
desFile *os.File
largeDir string
destPath string
reName string
fileExt string
)
filename = fileInfo.Name
fileExt = path.Ext(filename)
if fileInfo.ReName != "" {
filename = fileInfo.ReName
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + filename
largeDir = LARGE_DIR + "/" + Config().PeerId
if !this.util.FileExists(largeDir) {
os.MkdirAll(largeDir, 0775)
}
reName = fmt.Sprintf("%d", this.util.RandInt(100, 300))
destPath = largeDir + "/" + reName
this.lockMap.LockKey(destPath)
defer this.lockMap.UnLockKey(destPath)
if this.util.FileExists(fpath) {
srcFile, err = os.OpenFile(fpath, os.O_CREATE|os.O_RDONLY, 06666)
if err != nil {
return err
}
defer srcFile.Close()
desFile, err = os.OpenFile(destPath, os.O_CREATE|os.O_RDWR, 06666)
if err != nil {
return err
}
defer desFile.Close()
fileInfo.OffSet, err = desFile.Seek(0, 2)
if _, err = desFile.Write([]byte("1")); err != nil {
//first byte set 1
return err
}
fileInfo.OffSet, err = desFile.Seek(0, 2)
if err != nil {
return err
}
fileInfo.OffSet = fileInfo.OffSet - 1 //minus 1 byte
fileInfo.Size = fileInfo.Size + 1
fileInfo.ReName = fmt.Sprintf("%s,%d,%d,%s", reName, fileInfo.OffSet, fileInfo.Size, fileExt)
if _, err = io.Copy(desFile, srcFile); err != nil {
return err
}
srcFile.Close()
os.Remove(fpath)
fileInfo.Path = strings.Replace(largeDir, DOCKER_DIR, "", 1)
}
return nil
}
func (this *Server) SendToMail(to, subject, body, mailtype string) error {
host := Config().Mail.Host
user := Config().Mail.User
password := Config().Mail.Password
hp := strings.Split(host, ":")
auth := smtp.PlainAuth("", user, password, hp[0])
var contentType string
if mailtype == "html" {
contentType = "Content-Type: text/" + mailtype + "; charset=UTF-8"
} else {
contentType = "Content-Type: text/plain" + "; charset=UTF-8"
}
msg := []byte("To: " + to + "\r\nFrom: " + user + ">\r\nSubject: " + "\r\n" + contentType + "\r\n\r\n" + body)
sendTo := strings.Split(to, ";")
err := smtp.SendMail(host, auth, user, sendTo, msg)
return err
}
func (this *Server) BenchMark(w http.ResponseWriter, r *http.Request) {
t := time.Now()
batch := new(leveldb.Batch)
for i := 0; i < 100000000; i++ {
f := FileInfo{}
f.Peers = []string{"http://192.168.0.1", "http://192.168.2.5"}
f.Path = "20190201/19/02"
s := strconv.Itoa(i)
s = this.util.MD5(s)
f.Name = s
f.Md5 = s
if data, err := json.Marshal(&f); err == nil {
batch.Put([]byte(s), data)
}
if i%10000 == 0 {
if batch.Len() > 0 {
server.ldb.Write(batch, nil)
// batch = new(leveldb.Batch)
batch.Reset()
}
fmt.Println(i, time.Since(t).Seconds())
}
//fmt.Println(server.GetFileInfoFromLevelDB(s))
}
this.util.WriteFile("time.txt", time.Since(t).String())
fmt.Println(time.Since(t).String())
}
func (this *Server) RepairStatWeb(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
date string
inner string
)
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
date = r.FormValue("date")
inner = r.FormValue("inner")
if ok, err := regexp.MatchString("\\d{8}", date); err != nil || !ok {
result.Message = "invalid date"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if date == "" || len(date) != 8 {
date = this.util.GetToDay()
}
if inner != "1" {
for _, peer := range Config().Peers {
req := httplib.Post(peer + this.getRequestURI("repair_stat"))
req.Param("inner", "1")
req.Param("date", date)
if _, err := req.String(); err != nil {
log.Error(err)
}
}
}
result.Data = this.RepairStatByDate(date)
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) Stat(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
inner string
echart string
category []string
barCount []int64
barSize []int64
dataMap map[string]interface{}
)
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
r.ParseForm()
inner = r.FormValue("inner")
echart = r.FormValue("echart")
data := this.GetStat()
result.Status = "ok"
result.Data = data
if echart == "1" {
dataMap = make(map[string]interface{}, 3)
for _, v := range data {
barCount = append(barCount, v.FileCount)
barSize = append(barSize, v.TotalSize)
category = append(category, v.Date)
}
dataMap["category"] = category
dataMap["barCount"] = barCount
dataMap["barSize"] = barSize
result.Data = dataMap
}
if inner == "1" {
w.Write([]byte(this.util.JsonEncodePretty(data)))
} else {
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
}
func (this *Server) GetStat() []StatDateFileInfo {
var (
min int64
max int64
err error
i int64
rows []StatDateFileInfo
total StatDateFileInfo
)
min = 20190101
max = 20190101
for k := range this.statMap.Get() {
ks := strings.Split(k, "_")
if len(ks) == 2 {
if i, err = strconv.ParseInt(ks[0], 10, 64); err != nil {
continue
}
if i >= max {
max = i
}
if i < min {
min = i
}
}
}
for i := min; i <= max; i++ {
s := fmt.Sprintf("%d", i)
if v, ok := this.statMap.GetValue(s + "_" + CONST_STAT_FILE_TOTAL_SIZE_KEY); ok {
var info StatDateFileInfo
info.Date = s
switch v.(type) {
case int64:
info.TotalSize = v.(int64)
total.TotalSize = total.TotalSize + v.(int64)
}
if v, ok := this.statMap.GetValue(s + "_" + CONST_STAT_FILE_COUNT_KEY); ok {
switch v.(type) {
case int64:
info.FileCount = v.(int64)
total.FileCount = total.FileCount + v.(int64)
}
}
rows = append(rows, info)
}
}
total.Date = "all"
rows = append(rows, total)
return rows
}
func (this *Server) RegisterExit() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
for s := range c {
switch s {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
this.ldb.Close()
log.Info("Exit", s)
os.Exit(1)
}
}
}()
}
func (this *Server) AppendToQueue(fileInfo *FileInfo) {
for (len(this.queueToPeers) + CONST_QUEUE_SIZE/10) > CONST_QUEUE_SIZE {
time.Sleep(time.Millisecond * 50)
}
this.queueToPeers <- *fileInfo
}
func (this *Server) AppendToDownloadQueue(fileInfo *FileInfo) {
for (len(this.queueFromPeers) + CONST_QUEUE_SIZE/10) > CONST_QUEUE_SIZE {
time.Sleep(time.Millisecond * 50)
}
this.queueFromPeers <- *fileInfo
}
func (this *Server) ConsumerDownLoad() {
ConsumerFunc := func() {
for {
fileInfo := <-this.queueFromPeers
if len(fileInfo.Peers) <= 0 {
log.Warn("Peer is null", fileInfo)
continue
}
for _, peer := range fileInfo.Peers {
if strings.Contains(peer, "127.0.0.1") {
log.Warn("sync error with 127.0.0.1", fileInfo)
continue
}
if peer != this.host {
this.DownloadFromPeer(peer, &fileInfo)
break
}
}
}
}
for i := 0; i < Config().SyncWorker; i++ {
go ConsumerFunc()
}
}
func (this *Server) RemoveDownloading() {
RemoveDownloadFunc := func() {
for {
iter := this.ldb.NewIterator(util.BytesPrefix([]byte("downloading_")), nil)
for iter.Next() {
key := iter.Key()
keys := strings.Split(string(key), "_")
if len(keys) == 3 {
if t, err := strconv.ParseInt(keys[1], 10, 64); err == nil && time.Now().Unix()-t > 60*10 {
os.Remove(DOCKER_DIR + keys[2])
}
}
}
iter.Release()
time.Sleep(time.Minute * 3)
}
}
go RemoveDownloadFunc()
}
func (this *Server) ConsumerLog() {
go func() {
var (
fileLog *FileLog
)
for {
fileLog = <-this.queueFileLog
this.saveFileMd5Log(fileLog.FileInfo, fileLog.FileName)
}
}()
}
func (this *Server) LoadSearchDict() {
go func() {
log.Info("Load search dict ....")
f, err := os.Open(CONST_SEARCH_FILE_NAME)
if err != nil {
log.Error(err)
return
}
defer f.Close()
r := bufio.NewReader(f)
for {
line, isprefix, err := r.ReadLine()
for isprefix && err == nil {
kvs := strings.Split(string(line), "\t")
if len(kvs) == 2 {
this.searchMap.Put(kvs[0], kvs[1])
}
}
}
log.Info("finish load search dict")
}()
}
func (this *Server) SaveSearchDict() {
var (
err error
fp *os.File
searchDict map[string]interface{}
k string
v interface{}
)
this.lockMap.LockKey(CONST_SEARCH_FILE_NAME)
defer this.lockMap.UnLockKey(CONST_SEARCH_FILE_NAME)
searchDict = this.searchMap.Get()
fp, err = os.OpenFile(CONST_SEARCH_FILE_NAME, os.O_RDWR, 0755)
if err != nil {
log.Error(err)
return
}
defer fp.Close()
for k, v = range searchDict {
fp.WriteString(fmt.Sprintf("%s\t%s", k, v.(string)))
}
}
func (this *Server) ConsumerPostToPeer() {
ConsumerFunc := func() {
for {
fileInfo := <-this.queueToPeers
this.postFileToPeer(&fileInfo)
}
}
for i := 0; i < Config().SyncWorker; i++ {
go ConsumerFunc()
}
}
func (this *Server) ConsumerUpload() {
ConsumerFunc := func() {
for {
wr := <-this.queueUpload
this.upload(*wr.w, wr.r)
this.rtMap.AddCountInt64(CONST_UPLOAD_COUNTER_KEY, wr.r.ContentLength)
if v, ok := this.rtMap.GetValue(CONST_UPLOAD_COUNTER_KEY); ok {
if v.(int64) > 1*1024*1024*1024 {
var _v int64
this.rtMap.Put(CONST_UPLOAD_COUNTER_KEY, _v)
debug.FreeOSMemory()
}
}
wr.done <- true
}
}
for i := 0; i < Config().UploadWorker; i++ {
go ConsumerFunc()
}
}
func (this *Server) AutoRepair(forceRepair bool) {
if this.lockMap.IsLock("AutoRepair") {
log.Warn("Lock AutoRepair")
return
}
this.lockMap.LockKey("AutoRepair")
defer this.lockMap.UnLockKey("AutoRepair")
AutoRepairFunc := func(forceRepair bool) {
var (
dateStats []StatDateFileInfo
err error
countKey string
md5s string
localSet mapset.Set
remoteSet mapset.Set
allSet mapset.Set
tmpSet mapset.Set
fileInfo *FileInfo
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("AutoRepair")
log.Error(re)
log.Error(string(buffer))
}
}()
Update := func(peer string, dateStat StatDateFileInfo) {
//从远端拉数据过来
req := httplib.Get(fmt.Sprintf("%s%s?date=%s&force=%s", peer, this.getRequestURI("sync"), dateStat.Date, "1"))
req.SetTimeout(time.Second*5, time.Second*5)
if _, err = req.String(); err != nil {
log.Error(err)
}
log.Info(fmt.Sprintf("syn file from %s date %s", peer, dateStat.Date))
}
for _, peer := range Config().Peers {
req := httplib.Post(fmt.Sprintf("%s%s", peer, this.getRequestURI("stat")))
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*15)
if err = req.ToJSON(&dateStats); err != nil {
log.Error(err)
continue
}
for _, dateStat := range dateStats {
if dateStat.Date == "all" {
continue
}
countKey = dateStat.Date + "_" + CONST_STAT_FILE_COUNT_KEY
if v, ok := this.statMap.GetValue(countKey); ok {
switch v.(type) {
case int64:
if v.(int64) != dateStat.FileCount || forceRepair {
//不相等,找差异
//TODO
req := httplib.Post(fmt.Sprintf("%s%s", peer, this.getRequestURI("get_md5s_by_date")))
req.SetTimeout(time.Second*15, time.Second*60)
req.Param("date", dateStat.Date)
if md5s, err = req.String(); err != nil {
continue
}
if localSet, err = this.GetMd5sByDate(dateStat.Date, CONST_FILE_Md5_FILE_NAME); err != nil {
log.Error(err)
continue
}
remoteSet = this.util.StrToMapSet(md5s, ",")
allSet = localSet.Union(remoteSet)
md5s = this.util.MapSetToStr(allSet.Difference(localSet), ",")
req = httplib.Post(fmt.Sprintf("%s%s", peer, this.getRequestURI("receive_md5s")))
req.SetTimeout(time.Second*15, time.Second*60)
req.Param("md5s", md5s)
req.String()
tmpSet = allSet.Difference(remoteSet)
for v := range tmpSet.Iter() {
if v != nil {
if fileInfo, err = this.GetFileInfoFromLevelDB(v.(string)); err != nil {
log.Error(err)
continue
}
this.AppendToQueue(fileInfo)
}
}
//Update(peer,dateStat)
}
}
} else {
Update(peer, dateStat)
}
}
}
}
AutoRepairFunc(forceRepair)
}
func (this *Server) CleanLogLevelDBByDate(date string, filename string) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CleanLogLevelDBByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
keys mapset.Set
)
keys = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys.Add(string(iter.Value()))
}
iter.Release()
for key := range keys.Iter() {
err = this.RemoveKeyFromLevelDB(key.(string), this.logDB)
if err != nil {
log.Error(err)
}
}
}
func (this *Server) CleanAndBackUp() {
Clean := func() {
var (
filenames []string
yesterday string
)
if this.curDate != this.util.GetToDay() {
filenames = []string{CONST_Md5_QUEUE_FILE_NAME, CONST_Md5_ERROR_FILE_NAME, CONST_REMOME_Md5_FILE_NAME}
yesterday = this.util.GetDayFromTimeStamp(time.Now().AddDate(0, 0, -1).Unix())
for _, filename := range filenames {
this.CleanLogLevelDBByDate(yesterday, filename)
}
this.BackUpMetaDataByDate(yesterday)
this.curDate = this.util.GetToDay()
}
}
go func() {
for {
time.Sleep(time.Hour * 6)
Clean()
}
}()
}
func (this *Server) LoadFileInfoByDate(date string, filename string) (mapset.Set, error) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("LoadFileInfoByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
fileInfos mapset.Set
)
fileInfos = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
var fileInfo FileInfo
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
fileInfos.Add(&fileInfo)
}
iter.Release()
return fileInfos, nil
}
func (this *Server) LoadQueueSendToPeer() {
if queue, err := this.LoadFileInfoByDate(this.util.GetToDay(), CONST_Md5_QUEUE_FILE_NAME); err != nil {
log.Error(err)
} else {
for fileInfo := range queue.Iter() {
//this.queueFromPeers <- *fileInfo.(*FileInfo)
this.AppendToDownloadQueue(fileInfo.(*FileInfo))
}
}
}
func (this *Server) CheckClusterStatus() {
check := func() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CheckClusterStatus")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
status JsonResult
err error
subject string
body string
req *httplib.BeegoHTTPRequest
data []byte
)
for _, peer := range Config().Peers {
req = httplib.Get(fmt.Sprintf("%s%s", peer, this.getRequestURI("status")))
req.SetTimeout(time.Second*5, time.Second*5)
err = req.ToJSON(&status)
if err != nil || status.Status != "ok" {
for _, to := range Config().AlarmReceivers {
subject = "fastdfs server error"
if err != nil {
body = fmt.Sprintf("%s\nserver:%s\nerror:\n%s", subject, peer, err.Error())
} else {
body = fmt.Sprintf("%s\nserver:%s\n", subject, peer)
}
if err = this.SendToMail(to, subject, body, "text"); err != nil {
log.Error(err)
}
}
if Config().AlarmUrl != "" {
req = httplib.Post(Config().AlarmUrl)
req.SetTimeout(time.Second*10, time.Second*10)
req.Param("message", body)
req.Param("subject", subject)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
log.Error(err)
} else {
var statusMap map[string]interface{}
if data, err = json.Marshal(status.Data); err != nil {
log.Error(err)
return
}
if err = json.Unmarshal(data, &statusMap); err != nil {
log.Error(err)
}
if v, ok := statusMap["Fs.PeerId"]; ok {
if v == Config().PeerId {
log.Error(fmt.Sprintf("PeerId is confict:%s", v))
}
}
if v, ok := statusMap["Fs.Local"]; ok {
if v == Config().Host {
log.Error(fmt.Sprintf("Host is confict:%s", v))
}
}
}
}
}
check()
go func() {
for {
time.Sleep(time.Minute * 10)
check()
}
}()
}
func (this *Server) RepairFileInfo(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
)
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
if !Config().EnableMigrate {
w.Write([]byte("please set enable_migrate=true"))
return
}
result.Status = "ok"
result.Message = "repair job start,don't try again,very danger "
go this.RepairFileInfoFromFile()
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) Reload(w http.ResponseWriter, r *http.Request) {
var (
err error
data []byte
cfg GloablConfig
action string
cfgjson string
result JsonResult
)
result.Status = "fail"
r.ParseForm()
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
cfgjson = r.FormValue("cfg")
action = r.FormValue("action")
_ = cfgjson
if action == "get" {
result.Data = Config()
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if action == "set" {
if cfgjson == "" {
result.Message = "(error)parameter cfg(json) require"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if err = json.Unmarshal([]byte(cfgjson), &cfg); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
result.Status = "ok"