- # sys.path.append("/data")
- from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.common.multiProcess import MultiHandler
- from queue import Queue
- from multiprocessing import Queue as PQueue
- from BaseDataMaintenance.model.ots.document_tmp import *
- from BaseDataMaintenance.model.ots.attachment import *
- from BaseDataMaintenance.model.ots.document_html import *
- from BaseDataMaintenance.model.ots.document_extract2 import *
- from BaseDataMaintenance.model.ots.project import *
- from BaseDataMaintenance.model.ots.project2_tmp import *
- from BaseDataMaintenance.model.ots.document import *
- from BaseDataMaintenance.model.ots.project_process import *
- import base64
- from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
- from uuid import uuid4
- from BaseDataMaintenance.common.ossUtils import *
- from BaseDataMaintenance.dataSource.source import is_internal,getAuth
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.maintenance.dataflow_settings import *
- from threading import Thread
- import oss2
- from BaseDataMaintenance.maxcompute.documentDumplicate import *
- from BaseDataMaintenance.maxcompute.documentMerge import *
- from BaseDataMaintenance.common.otsUtils import *
- from BaseDataMaintenance.common.activateMQUtils import *
- from BaseDataMaintenance.dataMonitor.data_monitor import BaseDataMonitor
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
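- # getSet: collect the distinct values of a key across a list of dicts; purely numeric strings are normalized through float() so "1.0" and "1" compare equal.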
- def getSet(list_dict,key):
- _set = set()
- for item in list_dict:
- if key in item:
- if item[key]!='' and item[key] is not None:
- if re.search("^\d[\d\.]*$",item[key]) is not None:
- _set.add(str(float(item[key])))
- else:
- _set.add(str(item[key]))
- return _set
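- # getSimilarityOfString: character-bigram similarity of two strings, the size of the bigram intersection divided by the smaller bigram-set size (at least 1).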
- def getSimilarityOfString(str1,str2):
- _set1 = set()
- _set2 = set()
- if str1 is not None:
- for i in range(1,len(str1)):
- _set1.add(str1[i-1:i+1])
- if str2 is not None:
- for i in range(1,len(str2)):
- _set2.add(str2[i-1:i+1])
- _len = max(1,min(len(_set1),len(_set2)))
- return len(_set1&_set2)/_len
- def getDiffIndex(list_dict,key,confidence=100):
- _set = set()
- for _i in range(len(list_dict)):
- item = list_dict[_i]
- if item["confidence"]>=confidence:
- continue
- if key in item:
- if item[key]!='' and item[key] is not None:
- if re.search("^\d+(\.\d+)?$",item[key]) is not None:
- _set.add(str(float(item[key])))
- else:
- _set.add(str(item[key]))
- if len(_set)>1:
- return _i
- return len(list_dict)
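- # transformSWF: upload the page images rendered from a swf attachment to OSS with a public-read ACL and return their public urls; the local files are deleted after upload.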
- def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
- swf_urls = []
- try:
- list_files = os.listdir(swf_dir)
- list_files.sort(key=lambda x:x)
- headers = dict()
- headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
- for _file in list_files:
- swf_localpath = "%s/%s"%(swf_dir,_file)
- swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
- uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
- _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
- swf_urls.append(_url)
- os.remove(swf_localpath)
- except Exception as e:
- traceback.print_exc()
- return swf_urls
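- # Dataflow: wires up the OTS/OSS clients, the extraction endpoints and the in-process queues that feed the init / attachment / extract / dedup / merge steps.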
- class Dataflow():
- def __init__(self):
- self.ots_client = getConnect_ots()
- self.queue_init = Queue()
- self.queue_attachment = Queue()
- self.queue_attachment_ocr = Queue()
- self.queue_attachment_not_ocr = Queue()
- self.list_attachment_ocr = []
- self.list_attachment_not_ocr = []
- self.queue_extract = Queue()
- self.list_extract = []
- self.queue_dumplicate = PQueue()
- self.queue_dumplicate_processed = PQueue()
- self.dumplicate_set = set()
- self.queue_merge = Queue()
- self.queue_syncho = Queue()
- self.queue_remove = Queue()
- self.queue_remove_project = Queue()
- self.attachment_rec_interface = ""
- self.ots_client_merge = getConnect_ots()
- if is_internal:
- self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
- else:
- self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
- if is_internal:
- self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
- self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
- self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
- else:
- self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
- self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
- self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
- self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
- self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
- self.auth = getAuth()
- oss2.defaults.connection_pool_size = 100
- oss2.defaults.multiget_num_threads = 20
- log("bucket_url:%s"%(self.bucket_url))
- self.attachment_bucket_name = "attachment-hub"
- self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
- self.current_path = os.path.dirname(__file__)
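- # flow_init: pull documents (crtime >= 2022-04-20) from the document table, drop the heavy text columns, map the old status to the save/status flags and write each row into document_tmp and document_html.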
- def flow_init(self):
- def producer():
- bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- log("flow_init producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count += len(list_dict)
- def consumer():
- mt = MultiThreadHandler(self.queue_init,consumer_handle,None,30,1,ots_client=self.ots_client)
- mt.run()
- def consumer_handle(item,result_queue,ots_client):
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- if document_tmp_dochtmlcon in item:
- item.pop(document_tmp_dochtmlcon)
- if document_tmp_doctextcon in item:
- item.pop(document_tmp_doctextcon)
- if document_tmp_attachmenttextcon in item:
- item.pop(document_tmp_attachmenttextcon)
- _status = item.get(document_tmp_status)
- new_status = None
- if _status>=201 and _status<=300:
- item[document_tmp_save] = 1
- new_status = 81
- elif _status>=401 and _status<=450:
- item[document_tmp_save] = 0
- new_status = 81
- else:
- new_status = 1
- # new_status = 1
- item[document_tmp_status] = new_status
- dtmp = Document_tmp(item)
- dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
- document_tmp_docid:item.get(document_tmp_docid),
- document_tmp_dochtmlcon:_dochtmlcon})
- dtmp.update_row(ots_client)
- dhtml.update_row(ots_client)
- producer()
- consumer()
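- # getTitleFromHtml: return the attachment title, i.e. the text of the <a> tag whose data attribute equals the file md5.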
- def getTitleFromHtml(self,filemd5,_html):
- _soup = BeautifulSoup(_html,"lxml")
- _find = _soup.find("a",attrs={"data":filemd5})
- _title = ""
- if _find is not None:
- _title = _find.get_text()
- return _title
- def getSourceLinkFromHtml(self,filemd5,_html):
- _soup = BeautifulSoup(_html,"lxml")
- _find = _soup.find("a",attrs={"filelink":filemd5})
- filelink = ""
- if _find is None:
- _find = _soup.find("img",attrs={"filelink":filemd5})
- if _find is not None:
- filelink = _find.attrs.get("src","")
- else:
- filelink = _find.attrs.get("href","")
- return filelink
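- # request_attachment_interface: download one attachment from OSS, send it to the recognition interface and write the recognized html/text, swf page urls and status back to the attachment row; returns True when nothing more needs to be done (processed, too large or missing), False when download or recognition fails.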
- def request_attachment_interface(self,attach,_dochtmlcon):
- filemd5 = attach.getProperties().get(attachment_filemd5)
- _status = attach.getProperties().get(attachment_status)
- _filetype = attach.getProperties().get(attachment_filetype)
- _size = attach.getProperties().get(attachment_size)
- _path = attach.getProperties().get(attachment_path)
- _uuid = uuid4()
- objectPath = attach.getProperties().get(attachment_path)
- localpath = os.path.join(self.current_path,"download",_uuid.hex)
- docids = attach.getProperties().get(attachment_docids)
- try:
- if _size>ATTACHMENT_LARGESIZE:
- attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
- log("attachment :%s of path:%s to large"%(filemd5,_path))
- attach.update_row(self.ots_client)
- return True
- else:
- d_start_time = time.time()
- if downloadFile(self.bucket,objectPath,localpath):
- time_download = time.time()-d_start_time
- with open(localpath,"rb") as f:
- _data_base64 = base64.b64encode(f.read())
- # call the recognition interface and handle the result
- start_time = time.time()
- _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype,kwargs={"timeout":600})
- if _success:
- log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
- else:
- log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
- # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
- _html = ""
- return False
- swf_images = eval(swf_images)
- if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
- swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
- if len(swf_urls)==0:
- objectPath = attach.getProperties().get(attachment_path,"")
- localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
- swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
- if not os.path.exists(swf_dir):
- os.mkdir(swf_dir)
- for _i in range(len(swf_images)):
- _base = swf_images[_i]
- _base = base64.b64decode(_base)
- filename = "swf_page_%d.png"%(_i)
- filepath = os.path.join(swf_dir,filename)
- with open(filepath,"wb") as f:
- f.write(_base)
- swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
- if os.path.exists(swf_dir):
- os.rmdir(swf_dir)
- attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
- if re.search("<td",_html) is not None:
- attach.setValue(attachment_has_table,1,True)
- _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
- filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
- if _file_title!="":
- attach.setValue(attachment_file_title,_file_title,True)
- if filelink!="":
- attach.setValue(attachment_file_link,filelink,True)
- attach.setValue(attachment_attachmenthtml,_html,True)
- attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
- attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
- attach.setValue(attachment_recsize,len(_html),True)
- attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
- attach.update_row(self.ots_client) # re-enable this update when running online
- return True
- else:
- return False
- except oss2.exceptions.NotFound as e:
- return True
- except Exception as e:
- traceback.print_exc()
- finally:
- try:
- os.remove(localpath)
- except:
- pass
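- # rec_attachments_by_interface: recognize every attachment of a document, reusing already-processed html and calling request_attachment_interface for the rest; returns (ok, list_html, swf_urls).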
- def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
- list_html = []
- swf_urls = []
- for _attach in list_attach:
- # for testing: run everything
- if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append(_html)
- else:
- _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
- if not _succeed:
- return False,"",[]
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append(_html)
- if _attach.getProperties().get(attachment_filetype)=="swf":
- swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
- return True,list_html,swf_urls
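- # generate_dumplicate_query: build an OTS BoolQuery from a dict of required conditions and a dict of must-not conditions, routing each key to a match/nested/term/phrase/range query according to the set_* groups in the signature.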
- def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
- set_term=set(["doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
- set_range=set(["page_time","status"]),set_phrase=set(["doctitle","project_name"])):
- list_must_queries = []
- list_must_no_queries = []
- for k,v in _dict.items():
- if k in set_match:
- if isinstance(v,str):
- l_s = []
- for s_v in v.split(","):
- l_s.append(MatchQuery(k,s_v))
- list_must_queries.append(BoolQuery(should_queries=l_s))
- elif k in set_nested:
- _v = v
- if k=="bidding_budget" or k=="win_bid_price":
- _v = float(_v)
- list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
- elif k in set_term:
- list_must_queries.append(TermQuery(k,v))
- elif k in set_phrase:
- list_must_queries.append(MatchPhraseQuery(k,v))
- elif k in set_range:
- if len(v)==1:
- list_must_queries.append(RangeQuery(k,v[0]))
- elif len(v)==2:
- list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
- for k,v in _dict_must_not.items():
- if k in set_match:
- if isinstance(v,str):
- l_s = []
- for s_v in v.split(","):
- l_s.append(MatchQuery(k,s_v))
- list_must_no_queries.append(BoolQuery(should_queries=l_s))
- elif k in set_nested:
- _v = v
- if k=="bidding_budget" or k=="win_bid_price":
- _v = float(_v)
- list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
- elif k in set_term:
- list_must_no_queries.append(TermQuery(k,v))
- elif k in set_range:
- if len(v)==1:
- list_must_no_queries.append(RangeQuery(k,v[0]))
- elif len(v)==2:
- list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
- return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
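- # f_decode_sub_docs_json: decode sub_docs_json and pick the first non-empty win_tenderer / bidding_budget / win_bid_price (largest amounts first), while counting the populated extracted fields as extract_count.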
- def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
- columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
- extract_count = 0
- if project_code is not None and project_code!="":
- extract_count += 1
- if project_name is not None and project_name!="":
- extract_count += 1
- if tenderee is not None and tenderee!="":
- extract_count += 1
- if agency is not None and agency!="":
- extract_count += 1
- if sub_docs_json is not None:
- try:
- sub_docs = json.loads(sub_docs_json)
- except Exception as e:
- sub_docs = []
- sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
- sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
- # log("==%s"%(str(sub_docs)))
- for _sub_doc in sub_docs:
- for _key_sub_docs in _sub_doc.keys():
- extract_count += 1
- if _key_sub_docs in columns:
- if columns[_key_sub_docs]=="" and str(_sub_doc[_key_sub_docs]) not in ["","0"]:
- if _key_sub_docs in ["bidding_budget","win_bid_price"]:
- if float(_sub_doc[_key_sub_docs])>0:
- columns[_key_sub_docs] = str(float(_sub_doc[_key_sub_docs]))
- else:
- columns[_key_sub_docs] = str(_sub_doc[_key_sub_docs])
- return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
- def post_extract(self,_dict):
- win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
- _dict["win_tenderer"] = win_tenderer
- _dict["bidding_budget"] = bidding_budget
- _dict["win_bid_price"] = win_bid_price
- if "extract_count" not in _dict:
- _dict["extract_count"] = extract_count
- def get_dump_columns(self,_dict):
- docchannel = _dict.get(document_tmp_docchannel,0)
- project_code = _dict.get(document_tmp_project_code,"")
- project_name = _dict.get(document_tmp_project_name,"")
- tenderee = _dict.get(document_tmp_tenderee,"")
- agency = _dict.get(document_tmp_agency,"")
- doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
- win_tenderer = _dict.get("win_tenderer","")
- bidding_budget = _dict.get("bidding_budget","")
- if bidding_budget==0:
- bidding_budget = ""
- win_bid_price = _dict.get("win_bid_price","")
- if win_bid_price==0:
- win_bid_price = ""
- page_time = _dict.get(document_tmp_page_time,"")
- fingerprint = _dict.get(document_tmp_fingerprint,"")
- product = _dict.get(document_tmp_product,"")
- return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
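- # f_set_docid_limitNum_contain: validate a candidate duplicate group - single-value keys may carry at most one distinct value, multi-value keys need more than one, project_code must not be merely similar (0.7<sim<1) to any member's value, and for contain_keys the longer value must contain the shorter one; returns the group when it passes, otherwise [].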
- def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
- flag = True
- for _key in singleNum_keys:
- if len(getSet(_split,_key))>1:
- flag = False
- break
- for _key in multiNum_keys:
- if len(getSet(_split,_key))<=1:
- flag = False
- break
- project_code = item.get("project_code","")
- for _key in notlike_keys:
- if not flag:
- break
- for _d in _split:
- _key_v = _d.get(_key,"")
- _sim = getSimilarityOfString(project_code,_key_v)
- if _sim>0.7 and _sim<1:
- flag = False
- break
- # check containment of the contain_keys values across the announcements in the group
- if flag:
- if len(contain_keys)>0:
- for _key in contain_keys:
- MAX_CONTAIN_COLUMN = None
- for _d in _split:
- contain_column = _d.get(_key)
- if contain_column is not None and contain_column !="":
- if MAX_CONTAIN_COLUMN is None:
- MAX_CONTAIN_COLUMN = contain_column
- else:
- if len(MAX_CONTAIN_COLUMN)<len(contain_column):
- if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
- flag = False
- break
- MAX_CONTAIN_COLUMN = contain_column
- else:
- if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
- flag = False
- break
- if flag:
- return _split
- return []
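- # search_data_by_query: run a dedup query against OTS (first page only, limit 50), post-process each hit, then filter the group through f_set_docid_limitNum_contain.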
- def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count,document_tmp_doctitle]):
- list_data = []
- if isinstance(_query,list):
- bool_query = BoolQuery(should_queries=_query)
- else:
- bool_query = _query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.post_extract(_dict)
- _dict["confidence"] = confidence
- list_data.append(_dict)
- # _count = len(list_dict)
- # while next_token:
- # rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- # SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- # ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- # list_dict = getRow_ots(rows)
- # for _dict in list_dict:
- # self.post_extract(_dict)
- # _dict["confidence"] = confidence
- # list_data.append(_dict)
- list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
- return list_dict
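- # add_data_by_query: search with a dedup query and append hits whose docid has not been seen yet to base_list, recording them in set_docid.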
- def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
- list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
- for _dict in list_dict:
- self.post_extract(_dict)
- _docid = _dict.get(document_tmp_docid)
- if _docid not in set_docid:
- base_list.append(_dict)
- set_docid.add(_docid)
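- # translate_dumplicate_rules: turn one document into a list of dedup rules; each rule carries a confidence score, an OTS query over a page_time window of +/-2 days (or the same day for some channels) and the key sets used to validate the matched group.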
- def translate_dumplicate_rules(self,status_from,item):
- docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
- if page_time=='':
- page_time = getCurrent_date("%Y-%m-%d")
- base_dict = {
- "status":[status_from[0]],
- "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
- }
- must_not_dict = {"save":0}
- list_rules = []
- singleNum_keys = ["tenderee","win_tenderer"]
- if fingerprint!="":
- _dict = {}
- confidence = 100
- _dict[document_tmp_fingerprint] = fingerprint
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "item":item,
- "query":_query,
- "singleNum_keys":[],
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if docchannel in (52,118):
- if bidding_budget!="" and tenderee!="" and project_code!="":
- confidence = 90
- _dict = {document_tmp_docchannel:docchannel,
- "bidding_budget":item.get("bidding_budget"),
- document_tmp_tenderee:item.get(document_tmp_tenderee,""),
- document_tmp_project_code:item.get(document_tmp_project_code,"")
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
- confidence = 80
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "tenderee":tenderee,
- bidding_budget:"bidding_budget"
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
- confidence = 90
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "agency":agency,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and tenderee!="" and bidding_budget!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if doctitle_refine!="" and agency!="" and bidding_budget!="":
- confidence = 71
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "agency":agency,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "agency":agency,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- ##-- 5. Tender notice - same project code - same [project name, title] - same [tenderee, agency] - same budget (!=0) - same source = 1
- if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
- confidence = 71
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- #-- 4. Tender notice - same [title, project code, project name] - same [tenderee, agency] - same budget - sources > 1
- if project_name!="" and agency!="":
- tmp_bidding = 0
- if bidding_budget!="":
- tmp_bidding = bidding_budget
- confidence = 51
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "agency":agency,
- "bidding_budget":tmp_bidding
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- #-- 4. Tender notice - same [title, project code, project name] - same [tenderee, agency] - same budget - sources > 1
- if project_code!="" and agency!="":
- tmp_bidding = 0
- if bidding_budget!="":
- tmp_bidding = bidding_budget
- confidence = 51
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "agency":agency,
- "bidding_budget":tmp_bidding
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if docchannel not in (101,119,120):
- #-- 7. Non-award notice - same project name - same publish date - same tenderee - same budget - same channel - sources > 1 - same project code
- if project_name!="" and tenderee!="" and project_code!="":
- tmp_bidding = 0
- if bidding_budget!="":
- tmp_bidding = bidding_budget
- confidence = 51
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "tenderee":tenderee,
- "project_code":project_code
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if docchannel in (101,119,120):
- #-- 3. Award notice - same project code - same [project name, title] - same winning bidder - same winning price (==0)
- if project_code!="" and project_name!="" and win_tenderer!="":
- tmp_win = 0
- if win_bid_price!="":
- tmp_win = win_bid_price
- confidence = 61
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":tmp_win
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
- confidence = 72
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "bidding_budget":bidding_budget,
- "product":product
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- ##-- 2. Award notice - same project code - same [project name, title] - same winning bidder - same winning price (!=0) - same source = 1
- if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_name!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
- confidence=90
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
- confidence=95
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- "project_code":project_code
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if docchannel in (51,103,115,116):
- #9. Channels ['公告变更','拍卖出让','土地矿产','招标答疑'] (change notice, auction transfer, land & mineral, tender Q&A) - same [title, project code, project name] - same [tenderee, agency] - same budget - same day - different sources
- if doctitle_refine!="" and tenderee!="":
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- confidence=81
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- #-- 9. Channels ['公告变更','拍卖出让','土地矿产','招标答疑'] (change notice, auction transfer, land & mineral, tender Q&A) - same [title, project code, project name] - same [tenderee, agency] - same budget - same day - different sources
- if project_code!="" and tenderee!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_name!="" and tenderee!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if agency!="" and tenderee!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- "product":product
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if agency!="" and project_code!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "project_code":project_code,
- "bidding_budget":tmp_budget,
- "product":product
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if agency!="" and project_name!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "project_name":project_name,
- "bidding_budget":tmp_budget,
- "product":product
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
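- # Each rule appended below shares the same shape, roughly:
- #   {"confidence":81, "query":<BoolQuery built by generate_dumplicate_query from the must/must_not dicts>,
- #    "singleNum_keys":singleNum_keys, "contain_keys":[], "multiNum_keys":[document_tmp_web_source_no]}
- # The key lists appear to constrain how many candidates may share the same value for those fields
- # when add_data_by_query collects results; the exact semantics live in add_data_by_query.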
- # Pick any two of the five key fields (tenderee / agency / win_tenderer / bidding_budget / win_bid_price)
- if tenderee!="" and bidding_budget!="" and product!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if tenderee!="" and win_tenderer!="" and product!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "win_tenderer":win_tenderer,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if tenderee!="" and win_bid_price!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "win_bid_price":win_bid_price,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if tenderee!="" and agency!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "agency":agency,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_tenderer!="" and bidding_budget!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_bid_price!="" and bidding_budget!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if agency!="" and bidding_budget!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_tenderer!="" and win_bid_price!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_tenderer!="" and agency!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_tenderer":win_tenderer,
- "agency":agency,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "product":product,
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- return list_rules
- def dumplicate_fianl_check(self,base_list):
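- # Sort the candidate group by confidence (descending), then for every key field find the first
- # index at which the value diverges (getDiffIndex) and keep only the common prefix.
- # A prefix of length <=1 means only the document itself survived, so an empty list is returned.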
- the_group = base_list
- the_group.sort(key=lambda x:x["confidence"],reverse=True)
- if len(the_group)>10:
- keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
- else:
- keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
- #confidence
- list_key_index = []
- for _k in keys:
- if _k=="doctitle":
- list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
- else:
- list_key_index.append(getDiffIndex(the_group,_k))
- _index = min(list_key_index)
- if _index>1:
- return the_group[:_index]
- return []
- def get_best_docid(self,base_list):
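- # Choose the representative docid of a duplicate group. If any single web source contributed two
- # or more distinct fingerprints, larger docids are preferred (to_reverse). Special-purpose bond
- # documents are ranked by how many bondId values their detail_link carries. Otherwise three stable
- # sorts are applied, so extract_count dominates, then attachment_extract_status, then docid.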
- to_reverse = False
- dict_source_count = {}
- for _item in base_list:
- _web_source = _item.get(document_tmp_web_source_no)
- _fingerprint = _item.get(document_tmp_fingerprint)
- if _web_source is not None:
- if _web_source not in dict_source_count:
- dict_source_count[_web_source] = set()
- dict_source_count[_web_source].add(_fingerprint)
- if len(dict_source_count[_web_source])>=2:
- to_reverse=True
- # special-purpose bonds
- if len(base_list)>0 and base_list[0].get("is_special_bonds")==1:
- for _item in base_list:
- detail_link = _item.get("detail_link")
- detail_link = detail_link.strip() if detail_link else ""
- if "bondId=" in detail_link:
- bondId = detail_link.split("bondId=")[1]
- bondId = bondId.split(",") if bondId else []
- else:
- bondId = []
- _item['bondId_num'] = len(bondId)
- # print([i.get("bondId_num") for i in base_list])
- base_list.sort(key=lambda x:x["bondId_num"],reverse=True)
- return base_list[0]["docid"]
- if len(base_list)>0:
- base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
- base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
- base_list.sort(key=lambda x:x["extract_count"],reverse=True)
- return base_list[0]["docid"]
- def save_dumplicate(self,base_list,best_docid,status_from,status_to):
- #best_docid needs a further check while the others can be saved directly
- list_dict = []
- for item in base_list:
- docid = item["docid"]
- _dict = {"partitionkey":item["partitionkey"],
- "docid":item["docid"]}
- if docid==best_docid:
- if item.get("save",1)!=0:
- _dict["save"] = 1
- else:
- _dict["save"] = 0
- if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
- _dict["status"] = random.randint(status_to[0],status_to[1])
- list_dict.append(_dict)
- for _dict in list_dict:
- dtmp = Document_tmp(_dict)
- dtmp.update_row(self.ots_client)
- def flow_test(self,status_to=[1,10]):
- def producer():
- bool_query = BoolQuery(must_queries=[
- # ExistsQuery("docid"),
- # RangeQuery("crtime",range_to='2022-04-10'),
- # RangeQuery("status",61),
- NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
- ],
- must_not_queries=[
- # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
- TermQuery("attachment_extract_status",1),
- RangeQuery("status",1,11)
- ]
- )
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
- log("flow_init producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count = len(list_dict)
- while next_token and _count<1000000:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count += len(list_dict)
- print("%d/%d"%(_count,total_count))
- def comsumer():
- mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- # print(item)
- dtmp = Document_tmp(item)
- dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
- dtmp.update_row(ots_client)
- # dhtml = Document_html(item)
- # dhtml.update_row(ots_client)
- # dtmp.delete_row(ots_client)
- # dhtml.delete_row(ots_client)
- producer()
- comsumer()
- def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
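- # Producer/consumer pipeline: the producer pages document_tmp rows whose status falls in
- # status_from into queue_dumplicate; the consumer builds the dedup rules for each document,
- # collects candidates per rule, runs dumplicate_fianl_check, picks the best docid and writes
- # save / dup_docid / merge_uuid back to the row.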
- def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_web_source_name]):
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_dumplicate.put(_dict)
- _count = len(list_dict)
- while next_token and _count<flow_process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_dumplicate.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- self.post_extract(item)
- base_list = []
- set_docid = set()
- list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- # print(item,"len_rules",len(list_rules))
- for _rule in list_rules:
- _query = _rule["query"]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- final_list = self.dumplicate_fianl_check(base_list)
- best_docid = self.get_best_docid(final_list)
- # log(str(final_list))
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- "status":random.randint(*flow_dumplicate_status_to),
- document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- }
- dtmp = Document_tmp(_d)
- dup_docid = set()
- for _dict in final_list:
- dup_docid.add(_dict.get(document_tmp_docid))
- if item.get(document_tmp_docid) in dup_docid:
- dup_docid.remove(item.get(document_tmp_docid))
- if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
- dtmp.setValue(document_tmp_save,1,True)
- dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- else:
- dtmp.setValue(document_tmp_save,0,True)
- if best_docid in dup_docid:
- dup_docid.remove(best_docid)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- dmp_docid = "%d,%s"%(best_docid,dmp_docid)
- else:
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
- dtmp.update_row(self.ots_client)
- #only keep the current announcement
- # self.save_dumplicate(final_list,best_docid,status_from,status_to)
- #
- # print("=base=",item)
- # if len(final_list)>=1:
- # print("==================")
- # for _dict in final_list:
- # print(_dict)
- # print("========>>>>>>>>>>")
- producer()
- comsumer()
- def merge_document(self,item,status_to=None):
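- # Try to attach the document to an existing project2 record. Documents kept as save==1 are matched
- # on (project_code + tenderee) or (project_code + project_name); discarded ones fall back to
- # (project_code + bidding_budget), (tenderee + bidding_budget + project_name) or
- # (tenderee + win_bid_price + project_name). merge_uuid is only set when exactly one project matches.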
- self.post_extract(item)
- docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- }
- dtmp = Document_tmp(_d)
- if item.get(document_tmp_save,1)==1:
- list_should_q = []
- if project_code!="" and tenderee!="":
- _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
- TermQuery("tenderee",tenderee)])
- list_should_q.append(_q)
- if project_name!="" and project_code!="":
- _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
- TermQuery("project_name",project_name)])
- list_should_q.append(_q)
- if len(list_should_q)>0:
- list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
- if len(list_data)==1:
- dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
- print(item["docid"],list_data[0]["uuid"])
- else:
- list_should_q = []
- if bidding_budget!="" and project_code!="":
- _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
- TermQuery("bidding_budget",float(bidding_budget))])
- list_should_q.append(_q)
- if tenderee!="" and bidding_budget!="" and project_name!="":
- _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
- TermQuery("bidding_budget",float(bidding_budget)),
- TermQuery("project_name",project_name)])
- list_should_q.append(_q)
- if tenderee!="" and win_bid_price!="" and project_name!="":
- _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
- TermQuery("win_bid_price",float(win_bid_price)),
- TermQuery("project_name",project_name)])
- list_should_q.append(_q)
- if len(list_should_q)>0:
- list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
- if len(list_data)==1:
- dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
- print(item["docid"],list_data[0]["uuid"])
- return dtmp.getProperties().get("merge_uuid","")
- # dtmp.update_row(self.ots_client)
- def test_merge(self):
- import pandas as pd
- import queue
- def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
- list_test_item = []
- should_q = BoolQuery(should_queries=[
- TermQuery("docchannel",101),
- TermQuery("docchannel",119),
- TermQuery("docchannel",120)
- ])
- bool_query = BoolQuery(must_queries=[
- TermQuery("page_time","2022-04-22"),
- should_q,
- TermQuery("save",1)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- list_test_item.append(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- list_test_item.append(_dict)
- _count += len(list_dict)
- print("%d/%d"%(_count,total_count))
- return list_test_item
- from BaseDataMaintenance.model.ots.project import Project
- def comsumer_handle(item,result_queue,ots_client):
- item["merge_uuid"] = self.merge_document(item)
- if item["merge_uuid"]!="":
- _dict = {"uuid":item["merge_uuid"]}
- _p = Project(_dict)
- _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
- if _p.getProperties().get("zhao_biao_page_time","")!="":
- item["是否有招标"] = "是"
- list_test_item = producer()
- task_queue = queue.Queue()
- for item in list_test_item:
- task_queue.put(item)
- mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
- mt.run()
- keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
- df_data = {}
- for k in keys:
- df_data[k] = []
- for item in list_test_item:
- for k in keys:
- df_data[k].append(item.get(k,""))
- df = pd.DataFrame(df_data)
- df.to_excel("test_merge.xlsx",columns=keys)
- def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
- def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_merge producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_merge.put(_dict)
- _count = len(list_dict)
- while next_token and _count<process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_merge.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- self.merge_document(item,status_to)
- # producer()
- # comsumer()
- pass
- def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
- pass
- def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
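- # Delete document_tmp / document_html rows whose status is in status_from and whose crtime is
- # more than 10 days old.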
- def producer():
- current_date = getCurrent_date("%Y-%m-%d")
- tmp_date = timeAdd(current_date,-10)
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
- RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- log("flow_remove producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- dtmp = Document_tmp(item)
- dtmp.delete_row(self.ots_client)
- dhtml = Document_html(item)
- dhtml.delete_row(self.ots_client)
- producer()
- comsumer()
- def start_flow_dumplicate(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_remove,"cron",hour="20")
- schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
- schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
- schedule.start()
- def flow_remove_project_tmp(self,process_count=flow_process_count):
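- # Delete project2_tmp rows whose page_time is older than roughly six months (6*31 days).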
- def producer():
- current_date = getCurrent_date("%Y-%m-%d")
- tmp_date = timeAdd(current_date,-6*31)
- bool_query = BoolQuery(must_queries=[
- RangeQuery(project_page_time,range_to="%s"%(tmp_date))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2_tmp","project2_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- log("flow_remove project2_tmp producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove_project.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2_tmp","project2_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove_project.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_remove_project,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- ptmp = Project_tmp(item)
- ptmp.delete_row(self.ots_client)
- producer()
- comsumer()
- def start_flow_merge(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_merge,"cron",second="*/10")
- schedule.start()
- def download_attachment():
- ots_client = getConnect_ots()
- queue_attachment = Queue()
- auth = getAuth()
- oss2.defaults.connection_pool_size = 100
- oss2.defaults.multiget_num_threads = 20
- attachment_bucket_name = "attachment-hub"
- if is_internal:
- bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
- else:
- bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
- bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
- current_path = os.path.dirname(__file__)
- def producer():
- columns = [document_tmp_attachment_path]
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_attachment producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- queue_attachment.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- queue_attachment.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
- mt.run()
- def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
- list_attachment = []
- rows_to_get = []
- for _md5 in list_filemd5[:50]:
- if _md5 is None:
- continue
- primary_key = [(attachment_filemd5,_md5)]
- rows_to_get.append(primary_key)
- req = BatchGetRowRequest()
- req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
- try:
- result = ots_client.batch_get_row(req)
- attach_result = result.get_result_by_table(attachment_table_name)
- for item in attach_result:
- if item.is_ok:
- _dict = getRow_ots_primary(item.row)
- if _dict is not None:
- list_attachment.append(attachment(_dict))
- except Exception as e:
- log(str(list_filemd5))
- log("attachProcess comsumer error %s"%str(e))
- return list_attachment
- def comsumer_handle(item,result_queue):
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- if len(page_attachments)==0:
- pass
- else:
- list_fileMd5 = []
- for _atta in page_attachments:
- list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
- list_attach = getAttachments(list_fileMd5)
- for attach in list_attach:
- filemd5 = attach.getProperties().get(attachment_filemd5)
- _status = attach.getProperties().get(attachment_status)
- _filetype = attach.getProperties().get(attachment_filetype)
- _size = attach.getProperties().get(attachment_size)
- _path = attach.getProperties().get(attachment_path)
- _uuid = uuid4()
- objectPath = attach.getProperties().get(attachment_path)
- localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
- try:
- if _size>ATTACHMENT_LARGESIZE:
- pass
- else:
- downloadFile(bucket,objectPath,localpath)
- except Exception as e:
- traceback.print_exc()
- producer()
- comsumer()
- def test_attachment_interface():
- current_path = os.path.dirname(__file__)
- task_queue = Queue()
- def producer():
- _count = 0
- list_filename = os.listdir(os.path.join(current_path,"download"))
- for _filename in list_filename:
- _count += 1
- _type = _filename.split(".")[1]
- task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
- if _count>=500:
- break
- def comsumer():
- mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
- mt.run()
- def comsumer_handle(item,result_queue):
- _path = item.get("path")
- _type = item.get("file_type")
- _data_base64 = base64.b64encode(open(_path,"rb").read())
- #call the attachment-processing interface and record the result
- start_time = time.time()
- _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
- log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
- producer()
- comsumer()
- class Dataflow_attachment(Dataflow):
- def __init__(self):
- Dataflow.__init__(self)
- self.process_list_thread = []
- def flow_attachment_process(self):
- self.process_comsumer()
- def monitor_attachment_process(self):
- alive_count = 0
- for _t in self.process_list_thread:
- if _t.is_alive():
- alive_count += 1
- log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
- def process_comsumer(self):
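- # Start 60 worker threads on the first call, then act as a watchdog: any thread that dies is
- # replaced with a fresh one, checking every 5 seconds.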
- if len(self.process_list_thread)==0:
- thread_count = 60
- for i in range(thread_count):
- self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
- for t in self.process_list_thread:
- t.start()
- while 1:
- failed_count = 0
- for _i in range(len(self.process_list_thread)):
- t = self.process_list_thread[_i]
- if not t.is_alive():
- failed_count += 1
- self.process_list_thread[_i] = Thread(target=self.process_comsumer_handle)
- self.process_list_thread[_i].start()
- if failed_count>0:
- log("attachment failed %d"%(failed_count))
- time.sleep(5)
- def process_comsumer_handle(self):
- while 1:
- _flag = False
- log("attachment handle:%s"%str(threading.get_ident()))
- try:
- item = self.queue_attachment_ocr.get(True,timeout=0.2)
- log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
- self.attachment_recognize(item,None)
- log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
- except Exception as e:
- _flag = True
- pass
- try:
- item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
- log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
- self.attachment_recognize(item,None)
- log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
- except Exception as e:
- _flag = True and _flag
- pass
- if _flag:
- time.sleep(2)
- def attachment_recognize(self,_dict,result_queue):
- item = _dict.get("item")
- list_attach = _dict.get("list_attach")
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
- _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
- _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
- log(str(swf_urls))
- if not _succeed:
- item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
- else:
- dhtml.updateSWFImages(swf_urls)
- dhtml.updateAttachment(list_html)
- dhtml.update_row(self.ots_client)
- item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
- item[document_tmp_attachment_extract_status] = 1
- log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
- dtmp = Document_tmp(item)
- dtmp.update_row(self.ots_client)
- def flow_attachment(self):
- self.flow_attachment_producer()
- self.flow_attachment_producer_comsumer()
- def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
- list_attachment = []
- rows_to_get = []
- for _md5 in list_filemd5[:50]:
- if _md5 is None:
- continue
- primary_key = [(attachment_filemd5,_md5)]
- rows_to_get.append(primary_key)
- req = BatchGetRowRequest()
- req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
- try:
- result = self.ots_client.batch_get_row(req)
- attach_result = result.get_result_by_table(attachment_table_name)
- for item in attach_result:
- if item.is_ok:
- _dict = getRow_ots_primary(item.row)
- if _dict is not None:
- list_attachment.append(attachment(_dict))
- except Exception as e:
- log(str(list_filemd5))
- log("attachProcess comsumer error %s"%str(e))
- return list_attachment
- def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
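- # Back-pressure: skip this round when both attachment queues hold more than 200 items or either
- # holds more than 1000. Documents already queued (tracked by docid) are not enqueued again.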
- qsize_ocr = self.queue_attachment_ocr.qsize()
- qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
- log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
- #only fetch more data when the queues are not already backed up
- if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
- return
- #deduplicate against docids already queued
- set_docid = set()
- set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
- if qsize_ocr>0:
- self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
- else:
- self.list_attachment_ocr = []
- if qsize_not_ocr>0:
- self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
- else:
- self.list_attachment_not_ocr = []
- try:
- bool_query = BoolQuery(must_queries=[
- RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
- # TermQuery(document_tmp_docid,234925191),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_attachment producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- _count = 0
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- continue
- self.queue_attachment.put(_dict,True)
- _count += 1
- while next_token and _count<flow_process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- continue
- self.queue_attachment.put(_dict,True)
- _count += 1
- log("add attachment count:%d"%(_count))
- except Exception as e:
- log("flow attachment producer error:%s"%(str(e)))
- traceback.print_exc()
- def flow_attachment_producer_comsumer(self):
- log("start flow_attachment comsumer")
- mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
- mt.run()
- def set_queue(self,_dict):
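- # Route to the OCR queue when any attachment is an image/swf/pdf/tif file, otherwise to the
- # plain-text queue.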
- list_attach = _dict.get("list_attach")
- to_ocr = False
- for attach in list_attach:
- if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
- to_ocr = True
- break
- if to_ocr:
- self.queue_attachment_ocr.put(_dict,True)
- # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
- else:
- self.queue_attachment_not_ocr.put(_dict,True)
- # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
- def comsumer_handle(self,item,result_queue):
- try:
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- if len(page_attachments)==0:
- item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
- dtmp = Document_tmp(item)
- dtmp.update_row(self.ots_client)
- else:
- list_fileMd5 = []
- for _atta in page_attachments:
- list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
- list_attach = self.getAttachments(list_fileMd5)
- #if some attachments are not uploaded yet, postpone processing for up to 2 hours after crtime
- if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
- item[document_tmp_status] = 1
- dtmp = Document_tmp(item)
- dtmp.update_row(self.ots_client)
- return
- self.set_queue({"item":item,"list_attach":list_attach})
- except Exception as e:
- traceback.print_exc()
- def start_flow_attachment(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
- schedule.add_job(self.flow_attachment,"cron",second="*/10")
- schedule.start()
- class Dataflow_extract(Dataflow):
- def __init__(self):
- Dataflow.__init__(self)
- def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
- q_size = self.queue_extract.qsize()
- if q_size>100:
- return
- set_docid = set(self.list_extract)
- if q_size>0:
- self.list_extract = self.list_extract[-q_size:]
- else:
- self.list_extract = []
- try:
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_extract producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- self.list_extract.insert(0,docid)
- continue
- else:
- self.queue_extract.put(_dict)
- self.list_extract.append(docid)
- _count = len(list_dict)
- while next_token and _count<flow_process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- self.list_extract.insert(0,docid)
- continue
- else:
- self.queue_extract.put(_dict)
- self.list_extract.append(docid)
- _count += len(list_dict)
- except Exception as e:
- log("flow extract producer error:%s"%(str(e)))
- traceback.print_exc()
- def flow_extract(self,):
- self.comsumer()
- def comsumer(self):
- mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
- mt.run()
- def comsumer_handle(self,item,result_queue):
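- # Load the document html, then call the three extraction endpoints (other_url, extract_url,
- # industy_url) in turn. Any response outside 200-210 sets all_done to a negative code, triggers a
- # DingTalk alert and re-queues the document with a failed status; otherwise the extract row is
- # written and the document moves to the succeeded status range.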
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
- item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
- _extract = Document_extract({})
- _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
- _extract.setValue(document_extract2_docid,item.get(document_docid))
- all_done = 1
- if all_done:
- data = item
- resp = requests.post(self.other_url,json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=210):
- _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
- else:
- all_done = -1
- data = {}
- for k,v in item.items():
- data[k] = v
- data["timeout"] = 240
- data["doc_id"] = data.get(document_tmp_docid)
- data["content"] = data.get(document_tmp_dochtmlcon,"")
- if document_tmp_dochtmlcon in data:
- data.pop(document_tmp_dochtmlcon)
- data["title"] = data.get(document_tmp_doctitle,"")
- data["web_source_no"] = item.get(document_tmp_web_source_no,"")
- data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
- if all_done:
- resp = requests.post(self.extract_url,json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=210):
- _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
- else:
- all_done = -2
- if all_done:
- resp = requests.post(self.industy_url,json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=210):
- _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
- else:
- all_done = -3
- _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
- document_docid:item.get(document_tmp_docid),
- }
- dtmp = Document_tmp(_dict)
- if all_done!=1:
- sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
- dtmp.update_row(self.ots_client)
- else:
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- # insert into the interface table; enable when going live
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- log("process docid:%d %s"%(data["doc_id"],str(all_done)))
- def start_flow_extract(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
- schedule.add_job(self.flow_extract,"cron",second="*/10")
- schedule.start()
- class Dataflow_dumplicate(Dataflow):
- class DeleteListener():
- def __init__(self,conn,_func,*args,**kwargs):
- self.conn = conn
- self._func = _func
- def on_error(self, headers,*args,**kwargs):
- log('received an error %s' % str(headers.body))
- def on_message(self, headers,*args,**kwargs):
- try:
- message_id = headers.headers["message-id"]
- body = headers.body
- log("get message %s"%(message_id))
- self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
- except Exception as e:
- traceback.print_exc()
- pass
- def __del__(self):
- self.conn.disconnect()
- def __init__(self,start_delete_listener=True):
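- # Besides the base Dataflow setup this registers the extract-count and package helpers (used via
- # .evaluate on extract_json) and, when start_delete_listener is True, starts two ActiveMQ consumers
- # on /queue/doc_delete_queue that feed delete_doc_handle.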
- Dataflow.__init__(self,)
- self.c_f_get_extractCount = f_get_extractCount()
- self.c_f_get_package = f_get_package()
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- self.fix_doc_docid = None
- self.bdm = BaseDataMonitor()
- self.check_rule = 1
- if start_delete_listener:
- self.delete_comsumer_counts = 2
- self.doc_delete_queue = "/queue/doc_delete_queue"
- self.doc_delete_result = "/queue/doc_delete_result"
- self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
- for _ in range(self.delete_comsumer_counts):
- conn = getConnect_activateMQ_ali()
- listener = self.DeleteListener(conn,self.delete_doc_handle)
- createComsumer(listener,self.doc_delete_queue)
- def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart"]):
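- # Collect the extracted time_* fields, truncated to the date part (YYYY-MM-DD).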
- dict_time = {}
- for k in keys:
- _time = _extract.get(k)
- _time = _time[:10] if _time else ""
- dict_time[k] = _time
- return dict_time
- def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
- bool_query = BoolQuery(must_queries=[
- TermQuery("docid",docid)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- if total_count==0:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- if len(list_dict)>0:
- return self.post_extract(list_dict[0])
- def post_extract(self,_dict):
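- # Denormalize extract_json onto the row: winner/budget/price from sub_docs_json, the joined
- # product list, fingerprint, first project code, refined title (falling back to doctitle),
- # money sets, nlp_enterprise, extract_count, package, the time dict, plus the special-bond and
- # procurement-intention flags.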
- win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
- _dict["win_tenderer"] = win_tenderer
- _dict["bidding_budget"] = bidding_budget
- _dict["win_bid_price"] = win_bid_price
- extract_json = _dict.get(document_tmp_extract_json,"{}")
- _extract = json.loads(extract_json)
- _dict["product"] = ",".join(_extract.get("product",[]))
- _dict["fingerprint"] = _extract.get("fingerprint","")
- _dict["project_codes"] = _extract.get("code",[])
- if len(_dict["project_codes"])>0:
- _dict["project_code"] = _dict["project_codes"][0]
- else:
- _dict["project_code"] = ""
- _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
- if _dict["doctitle_refine"]=="":
- _dict["doctitle_refine"] = _dict.get("doctitle")
- _dict["moneys"] = set(_extract.get("moneys",[]))
- _dict["moneys_attachment"] = set(_extract.get("moneys_attachment",[]))
- _dict["nlp_enterprise"] = json.dumps({"indoctextcon":_extract.get("nlp_enterprise",[]),
- "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])},ensure_ascii=False)
- _dict["extract_count"] = _extract.get("extract_count",0)
- _dict["package"] = self.c_f_get_package.evaluate(extract_json)
- _dict["project_name"] = _extract.get("name","")
- _dict["dict_time"] = self.get_dict_time(_extract)
- _dict["punish"] = _extract.get("punish",{})
- _dict["approval"] = _extract.get("approval",[])
- # special-purpose bond fields
- issue_details = _extract.get("debt_dic",{}).get("issue_details",[])
- _dict["is_special_bonds"] = 1 if _dict.get(document_tmp_docchannel)==302 and _dict.get(document_tmp_web_source_name)=='专项债券信息网' and issue_details else 0
- # procurement-intention fields
- if _dict.get("docchannel")==114:
- _dict["demand_info"] = _extract.get("demand_info",{}).get("data",[])
- else:
- _dict["demand_info"] = []
- return _dict
- def dumplicate_fianl_check(self,base_list,b_log=False):
- the_group = base_list
- the_group.sort(key=lambda x:x["confidence"],reverse=True)
- _index = 0
- base_fingerprint = "None"
- if len(base_list)>0:
- base_fingerprint = base_list[0]["fingerprint"]
- final_group = []
- for _i in range(len(base_list)):
- _dict1 = base_list[_i]
- fingerprint_less = _dict1["fingerprint"]
- _pass = True
- if fingerprint_less==base_fingerprint:
- _index = _i
- final_group.append(_dict1)
- continue
- for _dict2 in final_group:
- _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
- if _prob<=0.1:
- _pass = False
- break
- log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
- _index = _i
- if _pass:
- final_group.append(_dict1)
- else:
- break
- return final_group
- def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
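- # Rule-based similarity between two documents. hard_level is raised for channel 302 pairs and for
- # the web source "17397-3". The resulting probability is then gated by the page_time distance:
- # more than 7 days apart forces 0, 4-7 days apart forces 0 when the score is below 0.4;
- # special-purpose bond pairs are exempt from the gate.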
- document_less = _dict1
- docid_less = _dict1["docid"]
- docchannel_less = document_less.get("docchannel",0)
- page_time_less = document_less.get("page_time")
- doctitle_refine_less = document_less["doctitle_refine"]
- project_codes_less = document_less.get("project_codes")
- nlp_enterprise_less = document_less["nlp_enterprise"]
- tenderee_less = document_less.get("tenderee","")
- agency_less = document_less.get("agency")
- win_tenderer_less = document_less["win_tenderer"]
- bidding_budget_less = document_less["bidding_budget"]
- win_bid_price_less = document_less["win_bid_price"]
- product_less = document_less.get("product")
- package_less = document_less.get("package")
- json_time_less = document_less.get("dict_time")
- project_name_less = document_less.get("project_name")
- fingerprint_less = document_less.get("fingerprint")
- extract_count_less = document_less.get("extract_count",0)
- web_source_no_less = document_less.get("web_source_no")
- province_less = document_less.get("province")
- city_less = document_less.get("city")
- district_less = document_less.get("district")
- moneys_less = document_less.get("moneys")
- moneys_attachment_less = document_less.get("moneys_attachment")
- page_attachments_less = document_less.get("page_attachments","[]")
- punish_less = document_less.get("punish",{})
- approval_less = document_less.get("approval",[])
- source_type_less = document_less.get("source_type")
- document_greater = _dict2
- docid_greater = _dict2["docid"]
- page_time_greater = document_greater["page_time"]
- docchannel_greater = document_greater.get("docchannel",0)
- doctitle_refine_greater = document_greater.get("doctitle_refine","")
- project_codes_greater = document_greater["project_codes"]
- nlp_enterprise_greater = document_greater["nlp_enterprise"]
- tenderee_greater = document_greater.get("tenderee","")
- agency_greater = document_greater.get("agency","")
- win_tenderer_greater = document_greater["win_tenderer"]
- bidding_budget_greater = document_greater["bidding_budget"]
- win_bid_price_greater = document_greater["win_bid_price"]
- product_greater = document_greater.get("product")
- package_greater = document_greater.get("package")
- json_time_greater = document_greater["dict_time"]
- project_name_greater = document_greater.get("project_name")
- fingerprint_greater = document_greater.get("fingerprint")
- extract_count_greater = document_greater.get("extract_count",0)
- web_source_no_greater = document_greater.get("web_source_no")
- province_greater = document_greater.get("province")
- city_greater = document_greater.get("city")
- district_greater = document_greater.get("district")
- moneys_greater = document_greater.get("moneys")
- moneys_attachment_greater = document_greater.get("moneys_attachment")
- page_attachments_greater = document_greater.get("page_attachments","[]")
- punish_greater = document_greater.get("punish",{})
- approval_greater = document_greater.get("approval",[])
- source_type_greater = document_greater.get("source_type")
- hard_level=1
- if docchannel_less==docchannel_greater==302:
- hard_level=2
- if web_source_no_less==web_source_no_greater=="17397-3":
- hard_level=2
- if self.check_rule==1:
- _prob = check_dumplicate_rule(document_less,document_greater,min_counts,b_log=b_log,hard_level=hard_level)
- else:
- _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
- pagetime_stamp_less = getTimeStamp(page_time_less)
- pagetime_stamp_greater = getTimeStamp(page_time_greater)
-
- day_dis = abs(pagetime_stamp_greater-pagetime_stamp_less)//86400
- if document_less.get("is_special_bonds",0)==document_greater.get("is_special_bonds",0)==1:
- pass
- else:
- if day_dis>7:
- _prob = 0
- elif day_dis>3:
- if _prob<0.4:
- _prob = 0
- return _prob,day_dis
- def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
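- # Legacy scoring: identical fingerprints short-circuit to 1. Otherwise count matches over 8 fields
- # (codes, tenderee, agency, win_tenderer, budget, price, project name, title) and scale by a base
- # probability that shrinks with the candidate count, e.g. min_counts=2 with 6 matching fields gives
- # 0.9*6/8 = 0.675. Per-field veto checks (title, code, product, entity, money, package, time) can
- # still drop the result to 0.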
- document_less = _dict1
- docid_less = _dict1["docid"]
- docchannel_less = document_less["docchannel"]
- page_time_less = document_less["page_time"]
- doctitle_refine_less = document_less["doctitle_refine"]
- project_codes_less = document_less["project_codes"]
- nlp_enterprise_less = document_less["nlp_enterprise"]
- tenderee_less = document_less["tenderee"]
- agency_less = document_less["agency"]
- win_tenderer_less = document_less["win_tenderer"]
- bidding_budget_less = document_less["bidding_budget"]
- win_bid_price_less = document_less["win_bid_price"]
- product_less = document_less["product"]
- package_less = document_less["package"]
- json_time_less = document_less["dict_time"]
- project_name_less = document_less["project_name"]
- fingerprint_less = document_less["fingerprint"]
- extract_count_less = document_less["extract_count"]
- document_greater = _dict2
- docid_greater = _dict2["docid"]
- page_time_greater = document_greater["page_time"]
- doctitle_refine_greater = document_greater["doctitle_refine"]
- project_codes_greater = document_greater["project_codes"]
- nlp_enterprise_greater = document_greater["nlp_enterprise"]
- tenderee_greater = document_greater["tenderee"]
- agency_greater = document_greater["agency"]
- win_tenderer_greater = document_greater["win_tenderer"]
- bidding_budget_greater = document_greater["bidding_budget"]
- win_bid_price_greater = document_greater["win_bid_price"]
- product_greater = document_greater["product"]
- package_greater = document_greater["package"]
- json_time_greater = document_greater["dict_time"]
- project_name_greater = document_greater["project_name"]
- fingerprint_greater = document_greater["fingerprint"]
- extract_count_greater = document_greater["extract_count"]
- if fingerprint_less==fingerprint_greater:
- return 1
- same_count = 0
- all_count = 8
- if len(set(project_codes_less) & set(project_codes_greater))>0:
- same_count += 1
- if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
- same_count += 1
- if getLength(agency_less)>0 and agency_less==agency_greater:
- same_count += 1
- if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
- same_count += 1
- if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
- same_count += 1
- if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
- same_count += 1
- if getLength(project_name_less)>0 and project_name_less==project_name_greater:
- same_count += 1
- if getLength(doctitle_refine_less)>0 and (doctitle_refine_less==doctitle_refine_greater or doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
- same_count += 1
- base_prob = 0
- if min_counts<3:
- base_prob = 0.9
- elif min_counts<5:
- base_prob = 0.8
- elif min_counts<8:
- base_prob = 0.7
- else:
- base_prob = 0.6
- _prob = base_prob*same_count/all_count
- if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
- _prob = 0.15
- if _prob<0.1:
- return _prob
- check_result = {"pass":1}
- if docchannel_less in (51,102,103,104,115,116,117):
- if doctitle_refine_less!=doctitle_refine_greater:
- if page_time_less!=page_time_greater:
- check_result["docchannel"] = 0
- check_result["pass"] = 0
- else:
- check_result["docchannel"] = 2
- if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater,page_time_less,page_time_greater):
- check_result["doctitle"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
- else:
- check_result["doctitle"] = 2
- #added check
- if not check_codes(project_codes_less,project_codes_greater):
- check_result["code"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
- else:
- if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
- check_result["code"] = 2
- else:
- check_result["code"] = 1
- if not check_product(product_less,product_greater,doctitle_refine_less,doctitle_refine_greater):
- check_result["product"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
- else:
- if getLength(product_less)>0 and getLength(product_greater)>0:
- check_result["product"] = 2
- else:
- check_result["product"] = 1
- if not check_demand():
- check_result["pass"] = 0
- if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
- tenderee_less,tenderee_greater,
- agency_less,agency_greater,
- win_tenderer_less,win_tenderer_greater):
- check_result["entity"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
- else:
- if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
- check_result["entity"] = 2
- elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
- check_result["entity"] = 2
- else:
- check_result["entity"] = 1
- if not check_money(bidding_budget_less,bidding_budget_greater,
- win_bid_price_less,win_bid_price_greater):
- if b_log:
- logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
- check_result["money"] = 0
- check_result["pass"] = 0
- else:
- if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
- check_result["money"] = 2
- elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
- check_result["money"] = 2
- else:
- check_result["money"] = 1
- #added check
- if not check_package(package_less,package_greater):
- if b_log:
- logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
- check_result["package"] = 0
- check_result["pass"] = 0
- else:
- if getLength(package_less)>0 and getLength(package_greater)>0:
- check_result["package"] = 2
- else:
- check_result["package"] = 1
- #added check
- if not check_time(json_time_less,json_time_greater):
- if b_log:
- logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
- if isinstance(json_time_less,dict):
- time_less = json_time_less
- else:
- time_less = json.loads(json_time_less)
- if isinstance(json_time_greater,dict):
- time_greater = json_time_greater
- else:
- time_greater = json.loads(json_time_greater)
- for k,v in time_less.items():
- if getLength(v)>0:
- v1 = time_greater.get(k,"")
- if getLength(v1)>0:
- if v!=v1:
- log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
- check_result["time"] = 0
- check_result["pass"] = 0
- else:
- if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
- check_result["time"] = 2
- else:
- check_result["time"] = 1
- if check_result.get("pass",0)==0:
- if b_log:
- logging.info(str(check_result))
- if check_result.get("money",1)==0:
- return 0
- if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
- return _prob
- else:
- return 0
- if check_result.get("time",1)==0:
- return 0
- return _prob
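- # --- Illustrative sketch (not part of the original class) ---
- # A minimal, hypothetical restatement of the confidence scoring above: the base
- # probability shrinks as the number of candidate matches (min_counts) grows and
- # is then scaled by the share of matching attributes (same_count/all_count).
- def _sketch_confidence(min_counts, same_count, all_count):
-     if min_counts < 3:
-         base_prob = 0.9
-     elif min_counts < 5:
-         base_prob = 0.8
-     elif min_counts < 8:
-         base_prob = 0.7
-     else:
-         base_prob = 0.6
-     return base_prob * same_count / all_count
- # e.g. 4 candidates with 3 of 5 attributes matching -> 0.8*3/5 = 0.48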
- def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
- for _ in range(retry_times):
- try:
- _time = time.time()
- check_time = 0
- if isinstance(_query,list):
- bool_query = BoolQuery(should_queries=_query)
- else:
- bool_query = _query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- list_data = []
- for _dict in list_dict:
- self.post_extract(_dict)
- _docid = _dict.get(document_tmp_docid)
- if merge:
- list_data.append(_dict)
- else:
- if _docid!=item.get(document_tmp_docid):
- _time1 = time.time()
- confidence,day_dis = self.dumplicate_check(item,_dict,total_count,b_log=b_log)
- check_time+= time.time()-_time1
- _dict["confidence"] = confidence
- _dict["min_counts"] = total_count
- list_data.append(_dict)
- all_time = time.time()-_time
- # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
- return list_data
- except Exception as e:
- traceback.print_exc()
- return []
- def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
- list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns,b_log=b_log)
- for _dict in list_dict:
- _docid = _dict.get(document_tmp_docid)
- confidence = _dict["confidence"]
- if b_log:
- log("confidence %d %.3f total_count %d"%(_docid,confidence,_dict.get('min_counts',0)))
- if confidence>0.1:
- if _docid not in set_docid:
- base_list.append(_dict)
- set_docid.add(_docid)
- set_docid.add(_docid)
- def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
- for k,v in _dict.items():
- if getLength(v)==0:
- return
- _dict.update(base_dict)
- if b_log:
- log("rule dict:"+str(_dict))
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "item":item,
- "query":_query,
- "singleNum_keys":[],
- "contain_keys":[],
- "multiNum_keys":[],
- "_dict":_dict}
- list_rules.append(_rule)
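- # --- Illustrative sketch (not part of the original class) ---
- # appendRule() above drops a rule as soon as any of its fields is empty; a
- # hypothetical standalone check with the same behaviour:
- def _sketch_rule_is_complete(rule_dict):
-     # every field must carry a value, otherwise the rule is unusable
-     return all(v not in (None, "") for v in rule_dict.values())
- # _sketch_rule_is_complete({"tenderee": "A", "win_tenderer": ""}) -> False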
- def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False,day_dis=7,table_name ="document_tmp",table_index="document_tmp_index"):
- docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
- current_date = getCurrent_date("%Y-%m-%d")
- if page_time=='':
- page_time = current_date
- two_day_dict = {"page_time":[timeAdd(page_time,-7),timeAdd(page_time,7)]}
- if table_name in {"document_tmp","document"}:
- # if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
- if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1 and not get_all:
- table_name = "document_tmp"
- table_index = "document_tmp_index"
- base_dict = {
- "docchannel":item.get("docchannel",52),
- "status":[status_from[0]],
- "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
- }
- must_not_dict = {"save":0,"docid":item.get("docid")}
- doctitle_refine_name = "doctitle_refine"
- else:
- table_name = "document"
- table_index = "document_index"
- if get_all:
- _status = [201,450]
- else:
- _status = [201,300]
- base_dict = {
- "docchannel":item["docchannel"],
- "status":_status,
- "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
- }
- must_not_dict = {"docid":item.get("docid")}
- doctitle_refine_name = "doctitle"
- else:
- _status = [201,300]
- base_dict = {
- "docchannel":item["docchannel"],
- "status":_status,
- "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
- }
- must_not_dict = {"docid":item.get("docid")}
- doctitle_refine_name = "doctitle"
- list_rules = []
- singleNum_keys = ["tenderee","win_tenderer"]
- confidence = 100
- self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence = 90
- _dict = {document_tmp_agency:agency,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {document_tmp_agency:agency,
- "win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {document_tmp_agency:agency,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "agency":agency,
- "win_tenderer":win_tenderer}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "agency":agency,
- "win_bid_price":win_bid_price}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "agency":agency,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "project_codes":project_code
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"agency":agency,
- "project_codes":project_code
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "win_tenderer":win_tenderer
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- base_dict.update(two_day_dict)
- confidence=85
- _dict = {"tenderee":tenderee,
- "agency":agency
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "project_name":project_name
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- if getLength(product)>0:
- l_p = product.split(",")
- _dict = {"tenderee":tenderee,
- "product":l_p[0]
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_tenderer":win_tenderer
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"agency":agency,
- "project_name":project_name
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "project_name":project_name
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- "win_tenderer":win_tenderer
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence=80
- _dict = {"project_codes":project_code}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_bid_price":win_bid_price,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"bidding_budget":bidding_budget,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence=80
- _dict = {doctitle_refine_name:doctitle_refine}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- # special-purpose bond announcements
- if item.get("is_special_bonds")==1:
- confidence = 90
- _dict = {doctitle_refine_name: doctitle_refine,
- document_tmp_web_source_name:"专项债券信息网"}
- tmp_base_dict = {
- "docchannel": item["docchannel"],
- "status": [201, 450],
- # "page_time": [timeAdd(page_time, -365), timeAdd(page_time, 365)]
- }
- self.appendRule(list_rules, _dict, tmp_base_dict, must_not_dict, confidence, item, b_log=to_log)
- confidence=70
- _dict = {"project_name":project_name}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- return list_rules,table_name,table_index
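- # --- Illustrative summary (an informal reading of the rules built above, not
- # an exhaustive list) ---
- SKETCH_RULE_TIERS = {
-     100: "exact fingerprint match",
-     90:  "combinations of tenderee/agency/win_tenderer with amounts or project codes",
-     85:  "pairs involving project_name, product or the refined title (tighter page_time window)",
-     80:  "project_codes alone, the refined title alone, or title+amount pairs",
-     70:  "project_name alone",
- }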
- def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
- q_size = self.queue_dumplicate.qsize()
- log("dumplicate queue size %d"%(q_size))
- while 1:
- try:
- docid = self.queue_dumplicate_processed.get(block=False)
- if docid in self.dumplicate_set:
- self.dumplicate_set.remove(docid)
- except Exception as e:
- break
- if q_size>process_count//3:
- return
- bool_query = BoolQuery(must_queries=[
- RangeQuery(document_tmp_status,*status_from,True,True),
- # TermQuery("docid",271983871)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in self.dumplicate_set:
- continue
- self.dumplicate_set.add(docid)
- self.queue_dumplicate.put(_dict)
- _count = len(list_dict)
- while next_token and _count<process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in self.dumplicate_set:
- continue
- self.dumplicate_set.add(docid)
- self.queue_dumplicate.put(_dict)
- _count += len(list_dict)
- # _l = list(self.dumplicate_set)
- # _l.sort(key=lambda x:x,reverse=True)
- # self.dumplicate_set = set(_l[:flow_process_count*2])
- def comsumer_flow_dumplicate(self):
- mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
- mt.run()
- def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
- self.producer_flow_dumplicate(process_count=process_count,status_from=status_from)
- # self.comsumer_flow_dumplicate()
- def flow_dumpcate_comsumer(self):
- from multiprocessing import Process
- process_count = 6
- thread_count = 12
- list_process = []
- def start_thread():
- mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
- mt.run()
- for _ in range(process_count):
- p = Process(target=start_thread)
- list_process.append(p)
- for p in list_process:
- p.start()
- while 1:
- for _i in range(len(list_process)):
- p = list_process[_i]
- if not p.is_alive():
- p = Process(target=start_thread)
- list_process[_i] = p
- p.start()
- time.sleep(1)
- # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
- # mt.run()
- def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment,document_tenderee_code,document_agency_code,document_candidates],document_name="document"):
- '''
- Query announcement content by docid: look in document_tmp first, then fall back to document.
- :param list_docids:
- :return:
- '''
- list_docs = []
- set_fingerprint = set()
- for _docid in list_docids:
- docid = int(_docid)
- _dict = {document_partitionkey:getPartitionKey(docid),
- document_docid:docid}
- if document_name in {"document","document_tmp"}:
- _doc = Document_tmp(_dict)
- _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
- if not _exists:
- _doc = Document(_dict)
- _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
- else:
- _doc = Document(_dict)
- _doc.table_name = document_name
- _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
- if _exists:
- _fingerprint = _doc.getProperties().get(document_fingerprint)
- if _fingerprint in set_fingerprint:
- continue
- set_fingerprint.add(_fingerprint)
- list_docs.append(_doc)
- for _doc in list_docs:
- try:
- _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
- if _sub_docs_json is not None:
- _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
- except Exception as e:
- traceback.print_exc()
- list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
- return list_docs
- def is_same_package(self,_dict1,_dict2):
- sub_project_name1 = _dict1.get(project_sub_project_name,"")
- if sub_project_name1=="Project":
- sub_project_name1 = ""
- win_tenderer1 = _dict1.get(project_win_tenderer,"")
- win_bid_price1 = _dict1.get(project_win_bid_price,0)
- bidding_budget1 = _dict1.get(project_bidding_budget,0)
- sub_project_name2 = _dict2.get(project_sub_project_name,"")
- if sub_project_name2=="Project":
- sub_project_name2 = ""
- win_tenderer2 = _dict2.get(project_win_tenderer,"")
- win_bid_price2 = _dict2.get(project_win_bid_price,0)
- bidding_budget2 = _dict2.get(project_bidding_budget,0)
- _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
- if len(_set)>1:
- return False
- _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
- if len(_set)>1:
- return False
- _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
- if len(_set)>1:
- return False
- _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
- if len(_set)>1:
- return False
- return True
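- # --- Illustrative sketch (not part of the original class) ---
- # The package-identity test above, restated on plain dicts with hypothetical
- # string keys (the real method uses the project_* column constants and also
- # normalises the "Project" placeholder name, omitted here):
- def _sketch_is_same_package(p1, p2):
-     for key, empty in (("sub_project_name", ""), ("win_tenderer", ""),
-                        ("win_bid_price", 0), ("bidding_budget", 0)):
-         # conflict only when both sides carry a value and the values differ
-         if len({p1.get(key, empty), p2.get(key, empty)} - {empty}) > 1:
-             return False
-     return True
- # _sketch_is_same_package({"win_tenderer": "A"}, {"win_tenderer": "B"})  -> False
- # _sketch_is_same_package({"win_tenderer": "A"}, {"bidding_budget": 90}) -> True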
- def getUpdate_dict(self,_dict):
- update_dict = {}
- for k,v in _dict.items():
- if v is None:
- continue
- if isinstance(v,str):
- if v=="":
- continue
- if isinstance(v,(float,int)):
- if v==0:
- continue
- update_dict[k] = v
- return update_dict
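- # --- Illustrative usage sketch (hypothetical values) ---
- # getUpdate_dict() keeps only informative fields, so a partial package never
- # overwrites existing data with None, empty strings or zero amounts:
- # >>> self.getUpdate_dict({"win_tenderer": "公司A", "win_bid_price": 0, "agency": ""})
- # {'win_tenderer': '公司A'}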
- def update_projects_by_document(self,docid,save,projects,document_name="document"):
- '''
- Update the attributes of the given projects from the corresponding document.
- :param docid:
- :param projects: collection of projects
- :param save: 1 attaches the document's attributes to the projects, 0 detaches the document from them
- :return:
- '''
- list_docs = self.search_docs([docid],document_name=document_name)
- docs = [_doc.getProperties() for _doc in list_docs]
- project_dict = generate_common_properties(docs)
- list_package_properties = generate_packages_properties(docs)
- _dict = {}
- # update the common (project-level) properties
- _replace_replace = False
- v = project_dict.get(document_district,"")
- if not (v is None or v=="" or v=="[]" or v=="未知"):
- _replace_replace = True
- for k,v in project_dict.items():
- if not _replace_replace:
- if k in [document_district,document_city,document_province,document_area]:
- continue
- if v is None or v=="" or v=="[]" or v=="未知":
- continue
- if k in (project_project_dynamics,project_product,project_project_codes,project_docids,project_candidates,project_zhong_biao_page_time,project_zhao_biao_page_time,project_page_time,project_docchannel):
- continue
- _dict[k] = v
- for _proj in projects:
- _proj.update(_dict)
- for _proj in projects:
- if _proj.get(project_page_time,"")<=project_dict.get(project_page_time,""):
- _proj[project_page_time] = project_dict.get(project_page_time,"")
- _proj[project_docchannel] = project_dict.get(project_docchannel,"")
- else:
- if project_docchannel in project_dict:
- project_dict.pop(project_docchannel)
- if _proj.get(project_zhong_biao_page_time,"")>project_dict.get(project_zhong_biao_page_time,""):
- _proj[project_zhong_biao_page_time] = project_dict.get(project_zhong_biao_page_time,"")
- if _proj.get(project_zhao_biao_page_time,"")>project_dict.get(project_zhao_biao_page_time,""):
- _proj[project_zhao_biao_page_time] = project_dict.get(project_zhao_biao_page_time,"")
- for _proj in projects:
- # append aggregated attributes (docids, codes, products, enterprises, candidates)
- append_dict = {}
- set_docid = set()
- set_product = set()
- set_code = set()
- set_nlp_enterprise = set()
- set_nlp_enterprise_attachment = set()
- set_candidates = set()
- _docids = _proj.get(project_docids,"")
- _codes = _proj.get(project_project_codes,"")
- _product = _proj.get(project_product,"")
- set_docid = set(_docids.split(","))
- if save==1:
- set_docid.add(str(docid))
- else:
- if str(docid) in set_docid:
- set_docid.remove(str(docid))
- set_code = set_code | set(_codes.split(","))
- set_product = set_product | set(_product.split(","))
- try:
- set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
- set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
- list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
- for item in list_candidates:
- if item.get("name") is not None and item.get("name") not in set_candidates:
- set_candidates.add(item.get("name"))
- set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
- set_product = set_product | set(project_dict.get(project_product,"").split(","))
- set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
- set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
- for item in json.loads(_proj.get(project_candidates,"[]")):
- if item.get("name") is not None and item.get("name") not in set_candidates:
- set_candidates.add(item.get("name"))
- list_candidates.append(item)
- except Exception as e:
- pass
- append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
- append_dict[project_docid_number] = len(set_docid)
- append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
- append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
- append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
- append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
- append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
- dict_dynamic = {}
- set_docid = set()
- _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
- for _dy in _dynamic:
- _docid = _dy.get("docid")
- dict_dynamic[_docid] = _dy
- _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
- for _dy in _dynamic:
- _docid = _dy.get("docid")
- dict_dynamic[_docid] = _dy
- list_dynamics = []
- for k,v in dict_dynamic.items():
- list_dynamics.append(v)
- list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
- append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
- _proj.update(append_dict)
- dict_package = {}
- for _pp in projects:
- _counts = 0
- sub_project_name = _pp.get(project_sub_project_name,"")
- if sub_project_name=="Project":
- sub_project_name = ""
- win_tenderer = _pp.get(project_win_tenderer,"")
- win_bid_price = _pp.get(project_win_bid_price,0)
- bidding_budget = _pp.get(project_bidding_budget,0)
- if win_tenderer!="" and bidding_budget!=0:
- _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- if win_tenderer!="" and win_bid_price!=0:
- _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
- dict_package[_key] = _pp
- _counts +=1
- if _counts==0:
- if win_tenderer!="":
- _key = "%s-%s"%(sub_project_name,win_tenderer)
- dict_package[_key] = _pp
- _counts += 1
- if bidding_budget!=0:
- _key = "%s-%s"%(sub_project_name,str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- # update package-specific (private) attributes
- if len(projects)==1 and len(list_package_properties)==1:
- _pp = list_package_properties[0]
- pp = projects[0]
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,pp.get("uuid"))
- pp.update(ud)  # apply the filtered update (keeps the merged uuid, skips empty values)
- else:
- for _pp in list_package_properties:
- flag_update = False
- sub_project_name = _pp.get(project_sub_project_name,"")
- if sub_project_name=="Project":
- sub_project_name = ""
- win_tenderer = _pp.get(project_win_tenderer,"")
- win_bid_price = _pp.get(project_win_bid_price,0)
- bidding_budget = _pp.get(project_bidding_budget,0)
- if win_tenderer!="" and bidding_budget!=0:
- _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if win_tenderer!="" and win_bid_price!=0:
- _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if win_tenderer!="":
- _key = "%s-%s"%(sub_project_name,win_tenderer)
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if bidding_budget!=0:
- _key = "%s-%s"%(sub_project_name,str(bidding_budget))
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if not flag_update:
- _pp.update(project_dict)
- projects.append(_pp)
- _counts = 0
- if win_tenderer!="" and bidding_budget!=0:
- _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- if win_tenderer!="" and win_bid_price!=0:
- _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
- dict_package[_key] = _pp
- _counts +=1
- if _counts==0:
- if win_tenderer!="":
- _key = "%s-%s"%(sub_project_name,win_tenderer)
- dict_package[_key] = _pp
- _counts += 1
- if bidding_budget!=0:
- _key = "%s-%s"%(sub_project_name,str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- def delete_projects_by_document(self,docid):
- '''
- Remove a document from its associated projects: mark the old projects for deletion
- and regenerate projects from the remaining documents.
- :param docid:
- :return: project json string
- '''
- set_docid = set()
- list_delete_projects = []
- list_projects = self.search_projects_with_document([docid])
- for _proj in list_projects:
- _p = {}
- _docids = _proj.get(project_docids,"")
- print(_proj.get(project_uuid))
- _p["delete_uuid"] = _proj.get(project_uuid)
- _p["to_delete"] = True
- list_delete_projects.append(_p)
- if _docids!="":
- set_docid = set_docid | set(_docids.split(","))
- if str(docid) in set_docid:
- set_docid.remove(str(docid))
- list_docid = list(set_docid)
- list_projects = []
- if len(list_docid)>0:
- list_docs = self.search_docs(list_docid)
- print("search_docs(list_docid)")
- list_projects = self.generate_projects_from_document(list_docs)
- print("generate_projects_from_document")
- list_projects = dumplicate_projects(list_projects,max_count=20)
- print("dumplicate_projects")
- list_projects.extend(list_delete_projects)
- project_json = to_project_json(list_projects)
- return project_json
- def delete_doc_handle(self,_dict,result_queue):
- try:
- headers = _dict.get("frame")
- conn = _dict.get("conn")
- if headers is not None:
- message_id = headers.headers["message-id"]
- body = headers.body
- item = json.loads(body)
- docid = item.get("docid")
- log("==========start delete docid:%s"%(str(docid)))
- if docid is None:
- ackMsg(conn,message_id)
- delete_result = self.delete_projects_by_document(docid)
- log("1")
- _uuid = uuid4().hex
- _d = {PROJECT_PROCESS_UUID:_uuid,
- PROJECT_PROCESS_CRTIME:1,
- PROJECT_PROCESS_PROJECTS:delete_result}
- _pp = Project_process(_d)
- log("2")
- try:
- if _pp.update_row(self.ots_client):
- ackMsg(conn,message_id)
- except Exception as e:
- ackMsg(conn,message_id)
- log("3")
- # no longer push the result to the result queue; write it to the project_process table instead
- # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
- # ackMsg(conn,message_id)
- log("==========end delete docid:%s"%(str(docid)))
- else:
- log("has not headers")
- except Exception as e:
- traceback.print_exc()
- ackMsg(conn,message_id)
- log("==========end delete docid:%s"%(str(docid)))
- def generate_common_properties(self,list_docs):
- '''
- Generate the common (project-level) properties from a list of documents.
- :param list_docs:
- :return:
- '''
- # choose values by counting votes across documents
- choose_dict = {}
- project_dict = {}
- for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
- for _doc in list_docs:
- _value = _doc.getProperties().get(_key,"")
- if _value!="":
- if _key not in choose_dict:
- choose_dict[_key] = {}
- if _value not in choose_dict[_key]:
- choose_dict[_key][_value] = 0
- choose_dict[_key][_value] += 1
- _find = False
- for _key in [document_district,document_city,document_province,document_area]:
- area_dict = {}
- for _doc in list_docs:
- loc = _doc.getProperties().get(_key,"未知")
- if loc not in ('全国','未知',"0"):
- if loc not in area_dict:
- area_dict[loc] = 0
- area_dict[loc] += 1
- list_loc = []
- for k,v in area_dict.items():
- list_loc.append([k,v])
- list_loc.sort(key=lambda x:x[1],reverse=True)
- if len(list_loc)>0:
- project_dict[document_district] = _doc.getProperties().get(document_district)
- project_dict[document_city] = _doc.getProperties().get(document_city)
- project_dict[document_province] = _doc.getProperties().get(document_province)
- project_dict[document_area] = _doc.getProperties().get(document_area)
- _find = True
- break
- if not _find:
- if len(list_docs)>0:
- project_dict[document_district] = list_docs[0].getProperties().get(document_district)
- project_dict[document_city] = list_docs[0].getProperties().get(document_city)
- project_dict[document_province] = list_docs[0].getProperties().get(document_province)
- project_dict[document_area] = list_docs[0].getProperties().get(document_area)
- for _key,_value in choose_dict.items():
- _l = []
- for k,v in _value.items():
- _l.append([k,v])
- _l.sort(key=lambda x:x[1],reverse=True)
- if len(_l)>0:
- _v = _l[0][0]
- if _v in ('全国','未知'):
- if len(_l)>1:
- _v = _l[1][0]
- project_dict[_key] = _v
- list_dynamics = []
- docid_number = 0
- visuable_docids = []
- zhao_biao_page_time = ""
- zhong_biao_page_time = ""
- list_codes = []
- list_product = []
- p_page_time = ""
- remove_docids = set()
- for _doc in list_docs:
- table_name = _doc.getProperties().get("table_name")
- status = _doc.getProperties().get(document_status,0)
- _save = _doc.getProperties().get(document_tmp_save,1)
- doctitle = _doc.getProperties().get(document_doctitle,"")
- docchannel = _doc.getProperties().get(document_docchannel)
- page_time = _doc.getProperties().get(document_page_time,"")
- _docid = _doc.getProperties().get(document_docid)
- _bidway = _doc.getProperties().get(document_bidway,"")
- _docchannel = _doc.getProperties().get(document_life_docchannel,0)
- project_codes = _doc.getProperties().get(document_project_codes)
- product = _doc.getProperties().get(document_product)
- sub_docs = _doc.getProperties().get("sub_docs",[])
- is_multipack = True if len(sub_docs)>1 else False
- extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
- if product is not None:
- list_product.extend(product.split(","))
- if project_codes is not None:
- _c = project_codes.split(",")
- list_codes.extend(_c)
- if p_page_time=="":
- p_page_time = page_time
- if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
- zhao_biao_page_time = page_time
- if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
- zhong_biao_page_time = page_time
- is_visuable = 0
- if table_name=="document":
- if status>=201 and status<=300:
- docid_number +=1
- visuable_docids.append(str(_docid))
- is_visuable = 1
- else:
- remove_docids.add(str(_docid))
- else:
- if _save==1:
- docid_number +=1
- visuable_docids.append(str(_docid))
- is_visuable = 1
- else:
- remove_docids.add(str(_docid))
- list_dynamics.append({document_docid:_docid,
- document_doctitle:doctitle,
- document_docchannel:_docchannel,
- document_bidway:_bidway,
- document_page_time:page_time,
- document_status:201 if is_visuable==1 else 401,
- "is_multipack":is_multipack,
- document_tmp_extract_count:extract_count
- }
- )
- project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
- project_dict[project_docid_number] = docid_number
- project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
- if zhao_biao_page_time !="":
- project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
- if zhong_biao_page_time !="":
- project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
- project_dict[project_project_codes] = ",".join(list(set(list_codes)))
- project_dict[project_page_time] = p_page_time
- project_dict[project_product] = ",".join(list(set(list_product)))
- return project_dict
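- # --- Illustrative sketch (not part of the original class) ---
- # The vote-counting selection used above, in isolation: for each key the most
- # frequent non-empty value across the documents wins, and '全国'/'未知' is
- # demoted when any alternative exists.
- from collections import Counter
- def _sketch_choose_value(values):
-     counts = Counter(v for v in values if v != "")
-     ranked = [v for v, _ in counts.most_common()]
-     if ranked and ranked[0] in ("全国", "未知") and len(ranked) > 1:
-         return ranked[1]
-     return ranked[0] if ranked else ""
- # _sketch_choose_value(["未知", "广东", "广东", ""]) -> "广东"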
- def generate_packages_properties(self,list_docs):
- '''
- Generate the sub-package properties.
- :param list_docs:
- :return:
- '''
- list_properties = []
- set_key = set()
- for _doc in list_docs:
- _dict = {}
- sub_docs = _doc.getProperties().get("sub_docs")
- if sub_docs is not None:
- for _d in sub_docs:
- sub_project_code = _d.get(project_sub_project_code,"")
- sub_project_name = _d.get(project_sub_project_name,"")
- win_tenderer = _d.get(project_win_tenderer,"")
- win_bid_price = _d.get(project_win_bid_price,"")
- _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
- if _key in set_key:
- continue
- set_key.add(_key)
- list_properties.append(_d)
- return list_properties
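- # --- Illustrative sketch (not part of the original class) ---
- # Sub-packages are deduplicated on a composite key of code, name, winner and
- # price; a hypothetical plain-string version of that key (the real method uses
- # the project_* column constants):
- def _sketch_package_key(d):
-     return "%s-%s-%s-%s" % (d.get("sub_project_code", ""), d.get("sub_project_name", ""),
-                             d.get("win_tenderer", ""), d.get("win_bid_price", ""))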
- def generate_projects_from_document(self,list_docs):
- '''
- Generate projects from announcements (documents).
- :param list_docids:
- :return:
- '''
- # determine the number of bid sections
- list_projects = generate_projects([doc.getProperties() for doc in list_docs])
- return list_projects
- def search_projects_with_document(self,list_docids,project_table,project_table_index):
- '''
- Query the projects associated with a set of docids.
- :param list_docids:
- :return:
- '''
- log("search_projects_with_document %s"%str(list_docids))
- list_should_q = []
- for _docid in list_docids:
- list_should_q.append(TermQuery("docids",_docid))
- bool_query = BoolQuery(should_queries=list_should_q)
- _query = {"query":bool_query,"limit":20}
- list_project_dict = getDocument(_query,self.ots_client,[
- project_uuid,project_docids,project_zhao_biao_page_time,
- project_zhong_biao_page_time,
- project_page_time,
- project_area,
- project_province,
- project_city,
- project_district,
- project_info_type,
- project_industry,
- project_qcodes,
- project_project_name,
- project_project_code,
- project_project_codes,
- project_project_addr,
- project_tenderee,
- project_tenderee_addr,
- project_tenderee_phone,
- project_tenderee_contact,
- project_agency,
- project_agency_phone,
- project_agency_contact,
- project_sub_project_name,
- project_sub_project_code,
- project_bidding_budget,
- project_win_tenderer,
- project_win_bid_price,
- project_win_tenderer_manager,
- project_win_tenderer_phone,
- project_second_tenderer,
- project_second_bid_price,
- project_second_tenderer_manager,
- project_second_tenderer_phone,
- project_third_tenderer,
- project_third_bid_price,
- project_third_tenderer_manager,
- project_third_tenderer_phone,
- project_procurement_system,
- project_bidway,
- project_dup_data,
- project_docid_number,
- project_project_dynamics,
- project_product,
- project_moneysource,
- project_service_time,
- project_time_bidclose,
- project_time_bidopen,
- project_time_bidstart,
- project_time_commencement,
- project_time_completion,
- project_time_earnest_money_start,
- project_time_earnest_money_end,
- project_time_get_file_end,
- project_time_get_file_start,
- project_time_publicity_end,
- project_time_publicity_start,
- project_time_registration_end,
- project_time_registration_start,
- project_time_release,
- project_dup_docid,
- project_info_source,
- project_nlp_enterprise,
- project_nlp_enterprise_attachment,
- project_tenderee_code,
- project_agency_code,
- project_candidates,
- project_docchannel
- ],sort="page_time",table_name=project_table,table_index=project_table_index)
- return list_project_dict
- def set_project_uuid(self,_dict,_uuid):
- if _uuid is not None and _uuid!="":
- if "uuid" in _dict:
- _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
- else:
- _dict["uuid"] = _uuid
- def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
- whole_time_start = time.time()
- _time = time.time()
- list_query = []
- list_code = [a for a in project_codes.split(",") if a!='']
- should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
- # print("should_q_code",[a for a in list_code[:20]])
- should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
- list_product = [a for a in product.split(",") if a!='']
- should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
- should_q_area = None
- if province!="" or city!="" or district!="":
- should_q = []
- if province not in ("","全国","未知") and province is not None:
- should_q.append(TermQuery(project_province,province))
- if city not in ("","全国","未知") and city is not None:
- should_q.append(TermQuery(project_city,city))
- if district not in ("","全国","未知") and district is not None:
- should_q.append(TermQuery(project_district,district))
- if len(should_q)>0:
- should_q_area = BoolQuery(should_queries=should_q)
- prepare_time = time.time()-_time
- _time = time.time()
- # log("list_code %s"%(str(list_code)))
- # log("list_product %s"%(str(list_product)))
- # log("tenderee %s"%(tenderee))
- # log("bidding_budget %s"%(bidding_budget))
- # log("win_tenderer %s"%(win_tenderer))
- # log("win_bid_price %s"%(win_bid_price))
- # log("project_name %s"%(project_name))
- log_time = time.time()-_time
- _time = time.time()
- if tenderee!="" and len(list_code)>0:
- _query = [TermQuery(project_tenderee,tenderee),
- should_q_code,
- ]
- list_query.append([_query,2])
- _query = [TermQuery(project_tenderee,tenderee),
- should_q_cod
- ]
- list_query.append([_query,2])
- if tenderee!="" and len(list_product)>0:
- _query = [TermQuery(project_tenderee,tenderee),
- should_q_product]
- list_query.append([_query,1])
- if tenderee!="" and project_name!="":
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_project_name,project_name)]
- list_query.append([_query,2])
- if tenderee!="" and agency!="":
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_agency,agency)]
- list_query.append([_query,0])
- if tenderee!="" and float(bidding_budget)>0:
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_bidding_budget,bidding_budget)]
- list_query.append([_query,2])
- if float(bidding_budget)>0 and float(win_bid_price)>0:
- _query = [TermQuery(project_bidding_budget,bidding_budget),
- TermQuery(project_win_bid_price,win_bid_price)]
- list_query.append([_query,2])
- if tenderee!="" and win_tenderer!="":
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_win_tenderer,win_tenderer)]
- list_query.append([_query,2])
- if agency!="" and win_tenderer!="":
- _query = [TermQuery(project_agency,agency),
- TermQuery(project_win_tenderer,win_tenderer)]
- list_query.append([_query,0])
- if agency!="" and len(list_product)>0:
- _query = [TermQuery(project_agency,agency),
- should_q_product]
- list_query.append([_query,1])
- if win_tenderer!="" and len(list_code)>0:
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- should_q_code]
- list_query.append([_query,2])
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- should_q_cod]
- list_query.append([_query,2])
- if win_tenderer!="" and sub_project_name!="":
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- TermQuery(project_sub_project_name,sub_project_name)
- ]
- list_query.append([_query,2])
- if win_tenderer!="" and float(win_bid_price)>0:
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- TermQuery(project_win_bid_price,win_bid_price)]
- list_query.append([_query,2])
- if win_tenderer!="" and float(bidding_budget)>0:
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- TermQuery(project_bidding_budget,bidding_budget)]
- list_query.append([_query,2])
- if len(list_code)>0 and len(list_product)>0:
- _query = [should_q_code,
- should_q_product]
- list_query.append([_query,2])
- if len(list_code)>0:
- _query = [
- should_q_code]
- list_query.append([_query,2])
- _query = [
- should_q_cod]
- list_query.append([_query,1])
- if project_name!="" and project_name is not None:
- _query = [
- TermQuery(project_project_name,project_name)]
- list_query.append([_query,1])
- _query_title = [MatchPhraseQuery(project_doctitles,project_name)]
- list_query.append([_query_title,1])
- if len(list_product)>0 and should_q_area is not None:
- _query = [should_q_area,
- should_q_product]
- list_query.append([_query,0])
- generate_time = time.time()-_time
- whole_time = time.time()-whole_time_start
- # log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
- return list_query
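- # --- Illustrative sketch (not part of the original class) ---
- # getMerge_rules() returns [must_queries, weight] pairs; in merge_projects() the
- # weight of each rule in a search pass widens the page limit (10 + weight*5 per
- # rule). A hypothetical standalone computation of that limit:
- def _sketch_limit_for(rule_batch, base_limit=10):
-     return base_limit + sum(weight for _, weight in rule_batch) * 5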
- def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment,project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source],project_table="project2",project_table_index="project2_index_formerge"):
- '''
- Merge the given projects with existing projects found in the index.
- :return:
- '''
- try:
- whole_time_start = time.time()
- set_uuid = set()
- for _proj in list_projects:
- _uuid = _proj.get("uuid")
- if _uuid is not None:
- set_uuid = set_uuid | set(_uuid.split(","))
- projects_merge_count = 0
- projects_check_rule_time = 0
- projects_update_time = 0
- projects_query_time = 0
- projects_prepare_time = 0
- current_date = getCurrent_date("%Y-%m-%d")
- min_date = timeAdd(current_date,-35,format="%Y-%m-%d")
- search_table = "project2"
- search_table_index = "project2_index_formerge"
- project_cls = Project
- docids = ""
- for _proj in list_projects[:30]:
- must_not_q = []
- for _uuid in list(set_uuid):
- must_not_q.append(TermQuery("uuid",_uuid))
- docids = _proj.get(project_docids,"")
- page_time = _proj.get(project_page_time,"")
- project_codes = _proj.get(project_project_codes,"")
- project_name = _proj.get(project_project_name,"")
- tenderee = _proj.get(project_tenderee,"")
- agency = _proj.get(project_agency,"")
- product = _proj.get(project_product,"")
- sub_project_name = _proj.get(project_sub_project_name,"")
- bidding_budget = _proj.get(project_bidding_budget,-1)
- win_tenderer = _proj.get(project_win_tenderer,"")
- win_bid_price = _proj.get(project_win_bid_price,-1)
- _dynamic = _proj.get(project_project_dynamics,"[]")
- is_yanshou = False
- list_dynamic = json.loads(_dynamic)
- for _d in list_dynamic:
- _title = _d.get("doctitle","")
- if re.search("验收公[示告]|验收结果",_title) is not None or _d.get("docchannel")==122:
- is_yanshou = True
- break
- province = _proj.get(project_province,"")
- city = _proj.get(project_city,"")
- district = _proj.get(project_district,"")
- if is_yanshou:
- page_time_less = timeAdd(page_time,-850)
- page_time_greater = timeAdd(page_time,820)
- else:
- page_time_less = timeAdd(page_time,-450)
- page_time_greater = timeAdd(page_time,420)
- sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
- _time = time.time()
- list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
- list_merge_data = []
- search_table = "project2"
- search_table_index = "project2_index_formerge"
- project_cls = Project
- search_table = project_table
- search_table_index = project_table_index
- # print("page_time,min_date",page_time,min_date)
- # if page_time>=min_date:
- # search_table = "project2_tmp"
- # search_table_index = "project2_tmp_index"
- # project_cls = Project_tmp
- _step = 2
- _begin = 0
- must_queries = []
- if page_time_less is not None and page_time_greater is not None:
- must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
- # RangeQuery("status",201,301)
- ]
- # sub_project_name is not a mandatory condition
- # if sub_project_q is not None:
- # must_queries.append(sub_project_q)
- projects_prepare_time += time.time()-_time
- _time = time.time()
- sort_type = SortOrder.DESC
- while _begin<len(list_must_query):
- # alternate the sort direction between passes
- if sort_type==SortOrder.DESC:
- sort_type=SortOrder.ASC
- elif sort_type==SortOrder.ASC:
- sort_type=SortOrder.DESC
- list_should_q = []
- _limit = 10
- for must_q,_count in list_must_query[_begin:_begin+_step]:
- must_q1 = list(must_q)
- must_q1.extend(must_queries)
- list_should_q.append(BoolQuery(must_queries=must_q1))
- _limit += _count*5
- _query = BoolQuery(
- should_queries=list_should_q,
- must_not_queries=must_not_q[:100]
- )
- # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
- # SearchQuery(_query,limit=_limit),
- # columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
- rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
- SearchQuery(_query,sort=Sort(sorters=[FieldSort(project_page_time,sort_type)]),limit=_limit),
- columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- list_merge_data.extend(list_data)
- # print(list_data)
- for _data in list_data:
- must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
- _begin += _step
- projects_query_time += time.time()-_time
- # prefer matches with a similar bidding budget
- projects_merge_count = len(list_merge_data)
- list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
- list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
- # log(page_time_less+"=="+page_time_greater)
- if b_log:
- log("list_merge_data count:%d"%(len(list_merge_data)))
- list_check_data = []
- for _data in list_merge_data:
- _time = time.time()
- _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
- if b_log:
- log(str(_check))
- projects_check_rule_time += time.time()-_time
- if _check:
- list_check_data.append([_data,_prob])
- list_check_data.sort(key=lambda x:x[1],reverse=True)
- for _data,_ in list_check_data:
- _time = time.time()
- _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
- projects_check_rule_time += time.time()-_time
- _time = time.time()
- if _check:
- # o_proj = project_cls(_data)
- # o_proj.fix_columns(self.ots_client,fix_columns,True)
- # for k in fix_columns:
- # _data[k] = o_proj.getProperties().get(k)
- update_projects_by_project(_data,[_proj])
- projects_update_time += time.time()-_time
- whole_time = time.time()-whole_time_start
- log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
- return list_projects
- except Exception as e:
- traceback.print_exc()
- assert 1==2
- def dumplicate_document_in_merge(self,list_projects,dup_docid,_docid,_docchannel,document_name="document",b_log=False):
- '''
- Deduplicate documents while merging projects.
- :param list_projects:
- :return:
- '''
- dup_docid = set([str(a) for a in dup_docid])
- set_dup_total = set()
- docid_item = self.get_attrs_before_dump(_docid)
- best_docid = None
- for _proj in list_projects:
- try:
- docids = _proj.get(project_docids,"")
- set_docids = set([a for a in docids.split(",") if a!=""])
- _project_dynamics = _proj.get(project_project_dynamics,"[]")
- list_dynamics = json.loads(_project_dynamics)
- set_dup_docid = set()
- list_dup_result = [(_docid,docid_item.get("extract_count"))]
- log("=========%s---%s"%(str(set_docids),str(_docid)))
- if str(_docid) in set_docids:
- list_to_dup_docid = []
- for _d in list_dynamics:
- docid = _d.get(document_docid)
- doctitle = _d.get(document_doctitle,"")
- docchannel = _d.get(document_docchannel,0)
- status = _d.get(document_status,0)
- if status>=401:
- continue
- if str(docid) not in set_docids:
- continue
- if str(docid) in dup_docid:
- continue
- if docchannel!=_docchannel:
- continue
- if docid==_docid:
- continue
- list_to_dup_docid.append(_d)
- for _d in list_to_dup_docid:
- docid = _d.get(document_docid)
- _item = self.get_attrs_before_dump(docid)
- _prob = check_dumplicate_rule(docid_item,_item,5,b_log=b_log)
- log("dumplicate_document_in_merge %s-%s prob %.2f"%(str(_docid),str(docid),_prob))
- if _prob>0.4:
- docid = int(docid)
- _d = {"partitionkey":docid%500+1,
- "docid":docid,
- }
- _doc = Document(_d)
- _doc.table_name = document_name
- if _doc.fix_columns(self.ots_client,[document_page_time,document_update_document],True):
- if _doc.getProperties().get(document_update_document,"")!="true":
- list_dup_result.append((docid,_item.get("extract_count")))
- list_dup_result.sort(key=lambda x:x[0])
- list_dup_result.sort(key=lambda x:x[1],reverse=True)
- if len(list_dup_result)>0:
- best_docid1 = list_dup_result[0][0]
- if best_docid1 not in set_dup_total:
- best_docid = best_docid1
- for _d in list_dup_result[1:]:
- set_dup_docid.add(str(_d[0]))
- for _dynamic in list_dynamics:
- if _dynamic.get(document_docid) in set_dup_docid:
- _dynamic[document_status] = 401
- set_docids = set_docids-set_dup_docid-dup_docid
- set_dup_total |= set_dup_docid
- if len(set_docids)==0:
- print(set_dup_docid,dup_docid)
- log("projects set_docids length is zero %s"%(docids))
- return None,None
- else:
- _proj[project_docids] = ",".join(list(set_docids))
- _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
- _proj[project_docid_number] = len(set_docids)
- _proj[project_dup_docid] = ",".join(list(set_dup_docid))
- # log("dumplicate_document docid%s dynamic %d takes%.3f"%(str(docid),len(list_dynamics),time.time()-_time))
- except Exception as e:
- traceback.print_exc()
- if best_docid in set_dup_total:
- best_docid = None
- return best_docid,list(set_dup_total)
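- # --- Illustrative sketch (not part of the original class) ---
- # Best-document selection inside a project: sort candidates by docid ascending,
- # then (stable sort) by extract_count descending, so the richest extraction wins
- # and the smaller docid breaks ties; the rest become duplicates.
- def _sketch_pick_best(dup_result):
-     # dup_result: list of (docid, extract_count) tuples
-     ordered = sorted(dup_result, key=lambda x: x[0])
-     ordered.sort(key=lambda x: x[1], reverse=True)
-     return ordered[0][0], [d for d, _ in ordered[1:]]
- # _sketch_pick_best([(200, 5), (100, 5), (300, 9)]) -> (300, [100, 200])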
- def merge_document_real(self,item,dup_docid,save,document_name="document",project_table="project2",project_table_index="project2_index_formerge",b_log=False):
- '''
- Merge the document into projects in real time.
- :param item: the document being processed
- :param dup_docid: the set of duplicated announcement docids
- :param save: save flag of the current document (0 means it is itself a duplicate)
- :return: (project_json, best_docid, list_merge_dump)
- '''
- try:
- list_docids = []
- _docid = item.get(document_tmp_docid)
- list_docids.append(_docid)
- print("dup_docid",dup_docid)
- if save==0:
- dup_docid.insert(0,_docid)
- if isinstance(dup_docid,list):
- list_docids.extend(dup_docid)
- list_docids = [a for a in list_docids if a is not None]
- _time = time.time()
- list_projects = self.search_projects_with_document(list_docids,project_table,project_table_index)
- log("search %d projects takes:%.3f"%(len(list_projects),time.time()-_time))
- if len(list_projects)==0:
- # _time = time.time()
- list_docs = self.search_docs(list_docids,document_name=document_name)
- # log("search document takes:%.3f"%(time.time()-_time))
- # _time = time.time()
- list_projects = self.generate_projects_from_document(list_docs)
- # log("generate projects takes:%.3f"%(time.time()-_time))
- else:
- _time = time.time()
- self.update_projects_by_document(_docid,save,list_projects,document_name=document_name)
- # log("update projects takes:%.3f"%(time.time()-_time))
- _time = time.time()
- list_projects = dumplicate_projects(list_projects)
- # log("dumplicate projects takes:%.3f"%(time.time()-_time))
- _time = time.time()
- list_projects = self.merge_projects(list_projects,b_log,project_table=project_table,project_table_index=project_table_index)
- # log("merge projects takes:%.3f"%(time.time()-_time))
- _time = time.time()
- best_docid,list_merge_dump = self.dumplicate_document_in_merge(list_projects,dup_docid,_docid,item.get(document_docchannel),document_name=document_name,b_log=b_log)
- # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
- if list_merge_dump is None:
- list_projects = []
- _time = time.time()
- project_json = to_project_json(list_projects)
- # log("json projects takes:%.3f"%(time.time()-_time))
- if b_log:
- log("project_json:%s"%project_json)
- return project_json,best_docid,list_merge_dump
- except Exception as e:
- raise RuntimeError("error on dumplicate") from e
- def is_exist_fingerprint(self,final_list,_docid,_fingerprint,is_tmp=False):
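- # Scan the candidate list (excluding its first entry) and collect the fingerprints of
- # candidates other than _docid that will be kept (save==1; when reading from the
- # published table, status 201..300 counts as saved); return True if _fingerprint is
- # already among them.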
- set_fingerprint = set()
- for _i in range(1,len(final_list)):
- _dict = final_list[_i]
- b_docid = _dict[document_tmp_docid]
- _save = _dict.get(document_tmp_save,0)
- _status = _dict.get(document_tmp_status,0)
- if not is_tmp:
- if _status>=201 and _status<=300:
- _save = 1
- fingerprint_less = _dict.get(document_tmp_fingerprint,"")
- if b_docid==_docid:
- pass
- else:
- if _save==1:
- set_fingerprint.add(fingerprint_less)
- if _fingerprint in set_fingerprint:
- return True
- return False
- def exists_normal_fingerprint(self,_fingerprint,docid,table_name="document",table_index="document_index"):
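- # Return True if a published document (status 201..300) with the same fingerprint
- # already exists whose docid is smaller than the current docid minus 400000, i.e. an
- # older copy of the same announcement is already online.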
- query = BoolQuery(must_queries=[
- RangeQuery("status",201,301),
- TermQuery("fingerprint",_fingerprint),
- RangeQuery("docid",0,docid-400000),
- ]
- )
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(query,get_total_count=True,limit=1))
- if total_count>0:
- return True
- return False
- def check_page_time(self,item,table_name="document",table_index="document_index"):
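- # Sanity-check the extracted dates against page_time (for docchannel < 200): a date
- # more than 90 days before page_time sets has_before, a date after page_time sets
- # has_after. Certain sources are rejected outright when bidclose_time < page_time;
- # otherwise a document with has_before (and, for 中国政府采购网, regardless of has_after)
- # is rejected if another document with the same title already exists.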
- page_time = item.get(document_page_time,"")
- has_before = False
- has_after = False
- bidclose_time = page_time
- web_source_name = item.get(document_tmp_web_source_name,"")
- docchannel = item.get(document_tmp_docchannel,"0")
- try:
- docchannel = int(docchannel)
- except:
- docchannel = 0
- if docchannel<200:
- if len(page_time)>0:
- l_page_time = timeAdd(page_time,days=-90)
- dict_time = item.get("dict_time",{})
- for k,v in dict_time.items():
- if v is not None and len(v)>0:
- if l_page_time>v:
- has_before = True
- if v>page_time:
- has_after = True
- if k==document_tmp_time_bidclose:
- bidclose_time = v
- set_web_source = {"中国招标投标公共服务平台","比地招标"}
- if web_source_name in set_web_source and bidclose_time<page_time:
- return False
- log("%s check page_time has_before %s has_after %s"%(str(item.get(document_docid)),str(has_before),str(has_after)))
- if has_before:
- _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
- must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
- if not has_after:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(_query,get_total_count=True,limit=1))
- if total_count>0:
- log("%s check page_time false %s==%s-%s"%(str(item.get(document_docid)),l_page_time,k,v))
- return False
- if item.get(document_web_source_name,"")=="中国政府采购网":
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(_query,get_total_count=True,limit=1))
- if total_count>0:
- log("%s check 中国政府采购网 false "%(str(item.get(document_docid))))
- return False
- return True
- def dumplicate_comsumer_handle_interface(self,docid,document_table,document_table_index,project_table,project_table_index,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=False,upgrade=False):
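- # Single-document deduplication entry point for external callers: runs the same rule
- # pipeline as dumplicate_comsumer_handle but against caller-supplied tables, and only
- # returns the result instead of writing it back. The returned dict always contains
- # "success" (plus "errmsg" on failure) and, on success, "projects", "best_docid",
- # "save" and "dmp_docid".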
- result_dict = {"success":True}
- try:
- bool_query = BoolQuery(must_queries=[
- TermQuery("docid",docid)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(document_table,document_table_index,
- SearchQuery(bool_query,limit=1,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- if len(list_dict)==0:
- raise RuntimeError("未查找到docid为%s的数据"%(str(docid)))
- item = list_dict[0]
- self.post_extract(item)
- log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
- base_list = []
- set_docid = set()
- list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=False,to_log=False,table_name=document_table,table_index=document_table_index)
- # print("len_rules",len(list_rules),table_name,table_index)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
- list_rules = list_rules[:30]
- _i = 0
- step = 2
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- set_docid.add(item.get(document_tmp_docid))
- while _i<len(list_rules):
- must_not_q = []
- if len(base_list)>0:
- must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
- _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
- must_not_queries=must_not_q)
- _rule = list_rules[_i]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
- _i += step
- _time = time.time()
- # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
- final_list = self.dumplicate_fianl_check(base_list,b_log)
- exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),is_tmp=table_name=="document_tmp")
- exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid),table_name=table_name,table_index=table_index)
- # print("exist_normal_fingerprint",exist_normal_fingerprint)
- # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
- best_docid = self.get_best_docid(final_list)
- final_list_docid = [a["docid"] for a in final_list]
- # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- "status":random.randint(*flow_dumplicate_status_to),
- document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- }
- dtmp = Document_tmp(_d)
- dup_docid = set()
- for _dict in final_list:
- if _dict.get("update_document","")!="true":
- dup_docid.add(_dict.get(document_tmp_docid))
- if item.get(document_tmp_docid) in dup_docid:
- dup_docid.remove(item.get(document_tmp_docid))
- remove_list = []
- _unnormal = False
- dmp_docid = ""
- _check_time = self.check_page_time(item,table_name=table_name,table_index=table_index)
- if (_check_time and not exist_normal_fingerprint and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
- dtmp.setValue(document_tmp_save,1,True)
- # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- else:
- if exist_normal_fingerprint:
- log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
- best_docid = -1
- dmp_docid = ""
- _unnormal = True
- if not _check_time:
- best_docid = -2
- dmp_docid = ""
- _unnormal = True
- dtmp.setValue(document_tmp_save,0,True)
- if best_docid in dup_docid:
- dup_docid.remove(best_docid)
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- else:
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- list_docids = list(dup_docid)
- # if item.get(document_update_document)=="true":
- # dtmp.setValue(document_tmp_save,1,True)
- list_merge_dump = []
- if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
- if exist_finterprint:
- log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
- dtmp.setValue(document_tmp_projects,"[]",True)
- else:
- project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids,dtmp.getProperties().get(document_tmp_save),document_name=document_table,project_table=project_table,project_table_index=project_table_index,b_log=b_log)
- if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid) or best_docid<0):
- best_docid = merge_best_docid
- if list_merge_dump is not None and len(list_merge_dump)>0 and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
- dtmp.setValue(document_tmp_save,0,True)
- if list_merge_dump is not None:
- dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
- dtmp.setValue(document_tmp_projects,project_json,True)
- result_dict["projects"] = project_json
- log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
- dmp_docid = set([a for a in dmp_docid.split(",") if a!=""])
- if str(best_docid) in dmp_docid:
- dmp_docid.remove(str(best_docid))
- dmp_docid = ",".join([str(a) for a in list(dmp_docid)])
- result_dict["best_docid"] = str(best_docid) if best_docid is not None else ""
- result_dict["save"] = dtmp.getProperties().get("save")
- result_dict["dmp_docid"] = dmp_docid
- except Exception as e:
- result_dict["success"] = False
- result_dict["errmsg"] = str(e)
- return result_dict
- def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
- try:
- start_time = time.time()
- b_log = False if upgrade else True
- self.post_extract(item)
- log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
- base_list = []
- set_docid = set()
- list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=b_log)
- # print("len_rules",len(list_rules),table_name,table_index)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
- list_rules = list_rules[:30]
- _i = 0
- step = 2
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- set_docid.add(item.get(document_tmp_docid))
- while _i<len(list_rules):
- must_not_q = []
- if len(base_list)>0:
- must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
- _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
- must_not_queries=must_not_q)
- _rule = list_rules[_i]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link','products'],b_log=b_log)
- _i += step
- _time = time.time()
- # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
- final_list = self.dumplicate_fianl_check(base_list,b_log)
- exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),is_tmp=table_name=="document_tmp")
- exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid))
- # print("exist_normal_fingerprint",exist_normal_fingerprint)
- # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
- best_docid = self.get_best_docid(final_list)
- final_list_docid = [a["docid"] for a in final_list]
- # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- "status":random.randint(*flow_dumplicate_status_to),
- document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- }
- dtmp = Document_tmp(_d)
- dup_docid = set()
- for _dict in final_list:
- if _dict.get("update_document","")!="true":
- dup_docid.add(_dict.get(document_tmp_docid))
- if item.get(document_tmp_docid) in dup_docid:
- dup_docid.remove(item.get(document_tmp_docid))
- remove_list = []
- _unnormal = False
- dmp_docid = ""
- _check_time = self.check_page_time(item)
- if (_check_time and not exist_normal_fingerprint and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
- dtmp.setValue(document_tmp_save,1,True)
- # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- else:
- if exist_normal_fingerprint:
- log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
- best_docid = -1
- dmp_docid = ""
- _unnormal = True
- if not _check_time:
- best_docid = -2
- dmp_docid = ""
- _unnormal = True
- dtmp.setValue(document_tmp_save,0,True)
- if best_docid in dup_docid:
- dup_docid.remove(best_docid)
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- else:
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- list_docids = list(dup_docid)
- # if item.get(document_update_document)=="true":
- # dtmp.setValue(document_tmp_save,1,True)
- list_merge_dump = []
- if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
- if exist_finterprint:
- log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
- dtmp.setValue(document_tmp_projects,"[]",True)
- else:
- project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids,dtmp.getProperties().get(document_tmp_save),b_log=b_log)
- if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid) or best_docid<0):
- best_docid = merge_best_docid
- if list_merge_dump is not None and len(list_merge_dump)>0 and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
- dtmp.setValue(document_tmp_save,0,True)
- if list_merge_dump is not None:
- dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
- dtmp.setValue(document_tmp_projects,project_json,True)
- log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
- dmp_docid = set([a for a in dmp_docid.split(",") if a!=""])
- if str(best_docid) in dmp_docid:
- dmp_docid.remove(str(best_docid))
- dmp_docid = ",".join([str(a) for a in list(dmp_docid)])
- if _unnormal:
- dmp_docid = ""
- if upgrade:
- # print(dtmp.getProperties())
- dmp_docid = dmp_docid.replace(",,",",")
- dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
- dtmp.setValue(document_tmp_best_docid,best_docid,True)
- _flag = dtmp.update_row(self.ots_client)
- if not _flag:
- for i in range(10):
- list_proj_json = dtmp.getProperties().get(document_tmp_projects)
- if list_proj_json is not None:
- list_proj = json.loads(list_proj_json)
- dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
- if dtmp.update_row(self.ots_client):
- break
- self.changeSaveStatus(remove_list)
- self.changeSaveStatus(list_merge_dump)
- else:
- return list_docids
- except Exception as e:
- traceback.print_exc()
- log("dumplicate error on:%s"%(str(item.get(document_tmp_docid))))
- finally:
- log("dumplicate end on:%s"%(str(item.get(document_tmp_docid))))
- self.queue_dumplicate_processed.put(item.get(document_tmp_docid))
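- # Illustrative sketch (not called by the pipeline): the core save decision applied in
- # dumplicate_comsumer_handle, written as a standalone predicate. The parameter names
- # are assumptions for illustration only.
- def _should_save_sketch(check_time_ok, has_normal_fingerprint, final_list_len, best_docid, docid, update_document):
-     """Return 1 if the document should stay online (save=1), else 0."""
-     if update_document == "true":
-         return 1
-     if check_time_ok and not has_normal_fingerprint and (final_list_len == 0 or best_docid == docid):
-         return 1
-     return 0
- # e.g. _should_save_sketch(True, False, 0, None, 123, "") == 1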
- def fix_doc_which_not_in_project(self):
- '''
- Pick out published announcements that are missing from project2 and reset their
- status in document_tmp so they go through deduplication and merging again.
- :return:
- '''
- def fix_doc_handle(item,result_queue):
- _docid = item.get(document_tmp_docid)
- b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(b_q,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count==0:
- log("fix_doc:%s not in project2"%(str(_docid)))
- d_tmp = Document_tmp(item)
- d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
- d_tmp.update_row(self.ots_client)
- current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-20)
- after_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
- if self.fix_doc_docid is None:
- bool_query = BoolQuery(must_queries=[
- TermQuery(document_tmp_save,1),
- RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
- RangeQuery(document_tmp_docchannel,0,300),
- RangeQuery(document_tmp_opertime,before_date,after_date)
- ])
- else:
- bool_query = BoolQuery(must_queries=[
- TermQuery(document_tmp_save,1),
- RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
- RangeQuery(document_tmp_docchannel,0,300),
- RangeQuery(document_tmp_docid,self.fix_doc_docid),
- RangeQuery(document_tmp_opertime,before_date,after_date)
- ])
- list_data = []
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_d = getRow_ots(rows)
- list_data.extend(list_d)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_d = getRow_ots(rows)
- list_data.extend(list_d)
- print("%d/%d"%(len(list_data),total_count))
- if len(list_data)>0:
- self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
- log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
- task_queue = Queue()
- for _data in list_data:
- task_queue.put(_data)
- mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
- mt.run()
- def send_daily_check_data(self):
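- # Build a DingTalk alert listing yesterday's data-quality spreadsheets: for each known
- # report file under tmp_document_quality_data/<date>/ on OSS, sign a 5-day download
- # URL and append it to the message, then send it via the robot webhook.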
- import datetime
- def get_download_url(bucket, ObjectName, timeout):
- url = ""
- exist = bucket.object_exists(ObjectName)
- if exist:
- get_url = False
- for i in range(3):
- try:
- url = bucket.sign_url('GET', ObjectName, timeout)
- url = url.replace("-internal", "") # 替换地址里的内网标识
- get_url = True
- except:
- pass
- if get_url:
- break
- return url
- file_timeout = 60 * 60 * 24 * 5 # download links stay valid for 5 days
- # yesterday's date
- date = str(datetime.date.today() - datetime.timedelta(days=1))
- oss_path = 'tmp_document_quality_data/'
- object_path = oss_path + date + '/'
- msg = "每日数据质量检查结果(报警):"
- csv_name = "数据质量监控检查结果.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket,ObjectName,file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "公告重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "公告附件重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "附件识别异常的站源.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- atMobiles = ['18813973429'] # 维阵
- ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
- sentMsgToDD(msg,ACCESS_TOKEN_DATAWORKS,atMobiles=atMobiles)
- def send_daily_check_data2(self):
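- # Second delivery channel for the daily quality check: download yesterday's report
- # spreadsheets from OSS, flatten them into a {"data": [...], "count": n} payload and
- # POST it to the data-monitor service.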
- import datetime
- import pandas as pd
- from itertools import groupby
- dict_channel = {"公告变更": 51,
- "招标公告": 52,
- "中标信息": 101,
- "招标预告": 102,
- "招标答疑": 103,
- "资审结果": 105,
- "法律法规": 106,
- "新闻资讯": 107,
- "采购意向": 114,
- "拍卖出让": 115,
- "土地矿产": 116,
- "产权交易": 117,
- "废标公告": 118,
- "候选人公示": 119,
- "合同公告": 120}
- label2channel = {v:k for k,v in dict_channel.items()}
- def post_data(url,json_data):
- post_sucess = False
- for i in range(3):
- if not post_sucess:
- try:
- # send the JSON payload via POST
- response = requests.post(url, json=json_data)
- # check the response status code
- if response.status_code == 200:
- post_sucess = True
- except requests.exceptions.RequestException as e:
- log("send_daily_check_data2,post error reason: %s"%(str(e)))
- pass
- return post_sucess
- res_json = {
- "data": [],
- "count": 0
- }
- # yesterday's date
- date = str(datetime.date.today() - datetime.timedelta(days=1))
- oss_path = 'tmp_document_quality_data/'
- object_path = oss_path + date + '/'
- csv_name = "数据质量监控检查结果.xlsx"
- ObjectName = object_path + csv_name
- LocalPath = os.path.join(self.current_path,"download",csv_name)
- down_res = downloadFile(self.bucket,ObjectName,LocalPath,retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- for web_source_no,original_docchannel,error_rule in zip(df['web_source_no'],df['original_docchannel'],df['error_rule']):
- error_rule = json.loads(error_rule)
- for error_type,error_sample in error_rule.items():
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": error_type,
- "ITEMS": error_sample
- }
- res_json['data'].append(tmp_data)
- res_json['count'] += 1
- os.remove(LocalPath)
- csv_name = "公告重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- tmp_list = []
- for web_source_no,fingerprint,original_docchannel,cnt,res in zip(df['web_source_no'], df['fingerprint'],
- df['original_docchannel'],df['cnt'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "编号公告重复",
- "FINGERPRINT": fingerprint,
- "ITEMS": json.loads(res)
- }
- tmp_list.append(tmp_data)
- tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
- for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
- group = list(group)[:5]
- res_json['data'].extend(group)
- res_json['count'] += len(group)
- os.remove(LocalPath)
- csv_name = "公告附件重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- tmp_list = []
- for web_source_no,filemd5,original_docchannel,cnt,res in zip(df['web_source_no'],df['filemd5'],
- df['original_docchannel'],df['cnt'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "编号附件重复",
- "FILEMD5": filemd5,
- "ITEMS": json.loads(res)
- }
- tmp_list.append(tmp_data)
- tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
- for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
- group = list(group)[:5]
- res_json['data'].extend(group)
- res_json['count'] += len(group)
- os.remove(LocalPath)
- csv_name = "附件识别异常的站源.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- for web_source_no,original_docchannel,error_ratio,error_sample,res in zip(df['web_source_no'], df['original_docchannel'],
- df['error_ratio'],df['error_sample'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "附件识别异常",
- "ITEMS": json.loads(res)
- }
- res_json['data'].append(tmp_data)
- res_json['count'] += 1
- os.remove(LocalPath)
- csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- tmp_list = []
- for web_source_no,original_docchannel,res in zip(df['web_source_no'],df['original_docchannel'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "截止日期在发布日期之前",
- "ITEMS": json.loads(res)
- }
- tmp_list.append(tmp_data)
- res_json['data'].extend(tmp_list)
- res_json['count'] += len(tmp_list)
- os.remove(LocalPath)
- # url = "http://120.132.118.205:17090/saveQualityListData"
- url = "http://data-monitor.bidizhaobiao.com/oldApi/saveQualityListData"
- res = post_data(url,res_json)
- if res:
- log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
- # Repair document (announcement) data based on its project2 grouping
- def fix_doc_by_project2(self):
- import datetime
- from itertools import groupby
- from collections import Counter
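- # Outline: pull project2 groups updated yesterday that have a moderate number of member
- # documents, fetch and parse each member document, then use majority voting within a
- # project to repair docchannel, province/city/district/area and contact phone numbers,
- # and finally POST the per-document fixes to the data API.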
- label2key = {
- '公告变更': 51,
- '招标公告': 52,
- '中标信息': 101,
- '招标预告': 102,
- '招标答疑': 103,
- '招标文件': 104,
- '资审结果': 105,
- '法律法规': 106,
- '新闻资讯': 107,
- '采购意向': 114,
- '拍卖出让': 115,
- '土地矿产': 116,
- '产权交易': 117,
- '废标公告': 118,
- '候选人公示': 119,
- '合同公告': 120,
- '开标记录': 121,
- '验收合同': 122,
- # the following channels are excluded
- '拟在建数据': 301,
- '审批项目数据': 302,
- '投诉处罚': 303
- }
- key2label = dict((i[1], i[0]) for i in label2key.items())
- today = str(datetime.date.today())
- yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
- front_year = str(datetime.date.today() - datetime.timedelta(days=365))
- bool_query = BoolQuery(must_queries=[RangeQuery("update_time", yesterday + " 00:00:00", today + " 00:00:00"),
- RangeQuery("page_time", front_year, today),
- RangeQuery("status", 201, 301),
- RangeQuery("docid_number", 4, 30)]
- )
- all_rows = []
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
- SearchQuery(bool_query, sort=Sort(sorters=[
- FieldSort("update_time", SortOrder.ASC)]),
- limit=100, get_total_count=True),
- ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
- return_type=ColumnReturnType.SPECIFIED))
- all_rows.extend(rows)
- while next_token:
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
- SearchQuery(bool_query,
- next_token=next_token,
- sort=Sort(sorters=[
- FieldSort("update_time",SortOrder.ASC)]),
- limit=100,get_total_count=True),
- ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
- return_type=ColumnReturnType.SPECIFIED))
- all_rows.extend(rows)
- list_dict = getRow_ots(all_rows)
- docids_list = []
- for _dict in list_dict:
- _uuid = _dict.get("uuid", "")
- _docids = _dict.get("docids", "")
- _docids = _docids.split(",")
- for docid in _docids:
- docids_list.append([_uuid, int(docid)])
- # print('docids_list len:', len(docids_list))
- ots_query_res = []
- doc_columns_list = ['page_time', 'tenderee', 'tenderee_phone', 'agency', 'agency_phone', 'extract_count',
- "sub_docs_json",'extract_json', 'extract_json1', 'extract_json2', 'extract_json3']
- def extract_json_process(res_json):
- # parse a document row: extract_json may be split across extract_json1..3 columns, so concatenate the pieces before json.loads
- extract_json = res_json.pop("extract_json")
- extract_json = extract_json if extract_json else "{}"
- if 'extract_json1' in res_json:
- extract_json1 = res_json.pop("extract_json1")
- extract_json1 = extract_json1 if extract_json1 else ""
- extract_json = extract_json + extract_json1
- if 'extract_json2' in res_json:
- extract_json2 = res_json.pop("extract_json2")
- extract_json2 = extract_json2 if extract_json2 else ""
- extract_json = extract_json + extract_json2
- if 'extract_json3' in res_json:
- extract_json3 = res_json.pop("extract_json3")
- extract_json3 = extract_json3 if extract_json3 else ""
- extract_json = extract_json + extract_json3
- try:
- extract_json = json.loads(extract_json)
- except:
- return None
- docchannel_dict = extract_json.get('docchannel', {})
- res_json['docchannel'] = docchannel_dict.get('docchannel', "")
- res_json['life_docchannel'] = docchannel_dict.get('life_docchannel', "")
- district_dict = extract_json.get('district', {})
- res_json['province'] = district_dict.get('province', "")
- res_json['city'] = district_dict.get('city', "")
- res_json['district'] = district_dict.get('district', "")
- res_json['area'] = district_dict.get('area', "")
- prem = extract_json.get('prem', {})
- res_json['prem'] = prem
- return res_json
- def _handle(item, _):
- # look up and parse the document for one (project uuid, docid) pair
- _uuid = item[0] # project uuid
- _docid = item[1]
- for i in range(3):
- try:
- bool_query = BoolQuery(must_queries=[TermQuery('docid', _docid)]
- )
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("document", "document_index",
- SearchQuery(bool_query,
- sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),
- limit=None,get_total_count=True),
- ColumnsToGet(doc_columns_list,
- return_type=ColumnReturnType.SPECIFIED))
- res = getRow_ots(rows)
- if res:
- # use extract_count to filter out announcements with little extracted content
- if res[0].get('extract_count', 0) > 5:
- ots_query_res.append([_uuid, _docid, extract_json_process(res[0])])
- break
- except Exception as e:
- # print('error:',e)
- pass
- task_queue = Queue()
- for item in docids_list:
- task_queue.put(item)
- if task_queue.qsize() >= 10000:
- _mt = MultiThreadHandler(task_queue, _handle, None, 20)
- _mt.run()
- if task_queue.qsize() > 0:
- _mt = MultiThreadHandler(task_queue, _handle, None, 20)
- _mt.run()
- # print('ots_query_res len:', len(ots_query_res))
- # build the repair data
- ots_query_res.sort(key=lambda x: x[0])
- # bidding-related channels
- zb_type = [51, 52, 101, 102, 103, 104, 105, 114, 118, 119, 120, 121, 122]
- zb_type = [key2label[i] for i in zb_type]
- change_res = []
- for key, group in groupby(ots_query_res, lambda x: (x[0])):
- uuid = key
- project_data = list(group)
- all_len = len(project_data)
- if all_len < 4:
- continue
- zb_len = sum([1 if i[2].get('docchannel') in zb_type else 0 for i in project_data])
- # share of bidding-related announcements in the project
- # if zb_len / all_len <= 0.5:
- if zb_len / all_len <= 0.7:
- # the project is not a bidding-related project
- continue
- # most frequent province within the project
- province_list = [i[2].get('province', '') for i in project_data]
- province_sort = Counter(province_list).most_common()
- change_province = ""
- change_city = ""
- change_district = ""
- change_area = ""
- # if province_sort[0][1]/all_len > 0.5:
- if province_sort[0][1] / all_len > 0.7:
- if province_sort[0][0] and province_sort[0][0] not in ["全国", "未知"]:
- change_province = province_sort[0][0]
- if change_province:
- # only replace down to city level; district falls back to "未知" (unknown)
- change_province_data = [(i[2].get('province', ''), i[2].get('city', ''), i[2].get('area', '')) for i in
- project_data if i[2].get('province', '') == change_province]
- change_province_data_sort = Counter(change_province_data).most_common()
- change_city = change_province_data_sort[0][0][1]
- change_area = change_province_data_sort[0][0][2]
- change_district = "未知"
- # tally contact phone numbers by role entity
- phone_dict = {}
- for d in project_data:
- tenderee = d[2].get("tenderee", "")
- agency = d[2].get("agency", "")
- prem = d[2].get("prem", {})
- if len(prem) > 0:
- for name, project in prem.items():
- roleList = project.get("roleList", [])
- for role in roleList:
- role_name = role.get("role_name", "")
- role_text = role.get("role_text", "")
- if role_name in ['tenderee', 'agency', 'win_tenderer']:
- linklist = role.get("linklist", [])
- for _contact in linklist:
- if _contact[1] not in phone_dict:
- phone_dict[_contact[1]] = {}
- if role_text not in phone_dict[_contact[1]]:
- phone_dict[_contact[1]][role_text] = 0
- phone_dict[_contact[1]][role_text] += 1
- # for each phone number, keep the entity name(s) it most often belongs to
- new_phone_dict = dict((phone, []) for phone in phone_dict)
- for phone, value in phone_dict.items():
- phone_name = [(name, count) for name, count in value.items()]
- phone_name.sort(key=lambda x: x[1], reverse=True)
- max_count = phone_name[0][1]
- max_name = [name for name, count in value.items() if count == max_count and max_count > 0]
- new_phone_dict[phone] = max_name
- for item in project_data:
- change_json = {"partitionkey": item[2].get("partitionkey"),
- 'docid': item[1],
- 'contactsByDelete': []}
- tenderee = item[2].get("tenderee", "")
- agency = item[2].get("agency", "")
- # fix docchannel
- docchannel = item[2].get('docchannel', "")
- life_docchannel = item[2].get('life_docchannel', "")
- if docchannel and docchannel not in zb_type:
- if life_docchannel in zb_type and docchannel != '采招数据':
- change_json['docchannel'] = label2key.get(life_docchannel)
- # fix province
- province = item[2].get('province', "")
- if change_province:
- if province != change_province and province in ["全国", "未知", '']: # only fix when the province was not recognised
- change_json['province'] = change_province
- change_json['city'] = change_city
- change_json['district'] = change_district
- change_json['area'] = change_area
- # fix contact information
- tenderee_phone = item[2].get("tenderee_phone", "")
- agency_phone = item[2].get("agency_phone", "")
- prem = item[2].get("prem", {})
- sub_docs_json = item[2].get("sub_docs_json", "[]")
- try:
- sub_docs_json = json.loads(sub_docs_json)
- except:
- sub_docs_json = []
- for name, project in prem.items():
- roleList = project.get("roleList", [])
- for role in roleList:
- role_name = role.get("role_name", "")
- role_text = role.get("role_text", "")
- if role_name == 'tenderee' and role_text == tenderee:
- linklist = role.get("linklist", [])
- need_change = False
- right_contact = []
- for _contact in linklist:
- if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
- change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
- if _contact[1] == tenderee_phone:
- need_change = True
- else:
- right_contact.append([_contact[0], _contact[1]])
- if need_change:
- if right_contact:
- right_contact.sort(reverse=True)
- change_json['tendereeContact'] = right_contact[0][0]
- change_json['tendereePhone'] = right_contact[0][1]
- elif role_name == 'agency' and role_text == agency:
- linklist = role.get("linklist", [])
- need_change = False
- right_contact = []
- for _contact in linklist:
- if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
- change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
- if _contact[1] == agency_phone:
- need_change = True
- else:
- right_contact.append([_contact[0], _contact[1]])
- if need_change:
- if right_contact:
- right_contact.sort(reverse=True)
- change_json['agencyContact'] = right_contact[0][0]
- change_json['agencyPhone'] = right_contact[0][1]
- elif role_name == 'win_tenderer':
- linklist = role.get("linklist", [])
- for _contact in linklist:
- if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
- change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
- sub_docs_json_change = False
- if sub_docs_json:
- for _project in sub_docs_json:
- win_tenderer = _project.get("win_tenderer", "")
- win_tenderer_phone = _project.get("win_tenderer_phone", "")
- if win_tenderer_phone and new_phone_dict.get(win_tenderer_phone) and win_tenderer not in new_phone_dict[win_tenderer_phone]:
- _project["win_tenderer_phone"] = ""
- _project["win_tenderer_manager"] = ""
- sub_docs_json_change = True
- if sub_docs_json_change:
- change_json['subDocsJson'] = sub_docs_json
- new_contact_json = []
- for _contact in change_json['contactsByDelete']:
- if _contact not in new_contact_json:
- new_contact_json.append(_contact)
- change_json['contactsByDelete'] = new_contact_json
- if len(change_json) > 3 or len(change_json['contactsByDelete']) > 0:
- # when the region is not being changed, pass through the originally extracted region
- if not change_json.get("province"):
- change_json['area'] = item[2].get("area", "")
- change_json['province'] = item[2].get("province", "")
- change_json['city'] = item[2].get("city", "")
- change_json['district'] = item[2].get("district", "")
- change_res.append({"document": change_json})
- # post result
- headers = {'Content-Type': 'application/json',
- "Authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySWQiOjEsInVzZXJuYW1lIjoiYWRtaW4iLCJ1dWlkIjoiNGQwYzA0ODYtMzVmZi00MDJhLTk4OWQtNWEwNTE3YTljMDNiIiwic3ViIjoiMSIsImlhdCI6MTY3OTk5MTcxNywiZXhwIjo0ODMzNTkxNzE3fQ.ESDDnEDYP5ioK4ouHOYXsZbLayGRNVI9ugpbxDx_3fPIceD1KIjlDeopBmeATLoz8VYQihd8qO-UzP5pDsaUmQ"}
- # url = "http://192.168.2.26:8002/document/updateAreaAndContact"
- url = "http://data-api.bidizhaobiao.com/document/updateAreaAndContact"
- for _data in change_res:
- post_sucess = False
- for i in range(3):
- if not post_sucess:
- try:
- # send the JSON payload via POST
- response = requests.post(url, json=_data,headers=headers)
- # print(response.status_code,response.json())
- # check the response status code
- if response.status_code == 200:
- post_sucess = True
- except requests.exceptions.RequestException as e:
- # log("fix doc by project2,post error reason: %s"%(str(e)))
- pass
- log("fix doc by project2, change doc nums:%d"%len(change_res))
- def start_flow_dumplicate(self):
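- # Scheduler wiring: dedup producer every 5s, dedup consumer every 30s, dedup monitor
- # every 10min, document/project cleanup at 20:00, daily quality reports at 09:10,
- # project-based document repair at 08:10, and the project2 back-fill check every 10min.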
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
- schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
- schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
- schedule.add_job(self.flow_remove,"cron",hour="20")
- schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
- schedule.add_job(self.send_daily_check_data2,"cron",hour='9', minute='10')
- schedule.add_job(self.fix_doc_by_project2,"cron",hour='8', minute='10')
- schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
- schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
- schedule.start()
- def changeSaveStatus(self,list_dict):
- if list_dict is not None:
- for _dict in list_dict:
- if isinstance(_dict,dict):
- if _dict.get(document_tmp_save,1)==1:
- _d = {"partitionkey":_dict["partitionkey"],
- "docid":_dict["docid"],
- document_tmp_save:0
- }
- _d_tmp = Document_tmp(_d)
- if _d_tmp.exists_row(self.ots_client):
- _d_tmp.update_row(self.ots_client)
- elif isinstance(_dict,int):
- _d = {"partitionkey":_dict%500+1,
- "docid":_dict,
- document_tmp_save:0
- }
- _d_tmp = Document_tmp(_d)
- if _d_tmp.fix_columns(self.ots_client,["status",document_update_document],True):
- if _d_tmp.getProperties().get("status")==1:
- if _d_tmp.getProperties().get(document_update_document,"")!="true":
- _d_tmp.setValue("status",0,True)
- _d_tmp.update_row(self.ots_client)
- def test_dumplicate(self,docid):
- # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
- columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link','products']
- # print('columns',columns)
- item = self.get_attrs_before_dump(docid,columns)
- if item:
- log("start dumplicate_comsumer_handle")
- self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
- # self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
- return
- def test_merge(self,list_docid_less,list_docid_greater):
- list_docs_less = self.search_docs(list_docid_less)
- list_projects_less = self.generate_projects_from_document(list_docs_less)
- list_docs_greater = self.search_docs(list_docid_greater)
- list_projects_greater = self.generate_projects_from_document(list_docs_greater)
- list_projects_less.extend(list_projects_greater)
- list_projects = dumplicate_projects(list_projects_less,b_log=True)
- project_json = to_project_json(list_projects)
- log("project_json:%s"%project_json)
- return project_json
- def getRemainDoc(self,docid):
- columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
- bool_query = BoolQuery(must_queries=[
- TermQuery("docid",docid)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- if len(list_dict)>0:
- item = list_dict[0]
- start_time = time.time()
- self.post_extract(item)
- base_list = []
- set_docid = set()
- list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- _i = 0
- step = 5
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- set_docid.add(item.get(document_tmp_docid))
- while _i<len(list_rules):
- must_not_q = []
- if len(base_list)>0:
- must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
- _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
- must_not_queries=must_not_q)
- _rule = list_rules[_i]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
- _i += step
- _time = time.time()
- log("%d start final check with length:%d"%(item["docid"],len(base_list)))
- final_list = self.dumplicate_fianl_check(base_list)
- log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
- best_docid = self.get_best_docid(final_list)
- return best_docid
- return None
- def compare_dumplicate_check():
- import pandas as pd
- df_dump = Dataflow_dumplicate(start_delete_listener=False)
- test_count = 1000
- # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
- columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]
- bool_query = BoolQuery(must_queries=[
- RangeQuery("docid",400453395,400463395)
- ])
- rows,next_token,total_count,is_all_succeed = df_dump.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=10,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- while 1:
- if not next_token or len(list_dict)>=test_count:
- break
- rows,next_token,total_count,is_all_succeed = df_dump.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict.extend(getRow_ots(rows))
- def _handle1(_item,result_queue):
- try:
- list_docid = df_dump.dumplicate_comsumer_handle(_item,None,df_dump.ots_client,get_all=True,upgrade=False)
- _item["before"] = list_docid
- except Exception as e:
- pass
- dump_result = {}
- for item in list_dict:
- dump_result[item["docid"]] = {}
- task_queue = Queue()
- list_item = []
- for item in list_dict:
- _item = {}
- _item.update(item)
- list_item.append(_item)
- task_queue.put(_item)
- mt = MultiThreadHandler(task_queue,_handle1,None,30)
- mt.run()
- for item in list_item:
- dump_result[item["docid"]]["before"] = item.get("before")
- df_dump.check_rule = 2
- def _handle2(_item,result_queue):
- try:
- list_docid1 = df_dump.dumplicate_comsumer_handle(_item,None,df_dump.ots_client,get_all=True,upgrade=False)
- _item["after"] = list_docid1
- except Exception as e:
- pass
- task_queue = Queue()
- list_item = []
- for item in list_dict:
- _item = {}
- _item.update(item)
- list_item.append(_item)
- task_queue.put(_item)
- mt = MultiThreadHandler(task_queue,_handle2,None,30)
- mt.run()
- for item in list_item:
- dump_result[item["docid"]]["after"] = item.get("after")
- df_data = {"docid":[],
- "before":[],
- "after":[],
- "before-after":[],
- "after-before":[]}
- for docid,_d in dump_result.items():
- df_data["docid"].append(docid)
- before = _d.get("before",[])
- after = _d.get("after",[])
- df_data["before"].append(str(before))
- df_data["after"].append(str(after))
- df_data["before-after"].append(str(set(before)-set(after)))
- df_data["after-before"].append(str(set(after)-set(before)))
- df = pd.DataFrame(df_data,columns=["docid","before","after","before-after","after-before"])
- df.to_excel("compare_dump.xlsx")
- def fix_merge_docid(docid):
- def get_uuid_docids(docid):
- ots_client = getConnect_ots()
- bool_query = BoolQuery(must_queries=[
- TermQuery("docids",docid)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
- ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
- list_row = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
- list_row.extend(getRow_ots(rows))
- return list_row
- def get_new_docid(list_docid1,list_docid2):
- return list(set(list_docid1)-set(list_docid2))
- def get_list_docid(list_row):
- list_docid = []
- for row in list_row:
- docids = row.get("docids",'')
- if docids:
- list_docid.extend([int(a) for a in docids.split(",")])
- return list(set(list_docid))
- def get_list_uuid(list_row):
- list_uuid = []
- for row in list_row:
- uuid = row.get("uuid",'')
- if uuid:
- list_uuid.append(uuid)
- return list(set(list_uuid))
- list_row = get_uuid_docids(docid)
- print(list_row)
- list_docid1 = get_list_docid(list_row)
- list_new_docid = get_new_docid(list_docid1,[docid])
- while 1:
- if len(list_new_docid)==0:
- break
- list_row2 = []
- for _docid in list_new_docid:
- list_row2.extend(get_uuid_docids(_docid))
- list_docid1 = get_list_docid(list_row)
- list_docid2 = get_list_docid(list_row2)
- list_new_docid = get_new_docid(list_docid1,list_docid2)
- list_row.extend(list_row2)
- list_uuid = get_list_uuid(list_row)
- list_docid = get_list_docid(list_row)
- print(list_uuid)
- print(list_docid)
- ots_client = getConnect_ots()
- for _docid in list_docid:
- _d = Document({document_partitionkey:_docid%500+1,
- document_docid:_docid,
- document_status:1})
- if _d.exists_row(ots_client):
- _d.update_row(ots_client)
- for _uuid in list_uuid:
- _p = Project({project_uuid:_uuid,})
- _p.delete_row(ots_client)
- if __name__ == '__main__':
- a = time.time()
- # df = Dataflow()
- # df.flow_init()
- # df.flow_test()
- # df.test_merge()
- # df.start_flow_attachment()
- # df.start_flow_extract()
- # df.start_flow_dumplicate()
- # # df.start_flow_merge()
- # df.start_flow_remove()
- # download_attachment()
- # test_attachment_interface()
- df_dump = Dataflow_dumplicate(start_delete_listener=False)
- # df_dump.start_flow_dumplicate()
- df_dump.test_dumplicate(628365020)
- # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
- # compare_dumplicate_check()
- # df_dump.test_merge([391898061],[371551361,])
- # df_dump.flow_remove_project_tmp()
- # fix_merge_docid(595271944)
- print("takes",time.time()-a)
- # df_dump.fix_doc_which_not_in_project()
- # df_dump.delete_projects_by_document(16288036)
- # log("=======")
- # for i in range(3):
- # time.sleep(20)
- #
- # a = {"docid":74295123}
- # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)
|