dataflow.py 202 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268
  1. # sys.path.append("/data")
  2. from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
  3. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  4. from BaseDataMaintenance.common.multiProcess import MultiHandler
  5. from queue import Queue
  6. from multiprocessing import Queue as PQueue
  7. from BaseDataMaintenance.model.ots.document_tmp import *
  8. from BaseDataMaintenance.model.ots.attachment import *
  9. from BaseDataMaintenance.model.ots.document_html import *
  10. from BaseDataMaintenance.model.ots.document_extract2 import *
  11. from BaseDataMaintenance.model.ots.project import *
  12. from BaseDataMaintenance.model.ots.project2_tmp import *
  13. from BaseDataMaintenance.model.ots.document import *
  14. from BaseDataMaintenance.model.ots.project_process import *
  15. import base64
  16. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
  17. from uuid import uuid4
  18. from BaseDataMaintenance.common.ossUtils import *
  19. from BaseDataMaintenance.dataSource.source import is_internal,getAuth
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. from BaseDataMaintenance.maintenance.dataflow_settings import *
  22. from threading import Thread
  23. import oss2
  24. from BaseDataMaintenance.maxcompute.documentDumplicate import *
  25. from BaseDataMaintenance.maxcompute.documentMerge import *
  26. from BaseDataMaintenance.common.otsUtils import *
  27. from BaseDataMaintenance.common.activateMQUtils import *
  28. from BaseDataMaintenance.dataMonitor.data_monitor import BaseDataMonitor
  29. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  30. def getSet(list_dict,key):
  31. _set = set()
  32. for item in list_dict:
  33. if key in item:
  34. if item[key]!='' and item[key] is not None:
  35. if re.search("^\d[\d\.]*$",item[key]) is not None:
  36. _set.add(str(float(item[key])))
  37. else:
  38. _set.add(str(item[key]))
  39. return _set
  40. def getSimilarityOfString(str1,str2):
  41. _set1 = set()
  42. _set2 = set()
  43. if str1 is not None:
  44. for i in range(1,len(str1)):
  45. _set1.add(str1[i-1:i+1])
  46. if str2 is not None:
  47. for i in range(1,len(str2)):
  48. _set2.add(str2[i-1:i+1])
  49. _len = max(1,min(len(_set1),len(_set2)))
  50. return len(_set1&_set2)/_len
  51. def getDiffIndex(list_dict,key,confidence=100):
  52. _set = set()
  53. for _i in range(len(list_dict)):
  54. item = list_dict[_i]
  55. if item["confidence"]>=confidence:
  56. continue
  57. if key in item:
  58. if item[key]!='' and item[key] is not None:
  59. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  60. _set.add(str(float(item[key])))
  61. else:
  62. _set.add(str(item[key]))
  63. if len(_set)>1:
  64. return _i
  65. return len(list_dict)
  66. def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
  67. swf_urls = []
  68. try:
  69. list_files = os.listdir(swf_dir)
  70. list_files.sort(key=lambda x:x)
  71. headers = dict()
  72. headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
  73. for _file in list_files:
  74. swf_localpath = "%s/%s"%(swf_dir,_file)
  75. swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
  76. uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
  77. _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
  78. swf_urls.append(_url)
  79. os.remove(swf_localpath)
  80. except Exception as e:
  81. traceback.print_exc()
  82. return swf_urls
  83. class Dataflow():
    def __init__(self):
        """Wire up OTS/OSS clients, per-stage work queues and service endpoints."""
        # main OTS (Tablestore) client used by the flows
        self.ots_client = getConnect_ots()
        # per-stage work queues; thread Queues except the dumplicate pair,
        # which are multiprocessing queues (cross process boundaries)
        self.queue_init = Queue()
        self.queue_attachment = Queue()
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.list_attachment_ocr = []
        self.list_attachment_not_ocr = []
        self.queue_extract = Queue()
        self.list_extract = []
        self.queue_dumplicate = PQueue()
        self.queue_dumplicate_processed = PQueue()
        # NOTE(review): presumably tracks docids currently queued for
        # deduplication to avoid re-enqueueing -- confirm in the dumplicate flow
        self.dumplicate_set = set()
        self.queue_merge = Queue()
        self.queue_syncho = Queue()
        self.queue_remove = Queue()
        self.queue_remove_project = Queue()
        self.attachment_rec_interface = ""
        # separate client instance for the merge flow
        self.ots_client_merge = getConnect_ots()
        # choose intranet endpoints when running inside the Aliyun VPC
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        if is_internal:
            self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        else:
            self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        # auth header for the PAI-EAS prediction services above
        self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        self.auth = getAuth()
        # widen the oss2 client pools for concurrent attachment downloads
        oss2.defaults.connection_pool_size = 100
        oss2.defaults.multiget_num_threads = 20
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        # base directory for temp downloads / swf page images
        self.current_path = os.path.dirname(__file__)
    def flow_init(self):
        """Backfill documents created since 2022-04-20 from the ``document``
        table into ``document_tmp`` (with a remapped status) and the html
        into ``document_html``.

        NOTE(review): producer() fills queue_init completely before
        comsumer() starts draining it, so the whole result set sits in the
        in-memory queue at once.
        """
        def producer():
            # every document with crtime >= 2022-04-20
            bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            # page through the remaining results via the continuation token
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
        def comsumer():
            # 30 worker threads drain queue_init through comsumer_handle
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # keep the html aside, then drop the heavyweight text columns
            # before writing the row to document_tmp
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in item:
                item.pop(document_tmp_dochtmlcon)
            if document_tmp_doctextcon in item:
                item.pop(document_tmp_doctextcon)
            if document_tmp_attachmenttextcon in item:
                item.pop(document_tmp_attachmenttextcon)
            _status = item.get(document_tmp_status)
            new_status = None
            # remap legacy status ranges: 201-300 -> save=1, 401-450 -> save=0,
            # both land on status 81; anything else restarts the flow at 1
            if _status>=201 and _status<=300:
                item[document_tmp_save] = 1
                new_status = 81
            elif _status>=401 and _status<=450:
                item[document_tmp_save] = 0
                new_status = 81
            else:
                new_status = 1
            # new_status = 1
            item[document_tmp_status] = new_status
            dtmp = Document_tmp(item)
            dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
                                   document_tmp_docid:item.get(document_tmp_docid),
                                   document_tmp_dochtmlcon:_dochtmlcon})
            dtmp.update_row(ots_client)
            dhtml.update_row(ots_client)
        producer()
        comsumer()
  174. def getTitleFromHtml(self,filemd5,_html):
  175. _soup = BeautifulSoup(_html,"lxml")
  176. _find = _soup.find("a",attrs={"data":filemd5})
  177. _title = ""
  178. if _find is not None:
  179. _title = _find.get_text()
  180. return _title
  181. def getSourceLinkFromHtml(self,filemd5,_html):
  182. _soup = BeautifulSoup(_html,"lxml")
  183. _find = _soup.find("a",attrs={"filelink":filemd5})
  184. filelink = ""
  185. if _find is None:
  186. _find = _soup.find("img",attrs={"filelink":filemd5})
  187. if _find is not None:
  188. filelink = _find.attrs.get("src","")
  189. else:
  190. filelink = _find.attrs.get("href","")
  191. return filelink
    def request_attachment_interface(self,attach,_dochtmlcon):
        """Run one attachment through the recognition interface and persist
        the result onto the attachment row.

        Returns True when the attachment needs no further work (processed
        successfully, too large to process, or missing from OSS), False when
        the download or the recognition call failed.  An unexpected
        exception falls through and returns None (falsy) after printing the
        traceback.
        """
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _size = attach.getProperties().get(attachment_size)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        # unique temp path for the downloaded file; removed in finally
        localpath = os.path.join(self.current_path,"download",_uuid.hex)
        docids = attach.getProperties().get(attachment_docids)
        try:
            if _size>ATTACHMENT_LARGESIZE:
                # oversized attachments are marked TOOLARGE and skipped
                attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
                log("attachment :%s of path:%s to large"%(filemd5,_path))
                attach.update_row(self.ots_client)
                return True
            else:
                d_start_time = time.time()
                if downloadFile(self.bucket,objectPath,localpath):
                    time_download = time.time()-d_start_time
                    _data_base64 = base64.b64encode(open(localpath,"rb").read())
                    # call the interface to process the attachment content
                    start_time = time.time()
                    _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype,kwargs={"timeout":600})
                    if _success:
                        log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                    else:
                        # failure is logged and alerted to DingTalk, then reported to the caller
                        log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                    # NOTE(review): swf_images arrives as a python-literal string;
                    # eval() on service output is unsafe if the service is ever untrusted
                    swf_images = eval(swf_images)
                    if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                        # only rebuild page images when no swf urls are recorded yet
                        swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                        if len(swf_urls)==0:
                            objectPath = attach.getProperties().get(attachment_path,"")
                            # NOTE(review): localpath is rebound here, so the
                            # finally-clause removes this (never-created) path
                            # instead of the downloaded temp file -- the
                            # download appears to be leaked on this branch
                            localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
                            swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                            if not os.path.exists(swf_dir):
                                os.mkdir(swf_dir)
                            # write each base64 page image to disk, then upload
                            for _i in range(len(swf_images)):
                                _base = swf_images[_i]
                                _base = base64.b64decode(_base)
                                filename = "swf_page_%d.png"%(_i)
                                filepath = os.path.join(swf_dir,filename)
                                with open(filepath,"wb") as f:
                                    f.write(_base)
                            swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                            if os.path.exists(swf_dir):
                                os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    # crude table detection on the recognized html
                    if re.search("<td",_html) is not None:
                        attach.setValue(attachment_has_table,1,True)
                    _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                    filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                    if _file_title!="":
                        attach.setValue(attachment_file_title,_file_title,True)
                    if filelink!="":
                        attach.setValue(attachment_file_link,filelink,True)
                    attach.setValue(attachment_attachmenthtml,_html,True)
                    attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                    attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                    attach.setValue(attachment_recsize,len(_html),True)
                    attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                    # re-enable the row update when running online
                    attach.update_row(self.ots_client)
                    return True
                else:
                    return False
        except oss2.exceptions.NotFound as e:
            # object missing from OSS: nothing recognizable, treat as handled
            return True
        except Exception as e:
            traceback.print_exc()
        finally:
            # best-effort cleanup of the downloaded temp file
            try:
                os.remove(localpath)
            except:
                pass
  269. def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
  270. list_html = []
  271. swf_urls = []
  272. for _attach in list_attach:
  273. #测试全跑
  274. if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
  275. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  276. if _html is None:
  277. _html = ""
  278. list_html.append(_html)
  279. else:
  280. _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
  281. if not _succeed:
  282. return False,"",[]
  283. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  284. if _html is None:
  285. _html = ""
  286. list_html.append(_html)
  287. if _attach.getProperties().get(attachment_filetype)=="swf":
  288. swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
  289. return True,list_html,swf_urls
  290. def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
  291. set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
  292. set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
  293. list_must_queries = []
  294. list_must_no_queries = []
  295. for k,v in _dict.items():
  296. if k in set_match:
  297. if isinstance(v,str):
  298. l_s = []
  299. for s_v in v.split(","):
  300. l_s.append(MatchQuery(k,s_v))
  301. list_must_queries.append(BoolQuery(should_queries=l_s))
  302. elif k in set_nested:
  303. _v = v
  304. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  305. _v = float(_v)
  306. list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  307. elif k in set_term:
  308. list_must_queries.append(TermQuery(k,v))
  309. elif k in set_phrase:
  310. list_must_queries.append(MatchPhraseQuery(k,v))
  311. elif k in set_range:
  312. if len(v)==1:
  313. list_must_queries.append(RangeQuery(k,v[0]))
  314. elif len(v)==2:
  315. list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
  316. for k,v in _dict_must_not.items():
  317. if k in set_match:
  318. if isinstance(v,str):
  319. l_s = []
  320. for s_v in v.split(","):
  321. l_s.append(MatchQuery(k,s_v))
  322. list_must_no_queries.append(BoolQuery(should_queries=l_s))
  323. elif k in set_nested:
  324. _v = v
  325. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  326. _v = float(_v)
  327. list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  328. elif k in set_term:
  329. list_must_no_queries.append(TermQuery(k,v))
  330. elif k in set_range:
  331. if len(v)==1:
  332. list_must_no_queries.append(RangeQuery(k,v[0]))
  333. elif len(v)==2:
  334. list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
  335. return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
  336. def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
  337. columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
  338. extract_count = 0
  339. if project_code is not None and project_code!="":
  340. extract_count += 1
  341. if project_name is not None and project_name!="":
  342. extract_count += 1
  343. if tenderee is not None and tenderee!="":
  344. extract_count += 1
  345. if agency is not None and agency!="":
  346. extract_count += 1
  347. if sub_docs_json is not None:
  348. sub_docs = json.loads(sub_docs_json)
  349. sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
  350. sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
  351. # log("==%s"%(str(sub_docs)))
  352. for sub_docs in sub_docs:
  353. for _key_sub_docs in sub_docs.keys():
  354. extract_count += 1
  355. if _key_sub_docs in columns:
  356. if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
  357. if _key_sub_docs in ["bidding_budget","win_bid_price"]:
  358. if float(sub_docs[_key_sub_docs])>0:
  359. columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
  360. else:
  361. columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
  362. return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
  363. def post_extract(self,_dict):
  364. win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  365. _dict["win_tenderer"] = win_tenderer
  366. _dict["bidding_budget"] = bidding_budget
  367. _dict["win_bid_price"] = win_bid_price
  368. if "extract_count" not in _dict:
  369. _dict["extract_count"] = extract_count
  370. def get_dump_columns(self,_dict):
  371. docchannel = _dict.get(document_tmp_docchannel,0)
  372. project_code = _dict.get(document_tmp_project_code,"")
  373. project_name = _dict.get(document_tmp_project_name,"")
  374. tenderee = _dict.get(document_tmp_tenderee,"")
  375. agency = _dict.get(document_tmp_agency,"")
  376. doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
  377. win_tenderer = _dict.get("win_tenderer","")
  378. bidding_budget = _dict.get("bidding_budget","")
  379. if bidding_budget==0:
  380. bidding_budget = ""
  381. win_bid_price = _dict.get("win_bid_price","")
  382. if win_bid_price==0:
  383. win_bid_price = ""
  384. page_time = _dict.get(document_tmp_page_time,"")
  385. fingerprint = _dict.get(document_tmp_fingerprint,"")
  386. product = _dict.get(document_tmp_product,"")
  387. return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
  388. def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
  389. flag = True
  390. for _key in singleNum_keys:
  391. if len(getSet(_split,_key))>1:
  392. flag = False
  393. break
  394. for _key in multiNum_keys:
  395. if len(getSet(_split,_key))<=1:
  396. flag = False
  397. break
  398. project_code = item.get("project_code","")
  399. for _key in notlike_keys:
  400. if not flag:
  401. break
  402. for _d in _split:
  403. _key_v = _d.get(_key,"")
  404. _sim = getSimilarityOfString(project_code,_key_v)
  405. if _sim>0.7 and _sim<1:
  406. flag = False
  407. break
  408. #判断组内每条公告是否包含
  409. if flag:
  410. if len(contain_keys)>0:
  411. for _key in contain_keys:
  412. MAX_CONTAIN_COLUMN = None
  413. for _d in _split:
  414. contain_column = _d.get(_key)
  415. if contain_column is not None and contain_column !="":
  416. if MAX_CONTAIN_COLUMN is None:
  417. MAX_CONTAIN_COLUMN = contain_column
  418. else:
  419. if len(MAX_CONTAIN_COLUMN)<len(contain_column):
  420. if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
  421. flag = False
  422. break
  423. MAX_CONTAIN_COLUMN = contain_column
  424. else:
  425. if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
  426. flag = False
  427. break
  428. if flag:
  429. return _split
  430. return []
  431. def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count,document_tmp_doctitle]):
  432. list_data = []
  433. if isinstance(_query,list):
  434. bool_query = BoolQuery(should_queries=_query)
  435. else:
  436. bool_query = _query
  437. rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  438. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
  439. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  440. list_dict = getRow_ots(rows)
  441. for _dict in list_dict:
  442. self.post_extract(_dict)
  443. _dict["confidence"] = confidence
  444. list_data.append(_dict)
  445. # _count = len(list_dict)
  446. # while next_token:
  447. # rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  448. # SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  449. # ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  450. # list_dict = getRow_ots(rows)
  451. # for _dict in list_dict:
  452. # self.post_extract(_dict)
  453. # _dict["confidence"] = confidence
  454. # list_data.append(_dict)
  455. list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
  456. return list_dict
  457. def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  458. list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
  459. for _dict in list_dict:
  460. self.post_extract(_dict)
  461. _docid = _dict.get(document_tmp_docid)
  462. if _docid not in set_docid:
  463. base_list.append(_dict)
  464. set_docid.add(_docid)
  465. def translate_dumplicate_rules(self,status_from,item):
  466. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  467. if page_time=='':
  468. page_time = getCurrent_date("%Y-%m-%d")
  469. base_dict = {
  470. "status":[status_from[0]],
  471. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  472. }
  473. must_not_dict = {"save":0}
  474. list_rules = []
  475. singleNum_keys = ["tenderee","win_tenderer"]
  476. if fingerprint!="":
  477. _dict = {}
  478. confidence = 100
  479. _dict[document_tmp_fingerprint] = fingerprint
  480. _dict.update(base_dict)
  481. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  482. _rule = {"confidence":confidence,
  483. "item":item,
  484. "query":_query,
  485. "singleNum_keys":[],
  486. "contain_keys":[],
  487. "multiNum_keys":[]}
  488. list_rules.append(_rule)
  489. if docchannel in (52,118):
  490. if bidding_budget!="" and tenderee!="" and project_code!="":
  491. confidence = 90
  492. _dict = {document_tmp_docchannel:docchannel,
  493. "bidding_budget":item.get("bidding_budget"),
  494. document_tmp_tenderee:item.get(document_tmp_tenderee,""),
  495. document_tmp_project_code:item.get(document_tmp_project_code,"")
  496. }
  497. _dict.update(base_dict)
  498. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  499. _rule = {"confidence":confidence,
  500. "query":_query,
  501. "singleNum_keys":singleNum_keys,
  502. "contain_keys":[],
  503. "multiNum_keys":[document_tmp_web_source_no]}
  504. list_rules.append(_rule)
  505. if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  506. confidence = 80
  507. _dict = {document_tmp_docchannel:docchannel,
  508. "doctitle_refine":doctitle_refine,
  509. "tenderee":tenderee,
  510. bidding_budget:"bidding_budget"
  511. }
  512. _dict.update(base_dict)
  513. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  514. _rule = {"confidence":confidence,
  515. "query":_query,
  516. "singleNum_keys":singleNum_keys,
  517. "contain_keys":[],
  518. "multiNum_keys":[document_tmp_web_source_no]}
  519. list_rules.append(_rule)
  520. if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
  521. confidence = 90
  522. _dict = {document_tmp_docchannel:docchannel,
  523. "project_code":project_code,
  524. "doctitle_refine":doctitle_refine,
  525. "agency":agency,
  526. "bidding_budget":bidding_budget
  527. }
  528. _dict.update(base_dict)
  529. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  530. _rule = {"confidence":confidence,
  531. "query":_query,
  532. "singleNum_keys":singleNum_keys,
  533. "contain_keys":[],
  534. "multiNum_keys":[document_tmp_web_source_no]}
  535. list_rules.append(_rule)
  536. if project_code!="" and tenderee!="" and bidding_budget!="":
  537. confidence = 91
  538. _dict = {document_tmp_docchannel:docchannel,
  539. "project_code":project_code,
  540. "tenderee":tenderee,
  541. "bidding_budget":bidding_budget
  542. }
  543. _dict.update(base_dict)
  544. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  545. _rule = {"confidence":confidence,
  546. "query":_query,
  547. "singleNum_keys":singleNum_keys,
  548. "contain_keys":[],
  549. "multiNum_keys":[document_tmp_web_source_no]}
  550. list_rules.append(_rule)
  551. if doctitle_refine!="" and agency!="" and bidding_budget!="":
  552. confidence = 71
  553. _dict = {document_tmp_docchannel:docchannel,
  554. "doctitle_refine":doctitle_refine,
  555. "agency":agency,
  556. "bidding_budget":bidding_budget
  557. }
  558. _dict.update(base_dict)
  559. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  560. _rule = {"confidence":confidence,
  561. "query":_query,
  562. "singleNum_keys":singleNum_keys,
  563. "contain_keys":[],
  564. "multiNum_keys":[document_tmp_web_source_no]}
  565. list_rules.append(_rule)
  566. if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
  567. confidence = 91
  568. _dict = {document_tmp_docchannel:docchannel,
  569. "project_code":project_code,
  570. "project_name":project_name,
  571. "agency":agency,
  572. "bidding_budget":bidding_budget
  573. }
  574. _dict.update(base_dict)
  575. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  576. n_singleKeys = [i for i in singleNum_keys]
  577. n_singleKeys.append(document_tmp_web_source_no)
  578. _rule = {"confidence":confidence,
  579. "query":_query,
  580. "singleNum_keys":n_singleKeys,
  581. "contain_keys":[],
  582. "multiNum_keys":[]}
  583. list_rules.append(_rule)
  584. ##-- 5. 招标公告 - 同项目编号- 同[项目名称、标题] - 同[招标人、代理公司] - 同预算(!=0) - 同信息源=1
  585. if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
  586. confidence = 91
  587. _dict = {document_tmp_docchannel:docchannel,
  588. "project_code":project_code,
  589. "project_name":project_name,
  590. "tenderee":tenderee,
  591. "bidding_budget":bidding_budget
  592. }
  593. _dict.update(base_dict)
  594. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  595. n_singleKeys = [i for i in singleNum_keys]
  596. n_singleKeys.append(document_tmp_web_source_no)
  597. _rule = {"confidence":confidence,
  598. "query":_query,
  599. "singleNum_keys":n_singleKeys,
  600. "contain_keys":[],
  601. "multiNum_keys":[]}
  602. list_rules.append(_rule)
  603. if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  604. confidence = 71
  605. _dict = {document_tmp_docchannel:docchannel,
  606. "project_code":project_code,
  607. "doctitle_refine":doctitle_refine,
  608. "tenderee":tenderee,
  609. "bidding_budget":bidding_budget
  610. }
  611. _dict.update(base_dict)
  612. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  613. _rule = {"confidence":confidence,
  614. "query":_query,
  615. "singleNum_keys":singleNum_keys,
  616. "contain_keys":[],
  617. "multiNum_keys":[document_tmp_web_source_no]}
  618. list_rules.append(_rule)
  619. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  620. if project_name!="" and agency!="":
  621. tmp_bidding = 0
  622. if bidding_budget!="":
  623. tmp_bidding = bidding_budget
  624. confidence = 51
  625. _dict = {document_tmp_docchannel:docchannel,
  626. "project_name":project_name,
  627. "agency":agency,
  628. "bidding_budget":tmp_bidding
  629. }
  630. _dict.update(base_dict)
  631. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  632. _rule = {"confidence":confidence,
  633. "query":_query,
  634. "singleNum_keys":singleNum_keys,
  635. "contain_keys":[],
  636. "multiNum_keys":[document_tmp_web_source_no]}
  637. list_rules.append(_rule)
  638. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  639. if project_code!="" and agency!="":
  640. tmp_bidding = 0
  641. if bidding_budget!="":
  642. tmp_bidding = bidding_budget
  643. confidence = 51
  644. _dict = {document_tmp_docchannel:docchannel,
  645. "project_code":project_code,
  646. "agency":agency,
  647. "bidding_budget":tmp_bidding
  648. }
  649. _dict.update(base_dict)
  650. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  651. _rule = {"confidence":confidence,
  652. "query":_query,
  653. "singleNum_keys":singleNum_keys,
  654. "contain_keys":[],
  655. "multiNum_keys":[document_tmp_web_source_no]}
  656. list_rules.append(_rule)
  657. if docchannel not in (101,119,120):
  658. #-- 7. 非中标公告 - 同项目名称 - 同发布日期 - 同招标人 - 同预算 - 同类型 - 信息源>1 - 同项目编号
  659. if project_name!="" and tenderee!="" and project_code!="":
  660. tmp_bidding = 0
  661. if bidding_budget!="":
  662. tmp_bidding = bidding_budget
  663. confidence = 51
  664. _dict = {document_tmp_docchannel:docchannel,
  665. "project_name":project_name,
  666. "tenderee":tenderee,
  667. "project_code":project_code
  668. }
  669. _dict.update(base_dict)
  670. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  671. _rule = {"confidence":confidence,
  672. "query":_query,
  673. "singleNum_keys":singleNum_keys,
  674. "contain_keys":[],
  675. "multiNum_keys":[document_tmp_web_source_no]}
  676. list_rules.append(_rule)
  677. if docchannel in (101,119,120):
  678. #-- 3. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(==0)
  679. if project_code!="" and project_name!="" and win_tenderer!="":
  680. tmp_win = 0
  681. if win_bid_price!="":
  682. tmp_win = win_bid_price
  683. confidence = 61
  684. _dict = {document_tmp_docchannel:docchannel,
  685. "project_code":project_code,
  686. "project_name":project_name,
  687. "win_tenderer":win_tenderer,
  688. "win_bid_price":tmp_win
  689. }
  690. _dict.update(base_dict)
  691. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  692. _rule = {"confidence":confidence,
  693. "query":_query,
  694. "singleNum_keys":singleNum_keys,
  695. "contain_keys":[],
  696. "multiNum_keys":[]}
  697. list_rules.append(_rule)
  698. if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
  699. confidence = 72
  700. _dict = {document_tmp_docchannel:docchannel,
  701. "project_code":project_code,
  702. "project_name":project_name,
  703. "bidding_budget":bidding_budget,
  704. "product":product
  705. }
  706. _dict.update(base_dict)
  707. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  708. n_singleKeys = [i for i in singleNum_keys]
  709. n_singleKeys.append(document_tmp_web_source_no)
  710. _rule = {"confidence":confidence,
  711. "query":_query,
  712. "singleNum_keys":n_singleKeys,
  713. "contain_keys":[],
  714. "multiNum_keys":[]}
  715. list_rules.append(_rule)
  716. if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  717. confidence = 91
  718. _dict = {document_tmp_docchannel:docchannel,
  719. "project_code":project_code,
  720. "doctitle_refine":doctitle_refine,
  721. "win_tenderer":win_tenderer,
  722. "win_bid_price":win_bid_price
  723. }
  724. _dict.update(base_dict)
  725. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  726. n_singleKeys = [i for i in singleNum_keys]
  727. n_singleKeys.append(document_tmp_web_source_no)
  728. _rule = {"confidence":confidence,
  729. "query":_query,
  730. "singleNum_keys":n_singleKeys,
  731. "contain_keys":[],
  732. "multiNum_keys":[]}
  733. list_rules.append(_rule)
  734. ##-- 2. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(!=0) - 同信息源=1
  735. if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
  736. confidence = 91
  737. _dict = {document_tmp_docchannel:docchannel,
  738. "project_code":project_code,
  739. "project_name":project_name,
  740. "win_tenderer":win_tenderer,
  741. "win_bid_price":win_bid_price
  742. }
  743. _dict.update(base_dict)
  744. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  745. n_singleKeys = [i for i in singleNum_keys]
  746. n_singleKeys.append(document_tmp_web_source_no)
  747. _rule = {"confidence":confidence,
  748. "query":_query,
  749. "singleNum_keys":n_singleKeys,
  750. "contain_keys":[],
  751. "multiNum_keys":[]}
  752. list_rules.append(_rule)
  753. if project_name!="" and win_tenderer!="" and win_bid_price!="":
  754. confidence = 91
  755. _dict = {document_tmp_docchannel:docchannel,
  756. "project_name":project_name,
  757. "win_tenderer":win_tenderer,
  758. "win_bid_price":win_bid_price,
  759. }
  760. _dict.update(base_dict)
  761. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  762. _rule = {"confidence":confidence,
  763. "query":_query,
  764. "singleNum_keys":singleNum_keys,
  765. "contain_keys":[],
  766. "multiNum_keys":[document_tmp_web_source_no]}
  767. list_rules.append(_rule)
  768. if project_code!="" and win_tenderer!="" and win_bid_price!="":
  769. confidence = 91
  770. _dict = {document_tmp_docchannel:docchannel,
  771. "project_code":project_code,
  772. "win_tenderer":win_tenderer,
  773. "win_bid_price":win_bid_price,
  774. }
  775. _dict.update(base_dict)
  776. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  777. _rule = {"confidence":confidence,
  778. "query":_query,
  779. "singleNum_keys":singleNum_keys,
  780. "contain_keys":[],
  781. "multiNum_keys":[document_tmp_web_source_no]}
  782. list_rules.append(_rule)
  783. if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  784. confidence = 91
  785. _dict = {document_tmp_docchannel:docchannel,
  786. "project_code":project_code,
  787. "doctitle_refine":doctitle_refine,
  788. "win_tenderer":win_tenderer,
  789. "win_bid_price":win_bid_price
  790. }
  791. _dict.update(base_dict)
  792. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  793. n_singleKeys = [i for i in singleNum_keys]
  794. n_singleKeys.append(document_tmp_web_source_no)
  795. _rule = {"confidence":confidence,
  796. "query":_query,
  797. "singleNum_keys":n_singleKeys,
  798. "contain_keys":[],
  799. "multiNum_keys":[]}
  800. list_rules.append(_rule)
  801. if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  802. confidence=90
  803. _dict = {document_tmp_docchannel:docchannel,
  804. "doctitle_refine":doctitle_refine,
  805. "win_tenderer":win_tenderer,
  806. "win_bid_price":win_bid_price
  807. }
  808. _dict.update(base_dict)
  809. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  810. _rule = {"confidence":confidence,
  811. "query":_query,
  812. "singleNum_keys":singleNum_keys,
  813. "contain_keys":[],
  814. "multiNum_keys":[document_tmp_web_source_no]}
  815. list_rules.append(_rule)
  816. if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
  817. confidence=95
  818. _dict = {document_tmp_docchannel:docchannel,
  819. "project_name":project_name,
  820. "win_tenderer":win_tenderer,
  821. "win_bid_price":win_bid_price,
  822. "project_code":project_code
  823. }
  824. _dict.update(base_dict)
  825. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  826. _rule = {"confidence":confidence,
  827. "query":_query,
  828. "singleNum_keys":singleNum_keys,
  829. "contain_keys":[],
  830. "multiNum_keys":[document_tmp_web_source_no]}
  831. list_rules.append(_rule)
  832. if docchannel in (51,103,115,116):
  833. #9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  834. if doctitle_refine!="" and tenderee!="":
  835. tmp_budget = 0
  836. if bidding_budget!="":
  837. tmp_budget = bidding_budget
  838. confidence=81
  839. _dict = {document_tmp_docchannel:docchannel,
  840. "doctitle_refine":doctitle_refine,
  841. "tenderee":tenderee,
  842. "bidding_budget":tmp_budget,
  843. }
  844. _dict.update(base_dict)
  845. _dict["page_time"] = [page_time,page_time]
  846. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  847. _rule = {"confidence":confidence,
  848. "query":_query,
  849. "singleNum_keys":singleNum_keys,
  850. "contain_keys":[],
  851. "multiNum_keys":[document_tmp_web_source_no]}
  852. list_rules.append(_rule)
  853. #-- 9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  854. if project_code!="" and tenderee!="":
  855. confidence=81
  856. tmp_budget = 0
  857. if bidding_budget!="":
  858. tmp_budget = bidding_budget
  859. _dict = {document_tmp_docchannel:docchannel,
  860. "project_code":project_code,
  861. "tenderee":tenderee,
  862. "bidding_budget":tmp_budget,
  863. }
  864. _dict.update(base_dict)
  865. _dict["page_time"] = [page_time,page_time]
  866. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  867. _rule = {"confidence":confidence,
  868. "query":_query,
  869. "singleNum_keys":singleNum_keys,
  870. "contain_keys":[],
  871. "multiNum_keys":[document_tmp_web_source_no]}
  872. list_rules.append(_rule)
  873. if project_name!="" and tenderee!="":
  874. confidence=81
  875. tmp_budget = 0
  876. if bidding_budget!="":
  877. tmp_budget = bidding_budget
  878. _dict = {document_tmp_docchannel:docchannel,
  879. "project_name":project_name,
  880. "tenderee":tenderee,
  881. "bidding_budget":tmp_budget,
  882. }
  883. _dict.update(base_dict)
  884. _dict["page_time"] = [page_time,page_time]
  885. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  886. _rule = {"confidence":confidence,
  887. "query":_query,
  888. "singleNum_keys":singleNum_keys,
  889. "contain_keys":[],
  890. "multiNum_keys":[document_tmp_web_source_no]}
  891. list_rules.append(_rule)
  892. if agency!="" and tenderee!="":
  893. confidence=81
  894. tmp_budget = 0
  895. if bidding_budget!="":
  896. tmp_budget = bidding_budget
  897. _dict = {document_tmp_docchannel:docchannel,
  898. "agency":agency,
  899. "tenderee":tenderee,
  900. "bidding_budget":tmp_budget,
  901. "product":product
  902. }
  903. _dict.update(base_dict)
  904. _dict["page_time"] = [page_time,page_time]
  905. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  906. _rule = {"confidence":confidence,
  907. "query":_query,
  908. "singleNum_keys":singleNum_keys,
  909. "contain_keys":[],
  910. "multiNum_keys":[document_tmp_web_source_no]}
  911. list_rules.append(_rule)
  912. if agency!="" and project_code!="":
  913. confidence=81
  914. tmp_budget = 0
  915. if bidding_budget!="":
  916. tmp_budget = bidding_budget
  917. _dict = {document_tmp_docchannel:docchannel,
  918. "agency":agency,
  919. "project_code":project_code,
  920. "bidding_budget":tmp_budget,
  921. "product":product
  922. }
  923. _dict.update(base_dict)
  924. _dict["page_time"] = [page_time,page_time]
  925. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  926. _rule = {"confidence":confidence,
  927. "query":_query,
  928. "singleNum_keys":singleNum_keys,
  929. "contain_keys":[],
  930. "multiNum_keys":[document_tmp_web_source_no]}
  931. list_rules.append(_rule)
  932. if agency!="" and project_name!="":
  933. confidence=81
  934. tmp_budget = 0
  935. if bidding_budget!="":
  936. tmp_budget = bidding_budget
  937. _dict = {document_tmp_docchannel:docchannel,
  938. "agency":agency,
  939. "project_name":project_name,
  940. "bidding_budget":tmp_budget,
  941. "product":product
  942. }
  943. _dict.update(base_dict)
  944. _dict["page_time"] = [page_time,page_time]
  945. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  946. _rule = {"confidence":confidence,
  947. "query":_query,
  948. "singleNum_keys":singleNum_keys,
  949. "contain_keys":[],
  950. "multiNum_keys":[document_tmp_web_source_no]}
  951. list_rules.append(_rule)
  952. #五选二
  953. if tenderee!="" and bidding_budget!="" and product!="":
  954. confidence=80
  955. _dict = {document_tmp_docchannel:docchannel,
  956. "tenderee":tenderee,
  957. "bidding_budget":bidding_budget,
  958. "product":product,
  959. }
  960. _dict.update(base_dict)
  961. _dict["page_time"] = [page_time,page_time]
  962. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  963. _rule = {"confidence":confidence,
  964. "query":_query,
  965. "singleNum_keys":singleNum_keys,
  966. "contain_keys":[],
  967. "multiNum_keys":[]}
  968. list_rules.append(_rule)
  969. if tenderee!="" and win_tenderer!="" and product!="":
  970. confidence=80
  971. _dict = {document_tmp_docchannel:docchannel,
  972. "tenderee":tenderee,
  973. "win_tenderer":win_tenderer,
  974. "product":product,
  975. }
  976. _dict.update(base_dict)
  977. _dict["page_time"] = [page_time,page_time]
  978. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  979. _rule = {"confidence":confidence,
  980. "query":_query,
  981. "singleNum_keys":singleNum_keys,
  982. "contain_keys":[],
  983. "multiNum_keys":[]}
  984. list_rules.append(_rule)
  985. if tenderee!="" and win_bid_price!="":
  986. confidence=80
  987. _dict = {document_tmp_docchannel:docchannel,
  988. "tenderee":tenderee,
  989. "win_bid_price":win_bid_price,
  990. "product":product,
  991. }
  992. _dict.update(base_dict)
  993. _dict["page_time"] = [page_time,page_time]
  994. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  995. _rule = {"confidence":confidence,
  996. "query":_query,
  997. "singleNum_keys":singleNum_keys,
  998. "contain_keys":[],
  999. "multiNum_keys":[]}
  1000. list_rules.append(_rule)
  1001. if tenderee!="" and agency!="":
  1002. confidence=80
  1003. _dict = {document_tmp_docchannel:docchannel,
  1004. "tenderee":tenderee,
  1005. "agency":agency,
  1006. "product":product,
  1007. }
  1008. _dict.update(base_dict)
  1009. _dict["page_time"] = [page_time,page_time]
  1010. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1011. _rule = {"confidence":confidence,
  1012. "query":_query,
  1013. "singleNum_keys":singleNum_keys,
  1014. "contain_keys":[],
  1015. "multiNum_keys":[]}
  1016. list_rules.append(_rule)
  1017. if win_tenderer!="" and bidding_budget!="":
  1018. confidence=80
  1019. _dict = {document_tmp_docchannel:docchannel,
  1020. "win_tenderer":win_tenderer,
  1021. "bidding_budget":bidding_budget,
  1022. "product":product,
  1023. }
  1024. _dict.update(base_dict)
  1025. _dict["page_time"] = [page_time,page_time]
  1026. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1027. _rule = {"confidence":confidence,
  1028. "query":_query,
  1029. "singleNum_keys":singleNum_keys,
  1030. "contain_keys":[],
  1031. "multiNum_keys":[]}
  1032. list_rules.append(_rule)
  1033. if win_bid_price!="" and bidding_budget!="":
  1034. confidence=80
  1035. _dict = {document_tmp_docchannel:docchannel,
  1036. "win_bid_price":win_bid_price,
  1037. "bidding_budget":bidding_budget,
  1038. "product":product,
  1039. }
  1040. _dict.update(base_dict)
  1041. _dict["page_time"] = [page_time,page_time]
  1042. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1043. _rule = {"confidence":confidence,
  1044. "query":_query,
  1045. "singleNum_keys":singleNum_keys,
  1046. "contain_keys":[],
  1047. "multiNum_keys":[]}
  1048. list_rules.append(_rule)
  1049. if agency!="" and bidding_budget!="":
  1050. confidence=80
  1051. _dict = {document_tmp_docchannel:docchannel,
  1052. "agency":agency,
  1053. "bidding_budget":bidding_budget,
  1054. "product":product,
  1055. }
  1056. _dict.update(base_dict)
  1057. _dict["page_time"] = [page_time,page_time]
  1058. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1059. _rule = {"confidence":confidence,
  1060. "query":_query,
  1061. "singleNum_keys":singleNum_keys,
  1062. "contain_keys":[],
  1063. "multiNum_keys":[]}
  1064. list_rules.append(_rule)
  1065. if win_tenderer!="" and win_bid_price!="":
  1066. confidence=80
  1067. _dict = {document_tmp_docchannel:docchannel,
  1068. "win_tenderer":win_tenderer,
  1069. "win_bid_price":win_bid_price,
  1070. "product":product,
  1071. }
  1072. _dict.update(base_dict)
  1073. _dict["page_time"] = [page_time,page_time]
  1074. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1075. _rule = {"confidence":confidence,
  1076. "query":_query,
  1077. "singleNum_keys":singleNum_keys,
  1078. "contain_keys":[],
  1079. "multiNum_keys":[]}
  1080. list_rules.append(_rule)
  1081. if win_tenderer!="" and agency!="":
  1082. confidence=80
  1083. _dict = {document_tmp_docchannel:docchannel,
  1084. "win_tenderer":win_tenderer,
  1085. "agency":agency,
  1086. "product":product,
  1087. }
  1088. _dict.update(base_dict)
  1089. _dict["page_time"] = [page_time,page_time]
  1090. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1091. _rule = {"confidence":confidence,
  1092. "query":_query,
  1093. "singleNum_keys":singleNum_keys,
  1094. "contain_keys":[],
  1095. "multiNum_keys":[]}
  1096. list_rules.append(_rule)
  1097. if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
  1098. confidence=80
  1099. _dict = {document_tmp_docchannel:docchannel,
  1100. "doctitle_refine":doctitle_refine,
  1101. "product":product,
  1102. }
  1103. _dict.update(base_dict)
  1104. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1105. _rule = {"confidence":confidence,
  1106. "query":_query,
  1107. "singleNum_keys":singleNum_keys,
  1108. "contain_keys":[],
  1109. "multiNum_keys":[]}
  1110. list_rules.append(_rule)
  1111. return list_rules
  1112. def dumplicate_fianl_check(self,base_list):
  1113. the_group = base_list
  1114. the_group.sort(key=lambda x:x["confidence"],reverse=True)
  1115. if len(the_group)>10:
  1116. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
  1117. else:
  1118. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
  1119. #置信度
  1120. list_key_index = []
  1121. for _k in keys:
  1122. if _k=="doctitle":
  1123. list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
  1124. else:
  1125. list_key_index.append(getDiffIndex(the_group,_k))
  1126. _index = min(list_key_index)
  1127. if _index>1:
  1128. return the_group[:_index]
  1129. return []
  1130. def get_best_docid(self,base_list):
  1131. to_reverse = False
  1132. dict_source_count = {}
  1133. for _item in base_list:
  1134. _web_source = _item.get(document_tmp_web_source_no)
  1135. _fingerprint = _item.get(document_tmp_fingerprint)
  1136. if _web_source is not None:
  1137. if _web_source not in dict_source_count:
  1138. dict_source_count[_web_source] = set()
  1139. dict_source_count[_web_source].add(_fingerprint)
  1140. if len(dict_source_count[_web_source])>=2:
  1141. to_reverse=True
  1142. if len(base_list)>0:
  1143. base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
  1144. base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
  1145. base_list.sort(key=lambda x:x["extract_count"],reverse=True)
  1146. return base_list[0]["docid"]
    def save_dumplicate(self,base_list,best_docid,status_from,status_to):
        #best_docid need check while others can save directly
        # Persist a dedup decision: the best doc keeps save=1 (unless it was
        # already explicitly saved as 0), every other doc gets save=0, and a
        # doc whose current status lies in [status_from[0], status_from[1]]
        # is moved to a random status in [status_to[0], status_to[1]].
        # NOTE(review): the pasted source's indentation was ambiguous here -
        # the status re-roll is applied to every item in the window, not
        # only to non-best docs; confirm against the original file.
        list_dict = []
        for item in base_list:
            docid = item["docid"]
            _dict = {"partitionkey":item["partitionkey"],
                     "docid":item["docid"]}
            if docid==best_docid:
                if item.get("save",1)!=0:
                    _dict["save"] = 1
            else:
                _dict["save"] = 0
            # NOTE(review): item.get("status") may be None if the column was
            # not fetched, which would raise TypeError on the comparison.
            if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
                _dict["status"] = random.randint(status_to[0],status_to[1])
            list_dict.append(_dict)
        # write back after the whole batch has been prepared
        for _dict in list_dict:
            dtmp = Document_tmp(_dict)
            dtmp.update_row(self.ots_client)
    def flow_test(self,status_to=[1,10]):
        # One-off repair flow: find document_tmp rows that have page
        # attachments but are not marked attachment_extract_status=1, and
        # reset their status to a random value in *status_to* so they are
        # picked up again by the attachment flow.
        # NOTE(review): mutable default argument; never mutated here, so it
        # is safe, but a tuple would be more robust.
        def producer():
            # Rows with at least one attachment fileMd5, excluding rows
            # already extracted or whose status is already in the 1..11
            # range queried below.
            bool_query = BoolQuery(must_queries=[
                    # ExistsQuery("docid"),
                    # RangeQuery("crtime",range_to='2022-04-10'),
                    # RangeQuery("status",61),
                    NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
                ],
                must_not_queries=[
                    # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
                    TermQuery("attachment_extract_status",1),
                    RangeQuery("status",1,11)
                ]
            )
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            # page through the rest of the index, capped at 1,000,000 rows
            while next_token and _count<1000000:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
        def comsumer():
            # 30 worker threads, single pass over queue_init
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # print(item)
            # reset the row's status so it re-enters the pipeline
            dtmp = Document_tmp(item)
            dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
            dtmp.update_row(ots_client)
            # dhtml = Document_html(item)
            # dhtml.update_row(ots_client)
            # dtmp.delete_row(ots_client)
            # dhtml.delete_row(ots_client)
        producer()
        comsumer()
    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
        # Main dedup stage: pull docs whose status lies in *status_from*,
        # collect duplicate candidates with rule-generated queries, pick the
        # best doc of each group, and write save/dup_docid/status back.
        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_dumplicate producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_dumplicate.put(_dict)
            _count = len(list_dict)
            # NOTE(review): the cap uses the module constant
            # flow_process_count, not the process_count parameter - likely
            # intended to be process_count; confirm.
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_dumplicate.put(_dict)
                _count += len(list_dict)
        def comsumer():
            # 10 worker threads, single pass over queue_dumplicate
            mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # NOTE(review): uses the module-level flow_dumplicate_status_from
            # rather than the status_from parameter - confirm intended.
            list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            # print(item,"len_rules",len(list_rules))
            # run every rule query, accumulating candidates into base_list
            # (set_docid prevents the same doc being added twice)
            for _rule in list_rules:
                _query = _rule["query"]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
            # the doc being processed always participates with top confidence
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
            final_list = self.dumplicate_fianl_check(base_list)
            best_docid = self.get_best_docid(final_list)
            # log(str(final_list))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            # collect the other docids of the duplicate group
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
                # this doc is the keeper: save=1 and try to merge into a project
                dtmp.setValue(document_tmp_save,1,True)
                dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            else:
                # another doc won: save=0 and list the best docid first
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
            dtmp.update_row(self.ots_client)
            #只保留当前公告
            # self.save_dumplicate(final_list,best_docid,status_from,status_to)
            #
            # print("=base=",item)
            # if len(final_list)>=1:
            #     print("==================")
            #     for _dict in final_list:
            #         print(_dict)
            #     print("========>>>>>>>>>>")
        producer()
        comsumer()
    def merge_document(self,item,status_to=None):
        # Try to attach this document to an existing project2 record and
        # return the matched project uuid ("" when no unambiguous match).
        # A merge only happens when exactly one project satisfies the
        # generated queries.
        # NOTE(review): the pasted source's indentation was ambiguous; the
        # else-branch below was reconstructed as the save!=1 counterpart of
        # the save==1 branch (both end with the same single-result check) -
        # confirm against the original file.
        self.post_extract(item)
        docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
        _d = {"partitionkey":item["partitionkey"],
              "docid":item["docid"],
              }
        dtmp = Document_tmp(_d)
        if item.get(document_tmp_save,1)==1:
            # saved docs: match precisely on project_code plus tenderee or
            # project_name
            list_should_q = []
            if project_code!="" and tenderee!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("tenderee",tenderee)])
                list_should_q.append(_q)
            if project_name!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
        else:
            # other docs: fall back to money-based matching (budget or win
            # price combined with code/name/tenderee)
            list_should_q = []
            if bidding_budget!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("bidding_budget",float(bidding_budget))])
                list_should_q.append(_q)
            if tenderee!="" and bidding_budget!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("bidding_budget",float(bidding_budget)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if tenderee!="" and win_bid_price!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("win_bid_price",float(win_bid_price)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
        return dtmp.getProperties().get("merge_uuid","")
        # dtmp.update_row(self.ots_client)
    def test_merge(self):
        # Offline evaluation of merge_document: pull saved docs of the
        # result channels (101/119/120) for one fixed day, attempt a merge
        # for each, and dump the outcome to test_merge.xlsx.
        import pandas as pd
        import queue
        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            # collect every matching row into a list (no queue yet)
            list_test_item = []
            should_q = BoolQuery(should_queries=[
                TermQuery("docchannel",101),
                TermQuery("docchannel",119),
                TermQuery("docchannel",120)
            ])
            bool_query = BoolQuery(must_queries=[
                TermQuery("page_time","2022-04-22"),
                should_q,
                TermQuery("save",1)
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_dumplicate producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                list_test_item.append(_dict)
            _count = len(list_dict)
            # page through the whole result set
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    list_test_item.append(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
            return list_test_item
        from BaseDataMaintenance.model.ots.project import Project
        def comsumer_handle(item,result_queue,ots_client):
            # annotate each item with its merge result and whether the
            # matched project already has a tender-announcement page_time
            item["merge_uuid"] = self.merge_document(item)
            if item["merge_uuid"]!="":
                _dict = {"uuid":item["merge_uuid"]}
                _p = Project(_dict)
                _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
                if _p.getProperties().get("zhao_biao_page_time","")!="":
                    item["是否有招标"] = "是"
        list_test_item = producer()
        task_queue = queue.Queue()
        for item in list_test_item:
            task_queue.put(item)
        mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
        mt.run()
        # export the annotated items to an Excel report
        keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
        df_data = {}
        for k in keys:
            df_data[k] = []
        for item in list_test_item:
            for k in keys:
                df_data[k].append(item.get(k,""))
        df = pd.DataFrame(df_data)
        df.to_excel("test_merge.xlsx",columns=keys)
    def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
        # Merge stage: would feed docs with status in *status_from* through
        # merge_document. Currently disabled - the producer()/comsumer()
        # calls at the bottom are commented out, so this method is a no-op.
        def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_merge producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_merge.put(_dict)
            _count = len(list_dict)
            # page until process_count rows have been queued
            while next_token and _count<process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_merge.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            self.merge_document(item,status_to)
        # producer()
        # comsumer()
        pass
    def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
        # Placeholder for the synchronization stage between the given status
        # ranges; not implemented yet.
        pass
    def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
        # Cleanup stage: delete document_tmp rows (and their Document_html
        # companions) whose status is inside *status_from* and whose crtime
        # is more than 4 days old.
        def producer():
            current_date = getCurrent_date("%Y-%m-%d")
            # anything created before (today - 4 days) at midnight
            tmp_date = timeAdd(current_date,-4)
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                                 RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            log("flow_remove producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove.put(_dict)
            _count = len(list_dict)
            # NOTE(review): pages through the full result set; the
            # process_count parameter is not applied here.
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_remove.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # delete both the tmp row and its html companion
            dtmp = Document_tmp(item)
            dtmp.delete_row(self.ots_client)
            dhtml = Document_html(item)
            dhtml.delete_row(self.ots_client)
        producer()
        comsumer()
  1451. def start_flow_dumplicate(self):
  1452. schedule = BlockingScheduler()
  1453. schedule.add_job(self.flow_remove,"cron",hour="20")
  1454. schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
  1455. schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
  1456. schedule.start()
  1457. def flow_remove_project_tmp(self,process_count=flow_process_count):
  1458. def producer():
  1459. current_date = getCurrent_date("%Y-%m-%d")
  1460. tmp_date = timeAdd(current_date,-6*31)
  1461. bool_query = BoolQuery(must_queries=[
  1462. RangeQuery(project_page_time,range_to="%s"%(tmp_date))])
  1463. rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2_tmp","project2_tmp_index",
  1464. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
  1465. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1466. log("flow_remove project2_tmp producer total_count:%d"%total_count)
  1467. list_dict = getRow_ots(rows)
  1468. for _dict in list_dict:
  1469. self.queue_remove_project.put(_dict)
  1470. _count = len(list_dict)
  1471. while next_token:
  1472. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1473. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1474. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1475. list_dict = getRow_ots(rows)
  1476. for _dict in list_dict:
  1477. self.queue_remove_project.put(_dict)
  1478. _count += len(list_dict)
  1479. def comsumer():
  1480. mt = MultiThreadHandler(self.queue_remove_project,comsumer_handle,None,10,1,ots_client=self.ots_client)
  1481. mt.run()
  1482. def comsumer_handle(item,result_queue,ots_client):
  1483. ptmp = Project_tmp(item)
  1484. ptmp.delete_row(self.ots_client)
  1485. producer()
  1486. comsumer()
  1487. def start_flow_merge(self):
  1488. schedule = BlockingScheduler()
  1489. schedule.add_job(self.flow_merge,"cron",second="*/10")
  1490. schedule.start()
def download_attachment():
    # One-off utility: scan document_tmp rows created in a fixed two-hour
    # window and download each of their attachments from OSS into
    # ./download/<md5>.<filetype>, skipping files larger than
    # ATTACHMENT_LARGESIZE.
    ots_client = getConnect_ots()
    queue_attachment = Queue()
    auth = getAuth()
    oss2.defaults.connection_pool_size = 100
    oss2.defaults.multiget_num_threads = 20
    attachment_bucket_name = "attachment-hub"
    # use the VPC-internal OSS endpoint when available
    if is_internal:
        bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
    else:
        bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
    bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    current_path = os.path.dirname(__file__)
    def producer():
        # enqueue every doc created in the hard-coded crtime window
        columns = [document_tmp_attachment_path]
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_attachment producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            queue_attachment.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                queue_attachment.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10)
        mt.run()
    def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        # batch-get attachment rows by fileMd5; capped at 50, which is the
        # BatchGetRow per-request limit
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def comsumer_handle(item,result_queue):
        # download each attachment of one document to the local folder
        page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
        if len(page_attachments)==0:
            pass
        else:
            list_fileMd5 = []
            for _atta in page_attachments:
                list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
            list_attach = getAttachments(list_fileMd5)
            for attach in list_attach:
                filemd5 = attach.getProperties().get(attachment_filemd5)
                _status = attach.getProperties().get(attachment_status)
                _filetype = attach.getProperties().get(attachment_filetype)
                _size = attach.getProperties().get(attachment_size)
                _path = attach.getProperties().get(attachment_path)
                # NOTE(review): _uuid is never used, and _path duplicates
                # objectPath below.
                _uuid = uuid4()
                objectPath = attach.getProperties().get(attachment_path)
                localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
                try:
                    # oversized files are skipped
                    if _size>ATTACHMENT_LARGESIZE:
                        pass
                    else:
                        downloadFile(bucket,objectPath,localpath)
                except Exception as e:
                    traceback.print_exc()
    producer()
    comsumer()
  1575. def test_attachment_interface():
  1576. current_path = os.path.dirname(__file__)
  1577. task_queue = Queue()
  1578. def producer():
  1579. _count = 0
  1580. list_filename = os.listdir(os.path.join(current_path,"download"))
  1581. for _filename in list_filename:
  1582. _count += 1
  1583. _type = _filename.split(".")[1]
  1584. task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
  1585. if _count>=500:
  1586. break
  1587. def comsumer():
  1588. mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
  1589. mt.run()
  1590. def comsumer_handle(item,result_queue):
  1591. _path = item.get("path")
  1592. _type = item.get("file_type")
  1593. _data_base64 = base64.b64encode(open(_path,"rb").read())
  1594. #调用接口处理结果
  1595. start_time = time.time()
  1596. _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
  1597. log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
  1598. producer()
  1599. comsumer()
  1600. class Dataflow_attachment(Dataflow):
  1601. def __init__(self):
  1602. Dataflow.__init__(self)
  1603. self.process_list_thread = []
  1604. def flow_attachment_process(self):
  1605. self.process_comsumer()
  1606. def monitor_attachment_process(self):
  1607. alive_count = 0
  1608. for _t in self.process_list_thread:
  1609. if _t.is_alive():
  1610. alive_count += 1
  1611. log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
  1612. def process_comsumer(self):
  1613. if len(self.process_list_thread)==0:
  1614. thread_count = 60
  1615. for i in range(thread_count):
  1616. self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
  1617. for t in self.process_list_thread:
  1618. t.start()
  1619. while 1:
  1620. failed_count = 0
  1621. for _i in range(len(self.process_list_thread)):
  1622. t = self.process_list_thread[_i]
  1623. if not t.is_alive():
  1624. failed_count += 1
  1625. self.prcess_list_thread[_i] = Thread(target=self.process_comsumer_handle)
  1626. self.prcess_list_thread[_i].start()
  1627. if failed_count>0:
  1628. log("attachment failed %d"%(failed_count))
  1629. time.sleep(5)
  1630. def process_comsumer_handle(self):
  1631. while 1:
  1632. _flag = False
  1633. log("attachment handle:%s"%str(threading.get_ident()))
  1634. try:
  1635. item = self.queue_attachment_ocr.get(True,timeout=0.2)
  1636. log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
  1637. self.attachment_recognize(item,None)
  1638. log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
  1639. except Exception as e:
  1640. _flag = True
  1641. pass
  1642. try:
  1643. item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
  1644. log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
  1645. self.attachment_recognize(item,None)
  1646. log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
  1647. except Exception as e:
  1648. _flag = True and _flag
  1649. pass
  1650. if _flag:
  1651. time.sleep(2)
  1652. def attachment_recognize(self,_dict,result_queue):
  1653. item = _dict.get("item")
  1654. list_attach = _dict.get("list_attach")
  1655. dhtml = Document_html({"partitionkey":item.get("partitionkey"),
  1656. "docid":item.get("docid")})
  1657. dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
  1658. _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
  1659. _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
  1660. log(str(swf_urls))
  1661. if not _succeed:
  1662. item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
  1663. else:
  1664. dhtml.updateSWFImages(swf_urls)
  1665. dhtml.updateAttachment(list_html)
  1666. dhtml.update_row(self.ots_client)
  1667. item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
  1668. item[document_tmp_attachment_extract_status] = 1
  1669. log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
  1670. dtmp = Document_tmp(item)
  1671. dtmp.update_row(self.ots_client)
  1672. def flow_attachment(self):
  1673. self.flow_attachment_producer()
  1674. self.flow_attachment_producer_comsumer()
  1675. def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
  1676. list_attachment = []
  1677. rows_to_get = []
  1678. for _md5 in list_filemd5[:50]:
  1679. if _md5 is None:
  1680. continue
  1681. primary_key = [(attachment_filemd5,_md5)]
  1682. rows_to_get.append(primary_key)
  1683. req = BatchGetRowRequest()
  1684. req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
  1685. try:
  1686. result = self.ots_client.batch_get_row(req)
  1687. attach_result = result.get_result_by_table(attachment_table_name)
  1688. for item in attach_result:
  1689. if item.is_ok:
  1690. _dict = getRow_ots_primary(item.row)
  1691. if _dict is not None:
  1692. list_attachment.append(attachment(_dict))
  1693. except Exception as e:
  1694. log(str(list_filemd5))
  1695. log("attachProcess comsumer error %s"%str(e))
  1696. return list_attachment
  1697. def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
  1698. qsize_ocr = self.queue_attachment_ocr.qsize()
  1699. qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
  1700. log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
  1701. #选择加入数据场景
  1702. if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
  1703. return
  1704. #去重
  1705. set_docid = set()
  1706. set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
  1707. if qsize_ocr>0:
  1708. self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
  1709. else:
  1710. self.list_attachment_ocr = []
  1711. if qsize_not_ocr>0:
  1712. self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
  1713. else:
  1714. self.list_attachment_not_ocr = []
  1715. try:
  1716. bool_query = BoolQuery(must_queries=[
  1717. RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
  1718. # TermQuery(document_tmp_docid,234925191),
  1719. ])
  1720. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1721. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
  1722. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1723. log("flow_attachment producer total_count:%d"%total_count)
  1724. list_dict = getRow_ots(rows)
  1725. _count = 0
  1726. for _dict in list_dict:
  1727. docid = _dict.get(document_tmp_docid)
  1728. if docid in set_docid:
  1729. continue
  1730. self.queue_attachment.put(_dict,True)
  1731. _count += 1
  1732. while next_token and _count<flow_process_count:
  1733. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1734. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1735. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1736. list_dict = getRow_ots(rows)
  1737. for _dict in list_dict:
  1738. docid = _dict.get(document_tmp_docid)
  1739. if docid in set_docid:
  1740. continue
  1741. self.queue_attachment.put(_dict,True)
  1742. _count += 1
  1743. log("add attachment count:%d"%(_count))
  1744. except Exception as e:
  1745. log("flow attachment producer error:%s"%(str(e)))
  1746. traceback.print_exc()
  1747. def flow_attachment_producer_comsumer(self):
  1748. log("start flow_attachment comsumer")
  1749. mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
  1750. mt.run()
  1751. def set_queue(self,_dict):
  1752. list_attach = _dict.get("list_attach")
  1753. to_ocr = False
  1754. for attach in list_attach:
  1755. if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
  1756. to_ocr = True
  1757. break
  1758. if to_ocr:
  1759. self.queue_attachment_ocr.put(_dict,True)
  1760. # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
  1761. else:
  1762. self.queue_attachment_not_ocr.put(_dict,True)
  1763. # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
  1764. def comsumer_handle(self,item,result_queue):
  1765. try:
  1766. page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
  1767. if len(page_attachments)==0:
  1768. item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
  1769. dtmp = Document_tmp(item)
  1770. dtmp.update_row(self.ots_client)
  1771. else:
  1772. list_fileMd5 = []
  1773. for _atta in page_attachments:
  1774. list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
  1775. list_attach = self.getAttachments(list_fileMd5)
  1776. #未上传成功的2小时内不处理
  1777. if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
  1778. item[document_tmp_status] = 1
  1779. dtmp = Document_tmp(item)
  1780. dtmp.update_row(self.ots_client)
  1781. return
  1782. self.set_queue({"item":item,"list_attach":list_attach})
  1783. except Exception as e:
  1784. traceback.print_exc()
  1785. def start_flow_attachment(self):
  1786. schedule = BlockingScheduler()
  1787. schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
  1788. schedule.add_job(self.flow_attachment,"cron",second="*/10")
  1789. schedule.start()
class Dataflow_extract(Dataflow):
    """Extraction stage of the pipeline.

    Pulls documents from the `document_tmp` table whose status lies in the
    extract range, posts each one to three HTTP extraction services
    (other_url / extract_url / industy_url, inherited from Dataflow) and
    writes the JSON results plus a new status back to OTS.
    """
    def __init__(self):
        Dataflow.__init__(self)

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        """Refill queue_extract with documents pending extraction.

        Skips the refill while the queue still holds >100 items.
        self.list_extract keeps the docids queued recently so the same
        document is not queued twice across producer runs.
        """
        q_size = self.queue_extract.qsize()
        if q_size>100:
            return
        set_docid = set(self.list_extract)
        # keep only as many remembered docids as are still in the queue
        if q_size>0:
            self.list_extract = self.list_extract[-q_size:]
        else:
            self.list_extract = []
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_extract producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    # already queued in an earlier run: remember without re-queueing
                    self.list_extract.insert(0,docid)
                    continue
                else:
                    self.queue_extract.put(_dict)
                    self.list_extract.append(docid)
            _count = len(list_dict)
            # page through remaining results up to flow_process_count rows
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        self.list_extract.insert(0,docid)
                        continue
                    else:
                        self.queue_extract.put(_dict)
                        self.list_extract.append(docid)
                _count += len(list_dict)
        except Exception as e:
            log("flow extract producer error:%s"%(str(e)))
            traceback.print_exc()

    def flow_extract(self,):
        """Scheduler entry point: run one consumer pass."""
        self.comsumer()

    def comsumer(self):
        """Process queue_extract with 35 worker threads."""
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
        mt.run()

    def comsumer_handle(self,item,result_queue):
        """Run the three extraction services for one document and persist the result.

        all_done encodes the outcome: 1 = success; -1/-2/-3 = the other /
        extract / industry service returned a non-2xx status.
        NOTE(review): after a failure all_done becomes a negative (truthy)
        value, so the later `if all_done:` blocks still execute and the
        remaining services are still called; only the final `all_done != 1`
        test distinguishes failure — confirm this is intended.
        """
        # fetch the full html body, which is not carried on the queue item
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
        _extract = Document_extract({})
        _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
        _extract.setValue(document_extract2_docid,item.get(document_docid))
        all_done = 1
        if all_done:
            data = item
            resp = requests.post(self.other_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
            else:
                all_done = -1
        # build the request payload: copy of item, html moved into "content"
        data = {}
        for k,v in item.items():
            data[k] = v
        data["timeout"] = 240
        data["doc_id"] = data.get(document_tmp_docid)
        data["content"] = data.get(document_tmp_dochtmlcon,"")
        if document_tmp_dochtmlcon in data:
            data.pop(document_tmp_dochtmlcon)
        data["title"] = data.get(document_tmp_doctitle,"")
        data["web_source_no"] = item.get(document_tmp_web_source_no,"")
        data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
        if all_done:
            resp = requests.post(self.extract_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
            else:
                all_done = -2
        if all_done:
            resp = requests.post(self.industy_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            else:
                all_done = -3
        _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
                 document_docid:item.get(document_tmp_docid),
                 }
        dtmp = Document_tmp(_dict)
        if all_done!=1:
            # alert on failure and move the doc to the failed status range
            sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
            dtmp.update_row(self.ots_client)
        else:
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
            dtmp.update_row(self.ots_client)
            # insert into the interface table; enable when going live
            _extract.setValue(document_extract2_status,random.randint(1,50),True)
            _extract.update_row(self.ots_client)
        log("process docid:%d %s"%(data["doc_id"],str(all_done)))

    def start_flow_extract(self):
        """Start producer and consumer jobs on 10-second cron ticks (blocking)."""
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
        schedule.add_job(self.flow_extract,"cron",second="*/10")
        schedule.start()
  1899. class Dataflow_dumplicate(Dataflow):
    class DeleteListener():
        """MQ message listener that forwards delete-queue frames to a handler.

        conn  : the MQ connection this listener is subscribed on.
        _func : callback invoked as _func(_dict={"frame": headers,
                "conn": self.conn}, result_queue=None) for every message.
        """
        def __init__(self,conn,_func,*args,**kwargs):
            self.conn = conn
            self._func = _func

        def on_error(self, headers,*args,**kwargs):
            # called by the MQ client on protocol errors; log the frame body
            log('received an error %s' % str(headers.body))

        def on_message(self, headers,*args,**kwargs):
            """Hand the raw frame and its connection to the handler callback;
            errors are logged and swallowed so the listener stays alive."""
            try:
                message_id = headers.headers["message-id"]
                body = headers.body  # NOTE(review): read but unused here
                log("get message %s"%(message_id))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()
                pass

        def __del__(self):
            # best-effort disconnect when the listener is garbage-collected
            self.conn.disconnect()
  1917. def __init__(self,start_delete_listener=True):
  1918. Dataflow.__init__(self,)
  1919. self.c_f_get_extractCount = f_get_extractCount()
  1920. self.c_f_get_package = f_get_package()
  1921. logging.basicConfig(level = logging.info,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  1922. self.fix_doc_docid = None
  1923. self.bdm = BaseDataMonitor()
  1924. if start_delete_listener:
  1925. self.delete_comsumer_counts = 2
  1926. self.doc_delete_queue = "/queue/doc_delete_queue"
  1927. self.doc_delete_result = "/queue/doc_delete_result"
  1928. self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
  1929. for _ in range(self.delete_comsumer_counts):
  1930. conn = getConnect_activateMQ_ali()
  1931. listener = self.DeleteListener(conn,self.delete_doc_handle)
  1932. createComsumer(listener,self.doc_delete_queue)
  1933. def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart","time_release"]):
  1934. dict_time = {}
  1935. for k in keys:
  1936. dict_time[k] = _extract.get(k)
  1937. return dict_time
  1938. def post_extract(self,_dict):
  1939. win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  1940. _dict["win_tenderer"] = win_tenderer
  1941. _dict["bidding_budget"] = bidding_budget
  1942. _dict["win_bid_price"] = win_bid_price
  1943. extract_json = _dict.get(document_tmp_extract_json,"{}")
  1944. _extract = json.loads(extract_json)
  1945. _dict["product"] = ",".join(_extract.get("product",[]))
  1946. _dict["fingerprint"] = _extract.get("fingerprint","")
  1947. _dict["project_codes"] = _extract.get("code",[])
  1948. if len(_dict["project_codes"])>0:
  1949. _dict["project_code"] = _dict["project_codes"][0]
  1950. else:
  1951. _dict["project_code"] = ""
  1952. _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
  1953. if _dict["doctitle_refine"]=="":
  1954. _dict["doctitle_refine"] = _dict.get("doctitle")
  1955. _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
  1956. "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
  1957. _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
  1958. _dict["package"] = self.c_f_get_package.evaluate(extract_json)
  1959. _dict["project_name"] = _extract.get("name","")
  1960. _dict["dict_time"] = self.get_dict_time(_extract)
    def dumplicate_fianl_check(self,base_list,b_log=False):
        """Final pass over a duplicate-candidate group.

        base_list is sorted in place by confidence (descending). Each element
        from index 1 on is pairwise-checked against up to its 10 predecessors
        and the accepted run is cut at the first element whose probability
        drops to 0.1 or below. Returns the accepted prefix of the group, or
        [] when only the head element survives.
        """
        the_group = base_list
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        _index = 0
        base_fingerprint = "None"
        if len(base_list)>0:
            base_fingerprint = base_list[0]["fingerprint"]
        for _i in range(1,len(base_list)):
            _dict1 = base_list[_i]
            fingerprint_less = _dict1["fingerprint"]
            _pass = True
            # same fingerprint as the head document: accept without re-checking
            if fingerprint_less==base_fingerprint:
                _index = _i
                continue
            # compare against at most the first 10 already-accepted documents
            for _j in range(min(_i,10)):
                _dict2 = base_list[_j]
                _prob = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
                print("_prob:",_prob)  # NOTE(review): leftover debug print
                if _prob<=0.1:
                    _pass = False
                    break
            # _prob is always bound here: range(min(_i,10)) is non-empty for _i>=1
            log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
            _index = _i
            if not _pass:
                _index -= 1
                break
        if _index>=1:
            # #对重复入库的进行去重
            # _l = the_group[:_index+1]
            # set_fingerprint = set()
            # final_l = []
            # for _dict in _l:
            #     fingerprint_less = _dict["fingerprint"]
            #     if fingerprint_less in set_fingerprint:
            #         continue
            #     else:
            #         final_l.append(_dict)
            #         set_fingerprint.add(fingerprint_less)
            return the_group[:_index+1]
        return []
    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
        """Score the probability that two post_extract-processed documents
        are duplicates.

        Unpacks the comparable fields of both documents and delegates to
        check_dumplicate_rule. min_counts is the size of the candidate set
        the pair was drawn from (used by the rule to scale its base
        probability). Returns the rule function's score.
        """
        document_less = _dict1
        docid_less = _dict1["docid"]
        docchannel_less = document_less["docchannel"]
        page_time_less = document_less["page_time"]
        doctitle_refine_less = document_less["doctitle_refine"]
        project_codes_less = document_less["project_codes"]
        nlp_enterprise_less = document_less["nlp_enterprise"]
        tenderee_less = document_less["tenderee"]
        agency_less = document_less["agency"]
        win_tenderer_less = document_less["win_tenderer"]
        bidding_budget_less = document_less["bidding_budget"]
        win_bid_price_less = document_less["win_bid_price"]
        product_less = document_less["product"]
        package_less = document_less["package"]
        json_time_less = document_less["dict_time"]
        project_name_less = document_less["project_name"]
        fingerprint_less = document_less["fingerprint"]
        extract_count_less = document_less["extract_count"]
        web_source_no_less = document_less.get("web_source_no")
        province_less = document_less.get("province")
        city_less = document_less.get("city")
        district_less = document_less.get("district")
        document_greater = _dict2
        docid_greater = _dict2["docid"]
        page_time_greater = document_greater["page_time"]
        docchannel_greater = document_greater["docchannel"]
        doctitle_refine_greater = document_greater["doctitle_refine"]
        project_codes_greater = document_greater["project_codes"]
        nlp_enterprise_greater = document_greater["nlp_enterprise"]
        tenderee_greater = document_greater["tenderee"]
        agency_greater = document_greater["agency"]
        win_tenderer_greater = document_greater["win_tenderer"]
        bidding_budget_greater = document_greater["bidding_budget"]
        win_bid_price_greater = document_greater["win_bid_price"]
        product_greater = document_greater["product"]
        package_greater = document_greater["package"]
        json_time_greater = document_greater["dict_time"]
        project_name_greater = document_greater["project_name"]
        fingerprint_greater = document_greater["fingerprint"]
        extract_count_greater = document_greater["extract_count"]
        web_source_no_greater = document_greater.get("web_source_no")
        province_greater = document_greater.get("province")
        city_greater = document_greater.get("city")
        district_greater = document_greater.get("district")
        hard_level=1
        # special case: when both docs come from web source "17397-3",
        # raise the match strictness (reason not visible in this file)
        if web_source_no_less==web_source_no_greater=="17397-3":
            hard_level=2
        # NOTE: the positional argument order below must match
        # check_dumplicate_rule exactly — do not reorder.
        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
    def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
        """Legacy duplicate scorer (superseded by dumplicate_check).

        Computes a base probability from how many of 8 key fields agree,
        then applies a chain of veto checks (channel/title/codes/product/
        demand/entity/money/package/time). A failed check can force the
        score to 0; identical fingerprints short-circuit to 1.
        NOTE(review): indentation was reconstructed from a flattened dump;
        the nesting of a few statements is a best-effort guess.
        """
        document_less = _dict1
        docid_less = _dict1["docid"]
        docchannel_less = document_less["docchannel"]
        page_time_less = document_less["page_time"]
        doctitle_refine_less = document_less["doctitle_refine"]
        project_codes_less = document_less["project_codes"]
        nlp_enterprise_less = document_less["nlp_enterprise"]
        tenderee_less = document_less["tenderee"]
        agency_less = document_less["agency"]
        win_tenderer_less = document_less["win_tenderer"]
        bidding_budget_less = document_less["bidding_budget"]
        win_bid_price_less = document_less["win_bid_price"]
        product_less = document_less["product"]
        package_less = document_less["package"]
        json_time_less = document_less["dict_time"]
        project_name_less = document_less["project_name"]
        fingerprint_less = document_less["fingerprint"]
        extract_count_less = document_less["extract_count"]
        document_greater = _dict2
        docid_greater = _dict2["docid"]
        page_time_greater = document_greater["page_time"]
        doctitle_refine_greater = document_greater["doctitle_refine"]
        project_codes_greater = document_greater["project_codes"]
        nlp_enterprise_greater = document_greater["nlp_enterprise"]
        tenderee_greater = document_greater["tenderee"]
        agency_greater = document_greater["agency"]
        win_tenderer_greater = document_greater["win_tenderer"]
        bidding_budget_greater = document_greater["bidding_budget"]
        win_bid_price_greater = document_greater["win_bid_price"]
        product_greater = document_greater["product"]
        package_greater = document_greater["package"]
        json_time_greater = document_greater["dict_time"]
        project_name_greater = document_greater["project_name"]
        fingerprint_greater = document_greater["fingerprint"]
        extract_count_greater = document_greater["extract_count"]
        # identical fingerprints: certain duplicate
        if fingerprint_less==fingerprint_greater:
            return 1
        # tally how many of the 8 comparable fields agree
        same_count = 0
        all_count = 8
        if len(set(project_codes_less) & set(project_codes_greater))>0:
            same_count += 1
        if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
            same_count += 1
        if getLength(agency_less)>0 and agency_less==agency_greater:
            same_count += 1
        if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
            same_count += 1
        if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
            same_count += 1
        if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
            same_count += 1
        if getLength(project_name_less)>0 and project_name_less==project_name_greater:
            same_count += 1
        # titles also match when one refined title contains the other
        if getLength(doctitle_refine_less)>0 and (doctitle_refine_less==doctitle_refine_greater or doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
            same_count += 1
        # smaller candidate sets get a higher prior probability
        base_prob = 0
        if min_counts<3:
            base_prob = 0.9
        elif min_counts<5:
            base_prob = 0.8
        elif min_counts<8:
            base_prob = 0.7
        else:
            base_prob = 0.6
        _prob = base_prob*same_count/all_count
        # sparse extractions carry little evidence either way: keep above cutoff
        if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
            _prob = 0.15
        if _prob<0.1:
            return _prob
        check_result = {"pass":1}
        # channel-specific veto: differing titles with differing page_time
        if docchannel_less in (51,102,103,104,115,116,117):
            if doctitle_refine_less!=doctitle_refine_greater:
                if page_time_less!=page_time_greater:
                    check_result["docchannel"] = 0
                    check_result["pass"] = 0
            else:
                check_result["docchannel"] = 2
        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
            check_result["doctitle"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
        else:
            check_result["doctitle"] = 2
        #added check
        if not check_codes(project_codes_less,project_codes_greater):
            check_result["code"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
        else:
            # 2 = both have codes and they intersect; 1 = weak/absent evidence
            if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
                check_result["code"] = 2
            else:
                check_result["code"] = 1
        if not check_product(product_less,product_greater):
            check_result["product"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
        else:
            if getLength(product_less)>0 and getLength(product_greater)>0:
                check_result["product"] = 2
            else:
                check_result["product"] = 1
        if not check_demand():
            check_result["pass"] = 0
        if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                            tenderee_less,tenderee_greater,
                            agency_less,agency_greater,
                            win_tenderer_less,win_tenderer_greater):
            check_result["entity"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
        else:
            # strong entity evidence depends on channel: tenderee for some
            # channels, win_tenderer for others
            if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
                check_result["entity"] = 2
            elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
                check_result["entity"] = 2
            else:
                check_result["entity"] = 1
        if not check_money(bidding_budget_less,bidding_budget_greater,
                           win_bid_price_less,win_bid_price_greater):
            if b_log:
                logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
            check_result["money"] = 0
            check_result["pass"] = 0
        else:
            if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
                check_result["money"] = 2
            elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
                check_result["money"] = 2
            else:
                check_result["money"] = 1
        #added check
        if not check_package(package_less,package_greater):
            if b_log:
                logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
            check_result["package"] = 0
            check_result["pass"] = 0
        else:
            if getLength(package_less)>0 and getLength(package_greater)>0:
                check_result["package"] = 2
            else:
                check_result["package"] = 1
        #added check
        if not check_time(json_time_less,json_time_greater):
            if b_log:
                logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
            # normalize both sides to dicts (they may arrive JSON-encoded)
            if isinstance(json_time_less,dict):
                time_less = json_time_less
            else:
                time_less = json.loads(json_time_less)
            if isinstance(json_time_greater,dict):
                time_greater = json_time_greater
            else:
                time_greater = json.loads(json_time_greater)
            # veto only when a key is present on both sides with different values
            for k,v in time_less.items():
                if getLength(v)>0:
                    v1 = time_greater.get(k,"")
                    if getLength(v1)>0:
                        if v!=v1:
                            log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
                            check_result["time"] = 0
                            check_result["pass"] = 0
        else:
            if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
                check_result["time"] = 2
            else:
                check_result["time"] = 1
        if check_result.get("pass",0)==0:
            if b_log:
                logging.info(str(check_result))
            # money veto is absolute
            if check_result.get("money",1)==0:
                return 0
            # strong agreement on every other dimension can override a veto
            if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
                return _prob
            else:
                return 0
        if check_result.get("time",1)==0:
            return 0
        return _prob
  2234. def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  2235. for _ in range(retry_times):
  2236. try:
  2237. _time = time.time()
  2238. check_time = 0
  2239. if isinstance(_query,list):
  2240. bool_query = BoolQuery(should_queries=_query)
  2241. else:
  2242. bool_query = _query
  2243. rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  2244. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
  2245. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2246. list_dict = getRow_ots(rows)
  2247. list_data = []
  2248. for _dict in list_dict:
  2249. self.post_extract(_dict)
  2250. _docid = _dict.get(document_tmp_docid)
  2251. if merge:
  2252. list_data.append(_dict)
  2253. else:
  2254. if _docid!=item.get(document_tmp_docid):
  2255. _time1 = time.time()
  2256. confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
  2257. check_time+= time.time()-_time1
  2258. _dict["confidence"] = confidence
  2259. _dict["min_counts"] = total_count
  2260. if not confidence<0.1:
  2261. list_data.append(_dict)
  2262. all_time = time.time()-_time
  2263. # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
  2264. return list_data
  2265. except Exception as e:
  2266. traceback.print_exc()
  2267. return []
    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
        """Query duplicate candidates and merge the confident ones into base_list.

        set_docid tracks every docid already handled so the same document is
        not appended twice across multiple rule queries.
        """
        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
        for _dict in list_dict:
            _docid = _dict.get(document_tmp_docid)
            confidence = _dict["confidence"]
            print("confidence",_docid,confidence)  # NOTE(review): leftover debug print
            if confidence>0.1:
                if _docid not in set_docid:
                    base_list.append(_dict)
                    set_docid.add(_docid)
            # every returned docid is marked as seen, even low-confidence ones.
            # NOTE(review): nesting reconstructed from a flattened dump — this
            # second add may originally sit one level deeper (where it would be
            # a harmless duplicate); confirm against the original file.
            set_docid.add(_docid)
  2279. def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
  2280. for k,v in _dict.items():
  2281. if getLength(v)==0:
  2282. return
  2283. _dict.update(base_dict)
  2284. if b_log:
  2285. log(str(_dict))
  2286. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  2287. _rule = {"confidence":confidence,
  2288. "item":item,
  2289. "query":_query,
  2290. "singleNum_keys":[],
  2291. "contain_keys":[],
  2292. "multiNum_keys":[]}
  2293. list_rules.append(_rule)
  2294. def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
  2295. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  2296. current_date = getCurrent_date("%Y-%m-%d")
  2297. if page_time=='':
  2298. page_time = current_date
  2299. if page_time>=timeAdd(current_date,-2):
  2300. table_name = "document_tmp"
  2301. table_index = "document_tmp_index"
  2302. base_dict = {
  2303. "docchannel":item.get("docchannel",52),
  2304. "status":[status_from[0]],
  2305. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2306. }
  2307. must_not_dict = {"save":0,"docid":item.get("docid")}
  2308. doctitle_refine_name = "doctitle_refine"
  2309. else:
  2310. table_name = "document"
  2311. table_index = "document_index"
  2312. if get_all:
  2313. _status = [201,450]
  2314. else:
  2315. _status = [201,300]
  2316. base_dict = {
  2317. "docchannel":item["docchannel"],
  2318. "status":_status,
  2319. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2320. }
  2321. must_not_dict = {"docid":item.get("docid")}
  2322. doctitle_refine_name = "doctitle"
  2323. list_rules = []
  2324. singleNum_keys = ["tenderee","win_tenderer"]
  2325. confidence = 100
  2326. self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item,b_log=to_log)
  2327. confidence = 90
  2328. _dict = {document_tmp_agency:agency,
  2329. "win_tenderer":win_tenderer,
  2330. "win_bid_price":win_bid_price}
  2331. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2332. _dict = {document_tmp_agency:agency,
  2333. "win_tenderer":win_tenderer,
  2334. "bidding_budget":bidding_budget}
  2335. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2336. _dict = {document_tmp_agency:agency,
  2337. "win_bid_price":win_bid_price,
  2338. "bidding_budget":bidding_budget}
  2339. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2340. _dict = {win_tenderer:win_tenderer,
  2341. "win_bid_price":win_bid_price,
  2342. "bidding_budget":bidding_budget}
  2343. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2344. _dict = {"tenderee":tenderee,
  2345. "win_tenderer":win_tenderer,
  2346. "win_bid_price":win_bid_price}
  2347. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2348. _dict = {"tenderee":tenderee,
  2349. "win_tenderer":win_tenderer,
  2350. "bidding_budget":bidding_budget}
  2351. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2352. _dict = {"tenderee":tenderee,
  2353. "win_bid_price":win_bid_price,
  2354. "bidding_budget":bidding_budget}
  2355. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2356. _dict = {"tenderee":tenderee,
  2357. "agency":agency,
  2358. "win_tenderer":win_tenderer}
  2359. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2360. _dict = {"tenderee":tenderee,
  2361. "agency":agency,
  2362. "win_bid_price":win_bid_price}
  2363. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2364. _dict = {"tenderee":tenderee,
  2365. "agency":agency,
  2366. "bidding_budget":bidding_budget}
  2367. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2368. confidence=85
  2369. _dict = {"tenderee":tenderee,
  2370. "agency":agency
  2371. }
  2372. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2373. _dict = {"tenderee":tenderee,
  2374. "project_codes":project_code
  2375. }
  2376. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2377. _dict = {"tenderee":tenderee,
  2378. "project_name":project_name
  2379. }
  2380. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2381. if getLength(product)>0:
  2382. l_p = product.split(",")
  2383. _dict = {"tenderee":tenderee,
  2384. "product":l_p[0]
  2385. }
  2386. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2387. _dict = {"tenderee":tenderee,
  2388. "win_tenderer":win_tenderer
  2389. }
  2390. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2391. _dict = {"tenderee":tenderee,
  2392. "win_bid_price":win_bid_price
  2393. }
  2394. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2395. _dict = {"tenderee":tenderee,
  2396. "bidding_budget":bidding_budget
  2397. }
  2398. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2399. _dict = {"tenderee":tenderee,
  2400. doctitle_refine_name:doctitle_refine
  2401. }
  2402. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2403. _dict = {"agency":agency,
  2404. "project_codes":project_code
  2405. }
  2406. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2407. _dict = {"agency":agency,
  2408. "project_name":project_name
  2409. }
  2410. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2411. _dict = {"project_codes":project_code,
  2412. "project_name":project_name
  2413. }
  2414. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2415. _dict = {"project_codes":project_code,
  2416. "win_tenderer":win_tenderer
  2417. }
  2418. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2419. _dict = {"project_codes":project_code,
  2420. "win_bid_price":win_bid_price
  2421. }
  2422. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2423. _dict = {"project_codes":project_code,
  2424. "bidding_budget":bidding_budget
  2425. }
  2426. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2427. _dict = {"project_codes":project_code,
  2428. doctitle_refine_name:doctitle_refine
  2429. }
  2430. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2431. _dict = {"project_name":project_name,
  2432. "win_tenderer":win_tenderer
  2433. }
  2434. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2435. _dict = {"project_name":project_name,
  2436. "win_bid_price":win_bid_price
  2437. }
  2438. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2439. _dict = {"project_name":project_name,
  2440. "bidding_budget":bidding_budget
  2441. }
  2442. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2443. _dict = {"project_name":project_name,
  2444. doctitle_refine_name:doctitle_refine
  2445. }
  2446. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2447. _dict = {"win_tenderer":win_tenderer,
  2448. "win_bid_price":win_bid_price
  2449. }
  2450. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2451. _dict = {"win_tenderer":win_tenderer,
  2452. "bidding_budget":bidding_budget
  2453. }
  2454. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2455. _dict = {"win_tenderer":win_tenderer,
  2456. doctitle_refine_name:doctitle_refine
  2457. }
  2458. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2459. _dict = {"win_bid_price":win_bid_price,
  2460. "bidding_budget":bidding_budget
  2461. }
  2462. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2463. _dict = {"win_bid_price":win_bid_price,
  2464. doctitle_refine_name:doctitle_refine
  2465. }
  2466. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2467. _dict = {"bidding_budget":bidding_budget,
  2468. doctitle_refine_name:doctitle_refine
  2469. }
  2470. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2471. confidence=80
  2472. _dict = {doctitle_refine_name:doctitle_refine}
  2473. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2474. _dict = {"project_codes":project_code}
  2475. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2476. confidence=70
  2477. _dict = {"project_name":project_name}
  2478. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2479. return list_rules,table_name,table_index
  2480. def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]):
  2481. q_size = self.queue_dumplicate.qsize()
  2482. log("dumplicate queue size %d"%(q_size))
  2483. while 1:
  2484. try:
  2485. docid = self.queue_dumplicate_processed.get(block=False)
  2486. if docid in self.dumplicate_set:
  2487. self.dumplicate_set.remove(docid)
  2488. except Exception as e:
  2489. break
  2490. if q_size>process_count//3:
  2491. return
  2492. bool_query = BoolQuery(must_queries=[
  2493. RangeQuery(document_tmp_status,*status_from,True,True),
  2494. # TermQuery("docid",271983871)
  2495. ])
  2496. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  2497. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
  2498. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2499. log("flow_dumplicate producer total_count:%d"%total_count)
  2500. list_dict = getRow_ots(rows)
  2501. for _dict in list_dict:
  2502. docid = _dict.get(document_tmp_docid)
  2503. if docid in self.dumplicate_set:
  2504. continue
  2505. self.dumplicate_set.add(docid)
  2506. self.queue_dumplicate.put(_dict)
  2507. _count = len(list_dict)
  2508. while next_token and _count<process_count:
  2509. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  2510. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  2511. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2512. list_dict = getRow_ots(rows)
  2513. for _dict in list_dict:
  2514. docid = _dict.get(document_tmp_docid)
  2515. if docid in self.dumplicate_set:
  2516. continue
  2517. self.dumplicate_set.add(docid)
  2518. self.queue_dumplicate.put(_dict)
  2519. _count += len(list_dict)
  2520. # _l = list(self.dumplicate_set)
  2521. # _l.sort(key=lambda x:x,reverse=True)
  2522. # self.dumplicate_set = set(_l[:flow_process_count*2])
  2523. def comsumer_flow_dumplicate(self):
  2524. mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
  2525. mt.run()
    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
        '''
        Run one producer pass of the dedup flow. The consumer side runs in
        separate processes (see flow_dumpcate_comsumer), so it is not started
        here.
        :param process_count: soft cap on docs enqueued per pass
        :param status_from: status range for candidate selection
        '''
        self.producer_flow_dumplicate(process_count=process_count,status_from=status_from)
        # self.comsumer_flow_dumplicate()
    def flow_dumpcate_comsumer(self):
        '''
        Consumer side of the dedup flow: run 3 worker processes, each with a
        12-thread handler pool, and supervise them forever, restarting any
        process that dies.
        '''
        from multiprocessing import Process
        process_count = 3
        thread_count = 12
        list_process = []
        def start_thread():
            # restart=True / need_stop=False keep the pool alive indefinitely
            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
            mt.run()
        for _ in range(process_count):
            p = Process(target=start_thread)
            list_process.append(p)
        for p in list_process:
            p.start()
        # Supervision loop: replace any dead worker process in place.
        while 1:
            for _i in range(len(list_process)):
                p = list_process[_i]
                if not p.is_alive():
                    p = Process(target=start_thread)
                    list_process[_i] = p
                    p.start()
            time.sleep(1)
        # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
        # mt.run()
  2552. def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
  2553. '''
  2554. 根据docid查询公告内容,先查询document_tmp,再查询document
  2555. :param list_docids:
  2556. :return:
  2557. '''
  2558. list_docs = []
  2559. set_fingerprint = set()
  2560. for _docid in list_docids:
  2561. docid = int(_docid)
  2562. _dict = {document_partitionkey:getPartitionKey(docid),
  2563. document_docid:docid}
  2564. _doc = Document_tmp(_dict)
  2565. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2566. if not _exists:
  2567. _doc = Document(_dict)
  2568. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2569. if _exists:
  2570. _fingerprint = _doc.getProperties().get(document_fingerprint)
  2571. if _fingerprint in set_fingerprint:
  2572. continue
  2573. set_fingerprint.add(_fingerprint)
  2574. list_docs.append(_doc)
  2575. for _doc in list_docs:
  2576. try:
  2577. _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
  2578. if _sub_docs_json is not None:
  2579. _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
  2580. except Exception as e:
  2581. traceback.print_exc()
  2582. list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
  2583. return list_docs
  2584. def is_same_package(self,_dict1,_dict2):
  2585. sub_project_name1 = _dict1.get(project_sub_project_name,"")
  2586. if sub_project_name1=="Project":
  2587. sub_project_name1 = ""
  2588. win_tenderer1 = _dict1.get(project_win_tenderer,"")
  2589. win_bid_price1 = _dict1.get(project_win_bid_price,0)
  2590. bidding_budget1 = _dict1.get(project_bidding_budget,0)
  2591. sub_project_name2 = _dict2.get(project_sub_project_name,"")
  2592. if sub_project_name2=="Project":
  2593. sub_project_name2 = ""
  2594. win_tenderer2 = _dict2.get(project_win_tenderer,"")
  2595. win_bid_price2 = _dict2.get(project_win_bid_price,0)
  2596. bidding_budget2 = _dict2.get(project_bidding_budget,0)
  2597. _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
  2598. if len(_set)>1:
  2599. return False
  2600. _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
  2601. if len(_set)>1:
  2602. return False
  2603. _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
  2604. if len(_set)>1:
  2605. return False
  2606. _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
  2607. if len(_set)>1:
  2608. return False
  2609. return True
  2610. def getUpdate_dict(self,_dict):
  2611. update_dict = {}
  2612. for k,v in _dict.items():
  2613. if v is None:
  2614. continue
  2615. if isinstance(v,str):
  2616. if v=="":
  2617. continue
  2618. if isinstance(v,(float,int)):
  2619. if v==0:
  2620. continue
  2621. update_dict[k] = v
  2622. return update_dict
    def update_projects_by_document(self,docid,save,projects):
        '''
        Update the document-derived attributes of the given projects after
        document ``docid`` is saved (save==1) or removed (save!=1).
        :param docid: id of the document driving the update
        :param save: 1 attaches the docid to each project, otherwise detaches it
        :param projects: list of project dicts, mutated in place; unmatched
                         package properties are appended as new projects
        :return: None
        '''
        list_docs = self.search_docs([docid])
        docs = [_doc.getProperties() for _doc in list_docs]
        # common (project-level) and per-package properties of the document
        project_dict = generate_common_properties(docs)
        list_package_properties = generate_packages_properties(docs)
        _dict = {}
        # update common properties
        _replace_replace = False
        v = project_dict.get(document_district,"")
        # only overwrite the area fields when the new district is meaningful
        if not (v is None or v=="" or v=="[]" or v=="未知"):
            _replace_replace = True
        for k,v in project_dict.items():
            if not _replace_replace:
                if k in [document_district,document_city,document_province,document_area]:
                    continue
            if v is None or v=="" or v=="[]" or v=="未知":
                continue
            # list-like fields are merged separately below, not overwritten
            if k in (project_project_dynamics,project_product,project_project_codes,project_docids):
                continue
            _dict[k] = v
        for _proj in projects:
            _proj.update(_dict)
        # keep the latest page_time across document and projects
        for _proj in projects:
            if _proj.get(project_page_time,"")<project_dict.get(project_page_time,""):
                _proj[project_page_time] = project_dict.get(project_page_time,"")
        # merge the list-like attributes
        append_dict = {}
        set_docid = set()
        set_product = set()
        set_code = set()
        set_nlp_enterprise = set()
        set_nlp_enterprise_attachment = set()
        for _proj in projects:
            _docids = _proj.get(project_docids,"")
            _codes = _proj.get(project_project_codes,"")
            _product = _proj.get(project_product,"")
            # docids are rebuilt per project; set_code/set_product are NOT
            # reset per iteration, so codes/products accumulate across
            # projects — presumably intended for merged duplicates; confirm
            set_docid = set(_docids.split(","))
            if save==1:
                set_docid.add(str(docid))
            else:
                if str(docid) in set_docid:
                    set_docid.remove(str(docid))
            set_code = set_code | set(_codes.split(","))
            set_product = set_product | set(_product.split(","))
            try:
                set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
                set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
            except Exception as e:
                # malformed json in the nlp_enterprise fields is ignored (best effort)
                pass
            set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
            set_product = set_product | set(project_dict.get(project_product,"").split(","))
            try:
                set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
                set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
            except Exception as e:
                pass
            append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
            append_dict[project_docid_number] = len(set_docid)
            append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
            append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
            # json fields are capped at 100 entries
            append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
            append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
            # merge project dynamics from project and document, deduped by docid
            dict_dynamic = {}
            set_docid = set()
            _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
            for _dy in _dynamic:
                _docid = _dy.get("docid")
                dict_dynamic[_docid] = _dy
            _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
            for _dy in _dynamic:
                _docid = _dy.get("docid")
                dict_dynamic[_docid] = _dy
            list_dynamics = []
            for k,v in dict_dynamic.items():
                list_dynamics.append(v)
            list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
            append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
            _proj.update(append_dict)
        # index existing projects by package keys so the document's packages
        # can be matched against them
        dict_package = {}
        for _pp in projects:
            _counts = 0
            sub_project_name = _pp.get(project_sub_project_name,"")
            if sub_project_name=="Project":
                # "Project" is the placeholder name of the default package
                sub_project_name = ""
            win_tenderer = _pp.get(project_win_tenderer,"")
            win_bid_price = _pp.get(project_win_bid_price,0)
            bidding_budget = _pp.get(project_bidding_budget,0)
            if win_tenderer!="" and bidding_budget!=0:
                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
            if win_tenderer!="" and win_bid_price!=0:
                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                dict_package[_key] = _pp
                _counts +=1
            if _counts==0:
                # fall back to weaker single-attribute keys
                if win_tenderer!="":
                    _key = "%s-%s"%(sub_project_name,win_tenderer)
                    dict_package[_key] = _pp
                    _counts += 1
                if bidding_budget!=0:
                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                    dict_package[_key] = _pp
                    _counts += 1
        # update package-level (private) properties
        for _pp in list_package_properties:
            flag_update = False
            sub_project_name = _pp.get(project_sub_project_name,"")
            if sub_project_name=="Project":
                sub_project_name = ""
            win_tenderer = _pp.get(project_win_tenderer,"")
            win_bid_price = _pp.get(project_win_bid_price,0)
            bidding_budget = _pp.get(project_bidding_budget,0)
            # try the strongest key first; on a confirmed match merge the
            # package's non-empty values into the existing project
            if win_tenderer!="" and bidding_budget!=0:
                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                if _key in dict_package:
                    if self.is_same_package(_pp,dict_package[_key]):
                        ud = self.getUpdate_dict(_pp)
                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                        dict_package[_key].update(ud)
                        flag_update = True
                        continue
            if win_tenderer!="" and win_bid_price!=0:
                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                if _key in dict_package:
                    if self.is_same_package(_pp,dict_package[_key]):
                        ud = self.getUpdate_dict(_pp)
                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                        dict_package[_key].update(ud)
                        flag_update = True
                        continue
            if win_tenderer!="":
                _key = "%s-%s"%(sub_project_name,win_tenderer)
                if _key in dict_package:
                    if self.is_same_package(_pp,dict_package[_key]):
                        ud = self.getUpdate_dict(_pp)
                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                        dict_package[_key].update(ud)
                        flag_update = True
                        continue
            if bidding_budget!=0:
                _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                if _key in dict_package:
                    if self.is_same_package(_pp,dict_package[_key]):
                        ud = self.getUpdate_dict(_pp)
                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                        dict_package[_key].update(ud)
                        flag_update = True
                        continue
            if not flag_update:
                # no matching package: attach the common properties and append
                # as a new project, registering its keys for later packages
                _pp.update(project_dict)
                projects.append(_pp)
                _counts = 0
                if win_tenderer!="" and bidding_budget!=0:
                    _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                    dict_package[_key] = _pp
                    _counts += 1
                if win_tenderer!="" and win_bid_price!=0:
                    _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                    dict_package[_key] = _pp
                    _counts +=1
                if _counts==0:
                    if win_tenderer!="":
                        _key = "%s-%s"%(sub_project_name,win_tenderer)
                        dict_package[_key] = _pp
                        _counts += 1
                    if bidding_budget!=0:
                        _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                        dict_package[_key] = _pp
                        _counts += 1
  2800. def delete_projects_by_document(self,docid):
  2801. '''
  2802. 更新projects中对应的document的属性
  2803. :param docid:
  2804. :param projects: 项目集合
  2805. :param action:add/delete add时附加唯一属性,delete时删除唯一属性
  2806. :return:
  2807. '''
  2808. set_docid = set()
  2809. list_delete_projects = []
  2810. list_projects = self.search_projects_with_document([docid])
  2811. for _proj in list_projects:
  2812. _p = {}
  2813. _docids = _proj.get(project_docids,"")
  2814. print(_proj.get(project_uuid))
  2815. _p["delete_uuid"] = _proj.get(project_uuid)
  2816. _p["to_delete"] = True
  2817. list_delete_projects.append(_p)
  2818. if _docids!="":
  2819. set_docid = set_docid | set(_docids.split(","))
  2820. if str(docid) in set_docid:
  2821. set_docid.remove(str(docid))
  2822. list_docid = list(set_docid)
  2823. list_projects = []
  2824. if len(list_docid)>0:
  2825. list_docs = self.search_docs(list_docid)
  2826. list_projects = self.generate_projects_from_document(list_docs)
  2827. list_projects = dumplicate_projects(list_projects)
  2828. list_projects.extend(list_delete_projects)
  2829. project_json = to_project_json(list_projects)
  2830. print("delete_json",project_json)
  2831. return project_json
    def delete_doc_handle(self,_dict,result_queue):
        '''
        MQ handler for document-delete messages: remove the doc from its
        projects and persist the result to the project_process table, acking
        the message only after the write succeeds.
        :param _dict: {"frame": message frame, "conn": connection}
        :param result_queue: unused; kept for the handler signature
        '''
        headers = _dict.get("frame")
        conn = _dict.get("conn")
        log("==========delete")
        if headers is not None:
            message_id = headers.headers["message-id"]
            body = headers.body
            item = json.loads(body)
            docid = item.get("docid")
            # NOTE(review): messages without a docid are neither processed nor
            # acked — presumably left for redelivery; confirm this is intended
            if docid is None:
                return
            delete_result = self.delete_projects_by_document(docid)
            _uuid = uuid4().hex
            _d = {PROJECT_PROCESS_UUID:_uuid,
                  PROJECT_PROCESS_CRTIME:1,
                  PROJECT_PROCESS_PROJECTS:delete_result}
            _pp = Project_process(_d)
            # ack only once the result row is persisted
            if _pp.update_row(self.ots_client):
                ackMsg(conn,message_id)
            # result-queue insertion was replaced by writing to the
            # project_process table
            # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
            #     ackMsg(conn,message_id)
    def generate_common_properties(self,list_docs):
        '''
        Build the shared (project-level) properties of a group of documents.
        Scalar fields are chosen by vote counting across the documents; area
        fields are taken together from one document; list-like fields
        (dynamics, docids, codes, products) are accumulated.
        :param list_docs: Document/Document_tmp objects of one project
        :return: dict of common project properties
        '''
        # vote-count candidate values per field
        choose_dict = {}
        project_dict = {}
        for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
            for _doc in list_docs:
                _value = _doc.getProperties().get(_key,"")
                if _value!="":
                    if _key not in choose_dict:
                        choose_dict[_key] = {}
                    if _value not in choose_dict[_key]:
                        choose_dict[_key][_value] = 0
                    choose_dict[_key][_value] += 1
        _find = False
        # pick the area fields from the most specific level (district first)
        # that has a known value in any document
        for _key in [document_district,document_city,document_province,document_area]:
            area_dict = {}
            for _doc in list_docs:
                loc = _doc.getProperties().get(_key,"未知")
                if loc not in ('全国','未知',"0"):
                    if loc not in area_dict:
                        area_dict[loc] = 0
                    area_dict[loc] += 1
            list_loc = []
            for k,v in area_dict.items():
                list_loc.append([k,v])
            list_loc.sort(key=lambda x:x[1],reverse=True)
            if len(list_loc)>0:
                # NOTE(review): the area is copied from the *last iterated*
                # document (loop variable _doc), not from the top-voted
                # location in list_loc — confirm this is intended
                project_dict[document_district] = _doc.getProperties().get(document_district)
                project_dict[document_city] = _doc.getProperties().get(document_city)
                project_dict[document_province] = _doc.getProperties().get(document_province)
                project_dict[document_area] = _doc.getProperties().get(document_area)
                _find = True
                break
        if not _find:
            # no known location anywhere: fall back to the first document
            if len(list_docs)>0:
                project_dict[document_district] = list_docs[0].getProperties().get(document_district)
                project_dict[document_city] = list_docs[0].getProperties().get(document_city)
                project_dict[document_province] = list_docs[0].getProperties().get(document_province)
                project_dict[document_area] = list_docs[0].getProperties().get(document_area)
        # take the most frequent value per field, preferring the runner-up
        # over the placeholders '全国'/'未知'
        for _key,_value in choose_dict.items():
            _l = []
            for k,v in _value.items():
                _l.append([k,v])
            _l.sort(key=lambda x:x[1],reverse=True)
            if len(_l)>0:
                _v = _l[0][0]
                if _v in ('全国','未知'):
                    if len(_l)>1:
                        _v = _l[1][0]
                project_dict[_key] = _v
        list_dynamics = []
        docid_number = 0
        visuable_docids = []
        zhao_biao_page_time = ""
        zhong_biao_page_time = ""
        list_codes = []
        list_product = []
        p_page_time = ""
        remove_docids = set()
        for _doc in list_docs:
            table_name = _doc.getProperties().get("table_name")
            status = _doc.getProperties().get(document_status,0)
            _save = _doc.getProperties().get(document_tmp_save,1)
            doctitle = _doc.getProperties().get(document_doctitle,"")
            docchannel = _doc.getProperties().get(document_docchannel)
            page_time = _doc.getProperties().get(document_page_time,"")
            _docid = _doc.getProperties().get(document_docid)
            _bidway = _doc.getProperties().get(document_bidway,"")
            _docchannel = _doc.getProperties().get(document_life_docchannel,0)
            project_codes = _doc.getProperties().get(document_project_codes)
            product = _doc.getProperties().get(document_product)
            sub_docs = _doc.getProperties().get("sub_docs",[])
            is_multipack = True if len(sub_docs)>1 else False
            extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
            if product is not None:
                list_product.extend(product.split(","))
            if project_codes is not None:
                _c = project_codes.split(",")
                list_codes.extend(_c)
            if p_page_time=="":
                p_page_time = page_time
            # presumably channel codes for tender / win announcements —
            # confirm against the docchannel enumeration
            if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
                zhao_biao_page_time = page_time
            if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
                zhong_biao_page_time = page_time
            is_visuable = 0
            # a document counts as visible when published (status 201-300 in
            # "document") or flagged save==1 (staging table)
            if table_name=="document":
                if status>=201 and status<=300:
                    docid_number +=1
                    visuable_docids.append(str(_docid))
                    is_visuable = 1
                else:
                    remove_docids.add(str(_docid))
            else:
                if _save==1:
                    docid_number +=1
                    visuable_docids.append(str(_docid))
                    is_visuable = 1
                else:
                    remove_docids.add(str(_docid))
            list_dynamics.append({document_docid:_docid,
                                  document_doctitle:doctitle,
                                  document_docchannel:_docchannel,
                                  document_bidway:_bidway,
                                  document_page_time:page_time,
                                  document_status:201 if is_visuable==1 else 401,
                                  "is_multipack":is_multipack,
                                  document_tmp_extract_count:extract_count
                                  }
                                 )
        project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
        project_dict[project_docid_number] = docid_number
        project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
        if zhao_biao_page_time !="":
            project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
        if zhong_biao_page_time !="":
            project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
        project_dict[project_project_codes] = ",".join(list(set(list_codes)))
        project_dict[project_page_time] = p_page_time
        project_dict[project_product] = ",".join(list(set(list_product)))
        return project_dict
  2980. def generate_packages_properties(self,list_docs):
  2981. '''
  2982. 生成分包属性
  2983. :param list_docs:
  2984. :return:
  2985. '''
  2986. list_properties = []
  2987. set_key = set()
  2988. for _doc in list_docs:
  2989. _dict = {}
  2990. sub_docs = _doc.getProperties().get("sub_docs")
  2991. if sub_docs is not None:
  2992. for _d in sub_docs:
  2993. sub_project_code = _d.get(project_sub_project_code,"")
  2994. sub_project_name = _d.get(project_sub_project_name,"")
  2995. win_tenderer = _d.get(project_win_tenderer,"")
  2996. win_bid_price = _d.get(project_win_bid_price,"")
  2997. _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
  2998. if _key in set_key:
  2999. continue
  3000. set_key.add(_key)
  3001. list_properties.append(_d)
  3002. return list_properties
  3003. def generate_projects_from_document(self,list_docs):
  3004. '''
  3005. #通过公告生成projects
  3006. :param list_docids:
  3007. :return:
  3008. '''
  3009. #判断标段数
  3010. list_projects = generate_projects([doc.getProperties() for doc in list_docs])
  3011. return list_projects
  3012. def search_projects_with_document(self,list_docids):
  3013. '''
  3014. 通过docid集合查询对应的projects
  3015. :param list_docids:
  3016. :return:
  3017. '''
  3018. print("==",list_docids)
  3019. list_should_q = []
  3020. for _docid in list_docids:
  3021. list_should_q.append(TermQuery("docids",_docid))
  3022. bool_query = BoolQuery(should_queries=list_should_q)
  3023. _query = {"query":bool_query,"limit":20}
  3024. list_project_dict = getDocument(_query,self.ots_client,[
  3025. project_uuid,project_docids,project_zhao_biao_page_time,
  3026. project_zhong_biao_page_time,
  3027. project_page_time,
  3028. project_area,
  3029. project_province,
  3030. project_city,
  3031. project_district,
  3032. project_info_type,
  3033. project_industry,
  3034. project_qcodes,
  3035. project_project_name,
  3036. project_project_code,
  3037. project_project_codes,
  3038. project_project_addr,
  3039. project_tenderee,
  3040. project_tenderee_addr,
  3041. project_tenderee_phone,
  3042. project_tenderee_contact,
  3043. project_agency,
  3044. project_agency_phone,
  3045. project_agency_contact,
  3046. project_sub_project_name,
  3047. project_sub_project_code,
  3048. project_bidding_budget,
  3049. project_win_tenderer,
  3050. project_win_bid_price,
  3051. project_win_tenderer_manager,
  3052. project_win_tenderer_phone,
  3053. project_second_tenderer,
  3054. project_second_bid_price,
  3055. project_second_tenderer_manager,
  3056. project_second_tenderer_phone,
  3057. project_third_tenderer,
  3058. project_third_bid_price,
  3059. project_third_tenderer_manager,
  3060. project_third_tenderer_phone,
  3061. project_procurement_system,
  3062. project_bidway,
  3063. project_dup_data,
  3064. project_docid_number,
  3065. project_project_dynamics,
  3066. project_product,
  3067. project_moneysource,
  3068. project_service_time,
  3069. project_time_bidclose,
  3070. project_time_bidopen,
  3071. project_time_bidstart,
  3072. project_time_commencement,
  3073. project_time_completion,
  3074. project_time_earnest_money_start,
  3075. project_time_earnest_money_end,
  3076. project_time_get_file_end,
  3077. project_time_get_file_start,
  3078. project_time_publicity_end,
  3079. project_time_publicity_start,
  3080. project_time_registration_end,
  3081. project_time_registration_start,
  3082. project_time_release,
  3083. project_dup_docid,
  3084. project_info_source,
  3085. project_nlp_enterprise,
  3086. project_nlp_enterprise_attachment,
  3087. ],sort="page_time",table_name="project2",table_index="project2_index")
  3088. return list_project_dict
  3089. def set_project_uuid(self,_dict,_uuid):
  3090. if _uuid is not None and _uuid!="":
  3091. if "uuid" in _dict:
  3092. _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
  3093. else:
  3094. _dict["uuid"] = _uuid
    def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
        '''
        Build the ordered list of candidate-search rules used by merge_projects.

        Each returned entry is ``[list_of_queries, weight]`` where the queries
        are AND-combined by the caller and the weight scales the OTS fetch
        limit for that rule.  The list order encodes merge priority, so rules
        must not be reordered.

        :param page_time: publish date of the project (currently unused here;
            the caller applies the page_time window itself)
        :param project_codes: comma-separated project codes
        :param product: comma-separated product names
        :param bidding_budget / win_bid_price: numeric strings; <=0 means unknown
        :return: list of [queries, weight] pairs
        '''
        whole_time_start = time.time()
        _time = time.time()
        list_query = []
        # split the comma-separated fields, dropping empties; only the first
        # 20 codes/products are used to keep the OTS queries bounded
        list_code = [a for a in project_codes.split(",") if a!='']
        should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
        # print("should_q_code",[a for a in list_code[:20]])
        should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
        list_product = [a for a in product.split(",") if a!='']
        should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
        # optional area filter; placeholder values ("全国"/"未知") are ignored
        should_q_area = None
        if province!="" or city!="" or district!="":
            should_q = []
            if province not in ("","全国","未知") and province is not None:
                should_q.append(TermQuery(project_province,province))
            if city not in ("","全国","未知") and city is not None:
                should_q.append(TermQuery(project_city,city))
            if district not in ("","全国","未知") and district is not None:
                should_q.append(TermQuery(project_district,district))
            if len(should_q)>0:
                should_q_area = BoolQuery(should_queries=should_q)
        prepare_time = time.time()-_time
        _time = time.time()
        # log("list_code %s"%(str(list_code)))
        # log("list_product %s"%(str(list_product)))
        # log("tenderee %s"%(tenderee))
        # log("bidding_budget %s"%(bidding_budget))
        # log("win_tenderer %s"%(win_tenderer))
        # log("win_bid_price %s"%(win_bid_price))
        # log("project_name %s"%(project_name))
        log_time = time.time()-_time
        _time = time.time()
        # --- pairwise rules: tenderee combined with other strong signals ---
        if tenderee!="" and len(list_code)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_code,
                      ]
            list_query.append([_query,2])
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_cod
                      ]
            list_query.append([_query,2])
        if tenderee!="" and len(list_product)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_product]
            list_query.append([_query,1])
        if tenderee!="" and project_name!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_project_name,project_name)]
            list_query.append([_query,2])
        if tenderee!="" and agency!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_agency,agency)]
            list_query.append([_query,0])
        if tenderee!="" and float(bidding_budget)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        if float(bidding_budget)>0 and float(win_bid_price)>0:
            _query = [TermQuery(project_bidding_budget,bidding_budget),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        if tenderee!="" and win_tenderer!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,2])
        if agency!="" and win_tenderer!="":
            _query = [TermQuery(project_agency,agency),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,0])
        if agency!="" and len(list_product)>0:
            _query = [TermQuery(project_agency,agency),
                      should_q_product]
            list_query.append([_query,1])
        # --- winner-based rules ---
        if win_tenderer!="" and len(list_code)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_code]
            list_query.append([_query,2])
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_cod]
            list_query.append([_query,2])
        if win_tenderer!="" and float(win_bid_price)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        if win_tenderer!="" and float(bidding_budget)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        # --- weaker single-signal fallbacks ---
        if len(list_code)>0 and len(list_product)>0:
            _query = [should_q_code,
                      should_q_product]
            list_query.append([_query,2])
        if len(list_code)>0:
            _query = [
                should_q_code]
            list_query.append([_query,2])
            _query = [
                should_q_cod]
            list_query.append([_query,1])
        if project_name!="" and project_name is not None:
            _query = [
                TermQuery(project_project_name,project_name)]
            list_query.append([_query,1])
            _query_title = [MatchPhraseQuery(project_doctitles,project_name)]
            list_query.append([_query_title,1])
        if len(list_product)>0 and should_q_area is not None:
            _query = [should_q_area,
                      should_q_product]
            list_query.append([_query,0])
        generate_time = time.time()-_time
        whole_time = time.time()-whole_time_start
        # log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
        return list_query
  3208. def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment,project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source],fix_columns=[project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_b
id_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source]):
  3209. '''
  3210. 对项目进行合并
  3211. :return:
  3212. '''
  3213. try:
  3214. whole_time_start = time.time()
  3215. set_uuid = set()
  3216. for _proj in list_projects:
  3217. _uuid = _proj.get("uuid")
  3218. if _uuid is not None:
  3219. set_uuid = set_uuid | set(_uuid.split(","))
  3220. must_not_q = []
  3221. for _uuid in list(set_uuid):
  3222. must_not_q.append(TermQuery("uuid",_uuid))
  3223. print("must_not_q uuid:%s"%(_uuid))
  3224. projects_merge_count = 0
  3225. projects_check_rule_time = 0
  3226. projects_update_time = 0
  3227. projects_query_time = 0
  3228. projects_prepare_time = 0
  3229. current_date = getCurrent_date("%Y-%m-%d")
  3230. min_date = timeAdd(current_date,-35,format="%Y-%m-%d")
  3231. search_table = "project2"
  3232. search_table_index = "project2_index_formerge"
  3233. project_cls = Project
  3234. docids = ""
  3235. for _proj in list_projects[:30]:
  3236. docids = _proj.get(project_docids,"")
  3237. page_time = _proj.get(project_page_time,"")
  3238. project_codes = _proj.get(project_project_codes,"")
  3239. project_name = _proj.get(project_project_name,"")
  3240. tenderee = _proj.get(project_tenderee,"")
  3241. agency = _proj.get(project_agency,"")
  3242. product = _proj.get(project_product,"")
  3243. sub_project_name = _proj.get(project_sub_project_name,"")
  3244. bidding_budget = _proj.get(project_bidding_budget,-1)
  3245. win_tenderer = _proj.get(project_win_tenderer,"")
  3246. win_bid_price = _proj.get(project_win_bid_price,-1)
  3247. _dynamic = _proj.get(project_project_dynamics,"[]")
  3248. is_yanshou = False
  3249. list_dynamic = json.loads(_dynamic)
  3250. for _d in list_dynamic:
  3251. _title = _d.get("doctitle","")
  3252. if re.search("验收公[示告]",_title) is not None:
  3253. is_yanshou = True
  3254. break
  3255. province = _proj.get(project_province,"")
  3256. city = _proj.get(project_city,"")
  3257. district = _proj.get(project_district,"")
  3258. if is_yanshou:
  3259. page_time_less = timeAdd(page_time,-750)
  3260. page_time_greater = timeAdd(page_time,720)
  3261. else:
  3262. page_time_less = timeAdd(page_time,-450)
  3263. page_time_greater = timeAdd(page_time,420)
  3264. sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
  3265. _time = time.time()
  3266. list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
  3267. list_merge_data = []
  3268. search_table = "project2"
  3269. search_table_index = "project2_index_formerge"
  3270. project_cls = Project
  3271. # print("page_time,min_date",page_time,min_date)
  3272. # if page_time>=min_date:
  3273. # search_table = "project2_tmp"
  3274. # search_table_index = "project2_tmp_index"
  3275. # project_cls = Project_tmp
  3276. _step = 2
  3277. _begin = 0
  3278. must_queries = []
  3279. if page_time_less is not None and page_time_greater is not None:
  3280. must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
  3281. ]
  3282. #sub_project_name非必要条件
  3283. # if sub_project_q is not None:
  3284. # must_queries.append(sub_project_q)
  3285. projects_prepare_time += time.time()-_time
  3286. _time = time.time()
  3287. sort_type = SortOrder.DESC
  3288. while _begin<len(list_must_query):
  3289. if sort_type==SortOrder.DESC:
  3290. sort_type=SortOrder.ASC
  3291. if sort_type==SortOrder.ASC:
  3292. sort_type=SortOrder.DESC
  3293. list_should_q = []
  3294. _limit = 10
  3295. for must_q,_count in list_must_query[_begin:_begin+_step]:
  3296. must_q1 = list(must_q)
  3297. must_q1.extend(must_queries)
  3298. list_should_q.append(BoolQuery(must_queries=must_q1))
  3299. _limit += _count*5
  3300. _query = BoolQuery(
  3301. should_queries=list_should_q,
  3302. must_not_queries=must_not_q[:100]
  3303. )
  3304. # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
  3305. # SearchQuery(_query,limit=_limit),
  3306. # columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
  3307. rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
  3308. SearchQuery(_query,sort=Sort(sorters=[FieldSort(project_page_time,sort_type)]),limit=_limit),
  3309. columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
  3310. list_data = getRow_ots(rows)
  3311. list_merge_data.extend(list_data)
  3312. # print(list_data)
  3313. for _data in list_data:
  3314. must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
  3315. _begin += _step
  3316. projects_query_time += time.time()-_time
  3317. #优先匹配招标金额相近的
  3318. projects_merge_count = len(list_merge_data)
  3319. list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
  3320. list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
  3321. # log(page_time_less+"=="+page_time_greater)
  3322. # log("list_merge_data:%s"%(str(list_merge_data)))
  3323. list_check_data = []
  3324. for _data in list_merge_data:
  3325. _time = time.time()
  3326. _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
  3327. if b_log:
  3328. log(str(_check))
  3329. projects_check_rule_time += time.time()-_time
  3330. if _check:
  3331. list_check_data.append([_data,_prob])
  3332. list_check_data.sort(key=lambda x:x[1],reverse=True)
  3333. for _data,_ in list_check_data:
  3334. _time = time.time()
  3335. _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
  3336. projects_check_rule_time += time.time()-_time
  3337. _time = time.time()
  3338. if _check:
  3339. # o_proj = project_cls(_data)
  3340. # o_proj.fix_columns(self.ots_client,fix_columns,True)
  3341. # for k in fix_columns:
  3342. # _data[k] = o_proj.getProperties().get(k)
  3343. update_projects_by_project(_data,[_proj])
  3344. projects_update_time += time.time()-_time
  3345. whole_time = time.time()-whole_time_start
  3346. log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
  3347. return list_projects
  3348. except Exception as e:
  3349. traceback.print_exc()
  3350. assert 1==2
  3351. def merge_document_real(self,item,dup_docid,table_name,save,status_to=None,b_log=False):
  3352. '''
  3353. 实时项目合并
  3354. :param item:
  3355. :param dup_docid:重复的公告集合
  3356. :param status_to:
  3357. :return:
  3358. '''
  3359. try:
  3360. list_docids = []
  3361. _docid = item.get(document_tmp_docid)
  3362. list_docids.append(_docid)
  3363. if isinstance(dup_docid,list):
  3364. list_docids.extend(dup_docid)
  3365. list_docids = [a for a in list_docids if a is not None]
  3366. _time = time.time()
  3367. list_projects = self.search_projects_with_document(list_docids)
  3368. # log("search projects takes:%.3f"%(time.time()-_time))
  3369. if len(list_projects)==0:
  3370. # _time = time.time()
  3371. list_docs = self.search_docs(list_docids)
  3372. # log("search document takes:%.3f"%(time.time()-_time))
  3373. # _time = time.time()
  3374. list_projects = self.generate_projects_from_document(list_docs)
  3375. # log("generate projects takes:%.3f"%(time.time()-_time))
  3376. else:
  3377. _time = time.time()
  3378. self.update_projects_by_document(_docid,save,list_projects)
  3379. # log("update projects takes:%.3f"%(time.time()-_time))
  3380. _time = time.time()
  3381. list_projects = dumplicate_projects(list_projects)
  3382. # log("dumplicate projects takes:%.3f"%(time.time()-_time))
  3383. _time = time.time()
  3384. list_projects = self.merge_projects(list_projects,b_log)
  3385. # log("merge projects takes:%.3f"%(time.time()-_time))
  3386. _time = time.time()
  3387. list_merge_dump = dumplicate_document_in_merge(list_projects,dup_docid[:-1])
  3388. # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
  3389. _time = time.time()
  3390. project_json = to_project_json(list_projects)
  3391. # log("json projects takes:%.3f"%(time.time()-_time))
  3392. if b_log:
  3393. log("project_json:%s"%project_json)
  3394. return project_json,list_merge_dump
  3395. except Exception as e:
  3396. raise RuntimeError("error on dumplicate")
  3397. def is_exist_fingerprint(self,final_list,_docid,_fingerprint,table_name):
  3398. set_fingerprint = set()
  3399. for _i in range(1,len(final_list)):
  3400. _dict = final_list[_i]
  3401. b_docid = _dict[document_tmp_docid]
  3402. _save = _dict.get(document_tmp_save,0)
  3403. _status = _dict.get(document_tmp_status,0)
  3404. if table_name=="document":
  3405. if _status>=201 and _status<=300:
  3406. _save = 1
  3407. fingerprint_less = _dict.get(document_tmp_fingerprint,"")
  3408. if b_docid==_docid:
  3409. pass
  3410. else:
  3411. if _save==1:
  3412. set_fingerprint.add(fingerprint_less)
  3413. if _fingerprint in set_fingerprint:
  3414. return True
  3415. return False
  3416. def check_page_time(self,item):
  3417. page_time = item.get(document_page_time,"")
  3418. has_before = False
  3419. has_after = False
  3420. if len(page_time)>0:
  3421. l_page_time = timeAdd(page_time,days=-90)
  3422. dict_time = item.get("dict_time",{})
  3423. for k,v in dict_time.items():
  3424. if v is not None and len(v)>0:
  3425. if l_page_time>v:
  3426. has_before = True
  3427. if v>page_time:
  3428. has_after = True
  3429. if not has_after and has_before:
  3430. log("check page_time false %s==%s-%s"%(l_page_time,k,v))
  3431. return False
  3432. return True
    def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
        '''
        Deduplicate one document, pick the surviving docid of its duplicate
        group, merge the group into projects and (when *upgrade*) persist the
        result back to the tmp table.

        :param item: extracted document dict to process
        :param result_queue: unused; kept for the thread-handler signature
        :param ots_client: unused here (self.ots_client is used); kept for signature
        :param get_all: forwarded to translate_dumplicate_rules
        :param upgrade: when True, dup/best docids and projects are written back
        '''
        try:
            start_time = time.time()
            self.post_extract(item)
            log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
            base_list = []
            set_docid = set()
            # build the candidate-search rules for this document
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=False)
            # print("len_rules",len(list_rules),table_name,table_index)
            # keep only the 30 most confident rules
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            list_rules = list_rules[:30]
            _i = 0
            step = 5
            print("here 1")  # NOTE(review): leftover debug marker
            item["confidence"] = 999
            # seed the group with the document itself
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            # fetch duplicate candidates five rules at a time
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    # exclude docids already collected (capped at the last 100)
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle])
                _i += step
            print("here 2")  # NOTE(review): leftover debug marker
            b_log = False if upgrade else True
            _time = time.time()
            # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            # pairwise final check over all collected candidates
            final_list = self.dumplicate_fianl_check(base_list,b_log)
            # is this fingerprint already carried by another saved document?
            exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
            # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            final_list_docid = [a["docid"] for a in final_list]
            # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            # the duplicate group, excluding the document itself
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            remove_list = []
            if self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid)):
                # this document wins its group: keep it
                dtmp.setValue(document_tmp_save,1,True)
                # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
                # another document wins: drop this one (save=0)
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
                    # the best docid is listed first in dup_docid
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
            list_docids = list(dup_docid)
            list_docids.append(best_docid)
            # explicit update requests are always kept
            if item.get(document_update_document)=="true":
                dtmp.setValue(document_tmp_save,1,True)
            list_merge_dump = []
            if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
                # identical fingerprint already saved elsewhere: skip project merge
                log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                dtmp.setValue(document_tmp_projects,"[]",True)
            else:
                project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
                dtmp.setValue(document_tmp_projects,project_json,True)
            log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
            if upgrade:
                # print(dtmp.getProperties())
                dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                dtmp.setValue(document_tmp_best_docid,best_docid,True)
                _flag = dtmp.update_row(self.ots_client)
                if not _flag:
                    # row too large to write: halve the projects payload until it fits
                    for i in range(10):
                        list_proj_json = dtmp.getProperties().get(document_tmp_projects)
                        if list_proj_json is not None:
                            list_proj = json.loads(list_proj_json)
                            dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
                            if dtmp.update_row(self.ots_client):
                                break
                if table_name=="document_tmp":
                    # flip the save flag of every document dropped by dedup/merge
                    self.changeSaveStatus(remove_list)
                    self.changeSaveStatus(list_merge_dump)
            log("dumplicate end on:%s"%(str(item.get(document_tmp_docid))))
        except Exception as e:
            traceback.print_exc()
            log("dumplicate error on:%s"%(str(item.get(document_tmp_docid))))
        finally:
            # always ack processing so the producer can release this docid
            self.queue_dumplicate_processed.put(item.get(document_tmp_docid))
    def fix_doc_which_not_in_project(self):
        '''
        将成品公告中不存在于project2的数据取出,并放入document_tmp中重新进行去重和合并
        (Find published documents missing from project2 and reset their status
        in document_tmp so they run through dedup/merge again.)
        :return:
        '''
        def fix_doc_handle(item,result_queue):
            # worker: when no project2 row references this docid, reset the
            # tmp-row status so the dedup flow picks the doc up again
            _docid = item.get(document_tmp_docid)
            b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
                                                                                SearchQuery(b_q,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            if total_count==0:
                log("fix_doc:%s not in project2"%(str(_docid)))
                d_tmp = Document_tmp(item)
                d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
                d_tmp.update_row(self.ots_client)
        # resume from the last processed docid when available; otherwise scan
        # all saved documents processed more than 5 minutes ago
        if self.fix_doc_docid is None:
            current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
            before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
            bool_query = BoolQuery(must_queries=[
                TermQuery(document_tmp_save,1),
                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
                RangeQuery(document_tmp_opertime,before_date)
            ])
        else:
            bool_query = BoolQuery(must_queries=[
                TermQuery(document_tmp_save,1),
                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
                RangeQuery(document_tmp_docid,self.fix_doc_docid)
            ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_d = getRow_ots(rows)
        list_data.extend(list_d)
        # page through the remaining matches
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_d = getRow_ots(rows)
            list_data.extend(list_d)
            print("%d/%d"%(len(list_data),total_count))
        if len(list_data)>0:
            # remember where to resume on the next run (docid ascending)
            self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
            log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
        task_queue = Queue()
        for _data in list_data:
            task_queue.put(_data)
        mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
        mt.run()
  3592. def start_flow_dumplicate(self):
  3593. schedule = BlockingScheduler()
  3594. schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
  3595. schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
  3596. schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
  3597. schedule.add_job(self.flow_remove,"cron",hour="20")
  3598. schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
  3599. # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
  3600. schedule.start()
  3601. def changeSaveStatus(self,list_dict):
  3602. for _dict in list_dict:
  3603. if isinstance(_dict,dict):
  3604. if _dict.get(document_tmp_save,1)==1:
  3605. _d = {"partitionkey":_dict["partitionkey"],
  3606. "docid":_dict["docid"],
  3607. document_tmp_save:0
  3608. }
  3609. _d_tmp = Document_tmp(_d)
  3610. if _d_tmp.exists_row(self.ots_client):
  3611. _d_tmp.update_row(self.ots_client)
  3612. elif isinstance(_dict,int):
  3613. _d = {"partitionkey":_dict%500+1,
  3614. "docid":_dict,
  3615. document_tmp_save:0
  3616. }
  3617. _d_tmp = Document_tmp(_d)
  3618. if _d_tmp.fix_columns(self.ots_client,["status"],True):
  3619. if _d_tmp.getProperties().get("status")==1:
  3620. _d_tmp.setValue("status",0,True)
  3621. _d_tmp.update_row(self.ots_client)
  3622. def test_dumplicate(self,docid):
  3623. # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
  3624. columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]
  3625. bool_query = BoolQuery(must_queries=[
  3626. TermQuery("docid",docid)
  3627. ])
  3628. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  3629. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3630. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3631. log("flow_dumplicate producer total_count:%d"%total_count)
  3632. if total_count==0:
  3633. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  3634. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3635. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3636. list_dict = getRow_ots(rows)
  3637. for item in list_dict:
  3638. self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
  3639. return
  3640. def test_merge(self,list_docid_less,list_docid_greater):
  3641. list_docs_less = self.search_docs(list_docid_less)
  3642. list_projects_less = self.generate_projects_from_document(list_docs_less)
  3643. list_docs_greater = self.search_docs(list_docid_greater)
  3644. list_projects_greater = self.generate_projects_from_document(list_docs_greater)
  3645. list_projects_less.extend(list_projects_greater)
  3646. list_projects = dumplicate_projects(list_projects_less,b_log=True)
  3647. project_json = to_project_json(list_projects)
  3648. log("project_json:%s"%project_json)
  3649. return project_json
    def getRemainDoc(self,docid):
        '''
        Re-run the dedup pipeline for *docid* and return the docid that should
        remain (the best of its duplicate group), or None when the document is
        not found.  Mirrors the collection phase of dumplicate_comsumer_handle
        but performs no writes.
        '''
        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
        bool_query = BoolQuery(must_queries=[
            TermQuery("docid",docid)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        if len(list_dict)>0:
            item = list_dict[0]
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # same rule-driven candidate collection as dumplicate_comsumer_handle
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            _i = 0
            step = 5
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    # exclude already-collected docids (last 100 only)
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
                _i += step
            _time = time.time()
            log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list)
            log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            return best_docid
        return None
if __name__ == '__main__':
    # Ad-hoc debugging entry point for the dedup/merge flow; the commented
    # calls below are alternative scenarios kept for convenience.
    # df = Dataflow()
    # df.flow_init()
    # df.flow_test()
    # df.test_merge()
    # df.start_flow_attachment()
    # df.start_flow_extract()
    # df.start_flow_dumplicate()
    # # df.start_flow_merge()
    # df.start_flow_remove()
    # download_attachment()
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    # df_dump.start_flow_dumplicate()
    a = time.time()
    # run the whole dedup pipeline once for a single docid, without writing back
    df_dump.test_dumplicate(397282007
                            )
    # df_dump.test_merge([385521167
    # ],[385521113])
    # df_dump.flow_remove_project_tmp()
    print("takes",time.time()-a)
    # df_dump.fix_doc_which_not_in_project()
    # df_dump.delete_projects_by_document(16288036)
    # log("=======")
    # for i in range(3):
    #     time.sleep(20)
    #
    # a = {"docid":74295123}
    # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)