# dataflow.py
# sys.path.append("/data")
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from BaseDataMaintenance.common.multiProcess import MultiHandler
from queue import Queue
from multiprocessing import Queue as PQueue
from BaseDataMaintenance.model.ots.document_tmp import *
from BaseDataMaintenance.model.ots.attachment import *
from BaseDataMaintenance.model.ots.document_html import *
from BaseDataMaintenance.model.ots.document_extract2 import *
from BaseDataMaintenance.model.ots.project import *
from BaseDataMaintenance.model.ots.project2_tmp import *
from BaseDataMaintenance.model.ots.document import *
from BaseDataMaintenance.model.ots.project_process import *
import base64
from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
from uuid import uuid4
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.source import is_internal,getAuth
from apscheduler.schedulers.blocking import BlockingScheduler
from BaseDataMaintenance.maintenance.dataflow_settings import *
from threading import Thread
import oss2
from BaseDataMaintenance.maxcompute.documentDumplicate import *
from BaseDataMaintenance.maxcompute.documentMerge import *
from BaseDataMaintenance.common.otsUtils import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataMonitor.data_monitor import BaseDataMonitor
from BaseDataMaintenance.dataSource.pool import ConnectorPool

# Explicit imports for names used below (os, re, json, time, traceback,
# BeautifulSoup); some of these may already be re-exported by the wildcard
# imports above, in which case these are harmless no-ops.
import os
import re
import json
import time
import traceback
from bs4 import BeautifulSoup
def getSet(list_dict,key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                # normalize numeric strings so "100" and "100.0" compare equal
                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
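
# Illustrative sketch (not part of the original file): getSet collapses
# numeric strings through float(), so "100" and "100.0" count as one value,
# while non-numeric values are kept verbatim.
def _example_getSet():
    assert getSet([{"bidding_budget":"100"},{"bidding_budget":"100.0"}],"bidding_budget") == {"100.0"}
    assert getSet([{"tenderee":"A"},{"tenderee":"B"}],"tenderee") == {"A","B"}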
def getSimilarityOfString(str1,str2):
    _set1 = set()
    _set2 = set()
    if str1 is not None:
        for i in range(1,len(str1)):
            _set1.add(str1[i-1:i+1])
    if str2 is not None:
        for i in range(1,len(str2)):
            _set2.add(str2[i-1:i+1])
    _len = max(1,min(len(_set1),len(_set2)))
    return len(_set1&_set2)/_len
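
# Illustrative sketch (not part of the original file): the similarity is the
# number of shared character bigrams divided by the smaller bigram-set size,
# so it reaches 1.0 whenever one string's bigrams are a subset of the other's.
# bigrams("ABC123")={AB,BC,C1,12,23}, bigrams("AB123")={AB,B1,12,23};
# shared={AB,12,23} -> 3/min(5,4) = 0.75.
def _example_getSimilarityOfString():
    assert getSimilarityOfString("ABC123","AB123") == 0.75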
def getDiffIndex(list_dict,key,confidence=100):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if item["confidence"]>=confidence:
            continue
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d+(\.\d+)?$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set)>1:
            return _i
    return len(list_dict)
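
# Illustrative sketch (not part of the original file): getDiffIndex returns
# the index at which a second distinct value for `key` first appears, skipping
# items at or above the confidence threshold; with no conflict it returns the
# full length.
def _example_getDiffIndex():
    group = [{"confidence":60,"win_tenderer":"A"},
             {"confidence":50,"win_tenderer":"A"},
             {"confidence":40,"win_tenderer":"B"}]
    assert getDiffIndex(group,"win_tenderer") == 2
    # with threshold 50 the first two docs are skipped, only "B" is counted -> no conflict
    assert getDiffIndex(group,"win_tenderer",confidence=50) == 3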
def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
    # upload the rendered swf page images to OSS and return their public URLs
    swf_urls = []
    try:
        list_files = os.listdir(swf_dir)
        # natural sort so swf_page_10.png sorts after swf_page_9.png
        # (a plain lexicographic sort would misorder documents with 10+ pages)
        list_files.sort(key=lambda x:[int(t) if t.isdigit() else t for t in re.split(r"(\d+)",x)])
        headers = dict()
        headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
        for _file in list_files:
            swf_localpath = "%s/%s"%(swf_dir,_file)
            swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
            uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
            _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
            swf_urls.append(_url)
            os.remove(swf_localpath)
    except Exception as e:
        traceback.print_exc()
    return swf_urls
class Dataflow():
    def __init__(self):
        self.ots_client = getConnect_ots()
        self.queue_init = Queue()
        self.queue_attachment = Queue()
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.list_attachment_ocr = []
        self.list_attachment_not_ocr = []
        self.queue_extract = Queue()
        self.list_extract = []
        self.queue_dumplicate = PQueue()
        self.queue_dumplicate_processed = PQueue()
        self.dumplicate_set = set()
        self.queue_merge = Queue()
        self.queue_syncho = Queue()
        self.queue_remove = Queue()
        self.queue_remove_project = Queue()
        self.attachment_rec_interface = ""
        self.ots_client_merge = getConnect_ots()
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        if is_internal:
            self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        else:
            self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        self.auth = getAuth()
        oss2.defaults.connection_pool_size = 100
        oss2.defaults.multiget_num_threads = 20
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        self.current_path = os.path.dirname(__file__)
    def flow_init(self):
        def producer():
            bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # strip the heavy text columns from the tmp row; the html is stored
            # separately on Document_html
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in item:
                item.pop(document_tmp_dochtmlcon)
            if document_tmp_doctextcon in item:
                item.pop(document_tmp_doctextcon)
            if document_tmp_attachmenttextcon in item:
                item.pop(document_tmp_attachmenttextcon)
            _status = item.get(document_tmp_status)
            new_status = None
            # map the old status onto a save flag plus re-entry status:
            # 201-300 -> save=1, 401-450 -> save=0, both re-enter at 81;
            # everything else restarts at 1
            if _status>=201 and _status<=300:
                item[document_tmp_save] = 1
                new_status = 81
            elif _status>=401 and _status<=450:
                item[document_tmp_save] = 0
                new_status = 81
            else:
                new_status = 1
            item[document_tmp_status] = new_status
            dtmp = Document_tmp(item)
            dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
                                   document_tmp_docid:item.get(document_tmp_docid),
                                   document_tmp_dochtmlcon:_dochtmlcon})
            dtmp.update_row(ots_client)
            dhtml.update_row(ots_client)
        producer()
        comsumer()
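
    # Illustrative sketch (not part of the original file): flow_init wires a
    # one-shot producer that pages the OTS index into queue_init and a pool of
    # consumer threads draining it. MultiThreadHandler's internals are not
    # shown in this file, so this stdlib version only mirrors the assumed
    # pattern with the same handler signature as comsumer_handle.
    def _example_producer_consumer_pattern(self,handle,thread_count=3):
        from queue import Empty
        def worker():
            while True:
                try:
                    item = self.queue_init.get(timeout=1)
                except Empty:
                    return
                handle(item,None,self.ots_client)
        threads = [Thread(target=worker) for _ in range(thread_count)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()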
    def getTitleFromHtml(self,filemd5,_html):
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        _title = ""
        if _find is not None:
            _title = _find.get_text()
        return _title
    def getSourceLinkFromHtml(self,filemd5,_html):
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"filelink":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"filelink":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        return filelink
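
    # Illustrative sketch (not part of the original file): how the two lookups
    # above resolve a filemd5 to its display title and source link. self is
    # unused in both methods, so unbound calls are safe for a demo.
    @staticmethod
    def _example_html_lookup():
        _html = '<a data="md5x" filelink="md5x" href="http://example.com/f.pdf">file.pdf</a>'
        assert Dataflow.getTitleFromHtml(None,"md5x",_html) == "file.pdf"
        assert Dataflow.getSourceLinkFromHtml(None,"md5x",_html) == "http://example.com/f.pdf"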
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _size = attach.getProperties().get(attachment_size)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        localpath = os.path.join(self.current_path,"download",_uuid.hex)
        docids = attach.getProperties().get(attachment_docids)
        try:
            if _size>ATTACHMENT_LARGESIZE:
                attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
                log("attachment :%s of path:%s too large"%(filemd5,_path))
                attach.update_row(self.ots_client)
                return True
            else:
                d_start_time = time.time()
                if downloadFile(self.bucket,objectPath,localpath):
                    time_download = time.time()-d_start_time
                    with open(localpath,"rb") as f:
                        _data_base64 = base64.b64encode(f.read())
                    # call the recognition interface and handle its result
                    start_time = time.time()
                    _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype,kwargs={"timeout":600})
                    if _success:
                        log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                    else:
                        log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                    # the interface is assumed to return a literal list of base64-encoded page images
                    swf_images = eval(swf_images)
                    if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                        swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                        if len(swf_urls)==0:
                            objectPath = attach.getProperties().get(attachment_path,"")
                            localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
                            swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                            if not os.path.exists(swf_dir):
                                os.mkdir(swf_dir)
                            for _i in range(len(swf_images)):
                                _base = swf_images[_i]
                                _base = base64.b64decode(_base)
                                filename = "swf_page_%d.png"%(_i)
                                filepath = os.path.join(swf_dir,filename)
                                with open(filepath,"wb") as f:
                                    f.write(_base)
                            swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                            if os.path.exists(swf_dir):
                                os.rmdir(swf_dir)
                            attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    if re.search("<td",_html) is not None:
                        attach.setValue(attachment_has_table,1,True)
                    _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                    filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                    if _file_title!="":
                        attach.setValue(attachment_file_title,_file_title,True)
                    if filelink!="":
                        attach.setValue(attachment_file_link,filelink,True)
                    attach.setValue(attachment_attachmenthtml,_html,True)
                    attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                    attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                    attach.setValue(attachment_recsize,len(_html),True)
                    attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                    attach.update_row(self.ots_client) # re-enable the row update in production
                    return True
                else:
                    return False
        except oss2.exceptions.NotFound as e:
            # the object is gone from OSS; treat it as handled
            return True
        except Exception as e:
            traceback.print_exc()
        finally:
            try:
                os.remove(localpath)
            except:
                pass
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        list_html = []
        swf_urls = []
        for _attach in list_attach:
            # for testing: run everything
            if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                if _html is None:
                    _html = ""
                list_html.append(_html)
            else:
                _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                if not _succeed:
                    return False,"",[]
                _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                if _html is None:
                    _html = ""
                list_html.append(_html)
            if _attach.getProperties().get(attachment_filetype)=="swf":
                swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
        return True,list_html,swf_urls
    def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
                                  set_term=set(["doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
                                  set_range=set(["page_time","status"]),set_phrase=set(["doctitle","project_name"])):
        list_must_queries = []
        list_must_no_queries = []
        for k,v in _dict.items():
            if k in set_match:
                if isinstance(v,str):
                    l_s = []
                    for s_v in v.split(","):
                        l_s.append(MatchQuery(k,s_v))
                    list_must_queries.append(BoolQuery(should_queries=l_s))
            elif k in set_nested:
                _v = v
                if k in ("bidding_budget","win_bid_price"):
                    _v = float(_v)
                list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
            elif k in set_term:
                list_must_queries.append(TermQuery(k,v))
            elif k in set_phrase:
                list_must_queries.append(MatchPhraseQuery(k,v))
            elif k in set_range:
                if len(v)==1:
                    list_must_queries.append(RangeQuery(k,v[0]))
                elif len(v)==2:
                    list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
        for k,v in _dict_must_not.items():
            if k in set_match:
                if isinstance(v,str):
                    l_s = []
                    for s_v in v.split(","):
                        l_s.append(MatchQuery(k,s_v))
                    list_must_no_queries.append(BoolQuery(should_queries=l_s))
            elif k in set_nested:
                _v = v
                if k in ("bidding_budget","win_bid_price"):
                    _v = float(_v)
                list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
            elif k in set_term:
                list_must_no_queries.append(TermQuery(k,v))
            elif k in set_range:
                if len(v)==1:
                    list_must_no_queries.append(RangeQuery(k,v[0]))
                elif len(v)==2:
                    list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
        return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
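
    # Illustrative sketch (not part of the original file): each key of the
    # rule dict is routed by membership - match keys become per-token
    # MatchQuery groups, nested keys become NestedQuery over sub_docs_json
    # (budgets/prices coerced to float), term/phrase keys become TermQuery/
    # MatchPhraseQuery, and two-element range values become an inclusive
    # RangeQuery; must_not entries go through the same mapping into
    # must_not_queries. self is unused, so an unbound call works for a demo.
    @staticmethod
    def _example_generate_dumplicate_query():
        _dict = {
            "fingerprint":"md5=abc",                   # term
            "doctitle":"some tender notice title",     # phrase
            "win_bid_price":"1000.0",                  # nested, coerced to float
            "page_time":["2022-04-18","2022-04-22"],   # range, inclusive on both ends
        }
        return Dataflow.generate_dumplicate_query(None,_dict,{"save":0})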
    def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
        columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
        extract_count = 0
        if project_code is not None and project_code!="":
            extract_count += 1
        if project_name is not None and project_name!="":
            extract_count += 1
        if tenderee is not None and tenderee!="":
            extract_count += 1
        if agency is not None and agency!="":
            extract_count += 1
        if sub_docs_json is not None:
            try:
                sub_docs = json.loads(sub_docs_json)
            except Exception as e:
                sub_docs = []
            sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
            sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
            # log("==%s"%(str(sub_docs)))
            for sub_doc in sub_docs:
                for _key_sub_docs in sub_doc.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs]=="" and str(sub_doc[_key_sub_docs]) not in ["","0"]:
                            if _key_sub_docs in ["bidding_budget","win_bid_price"]:
                                if float(sub_doc[_key_sub_docs])>0:
                                    columns[_key_sub_docs] = str(float(sub_doc[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_doc[_key_sub_docs])
        return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
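
    # Illustrative sketch (not part of the original file): sub documents are
    # sorted by budget and then by winning price (both descending) and the
    # first non-zero value wins per column; every key present counts toward
    # extract_count. self is unused, so an unbound call is safe for a demo.
    @staticmethod
    def _example_f_decode_sub_docs_json():
        sub_docs_json = json.dumps([{"win_tenderer":"Company A","win_bid_price":"0"},
                                    {"win_tenderer":"Company B","win_bid_price":"980.5","bidding_budget":"1000"}])
        win_tenderer,bidding_budget,win_bid_price,extract_count = \
            Dataflow.f_decode_sub_docs_json(None,"CODE-1","name","tenderee","",sub_docs_json)
        assert (win_tenderer,bidding_budget,win_bid_price) == ("Company B","1000.0","980.5")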
    def post_extract(self,_dict):
        win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
        _dict["win_tenderer"] = win_tenderer
        _dict["bidding_budget"] = bidding_budget
        _dict["win_bid_price"] = win_bid_price
        if "extract_count" not in _dict:
            _dict["extract_count"] = extract_count
    def get_dump_columns(self,_dict):
        docchannel = _dict.get(document_tmp_docchannel,0)
        project_code = _dict.get(document_tmp_project_code,"")
        project_name = _dict.get(document_tmp_project_name,"")
        tenderee = _dict.get(document_tmp_tenderee,"")
        agency = _dict.get(document_tmp_agency,"")
        doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
        win_tenderer = _dict.get("win_tenderer","")
        bidding_budget = _dict.get("bidding_budget","")
        if bidding_budget==0:
            bidding_budget = ""
        win_bid_price = _dict.get("win_bid_price","")
        if win_bid_price==0:
            win_bid_price = ""
        page_time = _dict.get(document_tmp_page_time,"")
        fingerprint = _dict.get(document_tmp_fingerprint,"")
        product = _dict.get(document_tmp_product,"")
        return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
    def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
        flag = True
        for _key in singleNum_keys:
            if len(getSet(_split,_key))>1:
                flag = False
                break
        for _key in multiNum_keys:
            if len(getSet(_split,_key))<=1:
                flag = False
                break
        project_code = item.get("project_code","")
        for _key in notlike_keys:
            if not flag:
                break
            for _d in _split:
                _key_v = _d.get(_key,"")
                _sim = getSimilarityOfString(project_code,_key_v)
                if _sim>0.7 and _sim<1:
                    flag = False
                    break
        # check whether each announcement in the group is contained by the longest value
        if flag:
            if len(contain_keys)>0:
                for _key in contain_keys:
                    MAX_CONTAIN_COLUMN = None
                    for _d in _split:
                        contain_column = _d.get(_key)
                        if contain_column is not None and contain_column !="":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                        flag = False
                                        break
        if flag:
            return _split
        return []
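
    # Illustrative sketch (not part of the original file): the group filter
    # drops a candidate group as soon as a single-valued key (by default
    # tenderee and win_tenderer) takes two distinct values within the group.
    # self is unused, so an unbound call is safe for a demo.
    @staticmethod
    def _example_f_set_docid_limitNum_contain():
        item = {"project_code":"JX-2022-001"}
        conflicting = [{"win_tenderer":"A"},{"win_tenderer":"B"}]
        assert Dataflow.f_set_docid_limitNum_contain(None,item,conflicting) == []
        consistent = [{"win_tenderer":"A"},{"win_tenderer":"A"}]
        assert Dataflow.f_set_docid_limitNum_contain(None,item,consistent) == consistent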
    def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count,document_tmp_doctitle]):
        list_data = []
        if isinstance(_query,list):
            bool_query = BoolQuery(should_queries=_query)
        else:
            bool_query = _query
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.post_extract(_dict)
            _dict["confidence"] = confidence
            list_data.append(_dict)
        # pagination disabled: only the first page (50 rows) is considered
        # _count = len(list_dict)
        # while next_token:
        #     rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
        #                                                                         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
        #                                                                         ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        #     list_dict = getRow_ots(rows)
        #     for _dict in list_dict:
        #         self.post_extract(_dict)
        #         _dict["confidence"] = confidence
        #         list_data.append(_dict)
        list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
        return list_dict
    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
        list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
        for _dict in list_dict:
            self.post_extract(_dict)
            _docid = _dict.get(document_tmp_docid)
            if _docid not in set_docid:
                base_list.append(_dict)
                set_docid.add(_docid)
    def translate_dumplicate_rules(self,status_from,item):
        docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
        if page_time=='':
            page_time = getCurrent_date("%Y-%m-%d")
        base_dict = {
            "status":[status_from[0]],
            "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
        }
        must_not_dict = {"save":0}
        list_rules = []
        singleNum_keys = ["tenderee","win_tenderer"]
        if fingerprint!="":
            _dict = {}
            confidence = 100
            _dict[document_tmp_fingerprint] = fingerprint
            _dict.update(base_dict)
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "item":item,
                     "query":_query,
                     "singleNum_keys":[],
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if docchannel in (52,118):
            if bidding_budget!="" and tenderee!="" and project_code!="":
                confidence = 90
                _dict = {document_tmp_docchannel:docchannel,
                         "bidding_budget":item.get("bidding_budget"),
                         document_tmp_tenderee:item.get(document_tmp_tenderee,""),
                         document_tmp_project_code:item.get(document_tmp_project_code,"")
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
                confidence = 80
                _dict = {document_tmp_docchannel:docchannel,
                         "doctitle_refine":doctitle_refine,
                         "tenderee":tenderee,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
                confidence = 90
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "doctitle_refine":doctitle_refine,
                         "agency":agency,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_code!="" and tenderee!="" and bidding_budget!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "tenderee":tenderee,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if doctitle_refine!="" and agency!="" and bidding_budget!="":
                confidence = 71
                _dict = {document_tmp_docchannel:docchannel,
                         "doctitle_refine":doctitle_refine,
                         "agency":agency,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "project_name":project_name,
                         "agency":agency,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                n_singleKeys = [i for i in singleNum_keys]
                n_singleKeys.append(document_tmp_web_source_no)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":n_singleKeys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            ##-- 5. tender notice - same project code - same [project name/title] - same [tenderee/agency] - same budget (!=0) - single web source
            if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "project_name":project_name,
                         "tenderee":tenderee,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                n_singleKeys = [i for i in singleNum_keys]
                n_singleKeys.append(document_tmp_web_source_no)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":n_singleKeys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
                confidence = 71
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "doctitle_refine":doctitle_refine,
                         "tenderee":tenderee,
                         "bidding_budget":bidding_budget
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            #-- 4. tender notice - same [title/project code/project name] - same [tenderee/agency] - same budget - web source > 1
            if project_name!="" and agency!="":
                tmp_bidding = 0
                if bidding_budget!="":
                    tmp_bidding = bidding_budget
                confidence = 51
                _dict = {document_tmp_docchannel:docchannel,
                         "project_name":project_name,
                         "agency":agency,
                         "bidding_budget":tmp_bidding
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            #-- 4. tender notice - same [title/project code/project name] - same [tenderee/agency] - same budget - web source > 1
            if project_code!="" and agency!="":
                tmp_bidding = 0
                if bidding_budget!="":
                    tmp_bidding = bidding_budget
                confidence = 51
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "agency":agency,
                         "bidding_budget":tmp_bidding
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
        if docchannel not in (101,119,120):
            #-- 7. non-award notice - same project name - same publish date - same tenderee - same budget - same type - web source > 1 - same project code
            if project_name!="" and tenderee!="" and project_code!="":
                tmp_bidding = 0
                if bidding_budget!="":
                    tmp_bidding = bidding_budget
                confidence = 51
                _dict = {document_tmp_docchannel:docchannel,
                         "project_name":project_name,
                         "tenderee":tenderee,
                         "project_code":project_code
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
        if docchannel in (101,119,120):
            #-- 3. award notice - same project code - same [project name/title] - same winning bidder - same winning price (==0)
            if project_code!="" and project_name!="" and win_tenderer!="":
                tmp_win = 0
                if win_bid_price!="":
                    tmp_win = win_bid_price
                confidence = 61
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "project_name":project_name,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":tmp_win
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
                confidence = 72
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "project_name":project_name,
                         "bidding_budget":bidding_budget,
                         "product":product
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                n_singleKeys = [i for i in singleNum_keys]
                n_singleKeys.append(document_tmp_web_source_no)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":n_singleKeys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "doctitle_refine":doctitle_refine,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                n_singleKeys = [i for i in singleNum_keys]
                n_singleKeys.append(document_tmp_web_source_no)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":n_singleKeys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            ##-- 2. award notice - same project code - same [project name/title] - same winning bidder - same winning price (!=0) - single web source
            if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "project_name":project_name,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                n_singleKeys = [i for i in singleNum_keys]
                n_singleKeys.append(document_tmp_web_source_no)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":n_singleKeys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            if project_name!="" and win_tenderer!="" and win_bid_price!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_name":project_name,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price,
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_code!="" and win_tenderer!="" and win_bid_price!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price,
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
                confidence = 91
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "doctitle_refine":doctitle_refine,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                n_singleKeys = [i for i in singleNum_keys]
                n_singleKeys.append(document_tmp_web_source_no)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":n_singleKeys,
                         "contain_keys":[],
                         "multiNum_keys":[]}
                list_rules.append(_rule)
            if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
                confidence = 90
                _dict = {document_tmp_docchannel:docchannel,
                         "doctitle_refine":doctitle_refine,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
                confidence = 95
                _dict = {document_tmp_docchannel:docchannel,
                         "project_name":project_name,
                         "win_tenderer":win_tenderer,
                         "win_bid_price":win_bid_price,
                         "project_code":project_code
                         }
                _dict.update(base_dict)
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
        if docchannel in (51,103,115,116):
            #9. same channel in ['amendment notice','auction transfer','land & mineral','bid clarification'] - same [title / project code / project name] - same [tenderee / agency] - same budget - same day - different data sources
            if doctitle_refine!="" and tenderee!="":
                tmp_budget = 0
                if bidding_budget!="":
                    tmp_budget = bidding_budget
                confidence = 81
                _dict = {document_tmp_docchannel:docchannel,
                         "doctitle_refine":doctitle_refine,
                         "tenderee":tenderee,
                         "bidding_budget":tmp_budget,
                         }
                _dict.update(base_dict)
                _dict["page_time"] = [page_time,page_time]
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            #-- 9. same channel - same [title / project code / project name] - same [tenderee / agency] - same budget - same day - different data sources
            if project_code!="" and tenderee!="":
                confidence = 81
                tmp_budget = 0
                if bidding_budget!="":
                    tmp_budget = bidding_budget
                _dict = {document_tmp_docchannel:docchannel,
                         "project_code":project_code,
                         "tenderee":tenderee,
                         "bidding_budget":tmp_budget,
                         }
                _dict.update(base_dict)
                _dict["page_time"] = [page_time,page_time]
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if project_name!="" and tenderee!="":
                confidence = 81
                tmp_budget = 0
                if bidding_budget!="":
                    tmp_budget = bidding_budget
                _dict = {document_tmp_docchannel:docchannel,
                         "project_name":project_name,
                         "tenderee":tenderee,
                         "bidding_budget":tmp_budget,
                         }
                _dict.update(base_dict)
                _dict["page_time"] = [page_time,page_time]
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if agency!="" and tenderee!="":
                confidence = 81
                tmp_budget = 0
                if bidding_budget!="":
                    tmp_budget = bidding_budget
                _dict = {document_tmp_docchannel:docchannel,
                         "agency":agency,
                         "tenderee":tenderee,
                         "bidding_budget":tmp_budget,
                         "product":product
                         }
                _dict.update(base_dict)
                _dict["page_time"] = [page_time,page_time]
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if agency!="" and project_code!="":
                confidence = 81
                tmp_budget = 0
                if bidding_budget!="":
                    tmp_budget = bidding_budget
                _dict = {document_tmp_docchannel:docchannel,
                         "agency":agency,
                         "project_code":project_code,
                         "bidding_budget":tmp_budget,
                         "product":product
                         }
                _dict.update(base_dict)
                _dict["page_time"] = [page_time,page_time]
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
            if agency!="" and project_name!="":
                confidence = 81
                tmp_budget = 0
                if bidding_budget!="":
                    tmp_budget = bidding_budget
                _dict = {document_tmp_docchannel:docchannel,
                         "agency":agency,
                         "project_name":project_name,
                         "bidding_budget":tmp_budget,
                         "product":product
                         }
                _dict.update(base_dict)
                _dict["page_time"] = [page_time,page_time]
                _query = self.generate_dumplicate_query(_dict,must_not_dict)
                _rule = {"confidence":confidence,
                         "query":_query,
                         "singleNum_keys":singleNum_keys,
                         "contain_keys":[],
                         "multiNum_keys":[document_tmp_web_source_no]}
                list_rules.append(_rule)
        # "pick two of five": any two of tenderee / agency / win_tenderer / bidding_budget / win_bid_price (plus product) on the same day
        if tenderee!="" and bidding_budget!="" and product!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "tenderee":tenderee,
                     "bidding_budget":bidding_budget,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if tenderee!="" and win_tenderer!="" and product!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "tenderee":tenderee,
                     "win_tenderer":win_tenderer,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if tenderee!="" and win_bid_price!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "tenderee":tenderee,
                     "win_bid_price":win_bid_price,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if tenderee!="" and agency!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "tenderee":tenderee,
                     "agency":agency,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if win_tenderer!="" and bidding_budget!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "win_tenderer":win_tenderer,
                     "bidding_budget":bidding_budget,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if win_bid_price!="" and bidding_budget!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "win_bid_price":win_bid_price,
                     "bidding_budget":bidding_budget,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if agency!="" and bidding_budget!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "agency":agency,
                     "bidding_budget":bidding_budget,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if win_tenderer!="" and win_bid_price!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "win_tenderer":win_tenderer,
                     "win_bid_price":win_bid_price,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if win_tenderer!="" and agency!="":
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "win_tenderer":win_tenderer,
                     "agency":agency,
                     "product":product,
                     }
            _dict.update(base_dict)
            _dict["page_time"] = [page_time,page_time]
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
            confidence = 80
            _dict = {document_tmp_docchannel:docchannel,
                     "doctitle_refine":doctitle_refine,
                     "product":product,
                     }
            _dict.update(base_dict)
            _query = self.generate_dumplicate_query(_dict,must_not_dict)
            _rule = {"confidence":confidence,
                     "query":_query,
                     "singleNum_keys":singleNum_keys,
                     "contain_keys":[],
                     "multiNum_keys":[]}
            list_rules.append(_rule)
        return list_rules
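    # dumplicate_fianl_check: sort the candidate group by confidence (descending) and cut it at the
    # first position where a key field (tenderee/win_tenderer/win_bid_price/bidding_budget, plus
    # doctitle_refine for large groups) starts to diverge; an empty result means no safe duplicates.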
    def dumplicate_fianl_check(self,base_list):
        the_group = base_list
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        if len(the_group)>10:
            keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
        else:
            keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
        # per-key confidence cut-off
        list_key_index = []
        for _k in keys:
            if _k=="doctitle_refine":  # was _k=="doctitle", a value that never occurs in keys
                list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group,_k))
        _index = min(list_key_index)
        if _index>1:
            return the_group[:_index]
        return []
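    # get_best_docid: choose the canonical document. The stable sorts make extract_count the primary
    # key, then attachment extract status, then docid (order reversed when a single web source has
    # contributed two or more distinct fingerprints); special-bond groups prefer the richest detail_link.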
    def get_best_docid(self,base_list):
        to_reverse = False
        dict_source_count = {}
        for _item in base_list:
            _web_source = _item.get(document_tmp_web_source_no)
            _fingerprint = _item.get(document_tmp_fingerprint)
            if _web_source is not None:
                if _web_source not in dict_source_count:
                    dict_source_count[_web_source] = set()
                dict_source_count[_web_source].add(_fingerprint)
                if len(dict_source_count[_web_source])>=2:
                    to_reverse = True
        # special-purpose bonds: prefer the document whose detail_link carries the most bond ids
        if len(base_list)>0 and base_list[0].get("is_special_bonds")==1:
            for _item in base_list:
                detail_link = _item.get("detail_link")
                detail_link = detail_link.strip() if detail_link else ""
                if "bondId=" in detail_link:
                    bondId = detail_link.split("bondId=")[1]
                    bondId = bondId.split(",") if bondId else []
                else:
                    bondId = []
                _item['bondId_num'] = len(bondId)
            # print([i.get("bondId_num") for i in base_list])
            base_list.sort(key=lambda x:x["bondId_num"],reverse=True)
            return base_list[0]["docid"]
        if len(base_list)>0:
            base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
            base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
            base_list.sort(key=lambda x:x["extract_count"],reverse=True)
            return base_list[0]["docid"]
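    # save_dumplicate: keep the best docid as save=1, demote the rest to save=0, and advance any
    # status inside status_from into the status_to range.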
    def save_dumplicate(self,base_list,best_docid,status_from,status_to):
        # best_docid needs a check; the others can be saved directly
        list_dict = []
        for item in base_list:
            docid = item["docid"]
            _dict = {"partitionkey":item["partitionkey"],
                     "docid":item["docid"]}
            if docid==best_docid:
                if item.get("save",1)!=0:
                    _dict["save"] = 1
            else:
                _dict["save"] = 0
            if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
                _dict["status"] = random.randint(status_to[0],status_to[1])
            list_dict.append(_dict)
        for _dict in list_dict:
            dtmp = Document_tmp(_dict)
            dtmp.update_row(self.ots_client)
    def flow_test(self,status_to=[1,10]):
        def producer():
            bool_query = BoolQuery(must_queries=[
                # ExistsQuery("docid"),
                # RangeQuery("crtime",range_to='2022-04-10'),
                # RangeQuery("status",61),
                NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
            ],
                must_not_queries=[
                    # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
                    TermQuery("attachment_extract_status",1),
                    RangeQuery("status",1,11)
                ]
            )
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            while next_token and _count<1000000:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
        def comsumer():
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # print(item)
            dtmp = Document_tmp(item)
            dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
            dtmp.update_row(ots_client)
            # dhtml = Document_html(item)
            # dhtml.update_row(ots_client)
            # dtmp.delete_row(ots_client)
            # dhtml.delete_row(ots_client)
        producer()
        comsumer()
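    # flow_dumplicate: the producer pages pending rows out of document_tmp; the consumer builds the
    # dedup rules for each document, collects candidates per rule, runs the final group check, picks
    # the best docid and writes save/dup_docid (and merge_uuid) back to the row.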
    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_web_source_name]):
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_dumplicate producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_dumplicate.put(_dict)
            _count = len(list_dict)
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_dumplicate.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            self.post_extract(item)
            base_list = []
            set_docid = set()
            list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            # print(item,"len_rules",len(list_rules))
            for _rule in list_rules:
                _query = _rule["query"]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
            final_list = self.dumplicate_fianl_check(base_list)
            best_docid = self.get_best_docid(final_list)
            # log(str(final_list))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
                dtmp.setValue(document_tmp_save,1,True)
                dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            else:
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
            dtmp.update_row(self.ots_client)
            # only update the current document here
            # self.save_dumplicate(final_list,best_docid,status_from,status_to)
            #
            # print("=base=",item)
            # if len(final_list)>=1:
            #     print("==================")
            #     for _dict in final_list:
            #         print(_dict)
            #     print("========>>>>>>>>>>")
        producer()
        comsumer()
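    # merge_document: look up the project2 table with combinations of project_code / project_name /
    # tenderee (or budget and price for non-saved documents) and record merge_uuid only when exactly
    # one project matches.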
    def merge_document(self,item,status_to=None):
        self.post_extract(item)
        docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
        _d = {"partitionkey":item["partitionkey"],
              "docid":item["docid"],
              }
        dtmp = Document_tmp(_d)
        if item.get(document_tmp_save,1)==1:
            list_should_q = []
            if project_code!="" and tenderee!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("tenderee",tenderee)])
                list_should_q.append(_q)
            if project_name!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
        else:
            list_should_q = []
            if bidding_budget!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("bidding_budget",float(bidding_budget))])
                list_should_q.append(_q)
            if tenderee!="" and bidding_budget!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("bidding_budget",float(bidding_budget)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if tenderee!="" and win_bid_price!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("win_bid_price",float(win_bid_price)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
        return dtmp.getProperties().get("merge_uuid","")
        # dtmp.update_row(self.ots_client)
    def test_merge(self):
        import pandas as pd
        import queue
        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            list_test_item = []
            should_q = BoolQuery(should_queries=[
                TermQuery("docchannel",101),
                TermQuery("docchannel",119),
                TermQuery("docchannel",120)
            ])
            bool_query = BoolQuery(must_queries=[
                TermQuery("page_time","2022-04-22"),
                should_q,
                TermQuery("save",1)
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_dumplicate producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                list_test_item.append(_dict)
            _count = len(list_dict)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    list_test_item.append(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
            return list_test_item
        from BaseDataMaintenance.model.ots.project import Project
        def comsumer_handle(item,result_queue,ots_client):
            item["merge_uuid"] = self.merge_document(item)
            if item["merge_uuid"]!="":
                _dict = {"uuid":item["merge_uuid"]}
                _p = Project(_dict)
                _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
                if _p.getProperties().get("zhao_biao_page_time","")!="":
                    item["是否有招标"] = "是"  # data key "是否有招标" ("has tender notice"), value "是" ("yes"); kept as-is, it is also an Excel column below
        list_test_item = producer()
        task_queue = queue.Queue()
        for item in list_test_item:
            task_queue.put(item)
        mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
        mt.run()
        keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
        df_data = {}
        for k in keys:
            df_data[k] = []
        for item in list_test_item:
            for k in keys:
                df_data[k].append(item.get(k,""))
        df = pd.DataFrame(df_data)
        df.to_excel("test_merge.xlsx",columns=keys)
    def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
        def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_merge producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_merge.put(_dict)
            _count = len(list_dict)
            while next_token and _count<process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_merge.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            self.merge_document(item,status_to)
        # producer()
        # comsumer()
        pass
    def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
        pass
    def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
        def producer():
            current_date = getCurrent_date("%Y-%m-%d")
            tmp_date = timeAdd(current_date,-10)
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                                 RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.NONE))
            log("flow_remove producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove.put(_dict)
            _count = len(list_dict)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_remove.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            dtmp = Document_tmp(item)
            dtmp.delete_row(self.ots_client)
            dhtml = Document_html(item)
            dhtml.delete_row(self.ots_client)
        producer()
        comsumer()
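    # scheduling: removal jobs run nightly at 20:00, deduplication every 10 seconds.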
    def start_flow_dumplicate(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_remove,"cron",hour="20")
        schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
        schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
        schedule.start()
    def flow_remove_project_tmp(self,process_count=flow_process_count):
        def producer():
            current_date = getCurrent_date("%Y-%m-%d")
            tmp_date = timeAdd(current_date,-6*31)
            bool_query = BoolQuery(must_queries=[
                RangeQuery(project_page_time,range_to="%s"%(tmp_date))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "project2_tmp","project2_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.NONE))
            log("flow_remove project2_tmp producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove_project.put(_dict)
            _count = len(list_dict)
            while next_token:
                # was "document_tmp"/"document_tmp_index": the next_token comes from project2_tmp_index,
                # so pagination has to query the same table and index
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "project2_tmp","project2_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_remove_project.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_remove_project,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            ptmp = Project_tmp(item)
            ptmp.delete_row(self.ots_client)
        producer()
        comsumer()
    def start_flow_merge(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_merge,"cron",second="*/10")
        schedule.start()
def download_attachment():
    ots_client = getConnect_ots()
    queue_attachment = Queue()
    auth = getAuth()
    oss2.defaults.connection_pool_size = 100
    oss2.defaults.multiget_num_threads = 20
    attachment_bucket_name = "attachment-hub"
    if is_internal:
        bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
    else:
        bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
    bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    current_path = os.path.dirname(__file__)
    def producer():
        columns = [document_tmp_attachment_path]
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
        rows,next_token,total_count,is_all_succeed = ots_client.search(
            "document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_attachment producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            queue_attachment.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                queue_attachment.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
        mt.run()
    def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def comsumer_handle(item,result_queue):
        page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
        if len(page_attachments)==0:
            pass
        else:
            list_fileMd5 = []
            for _atta in page_attachments:
                list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
            list_attach = getAttachments(list_fileMd5)
            for attach in list_attach:
                filemd5 = attach.getProperties().get(attachment_filemd5)
                _status = attach.getProperties().get(attachment_status)
                _filetype = attach.getProperties().get(attachment_filetype)
                _size = attach.getProperties().get(attachment_size)
                _path = attach.getProperties().get(attachment_path)
                _uuid = uuid4()
                objectPath = attach.getProperties().get(attachment_path)
                localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
                try:
                    if _size>ATTACHMENT_LARGESIZE:
                        pass
                    else:
                        downloadFile(bucket,objectPath,localpath)
                except Exception as e:
                    traceback.print_exc()
    producer()
    comsumer()
def test_attachment_interface():
    current_path = os.path.dirname(__file__)
    task_queue = Queue()
    def producer():
        _count = 0
        list_filename = os.listdir(os.path.join(current_path,"download"))
        for _filename in list_filename:
            _count += 1
            _type = _filename.split(".")[1]
            task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
            if _count>=500:
                break
    def comsumer():
        mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
        mt.run()
    def comsumer_handle(item,result_queue):
        _path = item.get("path")
        _type = item.get("file_type")
        _data_base64 = base64.b64encode(open(_path,"rb").read())
        # call the interface and time the result
        start_time = time.time()
        _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
        log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
    producer()
    comsumer()
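# Dataflow_attachment: keeps a pool of worker threads that pull documents from the OCR / non-OCR
# attachment queues, run attachment recognition through the interface and write the results back.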
class Dataflow_attachment(Dataflow):
    def __init__(self):
        Dataflow.__init__(self)
        self.process_list_thread = []
    def flow_attachment_process(self):
        self.process_comsumer()
    def monitor_attachment_process(self):
        alive_count = 0
        for _t in self.process_list_thread:
            if _t.is_alive():
                alive_count += 1
        log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
    def process_comsumer(self):
        if len(self.process_list_thread)==0:
            thread_count = 60
            for i in range(thread_count):
                self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
            for t in self.process_list_thread:
                t.start()
        while 1:
            failed_count = 0
            for _i in range(len(self.process_list_thread)):
                t = self.process_list_thread[_i]
                if not t.is_alive():
                    failed_count += 1
                    # was self.prcess_list_thread (typo): restart the dead worker in place
                    self.process_list_thread[_i] = Thread(target=self.process_comsumer_handle)
                    self.process_list_thread[_i].start()
            if failed_count>0:
                log("attachment failed %d"%(failed_count))
            time.sleep(5)
    def process_comsumer_handle(self):
        while 1:
            _flag = False
            log("attachment handle:%s"%str(threading.get_ident()))
            try:
                item = self.queue_attachment_ocr.get(True,timeout=0.2)
                log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
                self.attachment_recognize(item,None)
                log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
            except Exception as e:
                _flag = True
                pass
            try:
                item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
                log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
                self.attachment_recognize(item,None)
                log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
            except Exception as e:
                _flag = True and _flag  # stays True only if the OCR queue was empty too
                pass
            if _flag:
                time.sleep(2)  # both queues empty: back off briefly
    def attachment_recognize(self,_dict,result_queue):
        item = _dict.get("item")
        list_attach = _dict.get("list_attach")
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
        _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
        log(str(swf_urls))
        if not _succeed:
            item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
        else:
            dhtml.updateSWFImages(swf_urls)
            dhtml.updateAttachment(list_html)
            dhtml.update_row(self.ots_client)
            item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
            item[document_tmp_attachment_extract_status] = 1
        log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
        dtmp = Document_tmp(item)
        dtmp.update_row(self.ots_client)
    def flow_attachment(self):
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()
    def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = self.ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
        # only pull more data when the queues are running low
        if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
            return
        # dedupe against docids that are already queued
        set_docid = set()
        set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
        if qsize_ocr>0:
            self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
        else:
            self.list_attachment_ocr = []
        if qsize_not_ocr>0:
            self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
        else:
            self.list_attachment_not_ocr = []
        try:
            bool_query = BoolQuery(must_queries=[
                RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
                # TermQuery(document_tmp_docid,234925191),
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_attachment producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            _count = 0
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    continue
                self.queue_attachment.put(_dict,True)
                _count += 1
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        continue
                    self.queue_attachment.put(_dict,True)
                    _count += 1
            log("add attachment count:%d"%(_count))
        except Exception as e:
            log("flow attachment producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()
    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
            # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
            # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
    def comsumer_handle(self,item,result_queue):
        try:
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)==0:
                item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
                dtmp = Document_tmp(item)
                dtmp.update_row(self.ots_client)
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5)
                # attachments that have not finished uploading are skipped for up to 2 hours
                if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
                    item[document_tmp_status] = 1
                    dtmp = Document_tmp(item)
                    dtmp.update_row(self.ots_client)
                    return
                self.set_queue({"item":item,"list_attach":list_attach})
        except Exception as e:
            traceback.print_exc()
    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.start()
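# Dataflow_extract: pages documents out of document_tmp by status, posts the html content to the
# extraction endpoints and stores the returned JSON into the document_extract table.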
class Dataflow_extract(Dataflow):
    def __init__(self):
        Dataflow.__init__(self)
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        if q_size>100:
            return
        set_docid = set(self.list_extract)
        if q_size>0:
            self.list_extract = self.list_extract[-q_size:]
        else:
            self.list_extract = []
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_extract producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    self.list_extract.insert(0,docid)
                    continue
                else:
                    self.queue_extract.put(_dict)
                    self.list_extract.append(docid)
            _count = len(list_dict)
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                    "document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        self.list_extract.insert(0,docid)
                        continue
                    else:
                        self.queue_extract.put(_dict)
                        self.list_extract.append(docid)
                _count += len(list_dict)
        except Exception as e:
            log("flow extract producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_extract(self,):
        self.comsumer()
    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
        mt.run()
    def comsumer_handle(self,item,result_queue):
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
        _extract = Document_extract({})
        _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
        _extract.setValue(document_extract2_docid,item.get(document_docid))
        all_done = 1
        if all_done:
            data = item
            resp = requests.post(self.other_url,json=data,headers=self.header)
            if (resp.status_code>=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
            else:
                all_done = -1
        data = {}
        for k,v in item.items():
            data[k] = v
        data["timeout"] = 240
        data["doc_id"] = data.get(document_tmp_docid)
        data["content"] = data.get(document_tmp_dochtmlcon,"")
        if document_tmp_dochtmlcon in data:
            data.pop(document_tmp_dochtmlcon)
        data["title"] = data.get(document_tmp_doctitle,"")
        data["web_source_no"] = item.get(document_tmp_web_source_no,"")
        data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
        if all_done:
            # note: failure codes are negative and therefore truthy, so later steps still run after a failure
            resp = requests.post(self.extract_url,json=data,headers=self.header)
            if (resp.status_code>=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
            else:
                all_done = -2
        if all_done:
            resp = requests.post(self.industy_url,json=data,headers=self.header)
            if (resp.status_code>=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            else:
                all_done = -3
        _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
                 document_docid:item.get(document_tmp_docid),
                 }
        dtmp = Document_tmp(_dict)
        if all_done!=1:
            sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
            dtmp.update_row(self.ots_client)
        else:
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
            dtmp.update_row(self.ots_client)
            # write into the interface table; enable when going live
            _extract.setValue(document_extract2_status,random.randint(1,50),True)
            _extract.update_row(self.ots_client)
        log("process docid:%d %s"%(data["doc_id"],str(all_done)))
    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
        schedule.add_job(self.flow_extract,"cron",second="*/10")
        schedule.start()
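# Dataflow_dumplicate: extends Dataflow with an ActiveMQ delete listener, a richer post_extract
# and a pairwise document similarity check used for the final dedup decision.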
class Dataflow_dumplicate(Dataflow):
    class DeleteListener():
        def __init__(self,conn,_func,*args,**kwargs):
            self.conn = conn
            self._func = _func
        def on_error(self,headers,*args,**kwargs):
            log('received an error %s' % str(headers.body))
        def on_message(self,headers,*args,**kwargs):
            try:
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s"%(message_id))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()
                pass
        def __del__(self):
            self.conn.disconnect()
    def __init__(self,start_delete_listener=True):
        Dataflow.__init__(self,)
        self.c_f_get_extractCount = f_get_extractCount()
        self.c_f_get_package = f_get_package()
        # was level=logging.info (the function); logging levels must be constants such as logging.INFO
        logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.fix_doc_docid = None
        self.bdm = BaseDataMonitor()
        self.check_rule = 1
        if start_delete_listener:
            self.delete_comsumer_counts = 2
            self.doc_delete_queue = "/queue/doc_delete_queue"
            self.doc_delete_result = "/queue/doc_delete_result"
            self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
            for _ in range(self.delete_comsumer_counts):
                conn = getConnect_activateMQ_ali()
                listener = self.DeleteListener(conn,self.delete_doc_handle)
                createComsumer(listener,self.doc_delete_queue)
    def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart"]):
        dict_time = {}
        for k in keys:
            _time = _extract.get(k)
            _time = _time[:10] if _time else ""
            dict_time[k] = _time
        return dict_time
    def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
        bool_query = BoolQuery(must_queries=[
            TermQuery("docid",docid)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(
            "document","document_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        if total_count==0:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                "document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        if len(list_dict)>0:
            return self.post_extract(list_dict[0])
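    # post_extract: decode sub_docs_json and extract_json into the flat fields (winner, prices,
    # codes, fingerprint, times ...) that the dedup rules and pairwise check read.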
    def post_extract(self,_dict):
        win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
        _dict["win_tenderer"] = win_tenderer
        _dict["bidding_budget"] = bidding_budget
        _dict["win_bid_price"] = win_bid_price
        extract_json = _dict.get(document_tmp_extract_json,"{}")
        _extract = json.loads(extract_json)
        _dict["product"] = ",".join(_extract.get("product",[]))
        _dict["fingerprint"] = _extract.get("fingerprint","")
        _dict["project_codes"] = _extract.get("code",[])
        if len(_dict["project_codes"])>0:
            _dict["project_code"] = _dict["project_codes"][0]
        else:
            _dict["project_code"] = ""
        _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
        if _dict["doctitle_refine"]=="":
            _dict["doctitle_refine"] = _dict.get("doctitle")
        _dict["moneys"] = set(_extract.get("moneys",[]))
        _dict["moneys_attachment"] = set(_extract.get("moneys_attachment",[]))
        _dict["nlp_enterprise"] = json.dumps({"indoctextcon":_extract.get("nlp_enterprise",[]),
                                              "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])},ensure_ascii=False)
        _dict["extract_count"] = _extract.get("extract_count",0)
        _dict["package"] = self.c_f_get_package.evaluate(extract_json)
        _dict["project_name"] = _extract.get("name","")
        _dict["dict_time"] = self.get_dict_time(_extract)
        _dict["punish"] = _extract.get("punish",{})
        _dict["approval"] = _extract.get("approval",[])
        # special-purpose bond fields (the web source name '专项债券信息网' is a data value, kept as-is)
        issue_details = _extract.get("debt_dic",{}).get("issue_details",[])
        _dict["is_special_bonds"] = 1 if _dict.get(document_tmp_docchannel)==302 and _dict.get(document_tmp_web_source_name)=='专项债券信息网' and issue_details else 0
        # procurement intention fields
        if _dict.get("docchannel")==114:
            _dict["demand_info"] = _extract.get("demand_info",{}).get("data",[])
        else:
            _dict["demand_info"] = []
        return _dict
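    # Overrides the base dumplicate_fianl_check: documents sharing the base fingerprint are accepted
    # directly; every other candidate is pairwise-checked against the accepted group via
    # dumplicate_check and the group is cut at the first failure.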
    def dumplicate_fianl_check(self,base_list,b_log=False):
        the_group = base_list
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        _index = 0
        base_fingerprint = "None"
        if len(base_list)>0:
            base_fingerprint = base_list[0]["fingerprint"]
        final_group = []
        for _i in range(len(base_list)):
            _dict1 = base_list[_i]
            fingerprint_less = _dict1["fingerprint"]
            _pass = True
            if fingerprint_less==base_fingerprint:
                _index = _i
                final_group.append(_dict1)
                continue
            for _dict2 in final_group:
                _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
                if _prob<=0.1:
                    _pass = False
                    break
            log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
            _index = _i
            if _pass:
                final_group.append(_dict1)
            else:
                break
        return final_group
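    # Greedy grouping sketch: rows sharing the top-confidence fingerprint are
    # always kept; every other row must score _prob>0.1 against all rows already
    # kept, and the first failure ends the scan, e.g.
    #
    #     group = self.dumplicate_fianl_check(candidates,b_log=True)
    #     best_docid = self.get_best_docid(group)   # assumed downstream helper
    #
    # get_best_docid is assumed from the surrounding flow, not defined above.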
    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
        document_less = _dict1
        docid_less = _dict1["docid"]
        docchannel_less = document_less.get("docchannel",0)
        page_time_less = document_less.get("page_time")
        doctitle_refine_less = document_less["doctitle_refine"]
        project_codes_less = document_less.get("project_codes")
        nlp_enterprise_less = document_less["nlp_enterprise"]
        tenderee_less = document_less.get("tenderee","")
        agency_less = document_less.get("agency")
        win_tenderer_less = document_less["win_tenderer"]
        bidding_budget_less = document_less["bidding_budget"]
        win_bid_price_less = document_less["win_bid_price"]
        product_less = document_less.get("product")
        package_less = document_less.get("package")
        json_time_less = document_less.get("dict_time")
        project_name_less = document_less.get("project_name")
        fingerprint_less = document_less.get("fingerprint")
        extract_count_less = document_less.get("extract_count",0)
        web_source_no_less = document_less.get("web_source_no")
        province_less = document_less.get("province")
        city_less = document_less.get("city")
        district_less = document_less.get("district")
        moneys_less = document_less.get("moneys")
        moneys_attachment_less = document_less.get("moneys_attachment")
        page_attachments_less = document_less.get("page_attachments","[]")
        punish_less = document_less.get("punish",{})
        approval_less = document_less.get("approval",[])
        source_type_less = document_less.get("source_type")
        document_greater = _dict2
        docid_greater = _dict2["docid"]
        page_time_greater = document_greater["page_time"]
        docchannel_greater = document_greater.get("docchannel",0)
        doctitle_refine_greater = document_greater.get("doctitle_refine","")
        project_codes_greater = document_greater["project_codes"]
        nlp_enterprise_greater = document_greater["nlp_enterprise"]
        tenderee_greater = document_greater.get("tenderee","")
        agency_greater = document_greater.get("agency","")
        win_tenderer_greater = document_greater["win_tenderer"]
        bidding_budget_greater = document_greater["bidding_budget"]
        win_bid_price_greater = document_greater["win_bid_price"]
        product_greater = document_greater.get("product")
        package_greater = document_greater.get("package")
        json_time_greater = document_greater["dict_time"]
        project_name_greater = document_greater.get("project_name")
        fingerprint_greater = document_greater.get("fingerprint")
        extract_count_greater = document_greater.get("extract_count",0)
        web_source_no_greater = document_greater.get("web_source_no")
        province_greater = document_greater.get("province")
        city_greater = document_greater.get("city")
        district_greater = document_greater.get("district")
        moneys_greater = document_greater.get("moneys")
        moneys_attachment_greater = document_greater.get("moneys_attachment")
        page_attachments_greater = document_greater.get("page_attachments","[]")
        punish_greater = document_greater.get("punish",{})
        approval_greater = document_greater.get("approval",[])
        source_type_greater = document_greater.get("source_type")
        hard_level = 1
        if docchannel_less==docchannel_greater==302:
            hard_level = 2
        if web_source_no_less==web_source_no_greater=="17397-3":
            hard_level = 2
        if self.check_rule==1:
            _prob = check_dumplicate_rule(document_less,document_greater,min_counts,b_log=b_log,hard_level=hard_level)
        else:
            _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
        pagetime_stamp_less = getTimeStamp(page_time_less)
        pagetime_stamp_greater = getTimeStamp(page_time_greater)
        day_dis = abs(pagetime_stamp_greater-pagetime_stamp_less)//86400
        if document_less.get("is_special_bonds",0)==document_greater.get("is_special_bonds",0)==1:
            pass
        else:
            if day_dis>7:
                _prob = 0
            elif day_dis>3:
                if _prob<0.4:
                    _prob = 0
        return _prob,day_dis
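    # Date gating above, summarized: unless both docs are special bonds, a
    # page_time gap of more than 7 days forces _prob to 0, and a gap of 4-7 days
    # zeroes any _prob below 0.4, e.g.
    #
    #     prob,day_dis = self.dumplicate_check(d1,d2,min_counts=10,b_log=False)
    #     is_duplicate = prob>0.1   # threshold used by dumplicate_fianl_check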
    def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
        document_less = _dict1
        docid_less = _dict1["docid"]
        docchannel_less = document_less["docchannel"]
        page_time_less = document_less["page_time"]
        doctitle_refine_less = document_less["doctitle_refine"]
        project_codes_less = document_less["project_codes"]
        nlp_enterprise_less = document_less["nlp_enterprise"]
        tenderee_less = document_less["tenderee"]
        agency_less = document_less["agency"]
        win_tenderer_less = document_less["win_tenderer"]
        bidding_budget_less = document_less["bidding_budget"]
        win_bid_price_less = document_less["win_bid_price"]
        product_less = document_less["product"]
        package_less = document_less["package"]
        json_time_less = document_less["dict_time"]
        project_name_less = document_less["project_name"]
        fingerprint_less = document_less["fingerprint"]
        extract_count_less = document_less["extract_count"]
        document_greater = _dict2
        docid_greater = _dict2["docid"]
        page_time_greater = document_greater["page_time"]
        doctitle_refine_greater = document_greater["doctitle_refine"]
        project_codes_greater = document_greater["project_codes"]
        nlp_enterprise_greater = document_greater["nlp_enterprise"]
        tenderee_greater = document_greater["tenderee"]
        agency_greater = document_greater["agency"]
        win_tenderer_greater = document_greater["win_tenderer"]
        bidding_budget_greater = document_greater["bidding_budget"]
        win_bid_price_greater = document_greater["win_bid_price"]
        product_greater = document_greater["product"]
        package_greater = document_greater["package"]
        json_time_greater = document_greater["dict_time"]
        project_name_greater = document_greater["project_name"]
        fingerprint_greater = document_greater["fingerprint"]
        extract_count_greater = document_greater["extract_count"]
        if fingerprint_less==fingerprint_greater:
            return 1
        same_count = 0
        all_count = 8
        if len(set(project_codes_less) & set(project_codes_greater))>0:
            same_count += 1
        if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
            same_count += 1
        if getLength(agency_less)>0 and agency_less==agency_greater:
            same_count += 1
        if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
            same_count += 1
        if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
            same_count += 1
        if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
            same_count += 1
        if getLength(project_name_less)>0 and project_name_less==project_name_greater:
            same_count += 1
        if getLength(doctitle_refine_less)>0 and (doctitle_refine_less==doctitle_refine_greater or doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
            same_count += 1
        base_prob = 0
        if min_counts<3:
            base_prob = 0.9
        elif min_counts<5:
            base_prob = 0.8
        elif min_counts<8:
            base_prob = 0.7
        else:
            base_prob = 0.6
        _prob = base_prob*same_count/all_count
        if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
            _prob = 0.15
        if _prob<0.1:
            return _prob
        check_result = {"pass":1}
        if docchannel_less in (51,102,103,104,115,116,117):
            if doctitle_refine_less!=doctitle_refine_greater:
                if page_time_less!=page_time_greater:
                    check_result["docchannel"] = 0
                    check_result["pass"] = 0
                else:
                    check_result["docchannel"] = 2
        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater,page_time_less,page_time_greater):
            check_result["doctitle"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
        else:
            check_result["doctitle"] = 2
        # added check
        if not check_codes(project_codes_less,project_codes_greater):
            check_result["code"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
        else:
            if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
                check_result["code"] = 2
            else:
                check_result["code"] = 1
        if not check_product(product_less,product_greater,doctitle_refine_less,doctitle_refine_greater):
            check_result["product"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
        else:
            if getLength(product_less)>0 and getLength(product_greater)>0:
                check_result["product"] = 2
            else:
                check_result["product"] = 1
        if not check_demand():
            check_result["pass"] = 0
        if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                            tenderee_less,tenderee_greater,
                            agency_less,agency_greater,
                            win_tenderer_less,win_tenderer_greater):
            check_result["entity"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
        else:
            if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
                check_result["entity"] = 2
            elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
                check_result["entity"] = 2
            else:
                check_result["entity"] = 1
        if not check_money(bidding_budget_less,bidding_budget_greater,
                           win_bid_price_less,win_bid_price_greater):
            if b_log:
                logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
            check_result["money"] = 0
            check_result["pass"] = 0
        else:
            if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
                check_result["money"] = 2
            elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
                check_result["money"] = 2
            else:
                check_result["money"] = 1
        # added check
        if not check_package(package_less,package_greater):
            if b_log:
                logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
            check_result["package"] = 0
            check_result["pass"] = 0
        else:
            if getLength(package_less)>0 and getLength(package_greater)>0:
                check_result["package"] = 2
            else:
                check_result["package"] = 1
        # added check
        if not check_time(json_time_less,json_time_greater):
            if b_log:
                logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
                if isinstance(json_time_less,dict):
                    time_less = json_time_less
                else:
                    time_less = json.loads(json_time_less)
                if isinstance(json_time_greater,dict):
                    time_greater = json_time_greater
                else:
                    time_greater = json.loads(json_time_greater)
                for k,v in time_less.items():
                    if getLength(v)>0:
                        v1 = time_greater.get(k,"")
                        if getLength(v1)>0:
                            if v!=v1:
                                log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
            check_result["time"] = 0
            check_result["pass"] = 0
        else:
            if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
                check_result["time"] = 2
            else:
                check_result["time"] = 1
        if check_result.get("pass",0)==0:
            if b_log:
                logging.info(str(check_result))
            if check_result.get("money",1)==0:
                return 0
            if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
                return _prob
            else:
                return 0
        if check_result.get("time",1)==0:
            return 0
        return _prob
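    # Worked example of the _bak score (illustrative): with min_counts=4 the base
    # probability is 0.8; if 6 of the 8 compared fields (codes, tenderee, agency,
    # win_tenderer, bidding_budget, win_bid_price, project_name, doctitle) match,
    # _prob = 0.8*6/8 = 0.6, which passes the 0.1 floor and is then subject to
    # the title/code/product/entity/money/package/time vetoes above.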
    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
        for _ in range(retry_times):
            try:
                _time = time.time()
                check_time = 0
                if isinstance(_query,list):
                    bool_query = BoolQuery(should_queries=_query)
                else:
                    bool_query = _query
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                list_data = []
                for _dict in list_dict:
                    self.post_extract(_dict)
                    _docid = _dict.get(document_tmp_docid)
                    if merge:
                        list_data.append(_dict)
                    else:
                        if _docid!=item.get(document_tmp_docid):
                            _time1 = time.time()
                            confidence,day_dis = self.dumplicate_check(item,_dict,total_count,b_log=b_log)
                            check_time += time.time()-_time1
                            _dict["confidence"] = confidence
                            _dict["min_counts"] = total_count
                            list_data.append(_dict)
                all_time = time.time()-_time
                # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
                return list_data
            except Exception as e:
                traceback.print_exc()
        return []
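    # Note on the loop above: the `confidence` argument is overwritten per row by
    # dumplicate_check, and `total_count` doubles as min_counts. A minimal call
    # (illustrative query):
    #
    #     q = BoolQuery(must_queries=[TermQuery("tenderee",item.get("tenderee"))])
    #     candidates = self.search_data_by_query(item,q,100,table_name="document_tmp",
    #                                            table_index="document_tmp_index")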
    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns,b_log=b_log)
        for _dict in list_dict:
            _docid = _dict.get(document_tmp_docid)
            confidence = _dict["confidence"]
            if b_log:
                log("confidence %d %.3f total_count %d"%(_docid,confidence,_dict.get('min_counts',0)))
            if confidence>0.1:
                if _docid not in set_docid:
                    base_list.append(_dict)
                    set_docid.add(_docid)
            set_docid.add(_docid)
    def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
        for k,v in _dict.items():
            if getLength(v)==0:
                return
        _dict.update(base_dict)
        if b_log:
            log("rule dict:"+str(_dict))
        _query = self.generate_dumplicate_query(_dict,must_not_dict)
        _rule = {"confidence":confidence,
                 "item":item,
                 "query":_query,
                 "singleNum_keys":[],
                 "contain_keys":[],
                 "multiNum_keys":[],
                 "_dict":_dict}
        list_rules.append(_rule)
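    # Guard sketch: appendRule drops a rule when any of its key fields is empty,
    # so e.g. _dict={"tenderee":"","win_tenderer":"X"} appends nothing, while a
    # fully populated _dict is merged with base_dict (docchannel/status/page_time
    # window) and compiled into a query via generate_dumplicate_query.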
    def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False,day_dis=7,table_name="document_tmp",table_index="document_tmp_index"):
        docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
        current_date = getCurrent_date("%Y-%m-%d")
        if page_time=='':
            page_time = current_date
        two_day_dict = {"page_time":[timeAdd(page_time,-7),timeAdd(page_time,7)]}
        if table_name in {"document_tmp","document"}:
            # if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
            if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1 and not get_all:
                table_name = "document_tmp"
                table_index = "document_tmp_index"
                base_dict = {
                    "docchannel":item.get("docchannel",52),
                    "status":[status_from[0]],
                    "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
                }
                must_not_dict = {"save":0,"docid":item.get("docid")}
                doctitle_refine_name = "doctitle_refine"
            else:
                table_name = "document"
                table_index = "document_index"
                if get_all:
                    _status = [201,450]
                else:
                    _status = [201,300]
                base_dict = {
                    "docchannel":item["docchannel"],
                    "status":_status,
                    "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
                }
                must_not_dict = {"docid":item.get("docid")}
                doctitle_refine_name = "doctitle"
        else:
            _status = [201,300]
            base_dict = {
                "docchannel":item["docchannel"],
                "status":_status,
                "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
            }
            must_not_dict = {"docid":item.get("docid")}
            doctitle_refine_name = "doctitle"
        list_rules = []
        singleNum_keys = ["tenderee","win_tenderer"]
        confidence = 100
        self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item,b_log=to_log)
        confidence = 90
        _dict = {document_tmp_agency:agency,
                 "win_tenderer":win_tenderer,
                 "win_bid_price":win_bid_price}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {document_tmp_agency:agency,
                 "win_tenderer":win_tenderer,
                 "bidding_budget":bidding_budget}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {document_tmp_agency:agency,
                 "win_bid_price":win_bid_price,
                 "bidding_budget":bidding_budget}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"win_tenderer":win_tenderer,
                 "win_bid_price":win_bid_price,
                 "bidding_budget":bidding_budget}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "win_tenderer":win_tenderer,
                 "win_bid_price":win_bid_price}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "win_tenderer":win_tenderer,
                 "bidding_budget":bidding_budget}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "win_bid_price":win_bid_price,
                 "bidding_budget":bidding_budget}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "agency":agency,
                 "win_tenderer":win_tenderer}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "agency":agency,
                 "win_bid_price":win_bid_price}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "agency":agency,
                 "bidding_budget":bidding_budget}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "project_codes":project_code
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "win_bid_price":win_bid_price
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"agency":agency,
                 "project_codes":project_code
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"win_tenderer":win_tenderer,
                 "bidding_budget":bidding_budget
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_codes":project_code,
                 "win_bid_price":win_bid_price
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_codes":project_code,
                 "bidding_budget":bidding_budget
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_codes":project_code,
                 doctitle_refine_name:doctitle_refine
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "bidding_budget":bidding_budget
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_codes":project_code,
                 "win_tenderer":win_tenderer
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        base_dict.update(two_day_dict)
        confidence = 85
        _dict = {"tenderee":tenderee,
                 "agency":agency
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "project_name":project_name
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        if getLength(product)>0:
            l_p = product.split(",")
            _dict = {"tenderee":tenderee,
                     "product":l_p[0]
                     }
            self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 "win_tenderer":win_tenderer
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"tenderee":tenderee,
                 doctitle_refine_name:doctitle_refine
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"agency":agency,
                 "project_name":project_name
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_codes":project_code,
                 "project_name":project_name
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_name":project_name,
                 "win_tenderer":win_tenderer
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_name":project_name,
                 "win_bid_price":win_bid_price
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_name":project_name,
                 "bidding_budget":bidding_budget
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"project_name":project_name,
                 doctitle_refine_name:doctitle_refine
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"win_tenderer":win_tenderer,
                 "win_bid_price":win_bid_price
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"win_tenderer":win_tenderer,
                 doctitle_refine_name:doctitle_refine
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"win_bid_price":win_bid_price,
                 "bidding_budget":bidding_budget
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        confidence = 80
        _dict = {"project_codes":project_code}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"win_bid_price":win_bid_price,
                 doctitle_refine_name:doctitle_refine
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        _dict = {"bidding_budget":bidding_budget,
                 doctitle_refine_name:doctitle_refine
                 }
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        confidence = 80
        _dict = {doctitle_refine_name:doctitle_refine}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        # special-purpose bonds
        if item.get("is_special_bonds")==1:
            confidence = 90
            _dict = {doctitle_refine_name: doctitle_refine,
                     document_tmp_web_source_name:"专项债券信息网"}
            tmp_base_dict = {
                "docchannel": item["docchannel"],
                "status": [201, 450],
                # "page_time": [timeAdd(page_time, -365), timeAdd(page_time, 365)]
            }
            self.appendRule(list_rules, _dict, tmp_base_dict, must_not_dict, confidence, item, b_log=to_log)
        confidence = 70
        _dict = {"project_name":project_name}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
        return list_rules,table_name,table_index
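    # Rule ladder summary (from the code above): confidence 100 = same fingerprint;
    # 90 = strong field combinations (e.g. tenderee+win_tenderer+win_bid_price,
    # tenderee+project_codes); 85 and 80 = weaker pairs or single fields
    # (tenderee+agency, codes alone, title alone) after the page_time window is
    # reset; 70 = project_name alone. Each returned rule carries its query plus
    # the confidence used when scoring candidates.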
    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
        q_size = self.queue_dumplicate.qsize()
        log("dumplicate queue size %d"%(q_size))
        while 1:
            try:
                docid = self.queue_dumplicate_processed.get(block=False)
                if docid in self.dumplicate_set:
                    self.dumplicate_set.remove(docid)
            except Exception as e:
                break
        if q_size>process_count//3:
            return
        bool_query = BoolQuery(must_queries=[
            RangeQuery(document_tmp_status,*status_from,True,True),
            # TermQuery("docid",271983871)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            docid = _dict.get(document_tmp_docid)
            if docid in self.dumplicate_set:
                continue
            self.dumplicate_set.add(docid)
            self.queue_dumplicate.put(_dict)
        _count = len(list_dict)
        while next_token and _count<process_count:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in self.dumplicate_set:
                    continue
                self.dumplicate_set.add(docid)
                self.queue_dumplicate.put(_dict)
            _count += len(list_dict)
        # _l = list(self.dumplicate_set)
        # _l.sort(key=lambda x:x,reverse=True)
        # self.dumplicate_set = set(_l[:flow_process_count*2])
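    # Producer sketch: drain queue_dumplicate_processed to release finished docids
    # from dumplicate_set, skip a cycle while the work queue is still a third
    # full, then page through document_tmp ordered by (update_document,docid)
    # descending until process_count rows are queued; dumplicate_set keeps a
    # docid from being queued twice while it is still in flight.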
    def comsumer_flow_dumplicate(self):
        mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
        mt.run()

    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
        self.producer_flow_dumplicate(process_count=process_count,status_from=status_from)
        # self.comsumer_flow_dumplicate()

    def flow_dumpcate_comsumer(self):
        from multiprocessing import Process
        process_count = 6
        thread_count = 12
        list_process = []
        def start_thread():
            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
            mt.run()
        for _ in range(process_count):
            p = Process(target=start_thread)
            list_process.append(p)
        for p in list_process:
            p.start()
        while 1:
            for _i in range(len(list_process)):
                p = list_process[_i]
                if not p.is_alive():
                    p = Process(target=start_thread)
                    list_process[_i] = p
                    p.start()
            time.sleep(1)
        # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
        # mt.run()
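    # Process layout sketch (as configured above): 6 worker processes, each
    # running a MultiThreadHandler with 12 threads over self.queue_dumplicate,
    # with the while-loop respawning any process that dies. For the workers to
    # actually share work, queue_dumplicate is assumed to be a
    # multiprocessing-safe queue; a plain queue.Queue would be copied per fork.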
    def search_docs(self,list_docids,columns_to_get=[document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment,document_tenderee_code,document_agency_code,document_candidates],document_name="document"):
        '''
        Fetch announcement contents by docid, querying document_tmp first and falling back to document
        :param list_docids:
        :return:
        '''
        list_docs = []
        set_fingerprint = set()
        for _docid in list_docids:
            docid = int(_docid)
            _dict = {document_partitionkey:getPartitionKey(docid),
                     document_docid:docid}
            if document_name in {"document","document_tmp"}:
                _doc = Document_tmp(_dict)
                _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
                if not _exists:
                    _doc = Document(_dict)
                    _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
            else:
                _doc = Document(_dict)
                _doc.table_name = document_name
                _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
            if _exists:
                _fingerprint = _doc.getProperties().get(document_fingerprint)
                if _fingerprint in set_fingerprint:
                    continue
                set_fingerprint.add(_fingerprint)
                list_docs.append(_doc)
        for _doc in list_docs:
            try:
                _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
                if _sub_docs_json is not None:
                    _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
            except Exception as e:
                traceback.print_exc()
        list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
        return list_docs
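    # Fallback and dedup sketch (illustrative docids): each docid is looked up in
    # document_tmp first, then document; rows whose fingerprint was already seen
    # are skipped, and the survivors come back sorted by page_time:
    #
    #     docs = self.search_docs([1001,1002])
    #     for d in docs:
    #         print(d.getProperties().get(document_page_time))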
    def is_same_package(self,_dict1,_dict2):
        sub_project_name1 = _dict1.get(project_sub_project_name,"")
        if sub_project_name1=="Project":
            sub_project_name1 = ""
        win_tenderer1 = _dict1.get(project_win_tenderer,"")
        win_bid_price1 = _dict1.get(project_win_bid_price,0)
        bidding_budget1 = _dict1.get(project_bidding_budget,0)
        sub_project_name2 = _dict2.get(project_sub_project_name,"")
        if sub_project_name2=="Project":
            sub_project_name2 = ""
        win_tenderer2 = _dict2.get(project_win_tenderer,"")
        win_bid_price2 = _dict2.get(project_win_bid_price,0)
        bidding_budget2 = _dict2.get(project_bidding_budget,0)
        _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
        if len(_set)>1:
            return False
        _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
        if len(_set)>1:
            return False
        _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
        if len(_set)>1:
            return False
        _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
        if len(_set)>1:
            return False
        return True
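    # Conflict rule, by example (illustrative values with literal key names):
    # fields only conflict when both sides are non-empty/non-zero and differ, so
    #     {"win_tenderer":"A","win_bid_price":100} vs {"win_tenderer":"","win_bid_price":100} -> True
    #     {"win_tenderer":"A"} vs {"win_tenderer":"B"} -> False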
    def getUpdate_dict(self,_dict):
        update_dict = {}
        for k,v in _dict.items():
            if v is None:
                continue
            if isinstance(v,str):
                if v=="":
                    continue
            if isinstance(v,(float,int)):
                if v==0:
                    continue
            update_dict[k] = v
        return update_dict
    def update_projects_by_document(self,docid,save,projects,document_name="document"):
        '''
        Update the attributes contributed by a document onto its projects
        :param docid:
        :param save: 1 attaches the document's docid to the projects, any other value detaches it
        :param projects: collection of projects
        :return:
        '''
        list_docs = self.search_docs([docid],document_name=document_name)
        docs = [_doc.getProperties() for _doc in list_docs]
        project_dict = generate_common_properties(docs)
        list_package_properties = generate_packages_properties(docs)
        _dict = {}
        # update the common properties
        _replace_replace = False
        v = project_dict.get(document_district,"")
        if not (v is None or v=="" or v=="[]" or v=="未知"):
            _replace_replace = True
        for k,v in project_dict.items():
            if not _replace_replace:
                if k in [document_district,document_city,document_province,document_area]:
                    continue
            if v is None or v=="" or v=="[]" or v=="未知":
                continue
            if k in (project_project_dynamics,project_product,project_project_codes,project_docids,project_candidates,project_zhong_biao_page_time,project_zhao_biao_page_time,project_page_time,project_docchannel):
                continue
            _dict[k] = v
        for _proj in projects:
            _proj.update(_dict)
        for _proj in projects:
            if _proj.get(project_page_time,"")<=project_dict.get(project_page_time,""):
                _proj[project_page_time] = project_dict.get(project_page_time,"")
                _proj[project_docchannel] = project_dict.get(project_docchannel,"")
            else:
                if project_docchannel in project_dict:
                    project_dict.pop(project_docchannel)
            if _proj.get(project_zhong_biao_page_time,"")>project_dict.get(project_zhong_biao_page_time,""):
                _proj[project_zhong_biao_page_time] = project_dict.get(project_zhong_biao_page_time,"")
            if _proj.get(project_zhao_biao_page_time,"")>project_dict.get(project_zhao_biao_page_time,""):
                _proj[project_zhao_biao_page_time] = project_dict.get(project_zhao_biao_page_time,"")
        for _proj in projects:
            # merge the concatenated attributes
            append_dict = {}
            set_docid = set()
            set_product = set()
            set_code = set()
            set_nlp_enterprise = set()
            set_nlp_enterprise_attachment = set()
            set_candidates = set()
            _docids = _proj.get(project_docids,"")
            _codes = _proj.get(project_project_codes,"")
            _product = _proj.get(project_product,"")
            set_docid = set(_docids.split(","))
            if save==1:
                set_docid.add(str(docid))
            else:
                if str(docid) in set_docid:
                    set_docid.remove(str(docid))
            set_code = set_code | set(_codes.split(","))
            set_product = set_product | set(_product.split(","))
            list_candidates = []
            try:
                set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
                set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
                list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
                for item in list_candidates:
                    if item.get("name") is not None and item.get("name") not in set_candidates:
                        set_candidates.add(item.get("name"))
                set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
                set_product = set_product | set(project_dict.get(project_product,"").split(","))
                set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
                set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
                for item in json.loads(_proj.get(project_candidates,"[]")):
                    if item.get("name") is not None and item.get("name") not in set_candidates:
                        set_candidates.add(item.get("name"))
                        list_candidates.append(item)
            except Exception as e:
                pass
            append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
            append_dict[project_docid_number] = len(set_docid)
            append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
            append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
            append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
            append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
            append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
            dict_dynamic = {}
            set_docid = set()
            _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
            for _dy in _dynamic:
                _docid = _dy.get("docid")
                dict_dynamic[_docid] = _dy
            _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
            for _dy in _dynamic:
                _docid = _dy.get("docid")
                dict_dynamic[_docid] = _dy
            list_dynamics = []
            for k,v in dict_dynamic.items():
                list_dynamics.append(v)
            list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
            append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
            _proj.update(append_dict)
        dict_package = {}
        for _pp in projects:
            _counts = 0
            sub_project_name = _pp.get(project_sub_project_name,"")
            if sub_project_name=="Project":
                sub_project_name = ""
            win_tenderer = _pp.get(project_win_tenderer,"")
            win_bid_price = _pp.get(project_win_bid_price,0)
            bidding_budget = _pp.get(project_bidding_budget,0)
            if win_tenderer!="" and bidding_budget!=0:
                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
            if win_tenderer!="" and win_bid_price!=0:
                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                dict_package[_key] = _pp
                _counts += 1
            if _counts==0:
                if win_tenderer!="":
                    _key = "%s-%s"%(sub_project_name,win_tenderer)
                    dict_package[_key] = _pp
                    _counts += 1
                if bidding_budget!=0:
                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                    dict_package[_key] = _pp
                    _counts += 1
        # update the package-level (private) properties
        if len(projects)==1 and len(list_package_properties)==1:
            _pp = list_package_properties[0]
            pp = projects[0]
            ud = self.getUpdate_dict(_pp)
            self.set_project_uuid(ud,pp.get("uuid"))
            pp.update(_pp)
        else:
            for _pp in list_package_properties:
                flag_update = False
                sub_project_name = _pp.get(project_sub_project_name,"")
                if sub_project_name=="Project":
                    sub_project_name = ""
                win_tenderer = _pp.get(project_win_tenderer,"")
                win_bid_price = _pp.get(project_win_bid_price,0)
                bidding_budget = _pp.get(project_bidding_budget,0)
                if win_tenderer!="" and bidding_budget!=0:
                    _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                    if _key in dict_package:
                        if self.is_same_package(_pp,dict_package[_key]):
                            ud = self.getUpdate_dict(_pp)
                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                            dict_package[_key].update(ud)
                            flag_update = True
                            continue
                if win_tenderer!="" and win_bid_price!=0:
                    _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                    if _key in dict_package:
                        if self.is_same_package(_pp,dict_package[_key]):
                            ud = self.getUpdate_dict(_pp)
                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                            dict_package[_key].update(ud)
                            flag_update = True
                            continue
                if win_tenderer!="":
                    _key = "%s-%s"%(sub_project_name,win_tenderer)
                    if _key in dict_package:
                        if self.is_same_package(_pp,dict_package[_key]):
                            ud = self.getUpdate_dict(_pp)
                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                            dict_package[_key].update(ud)
                            flag_update = True
                            continue
                if bidding_budget!=0:
                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                    if _key in dict_package:
                        if self.is_same_package(_pp,dict_package[_key]):
                            ud = self.getUpdate_dict(_pp)
                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                            dict_package[_key].update(ud)
                            flag_update = True
                            continue
                if not flag_update:
                    _pp.update(project_dict)
                    projects.append(_pp)
                    _counts = 0
                    if win_tenderer!="" and bidding_budget!=0:
                        _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                        dict_package[_key] = _pp
                        _counts += 1
                    if win_tenderer!="" and win_bid_price!=0:
                        _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                        dict_package[_key] = _pp
                        _counts += 1
                    if _counts==0:
                        if win_tenderer!="":
                            _key = "%s-%s"%(sub_project_name,win_tenderer)
                            dict_package[_key] = _pp
                            _counts += 1
                        if bidding_budget!=0:
                            _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                            dict_package[_key] = _pp
                            _counts += 1
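    # Call pattern (illustrative): after a document is saved (save=1) or dropped
    # (save=0) by dedup, re-project its attributes onto the projects it belongs to:
    #
    #     projects = self.search_projects_with_document([docid],...)  # table args as defined below
    #     self.update_projects_by_document(docid,1,projects)
    #
    # Common properties are majority-merged, docids/codes/products are unioned,
    # and package-level fields are matched via the (sub_project_name,win_tenderer,
    # money) keys built above.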
    def delete_projects_by_document(self,docid):
        '''
        Remove a document from its projects and regenerate the remaining projects
        :param docid:
        :return: project json containing the regenerated projects plus delete markers for the old rows
        '''
        set_docid = set()
        list_delete_projects = []
        list_projects = self.search_projects_with_document([docid])
        for _proj in list_projects:
            _p = {}
            _docids = _proj.get(project_docids,"")
            print(_proj.get(project_uuid))
            _p["delete_uuid"] = _proj.get(project_uuid)
            _p["to_delete"] = True
            list_delete_projects.append(_p)
            if _docids!="":
                set_docid = set_docid | set(_docids.split(","))
        if str(docid) in set_docid:
            set_docid.remove(str(docid))
        list_docid = list(set_docid)
        list_projects = []
        if len(list_docid)>0:
            list_docs = self.search_docs(list_docid)
            print("search_docs(list_docid)")
            list_projects = self.generate_projects_from_document(list_docs)
            print("generate_projects_from_document")
            list_projects = dumplicate_projects(list_projects,max_count=20)
            print("dumplicate_projects")
        list_projects.extend(list_delete_projects)
        project_json = to_project_json(list_projects)
        return project_json
    def delete_doc_handle(self,_dict,result_queue):
        try:
            headers = _dict.get("frame")
            conn = _dict.get("conn")
            if headers is not None:
                message_id = headers.headers["message-id"]
                body = headers.body
                item = json.loads(body)
                docid = item.get("docid")
                log("==========start delete docid:%s"%(str(docid)))
                if docid is None:
                    ackMsg(conn,message_id)
                    return
                delete_result = self.delete_projects_by_document(docid)
                log("delete_projects_by_document done")
                _uuid = uuid4().hex
                _d = {PROJECT_PROCESS_UUID:_uuid,
                      PROJECT_PROCESS_CRTIME:1,
                      PROJECT_PROCESS_PROJECTS:delete_result}
                _pp = Project_process(_d)
                log("project_process row prepared")
                try:
                    if _pp.update_row(self.ots_client):
                        ackMsg(conn,message_id)
                except Exception as e:
                    ackMsg(conn,message_id)
                log("project_process row written")
                # results are no longer pushed to the MQ result queue; they are written to the project_process table instead
                # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
                #     ackMsg(conn,message_id)
                log("==========end delete docid:%s"%(str(docid)))
            else:
                log("has no headers")
        except Exception as e:
            traceback.print_exc()
            ackMsg(conn,message_id)
            log("==========end delete docid:%s"%(str(docid)))
    def generate_common_properties(self,list_docs):
        '''
        Generate the common (project-level) properties from a list of documents
        :param list_docs:
        :return:
        '''
        # choose each value by counting occurrences (majority vote)
        choose_dict = {}
        project_dict = {}
        for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
            for _doc in list_docs:
                _value = _doc.getProperties().get(_key,"")
                if _value!="":
                    if _key not in choose_dict:
                        choose_dict[_key] = {}
                    if _value not in choose_dict[_key]:
                        choose_dict[_key][_value] = 0
                    choose_dict[_key][_value] += 1
        _find = False
        for _key in [document_district,document_city,document_province,document_area]:
            area_dict = {}
            for _doc in list_docs:
                loc = _doc.getProperties().get(_key,"未知")
                if loc not in ('全国','未知',"0"):
                    if loc not in area_dict:
                        area_dict[loc] = 0
                    area_dict[loc] += 1
            list_loc = []
            for k,v in area_dict.items():
                list_loc.append([k,v])
            list_loc.sort(key=lambda x:x[1],reverse=True)
            if len(list_loc)>0:
                project_dict[document_district] = _doc.getProperties().get(document_district)
                project_dict[document_city] = _doc.getProperties().get(document_city)
                project_dict[document_province] = _doc.getProperties().get(document_province)
                project_dict[document_area] = _doc.getProperties().get(document_area)
                _find = True
                break
        if not _find:
            if len(list_docs)>0:
                project_dict[document_district] = list_docs[0].getProperties().get(document_district)
                project_dict[document_city] = list_docs[0].getProperties().get(document_city)
                project_dict[document_province] = list_docs[0].getProperties().get(document_province)
                project_dict[document_area] = list_docs[0].getProperties().get(document_area)
        for _key,_value in choose_dict.items():
            _l = []
            for k,v in _value.items():
                _l.append([k,v])
            _l.sort(key=lambda x:x[1],reverse=True)
            if len(_l)>0:
                _v = _l[0][0]
                if _v in ('全国','未知'):
                    if len(_l)>1:
                        _v = _l[1][0]
                project_dict[_key] = _v
        list_dynamics = []
        docid_number = 0
        visuable_docids = []
        zhao_biao_page_time = ""
        zhong_biao_page_time = ""
        list_codes = []
        list_product = []
        p_page_time = ""
        remove_docids = set()
        for _doc in list_docs:
            table_name = _doc.getProperties().get("table_name")
            status = _doc.getProperties().get(document_status,0)
            _save = _doc.getProperties().get(document_tmp_save,1)
            doctitle = _doc.getProperties().get(document_doctitle,"")
            docchannel = _doc.getProperties().get(document_docchannel)
            page_time = _doc.getProperties().get(document_page_time,"")
            _docid = _doc.getProperties().get(document_docid)
            _bidway = _doc.getProperties().get(document_bidway,"")
            _docchannel = _doc.getProperties().get(document_life_docchannel,0)
            project_codes = _doc.getProperties().get(document_project_codes)
            product = _doc.getProperties().get(document_product)
            sub_docs = _doc.getProperties().get("sub_docs",[])
            is_multipack = True if len(sub_docs)>1 else False
            extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
            if product is not None:
                list_product.extend(product.split(","))
            if project_codes is not None:
                _c = project_codes.split(",")
                list_codes.extend(_c)
            if p_page_time=="":
                p_page_time = page_time
            if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
                zhao_biao_page_time = page_time
            if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
                zhong_biao_page_time = page_time
            is_visuable = 0
            if table_name=="document":
                if status>=201 and status<=300:
                    docid_number += 1
                    visuable_docids.append(str(_docid))
                    is_visuable = 1
                else:
                    remove_docids.add(str(_docid))
            else:
                if _save==1:
                    docid_number += 1
                    visuable_docids.append(str(_docid))
                    is_visuable = 1
                else:
                    remove_docids.add(str(_docid))
            list_dynamics.append({document_docid:_docid,
                                  document_doctitle:doctitle,
                                  document_docchannel:_docchannel,
                                  document_bidway:_bidway,
                                  document_page_time:page_time,
                                  document_status:201 if is_visuable==1 else 401,
                                  "is_multipack":is_multipack,
                                  document_tmp_extract_count:extract_count
                                  }
                                 )
        project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
        project_dict[project_docid_number] = docid_number
        project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
        if zhao_biao_page_time!="":
            project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
        if zhong_biao_page_time!="":
            project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
        project_dict[project_project_codes] = ",".join(list(set(list_codes)))
        project_dict[project_page_time] = p_page_time
        project_dict[project_product] = ",".join(list(set(list_product)))
        return project_dict
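    # Majority-vote sketch (illustrative counts): for each key the most frequent
    # non-empty value wins, with '全国'/'未知' (nationwide/unknown) only kept when
    # nothing better exists; e.g. tenderee counts {"单位A":3,"单位B":1} elect "单位A".
    # Area fields are elected together, scanning district, city, province, area in
    # turn, so the four values stay mutually consistent (all taken from one doc).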
    def generate_packages_properties(self,list_docs):
        '''
        Generate the package (sub-bid) properties
        :param list_docs:
        :return:
        '''
        list_properties = []
        set_key = set()
        for _doc in list_docs:
            _dict = {}
            sub_docs = _doc.getProperties().get("sub_docs")
            if sub_docs is not None:
                for _d in sub_docs:
                    sub_project_code = _d.get(project_sub_project_code,"")
                    sub_project_name = _d.get(project_sub_project_name,"")
                    win_tenderer = _d.get(project_win_tenderer,"")
                    win_bid_price = _d.get(project_win_bid_price,"")
                    _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
                    if _key in set_key:
                        continue
                    set_key.add(_key)
                    list_properties.append(_d)
        return list_properties
    def generate_projects_from_document(self,list_docs):
        '''
        Generate projects from announcement documents
        :param list_docs:
        :return:
        '''
        # the number of bid sections is determined inside generate_projects
        list_projects = generate_projects([doc.getProperties() for doc in list_docs])
        return list_projects
    def search_projects_with_document(self,list_docids,project_table,project_table_index):
        '''
        Query the projects that contain any of the given docids
        :param list_docids:
        :return:
        '''
        log("search_projects_with_document %s"%str(list_docids))
        list_should_q = []
        for _docid in list_docids:
            list_should_q.append(TermQuery("docids",_docid))
        bool_query = BoolQuery(should_queries=list_should_q)
        _query = {"query":bool_query,"limit":20}
        list_project_dict = getDocument(_query,self.ots_client,[
            project_uuid,project_docids,project_zhao_biao_page_time,
            project_zhong_biao_page_time,
            project_page_time,
            project_area,
            project_province,
            project_city,
            project_district,
            project_info_type,
            project_industry,
            project_qcodes,
            project_project_name,
            project_project_code,
            project_project_codes,
            project_project_addr,
            project_tenderee,
            project_tenderee_addr,
            project_tenderee_phone,
            project_tenderee_contact,
            project_agency,
            project_agency_phone,
            project_agency_contact,
            project_sub_project_name,
            project_sub_project_code,
            project_bidding_budget,
            project_win_tenderer,
            project_win_bid_price,
            project_win_tenderer_manager,
            project_win_tenderer_phone,
            project_second_tenderer,
            project_second_bid_price,
            project_second_tenderer_manager,
            project_second_tenderer_phone,
            project_third_tenderer,
            project_third_bid_price,
            project_third_tenderer_manager,
            project_third_tenderer_phone,
            project_procurement_system,
            project_bidway,
            project_dup_data,
            project_docid_number,
            project_project_dynamics,
            project_product,
            project_moneysource,
            project_service_time,
            project_time_bidclose,
            project_time_bidopen,
            project_time_bidstart,
            project_time_commencement,
            project_time_completion,
            project_time_earnest_money_start,
            project_time_earnest_money_end,
            project_time_get_file_end,
            project_time_get_file_start,
            project_time_publicity_end,
            project_time_publicity_start,
            project_time_registration_end,
            project_time_registration_start,
            project_time_release,
            project_dup_docid,
            project_info_source,
            project_nlp_enterprise,
            project_nlp_enterprise_attachment,
            project_tenderee_code,
            project_agency_code,
            project_candidates,
            project_docchannel
        ],sort="page_time",table_name=project_table,table_index=project_table_index)
        return list_project_dict
    def set_project_uuid(self,_dict,_uuid):
        if _uuid is not None and _uuid!="":
            if "uuid" in _dict:
                _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
            else:
                _dict["uuid"] = _uuid
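    # Accumulation example (values invented):
    #   _dict = {}
    #   self.set_project_uuid(_dict,"u1")   # _dict == {"uuid": "u1"}
    #   self.set_project_uuid(_dict,"u2")   # _dict == {"uuid": "u1,u2"}
    # i.e. the uuids of merged projects collect into one comma-separated string.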
    def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
        whole_time_start = time.time()
        _time = time.time()
        list_query = []
        list_code = [a for a in project_codes.split(",") if a!='']
        should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
        # print("should_q_code",[a for a in list_code[:20]])
        should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
        list_product = [a for a in product.split(",") if a!='']
        should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
        should_q_area = None
        if province!="" or city!="" or district!="":
            should_q = []
            if province not in ("","全国","未知") and province is not None:
                should_q.append(TermQuery(project_province,province))
            if city not in ("","全国","未知") and city is not None:
                should_q.append(TermQuery(project_city,city))
            if district not in ("","全国","未知") and district is not None:
                should_q.append(TermQuery(project_district,district))
            if len(should_q)>0:
                should_q_area = BoolQuery(should_queries=should_q)
        prepare_time = time.time()-_time
        _time = time.time()
        # log("list_code %s"%(str(list_code)))
        # log("list_product %s"%(str(list_product)))
        # log("tenderee %s"%(tenderee))
        # log("bidding_budget %s"%(bidding_budget))
        # log("win_tenderer %s"%(win_tenderer))
        # log("win_bid_price %s"%(win_bid_price))
        # log("project_name %s"%(project_name))
        log_time = time.time()-_time
        _time = time.time()
        # each rule is a pair [must_queries, weight]; the weight later widens
        # the search limit for that rule
        if tenderee!="" and len(list_code)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_code,
                      ]
            list_query.append([_query,2])
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_cod
                      ]
            list_query.append([_query,2])
        if tenderee!="" and len(list_product)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_product]
            list_query.append([_query,1])
        if tenderee!="" and project_name!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_project_name,project_name)]
            list_query.append([_query,2])
        if tenderee!="" and agency!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_agency,agency)]
            list_query.append([_query,0])
        if tenderee!="" and float(bidding_budget)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        if float(bidding_budget)>0 and float(win_bid_price)>0:
            _query = [TermQuery(project_bidding_budget,bidding_budget),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        if tenderee!="" and win_tenderer!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,2])
        if agency!="" and win_tenderer!="":
            _query = [TermQuery(project_agency,agency),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,0])
        if agency!="" and len(list_product)>0:
            _query = [TermQuery(project_agency,agency),
                      should_q_product]
            list_query.append([_query,1])
        if win_tenderer!="" and len(list_code)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_code]
            list_query.append([_query,2])
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_cod]
            list_query.append([_query,2])
        if win_tenderer!="" and sub_project_name!="":
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_sub_project_name,sub_project_name)
                      ]
            list_query.append([_query,2])
        if win_tenderer!="" and float(win_bid_price)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        if win_tenderer!="" and float(bidding_budget)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        if len(list_code)>0 and len(list_product)>0:
            _query = [should_q_code,
                      should_q_product]
            list_query.append([_query,2])
        if len(list_code)>0:
            _query = [
                should_q_code]
            list_query.append([_query,2])
            _query = [
                should_q_cod]
            list_query.append([_query,1])
        if project_name!="" and project_name is not None:
            _query = [
                TermQuery(project_project_name,project_name)]
            list_query.append([_query,1])
            _query_title = [MatchPhraseQuery(project_doctitles,project_name)]
            list_query.append([_query_title,1])
        if len(list_product)>0 and should_q_area is not None:
            _query = [should_q_area,
                      should_q_product]
            list_query.append([_query,0])
        generate_time = time.time()-_time
        whole_time = time.time()-whole_time_start
        # log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
        return list_query
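    # Shape of the returned rules (values invented): each entry pairs a list of
    # must-queries with a weight, e.g.
    #   [[TermQuery(project_tenderee,"SomeTenderee"), should_q_code], 2]
    # merge_projects ORs a few rules per search round and raises the row limit
    # by weight*5, so higher-weight rules pull in more candidate projects.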
    def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment,project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source],project_table="project2",project_table_index="project2_index_formerge"):
        '''
        Merge the given projects with existing ones.
        :return:
        '''
        try:
            whole_time_start = time.time()
            set_uuid = set()
            for _proj in list_projects:
                _uuid = _proj.get("uuid")
                if _uuid is not None:
                    set_uuid = set_uuid | set(_uuid.split(","))
            projects_merge_count = 0
            projects_check_rule_time = 0
            projects_update_time = 0
            projects_query_time = 0
            projects_prepare_time = 0
            current_date = getCurrent_date("%Y-%m-%d")
            min_date = timeAdd(current_date,-35,format="%Y-%m-%d")
            search_table = "project2"
            search_table_index = "project2_index_formerge"
            project_cls = Project
            docids = ""
            for _proj in list_projects[:30]:
                must_not_q = []
                for _uuid in list(set_uuid):
                    must_not_q.append(TermQuery("uuid",_uuid))
                docids = _proj.get(project_docids,"")
                page_time = _proj.get(project_page_time,"")
                project_codes = _proj.get(project_project_codes,"")
                project_name = _proj.get(project_project_name,"")
                tenderee = _proj.get(project_tenderee,"")
                agency = _proj.get(project_agency,"")
                product = _proj.get(project_product,"")
                sub_project_name = _proj.get(project_sub_project_name,"")
                bidding_budget = _proj.get(project_bidding_budget,-1)
                win_tenderer = _proj.get(project_win_tenderer,"")
                win_bid_price = _proj.get(project_win_bid_price,-1)
                _dynamic = _proj.get(project_project_dynamics,"[]")
                is_yanshou = False
                list_dynamic = json.loads(_dynamic)
                for _d in list_dynamic:
                    _title = _d.get("doctitle","")
                    # acceptance-inspection notices (验收公示/验收公告/验收结果) or channel 122
                    if re.search("验收公[示告]|验收结果",_title) is not None or _d.get("docchannel")==122:
                        is_yanshou = True
                        break
                province = _proj.get(project_province,"")
                city = _proj.get(project_city,"")
                district = _proj.get(project_district,"")
                # acceptance projects get a wider page_time window
                if is_yanshou:
                    page_time_less = timeAdd(page_time,-850)
                    page_time_greater = timeAdd(page_time,820)
                else:
                    page_time_less = timeAdd(page_time,-450)
                    page_time_greater = timeAdd(page_time,420)
                sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
                _time = time.time()
                list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
                list_merge_data = []
                search_table = project_table
                search_table_index = project_table_index
                project_cls = Project
                # print("page_time,min_date",page_time,min_date)
                # if page_time>=min_date:
                #     search_table = "project2_tmp"
                #     search_table_index = "project2_tmp_index"
                #     project_cls = Project_tmp
                _step = 2
                _begin = 0
                must_queries = []
                if page_time_less is not None and page_time_greater is not None:
                    must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
                                    # RangeQuery("status",201,301)
                                    ]
                # sub_project_name is not a required condition
                # if sub_project_q is not None:
                #     must_queries.append(sub_project_q)
                projects_prepare_time += time.time()-_time
                _time = time.time()
                sort_type = SortOrder.DESC
                while _begin<len(list_must_query):
                    # alternate the sort order between rounds (elif keeps the
                    # toggle from flipping straight back)
                    if sort_type==SortOrder.DESC:
                        sort_type = SortOrder.ASC
                    elif sort_type==SortOrder.ASC:
                        sort_type = SortOrder.DESC
                    list_should_q = []
                    _limit = 10
                    for must_q,_count in list_must_query[_begin:_begin+_step]:
                        must_q1 = list(must_q)
                        must_q1.extend(must_queries)
                        list_should_q.append(BoolQuery(must_queries=must_q1))
                        _limit += _count*5
                    _query = BoolQuery(
                        should_queries=list_should_q,
                        must_not_queries=must_not_q[:100]
                    )
                    rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
                                                                                              SearchQuery(_query,sort=Sort(sorters=[FieldSort(project_page_time,sort_type)]),limit=_limit),
                                                                                              columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
                    list_data = getRow_ots(rows)
                    list_merge_data.extend(list_data)
                    # print(list_data)
                    for _data in list_data:
                        must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
                    _begin += _step
                projects_query_time += time.time()-_time
                # prefer candidates whose bidding budget is close
                projects_merge_count = len(list_merge_data)
                list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
                list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
                # log(page_time_less+"=="+page_time_greater)
                if b_log:
                    log("list_merge_data count:%d"%(len(list_merge_data)))
                list_check_data = []
                for _data in list_merge_data:
                    _time = time.time()
                    _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
                    if b_log:
                        log(str(_check))
                    projects_check_rule_time += time.time()-_time
                    if _check:
                        list_check_data.append([_data,_prob])
                list_check_data.sort(key=lambda x:x[1],reverse=True)
                for _data,_ in list_check_data:
                    _time = time.time()
                    # re-check: earlier merges in this loop may have changed _proj
                    _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
                    projects_check_rule_time += time.time()-_time
                    _time = time.time()
                    if _check:
                        # o_proj = project_cls(_data)
                        # o_proj.fix_columns(self.ots_client,fix_columns,True)
                        # for k in fix_columns:
                        #     _data[k] = o_proj.getProperties().get(k)
                        update_projects_by_project(_data,[_proj])
                    projects_update_time += time.time()-_time
            whole_time = time.time()-whole_time_start
            log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules:%d projects_check_rule_time:%.3f projects_update_time:%.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
            return list_projects
        except Exception as e:
            traceback.print_exc()
            # force a failure so the merge error is not silently swallowed
            assert 1==2
    def dumplicate_document_in_merge(self,list_projects,dup_docid,_docid,_docchannel,document_name="document",b_log=False):
        '''
        Deduplicate documents while merging projects.
        :param list_projects:
        :return:
        '''
        dup_docid = set([str(a) for a in dup_docid])
        set_dup_total = set()
        docid_item = self.get_attrs_before_dump(_docid)
        best_docid = None
        for _proj in list_projects:
            try:
                docids = _proj.get(project_docids,"")
                set_docids = set([a for a in docids.split(",") if a!=""])
                _project_dynamics = _proj.get(project_project_dynamics,"[]")
                list_dynamics = json.loads(_project_dynamics)
                set_dup_docid = set()
                list_dup_result = [(_docid,docid_item.get("extract_count"))]
                log("=========%s---%s"%(str(set_docids),str(_docid)))
                if str(_docid) in set_docids:
                    list_to_dup_docid = []
                    for _d in list_dynamics:
                        docid = _d.get(document_docid)
                        doctitle = _d.get(document_doctitle,"")
                        docchannel = _d.get(document_docchannel,0)
                        status = _d.get(document_status,0)
                        if status>=401:
                            continue
                        if str(docid) not in set_docids:
                            continue
                        if str(docid) in dup_docid:
                            continue
                        if docchannel!=_docchannel:
                            continue
                        if docid==_docid:
                            continue
                        list_to_dup_docid.append(_d)
                    for _d in list_to_dup_docid:
                        docid = _d.get(document_docid)
                        _item = self.get_attrs_before_dump(docid)
                        _prob = check_dumplicate_rule(docid_item,_item,5,b_log=b_log)
                        log("dumplicate_document_in_merge %s-%s prob %.2f"%(str(_docid),str(docid),_prob))
                        if _prob>0.4:
                            docid = int(docid)
                            _d = {"partitionkey":docid%500+1,
                                  "docid":docid,
                                  }
                            _doc = Document(_d)
                            _doc.table_name = document_name
                            if _doc.fix_columns(self.ots_client,[document_page_time,document_update_document],True):
                                if _doc.getProperties().get(document_update_document,"")!="true":
                                    list_dup_result.append((docid,_item.get("extract_count")))
                    # two stable sorts: extract_count desc, docid asc as tie-break
                    list_dup_result.sort(key=lambda x:x[0])
                    list_dup_result.sort(key=lambda x:x[1],reverse=True)
                    if len(list_dup_result)>0:
                        best_docid1 = list_dup_result[0][0]
                        # set_dup_total holds string docids; compare as strings
                        if str(best_docid1) not in set_dup_total:
                            best_docid = best_docid1
                        for _d in list_dup_result[1:]:
                            set_dup_docid.add(str(_d[0]))
                for _dynamic in list_dynamics:
                    # dynamics may store docid as int; normalize for the set lookup
                    if str(_dynamic.get(document_docid)) in set_dup_docid:
                        _dynamic[document_status] = 401
                set_docids = set_docids-set_dup_docid-dup_docid
                set_dup_total |= set_dup_docid
                if len(set_docids)==0:
                    print(set_dup_docid,dup_docid)
                    log("projects set_docids length is zero %s"%(docids))
                    return None,None
                else:
                    _proj[project_docids] = ",".join(list(set_docids))
                    _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
                    _proj[project_docid_number] = len(set_docids)
                    _proj[project_dup_docid] = ",".join(list(set_dup_docid))
                # log("dumplicate_document docid%s dynamic %d takes%.3f"%(str(docid),len(list_dynamics),time.time()-_time))
            except Exception as e:
                traceback.print_exc()
        if str(best_docid) in set_dup_total:
            best_docid = None
        return best_docid,list(set_dup_total)
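    # Selection sketch for the two stable sorts above (numbers invented):
    #   list_dup_result = [(105,8),(103,8),(104,9)]   # (docid, extract_count)
    # sorting by docid asc, then by extract_count desc, yields
    #   [(104,9),(103,8),(105,8)]
    # so the best docid is the one with the highest extract_count, ties broken
    # by the smaller docid; everything after index 0 is marked duplicate (401).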
    def merge_document_real(self,item,dup_docid,save,document_name="document",project_table="project2",project_table_index="project2_index_formerge",b_log=False):
        '''
        Real-time project merging.
        :param item:
        :param dup_docid: the set of duplicated announcements
        :param status_to:
        :return:
        '''
        try:
            list_docids = []
            _docid = item.get(document_tmp_docid)
            list_docids.append(_docid)
            print("dup_docid",dup_docid)
            if save==0:
                dup_docid.insert(0,_docid)
            if isinstance(dup_docid,list):
                list_docids.extend(dup_docid)
            list_docids = [a for a in list_docids if a is not None]
            _time = time.time()
            list_projects = self.search_projects_with_document(list_docids,project_table,project_table_index)
            log("search %d projects takes:%.3f"%(len(list_projects),time.time()-_time))
            if len(list_projects)==0:
                # _time = time.time()
                list_docs = self.search_docs(list_docids,document_name=document_name)
                # log("search document takes:%.3f"%(time.time()-_time))
                # _time = time.time()
                list_projects = self.generate_projects_from_document(list_docs)
                # log("generate projects takes:%.3f"%(time.time()-_time))
            else:
                _time = time.time()
                self.update_projects_by_document(_docid,save,list_projects,document_name=document_name)
                # log("update projects takes:%.3f"%(time.time()-_time))
            _time = time.time()
            list_projects = dumplicate_projects(list_projects)
            # log("dumplicate projects takes:%.3f"%(time.time()-_time))
            _time = time.time()
            list_projects = self.merge_projects(list_projects,b_log,project_table=project_table,project_table_index=project_table_index)
            # log("merge projects takes:%.3f"%(time.time()-_time))
            _time = time.time()
            best_docid,list_merge_dump = self.dumplicate_document_in_merge(list_projects,dup_docid,_docid,item.get(document_docchannel),document_name=document_name,b_log=b_log)
            # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
            if list_merge_dump is None:
                list_projects = []
            _time = time.time()
            project_json = to_project_json(list_projects)
            # log("json projects takes:%.3f"%(time.time()-_time))
            if b_log:
                log("project_json:%s"%project_json)
            return project_json,best_docid,list_merge_dump
        except Exception as e:
            raise RuntimeError("error on dumplicate")
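    # Flow sketch of merge_document_real (data hypothetical):
    #   1. docid + its duplicates                -> list_docids
    #   2. search_projects_with_document(...)    -> existing projects, else
    #      generate_projects_from_document(...)  -> fresh projects
    #   3. dumplicate_projects / merge_projects  -> consolidated projects
    #   4. dumplicate_document_in_merge(...)     -> best_docid + merged dumps
    #   5. to_project_json(list_projects)        -> JSON payload for the row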
    def is_exist_fingerprint(self,final_list,_docid,_fingerprint,is_tmp=False):
        set_fingerprint = set()
        # final_list[0] is expected to be the current document, so start from 1
        for _i in range(1,len(final_list)):
            _dict = final_list[_i]
            b_docid = _dict[document_tmp_docid]
            _save = _dict.get(document_tmp_save,0)
            _status = _dict.get(document_tmp_status,0)
            if not is_tmp:
                if _status>=201 and _status<=300:
                    _save = 1
            fingerprint_less = _dict.get(document_tmp_fingerprint,"")
            if b_docid==_docid:
                pass
            else:
                if _save==1:
                    set_fingerprint.add(fingerprint_less)
        if _fingerprint in set_fingerprint:
            return True
        return False
    def exists_normal_fingerprint(self,_fingerprint,docid,table_name="document",table_index="document_index"):
        # does an already-published document (status 201-300) share this
        # fingerprint at least 400000 docids earlier?
        query = BoolQuery(must_queries=[
            RangeQuery("status",201,301),
            TermQuery("fingerprint",_fingerprint),
            RangeQuery("docid",0,docid-400000),
        ]
        )
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                            SearchQuery(query,get_total_count=True,limit=1))
        if total_count>0:
            return True
        return False
    def check_page_time(self,item,table_name="document",table_index="document_index"):
        # reject documents whose extracted dates contradict the publish date
        page_time = item.get(document_page_time,"")
        has_before = False
        has_after = False
        bidclose_time = page_time
        web_source_name = item.get(document_tmp_web_source_name,"")
        docchannel = item.get(document_tmp_docchannel,"0")
        try:
            docchannel = int(docchannel)
        except:
            docchannel = 0
        if docchannel<200:
            if len(page_time)>0:
                l_page_time = timeAdd(page_time,days=-90)
                dict_time = item.get("dict_time",{})
                for k,v in dict_time.items():
                    if v is not None and len(v)>0:
                        if l_page_time>v:
                            has_before = True
                        if v>page_time:
                            has_after = True
                        if k==document_tmp_time_bidclose:
                            bidclose_time = v
                set_web_source = {"中国招标投标公共服务平台","比地招标"}
                if web_source_name in set_web_source and bidclose_time<page_time:
                    return False
                log("%s check page_time has_before %s has_after %s"%(str(item.get(document_docid)),str(has_before),str(has_after)))
                if has_before:
                    _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
                                       must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
                    if not has_after:
                        rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                                            SearchQuery(_query,get_total_count=True,limit=1))
                        if total_count>0:
                            log("%s check page_time false %s==%s-%s"%(str(item.get(document_docid)),l_page_time,k,v))
                            return False
                    if item.get(document_web_source_name,"")=="中国政府采购网":
                        rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                                            SearchQuery(_query,get_total_count=True,limit=1))
                        if total_count>0:
                            log("%s check 中国政府采购网 false "%(str(item.get(document_docid))))
                            return False
        return True
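    # Example of the date check (values invented): with page_time "2023-05-01"
    # and dict_time {"time_bidclose":"2023-04-20"}, the bid-close date precedes
    # the publish date, so documents from the two whitelisted sources above are
    # rejected; a date more than 90 days before page_time only sets has_before,
    # which then triggers the duplicate-title lookup.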
    def dumplicate_comsumer_handle_interface(self,docid,document_table,document_table_index,project_table,project_table_index,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=False,upgrade=False):
        result_dict = {"success":True}
        try:
            bool_query = BoolQuery(must_queries=[
                TermQuery("docid",docid)
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(document_table,document_table_index,
                                                                                SearchQuery(bool_query,limit=1,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            if len(list_dict)==0:
                raise RuntimeError("no document found for docid %s"%(str(docid)))
            item = list_dict[0]
            self.post_extract(item)
            log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
            base_list = []
            set_docid = set()
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=False,to_log=False,table_name=document_table,table_index=document_table_index)
            # print("len_rules",len(list_rules),table_name,table_index)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
            list_rules = list_rules[:30]
            _i = 0
            step = 2
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
                _i += step
            _time = time.time()
            # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list,b_log)
            exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),is_tmp=table_name=="document_tmp")
            exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid),table_name=table_name,table_index=table_index)
            # print("exist_normal_fingerprint",exist_normal_fingerprint)
            # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            final_list_docid = [a["docid"] for a in final_list]
            # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            dup_docid = set()
            for _dict in final_list:
                if _dict.get("update_document","")!="true":
                    dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            remove_list = []
            _unnormal = False
            dmp_docid = ""
            _check_time = self.check_page_time(item,table_name=table_name,table_index=table_index)
            if (_check_time and not exist_normal_fingerprint and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
                dtmp.setValue(document_tmp_save,1,True)
                # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
                if exist_normal_fingerprint:
                    log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
                    best_docid = -1
                    dmp_docid = ""
                    _unnormal = True
                if not _check_time:
                    best_docid = -2
                    dmp_docid = ""
                    _unnormal = True
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
            list_docids = list(dup_docid)
            # if item.get(document_update_document)=="true":
            #     dtmp.setValue(document_tmp_save,1,True)
            list_merge_dump = []
            if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
                if exist_finterprint:
                    log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                dtmp.setValue(document_tmp_projects,"[]",True)
            else:
                project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids,dtmp.getProperties().get(document_tmp_save),document_name=document_table,project_table=project_table,project_table_index=project_table_index,b_log=b_log)
                if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid) or best_docid<0):
                    best_docid = merge_best_docid
                if list_merge_dump is not None and len(list_merge_dump)>0 and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
                    dtmp.setValue(document_tmp_save,0,True)
                if list_merge_dump is not None:
                    dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
                dtmp.setValue(document_tmp_projects,project_json,True)
                result_dict["projects"] = project_json
            log("upgrade %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
            dmp_docid = set([a for a in dmp_docid.split(",") if a!=""])
            if str(best_docid) in dmp_docid:
                dmp_docid.remove(str(best_docid))
            dmp_docid = ",".join([str(a) for a in list(dmp_docid)])
            result_dict["best_docid"] = str(best_docid) if best_docid is not None else ""
            result_dict["save"] = dtmp.getProperties().get("save")
            result_dict["dmp_docid"] = dmp_docid
        except Exception as e:
            result_dict["success"] = False
            result_dict["errmsg"] = str(e)
        return result_dict
    def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
        try:
            start_time = time.time()
            b_log = False if upgrade else True
            self.post_extract(item)
            log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
            base_list = []
            set_docid = set()
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=b_log)
            # print("len_rules",len(list_rules),table_name,table_index)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
            list_rules = list_rules[:30]
            _i = 0
            step = 2
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link','products'],b_log=b_log)
                _i += step
            _time = time.time()
            # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list,b_log)
            exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),is_tmp=table_name=="document_tmp")
            exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid))
            # print("exist_normal_fingerprint",exist_normal_fingerprint)
            # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            final_list_docid = [a["docid"] for a in final_list]
            # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            dup_docid = set()
            for _dict in final_list:
                if _dict.get("update_document","")!="true":
                    dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            remove_list = []
            _unnormal = False
            dmp_docid = ""
            _check_time = self.check_page_time(item)
            if (_check_time and not exist_normal_fingerprint and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
                dtmp.setValue(document_tmp_save,1,True)
                # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
                if exist_normal_fingerprint:
                    log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
                    best_docid = -1
                    dmp_docid = ""
                    _unnormal = True
                if not _check_time:
                    best_docid = -2
                    dmp_docid = ""
                    _unnormal = True
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
            list_docids = list(dup_docid)
            # if item.get(document_update_document)=="true":
            #     dtmp.setValue(document_tmp_save,1,True)
            list_merge_dump = []
            if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
                if exist_finterprint:
                    log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                dtmp.setValue(document_tmp_projects,"[]",True)
            else:
                project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids,dtmp.getProperties().get(document_tmp_save),b_log=b_log)
                if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid) or best_docid<0):
                    best_docid = merge_best_docid
                if list_merge_dump is not None and len(list_merge_dump)>0 and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
                    dtmp.setValue(document_tmp_save,0,True)
                if list_merge_dump is not None:
                    dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
                dtmp.setValue(document_tmp_projects,project_json,True)
            log("upgrade %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
            dmp_docid = set([a for a in dmp_docid.split(",") if a!=""])
            if str(best_docid) in dmp_docid:
                dmp_docid.remove(str(best_docid))
            dmp_docid = ",".join([str(a) for a in list(dmp_docid)])
            if _unnormal:
                dmp_docid = ""
            if upgrade:
                # print(dtmp.getProperties())
                dmp_docid = dmp_docid.replace(",,",",")
                dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                dtmp.setValue(document_tmp_best_docid,best_docid,True)
                _flag = dtmp.update_row(self.ots_client)
                if not _flag:
                    # row too large: halve the projects payload until the write succeeds
                    for i in range(10):
                        list_proj_json = dtmp.getProperties().get(document_tmp_projects)
                        if list_proj_json is not None:
                            list_proj = json.loads(list_proj_json)
                            dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
                        if dtmp.update_row(self.ots_client):
                            break
                self.changeSaveStatus(remove_list)
                self.changeSaveStatus(list_merge_dump)
            else:
                return list_docids
        except Exception as e:
            traceback.print_exc()
            log("dumplicate error on:%s"%(str(item.get(document_tmp_docid))))
        finally:
            log("dumplicate end on:%s"%(str(item.get(document_tmp_docid))))
            self.queue_dumplicate_processed.put(item.get(document_tmp_docid))
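    # Write-retry sketch for the oversized-row branch above: if update_row
    # fails, the serialized projects list is halved up to 10 times before
    # rewriting, e.g. 8 projects -> 4 -> 2 -> 1 -> 0, until the row fits.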
    def fix_doc_which_not_in_project(self):
        '''
        Pull published announcements that are missing from project2 and push
        them back into document_tmp for another round of dedup and merge.
        :return:
        '''
        def fix_doc_handle(item,result_queue):
            _docid = item.get(document_tmp_docid)
            b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
                                                                                SearchQuery(b_q,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            if total_count==0:
                log("fix_doc:%s not in project2"%(str(_docid)))
                d_tmp = Document_tmp(item)
                d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
                d_tmp.update_row(self.ots_client)
        current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
        before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-20)
        after_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
        if self.fix_doc_docid is None:
            bool_query = BoolQuery(must_queries=[
                TermQuery(document_tmp_save,1),
                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
                RangeQuery(document_tmp_docchannel,0,300),
                RangeQuery(document_tmp_opertime,before_date,after_date)
            ])
        else:
            bool_query = BoolQuery(must_queries=[
                TermQuery(document_tmp_save,1),
                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
                RangeQuery(document_tmp_docchannel,0,300),
                RangeQuery(document_tmp_docid,self.fix_doc_docid),
                RangeQuery(document_tmp_opertime,before_date,after_date)
            ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_d = getRow_ots(rows)
        list_data.extend(list_d)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_d = getRow_ots(rows)
            list_data.extend(list_d)
            print("%d/%d"%(len(list_data),total_count))
        if len(list_data)>0:
            self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
            log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
        task_queue = Queue()
        for _data in list_data:
            task_queue.put(_data)
        mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
        mt.run()
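    # Scheduling sketch (assumes this method runs periodically): a run at 12:00
    # rechecks documents whose opertime lies in [11:40, 11:55), and
    # self.fix_doc_docid acts as a cursor so the next run continues from the
    # last docid seen instead of rescanning from the start.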
    def send_daily_check_data(self):
        import datetime
        def get_download_url(bucket, ObjectName, timeout):
            url = ""
            exist = bucket.object_exists(ObjectName)
            if exist:
                get_url = False
                for i in range(3):
                    try:
                        url = bucket.sign_url('GET', ObjectName, timeout)
                        url = url.replace("-internal", "")  # strip the internal-network marker from the host
                        get_url = True
                    except:
                        pass
                    if get_url:
                        break
            return url
        file_timeout = 60 * 60 * 24 * 5  # download links stay valid for 5 days
        # yesterday's date
        date = str(datetime.date.today() - datetime.timedelta(days=1))
        oss_path = 'tmp_document_quality_data/'
        object_path = oss_path + date + '/'
        msg = "每日数据质量检查结果(报警):"
        # one report file per check; the object names are fixed by the producer job
        for csv_name in ["数据质量监控检查结果.xlsx",
                         "公告重复量大的编号.xlsx",
                         "公告附件重复量大的编号.xlsx",
                         "附件识别异常的站源.xlsx",
                         "报名时间,截止时间在发布时间之前的公告.xlsx"]:
            ObjectName = object_path + csv_name
            url = get_download_url(self.bucket, ObjectName, file_timeout)
            if url:
                msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
        atMobiles = ['18813973429']  # Weizhen (on-call)
        ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
        sentMsgToDD(msg,ACCESS_TOKEN_DATAWORKS,atMobiles=atMobiles)
    def send_daily_check_data2(self):
        import datetime
        import pandas as pd
        from itertools import groupby
        dict_channel = {"公告变更": 51,
                        "招标公告": 52,
                        "中标信息": 101,
                        "招标预告": 102,
                        "招标答疑": 103,
                        "资审结果": 105,
                        "法律法规": 106,
                        "新闻资讯": 107,
                        "采购意向": 114,
                        "拍卖出让": 115,
                        "土地矿产": 116,
                        "产权交易": 117,
                        "废标公告": 118,
                        "候选人公示": 119,
                        "合同公告": 120}
        label2channel = {v:k for k,v in dict_channel.items()}
        def post_data(url,json_data):
            post_sucess = False
            for i in range(3):
                if not post_sucess:
                    try:
                        # POST the JSON payload
                        response = requests.post(url, json=json_data)
                        # check the response status code
                        if response.status_code == 200:
                            post_sucess = True
                    except requests.exceptions.RequestException as e:
                        log("send_daily_check_data2,post error reason: %s"%(str(e)))
            return post_sucess
        res_json = {
            "data": [],
            "count": 0
        }
        # yesterday's date
        date = str(datetime.date.today() - datetime.timedelta(days=1))
        oss_path = 'tmp_document_quality_data/'
        object_path = oss_path + date + '/'
        csv_name = "数据质量监控检查结果.xlsx"
        ObjectName = object_path + csv_name
        # NOTE: the same local temp path is reused for every downloaded report
        LocalPath = os.path.join(self.current_path,"download",csv_name)
        down_res = downloadFile(self.bucket,ObjectName,LocalPath,retry=3)
        if down_res:
            df = pd.read_excel(LocalPath)
            for web_source_no,original_docchannel,error_rule in zip(df['web_source_no'],df['original_docchannel'],df['error_rule']):
                error_rule = json.loads(error_rule)
                for error_type,error_sample in error_rule.items():
                    tmp_data = {
                        "WEB_SOURCE_NO": web_source_no,
                        "WEBTYPE": label2channel.get(original_docchannel, ""),
                        "TYPE": error_type,
                        "ITEMS": error_sample
                    }
                    res_json['data'].append(tmp_data)
                    res_json['count'] += 1
            os.remove(LocalPath)
        csv_name = "公告重复量大的编号.xlsx"
        ObjectName = object_path + csv_name
        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
        if down_res:
            df = pd.read_excel(LocalPath)
            tmp_list = []
            for web_source_no,fingerprint,original_docchannel,cnt,res in zip(df['web_source_no'], df['fingerprint'],
                                                                             df['original_docchannel'],df['cnt'],df['res']):
                tmp_data = {
                    "WEB_SOURCE_NO": web_source_no,
                    "WEBTYPE": label2channel.get(original_docchannel, ""),
                    "TYPE": "编号公告重复",
                    "FINGERPRINT": fingerprint,
                    "ITEMS": json.loads(res)
                }
                tmp_list.append(tmp_data)
            tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
            # cap at 5 samples per web source
            for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
                group = list(group)[:5]
                res_json['data'].extend(group)
                res_json['count'] += len(group)
            os.remove(LocalPath)
        csv_name = "公告附件重复量大的编号.xlsx"
        ObjectName = object_path + csv_name
        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
        if down_res:
            df = pd.read_excel(LocalPath)
            tmp_list = []
            for web_source_no,filemd5,original_docchannel,cnt,res in zip(df['web_source_no'],df['filemd5'],
                                                                         df['original_docchannel'],df['cnt'],df['res']):
                tmp_data = {
                    "WEB_SOURCE_NO": web_source_no,
                    "WEBTYPE": label2channel.get(original_docchannel, ""),
                    "TYPE": "编号附件重复",
                    "FILEMD5": filemd5,
                    "ITEMS": json.loads(res)
                }
                tmp_list.append(tmp_data)
            tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
            for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
                group = list(group)[:5]
                res_json['data'].extend(group)
                res_json['count'] += len(group)
            os.remove(LocalPath)
        csv_name = "附件识别异常的站源.xlsx"
        ObjectName = object_path + csv_name
        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
        if down_res:
            df = pd.read_excel(LocalPath)
            for web_source_no,original_docchannel,error_ratio,error_sample,res in zip(df['web_source_no'], df['original_docchannel'],
                                                                                      df['error_ratio'],df['error_sample'],df['res']):
                tmp_data = {
                    "WEB_SOURCE_NO": web_source_no,
                    "WEBTYPE": label2channel.get(original_docchannel, ""),
                    "TYPE": "附件识别异常",
                    "ITEMS": json.loads(res)
                }
                res_json['data'].append(tmp_data)
                res_json['count'] += 1
            os.remove(LocalPath)
        csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
        ObjectName = object_path + csv_name
        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
        if down_res:
            df = pd.read_excel(LocalPath)
            tmp_list = []
            for web_source_no,original_docchannel,res in zip(df['web_source_no'],df['original_docchannel'],df['res']):
                tmp_data = {
                    "WEB_SOURCE_NO": web_source_no,
                    "WEBTYPE": label2channel.get(original_docchannel, ""),
                    "TYPE": "截止日期在发布日期之前",
                    "ITEMS": json.loads(res)
                }
                tmp_list.append(tmp_data)
            res_json['data'].extend(tmp_list)
            res_json['count'] += len(tmp_list)
            os.remove(LocalPath)
        # url = "http://120.132.118.205:17090/saveQualityListData"
        url = "http://data-monitor.bidizhaobiao.com/oldApi/saveQualityListData"
        res = post_data(url,res_json)
        if res:
            log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
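    # Payload shape posted to saveQualityListData (field values invented):
    #   {"data":[{"WEB_SOURCE_NO":"DX-0001","WEBTYPE":"招标公告",
    #             "TYPE":"编号公告重复","FINGERPRINT":"md5...","ITEMS":[...]}],
    #    "count":1}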
    # Fix document announcement data based on project2 projects
    def fix_doc_by_project2(self):
        import datetime
        from itertools import groupby
        from collections import Counter
        label2key = {
            '公告变更': 51,
            '招标公告': 52,
            '中标信息': 101,
            '招标预告': 102,
            '招标答疑': 103,
            '招标文件': 104,
            '资审结果': 105,
            '法律法规': 106,
            '新闻资讯': 107,
            '采购意向': 114,
            '拍卖出让': 115,
            '土地矿产': 116,
            '产权交易': 117,
            '废标公告': 118,
            '候选人公示': 119,
            '合同公告': 120,
            '开标记录': 121,
            '验收合同': 122,
            # the channels below are excluded
            '拟在建数据': 301,
            '审批项目数据': 302,
            '投诉处罚': 303
        }
        key2label = dict((i[1], i[0]) for i in label2key.items())
        today = str(datetime.date.today())
        yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
        front_year = str(datetime.date.today() - datetime.timedelta(days=365))
        bool_query = BoolQuery(must_queries=[RangeQuery("update_time", yesterday + " 00:00:00", today + " 00:00:00"),
                                             RangeQuery("page_time", front_year, today),
                                             RangeQuery("status", 201, 301),
                                             RangeQuery("docid_number", 4, 30)]
                               )
        all_rows = []
        rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
                                                                               SearchQuery(bool_query,
                                                                                           sort=Sort(sorters=[FieldSort("update_time", SortOrder.ASC)]),
                                                                                           limit=100, get_total_count=True),
                                                                               ColumnsToGet(['uuid', 'docids', 'update_time', 'docid_number'],
                                                                                            return_type=ColumnReturnType.SPECIFIED))
        all_rows.extend(rows)
        while next_token:
            rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
                                                                                   SearchQuery(bool_query,
                                                                                               next_token=next_token,
                                                                                               sort=Sort(sorters=[FieldSort("update_time", SortOrder.ASC)]),
                                                                                               limit=100, get_total_count=True),
                                                                                   ColumnsToGet(['uuid', 'docids', 'update_time', 'docid_number'],
                                                                                                return_type=ColumnReturnType.SPECIFIED))
            all_rows.extend(rows)
        list_dict = getRow_ots(all_rows)
        docids_list = []
        for _dict in list_dict:
            _uuid = _dict.get("uuid", "")
            _docids = _dict.get("docids", "")
            _docids = _docids.split(",")
            for docid in _docids:
                docids_list.append([_uuid, int(docid)])
        # print('docids_list len:', len(docids_list))
        ots_query_res = []
        doc_columns_list = ['page_time', 'tenderee', 'tenderee_phone', 'agency', 'agency_phone', 'extract_count',
                            "sub_docs_json", 'extract_json', 'extract_json1', 'extract_json2', 'extract_json3']
        def extract_json_process(res_json):
            # parse one document row; the extraction JSON may be sharded across
            # extract_json..extract_json3, so the pieces are concatenated first
            extract_json = res_json.pop("extract_json")
            extract_json = extract_json if extract_json else "{}"
            if 'extract_json1' in res_json:
                extract_json1 = res_json.pop("extract_json1")
                extract_json1 = extract_json1 if extract_json1 else ""
                extract_json = extract_json + extract_json1
            if 'extract_json2' in res_json:
                extract_json2 = res_json.pop("extract_json2")
                extract_json2 = extract_json2 if extract_json2 else ""
                extract_json = extract_json + extract_json2
            if 'extract_json3' in res_json:
                extract_json3 = res_json.pop("extract_json3")
                extract_json3 = extract_json3 if extract_json3 else ""
                extract_json = extract_json + extract_json3
            try:
                extract_json = json.loads(extract_json)
            except:
                return None
            docchannel_dict = extract_json.get('docchannel', {})
            res_json['docchannel'] = docchannel_dict.get('docchannel', "")
            res_json['life_docchannel'] = docchannel_dict.get('life_docchannel', "")
            district_dict = extract_json.get('district', {})
            res_json['province'] = district_dict.get('province', "")
            res_json['city'] = district_dict.get('city', "")
            res_json['district'] = district_dict.get('district', "")
            res_json['area'] = district_dict.get('area', "")
            prem = extract_json.get('prem', {})
            res_json['prem'] = prem
            return res_json
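        # Concatenation example (sharding assumed to keep each OTS column under
        # its per-column size cap; halves below are invented):
        #   extract_json  = '{"docchannel":{"docchannel":'
        #   extract_json1 = '"中标信息"}}'
        # only the concatenation of the pieces parses as valid JSON.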
  4316. def _handle(item, _):
            # query and parse the document data
            _uuid = item[0]  # project uuid
            _docid = item[1]
            for i in range(3):
                try:
                    bool_query = BoolQuery(must_queries=[TermQuery('docid', _docid)])
                    rows, next_token, total_count, is_all_succeed = self.ots_client.search("document", "document_index",
                                                                                           SearchQuery(bool_query,
                                                                                                       sort=Sort(sorters=[FieldSort("page_time", SortOrder.ASC)]),
                                                                                                       limit=None, get_total_count=True),
                                                                                           ColumnsToGet(doc_columns_list,
                                                                                                        return_type=ColumnReturnType.SPECIFIED))
                    res = getRow_ots(rows)
                    if res:
                        # use extract_count to filter out announcements with little relevance
                        if res[0].get('extract_count', 0) > 5:
                            processed = extract_json_process(res[0])
                            if processed is not None:  # skip docs whose extract_json fails to parse
                                ots_query_res.append([_uuid, _docid, processed])
                        break
                except Exception as e:
                    # print('error:', e)
                    pass
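        # Drain the pairs with a worker pool: flush every ~10000 queued items
        # with 20 threads, then run once more for whatever remains.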
        task_queue = Queue()
        for item in docids_list:
            task_queue.put(item)
            if task_queue.qsize() >= 10000:
                _mt = MultiThreadHandler(task_queue, _handle, None, 20)
                _mt.run()
        if task_queue.qsize() > 0:
            _mt = MultiThreadHandler(task_queue, _handle, None, 20)
            _mt.run()
        # print('ots_query_res len:', len(ots_query_res))
        # process the data to be fixed
        ots_query_res.sort(key=lambda x: x[0])
        # tendering docchannel categories
        zb_type = [51, 52, 101, 102, 103, 104, 105, 114, 118, 119, 120, 121, 122]
        zb_type = [key2label[i] for i in zb_type]
        change_res = []
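        # Group the results by project uuid (groupby only merges adjacent equal
        # keys, hence the sort above) and derive per-project fixes: dominant
        # docchannel, dominant province and a phone -> entity majority vote.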
        for key, group in groupby(ots_query_res, lambda x: x[0]):
            uuid = key
            project_data = list(group)
            all_len = len(project_data)
            if all_len < 4:
                continue
            zb_len = sum([1 if i[2].get('docchannel') in zb_type else 0 for i in project_data])
            # share of tendering-type announcements in the project
            # if zb_len / all_len <= 0.5:
            if zb_len / all_len <= 0.7:
                # the project is not tendering-related
                continue
            # most common province within the project
            province_list = [i[2].get('province', '') for i in project_data]
            province_sort = Counter(province_list).most_common()
            change_province = ""
            change_city = ""
            change_district = ""
            change_area = ""
            # if province_sort[0][1]/all_len > 0.5:
            if province_sort[0][1] / all_len > 0.7:
                if province_sort[0][0] and province_sort[0][0] not in ["全国", "未知"]:
                    change_province = province_sort[0][0]
            if change_province:
                # only replace down to the city level; district is set to "未知" (unknown)
                change_province_data = [(i[2].get('province', ''), i[2].get('city', ''), i[2].get('area', '')) for i in
                                        project_data if i[2].get('province', '') == change_province]
                change_province_data_sort = Counter(change_province_data).most_common()
                change_city = change_province_data_sort[0][0][1]
                change_area = change_province_data_sort[0][0][2]
                change_district = "未知"
            # contact statistics
            phone_dict = {}
            for d in project_data:
                tenderee = d[2].get("tenderee", "")
                agency = d[2].get("agency", "")
                prem = d[2].get("prem", {})
                if len(prem) > 0:
                    for name, project in prem.items():
                        roleList = project.get("roleList", [])
                        for role in roleList:
                            role_name = role.get("role_name", "")
                            role_text = role.get("role_text", "")
                            if role_name in ['tenderee', 'agency', 'win_tenderer']:
                                linklist = role.get("linklist", [])
                                for _contact in linklist:
                                    if _contact[1] not in phone_dict:
                                        phone_dict[_contact[1]] = {}
                                    if role_text not in phone_dict[_contact[1]]:
                                        phone_dict[_contact[1]][role_text] = 0
                                    phone_dict[_contact[1]][role_text] += 1
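            # phone_dict maps phone -> {entity_name: count}; below, each phone is
            # resolved by majority vote to the entity name(s) seen most often,
            # e.g. (hypothetical) {"0571-88888888": {"A公司": 3, "B公司": 1}} -> ["A公司"].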
            # aggregate the entities associated with each phone number
            new_phone_dict = dict((phone, []) for phone in phone_dict)
            for phone, value in phone_dict.items():
                phone_name = [(name, count) for name, count in value.items()]
                phone_name.sort(key=lambda x: x[1], reverse=True)
                max_count = phone_name[0][1]
                max_name = [name for name, count in value.items() if count == max_count and max_count > 0]
                new_phone_dict[phone] = max_name
            for item in project_data:
                change_json = {"partitionkey": item[2].get("partitionkey"),
                               'docid': item[1],
                               'contactsByDelete': []}
                tenderee = item[2].get("tenderee", "")
                agency = item[2].get("agency", "")
                # fix docchannel
                docchannel = item[2].get('docchannel', "")
                life_docchannel = item[2].get('life_docchannel', "")
                if docchannel and docchannel not in zb_type:
                    if life_docchannel in zb_type and docchannel != '采招数据':
                        change_json['docchannel'] = label2key.get(life_docchannel)
                # fix province
                province = item[2].get('province', "")
                if change_province:
                    if province != change_province and province in ["全国", "未知", '']:  # only fix when the province was not recognized
                        change_json['province'] = change_province
                        change_json['city'] = change_city
                        change_json['district'] = change_district
                        change_json['area'] = change_area
                # fix contacts
                tenderee_phone = item[2].get("tenderee_phone", "")
                agency_phone = item[2].get("agency_phone", "")
                prem = item[2].get("prem", {})
                sub_docs_json = item[2].get("sub_docs_json", "[]")
                try:
                    sub_docs_json = json.loads(sub_docs_json)
                except Exception:
                    sub_docs_json = []
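                # Any role contact whose phone was attributed (by the vote above)
                # to a different entity is queued for deletion; if the document's
                # current tenderee/agency phone is among them, the best remaining
                # contact is promoted in its place.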
                for name, project in prem.items():
                    roleList = project.get("roleList", [])
                    for role in roleList:
                        role_name = role.get("role_name", "")
                        role_text = role.get("role_text", "")
                        if role_name == 'tenderee' and role_text == tenderee:
                            linklist = role.get("linklist", [])
                            need_change = False
                            right_contact = []
                            for _contact in linklist:
                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
                                    if _contact[1] == tenderee_phone:
                                        need_change = True
                                else:
                                    right_contact.append([_contact[0], _contact[1]])
                            if need_change:
                                if right_contact:
                                    right_contact.sort(reverse=True)
                                    change_json['tendereeContact'] = right_contact[0][0]
                                    change_json['tendereePhone'] = right_contact[0][1]
                        elif role_name == 'agency' and role_text == agency:
                            linklist = role.get("linklist", [])
                            need_change = False
                            right_contact = []
                            for _contact in linklist:
                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
                                    if _contact[1] == agency_phone:
                                        need_change = True
                                else:
                                    right_contact.append([_contact[0], _contact[1]])
                            if need_change:
                                if right_contact:
                                    right_contact.sort(reverse=True)
                                    change_json['agencyContact'] = right_contact[0][0]
                                    change_json['agencyPhone'] = right_contact[0][1]
                        elif role_name == 'win_tenderer':
                            linklist = role.get("linklist", [])
                            for _contact in linklist:
                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
                sub_docs_json_change = False
                if sub_docs_json:
                    for _project in sub_docs_json:
                        win_tenderer = _project.get("win_tenderer", "")
                        win_tenderer_phone = _project.get("win_tenderer_phone", "")
                        if win_tenderer_phone and new_phone_dict.get(win_tenderer_phone) and win_tenderer not in new_phone_dict[win_tenderer_phone]:
                            _project["win_tenderer_phone"] = ""
                            _project["win_tenderer_manager"] = ""
                            sub_docs_json_change = True
                if sub_docs_json_change:
                    change_json['subDocsJson'] = sub_docs_json
                # de-duplicate the contacts marked for deletion
                new_contact_json = []
                for _contact in change_json['contactsByDelete']:
                    if _contact not in new_contact_json:
                        new_contact_json.append(_contact)
                change_json['contactsByDelete'] = new_contact_json
                if len(change_json) > 3 or len(change_json['contactsByDelete']) > 0:
                    # if the region was not changed, send the originally extracted region
                    if not change_json.get("province"):
                        change_json['area'] = item[2].get("area", "")
                        change_json['province'] = item[2].get("province", "")
                        change_json['city'] = item[2].get("city", "")
                        change_json['district'] = item[2].get("district", "")
                    change_res.append({"document": change_json})
        # post result
        headers = {'Content-Type': 'application/json',
                   "Authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySWQiOjEsInVzZXJuYW1lIjoiYWRtaW4iLCJ1dWlkIjoiNGQwYzA0ODYtMzVmZi00MDJhLTk4OWQtNWEwNTE3YTljMDNiIiwic3ViIjoiMSIsImlhdCI6MTY3OTk5MTcxNywiZXhwIjo0ODMzNTkxNzE3fQ.ESDDnEDYP5ioK4ouHOYXsZbLayGRNVI9ugpbxDx_3fPIceD1KIjlDeopBmeATLoz8VYQihd8qO-UzP5pDsaUmQ"}
        # url = "http://192.168.2.26:8002/document/updateAreaAndContact"
        url = "http://data-api.bidizhaobiao.com/document/updateAreaAndContact"
        for _data in change_res:
            post_success = False
            for i in range(3):
                if not post_success:
                    try:
                        # send a POST request carrying the JSON payload
                        response = requests.post(url, json=_data, headers=headers)
                        # print(response.status_code, response.json())
                        # check the response status code
                        if response.status_code == 200:
                            post_success = True
                    except requests.exceptions.RequestException as e:
                        # log("fix doc by project2,post error reason: %s"%(str(e)))
                        pass
        log("fix doc by project2, change doc nums:%d" % len(change_res))
    def start_flow_dumplicate(self):
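        """Register the recurring dedup/repair jobs on a BlockingScheduler
        (deduplication every 5s, consumer every 30s, monitoring every 10min,
        daily checks and fixes at fixed hours) and block on schedule.start()."""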
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_dumplicate, "cron", second="*/5")
        schedule.add_job(self.flow_dumpcate_comsumer, "cron", second="*/30")
        schedule.add_job(self.bdm.monitor_dumplicate, "cron", minute="*/10")
        schedule.add_job(self.flow_remove, "cron", hour="20")
        schedule.add_job(self.send_daily_check_data, "cron", hour='9', minute='10')
        schedule.add_job(self.send_daily_check_data2, "cron", hour='9', minute='10')
        schedule.add_job(self.fix_doc_by_project2, "cron", hour='8', minute='10')
        schedule.add_job(self.flow_remove_project_tmp, "cron", hour="20")
        schedule.add_job(self.fix_doc_which_not_in_project, "cron", minute="*/10")
        schedule.start()
    def changeSaveStatus(self, list_dict):
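        """Reset document_tmp_save to 0 for the given items. Accepts row dicts
        (updated only when the row exists) or bare docids (partitionkey derived
        as docid % 500 + 1), in which case status is also cleared back to 0
        unless the row is flagged as an updated document."""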
        if list_dict is not None:
            for _dict in list_dict:
                if isinstance(_dict, dict):
                    if _dict.get(document_tmp_save, 1) == 1:
                        _d = {"partitionkey": _dict["partitionkey"],
                              "docid": _dict["docid"],
                              document_tmp_save: 0
                              }
                        _d_tmp = Document_tmp(_d)
                        if _d_tmp.exists_row(self.ots_client):
                            _d_tmp.update_row(self.ots_client)
                elif isinstance(_dict, int):
                    _d = {"partitionkey": _dict % 500 + 1,
                          "docid": _dict,
                          document_tmp_save: 0
                          }
                    _d_tmp = Document_tmp(_d)
                    if _d_tmp.fix_columns(self.ots_client, ["status", document_update_document], True):
                        if _d_tmp.getProperties().get("status") == 1:
                            if _d_tmp.getProperties().get(document_update_document, "") != "true":
                                _d_tmp.setValue("status", 0, True)
                                _d_tmp.update_row(self.ots_client)
    def test_dumplicate(self, docid):
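        """Debug helper: fetch one document's attributes and run
        dumplicate_comsumer_handle on it without upgrading/writing results."""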
        # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
        columns = [document_tmp_status, document_tmp_save, document_tmp_page_time, document_tmp_docchannel, document_tmp_tenderee, document_tmp_agency, document_tmp_doctitle, document_tmp_sub_docs_json, document_tmp_extract_json, document_attachment_extract_status, document_update_document, document_province, document_city, document_district, document_tmp_attachment_path, document_tmp_web_source_no, document_tmp_web_source_name, document_tmp_source_stage, document_tmp_source_type, 'detail_link', 'products']
        # print('columns',columns)
        item = self.get_attrs_before_dump(docid, columns)
        if item:
            log("start dumplicate_comsumer_handle")
            self.dumplicate_comsumer_handle(item, None, self.ots_client, get_all=False, upgrade=False)
            # self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
            return
    def test_merge(self, list_docid_less, list_docid_greater):
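        """Debug helper: build projects from the two docid lists, merge them
        with dumplicate_projects and log the resulting project JSON."""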
        list_docs_less = self.search_docs(list_docid_less)
        list_projects_less = self.generate_projects_from_document(list_docs_less)
        list_docs_greater = self.search_docs(list_docid_greater)
        list_projects_greater = self.generate_projects_from_document(list_docs_greater)
        list_projects_less.extend(list_projects_greater)
        list_projects = dumplicate_projects(list_projects_less, b_log=True)
        project_json = to_project_json(list_projects)
        log("project_json:%s" % project_json)
        return project_json
    def getRemainDoc(self, docid):
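        """Re-run the duplicate-search rules for a single docid and return the
        best (kept) docid among the candidates, or None if the doc is not found."""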
        columns = [document_tmp_status, document_tmp_page_time, document_tmp_docchannel, document_tmp_tenderee, document_tmp_agency, document_tmp_doctitle, document_tmp_sub_docs_json, document_tmp_extract_json]
        bool_query = BoolQuery(must_queries=[
            TermQuery("docid", docid)
        ])
        rows, next_token, total_count, is_all_succeed = self.ots_client.search("document", "document_index",
                                                                               SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid")]), limit=100, get_total_count=True),
                                                                               ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        if len(list_dict) > 0:
            item = list_dict[0]
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            list_rules, table_name, table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from, item, to_log=True)
            list_rules.sort(key=lambda x: x["confidence"], reverse=True)
            _i = 0
            step = 5
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            while _i < len(list_rules):
                must_not_q = []
                if len(base_list) > 0:
                    must_not_q = [TermQuery("docid", a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i + step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item, base_list, set_docid, _query, confidence, table_name=table_name, table_index=table_index, singleNum_keys=singleNum_keys, contain_keys=contain_keys, multiNum_keys=multiNum_keys, columns=[document_tmp_status, document_tmp_save, document_tmp_page_time, document_tmp_docchannel, document_tmp_tenderee, document_tmp_agency, document_tmp_doctitle_refine, document_tmp_sub_docs_json, document_tmp_extract_json])
                _i += step
            _time = time.time()
            log("%d start final check with length:%d" % (item["docid"], len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list)
            log("%d final_check takes:%.2f" % (item["docid"], time.time() - _time))
            best_docid = self.get_best_docid(final_list)
            return best_docid
        return None
def compare_dumplicate_check():
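    """Run dumplicate_comsumer_handle twice over a sample docid range, before
    and after switching check_rule to 2, and write the per-docid differences
    to compare_dump.xlsx."""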
    import pandas as pd
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    test_count = 1000
    # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
    columns = [document_tmp_status, document_tmp_save, document_tmp_page_time, document_tmp_docchannel, document_tmp_tenderee, document_tmp_agency, document_tmp_doctitle, document_tmp_sub_docs_json, document_tmp_extract_json, document_attachment_extract_status, document_update_document, document_province, document_city, document_district]
    bool_query = BoolQuery(must_queries=[
        RangeQuery("docid", 400453395, 400463395)
    ])
    rows, next_token, total_count, is_all_succeed = df_dump.ots_client.search("document", "document_index",
                                                                              SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid")]), limit=10, get_total_count=True),
                                                                              ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
    log("flow_dumplicate producer total_count:%d" % total_count)
    list_dict = getRow_ots(rows)
    while 1:
        if not next_token or len(list_dict) >= test_count:
            break
        # pass next_token back so the loop pages forward instead of refetching the first page
        rows, next_token, total_count, is_all_succeed = df_dump.ots_client.search("document", "document_index",
                                                                                  SearchQuery(bool_query, next_token=next_token, sort=Sort(sorters=[FieldSort("docid")]), limit=100, get_total_count=True),
                                                                                  ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
        list_dict.extend(getRow_ots(rows))

    def _handle1(_item, result_queue):
        try:
            list_docid = df_dump.dumplicate_comsumer_handle(_item, None, df_dump.ots_client, get_all=True, upgrade=False)
            _item["before"] = list_docid
        except Exception as e:
            pass

    dump_result = {}
    for item in list_dict:
        dump_result[item["docid"]] = {}
    task_queue = Queue()
    list_item = []
    for item in list_dict:
        _item = {}
        _item.update(item)
        list_item.append(_item)
        task_queue.put(_item)
    mt = MultiThreadHandler(task_queue, _handle1, None, 30)
    mt.run()
    for item in list_item:
        dump_result[item["docid"]]["before"] = item.get("before")
    df_dump.check_rule = 2

    def _handle2(_item, result_queue):
        try:
            list_docid1 = df_dump.dumplicate_comsumer_handle(_item, None, df_dump.ots_client, get_all=True, upgrade=False)
            _item["after"] = list_docid1
        except Exception as e:
            pass

    task_queue = Queue()
    list_item = []
    for item in list_dict:
        _item = {}
        _item.update(item)
        list_item.append(_item)
        task_queue.put(_item)
    mt = MultiThreadHandler(task_queue, _handle2, None, 30)
    mt.run()
    for item in list_item:
        dump_result[item["docid"]]["after"] = item.get("after")
    df_data = {"docid": [],
               "before": [],
               "after": [],
               "before-after": [],
               "after-before": []}
    for docid, _d in dump_result.items():
        df_data["docid"].append(docid)
        before = _d.get("before") or []  # a failed handler leaves None behind
        after = _d.get("after") or []
        df_data["before"].append(str(before))
        df_data["after"].append(str(after))
        df_data["before-after"].append(str(set(before) - set(after)))
        df_data["after-before"].append(str(set(after) - set(before)))
    df = pd.DataFrame(df_data, columns=["docid", "before", "after", "before-after", "after-before"])
    df.to_excel("compare_dump.xlsx")
def fix_merge_docid(docid):
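    """Compute the transitive closure of docids linked to the given docid
    through project2 rows, set those documents' status back to 1 and delete
    the linked project rows."""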
    ots_client = getConnect_ots()  # shared by the nested helpers and the updates below

    def get_uuid_docids(docid):
        bool_query = BoolQuery(must_queries=[
            TermQuery("docids", docid)
        ])
        rows, next_token, total_count, is_all_succeed = ots_client.search("project2", "project2_index",
                                                                          SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("page_time")]), limit=100, get_total_count=True),
                                                                          ColumnsToGet(["docids"], return_type=ColumnReturnType.SPECIFIED))
        list_row = getRow_ots(rows)
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search("project2", "project2_index",
                                                                              SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                                                                              ColumnsToGet(["docids"], return_type=ColumnReturnType.SPECIFIED))
            list_row.extend(getRow_ots(rows))
        return list_row

    def get_new_docid(list_docid1, list_docid2):
        return list(set(list_docid1) - set(list_docid2))

    def get_list_docid(list_row):
        list_docid = []
        for row in list_row:
            docids = row.get("docids", '')
            if docids:
                list_docid.extend([int(a) for a in docids.split(",")])
        return list(set(list_docid))

    def get_list_uuid(list_row):
        list_uuid = []
        for row in list_row:
            uuid = row.get("uuid", '')
            if uuid:
                list_uuid.append(uuid)
        return list(set(list_uuid))

    list_row = get_uuid_docids(docid)
    print(list_row)
    list_docid1 = get_list_docid(list_row)
    list_new_docid = get_new_docid(list_docid1, [docid])
    while 1:
        if len(list_new_docid) == 0:
            break
        list_row2 = []
        for _docid in list_new_docid:
            list_row2.extend(get_uuid_docids(_docid))
        list_docid1 = get_list_docid(list_row)
        list_docid2 = get_list_docid(list_row2)
        # next frontier = newly fetched docids minus those already known
        list_new_docid = get_new_docid(list_docid2, list_docid1)
        list_row.extend(list_row2)
    list_uuid = get_list_uuid(list_row)
    list_docid = get_list_docid(list_row)
    print(list_uuid)
    print(list_docid)
    for _docid in list_docid:
        _d = Document({document_partitionkey: _docid % 500 + 1,
                       document_docid: _docid,
                       document_status: 1})
        if _d.exists_row(ots_client):
            _d.update_row(ots_client)
    for _uuid in list_uuid:
        _p = Project({project_uuid: _uuid})
        _p.delete_row(ots_client)
if __name__ == '__main__':
    a = time.time()
    # df = Dataflow()
    # df.flow_init()
    # df.flow_test()
    # df.test_merge()
    # df.start_flow_attachment()
    # df.start_flow_extract()
    # df.start_flow_dumplicate()
    # # df.start_flow_merge()
    # df.start_flow_remove()
    # download_attachment()
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    # df_dump.start_flow_dumplicate()
    df_dump.test_dumplicate(628365020)
    # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
    # compare_dumplicate_check()
    # df_dump.test_merge([391898061], [371551361])
    # df_dump.flow_remove_project_tmp()
    # fix_merge_docid(595271944)
    print("takes", time.time() - a)
    # df_dump.fix_doc_which_not_in_project()
    # df_dump.delete_projects_by_document(16288036)
    # log("=======")
    # for i in range(3):
    #     time.sleep(20)
    #
    # a = {"docid":74295123}
    # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)