# dataflow.py
  1. import sys,os
  2. # sys.path.append("/data")
  3. from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity,getConnect_activateMQ_ali
  4. from tablestore import *
  5. from BaseDataMaintenance.common.Utils import *
  6. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  7. from BaseDataMaintenance.common.multiProcess import MultiProcessHandler
  8. from queue import Queue
  9. from BaseDataMaintenance.model.ots.document_tmp import *
  10. from BaseDataMaintenance.model.ots.attachment import *
  11. from BaseDataMaintenance.model.ots.document_html import *
  12. from BaseDataMaintenance.model.ots.document_extract2 import *
  13. from BaseDataMaintenance.model.ots.project import *
  14. from BaseDataMaintenance.model.ots.document import *
  15. import base64
  16. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
  17. from uuid import uuid4
  18. from BaseDataMaintenance.common.ossUtils import *
  19. from BaseDataMaintenance.dataSource.source import is_internal,getAuth
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. from BaseDataMaintenance.maintenance.dataflow_settings import *
  22. from threading import Thread
  23. import oss2
  24. from BaseDataMaintenance.maintenance.documentDumplicate import *
  25. from BaseDataMaintenance.common.otsUtils import *
  26. from BaseDataMaintenance.common.activateMQUtils import *
  27. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  28. def getSet(list_dict,key):
  29. _set = set()
  30. for item in list_dict:
  31. if key in item:
  32. if item[key]!='' and item[key] is not None:
  33. if re.search("^\d[\d\.]*$",item[key]) is not None:
  34. _set.add(str(float(item[key])))
  35. else:
  36. _set.add(str(item[key]))
  37. return _set
  38. def getSimilarityOfString(str1,str2):
  39. _set1 = set()
  40. _set2 = set()
  41. if str1 is not None:
  42. for i in range(1,len(str1)):
  43. _set1.add(str1[i-1:i+1])
  44. if str2 is not None:
  45. for i in range(1,len(str2)):
  46. _set2.add(str2[i-1:i+1])
  47. _len = max(1,min(len(_set1),len(_set2)))
  48. return len(_set1&_set2)/_len
  49. def getDiffIndex(list_dict,key,confidence=100):
  50. _set = set()
  51. for _i in range(len(list_dict)):
  52. item = list_dict[_i]
  53. if item["confidence"]>=confidence:
  54. continue
  55. if key in item:
  56. if item[key]!='' and item[key] is not None:
  57. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  58. _set.add(str(float(item[key])))
  59. else:
  60. _set.add(str(item[key]))
  61. if len(_set)>1:
  62. return _i
  63. return len(list_dict)
  64. def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
  65. swf_urls = []
  66. try:
  67. list_files = os.listdir(swf_dir)
  68. list_files.sort(key=lambda x:x)
  69. headers = dict()
  70. headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
  71. for _file in list_files:
  72. swf_localpath = "%s/%s"%(swf_dir,_file)
  73. swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
  74. uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
  75. _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
  76. swf_urls.append(_url)
  77. os.remove(swf_localpath)
  78. except Exception as e:
  79. traceback.print_exc()
  80. return swf_urls
  81. class Dataflow():
  82. def __init__(self):
  83. self.ots_client = getConnect_ots()
  84. self.queue_init = Queue()
  85. self.queue_attachment = Queue()
  86. self.queue_attachment_ocr = Queue()
  87. self.queue_attachment_not_ocr = Queue()
  88. self.list_attachment_ocr = []
  89. self.list_attachment_not_ocr = []
  90. self.queue_extract = Queue()
  91. self.list_extract = []
  92. self.queue_dumplicate = Queue()
  93. self.queue_merge = Queue()
  94. self.queue_syncho = Queue()
  95. self.queue_remove = Queue()
  96. self.attachment_rec_interface = ""
  97. self.ots_client = getConnect_ots()
  98. if is_internal:
  99. self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  100. else:
  101. self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  102. if is_internal:
  103. self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
  104. self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
  105. self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
  106. else:
  107. self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
  108. self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
  109. self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
  110. self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
  111. self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
  112. self.auth = getAuth()
  113. oss2.defaults.connection_pool_size = 100
  114. oss2.defaults.multiget_num_threads = 20
  115. log("bucket_url:%s"%(self.bucket_url))
  116. self.attachment_bucket_name = "attachment-hub"
  117. self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
  118. self.current_path = os.path.dirname(__file__)
  119. def flow_init(self):
  120. def producer():
  121. bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
  122. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  123. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  124. ColumnsToGet(return_type=ColumnReturnType.ALL))
  125. log("flow_init producer total_count:%d"%total_count)
  126. list_dict = getRow_ots(rows)
  127. for _dict in list_dict:
  128. self.queue_init.put(_dict)
  129. _count = len(list_dict)
  130. while next_token:
  131. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  132. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  133. ColumnsToGet(return_type=ColumnReturnType.ALL))
  134. list_dict = getRow_ots(rows)
  135. for _dict in list_dict:
  136. self.queue_init.put(_dict)
  137. _count += len(list_dict)
  138. def comsumer():
  139. mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
  140. mt.run()
  141. def comsumer_handle(item,result_queue,ots_client):
  142. _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
  143. if document_tmp_dochtmlcon in item:
  144. item.pop(document_tmp_dochtmlcon)
  145. if document_tmp_doctextcon in item:
  146. item.pop(document_tmp_doctextcon)
  147. if document_tmp_attachmenttextcon in item:
  148. item.pop(document_tmp_attachmenttextcon)
  149. _status = item.get(document_tmp_status)
  150. new_status = None
  151. if _status>=201 and _status<=300:
  152. item[document_tmp_save] = 1
  153. new_status = 81
  154. elif _status>=401 and _status<=450:
  155. item[document_tmp_save] = 0
  156. new_status = 81
  157. else:
  158. new_status = 1
  159. # new_status = 1
  160. item[document_tmp_status] = new_status
  161. dtmp = Document_tmp(item)
  162. dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
  163. document_tmp_docid:item.get(document_tmp_docid),
  164. document_tmp_dochtmlcon:_dochtmlcon})
  165. dtmp.update_row(ots_client)
  166. dhtml.update_row(ots_client)
  167. producer()
  168. comsumer()
  169. def getTitleFromHtml(self,filemd5,_html):
  170. _soup = BeautifulSoup(_html,"lxml")
  171. _find = _soup.find("a",attrs={"data":filemd5})
  172. _title = ""
  173. if _find is not None:
  174. _title = _find.get_text()
  175. return _title
  176. def getSourceLinkFromHtml(self,filemd5,_html):
  177. _soup = BeautifulSoup(_html,"lxml")
  178. _find = _soup.find("a",attrs={"filelink":filemd5})
  179. filelink = ""
  180. if _find is None:
  181. _find = _soup.find("img",attrs={"filelink":filemd5})
  182. if _find is not None:
  183. filelink = _find.attrs.get("src","")
  184. else:
  185. filelink = _find.attrs.get("href","")
  186. return filelink
    def request_attachment_interface(self,attach,_dochtmlcon):
        """Run one attachment through the recognition interface and persist the result.

        Downloads the attachment from OSS, sends its base64 content to the
        attach-deal interface, and on success writes the recognized html,
        plain text, swf page urls, title/link (scraped from ``_dochtmlcon``)
        and processing metadata back onto the attachment row.

        :param attach: attachment model object (read via getProperties, written via setValue/update_row)
        :param _dochtmlcon: html of the owning document, used to look up title/source link
        :return: True when processed, already-too-large, or the OSS object is
            missing; False when the interface call or the download fails;
            None (falls through) on any other exception after printing it.
        """
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _size = attach.getProperties().get(attachment_size)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        # temp download target; removed again in the finally block
        localpath = os.path.join(self.current_path,"download",_uuid.hex)
        docids = attach.getProperties().get(attachment_docids)
        try:
            if _size>ATTACHMENT_LARGESIZE:
                # oversized attachments are only flagged, never recognized
                attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
                log("attachment :%s of path:%s to large"%(filemd5,_path))
                attach.update_row(self.ots_client)
                return True
            else:
                d_start_time = time.time()
                if downloadFile(self.bucket,objectPath,localpath):
                    time_download = time.time()-d_start_time
                    _data_base64 = base64.b64encode(open(localpath,"rb").read())
                    # call the interface to process the file content
                    start_time = time.time()
                    _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                    if _success:
                        log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                    else:
                        # alert on failure via log and DingDing message
                        log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                    # NOTE(review): eval on interface output — assumes the
                    # attach-deal service is trusted; confirm before exposing
                    # to untrusted data.
                    swf_images = eval(swf_images)
                    if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                        # render swf page images to disk, upload them, then
                        # store the resulting urls on the row (only when no
                        # urls were stored before)
                        swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                        if len(swf_urls)==0:
                            objectPath = attach.getProperties().get(attachment_path,"")
                            localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
                            swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                            if not os.path.exists(swf_dir):
                                os.mkdir(swf_dir)
                            for _i in range(len(swf_images)):
                                _base = swf_images[_i]
                                _base = base64.b64decode(_base)
                                filename = "swf_page_%d.png"%(_i)
                                filepath = os.path.join(swf_dir,filename)
                                with open(filepath,"wb") as f:
                                    f.write(_base)
                            swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                            if os.path.exists(swf_dir):
                                os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    # crude table detection on the recognized html
                    if re.search("<td",_html) is not None:
                        attach.setValue(attachment_has_table,1,True)
                    _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                    filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                    if _file_title!="":
                        attach.setValue(attachment_file_title,_file_title,True)
                    if filelink!="":
                        attach.setValue(attachment_file_link,filelink,True)
                    attach.setValue(attachment_attachmenthtml,_html,True)
                    attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                    attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                    attach.setValue(attachment_recsize,len(_html),True)
                    attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                    attach.update_row(self.ots_client) # re-enable the row update in production
                    return True
                else:
                    return False
        except oss2.exceptions.NotFound as e:
            # object vanished from the bucket: treat as handled
            return True
        except Exception as e:
            traceback.print_exc()
        finally:
            # best-effort cleanup of the downloaded temp file
            try:
                os.remove(localpath)
            except:
                pass
  264. def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
  265. list_html = []
  266. swf_urls = []
  267. for _attach in list_attach:
  268. #测试全跑
  269. if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
  270. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  271. if _html is None:
  272. _html = ""
  273. list_html.append(_html)
  274. else:
  275. _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
  276. if not _succeed:
  277. return False,"",[]
  278. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  279. if _html is None:
  280. _html = ""
  281. list_html.append(_html)
  282. if _attach.getProperties().get(attachment_filetype)=="swf":
  283. swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
  284. return True,list_html,swf_urls
  285. def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
  286. set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
  287. set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
  288. list_must_queries = []
  289. list_must_no_queries = []
  290. for k,v in _dict.items():
  291. if k in set_match:
  292. if isinstance(v,str):
  293. l_s = []
  294. for s_v in v.split(","):
  295. l_s.append(MatchQuery(k,s_v))
  296. list_must_queries.append(BoolQuery(should_queries=l_s))
  297. elif k in set_nested:
  298. _v = v
  299. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  300. _v = float(_v)
  301. list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  302. elif k in set_term:
  303. list_must_queries.append(TermQuery(k,v))
  304. elif k in set_phrase:
  305. list_must_queries.append(MatchPhraseQuery(k,v))
  306. elif k in set_range:
  307. if len(v)==1:
  308. list_must_queries.append(RangeQuery(k,v[0]))
  309. elif len(v)==2:
  310. list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
  311. for k,v in _dict_must_not.items():
  312. if k in set_match:
  313. if isinstance(v,str):
  314. l_s = []
  315. for s_v in v.split(","):
  316. l_s.append(MatchQuery(k,s_v))
  317. list_must_no_queries.append(BoolQuery(should_queries=l_s))
  318. elif k in set_nested:
  319. _v = v
  320. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  321. _v = float(_v)
  322. list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  323. elif k in set_term:
  324. list_must_no_queries.append(TermQuery(k,v))
  325. elif k in set_range:
  326. if len(v)==1:
  327. list_must_no_queries.append(RangeQuery(k,v[0]))
  328. elif len(v)==2:
  329. list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
  330. return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
  331. def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
  332. columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
  333. extract_count = 0
  334. if project_code is not None and project_code!="":
  335. extract_count += 1
  336. if project_name is not None and project_name!="":
  337. extract_count += 1
  338. if tenderee is not None and tenderee!="":
  339. extract_count += 1
  340. if agency is not None and agency!="":
  341. extract_count += 1
  342. if sub_docs_json is not None:
  343. sub_docs = json.loads(sub_docs_json)
  344. sub_docs.sort(key=lambda x:x.get("bidding_budget",0),reverse=True)
  345. sub_docs.sort(key=lambda x:x.get("win_bid_price",0),reverse=True)
  346. # log("==%s"%(str(sub_docs)))
  347. for sub_docs in sub_docs:
  348. for _key_sub_docs in sub_docs.keys():
  349. extract_count += 1
  350. if _key_sub_docs in columns:
  351. if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
  352. if _key_sub_docs in ["bidding_budget","win_bid_price"]:
  353. if float(sub_docs[_key_sub_docs])>0:
  354. columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
  355. else:
  356. columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
  357. return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
  358. def post_extract(self,_dict):
  359. win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  360. _dict["win_tenderer"] = win_tenderer
  361. _dict["bidding_budget"] = bidding_budget
  362. _dict["win_bid_price"] = win_bid_price
  363. if "extract_count" not in _dict:
  364. _dict["extract_count"] = extract_count
  365. def get_dump_columns(self,_dict):
  366. docchannel = _dict.get(document_tmp_docchannel,0)
  367. project_code = _dict.get(document_tmp_project_code,"")
  368. project_name = _dict.get(document_tmp_project_name,"")
  369. tenderee = _dict.get(document_tmp_tenderee,"")
  370. agency = _dict.get(document_tmp_agency,"")
  371. doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
  372. win_tenderer = _dict.get("win_tenderer","")
  373. bidding_budget = _dict.get("bidding_budget","")
  374. if bidding_budget==0:
  375. bidding_budget = ""
  376. win_bid_price = _dict.get("win_bid_price","")
  377. if win_bid_price==0:
  378. win_bid_price = ""
  379. page_time = _dict.get(document_tmp_page_time,"")
  380. fingerprint = _dict.get(document_tmp_fingerprint,"")
  381. product = _dict.get(document_tmp_product,"")
  382. return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
    def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
        """Validate a candidate duplicate group (*_split*) against column constraints.

        Returns *_split* unchanged when every constraint holds, otherwise [].
          - singleNum_keys: each key may have at most one distinct value in the group
          - multiNum_keys: each key must have more than one distinct value
          - notlike_keys: no member may carry a value that is similar-but-different
            (0.7 < similarity < 1) to item's project_code
          - contain_keys: all non-empty values of a key must form a containment
            chain (each value is a substring of the longest one)

        NOTE(review): the mutable default arguments are never mutated here, so
        sharing them across calls is harmless — but worth confirming upstream.
        """
        flag = True
        # at most one distinct value allowed per single-valued key
        for _key in singleNum_keys:
            if len(getSet(_split,_key))>1:
                flag = False
                break
        # multi-valued keys must actually show multiple distinct values
        for _key in multiNum_keys:
            if len(getSet(_split,_key))<=1:
                flag = False
                break
        project_code = item.get("project_code","")
        # reject near-miss codes: similar (>0.7) yet not identical (<1)
        for _key in notlike_keys:
            if not flag:
                break
            for _d in _split:
                _key_v = _d.get(_key,"")
                _sim = getSimilarityOfString(project_code,_key_v)
                if _sim>0.7 and _sim<1:
                    flag = False
                    break
        # check whether every notice in the group is contained by the longest value
        if flag:
            if len(contain_keys)>0:
                for _key in contain_keys:
                    MAX_CONTAIN_COLUMN = None
                    for _d in _split:
                        contain_column = _d.get(_key)
                        if contain_column is not None and contain_column !="":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                    # a longer value must contain the current maximum
                                    if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    # a shorter value must be inside the current maximum
                                    if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                        flag = False
                                        break
        if flag:
            return _split
        return []
  426. def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  427. list_data = []
  428. if isinstance(_query,list):
  429. bool_query = BoolQuery(should_queries=_query)
  430. else:
  431. bool_query = _query
  432. rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  433. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
  434. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  435. list_dict = getRow_ots(rows)
  436. for _dict in list_dict:
  437. self.post_extract(_dict)
  438. _dict["confidence"] = confidence
  439. list_data.append(_dict)
  440. # _count = len(list_dict)
  441. # while next_token:
  442. # rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  443. # SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  444. # ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  445. # list_dict = getRow_ots(rows)
  446. # for _dict in list_dict:
  447. # self.post_extract(_dict)
  448. # _dict["confidence"] = confidence
  449. # list_data.append(_dict)
  450. list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
  451. return list_dict
  452. def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  453. list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
  454. for _dict in list_dict:
  455. self.post_extract(_dict)
  456. _docid = _dict.get(document_tmp_docid)
  457. if _docid not in set_docid:
  458. base_list.append(_dict)
  459. set_docid.add(_docid)
  460. def translate_dumplicate_rules(self,status_from,item):
  461. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  462. if page_time=='':
  463. page_time = getCurrent_date("%Y-%m-%d")
  464. base_dict = {
  465. "status":[status_from[0]],
  466. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  467. }
  468. must_not_dict = {"save":0}
  469. list_rules = []
  470. singleNum_keys = ["tenderee","win_tenderer"]
  471. if fingerprint!="":
  472. _dict = {}
  473. confidence = 100
  474. _dict[document_tmp_fingerprint] = fingerprint
  475. _dict.update(base_dict)
  476. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  477. _rule = {"confidence":confidence,
  478. "item":item,
  479. "query":_query,
  480. "singleNum_keys":[],
  481. "contain_keys":[],
  482. "multiNum_keys":[]}
  483. list_rules.append(_rule)
  484. if docchannel in (52,118):
  485. if bidding_budget!="" and tenderee!="" and project_code!="":
  486. confidence = 90
  487. _dict = {document_tmp_docchannel:docchannel,
  488. "bidding_budget":item.get("bidding_budget"),
  489. document_tmp_tenderee:item.get(document_tmp_tenderee,""),
  490. document_tmp_project_code:item.get(document_tmp_project_code,"")
  491. }
  492. _dict.update(base_dict)
  493. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  494. _rule = {"confidence":confidence,
  495. "query":_query,
  496. "singleNum_keys":singleNum_keys,
  497. "contain_keys":[],
  498. "multiNum_keys":[document_tmp_web_source_no]}
  499. list_rules.append(_rule)
  500. if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  501. confidence = 80
  502. _dict = {document_tmp_docchannel:docchannel,
  503. "doctitle_refine":doctitle_refine,
  504. "tenderee":tenderee,
  505. bidding_budget:"bidding_budget"
  506. }
  507. _dict.update(base_dict)
  508. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  509. _rule = {"confidence":confidence,
  510. "query":_query,
  511. "singleNum_keys":singleNum_keys,
  512. "contain_keys":[],
  513. "multiNum_keys":[document_tmp_web_source_no]}
  514. list_rules.append(_rule)
  515. if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
  516. confidence = 90
  517. _dict = {document_tmp_docchannel:docchannel,
  518. "project_code":project_code,
  519. "doctitle_refine":doctitle_refine,
  520. "agency":agency,
  521. "bidding_budget":bidding_budget
  522. }
  523. _dict.update(base_dict)
  524. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  525. _rule = {"confidence":confidence,
  526. "query":_query,
  527. "singleNum_keys":singleNum_keys,
  528. "contain_keys":[],
  529. "multiNum_keys":[document_tmp_web_source_no]}
  530. list_rules.append(_rule)
  531. if project_code!="" and tenderee!="" and bidding_budget!="":
  532. confidence = 91
  533. _dict = {document_tmp_docchannel:docchannel,
  534. "project_code":project_code,
  535. "tenderee":tenderee,
  536. "bidding_budget":bidding_budget
  537. }
  538. _dict.update(base_dict)
  539. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  540. _rule = {"confidence":confidence,
  541. "query":_query,
  542. "singleNum_keys":singleNum_keys,
  543. "contain_keys":[],
  544. "multiNum_keys":[document_tmp_web_source_no]}
  545. list_rules.append(_rule)
  546. if doctitle_refine!="" and agency!="" and bidding_budget!="":
  547. confidence = 71
  548. _dict = {document_tmp_docchannel:docchannel,
  549. "doctitle_refine":doctitle_refine,
  550. "agency":agency,
  551. "bidding_budget":bidding_budget
  552. }
  553. _dict.update(base_dict)
  554. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  555. _rule = {"confidence":confidence,
  556. "query":_query,
  557. "singleNum_keys":singleNum_keys,
  558. "contain_keys":[],
  559. "multiNum_keys":[document_tmp_web_source_no]}
  560. list_rules.append(_rule)
  561. if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
  562. confidence = 91
  563. _dict = {document_tmp_docchannel:docchannel,
  564. "project_code":project_code,
  565. "project_name":project_name,
  566. "agency":agency,
  567. "bidding_budget":bidding_budget
  568. }
  569. _dict.update(base_dict)
  570. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  571. n_singleKeys = [i for i in singleNum_keys]
  572. n_singleKeys.append(document_tmp_web_source_no)
  573. _rule = {"confidence":confidence,
  574. "query":_query,
  575. "singleNum_keys":n_singleKeys,
  576. "contain_keys":[],
  577. "multiNum_keys":[]}
  578. list_rules.append(_rule)
  579. ##-- 5. 招标公告 - 同项目编号- 同[项目名称、标题] - 同[招标人、代理公司] - 同预算(!=0) - 同信息源=1
  580. if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
  581. confidence = 91
  582. _dict = {document_tmp_docchannel:docchannel,
  583. "project_code":project_code,
  584. "project_name":project_name,
  585. "tenderee":tenderee,
  586. "bidding_budget":bidding_budget
  587. }
  588. _dict.update(base_dict)
  589. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  590. n_singleKeys = [i for i in singleNum_keys]
  591. n_singleKeys.append(document_tmp_web_source_no)
  592. _rule = {"confidence":confidence,
  593. "query":_query,
  594. "singleNum_keys":n_singleKeys,
  595. "contain_keys":[],
  596. "multiNum_keys":[]}
  597. list_rules.append(_rule)
  598. if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  599. confidence = 71
  600. _dict = {document_tmp_docchannel:docchannel,
  601. "project_code":project_code,
  602. "doctitle_refine":doctitle_refine,
  603. "tenderee":tenderee,
  604. "bidding_budget":bidding_budget
  605. }
  606. _dict.update(base_dict)
  607. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  608. _rule = {"confidence":confidence,
  609. "query":_query,
  610. "singleNum_keys":singleNum_keys,
  611. "contain_keys":[],
  612. "multiNum_keys":[document_tmp_web_source_no]}
  613. list_rules.append(_rule)
  614. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  615. if project_name!="" and agency!="":
  616. tmp_bidding = 0
  617. if bidding_budget!="":
  618. tmp_bidding = bidding_budget
  619. confidence = 51
  620. _dict = {document_tmp_docchannel:docchannel,
  621. "project_name":project_name,
  622. "agency":agency,
  623. "bidding_budget":tmp_bidding
  624. }
  625. _dict.update(base_dict)
  626. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  627. _rule = {"confidence":confidence,
  628. "query":_query,
  629. "singleNum_keys":singleNum_keys,
  630. "contain_keys":[],
  631. "multiNum_keys":[document_tmp_web_source_no]}
  632. list_rules.append(_rule)
  633. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  634. if project_code!="" and agency!="":
  635. tmp_bidding = 0
  636. if bidding_budget!="":
  637. tmp_bidding = bidding_budget
  638. confidence = 51
  639. _dict = {document_tmp_docchannel:docchannel,
  640. "project_code":project_code,
  641. "agency":agency,
  642. "bidding_budget":tmp_bidding
  643. }
  644. _dict.update(base_dict)
  645. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  646. _rule = {"confidence":confidence,
  647. "query":_query,
  648. "singleNum_keys":singleNum_keys,
  649. "contain_keys":[],
  650. "multiNum_keys":[document_tmp_web_source_no]}
  651. list_rules.append(_rule)
  652. if docchannel not in (101,119,120):
  653. #-- 7. 非中标公告 - 同项目名称 - 同发布日期 - 同招标人 - 同预算 - 同类型 - 信息源>1 - 同项目编号
  654. if project_name!="" and tenderee!="" and project_code!="":
  655. tmp_bidding = 0
  656. if bidding_budget!="":
  657. tmp_bidding = bidding_budget
  658. confidence = 51
  659. _dict = {document_tmp_docchannel:docchannel,
  660. "project_name":project_name,
  661. "tenderee":tenderee,
  662. "project_code":project_code
  663. }
  664. _dict.update(base_dict)
  665. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  666. _rule = {"confidence":confidence,
  667. "query":_query,
  668. "singleNum_keys":singleNum_keys,
  669. "contain_keys":[],
  670. "multiNum_keys":[document_tmp_web_source_no]}
  671. list_rules.append(_rule)
  672. if docchannel in (101,119,120):
  673. #-- 3. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(==0)
  674. if project_code!="" and project_name!="" and win_tenderer!="":
  675. tmp_win = 0
  676. if win_bid_price!="":
  677. tmp_win = win_bid_price
  678. confidence = 61
  679. _dict = {document_tmp_docchannel:docchannel,
  680. "project_code":project_code,
  681. "project_name":project_name,
  682. "win_tenderer":win_tenderer,
  683. "win_bid_price":tmp_win
  684. }
  685. _dict.update(base_dict)
  686. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  687. _rule = {"confidence":confidence,
  688. "query":_query,
  689. "singleNum_keys":singleNum_keys,
  690. "contain_keys":[],
  691. "multiNum_keys":[]}
  692. list_rules.append(_rule)
  693. if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
  694. confidence = 72
  695. _dict = {document_tmp_docchannel:docchannel,
  696. "project_code":project_code,
  697. "project_name":project_name,
  698. "bidding_budget":bidding_budget,
  699. "product":product
  700. }
  701. _dict.update(base_dict)
  702. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  703. n_singleKeys = [i for i in singleNum_keys]
  704. n_singleKeys.append(document_tmp_web_source_no)
  705. _rule = {"confidence":confidence,
  706. "query":_query,
  707. "singleNum_keys":n_singleKeys,
  708. "contain_keys":[],
  709. "multiNum_keys":[]}
  710. list_rules.append(_rule)
  711. if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  712. confidence = 91
  713. _dict = {document_tmp_docchannel:docchannel,
  714. "project_code":project_code,
  715. "doctitle_refine":doctitle_refine,
  716. "win_tenderer":win_tenderer,
  717. "win_bid_price":win_bid_price
  718. }
  719. _dict.update(base_dict)
  720. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  721. n_singleKeys = [i for i in singleNum_keys]
  722. n_singleKeys.append(document_tmp_web_source_no)
  723. _rule = {"confidence":confidence,
  724. "query":_query,
  725. "singleNum_keys":n_singleKeys,
  726. "contain_keys":[],
  727. "multiNum_keys":[]}
  728. list_rules.append(_rule)
  729. ##-- 2. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(!=0) - 同信息源=1
  730. if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
  731. confidence = 91
  732. _dict = {document_tmp_docchannel:docchannel,
  733. "project_code":project_code,
  734. "project_name":project_name,
  735. "win_tenderer":win_tenderer,
  736. "win_bid_price":win_bid_price
  737. }
  738. _dict.update(base_dict)
  739. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  740. n_singleKeys = [i for i in singleNum_keys]
  741. n_singleKeys.append(document_tmp_web_source_no)
  742. _rule = {"confidence":confidence,
  743. "query":_query,
  744. "singleNum_keys":n_singleKeys,
  745. "contain_keys":[],
  746. "multiNum_keys":[]}
  747. list_rules.append(_rule)
  748. if project_name!="" and win_tenderer!="" and win_bid_price!="":
  749. confidence = 91
  750. _dict = {document_tmp_docchannel:docchannel,
  751. "project_name":project_name,
  752. "win_tenderer":win_tenderer,
  753. "win_bid_price":win_bid_price,
  754. }
  755. _dict.update(base_dict)
  756. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  757. _rule = {"confidence":confidence,
  758. "query":_query,
  759. "singleNum_keys":singleNum_keys,
  760. "contain_keys":[],
  761. "multiNum_keys":[document_tmp_web_source_no]}
  762. list_rules.append(_rule)
  763. if project_code!="" and win_tenderer!="" and win_bid_price!="":
  764. confidence = 91
  765. _dict = {document_tmp_docchannel:docchannel,
  766. "project_code":project_code,
  767. "win_tenderer":win_tenderer,
  768. "win_bid_price":win_bid_price,
  769. }
  770. _dict.update(base_dict)
  771. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  772. _rule = {"confidence":confidence,
  773. "query":_query,
  774. "singleNum_keys":singleNum_keys,
  775. "contain_keys":[],
  776. "multiNum_keys":[document_tmp_web_source_no]}
  777. list_rules.append(_rule)
  778. if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  779. confidence = 91
  780. _dict = {document_tmp_docchannel:docchannel,
  781. "project_code":project_code,
  782. "doctitle_refine":doctitle_refine,
  783. "win_tenderer":win_tenderer,
  784. "win_bid_price":win_bid_price
  785. }
  786. _dict.update(base_dict)
  787. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  788. n_singleKeys = [i for i in singleNum_keys]
  789. n_singleKeys.append(document_tmp_web_source_no)
  790. _rule = {"confidence":confidence,
  791. "query":_query,
  792. "singleNum_keys":n_singleKeys,
  793. "contain_keys":[],
  794. "multiNum_keys":[]}
  795. list_rules.append(_rule)
  796. if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  797. confidence=90
  798. _dict = {document_tmp_docchannel:docchannel,
  799. "doctitle_refine":doctitle_refine,
  800. "win_tenderer":win_tenderer,
  801. "win_bid_price":win_bid_price
  802. }
  803. _dict.update(base_dict)
  804. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  805. _rule = {"confidence":confidence,
  806. "query":_query,
  807. "singleNum_keys":singleNum_keys,
  808. "contain_keys":[],
  809. "multiNum_keys":[document_tmp_web_source_no]}
  810. list_rules.append(_rule)
  811. if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
  812. confidence=95
  813. _dict = {document_tmp_docchannel:docchannel,
  814. "project_name":project_name,
  815. "win_tenderer":win_tenderer,
  816. "win_bid_price":win_bid_price,
  817. "project_code":project_code
  818. }
  819. _dict.update(base_dict)
  820. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  821. _rule = {"confidence":confidence,
  822. "query":_query,
  823. "singleNum_keys":singleNum_keys,
  824. "contain_keys":[],
  825. "multiNum_keys":[document_tmp_web_source_no]}
  826. list_rules.append(_rule)
  827. if docchannel in (51,103,115,116):
  828. #9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  829. if doctitle_refine!="" and tenderee!="":
  830. tmp_budget = 0
  831. if bidding_budget!="":
  832. tmp_budget = bidding_budget
  833. confidence=81
  834. _dict = {document_tmp_docchannel:docchannel,
  835. "doctitle_refine":doctitle_refine,
  836. "tenderee":tenderee,
  837. "bidding_budget":tmp_budget,
  838. }
  839. _dict.update(base_dict)
  840. _dict["page_time"] = [page_time,page_time]
  841. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  842. _rule = {"confidence":confidence,
  843. "query":_query,
  844. "singleNum_keys":singleNum_keys,
  845. "contain_keys":[],
  846. "multiNum_keys":[document_tmp_web_source_no]}
  847. list_rules.append(_rule)
  848. #-- 9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  849. if project_code!="" and tenderee!="":
  850. confidence=81
  851. tmp_budget = 0
  852. if bidding_budget!="":
  853. tmp_budget = bidding_budget
  854. _dict = {document_tmp_docchannel:docchannel,
  855. "project_code":project_code,
  856. "tenderee":tenderee,
  857. "bidding_budget":tmp_budget,
  858. }
  859. _dict.update(base_dict)
  860. _dict["page_time"] = [page_time,page_time]
  861. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  862. _rule = {"confidence":confidence,
  863. "query":_query,
  864. "singleNum_keys":singleNum_keys,
  865. "contain_keys":[],
  866. "multiNum_keys":[document_tmp_web_source_no]}
  867. list_rules.append(_rule)
  868. if project_name!="" and tenderee!="":
  869. confidence=81
  870. tmp_budget = 0
  871. if bidding_budget!="":
  872. tmp_budget = bidding_budget
  873. _dict = {document_tmp_docchannel:docchannel,
  874. "project_name":project_name,
  875. "tenderee":tenderee,
  876. "bidding_budget":tmp_budget,
  877. }
  878. _dict.update(base_dict)
  879. _dict["page_time"] = [page_time,page_time]
  880. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  881. _rule = {"confidence":confidence,
  882. "query":_query,
  883. "singleNum_keys":singleNum_keys,
  884. "contain_keys":[],
  885. "multiNum_keys":[document_tmp_web_source_no]}
  886. list_rules.append(_rule)
  887. if agency!="" and tenderee!="":
  888. confidence=81
  889. tmp_budget = 0
  890. if bidding_budget!="":
  891. tmp_budget = bidding_budget
  892. _dict = {document_tmp_docchannel:docchannel,
  893. "agency":agency,
  894. "tenderee":tenderee,
  895. "bidding_budget":tmp_budget,
  896. "product":product
  897. }
  898. _dict.update(base_dict)
  899. _dict["page_time"] = [page_time,page_time]
  900. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  901. _rule = {"confidence":confidence,
  902. "query":_query,
  903. "singleNum_keys":singleNum_keys,
  904. "contain_keys":[],
  905. "multiNum_keys":[document_tmp_web_source_no]}
  906. list_rules.append(_rule)
  907. if agency!="" and project_code!="":
  908. confidence=81
  909. tmp_budget = 0
  910. if bidding_budget!="":
  911. tmp_budget = bidding_budget
  912. _dict = {document_tmp_docchannel:docchannel,
  913. "agency":agency,
  914. "project_code":project_code,
  915. "bidding_budget":tmp_budget,
  916. "product":product
  917. }
  918. _dict.update(base_dict)
  919. _dict["page_time"] = [page_time,page_time]
  920. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  921. _rule = {"confidence":confidence,
  922. "query":_query,
  923. "singleNum_keys":singleNum_keys,
  924. "contain_keys":[],
  925. "multiNum_keys":[document_tmp_web_source_no]}
  926. list_rules.append(_rule)
  927. if agency!="" and project_name!="":
  928. confidence=81
  929. tmp_budget = 0
  930. if bidding_budget!="":
  931. tmp_budget = bidding_budget
  932. _dict = {document_tmp_docchannel:docchannel,
  933. "agency":agency,
  934. "project_name":project_name,
  935. "bidding_budget":tmp_budget,
  936. "product":product
  937. }
  938. _dict.update(base_dict)
  939. _dict["page_time"] = [page_time,page_time]
  940. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  941. _rule = {"confidence":confidence,
  942. "query":_query,
  943. "singleNum_keys":singleNum_keys,
  944. "contain_keys":[],
  945. "multiNum_keys":[document_tmp_web_source_no]}
  946. list_rules.append(_rule)
  947. #五选二
  948. if tenderee!="" and bidding_budget!="" and product!="":
  949. confidence=80
  950. _dict = {document_tmp_docchannel:docchannel,
  951. "tenderee":tenderee,
  952. "bidding_budget":bidding_budget,
  953. "product":product,
  954. }
  955. _dict.update(base_dict)
  956. _dict["page_time"] = [page_time,page_time]
  957. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  958. _rule = {"confidence":confidence,
  959. "query":_query,
  960. "singleNum_keys":singleNum_keys,
  961. "contain_keys":[],
  962. "multiNum_keys":[]}
  963. list_rules.append(_rule)
  964. if tenderee!="" and win_tenderer!="" and product!="":
  965. confidence=80
  966. _dict = {document_tmp_docchannel:docchannel,
  967. "tenderee":tenderee,
  968. "win_tenderer":win_tenderer,
  969. "product":product,
  970. }
  971. _dict.update(base_dict)
  972. _dict["page_time"] = [page_time,page_time]
  973. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  974. _rule = {"confidence":confidence,
  975. "query":_query,
  976. "singleNum_keys":singleNum_keys,
  977. "contain_keys":[],
  978. "multiNum_keys":[]}
  979. list_rules.append(_rule)
  980. if tenderee!="" and win_bid_price!="":
  981. confidence=80
  982. _dict = {document_tmp_docchannel:docchannel,
  983. "tenderee":tenderee,
  984. "win_bid_price":win_bid_price,
  985. "product":product,
  986. }
  987. _dict.update(base_dict)
  988. _dict["page_time"] = [page_time,page_time]
  989. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  990. _rule = {"confidence":confidence,
  991. "query":_query,
  992. "singleNum_keys":singleNum_keys,
  993. "contain_keys":[],
  994. "multiNum_keys":[]}
  995. list_rules.append(_rule)
  996. if tenderee!="" and agency!="":
  997. confidence=80
  998. _dict = {document_tmp_docchannel:docchannel,
  999. "tenderee":tenderee,
  1000. "agency":agency,
  1001. "product":product,
  1002. }
  1003. _dict.update(base_dict)
  1004. _dict["page_time"] = [page_time,page_time]
  1005. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1006. _rule = {"confidence":confidence,
  1007. "query":_query,
  1008. "singleNum_keys":singleNum_keys,
  1009. "contain_keys":[],
  1010. "multiNum_keys":[]}
  1011. list_rules.append(_rule)
  1012. if win_tenderer!="" and bidding_budget!="":
  1013. confidence=80
  1014. _dict = {document_tmp_docchannel:docchannel,
  1015. "win_tenderer":win_tenderer,
  1016. "bidding_budget":bidding_budget,
  1017. "product":product,
  1018. }
  1019. _dict.update(base_dict)
  1020. _dict["page_time"] = [page_time,page_time]
  1021. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1022. _rule = {"confidence":confidence,
  1023. "query":_query,
  1024. "singleNum_keys":singleNum_keys,
  1025. "contain_keys":[],
  1026. "multiNum_keys":[]}
  1027. list_rules.append(_rule)
  1028. if win_bid_price!="" and bidding_budget!="":
  1029. confidence=80
  1030. _dict = {document_tmp_docchannel:docchannel,
  1031. "win_bid_price":win_bid_price,
  1032. "bidding_budget":bidding_budget,
  1033. "product":product,
  1034. }
  1035. _dict.update(base_dict)
  1036. _dict["page_time"] = [page_time,page_time]
  1037. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1038. _rule = {"confidence":confidence,
  1039. "query":_query,
  1040. "singleNum_keys":singleNum_keys,
  1041. "contain_keys":[],
  1042. "multiNum_keys":[]}
  1043. list_rules.append(_rule)
  1044. if agency!="" and bidding_budget!="":
  1045. confidence=80
  1046. _dict = {document_tmp_docchannel:docchannel,
  1047. "agency":agency,
  1048. "bidding_budget":bidding_budget,
  1049. "product":product,
  1050. }
  1051. _dict.update(base_dict)
  1052. _dict["page_time"] = [page_time,page_time]
  1053. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1054. _rule = {"confidence":confidence,
  1055. "query":_query,
  1056. "singleNum_keys":singleNum_keys,
  1057. "contain_keys":[],
  1058. "multiNum_keys":[]}
  1059. list_rules.append(_rule)
  1060. if win_tenderer!="" and win_bid_price!="":
  1061. confidence=80
  1062. _dict = {document_tmp_docchannel:docchannel,
  1063. "win_tenderer":win_tenderer,
  1064. "win_bid_price":win_bid_price,
  1065. "product":product,
  1066. }
  1067. _dict.update(base_dict)
  1068. _dict["page_time"] = [page_time,page_time]
  1069. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1070. _rule = {"confidence":confidence,
  1071. "query":_query,
  1072. "singleNum_keys":singleNum_keys,
  1073. "contain_keys":[],
  1074. "multiNum_keys":[]}
  1075. list_rules.append(_rule)
  1076. if win_tenderer!="" and agency!="":
  1077. confidence=80
  1078. _dict = {document_tmp_docchannel:docchannel,
  1079. "win_tenderer":win_tenderer,
  1080. "agency":agency,
  1081. "product":product,
  1082. }
  1083. _dict.update(base_dict)
  1084. _dict["page_time"] = [page_time,page_time]
  1085. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1086. _rule = {"confidence":confidence,
  1087. "query":_query,
  1088. "singleNum_keys":singleNum_keys,
  1089. "contain_keys":[],
  1090. "multiNum_keys":[]}
  1091. list_rules.append(_rule)
  1092. if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
  1093. confidence=80
  1094. _dict = {document_tmp_docchannel:docchannel,
  1095. "doctitle_refine":doctitle_refine,
  1096. "product":product,
  1097. }
  1098. _dict.update(base_dict)
  1099. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1100. _rule = {"confidence":confidence,
  1101. "query":_query,
  1102. "singleNum_keys":singleNum_keys,
  1103. "contain_keys":[],
  1104. "multiNum_keys":[]}
  1105. list_rules.append(_rule)
  1106. return list_rules
    def dumplicate_fianl_check(self,base_list):
        """Final consistency check over the merged candidate group.

        Sorts the group by confidence (descending, in place) and, per key,
        finds the first index where that key's value diverges (getDiffIndex).
        The smallest such index is the length of the consistent prefix; it is
        returned as the accepted duplicate set, or [] when the prefix is
        trivial (<=1 row).

        NOTE(review): the _k=="doctitle" branch below can never fire — the key
        lists only ever contain "doctitle_refine"; possibly it was meant to be
        "doctitle_refine" with the confidence=30 relaxation.  Left untouched.
        """
        the_group = base_list
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        # larger groups additionally require a consistent refined title
        if len(the_group)>10:
            keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
        else:
            keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
        # confidence-based divergence index per key
        list_key_index = []
        for _k in keys:
            if _k=="doctitle":
                list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group,_k))
        _index = min(list_key_index)
        if _index>1:
            return the_group[:_index]
        return []
  1125. def get_best_docid(self,base_list):
  1126. if len(base_list)>0:
  1127. base_list.sort(key=lambda x:x["docid"])
  1128. base_list.sort(key=lambda x:x["extract_count"],reverse=True)
  1129. return base_list[0]["docid"]
  1130. def save_dumplicate(self,base_list,best_docid,status_from,status_to):
  1131. #best_docid need check while others can save directly
  1132. list_dict = []
  1133. for item in base_list:
  1134. docid = item["docid"]
  1135. _dict = {"partitionkey":item["partitionkey"],
  1136. "docid":item["docid"]}
  1137. if docid==best_docid:
  1138. if item.get("save",1)!=0:
  1139. _dict["save"] = 1
  1140. else:
  1141. _dict["save"] = 0
  1142. if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
  1143. _dict["status"] = random.randint(status_to[0],status_to[1])
  1144. list_dict.append(_dict)
  1145. for _dict in list_dict:
  1146. dtmp = Document_tmp(_dict)
  1147. dtmp.update_row(self.ots_client)
    def flow_test(self,status_to=[1,10]):
        """Maintenance flow: re-queue document_tmp rows that carry at least one
        attachment (page_attachments.fileMd5 present) but whose attachment
        extraction has not succeeded, resetting each row's status to a random
        value inside *status_to* so the pipeline picks them up again.

        NOTE(review): mutable default status_to=[1,10] is never mutated here.
        """
        def producer():
            # rows with an attachment md5, excluding already-extracted rows and
            # rows whose status is already inside [1,11)
            bool_query = BoolQuery(must_queries=[
                # ExistsQuery("docid"),
                # RangeQuery("crtime",range_to='2022-04-10'),
                # RangeQuery("status",61),
                NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
            ],
            must_not_queries=[
                # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
                TermQuery("attachment_extract_status",1),
                RangeQuery("status",1,11)
            ]
            )
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            # keep paging until exhausted, hard-capped at one million rows
            while next_token and _count<1000000:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
        def comsumer():
            # 30 worker threads drain queue_init through comsumer_handle
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # print(item)
            # reset the row's status to a random value inside status_to
            dtmp = Document_tmp(item)
            dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
            dtmp.update_row(ots_client)
            # dhtml = Document_html(item)
            # dhtml.update_row(ots_client)
            # dtmp.delete_row(ots_client)
            # dhtml.delete_row(ots_client)
        producer()
        comsumer()
  1193. def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
  1194. def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
  1195. bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
  1196. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1197. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  1198. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1199. log("flow_dumplicate producer total_count:%d"%total_count)
  1200. list_dict = getRow_ots(rows)
  1201. for _dict in list_dict:
  1202. self.queue_dumplicate.put(_dict)
  1203. _count = len(list_dict)
  1204. while next_token and _count<flow_process_count:
  1205. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1206. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1207. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1208. list_dict = getRow_ots(rows)
  1209. for _dict in list_dict:
  1210. self.queue_dumplicate.put(_dict)
  1211. _count += len(list_dict)
  1212. def comsumer():
  1213. mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
  1214. mt.run()
  1215. def comsumer_handle(item,result_queue,ots_client):
  1216. self.post_extract(item)
  1217. base_list = []
  1218. set_docid = set()
  1219. list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
  1220. list_rules.sort(key=lambda x:x["confidence"],reverse=True)
  1221. # print(item,"len_rules",len(list_rules))
  1222. for _rule in list_rules:
  1223. _query = _rule["query"]
  1224. confidence = _rule["confidence"]
  1225. singleNum_keys = _rule["singleNum_keys"]
  1226. contain_keys = _rule["contain_keys"]
  1227. multiNum_keys = _rule["multiNum_keys"]
  1228. self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
  1229. item["confidence"] = 999
  1230. if item.get(document_tmp_docid) not in set_docid:
  1231. base_list.append(item)
  1232. final_list = self.dumplicate_fianl_check(base_list)
  1233. best_docid = self.get_best_docid(final_list)
  1234. # log(str(final_list))
  1235. _d = {"partitionkey":item["partitionkey"],
  1236. "docid":item["docid"],
  1237. "status":random.randint(*flow_dumplicate_status_to),
  1238. document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  1239. }
  1240. dtmp = Document_tmp(_d)
  1241. dup_docid = set()
  1242. for _dict in final_list:
  1243. dup_docid.add(_dict.get(document_tmp_docid))
  1244. if item.get(document_tmp_docid) in dup_docid:
  1245. dup_docid.remove(item.get(document_tmp_docid))
  1246. if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
  1247. dtmp.setValue(document_tmp_save,1,True)
  1248. dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
  1249. dmp_docid = ",".join([str(a) for a in list(dup_docid)])
  1250. else:
  1251. dtmp.setValue(document_tmp_save,0,True)
  1252. if best_docid in dup_docid:
  1253. dup_docid.remove(best_docid)
  1254. dmp_docid = ",".join([str(a) for a in list(dup_docid)])
  1255. dmp_docid = "%d,%s"%(best_docid,dmp_docid)
  1256. else:
  1257. dmp_docid = ",".join([str(a) for a in list(dup_docid)])
  1258. dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
  1259. dtmp.update_row(self.ots_client)
  1260. #只保留当前公告
  1261. # self.save_dumplicate(final_list,best_docid,status_from,status_to)
  1262. #
  1263. # print("=base=",item)
  1264. # if len(final_list)>=1:
  1265. # print("==================")
  1266. # for _dict in final_list:
  1267. # print(_dict)
  1268. # print("========>>>>>>>>>>")
  1269. producer()
  1270. comsumer()
def merge_document(self,item,status_to=None):
    """Try to attach *item* to an existing project row in ``project2``.

    Builds query combinations from the document's extracted fields (project
    code/name, tenderee, budgets) and, when exactly one candidate project
    matches, records its uuid as ``merge_uuid``.

    :param item: document dict; enriched in place via ``post_extract``
    :param status_to: accepted for signature compatibility with the flow
                      callers; not used inside this method
    :return: the matched project uuid, or "" when no unique match was found
    """
    self.post_extract(item)
    docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
    _d = {"partitionkey":item["partitionkey"],
          "docid":item["docid"],
          }
    dtmp = Document_tmp(_d)
    if item.get(document_tmp_save,1)==1:
        # kept documents: match on (code, tenderee) or (code, name)
        list_should_q = []
        if project_code!="" and tenderee!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("tenderee",tenderee)])
            list_should_q.append(_q)
        if project_name!="" and project_code!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("project_name",project_name)])
            list_should_q.append(_q)
        if len(list_should_q)>0:
            # merge=True is only passed on this branch
            list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
            # only merge when the match is unambiguous (exactly one project)
            if len(list_data)==1:
                dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                print(item["docid"],list_data[0]["uuid"])
    else:
        # discarded documents: match on budget/price combinations instead
        list_should_q = []
        if bidding_budget!="" and project_code!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("bidding_budget",float(bidding_budget))])
            list_should_q.append(_q)
        if tenderee!="" and bidding_budget!="" and project_name!="":
            _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                         TermQuery("bidding_budget",float(bidding_budget)),
                                         TermQuery("project_name",project_name)])
            list_should_q.append(_q)
        if tenderee!="" and win_bid_price!="" and project_name!="":
            _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                         TermQuery("win_bid_price",float(win_bid_price)),
                                         TermQuery("project_name",project_name)])
            list_should_q.append(_q)
        if len(list_should_q)>0:
            list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
            if len(list_data)==1:
                dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                print(item["docid"],list_data[0]["uuid"])
    return dtmp.getProperties().get("merge_uuid","")
    # dtmp.update_row(self.ots_client)
def test_merge(self):
    """Manual evaluation harness for :meth:`merge_document`.

    Pulls one day's worth of saved result/win-notice documents, runs the
    merge against ``project2`` for each, marks whether the merged project has
    a tender-invitation page_time, and dumps everything to
    ``test_merge.xlsx`` for human review.
    """
    import pandas as pd
    import queue
    def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        # channels 101/119/120 (result-type announcements), saved, one fixed day
        list_test_item = []
        should_q = BoolQuery(should_queries=[
            TermQuery("docchannel",101),
            TermQuery("docchannel",119),
            TermQuery("docchannel",120)
        ])
        bool_query = BoolQuery(must_queries=[
            TermQuery("page_time","2022-04-22"),
            should_q,
            TermQuery("save",1)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            list_test_item.append(_dict)
        _count = len(list_dict)
        # unbounded pagination: collect the whole day's documents
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                list_test_item.append(_dict)
            _count += len(list_dict)
            print("%d/%d"%(_count,total_count))
        return list_test_item
    from BaseDataMaintenance.model.ots.project import Project
    def comsumer_handle(item,result_queue,ots_client):
        # run the merge; on success check whether the project already has a
        # tender-invitation page_time and mark the item for the report
        item["merge_uuid"] = self.merge_document(item)
        if item["merge_uuid"]!="":
            _dict = {"uuid":item["merge_uuid"]}
            _p = Project(_dict)
            _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
            if _p.getProperties().get("zhao_biao_page_time","")!="":
                item["是否有招标"] = "是"
    list_test_item = producer()
    task_queue = queue.Queue()
    for item in list_test_item:
        task_queue.put(item)
    mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
    mt.run()
    # flatten the per-document dicts into columns and export for review
    keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
    df_data = {}
    for k in keys:
        df_data[k] = []
    for item in list_test_item:
        for k in keys:
            df_data[k].append(item.get(k,""))
    df = pd.DataFrame(df_data)
    df.to_excel("test_merge.xlsx",columns=keys)
def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
    """Merge flow: scan documents in *status_from* and merge each into a
    project via :meth:`merge_document`.

    NOTE(review): the ``producer()``/``comsumer()`` calls at the bottom are
    commented out, so this method currently defines its closures and does
    nothing — confirm whether the flow is intentionally disabled.

    :param process_count: maximum number of documents to enqueue per run
    :param status_from: inclusive ``[low, high]`` status range to scan
    :param status_to: status range forwarded to merge_document
    """
    def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_merge producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_merge.put(_dict)
        _count = len(list_dict)
        # paginate until the per-run cap is reached
        while next_token and _count<process_count:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_merge.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        self.merge_document(item,status_to)
    # producer()
    # comsumer()
    pass
def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
    """Placeholder for the synchronisation flow; not implemented yet.

    :param status_from: inclusive status range to pick documents from (unused)
    :param status_to: inclusive status range to move documents to (unused)
    """
    pass
def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
    """Removal flow: delete finished documents (status in *status_from*) that
    were created more than 4 days ago, from both ``document_tmp`` and the
    corresponding ``document_html`` rows.

    NOTE(review): *process_count* is accepted but never used — the pagination
    loop runs until exhaustion.  Confirm whether the removal scan should be
    capped per run like the other flows.

    :param process_count: nominal per-run cap (currently unused)
    :param status_from: inclusive ``[low, high]`` status range to remove
    """
    def producer():
        # anything older than (today - 4 days) in the finished-status band
        current_date = getCurrent_date("%Y-%m-%d")
        tmp_date = timeAdd(current_date,-4)
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                             RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        log("flow_remove producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_remove.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        # delete the tmp row and its html companion
        dtmp = Document_tmp(item)
        dtmp.delete_row(self.ots_client)
        dhtml = Document_html(item)
        dhtml.delete_row(self.ots_client)
    producer()
    comsumer()
  1434. def start_flow_dumplicate(self):
  1435. schedule = BlockingScheduler()
  1436. schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
  1437. schedule.start()
  1438. def start_flow_merge(self):
  1439. schedule = BlockingScheduler()
  1440. schedule.add_job(self.flow_merge,"cron",second="*/10")
  1441. schedule.start()
  1442. def start_flow_remove(self):
  1443. schedule = BlockingScheduler()
  1444. schedule.add_job(self.flow_remove,"cron",hour="20")
  1445. schedule.start()
def download_attachment():
    """One-off helper: download every attachment referenced by documents
    created in a fixed two-hour window to the local ``download`` directory,
    skipping oversized files.
    """
    ots_client = getConnect_ots()
    queue_attachment = Queue()
    auth = getAuth()
    # widen the OSS client pools for parallel downloads
    oss2.defaults.connection_pool_size = 100
    oss2.defaults.multiget_num_threads = 20
    attachment_bucket_name = "attachment-hub"
    # use the internal endpoint when running inside the VPC
    if is_internal:
        bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
    else:
        bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
    bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    current_path = os.path.dirname(__file__)
    def producer():
        # documents created inside the hard-coded window
        columns = [document_tmp_attachment_path]
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_attachment producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            queue_attachment.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                queue_attachment.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
        mt.run()
    def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        # batch-fetch attachment rows by md5; capped at 50 keys per request
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def comsumer_handle(item,result_queue):
        # download every attachment of one document, unless it is oversized
        page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
        if len(page_attachments)==0:
            pass
        else:
            list_fileMd5 = []
            for _atta in page_attachments:
                list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
            list_attach = getAttachments(list_fileMd5)
            for attach in list_attach:
                filemd5 = attach.getProperties().get(attachment_filemd5)
                _status = attach.getProperties().get(attachment_status)
                _filetype = attach.getProperties().get(attachment_filetype)
                _size = attach.getProperties().get(attachment_size)
                _path = attach.getProperties().get(attachment_path)
                # NOTE(review): _uuid is computed but never used — confirm if removable
                _uuid = uuid4()
                objectPath = attach.getProperties().get(attachment_path)
                localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
                try:
                    if _size>ATTACHMENT_LARGESIZE:
                        pass
                    else:
                        downloadFile(bucket,objectPath,localpath)
                except Exception as e:
                    traceback.print_exc()
    producer()
    comsumer()
  1530. def test_attachment_interface():
  1531. current_path = os.path.dirname(__file__)
  1532. task_queue = Queue()
  1533. def producer():
  1534. _count = 0
  1535. list_filename = os.listdir(os.path.join(current_path,"download"))
  1536. for _filename in list_filename:
  1537. _count += 1
  1538. _type = _filename.split(".")[1]
  1539. task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
  1540. if _count>=500:
  1541. break
  1542. def comsumer():
  1543. mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
  1544. mt.run()
  1545. def comsumer_handle(item,result_queue):
  1546. _path = item.get("path")
  1547. _type = item.get("file_type")
  1548. _data_base64 = base64.b64encode(open(_path,"rb").read())
  1549. #调用接口处理结果
  1550. start_time = time.time()
  1551. _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
  1552. log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
  1553. producer()
  1554. comsumer()
class Dataflow_attachment(Dataflow):
    """Dataflow specialisation for the attachment stage: pulls documents that
    carry attachments, fetches the attachment rows, routes them to the OCR or
    non-OCR queue by file type, runs recognition through the external
    interface and writes the results back to ``document_tmp``/``document_html``.
    """
    def __init__(self):
        Dataflow.__init__(self)
    def flow_attachment_process(self):
        # entry point used by the scheduler: drain both attachment queues
        self.process_comsumer()
    def process_comsumer(self):
        """Spawn 60 worker threads that each loop over both attachment queues."""
        list_thread = []
        thread_count = 60
        for i in range(thread_count):
            list_thread.append(Thread(target=self.process_comsumer_handle))
        for t in list_thread:
            t.start()
        for t in list_thread:
            t.join()
    def process_comsumer_handle(self):
        """Worker loop: take one item from the OCR queue and one from the
        non-OCR queue per iteration; back off 2s when nothing was available.
        """
        while 1:
            _flag = False
            try:
                item = self.queue_attachment_ocr.get(True,timeout=0.2)
                self.attachment_recognize(item,None)
            except Exception as e:
                # queue empty (or recognition raised): remember it for back-off
                _flag = True
                pass
            try:
                item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
                self.attachment_recognize(item,None)
            except Exception as e:
                # NOTE(review): `True and _flag` just keeps the previous value,
                # so a success on this second queue does NOT clear the back-off
                # flag set by the first — confirm whether the sleep is intended
                # whenever the OCR queue alone was empty
                _flag = True and _flag
                pass
            if _flag:
                time.sleep(2)
    def attachment_recognize(self,_dict,result_queue):
        """Run attachment recognition for one document and persist the result.

        :param _dict: {"item": document dict, "list_attach": attachment objects}
        :param result_queue: unused; kept for handler-signature compatibility
        """
        item = _dict.get("item")
        list_attach = _dict.get("list_attach")
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
        _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
        log(str(swf_urls))
        if not _succeed:
            # recognition failed: move the document to the failed status band
            item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
        else:
            # merge the recognized html/swf images back into the document html
            dhtml.updateSWFImages(swf_urls)
            dhtml.updateAttachment(list_html)
            dhtml.update_row(self.ots_client)
            item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
            item[document_tmp_attachment_extract_status] = 1
        log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
        dtmp = Document_tmp(item)
        dtmp.update_row(self.ots_client)
    def flow_attachment(self):
        # produce new work, then consume the routing queue
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()
    def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        """Batch-fetch attachment rows by md5 (capped at 50 keys per request).

        :return: list of ``attachment`` model objects for the rows found
        """
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = self.ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        """Refill ``queue_attachment`` with documents awaiting attachment
        processing, skipping docids already queued for OCR / non-OCR work.
        """
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
        # decide whether to enqueue more data: skip when queues are backed up
        if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
            return
        # de-duplicate: remember docids still pending in either queue
        set_docid = set()
        set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
        # trim the tracking lists to the items still sitting in the queues
        if qsize_ocr>0:
            self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
        else:
            self.list_attachment_ocr = []
        if qsize_not_ocr>0:
            self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
        else:
            self.list_attachment_not_ocr = []
        try:
            bool_query = BoolQuery(must_queries=[
                RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
                # TermQuery(document_tmp_docid,234925191),
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_attachment producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            _count = 0
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    continue
                self.queue_attachment.put(_dict,True)
                _count += 1
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        continue
                    self.queue_attachment.put(_dict,True)
                    _count += 1
            log("add attachment count:%d"%(_count))
        except Exception as e:
            log("flow attachment producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_attachment_producer_comsumer(self):
        # route queued documents into the OCR / non-OCR queues
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()
    def set_queue(self,_dict):
        """Route one document to the OCR queue when any of its attachments is
        an image/swf/pdf/tif, otherwise to the non-OCR queue.
        """
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
            # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
            # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
    def comsumer_handle(self,item,result_queue):
        """Resolve one document's attachments and hand it to set_queue; a
        document with no attachments goes straight to the succeeded band.
        """
        try:
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)==0:
                item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
                dtmp = Document_tmp(item)
                dtmp.update_row(self.ots_client)
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5)
                # attachments not fully uploaded yet are skipped for 2 hours:
                # reset status to 1 so the document gets picked up again later
                if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
                    item[document_tmp_status] = 1
                    dtmp = Document_tmp(item)
                    dtmp.update_row(self.ots_client)
                    return
                self.set_queue({"item":item,"list_attach":list_attach})
        except Exception as e:
            traceback.print_exc()
    def start_flow_attachment(self):
        """Schedule the attachment producer (every 10s) and processor (every 20s)."""
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.start()
class Dataflow_extract(Dataflow):
    """Dataflow specialisation for the extraction stage: pulls documents in
    the extract-status band, posts their html to the other/extract/industry
    HTTP interfaces and stores the returned json in ``document_extract2``.
    """
    def __init__(self):
        Dataflow.__init__(self)
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        """Refill ``queue_extract``, skipping docids that are still queued."""
        q_size = self.queue_extract.qsize()
        # back off while the consumer still has plenty of work
        if q_size>100:
            return
        set_docid = set(self.list_extract)
        # trim the tracking list to the items still sitting in the queue
        if q_size>0:
            self.list_extract = self.list_extract[-q_size:]
        else:
            self.list_extract = []
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_extract producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    # already queued: keep tracking it at the front
                    self.list_extract.insert(0,docid)
                    continue
                else:
                    self.queue_extract.put(_dict)
                    self.list_extract.append(docid)
            _count = len(list_dict)
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        self.list_extract.insert(0,docid)
                        continue
                    else:
                        self.queue_extract.put(_dict)
                        self.list_extract.append(docid)
                _count += len(list_dict)
        except Exception as e:
            log("flow extract producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_extract(self,):
        self.comsumer()
    def comsumer(self):
        # drain the extract queue with 35 worker threads
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
        mt.run()
    def comsumer_handle(self,item,result_queue):
        """Run the three extraction interfaces for one document.

        ``all_done`` is 1 on success, or -1/-2/-3 identifying which interface
        failed.  On success the document moves to the succeeded status band
        and the extract row is written; on failure a DingDing alert is sent.

        NOTE(review): ``if all_done:`` is truthy for the negative error codes
        too, so the extract/industry interfaces are still invoked after an
        earlier failure (and may overwrite the error code) — confirm whether
        ``all_done==1`` was intended.
        """
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
        _extract = Document_extract({})
        _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
        _extract.setValue(document_extract2_docid,item.get(document_docid))
        all_done = 1
        if all_done:
            data = item
            resp = requests.post(self.other_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
            else:
                all_done = -1
        # build the payload for the extract/industry interfaces
        data = {}
        for k,v in item.items():
            data[k] = v
        data["timeout"] = 240
        data["doc_id"] = data.get(document_tmp_docid)
        # the html body is sent as "content", not under its column name
        data["content"] = data.get(document_tmp_dochtmlcon,"")
        if document_tmp_dochtmlcon in data:
            data.pop(document_tmp_dochtmlcon)
        data["title"] = data.get(document_tmp_doctitle,"")
        data["web_source_no"] = item.get(document_tmp_web_source_no,"")
        data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
        if all_done:
            resp = requests.post(self.extract_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
            else:
                all_done = -2
        if all_done:
            resp = requests.post(self.industy_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            else:
                all_done = -3
        _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
                 document_docid:item.get(document_tmp_docid),
                 }
        dtmp = Document_tmp(_dict)
        if all_done!=1:
            sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
            dtmp.update_row(self.ots_client)
        else:
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
            dtmp.update_row(self.ots_client)
            # write into the interface table; enable on release
            _extract.setValue(document_extract2_status,random.randint(1,50),True)
            _extract.update_row(self.ots_client)
        log("process docid:%d %s"%(data["doc_id"],str(all_done)))
    def start_flow_extract(self):
        """Schedule the extract producer and consumer every 10 seconds."""
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
        schedule.add_job(self.flow_extract,"cron",second="*/10")
        schedule.start()
  1833. class Dataflow_dumplicate(Dataflow):
  1834. class DeleteListener():
  1835. def __init__(self,conn,_func,*args,**kwargs):
  1836. self.conn = conn
  1837. self._func = _func
  1838. def on_error(self, headers):
  1839. log('received an error %s' % str(headers.body))
  1840. def on_message(self, headers):
  1841. try:
  1842. message_id = headers.headers["message-id"]
  1843. body = headers.body
  1844. log("get message %s"%(message_id))
  1845. self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
  1846. except Exception as e:
  1847. traceback.print_exc()
  1848. pass
  1849. def __del__(self):
  1850. self.conn.disconnect()
  1851. def __init__(self):
  1852. Dataflow.__init__(self)
  1853. self.c_f_get_extractCount = f_get_extractCount()
  1854. self.c_f_get_package = f_get_package()
  1855. logging.basicConfig(level = logging.info,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  1856. self.delete_comsumer_counts = 2
  1857. self.doc_delete_queue = "/queue/doc_delete_queue"
  1858. self.doc_delete_result = "/queue/doc_delete_result"
  1859. self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
  1860. for _ in range(self.delete_comsumer_counts):
  1861. conn = getConnect_activateMQ_ali()
  1862. listener = self.DeleteListener(conn,self.delete_doc_handle)
  1863. createComsumer(listener,self.doc_delete_queue)
  1864. def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart","time_release"]):
  1865. dict_time = {}
  1866. for k in keys:
  1867. dict_time[k] = _extract.get(k)
  1868. return dict_time
  1869. def post_extract(self,_dict):
  1870. win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  1871. _dict["win_tenderer"] = win_tenderer
  1872. _dict["bidding_budget"] = bidding_budget
  1873. _dict["win_bid_price"] = win_bid_price
  1874. extract_json = _dict.get(document_tmp_extract_json,"{}")
  1875. _extract = json.loads(extract_json)
  1876. _dict["product"] = ",".join(_extract.get("product",[]))
  1877. _dict["fingerprint"] = _extract.get("fingerprint","")
  1878. _dict["project_codes"] = _extract.get("code",[])
  1879. if len(_dict["project_codes"])>0:
  1880. _dict["project_code"] = _dict["project_codes"][0]
  1881. else:
  1882. _dict["project_code"] = ""
  1883. _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
  1884. _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
  1885. "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
  1886. _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
  1887. _dict["package"] = self.c_f_get_package.evaluate(extract_json)
  1888. _dict["project_name"] = _extract.get("name","")
  1889. _dict["dict_time"] = self.get_dict_time(_extract)
def dumplicate_fianl_check(self,base_list):
    """Final pass over a dedup candidate group.

    Sorts candidates by confidence (descending) and walks them in order,
    keeping a growing prefix as long as each new candidate still looks like a
    duplicate of the earlier ones; returns that prefix (incl. the base doc),
    or [] when no candidate beyond the base survives.
    """
    the_group = base_list
    the_group.sort(key=lambda x:x["confidence"],reverse=True)
    _index = 0
    base_fingerprint = "None"
    if len(base_list)>0:
        base_fingerprint = base_list[0]["fingerprint"]
    for _i in range(1,len(base_list)):
        _dict1 = base_list[_i]
        fingerprint_less = _dict1["fingerprint"]
        _pass = True
        # identical fingerprint to the base doc is accepted without re-checking
        if fingerprint_less==base_fingerprint:
            _index = _i
            continue
        # re-check against at most the first 5 already-accepted candidates
        for _j in range(min(_i,5)):
            _dict2 = base_list[_j]
            _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=False)
            # print("_prob:",_prob)
            if _prob<=0.1:
                _pass = False
                break
        log("checking index:%d"%(_i))
        _index = _i
        if not _pass:
            # current candidate failed: keep only the prefix before it and stop
            _index -= 1
            break
    if _index>=1:
        return the_group[:_index+1]
    return []
def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
    """Score how likely two documents are duplicates.

    Returns 1 for identical fingerprints, otherwise a base probability scaled
    by how many of 8 key fields match, then gated by a cascade of pairwise
    checks (title, codes, product, entity, money, package, time). A failed
    gate usually forces 0.

    :param _dict1: the "less" document (must carry the fields set by post_extract)
    :param _dict2: the "greater" document
    :param min_counts: size of the candidate pool; smaller pools get a higher base probability
    :param b_log: when True, log the reason of each failed check
    :return: float in [0,1]
    """
    document_less = _dict1
    docid_less = _dict1["docid"]
    docchannel_less = document_less["docchannel"]
    page_time_less = document_less["page_time"]
    doctitle_refine_less = document_less["doctitle_refine"]
    project_codes_less = document_less["project_codes"]
    nlp_enterprise_less = document_less["nlp_enterprise"]
    tenderee_less = document_less["tenderee"]
    agency_less = document_less["agency"]
    win_tenderer_less = document_less["win_tenderer"]
    bidding_budget_less = document_less["bidding_budget"]
    win_bid_price_less = document_less["win_bid_price"]
    product_less = document_less["product"]
    package_less = document_less["package"]
    json_time_less = document_less["dict_time"]
    project_name_less = document_less["project_name"]
    fingerprint_less = document_less["fingerprint"]
    extract_count_less = document_less["extract_count"]
    document_greater = _dict2
    docid_greater = _dict2["docid"]
    page_time_greater = document_greater["page_time"]
    doctitle_refine_greater = document_greater["doctitle_refine"]
    project_codes_greater = document_greater["project_codes"]
    nlp_enterprise_greater = document_greater["nlp_enterprise"]
    tenderee_greater = document_greater["tenderee"]
    agency_greater = document_greater["agency"]
    win_tenderer_greater = document_greater["win_tenderer"]
    bidding_budget_greater = document_greater["bidding_budget"]
    win_bid_price_greater = document_greater["win_bid_price"]
    product_greater = document_greater["product"]
    package_greater = document_greater["package"]
    json_time_greater = document_greater["dict_time"]
    project_name_greater = document_greater["project_name"]
    fingerprint_greater = document_greater["fingerprint"]
    extract_count_greater = document_greater["extract_count"]
    # identical content fingerprint: certain duplicate
    if fingerprint_less==fingerprint_greater:
        return 1
    # count how many of the 8 key fields agree (non-empty on the "less" side)
    same_count = 0
    all_count = 8
    if len(set(project_codes_less) & set(project_codes_greater))>0:
        same_count += 1
    if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
        same_count += 1
    if getLength(agency_less)>0 and agency_less==agency_greater:
        same_count += 1
    if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
        same_count += 1
    if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
        same_count += 1
    if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
        same_count += 1
    if getLength(project_name_less)>0 and project_name_less==project_name_greater:
        same_count += 1
    if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
        same_count += 1
    # the smaller the candidate pool, the more confident the base probability
    base_prob = 0
    if min_counts<3:
        base_prob = 0.9
    elif min_counts<5:
        base_prob = 0.8
    elif min_counts<8:
        base_prob = 0.7
    else:
        base_prob = 0.6
    _prob = base_prob*same_count/all_count
    # sparse extractions (<=3 fields) cannot match much; give them a floor
    if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
        _prob = 0.15
    if _prob<0.1:
        return _prob
    # gate cascade: each check records 0=fail / 1=neutral / 2=strong-match
    check_result = {"pass":1}
    # NOTE(review): these docchannel ids are site-specific channel codes — their
    # exact semantics are not visible in this file
    if docchannel_less in (51,102,103,104,115,116,117):
        if doctitle_refine_less!=doctitle_refine_greater:
            if page_time_less!=page_time_greater:
                check_result["docchannel"] = 0
                check_result["pass"] = 0
        else:
            check_result["docchannel"] = 2
    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
        check_result["doctitle"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
    else:
        check_result["doctitle"] = 2
    #added check
    if not check_codes(project_codes_less,project_codes_greater):
        check_result["code"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
    else:
        # strong only when both sides have codes and they intersect
        if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
            check_result["code"] = 2
        else:
            check_result["code"] = 1
    if not check_product(product_less,product_greater):
        check_result["product"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
    else:
        if getLength(product_less)>0 and getLength(product_greater)>0:
            check_result["product"] = 2
        else:
            check_result["product"] = 1
    # NOTE(review): check_demand takes no arguments here — presumably a stub; confirm
    if not check_demand():
        check_result["pass"] = 0
    if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                        tenderee_less,tenderee_greater,
                        agency_less,agency_greater,
                        win_tenderer_less,win_tenderer_greater):
        check_result["entity"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
    else:
        # which entity matters depends on the channel group (tender vs result)
        if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
            check_result["entity"] = 2
        elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
            check_result["entity"] = 2
        else:
            check_result["entity"] = 1
    if not check_money(bidding_budget_less,bidding_budget_greater,
                       win_bid_price_less,win_bid_price_greater):
        if b_log:
            logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
        check_result["money"] = 0
        check_result["pass"] = 0
    else:
        if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
            check_result["money"] = 2
        elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
            check_result["money"] = 2
        else:
            check_result["money"] = 1
    #added check
    if not check_package(package_less,package_greater):
        if b_log:
            logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
        check_result["package"] = 0
        check_result["pass"] = 0
    else:
        if getLength(package_less)>0 and getLength(package_greater)>0:
            check_result["package"] = 2
        else:
            check_result["package"] = 1
    #added check
    if not check_time(json_time_less,json_time_greater):
        if b_log:
            logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
        # the time dicts may arrive as dicts or as JSON strings
        if isinstance(json_time_less,dict):
            time_less = json_time_less
        else:
            time_less = json.loads(json_time_less)
        if isinstance(json_time_greater,dict):
            time_greater = json_time_greater
        else:
            time_greater = json.loads(json_time_greater)
        # fail only when both sides carry a value for the same key and it differs
        for k,v in time_less.items():
            if getLength(v)>0:
                v1 = time_greater.get(k,"")
                if getLength(v1)>0:
                    if v!=v1:
                        log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
                        check_result["time"] = 0
                        check_result["pass"] = 0
    else:
        if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
            check_result["time"] = 2
        else:
            check_result["time"] = 1
    if check_result.get("pass",0)==0:
        if b_log:
            logging.info(str(check_result))
        # a money mismatch is an unconditional veto
        if check_result.get("money",1)==0:
            return 0
        # only a full set of strong matches can override a failed gate
        if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
            return _prob
        else:
            return 0
        # NOTE(review): unreachable — both branches above return before this line
        if check_result.get("time",1)==0:
            return 0
    return _prob
def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    """Run one dedup candidate search against OTS and score the hits.

    Each returned row is enriched via post_extract; unless *merge* is set,
    rows other than *item* itself get a fresh ``confidence`` from
    dumplicate_check plus ``min_counts`` (the total hit count).
    Retries up to *retry_times* on any exception; returns [] when all fail.

    NOTE(review): singleNum_keys/contain_keys/multiNum_keys/notlike_keys are
    accepted but unused in this method — presumably kept for signature
    compatibility with callers; confirm.
    """
    for _ in range(retry_times):
        try:
            _time = time.time()
            check_time = 0
            # a list of queries is treated as OR-ed alternatives
            if isinstance(_query,list):
                bool_query = BoolQuery(should_queries=_query)
            else:
                bool_query = _query
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            list_data = []
            for _dict in list_dict:
                self.post_extract(_dict)
                _docid = _dict.get(document_tmp_docid)
                if merge:
                    list_data.append(_dict)
                else:
                    # skip the document itself; score all other candidates
                    if _docid!=item.get(document_tmp_docid):
                        _time1 = time.time()
                        confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
                        check_time+= time.time()-_time1
                        _dict["confidence"] = confidence
                        _dict["min_counts"] = total_count
                        list_data.append(_dict)
            all_time = time.time()-_time
            # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
            return list_data
        except Exception as e:
            traceback.print_exc()
    return []
def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    """Search duplicate candidates and accumulate them into *base_list*.

    Candidates with confidence > 0.1 that were not seen before are appended;
    every candidate's docid ends up in *set_docid* so it is not re-visited.
    """
    list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
    for _dict in list_dict:
        _docid = _dict.get(document_tmp_docid)
        confidence = _dict["confidence"]
        if confidence>0.1:
            if _docid not in set_docid:
                base_list.append(_dict)
                set_docid.add(_docid)
        # every candidate is marked as seen, even low-confidence ones
        # (the add above makes this redundant for the accepted branch)
        set_docid.add(_docid)
  2146. def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,to_log=True):
  2147. for k,v in _dict.items():
  2148. if getLength(v)==0:
  2149. return
  2150. _dict.update(base_dict)
  2151. if to_log:
  2152. log(str(_dict))
  2153. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  2154. _rule = {"confidence":confidence,
  2155. "item":item,
  2156. "query":_query,
  2157. "singleNum_keys":[],
  2158. "contain_keys":[],
  2159. "multiNum_keys":[]}
  2160. list_rules.append(_rule)
  2161. def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
  2162. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  2163. current_date = getCurrent_date("%Y-%m-%d")
  2164. if page_time=='':
  2165. page_time = current_date
  2166. if page_time>=timeAdd(current_date,-2):
  2167. table_name = "document_tmp"
  2168. table_index = "document_tmp_index"
  2169. base_dict = {
  2170. "docchannel":item["docchannel"],
  2171. "status":[status_from[0]],
  2172. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2173. }
  2174. must_not_dict = {"save":0,"docid":item.get("docid")}
  2175. doctitle_refine_name = "doctitle_refine"
  2176. else:
  2177. table_name = "document"
  2178. table_index = "document_index"
  2179. if get_all:
  2180. _status = [201,450]
  2181. else:
  2182. _status = [201,300]
  2183. base_dict = {
  2184. "docchannel":item["docchannel"],
  2185. "status":_status,
  2186. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2187. }
  2188. must_not_dict = {"docid":item.get("docid")}
  2189. doctitle_refine_name = "doctitle"
  2190. list_rules = []
  2191. singleNum_keys = ["tenderee","win_tenderer"]
  2192. confidence = 100
  2193. self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item)
  2194. confidence = 90
  2195. _dict = {document_tmp_agency:agency,
  2196. "win_tenderer":win_tenderer,
  2197. "win_bid_price":win_bid_price}
  2198. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2199. _dict = {document_tmp_agency:agency,
  2200. "win_tenderer":win_tenderer,
  2201. "bidding_budget":bidding_budget}
  2202. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2203. _dict = {document_tmp_agency:agency,
  2204. "win_bid_price":win_bid_price,
  2205. "bidding_budget":bidding_budget}
  2206. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2207. _dict = {win_tenderer:win_tenderer,
  2208. "win_bid_price":win_bid_price,
  2209. "bidding_budget":bidding_budget}
  2210. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2211. _dict = {"tenderee":tenderee,
  2212. "win_tenderer":win_tenderer,
  2213. "win_bid_price":win_bid_price}
  2214. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2215. _dict = {"tenderee":tenderee,
  2216. "win_tenderer":win_tenderer,
  2217. "bidding_budget":bidding_budget}
  2218. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2219. _dict = {"tenderee":tenderee,
  2220. "win_bid_price":win_bid_price,
  2221. "bidding_budget":bidding_budget}
  2222. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2223. _dict = {"tenderee":tenderee,
  2224. "agency":agency,
  2225. "win_tenderer":win_tenderer}
  2226. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2227. _dict = {"tenderee":tenderee,
  2228. "agency":agency,
  2229. "win_bid_price":win_bid_price}
  2230. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2231. _dict = {"tenderee":tenderee,
  2232. "agency":agency,
  2233. "bidding_budget":bidding_budget}
  2234. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2235. confidence=85
  2236. _dict = {"tenderee":tenderee,
  2237. "agency":agency
  2238. }
  2239. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2240. _dict = {"tenderee":tenderee,
  2241. "project_codes":project_code
  2242. }
  2243. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2244. _dict = {"tenderee":tenderee,
  2245. "project_name":project_name
  2246. }
  2247. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2248. if getLength(product)>0:
  2249. l_p = product.split(",")
  2250. _dict = {"tenderee":tenderee,
  2251. "product":l_p[0]
  2252. }
  2253. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2254. _dict = {"tenderee":tenderee,
  2255. "win_tenderer":win_tenderer
  2256. }
  2257. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2258. _dict = {"tenderee":tenderee,
  2259. "win_bid_price":win_bid_price
  2260. }
  2261. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2262. _dict = {"tenderee":tenderee,
  2263. "bidding_budget":bidding_budget
  2264. }
  2265. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2266. _dict = {"tenderee":tenderee,
  2267. doctitle_refine_name:doctitle_refine
  2268. }
  2269. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2270. _dict = {"agency":agency,
  2271. "project_codes":project_code
  2272. }
  2273. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2274. _dict = {"agency":agency,
  2275. "project_name":project_name
  2276. }
  2277. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2278. _dict = {"project_codes":project_code,
  2279. "project_name":project_name
  2280. }
  2281. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2282. _dict = {"project_codes":project_code,
  2283. "win_tenderer":win_tenderer
  2284. }
  2285. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2286. _dict = {"project_codes":project_code,
  2287. "win_bid_price":win_bid_price
  2288. }
  2289. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2290. _dict = {"project_codes":project_code,
  2291. "bidding_budget":bidding_budget
  2292. }
  2293. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2294. _dict = {"project_codes":project_code,
  2295. doctitle_refine_name:doctitle_refine
  2296. }
  2297. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2298. _dict = {"project_name":project_name,
  2299. "win_tenderer":win_tenderer
  2300. }
  2301. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2302. _dict = {"project_name":project_name,
  2303. "win_bid_price":win_bid_price
  2304. }
  2305. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2306. _dict = {"project_name":project_name,
  2307. "bidding_budget":bidding_budget
  2308. }
  2309. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2310. _dict = {"project_name":project_name,
  2311. doctitle_refine_name:doctitle_refine
  2312. }
  2313. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2314. _dict = {"win_tenderer":win_tenderer,
  2315. "win_bid_price":win_bid_price
  2316. }
  2317. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2318. _dict = {"win_tenderer":win_tenderer,
  2319. "bidding_budget":bidding_budget
  2320. }
  2321. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2322. _dict = {"win_tenderer":win_tenderer,
  2323. doctitle_refine_name:doctitle_refine
  2324. }
  2325. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2326. _dict = {"win_bid_price":win_bid_price,
  2327. "bidding_budget":bidding_budget
  2328. }
  2329. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2330. _dict = {"win_bid_price":win_bid_price,
  2331. doctitle_refine_name:doctitle_refine
  2332. }
  2333. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2334. _dict = {"bidding_budget":bidding_budget,
  2335. doctitle_refine_name:doctitle_refine
  2336. }
  2337. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2338. confidence=80
  2339. _dict = {doctitle_refine_name:doctitle_refine}
  2340. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2341. _dict = {"project_codes":project_code}
  2342. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2343. confidence=70
  2344. _dict = {"project_name":project_name}
  2345. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2346. return list_rules,table_name,table_index
  2347. def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
  2348. def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
  2349. bool_query = BoolQuery(must_queries=[
  2350. RangeQuery(document_tmp_status,*status_from,True,True),
  2351. # TermQuery("docid",271983871)
  2352. ])
  2353. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  2354. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  2355. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2356. log("flow_dumplicate producer total_count:%d"%total_count)
  2357. list_dict = getRow_ots(rows)
  2358. for _dict in list_dict:
  2359. self.queue_dumplicate.put(_dict)
  2360. _count = len(list_dict)
  2361. while next_token and _count<flow_process_count:
  2362. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  2363. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  2364. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2365. list_dict = getRow_ots(rows)
  2366. for _dict in list_dict:
  2367. self.queue_dumplicate.put(_dict)
  2368. _count += len(list_dict)
  2369. def comsumer():
  2370. mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
  2371. mt.run()
  2372. producer()
  2373. comsumer()
  2374. def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]):
  2375. '''
  2376. 根据docid查询公告内容,先查询document_tmp,再查询document
  2377. :param list_docids:
  2378. :return:
  2379. '''
  2380. list_docs = []
  2381. for _docid in list_docids:
  2382. docid = int(_docid)
  2383. _dict = {document_partitionkey:getPartitionKey(docid),
  2384. document_docid:docid}
  2385. _doc = Document_tmp(_dict)
  2386. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2387. if not _exists:
  2388. _doc = Document(_dict)
  2389. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2390. if _exists:
  2391. list_docs.append(_doc)
  2392. for _doc in list_docs:
  2393. try:
  2394. _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
  2395. if _sub_docs_json is not None:
  2396. _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
  2397. except Exception as e:
  2398. traceback.print_exc()
  2399. list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
  2400. return list_docs
  2401. def is_same_package(self,_dict1,_dict2):
  2402. sub_project_name1 = _dict1.get(project_sub_project_name,"")
  2403. if sub_project_name1=="Project":
  2404. sub_project_name1 = ""
  2405. win_tenderer1 = _dict1.get(project_win_tenderer,"")
  2406. win_bid_price1 = _dict1.get(project_win_bid_price,0)
  2407. bidding_budget1 = _dict1.get(project_bidding_budget,0)
  2408. sub_project_name2 = _dict2.get(project_sub_project_name,"")
  2409. if sub_project_name2=="Project":
  2410. sub_project_name2 = ""
  2411. win_tenderer2 = _dict2.get(project_win_tenderer,"")
  2412. win_bid_price2 = _dict2.get(project_win_bid_price,0)
  2413. bidding_budget2 = _dict2.get(project_bidding_budget,0)
  2414. _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
  2415. if len(_set)>1:
  2416. return False
  2417. _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
  2418. if len(_set)>1:
  2419. return False
  2420. _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
  2421. if len(_set)>1:
  2422. return False
  2423. _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
  2424. if len(_set)>1:
  2425. return False
  2426. return True
  2427. def getUpdate_dict(self,_dict):
  2428. update_dict = {}
  2429. for k,v in _dict.items():
  2430. if v is None:
  2431. continue
  2432. if isinstance(v,str):
  2433. if v=="":
  2434. continue
  2435. if isinstance(v,(float,int)):
  2436. if v==0:
  2437. continue
  2438. update_dict[k] = v
  2439. return update_dict
def update_projects_by_project(self,project_dict,projects):
    """Merge the properties of *project_dict* into every project in *projects*.

    Scalar fields overwrite targets that are missing or hold placeholder
    values; the concatenated fields (docids/codes/product/uuid/dynamics) are
    unioned across all projects plus *project_dict*.
    """
    _dict = {}
    # update the common (scalar) attributes
    for k,v in project_dict.items():
        # concatenated fields are merged separately below
        if k in (project_dynamics,project_product,project_project_codes,project_docids,project_uuid):
            continue
        for _proj in projects:
            if k not in _proj:
                _dict[k] = v
            elif _proj.get(k,"未知") in ('全国',"未知"):
                # '全国' (nationwide) / '未知' (unknown) act as placeholders
                # that a concrete value may replace
                _dict[k] = v
    for _proj in projects:
        _proj.update(_dict)
    # merge the concatenated attributes
    append_dict = {}
    set_docid = set()
    set_product = set()
    set_code = set()
    set_uuid = set()
    for _proj in projects:
        _docids = _proj.get(project_docids,"")
        _codes = _proj.get(project_project_codes,"")
        _product = _proj.get(project_product,"")
        _uuid = _proj.get(project_uuid,"")
        set_docid = set_docid | set(_docids.split(","))
        set_code = set_code | set(_codes.split(","))
        set_product = set_product | set(_product.split(","))
        set_uuid = set_uuid | set(_uuid.split(","))
    set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
    set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
    set_product = set_product | set(project_dict.get(project_product,"").split(","))
    set_uuid = set_uuid | set(project_dict.get(project_uuid,"").split(","))
    append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
    # NOTE(review): this counts the raw set, which contains the "" produced by
    # splitting an empty string — possibly off by one vs the joined list; confirm
    append_dict[project_docid_number] = len(set_docid)
    append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
    append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
    append_dict[project_uuid] = ",".join([a for a in list(set_uuid) if a!=""])
    # merge the dynamics lists, deduplicated by docid (later entries win)
    dict_dynamic = {}
    set_docid = set()  # name reused from above; not read again below
    for _proj in projects:
        _dynamic = json.loads(_proj.get(project_dynamics,"[]"))
        for _dy in _dynamic:
            _docid = _dy.get("docid")
            dict_dynamic[_docid] = _dy
    _dynamic = json.loads(project_dict.get(project_dynamics,"[]"))
    for _dy in _dynamic:
        _docid = _dy.get("docid")
        dict_dynamic[_docid] = _dy
    list_dynamics = []
    for k,v in dict_dynamic.items():
        list_dynamics.append(v)
    list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
    append_dict[project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
    for _proj in projects:
        _proj.update(append_dict)
def update_projects_by_document(self,docid,projects):
    '''
    Merge the document identified by *docid* into the given projects.

    Common properties fill in targets that lack a concrete value,
    list-like properties (docids/codes/products/dynamics) are unioned,
    and each package (sub_doc) extracted from the document is either
    merged into a matching existing package or appended as a new project.
    :param docid: id of the document to merge in
    :param projects: project dicts, updated (and possibly extended) in place
    :return: None
    '''
    list_docs = self.search_docs([docid])
    project_dict = self.generate_common_properties(list_docs)
    list_package_properties = self.generate_packages_properties(list_docs)
    _dict = {}
    # Common scalar attributes: only overwrite targets that lack the key or
    # hold the "未知" (unknown) placeholder.
    for k,v in project_dict.items():
        if v is None or v=="":
            continue
        if k in (project_dynamics,project_product,project_project_codes,project_docids):
            continue
        for _proj in projects:
            if k not in _proj:
                _dict[k] = v
            elif _proj.get(k,"未知")=="未知":
                _dict[k] = v
    for _proj in projects:
        _proj.update(_dict)
    # Concatenated attributes: union of comma-separated values across all
    # projects plus the incoming document.
    append_dict = {}
    set_docid = set()
    set_product = set()
    set_code = set()
    for _proj in projects:
        _docids = _proj.get(project_docids,"")
        _codes = _proj.get(project_project_codes,"")
        _product = _proj.get(project_product,"")
        set_docid = set_docid | set(_docids.split(","))
        set_code = set_code | set(_codes.split(","))
        set_product = set_product | set(_product.split(","))
    set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
    set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
    set_product = set_product | set(project_dict.get(project_product,"").split(","))
    append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
    append_dict[project_docid_number] = len(set_docid)
    append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
    append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
    # Dynamics timeline: deduplicate by docid (later entries win), sort by
    # page_time.
    dict_dynamic = {}
    set_docid = set()
    for _proj in projects:
        _dynamic = json.loads(_proj.get(project_dynamics,"[]"))
        for _dy in _dynamic:
            _docid = _dy.get("docid")
            dict_dynamic[_docid] = _dy
    _dynamic = json.loads(project_dict.get(project_dynamics,"[]"))
    for _dy in _dynamic:
        _docid = _dy.get("docid")
        dict_dynamic[_docid] = _dy
    list_dynamics = []
    for k,v in dict_dynamic.items():
        list_dynamics.append(v)
    list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
    append_dict[project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
    for _proj in projects:
        _proj.update(append_dict)
    # Index existing projects by package-identity keys so incoming packages
    # can find a project to merge into.  Keys combine the sub-project name
    # with the winner and/or the money figures; weaker single-attribute keys
    # are only used when no strong key exists.
    dict_package = {}
    for _pp in projects:
        _counts = 0
        sub_project_name = _pp.get(project_sub_project_name,"")
        if sub_project_name=="Project":
            # "Project" is the default (whole-project) package name.
            sub_project_name = ""
        win_tenderer = _pp.get(project_win_tenderer,"")
        win_bid_price = _pp.get(project_win_bid_price,0)
        bidding_budget = _pp.get(project_bidding_budget,0)
        if win_tenderer!="" and bidding_budget!=0:
            _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
            dict_package[_key] = _pp
            _counts += 1
        if win_tenderer!="" and win_bid_price!=0:
            _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
            dict_package[_key] = _pp
            _counts +=1
        if _counts==0:
            if win_tenderer!="":
                _key = "%s-%s"%(sub_project_name,win_tenderer)
                dict_package[_key] = _pp
                _counts += 1
            if bidding_budget!=0:
                _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
    # Merge the document's package-level (private) properties: try the
    # strongest key first, fall back to weaker keys, and append as a new
    # project when nothing matches.
    for _pp in list_package_properties:
        flag_update = False
        sub_project_name = _pp.get(project_sub_project_name,"")
        if sub_project_name=="Project":
            sub_project_name = ""
        win_tenderer = _pp.get(project_win_tenderer,"")
        win_bid_price = _pp.get(project_win_bid_price,0)
        bidding_budget = _pp.get(project_bidding_budget,0)
        if win_tenderer!="" and bidding_budget!=0:
            _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if win_tenderer!="" and win_bid_price!=0:
            _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if win_tenderer!="":
            _key = "%s-%s"%(sub_project_name,win_tenderer)
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if bidding_budget!=0:
            _key = "%s-%s"%(sub_project_name,str(bidding_budget))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if not flag_update:
            # No existing package matched: promote the package to a new
            # project and register its identity keys.
            _pp.update(project_dict)
            projects.append(_pp)
            _counts = 0
            if win_tenderer!="" and bidding_budget!=0:
                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
            if win_tenderer!="" and win_bid_price!=0:
                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                dict_package[_key] = _pp
                _counts +=1
            if _counts==0:
                if win_tenderer!="":
                    _key = "%s-%s"%(sub_project_name,win_tenderer)
                    dict_package[_key] = _pp
                    _counts += 1
                if bidding_budget!=0:
                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                    dict_package[_key] = _pp
                    _counts += 1
  2649. def delete_projects_by_document(self,docid):
  2650. '''
  2651. 更新projects中对应的document的属性
  2652. :param docid:
  2653. :param projects: 项目集合
  2654. :param action:add/delete add时附加唯一属性,delete时删除唯一属性
  2655. :return:
  2656. '''
  2657. set_docid = set()
  2658. list_delete_projects = []
  2659. list_projects = self.search_projects_with_document([docid])
  2660. for _proj in list_projects:
  2661. _p = {}
  2662. _docids = _proj.get(project_docids,"")
  2663. print(_proj.get(project_uuid))
  2664. _p["delete_uuid"] = _proj.get(project_uuid)
  2665. _p["to_delete"] = True
  2666. list_delete_projects.append(_p)
  2667. if _docids!="":
  2668. set_docid = set_docid | set(_docids.split(","))
  2669. if str(docid) in set_docid:
  2670. set_docid.remove(str(docid))
  2671. list_docid = list(set_docid)
  2672. list_projects = []
  2673. if len(list_docid)>0:
  2674. list_docs = self.search_docs(list_docid)
  2675. list_projects = self.generate_projects_from_document(list_docs)
  2676. list_projects = self.dumplicate_projects(list_projects)
  2677. list_projects.extend(list_delete_projects)
  2678. project_json = self.to_project_json(list_projects)
  2679. print("delete_json",project_json)
  2680. return project_json
  2681. def delete_doc_handle(self,_dict,result_queue):
  2682. headers = _dict.get("frame")
  2683. conn = _dict.get("conn")
  2684. log("==========delete")
  2685. if headers is not None:
  2686. message_id = headers.headers["message-id"]
  2687. body = headers.body
  2688. item = json.loads(body)
  2689. docid = item.get("docid")
  2690. if docid is None:
  2691. return
  2692. delete_result = self.delete_projects_by_document(docid)
  2693. if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
  2694. ackMsg(conn,message_id)
def generate_common_properties(self,list_docs):
    '''
    Generate the project-level common properties shared by a set of documents.

    Scalar fields are chosen by majority vote over the documents, area
    fields are taken from the first of district/city/province/area that
    has a non-placeholder value, and docids/codes/products plus the
    dynamics timeline are accumulated over all documents.
    :param list_docs: documents belonging to one project
    :return: dict of project properties
    '''
    # Vote counting: for every candidate field, count each non-empty value
    # over all documents.
    choose_dict = {}
    project_dict = {}
    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
        for _doc in list_docs:
            _value = _doc.getProperties().get(_key,"")
            if _value!="":
                if _key not in choose_dict:
                    choose_dict[_key] = {}
                if _value not in choose_dict[_key]:
                    choose_dict[_key][_value] = 0
                choose_dict[_key][_value] += 1
    # Area fields: walk from most to least specific and stop at the first
    # level where some document has a concrete (non-placeholder) location.
    _find = False
    for _key in [document_district,document_city,document_province,document_area]:
        area_dict = {}
        for _doc in list_docs:
            loc = _doc.getProperties().get(_key,"未知")
            if loc not in ('全国','未知',"0"):
                if loc not in area_dict:
                    area_dict[loc] = 0
                area_dict[loc] += 1
        list_loc = []
        for k,v in area_dict.items():
            list_loc.append([k,v])
        list_loc.sort(key=lambda x:x[1],reverse=True)
        if len(list_loc)>0:
            # NOTE(review): _doc here is the LAST document of the loop above,
            # not necessarily the one holding the top-voted location in
            # list_loc — confirm this is intended.
            project_dict[document_district] = _doc.getProperties().get(document_district)
            project_dict[document_city] = _doc.getProperties().get(document_city)
            project_dict[document_province] = _doc.getProperties().get(document_province)
            project_dict[document_area] = _doc.getProperties().get(document_area)
            _find = True
            break
    if not _find:
        # No concrete location anywhere: fall back to the first document.
        if len(list_docs)>0:
            project_dict[document_district] = list_docs[0].getProperties().get(document_district)
            project_dict[document_city] = list_docs[0].getProperties().get(document_city)
            project_dict[document_province] = list_docs[0].getProperties().get(document_province)
            project_dict[document_area] = list_docs[0].getProperties().get(document_area)
    print("choose_dict",choose_dict)
    # Resolve the vote: take the most frequent value, skipping the
    # placeholders '全国'/'未知' when an alternative exists.
    for _key,_value in choose_dict.items():
        _l = []
        for k,v in _value.items():
            _l.append([k,v])
        _l.sort(key=lambda x:x[1],reverse=True)
        if len(_l)>0:
            _v = _l[0][0]
            if _v in ('全国','未知'):
                if len(_l)>1:
                    _v = _l[1][0]
            project_dict[_key] = _v
    # Accumulate per-document data: visible docids, earliest tender (招标)
    # and award (中标) page times, codes, products and the dynamics timeline.
    list_dynamics = []
    docid_number = 0
    visuable_docids = []
    zhao_biao_page_time = ""
    zhong_biao_page_time = ""
    list_codes = []
    list_product = []
    p_page_time = ""
    remove_docids = set()
    for _doc in list_docs:
        table_name = _doc.getProperties().get("table_name")
        status = _doc.getProperties().get(document_status,0)
        _save = _doc.getProperties().get(document_tmp_save,1)
        doctitle = _doc.getProperties().get(document_doctitle,"")
        docchannel = _doc.getProperties().get(document_docchannel)
        page_time = _doc.getProperties().get(document_page_time,"")
        _docid = _doc.getProperties().get(document_docid)
        _bidway = _doc.getProperties().get(document_bidway,"")
        _docchannel = _doc.getProperties().get(document_life_docchannel,0)
        project_codes = _doc.getProperties().get(document_project_codes)
        product = _doc.getProperties().get(document_product)
        sub_docs = _doc.getProperties().get("sub_docs",[])
        # More than one sub_doc means the document describes multiple packages.
        is_multipack = True if len(sub_docs)>1 else False
        extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
        if product is not None:
            list_product.extend(product.split(","))
        if project_codes is not None:
            _c = project_codes.split(",")
            list_codes.extend(_c)
        if p_page_time=="":
            p_page_time = page_time
        print("docid %s page_time:%s docchannel %s"%(str(_docid),str(page_time),str(_docchannel)))
        # Channel ids below presumably distinguish tender vs award
        # announcements — confirm against the channel id table.
        if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
            zhao_biao_page_time = page_time
        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
            zhong_biao_page_time = page_time
        # Visibility: status 201-300 for the document table, _save==1 for
        # the temporary table; invisible docids are removed at the end.
        is_visuable = 0
        if table_name=="document":
            if status>=201 and status<=300:
                docid_number +=1
                visuable_docids.append(str(_docid))
                is_visuable = 1
            else:
                remove_docids.add(str(_docid))
        else:
            if _save==1:
                docid_number +=1
                visuable_docids.append(str(_docid))
                is_visuable = 1
            else:
                remove_docids.add(str(_docid))
        list_dynamics.append({document_docid:_docid,
                              document_doctitle:doctitle,
                              document_docchannel:_docchannel,
                              document_bidway:_bidway,
                              document_page_time:page_time,
                              document_status:201 if is_visuable==1 else 401,
                              "is_multipack":is_multipack,
                              document_tmp_extract_count:extract_count
                              }
                             )
    project_dict[project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
    project_dict[project_docid_number] = docid_number
    project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
    if zhao_biao_page_time !="":
        project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
    if zhong_biao_page_time !="":
        project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
    project_dict[project_project_codes] = ",".join(list(set(list_codes)))
    project_dict[project_page_time] = p_page_time
    project_dict[project_product] = ",".join(list(set(list_product)))
    return project_dict
  2823. def generate_packages_properties(self,list_docs):
  2824. '''
  2825. 生成分包属性
  2826. :param list_docs:
  2827. :return:
  2828. '''
  2829. list_properties = []
  2830. set_key = set()
  2831. for _doc in list_docs:
  2832. _dict = {}
  2833. sub_docs = _doc.getProperties().get("sub_docs")
  2834. if sub_docs is not None:
  2835. for _d in sub_docs:
  2836. sub_project_code = _d.get(project_sub_project_code,"")
  2837. sub_project_name = _d.get(project_sub_project_name,"")
  2838. win_tenderer = _d.get(project_win_tenderer,"")
  2839. win_bid_price = _d.get(project_win_bid_price,"")
  2840. _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
  2841. if _key in set_key:
  2842. continue
  2843. set_key.add(_key)
  2844. list_properties.append(_d)
  2845. return list_properties
  2846. def generate_projects_from_document(self,list_docs):
  2847. '''
  2848. #通过公告生成projects
  2849. :param list_docids:
  2850. :return:
  2851. '''
  2852. #判断标段数
  2853. list_projects = []
  2854. project_dict = self.generate_common_properties(list_docs)
  2855. list_package_properties = self.generate_packages_properties(list_docs)
  2856. #生成包数据
  2857. for _pp in list_package_properties:
  2858. _pp.update(project_dict)
  2859. list_projects.append(_pp)
  2860. return list_projects
def search_projects_with_document(self,list_docids):
    '''
    Query the project2 table for all projects containing any of the docids.

    :param list_docids: document ids to look up
    :return: list of project dicts carrying the full project column set
    '''
    print("==",list_docids)
    # One term query per docid, OR-ed together.
    list_should_q = []
    for _docid in list_docids:
        list_should_q.append(TermQuery("docids",_docid))
    bool_query = BoolQuery(should_queries=list_should_q)
    # NOTE(review): at most 20 projects are returned — confirm that is
    # enough when a document is shared by many projects.
    _query = {"query":bool_query,"limit":20}
    list_project_dict = getDocument(_query,self.ots_client,[
        project_uuid,project_docids,project_zhao_biao_page_time,
        project_zhong_biao_page_time,
        project_page_time,
        project_area,
        project_province,
        project_city,
        project_district,
        project_info_type,
        project_industry,
        project_qcodes,
        project_project_name,
        project_project_code,
        project_project_codes,
        project_project_addr,
        project_tenderee,
        project_tenderee_addr,
        project_tenderee_phone,
        project_tenderee_contact,
        project_agency,
        project_agency_phone,
        project_agency_contact,
        project_sub_project_name,
        project_sub_project_code,
        project_bidding_budget,
        project_win_tenderer,
        project_win_bid_price,
        project_win_tenderer_manager,
        project_win_tenderer_phone,
        project_second_tenderer,
        project_second_bid_price,
        project_second_tenderer_manager,
        project_second_tenderer_phone,
        project_third_tenderer,
        project_third_bid_price,
        project_third_tenderer_manager,
        project_third_tenderer_phone,
        project_procurement_system,
        project_bidway,
        project_dup_data,
        project_docid_number,
        project_dynamics
    ],sort="page_time",table_name="project2",table_index="project2_index")
    return list_project_dict
  2917. def set_project_uuid(self,_dict,_uuid):
  2918. if _uuid is not None and _uuid!="":
  2919. if "uuid" in _dict:
  2920. _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
  2921. else:
  2922. _dict["uuid"] = _uuid
  2923. def dumplicate_projects(self,list_projects):
  2924. '''
  2925. 对多标段项目进行去重
  2926. :return:
  2927. '''
  2928. cluster_projects = list_projects
  2929. while 1:
  2930. _update = False
  2931. list_p = []
  2932. for _pp in cluster_projects:
  2933. _find = False
  2934. for _p in list_p:
  2935. if self.check_merge_rule(_p,_pp):
  2936. self.update_projects_by_project(_pp,[_p])
  2937. _find = True
  2938. _update = True
  2939. if not _find:
  2940. list_p.append(_pp)
  2941. if len(cluster_projects)==len(list_p):
  2942. break
  2943. cluster_projects = list_p
  2944. return cluster_projects
def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price):
    '''
    Build the candidate-search queries for finding mergeable projects.

    Each rule combines an identity attribute (codes, product, name,
    budget or winner) with the sub-project name and a page_time window
    of -150/+120 days.
    :return: list of [must_queries, weight] pairs; the weight is used by
             the caller to scale the per-rule search limit.
    '''
    list_query = []
    # Search window around the project's page_time.
    page_time_less = timeAdd(page_time,-150)
    page_time_greater = timeAdd(page_time,120)
    list_code = [a for a in project_codes.split(",") if a!='']
    # Queries over the multi-code field and the single-code field; at most
    # the first 20 codes/products are used.
    should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
    should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
    list_product = [a for a in product.split(",") if a!='']
    should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
    # Restrict by sub-project name when it is a real package name; the
    # default "Project" name matches everything (ExistsQuery stand-in).
    sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else ExistsQuery(project_uuid)
    log("page_time_less %s"%(page_time_less))
    log("page_time_greater %s"%(page_time_greater))
    log("list_code %s"%(str(list_code)))
    log("list_product %s"%(str(list_product)))
    log("tenderee %s"%(tenderee))
    log("bidding_budget %s"%(bidding_budget))
    log("win_tenderer %s"%(win_tenderer))
    log("win_bid_price %s"%(win_bid_price))
    log("project_name %s"%(project_name))
    # Strong rules (weight 2): tenderee combined with another identity.
    if tenderee!="" and len(list_code)>0:
        _query = [TermQuery(project_tenderee,tenderee),
                  sub_project_q,
                  should_q_code,
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
        _query = [TermQuery(project_tenderee,tenderee),
                  sub_project_q,
                  should_q_cod,
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    if tenderee!="" and len(list_product)>0:
        _query = [TermQuery(project_tenderee,tenderee),
                  sub_project_q,
                  should_q_product,
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    if tenderee!="" and project_name!="":
        _query = [TermQuery(project_tenderee,tenderee),
                  sub_project_q,
                  TermQuery(project_project_name,project_name),
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    if tenderee!="" and bidding_budget>0:
        _query = [TermQuery(project_tenderee,tenderee),
                  sub_project_q,
                  TermQuery(project_bidding_budget,bidding_budget),
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    if tenderee!="" and win_tenderer!="":
        _query = [TermQuery(project_tenderee,tenderee),
                  sub_project_q,
                  TermQuery(project_win_tenderer,win_tenderer),
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    if win_tenderer!="" and len(list_code)>0:
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  sub_project_q,
                  should_q_code,
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  sub_project_q,
                  should_q_cod,
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    if win_tenderer!="" and win_bid_price>0:
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  sub_project_q,
                  TermQuery(project_win_bid_price,win_bid_price),
                  RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,2])
    # Weak rules (weight 1): a single identity attribute only.
    if len(list_code)>0:
        _query = [
            sub_project_q,
            should_q_code,
            RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,1])
        _query = [
            sub_project_q,
            should_q_cod,
            RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,1])
    if project_name!="":
        _query = [
            sub_project_q,
            TermQuery(project_project_name,project_name),
            RangeQuery(project_page_time,page_time_less,page_time_greater,True,True)]
        list_query.append([_query,1])
    return list_query
  3034. def check_merge_rule(self,_proj,_dict,b_log=False):
  3035. page_time = _proj.get(project_page_time,"")
  3036. project_codes = _proj.get(project_project_codes,"")
  3037. project_name = _proj.get(project_project_name,"")
  3038. tenderee = _proj.get(project_tenderee,"")
  3039. agency = _proj.get(project_agency,"")
  3040. product = _proj.get(project_product,"")
  3041. sub_project_name = _proj.get(project_sub_project_name,"")
  3042. bidding_budget = _proj.get(project_bidding_budget,-1)
  3043. win_tenderer = _proj.get(project_win_tenderer,"")
  3044. win_bid_price = _proj.get(project_win_bid_price,-1)
  3045. project_code = _proj.get(project_project_code,"")
  3046. list_code = [a for a in project_codes.split(",") if a!='']
  3047. if project_code!="":
  3048. list_code.append(project_code)
  3049. page_time_to_merge = _dict.get(project_page_time,"")
  3050. project_codes_to_merge = _dict.get(project_project_codes,"")
  3051. project_name_to_merge = _dict.get(project_project_name,"")
  3052. tenderee_to_merge = _dict.get(project_tenderee,"")
  3053. agency_to_merge = _dict.get(project_agency,"")
  3054. product_to_merge = _dict.get(project_product,"")
  3055. sub_project_name_to_merge = _dict.get(project_sub_project_name,"")
  3056. bidding_budget_to_merge = _dict.get(project_bidding_budget,-1)
  3057. win_tenderer_to_merge = _dict.get(project_win_tenderer,"")
  3058. win_bid_price_to_merge = _dict.get(project_win_bid_price,-1)
  3059. project_code_to_merge = _dict.get(project_project_code,"")
  3060. list_code_to_merge = [a for a in project_codes_to_merge.split(",") if a!='']
  3061. if project_code_to_merge!="":
  3062. list_code.append(project_code_to_merge)
  3063. #check sub_project_name
  3064. _set = set([a for a in [sub_project_name.replace("Project",""),sub_project_name_to_merge.replace("Project","")] if a!=""])
  3065. if len(_set)>1:
  3066. if b_log:
  3067. log("check sub_project_name failed %s===%s"%(str(_proj),str(_dict)))
  3068. return False
  3069. _set = set([a for a in [tenderee,tenderee_to_merge] if a!=""])
  3070. if len(_set)>1:
  3071. if b_log:
  3072. log("check tenderee failed %s===%s"%(str(_proj),str(_dict)))
  3073. return False
  3074. _set = set([a for a in [agency,agency_to_merge] if a!=""])
  3075. if len(_set)>1:
  3076. if b_log:
  3077. log("check agency failed %s===%s"%(str(_proj),str(_dict)))
  3078. return False
  3079. _set = set([a for a in [win_tenderer,win_tenderer_to_merge] if a!=""])
  3080. if len(_set)>1:
  3081. if b_log:
  3082. log("check win_tenderer failed %s===%s"%(str(_proj),str(_dict)))
  3083. return False
  3084. _set = set([a for a in [bidding_budget,bidding_budget_to_merge] if a>0])
  3085. if len(_set)>1:
  3086. if b_log:
  3087. log("check bidding_budget failed %s===%s"%(str(_proj),str(_dict)))
  3088. return False
  3089. _set = set([a for a in [win_bid_price,win_bid_price_to_merge] if a>0])
  3090. if len(_set)>1:
  3091. if b_log:
  3092. log("check win_bid_price failed %s===%s"%(str(_proj),str(_dict)))
  3093. return False
  3094. #check project_codes
  3095. has_same = False
  3096. has_similar = False
  3097. for _c in list_code:
  3098. for _c1 in list_code_to_merge:
  3099. _simi = getSimilarityOfString(_c,_c1)
  3100. if _simi==1:
  3101. has_same = True
  3102. elif _simi>0.7:
  3103. has_similar = True
  3104. if not has_same and has_similar:
  3105. if b_log:
  3106. log("check code failed %s===%s"%(str(_proj),str(_dict)))
  3107. return False
  3108. #check product
  3109. set_product = set([a for a in product.split(",") if a!=""])
  3110. set_product_to_merge = set([a for a in product_to_merge.split(",") if a!=""])
  3111. if len(set_product)>0 and len(set_product_to_merge)>0 and len(set_product&set_product_to_merge)==0:
  3112. if b_log:
  3113. log("check product failed %s===%s"%(str(_proj),str(_dict)))
  3114. return False
  3115. #check money
  3116. _set = set([a for a in [bidding_budget,bidding_budget_to_merge] if a>0])
  3117. _set1 = set([a for a in [win_bid_price,win_bid_price_to_merge] if a>0])
  3118. if len(_set)==1 and len(_set1)==1:
  3119. if max(_set1)>max(_set):
  3120. if b_log:
  3121. log("check money failed %s===%s"%(str(_proj),str(_dict)))
  3122. return False
  3123. return True
def merge_projects(self,list_projects,columns=[project_tenderee,project_agency,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_sub_project_name,project_product,project_zhao_biao_page_time,project_zhong_biao_page_time,project_project_code,project_project_codes,project_docids]):
    '''
    Merge the given projects with matching projects already stored in the
    project2 table.

    :param list_projects: projects to merge, updated in place
    :param columns: columns fetched for each stored candidate project.
        NOTE: mutable default, but it is never mutated here.
    :return: list_projects with merged-in data
    '''
    # Exclude the projects' own uuids from the search so a project does
    # not merge with itself.
    set_uuid = set()
    for _proj in list_projects:
        _uuid = _proj.get("uuid")
        if _uuid is not None:
            set_uuid = set_uuid | set(_uuid.split(","))
    must_not_q = []
    for _uuid in list(set_uuid):
        must_not_q.append(TermQuery("uuid",_uuid))
    for _proj in list_projects:
        page_time = _proj.get(project_page_time,"")
        project_codes = _proj.get(project_project_codes,"")
        project_name = _proj.get(project_project_name,"")
        tenderee = _proj.get(project_tenderee,"")
        agency = _proj.get(project_agency,"")
        product = _proj.get(project_product,"")
        sub_project_name = _proj.get(project_sub_project_name,"")
        bidding_budget = _proj.get(project_bidding_budget,-1)
        win_tenderer = _proj.get(project_win_tenderer,"")
        win_bid_price = _proj.get(project_win_bid_price,-1)
        list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price)
        print("rules count:%d"%(len(list_must_query)))
        list_merge_data = []
        # Run every rule; the rule weight scales the search limit, and each
        # hit is excluded from subsequent rules (must_not capped at 100).
        for must_q,_count in list_must_query:
            _query = BoolQuery(must_queries=must_q,
                               must_not_queries=must_not_q[:100])
            _limit = _count*10
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index_formerge",
                                                                               SearchQuery(_query,limit=_limit),
                                                                               columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            list_merge_data.extend(list_data)
            # print(list_data)
            for _data in list_data:
                must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
        # Prefer candidates with a similar bidding budget first.
        list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
        for _data in list_merge_data:
            if self.check_merge_rule(_proj,_data,b_log=False):
                print("pass",_data)
                self.update_projects_by_project(_data,[_proj])
    return list_projects
  3170. def to_project_json(self,projects):
  3171. list_proj = []
  3172. for _proj in projects:
  3173. _uuid = _proj.get(project_uuid,"")
  3174. list_uuid = [a for a in _uuid.split(",") if a!=""]
  3175. if len(list_uuid)>0:
  3176. _proj["keep_uuid"] = list_uuid[0]
  3177. _proj["delete_uuid"] = ",".join(list_uuid[1:])
  3178. else:
  3179. _proj["keep_uuid"] = _proj.get("keep_uuid","")
  3180. _proj["delete_uuid"] = _proj.get("delete_uuid","")
  3181. list_proj.append(_proj)
  3182. if project_uuid in _proj:
  3183. _proj.pop(project_uuid)
  3184. return json.dumps(list_proj,ensure_ascii=False)
def dumplicate_document_in_merge(self,list_projects):
    '''
    De-duplicate documents inside each merged project.

    Keeps at most one visible document per docchannel, preferring
    multi-package documents, then higher extract_count, then the smaller
    docid; the losers are recorded in project_dup_docid and removed from
    project_docids.
    :param list_projects: merged projects, updated in place
    :return: None
    '''
    for _proj in list_projects:
        # Best document seen so far per docchannel.
        dict_channel_proj = {}
        list_dynamics = json.loads(_proj.get(project_dynamics,"[]"))
        set_dup_docid = set()
        for _d in list_dynamics:
            docid = _d.get(document_docid)
            _status = _d.get(document_status,201)
            is_multipack = _d.get("is_multipack",True)
            extract_count = _d.get(document_tmp_extract_count,0)
            docchannel = _d.get(document_docchannel,0)
            # Only visible documents (status 201-300) with a known channel
            # take part in the de-duplication.
            if _status>=201 and _status<=300 and docchannel>0:
                if docchannel in dict_channel_proj:
                    n_d = dict_channel_proj[docchannel]
                    n_docid = n_d.get(document_docid)
                    n_is_multipack = n_d.get("is_multipack",True)
                    n_extract_count = n_d.get(document_tmp_extract_count,0)
                    if not n_is_multipack:
                        if is_multipack:
                            # Multi-package beats single-package.
                            set_dup_docid.add(str(n_docid))
                            dict_channel_proj[docchannel] = _d
                        else:
                            # Both single-package: higher extract_count wins,
                            # ties broken by the smaller docid.
                            if extract_count>n_extract_count:
                                set_dup_docid.add(str(n_docid))
                                dict_channel_proj[docchannel] = _d
                            elif extract_count==n_extract_count:
                                if n_docid>docid:
                                    set_dup_docid.add(str(n_docid))
                                    dict_channel_proj[docchannel] = _d
                                else:
                                    set_dup_docid.add(str(docid))
                            else:
                                set_dup_docid.add(str(docid))
                    else:
                        if not is_multipack:
                            # Existing multi-package beats the newcomer.
                            set_dup_docid.add(str(docid))
                        else:
                            # NOTE(review): when both are multi-package the
                            # newcomer replaces the champion without marking
                            # the old one as duplicate — confirm intent.
                            dict_channel_proj[docchannel] = _d
        docids = _proj.get(project_docids,"")
        set_docids = set([a for a in docids.split(",") if a!=""])
        set_docids = set_docids-set_dup_docid
        _proj[project_docids] = ",".join(list(set_docids))
        _proj[project_docid_number] = len(set_docids)
        _proj[project_dup_docid] = ",".join(list(set_dup_docid))
  3234. def merge_document_real(self,item,dup_docid,table_name,status_to=None):
  3235. '''
  3236. 实时项目合并
  3237. :param item:
  3238. :param dup_docid:重复的公告集合
  3239. :param status_to:
  3240. :return:
  3241. '''
  3242. list_docids = []
  3243. _docid = item.get(document_tmp_docid)
  3244. list_docids.append(_docid)
  3245. if isinstance(dup_docid,list):
  3246. list_docids.extend(dup_docid)
  3247. list_docids = [a for a in list_docids if a is not None]
  3248. list_projects = self.search_projects_with_document(list_docids)
  3249. if len(list_projects)==0:
  3250. list_docs = self.search_docs(list_docids)
  3251. list_projects = self.generate_projects_from_document(list_docs)
  3252. else:
  3253. self.update_projects_by_document(_docid,list_projects)
  3254. list_projects = self.dumplicate_projects(list_projects)
  3255. list_projects = self.merge_projects(list_projects)
  3256. self.dumplicate_document_in_merge(list_projects)
  3257. project_json = self.to_project_json(list_projects)
  3258. print("project_json",project_json)
  3259. return project_json
    def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
        '''
        Consumer side of the dedup flow: gather duplicate candidates for one
        document via rule-based OTS queries, pick the best docid, merge the
        related projects, and (optionally) persist the result.

        :param item: document row (dict) to deduplicate; mutated (confidence set)
        :param result_queue: unused in this body; kept for the consumer signature
        :param ots_client: unused in this body; self.ots_client is used instead
        :param get_all: passed through to translate_dumplicate_rules
        :param upgrade: when True, write save/dup_docid back to OTS
        :return: None
        '''
        try:
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # build candidate-search rules, most confident first
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=True)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            _i = 0
            step = 5
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            # run the rules in batches of `step`, excluding docids already collected
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    # only the most recent 100 docids are excluded from the query
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                # NOTE(review): confidence and the *_keys come from the first rule of
                # the batch even though the query unions all `step` rules — confirm intended.
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
                _i += step
            _time = time.time()
            log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            # confirm which candidates really are duplicates
            final_list = self.dumplicate_fianl_check(base_list)
            log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            final_list_docid = [a["docid"] for a in final_list]
            log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            # dup_docid = all confirmed duplicates, minus the current document itself
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            remove_list = []
            if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
                # the current document wins: keep it (save=1); all others are dups
                dtmp.setValue(document_tmp_save,1,True)
                # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
                # another document wins: current one is dropped (save=0)
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    # winner is listed first in the dup chain
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
            list_docids = list(dup_docid)
            list_docids.append(best_docid)
            # merge the duplicate group into projects and store the serialized result
            dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,flow_dumplicate_status_to),True)
            log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
            if upgrade:
                if table_name=="document_tmp":
                    self.changeSaveStatus(remove_list)
                # print(dtmp.getProperties())
                # NOTE(review): this recomputation discards the "%d,%s" best_docid prefix
                # built in the save=0 branch above — confirm the overwrite is intended.
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                dtmp.update_row(self.ots_client)
            # log("dump takes %.2f"%(time.time()-start_time))
        except Exception as e:
            traceback.print_exc()
            log("error on dumplicate of %s"%(str(item.get(document_tmp_docid))))
  3342. def start_flow_dumplicate(self):
  3343. schedule = BlockingScheduler()
  3344. schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
  3345. schedule.start()
  3346. def changeSaveStatus(self,list_dict):
  3347. for _dict in list_dict:
  3348. if _dict.get(document_tmp_save,1)==1:
  3349. _d = {"partitionkey":_dict["partitionkey"],
  3350. "docid":_dict["docid"],
  3351. document_tmp_save:0
  3352. }
  3353. _d_tmp = Document_tmp(_d)
  3354. _d_tmp.update_row(self.ots_client)
  3355. def test_dumplicate(self,docid):
  3356. columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
  3357. bool_query = BoolQuery(must_queries=[
  3358. TermQuery("docid",docid)
  3359. ])
  3360. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  3361. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3362. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3363. log("flow_dumplicate producer total_count:%d"%total_count)
  3364. if total_count==0:
  3365. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  3366. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3367. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3368. list_dict = getRow_ots(rows)
  3369. for item in list_dict:
  3370. self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
  3371. return
    def getRemainDoc(self,docid):
        '''
        Re-run the dedup pipeline for a single docid and return the docid that
        should be kept (the "best" member of the duplicate group).
        NOTE(review): the candidate-collection loop duplicates
        dumplicate_comsumer_handle almost verbatim — candidate for extraction.

        :param docid: document id to look up in the `document` table
        :return: best docid of the duplicate group, or None when docid is not found
        '''
        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
        bool_query = BoolQuery(must_queries=[
            TermQuery("docid",docid)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        if len(list_dict)>0:
            item = list_dict[0]
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # build candidate-search rules, most confident first
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            _i = 0
            step = 5
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            # run the rules in batches of `step`, excluding docids already collected
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    # only the most recent 100 docids are excluded from the query
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
                _i += step
            _time = time.time()
            log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            # confirm which candidates really are duplicates, then pick the winner
            final_list = self.dumplicate_fianl_check(base_list)
            log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            return best_docid
        return None
  3415. if __name__ == '__main__':
  3416. # df = Dataflow()
  3417. # df.flow_init()
  3418. # df.flow_test()
  3419. # df.test_merge()
  3420. # df.start_flow_attachment()
  3421. # df.start_flow_extract()
  3422. # df.start_flow_dumplicate()
  3423. # # df.start_flow_merge()
  3424. # df.start_flow_remove()
  3425. # download_attachment()
  3426. # test_attachment_interface()
  3427. df_dump = Dataflow_dumplicate()
  3428. # df_dump.start_flow_dumplicate()
  3429. a = time.time()
  3430. df_dump.test_dumplicate(272934158)
  3431. print("takes",time.time()-a)
  3432. # df_dump.delete_projects_by_document(16288036)
  3433. # log("=======")
  3434. # for i in range(3):
  3435. # time.sleep(20)
  3436. #
  3437. # a = {"docid":16288036}
  3438. # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)