dataflow.py 172 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739
  1. # sys.path.append("/data")
  2. from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
  3. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  4. from queue import Queue
  5. from BaseDataMaintenance.model.ots.document_tmp import *
  6. from BaseDataMaintenance.model.ots.attachment import *
  7. from BaseDataMaintenance.model.ots.document_html import *
  8. from BaseDataMaintenance.model.ots.document_extract2 import *
  9. from BaseDataMaintenance.model.ots.project import *
  10. from BaseDataMaintenance.model.ots.document import *
  11. import base64
  12. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
  13. from uuid import uuid4
  14. from BaseDataMaintenance.common.ossUtils import *
  15. from BaseDataMaintenance.dataSource.source import is_internal,getAuth
  16. from apscheduler.schedulers.blocking import BlockingScheduler
  17. from BaseDataMaintenance.maintenance.dataflow_settings import *
  18. from threading import Thread
  19. import oss2
  20. from BaseDataMaintenance.maxcompute.documentDumplicate import *
  21. from BaseDataMaintenance.maxcompute.documentMerge import *
  22. from BaseDataMaintenance.common.otsUtils import *
  23. from BaseDataMaintenance.common.activateMQUtils import *
  24. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  25. def getSet(list_dict,key):
  26. _set = set()
  27. for item in list_dict:
  28. if key in item:
  29. if item[key]!='' and item[key] is not None:
  30. if re.search("^\d[\d\.]*$",item[key]) is not None:
  31. _set.add(str(float(item[key])))
  32. else:
  33. _set.add(str(item[key]))
  34. return _set
  35. def getSimilarityOfString(str1,str2):
  36. _set1 = set()
  37. _set2 = set()
  38. if str1 is not None:
  39. for i in range(1,len(str1)):
  40. _set1.add(str1[i-1:i+1])
  41. if str2 is not None:
  42. for i in range(1,len(str2)):
  43. _set2.add(str2[i-1:i+1])
  44. _len = max(1,min(len(_set1),len(_set2)))
  45. return len(_set1&_set2)/_len
  46. def getDiffIndex(list_dict,key,confidence=100):
  47. _set = set()
  48. for _i in range(len(list_dict)):
  49. item = list_dict[_i]
  50. if item["confidence"]>=confidence:
  51. continue
  52. if key in item:
  53. if item[key]!='' and item[key] is not None:
  54. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  55. _set.add(str(float(item[key])))
  56. else:
  57. _set.add(str(item[key]))
  58. if len(_set)>1:
  59. return _i
  60. return len(list_dict)
  61. def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
  62. swf_urls = []
  63. try:
  64. list_files = os.listdir(swf_dir)
  65. list_files.sort(key=lambda x:x)
  66. headers = dict()
  67. headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
  68. for _file in list_files:
  69. swf_localpath = "%s/%s"%(swf_dir,_file)
  70. swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
  71. uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
  72. _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
  73. swf_urls.append(_url)
  74. os.remove(swf_localpath)
  75. except Exception as e:
  76. traceback.print_exc()
  77. return swf_urls
  78. class Dataflow():
  79. def __init__(self):
  80. self.ots_client = getConnect_ots()
  81. self.queue_init = Queue()
  82. self.queue_attachment = Queue()
  83. self.queue_attachment_ocr = Queue()
  84. self.queue_attachment_not_ocr = Queue()
  85. self.list_attachment_ocr = []
  86. self.list_attachment_not_ocr = []
  87. self.queue_extract = Queue()
  88. self.list_extract = []
  89. self.queue_dumplicate = Queue()
  90. self.queue_merge = Queue()
  91. self.queue_syncho = Queue()
  92. self.queue_remove = Queue()
  93. self.attachment_rec_interface = ""
  94. self.ots_client = getConnect_ots()
  95. self.ots_client_merge = getConnect_ots()
  96. if is_internal:
  97. self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  98. else:
  99. self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  100. if is_internal:
  101. self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
  102. self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
  103. self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
  104. else:
  105. self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
  106. self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
  107. self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
  108. self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
  109. self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
  110. self.auth = getAuth()
  111. oss2.defaults.connection_pool_size = 100
  112. oss2.defaults.multiget_num_threads = 20
  113. log("bucket_url:%s"%(self.bucket_url))
  114. self.attachment_bucket_name = "attachment-hub"
  115. self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
  116. self.current_path = os.path.dirname(__file__)
    def flow_init(self):
        """Replay `document` rows into the processing pipeline.

        Reads every `document` row with crtime >= 2022-04-20 from OTS,
        maps its current status onto a pipeline status, and writes the
        row back into `document_tmp` plus its html into `document_html`.
        """
        def producer():
            # page through the whole document index via next_token,
            # pushing each row dict onto queue_init
            bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
        def comsumer():
            # drain queue_init with 30 worker threads through comsumer_handle
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # keep the html aside, then strip the heavy text columns from
            # the row before writing it to document_tmp
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in item:
                item.pop(document_tmp_dochtmlcon)
            if document_tmp_doctextcon in item:
                item.pop(document_tmp_doctextcon)
            if document_tmp_attachmenttextcon in item:
                item.pop(document_tmp_attachmenttextcon)
            _status = item.get(document_tmp_status)
            new_status = None
            # status 201-300: keep the document (save=1) at status 81
            if _status>=201 and _status<=300:
                item[document_tmp_save] = 1
                new_status = 81
            # status 401-450: discard the document (save=0) at status 81
            elif _status>=401 and _status<=450:
                item[document_tmp_save] = 0
                new_status = 81
            # everything else restarts the pipeline from status 1
            else:
                new_status = 1
            # new_status = 1
            item[document_tmp_status] = new_status
            dtmp = Document_tmp(item)
            dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
                                   document_tmp_docid:item.get(document_tmp_docid),
                                   document_tmp_dochtmlcon:_dochtmlcon})
            dtmp.update_row(ots_client)
            dhtml.update_row(ots_client)
        producer()
        comsumer()
  167. def getTitleFromHtml(self,filemd5,_html):
  168. _soup = BeautifulSoup(_html,"lxml")
  169. _find = _soup.find("a",attrs={"data":filemd5})
  170. _title = ""
  171. if _find is not None:
  172. _title = _find.get_text()
  173. return _title
  174. def getSourceLinkFromHtml(self,filemd5,_html):
  175. _soup = BeautifulSoup(_html,"lxml")
  176. _find = _soup.find("a",attrs={"filelink":filemd5})
  177. filelink = ""
  178. if _find is None:
  179. _find = _soup.find("img",attrs={"filelink":filemd5})
  180. if _find is not None:
  181. filelink = _find.attrs.get("src","")
  182. else:
  183. filelink = _find.attrs.get("href","")
  184. return filelink
    def request_attachment_interface(self,attach,_dochtmlcon):
        """Download one attachment from OSS, run it through the
        recognition interface and persist the result onto the attach row.

        Oversized files are only flagged ATTACHMENT_TOOLARGE.  For swf
        files the interface's rendered page images are uploaded as pngs
        and their URLs stored in swfUrls.  The temp download file is
        always removed in the finally block.

        :param attach: attachment row object (project model)
        :param _dochtmlcon: html of the owning document, used to recover
            the attachment's display title and original link
        :return: True on success or tolerated conditions (too large,
            object missing on OSS); False when download or recognition
            fails.  NOTE(review): an unexpected exception falls through
            and returns None (falsy) - confirm callers treat that as a
            failure.
        """
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _size = attach.getProperties().get(attachment_size)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        # unique local temp path for the download
        localpath = os.path.join(self.current_path,"download",_uuid.hex)
        docids = attach.getProperties().get(attachment_docids)
        try:
            if _size>ATTACHMENT_LARGESIZE:
                # too large to recognise - mark it and count as handled
                attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
                log("attachment :%s of path:%s to large"%(filemd5,_path))
                attach.update_row(self.ots_client)
                return True
            else:
                d_start_time = time.time()
                if downloadFile(self.bucket,objectPath,localpath):
                    time_download = time.time()-d_start_time
                    _data_base64 = base64.b64encode(open(localpath,"rb").read())
                    # call the recognition interface with the base64 payload
                    start_time = time.time()
                    _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                    if _success:
                        log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                    else:
                        # recognition failed: log, alert DingTalk and bail out
                        log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                    # HACK: the interface returns the swf page images as a
                    # python-literal string; eval() on service output is
                    # unsafe - NOTE(review): consider ast.literal_eval
                    swf_images = eval(swf_images)
                    if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                        # only generate the page urls once per attachment
                        swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                        if len(swf_urls)==0:
                            objectPath = attach.getProperties().get(attachment_path,"")
                            localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
                            swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                            if not os.path.exists(swf_dir):
                                os.mkdir(swf_dir)
                            # dump each base64 page image to disk, then upload
                            # the whole directory via transformSWF
                            for _i in range(len(swf_images)):
                                _base = swf_images[_i]
                                _base = base64.b64decode(_base)
                                filename = "swf_page_%d.png"%(_i)
                                filepath = os.path.join(swf_dir,filename)
                                with open(filepath,"wb") as f:
                                    f.write(_base)
                            swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                            if os.path.exists(swf_dir):
                                os.rmdir(swf_dir)
                            attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    # crude table detection on the recognised html
                    if re.search("<td",_html) is not None:
                        attach.setValue(attachment_has_table,1,True)
                    _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                    filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                    if _file_title!="":
                        attach.setValue(attachment_file_title,_file_title,True)
                    if filelink!="":
                        attach.setValue(attachment_file_link,filelink,True)
                    attach.setValue(attachment_attachmenthtml,_html,True)
                    attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                    attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                    attach.setValue(attachment_recsize,len(_html),True)
                    attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                    attach.update_row(self.ots_client)  # re-enable updates in production
                    return True
                else:
                    return False
        except oss2.exceptions.NotFound as e:
            # object no longer on OSS - nothing to process, treat as handled
            return True
        except Exception as e:
            traceback.print_exc()
        finally:
            # best-effort cleanup of the temp download file
            try:
                os.remove(localpath)
            except:
                pass
  262. def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
  263. list_html = []
  264. swf_urls = []
  265. for _attach in list_attach:
  266. #测试全跑
  267. if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
  268. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  269. if _html is None:
  270. _html = ""
  271. list_html.append(_html)
  272. else:
  273. _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
  274. if not _succeed:
  275. return False,"",[]
  276. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  277. if _html is None:
  278. _html = ""
  279. list_html.append(_html)
  280. if _attach.getProperties().get(attachment_filetype)=="swf":
  281. swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
  282. return True,list_html,swf_urls
  283. def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
  284. set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
  285. set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
  286. list_must_queries = []
  287. list_must_no_queries = []
  288. for k,v in _dict.items():
  289. if k in set_match:
  290. if isinstance(v,str):
  291. l_s = []
  292. for s_v in v.split(","):
  293. l_s.append(MatchQuery(k,s_v))
  294. list_must_queries.append(BoolQuery(should_queries=l_s))
  295. elif k in set_nested:
  296. _v = v
  297. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  298. _v = float(_v)
  299. list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  300. elif k in set_term:
  301. list_must_queries.append(TermQuery(k,v))
  302. elif k in set_phrase:
  303. list_must_queries.append(MatchPhraseQuery(k,v))
  304. elif k in set_range:
  305. if len(v)==1:
  306. list_must_queries.append(RangeQuery(k,v[0]))
  307. elif len(v)==2:
  308. list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
  309. for k,v in _dict_must_not.items():
  310. if k in set_match:
  311. if isinstance(v,str):
  312. l_s = []
  313. for s_v in v.split(","):
  314. l_s.append(MatchQuery(k,s_v))
  315. list_must_no_queries.append(BoolQuery(should_queries=l_s))
  316. elif k in set_nested:
  317. _v = v
  318. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  319. _v = float(_v)
  320. list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  321. elif k in set_term:
  322. list_must_no_queries.append(TermQuery(k,v))
  323. elif k in set_range:
  324. if len(v)==1:
  325. list_must_no_queries.append(RangeQuery(k,v[0]))
  326. elif len(v)==2:
  327. list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
  328. return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
  329. def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
  330. columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
  331. extract_count = 0
  332. if project_code is not None and project_code!="":
  333. extract_count += 1
  334. if project_name is not None and project_name!="":
  335. extract_count += 1
  336. if tenderee is not None and tenderee!="":
  337. extract_count += 1
  338. if agency is not None and agency!="":
  339. extract_count += 1
  340. if sub_docs_json is not None:
  341. sub_docs = json.loads(sub_docs_json)
  342. sub_docs.sort(key=lambda x:x.get("bidding_budget",0),reverse=True)
  343. sub_docs.sort(key=lambda x:x.get("win_bid_price",0),reverse=True)
  344. # log("==%s"%(str(sub_docs)))
  345. for sub_docs in sub_docs:
  346. for _key_sub_docs in sub_docs.keys():
  347. extract_count += 1
  348. if _key_sub_docs in columns:
  349. if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
  350. if _key_sub_docs in ["bidding_budget","win_bid_price"]:
  351. if float(sub_docs[_key_sub_docs])>0:
  352. columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
  353. else:
  354. columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
  355. return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
  356. def post_extract(self,_dict):
  357. win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  358. _dict["win_tenderer"] = win_tenderer
  359. _dict["bidding_budget"] = bidding_budget
  360. _dict["win_bid_price"] = win_bid_price
  361. if "extract_count" not in _dict:
  362. _dict["extract_count"] = extract_count
  363. def get_dump_columns(self,_dict):
  364. docchannel = _dict.get(document_tmp_docchannel,0)
  365. project_code = _dict.get(document_tmp_project_code,"")
  366. project_name = _dict.get(document_tmp_project_name,"")
  367. tenderee = _dict.get(document_tmp_tenderee,"")
  368. agency = _dict.get(document_tmp_agency,"")
  369. doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
  370. win_tenderer = _dict.get("win_tenderer","")
  371. bidding_budget = _dict.get("bidding_budget","")
  372. if bidding_budget==0:
  373. bidding_budget = ""
  374. win_bid_price = _dict.get("win_bid_price","")
  375. if win_bid_price==0:
  376. win_bid_price = ""
  377. page_time = _dict.get(document_tmp_page_time,"")
  378. fingerprint = _dict.get(document_tmp_fingerprint,"")
  379. product = _dict.get(document_tmp_product,"")
  380. return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
    def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
        """Validate a candidate duplicate group `_split` against column rules.

        All rules must hold for the group to be accepted:

        * every key in singleNum_keys has at most one distinct value
          across the group;
        * every key in multiNum_keys has more than one distinct value;
        * for notlike_keys, no member's value may be merely similar
          (similarity in (0.7, 1)) to item's project_code - a near-miss
          code suggests a different project;
        * for contain_keys, all non-empty values must form a chain of
          substring containment (every longer value contains the
          shorter ones seen so far).

        :return: `_split` unchanged when all rules pass, else [].
        """
        flag = True
        for _key in singleNum_keys:
            if len(getSet(_split,_key))>1:
                flag = False
                break
        for _key in multiNum_keys:
            if len(getSet(_split,_key))<=1:
                flag = False
                break
        project_code = item.get("project_code","")
        for _key in notlike_keys:
            if not flag:
                break
            for _d in _split:
                _key_v = _d.get(_key,"")
                _sim = getSimilarityOfString(project_code,_key_v)
                # similar-but-not-identical codes mean different projects
                if _sim>0.7 and _sim<1:
                    flag = False
                    break
        # check whether every notice in the group is contained by the
        # longest value seen so far (substring containment chain)
        if flag:
            if len(contain_keys)>0:
                for _key in contain_keys:
                    MAX_CONTAIN_COLUMN = None
                    for _d in _split:
                        contain_column = _d.get(_key)
                        if contain_column is not None and contain_column !="":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                        flag = False
                                        break
        if flag:
            return _split
        return []
  424. def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  425. list_data = []
  426. if isinstance(_query,list):
  427. bool_query = BoolQuery(should_queries=_query)
  428. else:
  429. bool_query = _query
  430. rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  431. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
  432. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  433. list_dict = getRow_ots(rows)
  434. for _dict in list_dict:
  435. self.post_extract(_dict)
  436. _dict["confidence"] = confidence
  437. list_data.append(_dict)
  438. # _count = len(list_dict)
  439. # while next_token:
  440. # rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
  441. # SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  442. # ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  443. # list_dict = getRow_ots(rows)
  444. # for _dict in list_dict:
  445. # self.post_extract(_dict)
  446. # _dict["confidence"] = confidence
  447. # list_data.append(_dict)
  448. list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
  449. return list_dict
  450. def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  451. list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
  452. for _dict in list_dict:
  453. self.post_extract(_dict)
  454. _docid = _dict.get(document_tmp_docid)
  455. if _docid not in set_docid:
  456. base_list.append(_dict)
  457. set_docid.add(_docid)
  458. def translate_dumplicate_rules(self,status_from,item):
  459. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  460. if page_time=='':
  461. page_time = getCurrent_date("%Y-%m-%d")
  462. base_dict = {
  463. "status":[status_from[0]],
  464. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  465. }
  466. must_not_dict = {"save":0}
  467. list_rules = []
  468. singleNum_keys = ["tenderee","win_tenderer"]
  469. if fingerprint!="":
  470. _dict = {}
  471. confidence = 100
  472. _dict[document_tmp_fingerprint] = fingerprint
  473. _dict.update(base_dict)
  474. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  475. _rule = {"confidence":confidence,
  476. "item":item,
  477. "query":_query,
  478. "singleNum_keys":[],
  479. "contain_keys":[],
  480. "multiNum_keys":[]}
  481. list_rules.append(_rule)
  482. if docchannel in (52,118):
  483. if bidding_budget!="" and tenderee!="" and project_code!="":
  484. confidence = 90
  485. _dict = {document_tmp_docchannel:docchannel,
  486. "bidding_budget":item.get("bidding_budget"),
  487. document_tmp_tenderee:item.get(document_tmp_tenderee,""),
  488. document_tmp_project_code:item.get(document_tmp_project_code,"")
  489. }
  490. _dict.update(base_dict)
  491. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  492. _rule = {"confidence":confidence,
  493. "query":_query,
  494. "singleNum_keys":singleNum_keys,
  495. "contain_keys":[],
  496. "multiNum_keys":[document_tmp_web_source_no]}
  497. list_rules.append(_rule)
  498. if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  499. confidence = 80
  500. _dict = {document_tmp_docchannel:docchannel,
  501. "doctitle_refine":doctitle_refine,
  502. "tenderee":tenderee,
  503. bidding_budget:"bidding_budget"
  504. }
  505. _dict.update(base_dict)
  506. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  507. _rule = {"confidence":confidence,
  508. "query":_query,
  509. "singleNum_keys":singleNum_keys,
  510. "contain_keys":[],
  511. "multiNum_keys":[document_tmp_web_source_no]}
  512. list_rules.append(_rule)
  513. if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
  514. confidence = 90
  515. _dict = {document_tmp_docchannel:docchannel,
  516. "project_code":project_code,
  517. "doctitle_refine":doctitle_refine,
  518. "agency":agency,
  519. "bidding_budget":bidding_budget
  520. }
  521. _dict.update(base_dict)
  522. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  523. _rule = {"confidence":confidence,
  524. "query":_query,
  525. "singleNum_keys":singleNum_keys,
  526. "contain_keys":[],
  527. "multiNum_keys":[document_tmp_web_source_no]}
  528. list_rules.append(_rule)
  529. if project_code!="" and tenderee!="" and bidding_budget!="":
  530. confidence = 91
  531. _dict = {document_tmp_docchannel:docchannel,
  532. "project_code":project_code,
  533. "tenderee":tenderee,
  534. "bidding_budget":bidding_budget
  535. }
  536. _dict.update(base_dict)
  537. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  538. _rule = {"confidence":confidence,
  539. "query":_query,
  540. "singleNum_keys":singleNum_keys,
  541. "contain_keys":[],
  542. "multiNum_keys":[document_tmp_web_source_no]}
  543. list_rules.append(_rule)
  544. if doctitle_refine!="" and agency!="" and bidding_budget!="":
  545. confidence = 71
  546. _dict = {document_tmp_docchannel:docchannel,
  547. "doctitle_refine":doctitle_refine,
  548. "agency":agency,
  549. "bidding_budget":bidding_budget
  550. }
  551. _dict.update(base_dict)
  552. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  553. _rule = {"confidence":confidence,
  554. "query":_query,
  555. "singleNum_keys":singleNum_keys,
  556. "contain_keys":[],
  557. "multiNum_keys":[document_tmp_web_source_no]}
  558. list_rules.append(_rule)
  559. if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
  560. confidence = 91
  561. _dict = {document_tmp_docchannel:docchannel,
  562. "project_code":project_code,
  563. "project_name":project_name,
  564. "agency":agency,
  565. "bidding_budget":bidding_budget
  566. }
  567. _dict.update(base_dict)
  568. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  569. n_singleKeys = [i for i in singleNum_keys]
  570. n_singleKeys.append(document_tmp_web_source_no)
  571. _rule = {"confidence":confidence,
  572. "query":_query,
  573. "singleNum_keys":n_singleKeys,
  574. "contain_keys":[],
  575. "multiNum_keys":[]}
  576. list_rules.append(_rule)
  577. ##-- 5. 招标公告 - 同项目编号- 同[项目名称、标题] - 同[招标人、代理公司] - 同预算(!=0) - 同信息源=1
  578. if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
  579. confidence = 91
  580. _dict = {document_tmp_docchannel:docchannel,
  581. "project_code":project_code,
  582. "project_name":project_name,
  583. "tenderee":tenderee,
  584. "bidding_budget":bidding_budget
  585. }
  586. _dict.update(base_dict)
  587. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  588. n_singleKeys = [i for i in singleNum_keys]
  589. n_singleKeys.append(document_tmp_web_source_no)
  590. _rule = {"confidence":confidence,
  591. "query":_query,
  592. "singleNum_keys":n_singleKeys,
  593. "contain_keys":[],
  594. "multiNum_keys":[]}
  595. list_rules.append(_rule)
  596. if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  597. confidence = 71
  598. _dict = {document_tmp_docchannel:docchannel,
  599. "project_code":project_code,
  600. "doctitle_refine":doctitle_refine,
  601. "tenderee":tenderee,
  602. "bidding_budget":bidding_budget
  603. }
  604. _dict.update(base_dict)
  605. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  606. _rule = {"confidence":confidence,
  607. "query":_query,
  608. "singleNum_keys":singleNum_keys,
  609. "contain_keys":[],
  610. "multiNum_keys":[document_tmp_web_source_no]}
  611. list_rules.append(_rule)
  612. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  613. if project_name!="" and agency!="":
  614. tmp_bidding = 0
  615. if bidding_budget!="":
  616. tmp_bidding = bidding_budget
  617. confidence = 51
  618. _dict = {document_tmp_docchannel:docchannel,
  619. "project_name":project_name,
  620. "agency":agency,
  621. "bidding_budget":tmp_bidding
  622. }
  623. _dict.update(base_dict)
  624. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  625. _rule = {"confidence":confidence,
  626. "query":_query,
  627. "singleNum_keys":singleNum_keys,
  628. "contain_keys":[],
  629. "multiNum_keys":[document_tmp_web_source_no]}
  630. list_rules.append(_rule)
  631. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  632. if project_code!="" and agency!="":
  633. tmp_bidding = 0
  634. if bidding_budget!="":
  635. tmp_bidding = bidding_budget
  636. confidence = 51
  637. _dict = {document_tmp_docchannel:docchannel,
  638. "project_code":project_code,
  639. "agency":agency,
  640. "bidding_budget":tmp_bidding
  641. }
  642. _dict.update(base_dict)
  643. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  644. _rule = {"confidence":confidence,
  645. "query":_query,
  646. "singleNum_keys":singleNum_keys,
  647. "contain_keys":[],
  648. "multiNum_keys":[document_tmp_web_source_no]}
  649. list_rules.append(_rule)
  650. if docchannel not in (101,119,120):
  651. #-- 7. 非中标公告 - 同项目名称 - 同发布日期 - 同招标人 - 同预算 - 同类型 - 信息源>1 - 同项目编号
  652. if project_name!="" and tenderee!="" and project_code!="":
  653. tmp_bidding = 0
  654. if bidding_budget!="":
  655. tmp_bidding = bidding_budget
  656. confidence = 51
  657. _dict = {document_tmp_docchannel:docchannel,
  658. "project_name":project_name,
  659. "tenderee":tenderee,
  660. "project_code":project_code
  661. }
  662. _dict.update(base_dict)
  663. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  664. _rule = {"confidence":confidence,
  665. "query":_query,
  666. "singleNum_keys":singleNum_keys,
  667. "contain_keys":[],
  668. "multiNum_keys":[document_tmp_web_source_no]}
  669. list_rules.append(_rule)
  670. if docchannel in (101,119,120):
  671. #-- 3. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(==0)
  672. if project_code!="" and project_name!="" and win_tenderer!="":
  673. tmp_win = 0
  674. if win_bid_price!="":
  675. tmp_win = win_bid_price
  676. confidence = 61
  677. _dict = {document_tmp_docchannel:docchannel,
  678. "project_code":project_code,
  679. "project_name":project_name,
  680. "win_tenderer":win_tenderer,
  681. "win_bid_price":tmp_win
  682. }
  683. _dict.update(base_dict)
  684. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  685. _rule = {"confidence":confidence,
  686. "query":_query,
  687. "singleNum_keys":singleNum_keys,
  688. "contain_keys":[],
  689. "multiNum_keys":[]}
  690. list_rules.append(_rule)
  691. if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
  692. confidence = 72
  693. _dict = {document_tmp_docchannel:docchannel,
  694. "project_code":project_code,
  695. "project_name":project_name,
  696. "bidding_budget":bidding_budget,
  697. "product":product
  698. }
  699. _dict.update(base_dict)
  700. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  701. n_singleKeys = [i for i in singleNum_keys]
  702. n_singleKeys.append(document_tmp_web_source_no)
  703. _rule = {"confidence":confidence,
  704. "query":_query,
  705. "singleNum_keys":n_singleKeys,
  706. "contain_keys":[],
  707. "multiNum_keys":[]}
  708. list_rules.append(_rule)
  709. if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  710. confidence = 91
  711. _dict = {document_tmp_docchannel:docchannel,
  712. "project_code":project_code,
  713. "doctitle_refine":doctitle_refine,
  714. "win_tenderer":win_tenderer,
  715. "win_bid_price":win_bid_price
  716. }
  717. _dict.update(base_dict)
  718. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  719. n_singleKeys = [i for i in singleNum_keys]
  720. n_singleKeys.append(document_tmp_web_source_no)
  721. _rule = {"confidence":confidence,
  722. "query":_query,
  723. "singleNum_keys":n_singleKeys,
  724. "contain_keys":[],
  725. "multiNum_keys":[]}
  726. list_rules.append(_rule)
  727. ##-- 2. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(!=0) - 同信息源=1
  728. if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
  729. confidence = 91
  730. _dict = {document_tmp_docchannel:docchannel,
  731. "project_code":project_code,
  732. "project_name":project_name,
  733. "win_tenderer":win_tenderer,
  734. "win_bid_price":win_bid_price
  735. }
  736. _dict.update(base_dict)
  737. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  738. n_singleKeys = [i for i in singleNum_keys]
  739. n_singleKeys.append(document_tmp_web_source_no)
  740. _rule = {"confidence":confidence,
  741. "query":_query,
  742. "singleNum_keys":n_singleKeys,
  743. "contain_keys":[],
  744. "multiNum_keys":[]}
  745. list_rules.append(_rule)
  746. if project_name!="" and win_tenderer!="" and win_bid_price!="":
  747. confidence = 91
  748. _dict = {document_tmp_docchannel:docchannel,
  749. "project_name":project_name,
  750. "win_tenderer":win_tenderer,
  751. "win_bid_price":win_bid_price,
  752. }
  753. _dict.update(base_dict)
  754. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  755. _rule = {"confidence":confidence,
  756. "query":_query,
  757. "singleNum_keys":singleNum_keys,
  758. "contain_keys":[],
  759. "multiNum_keys":[document_tmp_web_source_no]}
  760. list_rules.append(_rule)
  761. if project_code!="" and win_tenderer!="" and win_bid_price!="":
  762. confidence = 91
  763. _dict = {document_tmp_docchannel:docchannel,
  764. "project_code":project_code,
  765. "win_tenderer":win_tenderer,
  766. "win_bid_price":win_bid_price,
  767. }
  768. _dict.update(base_dict)
  769. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  770. _rule = {"confidence":confidence,
  771. "query":_query,
  772. "singleNum_keys":singleNum_keys,
  773. "contain_keys":[],
  774. "multiNum_keys":[document_tmp_web_source_no]}
  775. list_rules.append(_rule)
  776. if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  777. confidence = 91
  778. _dict = {document_tmp_docchannel:docchannel,
  779. "project_code":project_code,
  780. "doctitle_refine":doctitle_refine,
  781. "win_tenderer":win_tenderer,
  782. "win_bid_price":win_bid_price
  783. }
  784. _dict.update(base_dict)
  785. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  786. n_singleKeys = [i for i in singleNum_keys]
  787. n_singleKeys.append(document_tmp_web_source_no)
  788. _rule = {"confidence":confidence,
  789. "query":_query,
  790. "singleNum_keys":n_singleKeys,
  791. "contain_keys":[],
  792. "multiNum_keys":[]}
  793. list_rules.append(_rule)
  794. if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  795. confidence=90
  796. _dict = {document_tmp_docchannel:docchannel,
  797. "doctitle_refine":doctitle_refine,
  798. "win_tenderer":win_tenderer,
  799. "win_bid_price":win_bid_price
  800. }
  801. _dict.update(base_dict)
  802. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  803. _rule = {"confidence":confidence,
  804. "query":_query,
  805. "singleNum_keys":singleNum_keys,
  806. "contain_keys":[],
  807. "multiNum_keys":[document_tmp_web_source_no]}
  808. list_rules.append(_rule)
  809. if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
  810. confidence=95
  811. _dict = {document_tmp_docchannel:docchannel,
  812. "project_name":project_name,
  813. "win_tenderer":win_tenderer,
  814. "win_bid_price":win_bid_price,
  815. "project_code":project_code
  816. }
  817. _dict.update(base_dict)
  818. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  819. _rule = {"confidence":confidence,
  820. "query":_query,
  821. "singleNum_keys":singleNum_keys,
  822. "contain_keys":[],
  823. "multiNum_keys":[document_tmp_web_source_no]}
  824. list_rules.append(_rule)
  825. if docchannel in (51,103,115,116):
  826. #9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  827. if doctitle_refine!="" and tenderee!="":
  828. tmp_budget = 0
  829. if bidding_budget!="":
  830. tmp_budget = bidding_budget
  831. confidence=81
  832. _dict = {document_tmp_docchannel:docchannel,
  833. "doctitle_refine":doctitle_refine,
  834. "tenderee":tenderee,
  835. "bidding_budget":tmp_budget,
  836. }
  837. _dict.update(base_dict)
  838. _dict["page_time"] = [page_time,page_time]
  839. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  840. _rule = {"confidence":confidence,
  841. "query":_query,
  842. "singleNum_keys":singleNum_keys,
  843. "contain_keys":[],
  844. "multiNum_keys":[document_tmp_web_source_no]}
  845. list_rules.append(_rule)
  846. #-- 9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  847. if project_code!="" and tenderee!="":
  848. confidence=81
  849. tmp_budget = 0
  850. if bidding_budget!="":
  851. tmp_budget = bidding_budget
  852. _dict = {document_tmp_docchannel:docchannel,
  853. "project_code":project_code,
  854. "tenderee":tenderee,
  855. "bidding_budget":tmp_budget,
  856. }
  857. _dict.update(base_dict)
  858. _dict["page_time"] = [page_time,page_time]
  859. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  860. _rule = {"confidence":confidence,
  861. "query":_query,
  862. "singleNum_keys":singleNum_keys,
  863. "contain_keys":[],
  864. "multiNum_keys":[document_tmp_web_source_no]}
  865. list_rules.append(_rule)
  866. if project_name!="" and tenderee!="":
  867. confidence=81
  868. tmp_budget = 0
  869. if bidding_budget!="":
  870. tmp_budget = bidding_budget
  871. _dict = {document_tmp_docchannel:docchannel,
  872. "project_name":project_name,
  873. "tenderee":tenderee,
  874. "bidding_budget":tmp_budget,
  875. }
  876. _dict.update(base_dict)
  877. _dict["page_time"] = [page_time,page_time]
  878. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  879. _rule = {"confidence":confidence,
  880. "query":_query,
  881. "singleNum_keys":singleNum_keys,
  882. "contain_keys":[],
  883. "multiNum_keys":[document_tmp_web_source_no]}
  884. list_rules.append(_rule)
  885. if agency!="" and tenderee!="":
  886. confidence=81
  887. tmp_budget = 0
  888. if bidding_budget!="":
  889. tmp_budget = bidding_budget
  890. _dict = {document_tmp_docchannel:docchannel,
  891. "agency":agency,
  892. "tenderee":tenderee,
  893. "bidding_budget":tmp_budget,
  894. "product":product
  895. }
  896. _dict.update(base_dict)
  897. _dict["page_time"] = [page_time,page_time]
  898. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  899. _rule = {"confidence":confidence,
  900. "query":_query,
  901. "singleNum_keys":singleNum_keys,
  902. "contain_keys":[],
  903. "multiNum_keys":[document_tmp_web_source_no]}
  904. list_rules.append(_rule)
  905. if agency!="" and project_code!="":
  906. confidence=81
  907. tmp_budget = 0
  908. if bidding_budget!="":
  909. tmp_budget = bidding_budget
  910. _dict = {document_tmp_docchannel:docchannel,
  911. "agency":agency,
  912. "project_code":project_code,
  913. "bidding_budget":tmp_budget,
  914. "product":product
  915. }
  916. _dict.update(base_dict)
  917. _dict["page_time"] = [page_time,page_time]
  918. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  919. _rule = {"confidence":confidence,
  920. "query":_query,
  921. "singleNum_keys":singleNum_keys,
  922. "contain_keys":[],
  923. "multiNum_keys":[document_tmp_web_source_no]}
  924. list_rules.append(_rule)
  925. if agency!="" and project_name!="":
  926. confidence=81
  927. tmp_budget = 0
  928. if bidding_budget!="":
  929. tmp_budget = bidding_budget
  930. _dict = {document_tmp_docchannel:docchannel,
  931. "agency":agency,
  932. "project_name":project_name,
  933. "bidding_budget":tmp_budget,
  934. "product":product
  935. }
  936. _dict.update(base_dict)
  937. _dict["page_time"] = [page_time,page_time]
  938. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  939. _rule = {"confidence":confidence,
  940. "query":_query,
  941. "singleNum_keys":singleNum_keys,
  942. "contain_keys":[],
  943. "multiNum_keys":[document_tmp_web_source_no]}
  944. list_rules.append(_rule)
  945. #五选二
  946. if tenderee!="" and bidding_budget!="" and product!="":
  947. confidence=80
  948. _dict = {document_tmp_docchannel:docchannel,
  949. "tenderee":tenderee,
  950. "bidding_budget":bidding_budget,
  951. "product":product,
  952. }
  953. _dict.update(base_dict)
  954. _dict["page_time"] = [page_time,page_time]
  955. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  956. _rule = {"confidence":confidence,
  957. "query":_query,
  958. "singleNum_keys":singleNum_keys,
  959. "contain_keys":[],
  960. "multiNum_keys":[]}
  961. list_rules.append(_rule)
  962. if tenderee!="" and win_tenderer!="" and product!="":
  963. confidence=80
  964. _dict = {document_tmp_docchannel:docchannel,
  965. "tenderee":tenderee,
  966. "win_tenderer":win_tenderer,
  967. "product":product,
  968. }
  969. _dict.update(base_dict)
  970. _dict["page_time"] = [page_time,page_time]
  971. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  972. _rule = {"confidence":confidence,
  973. "query":_query,
  974. "singleNum_keys":singleNum_keys,
  975. "contain_keys":[],
  976. "multiNum_keys":[]}
  977. list_rules.append(_rule)
  978. if tenderee!="" and win_bid_price!="":
  979. confidence=80
  980. _dict = {document_tmp_docchannel:docchannel,
  981. "tenderee":tenderee,
  982. "win_bid_price":win_bid_price,
  983. "product":product,
  984. }
  985. _dict.update(base_dict)
  986. _dict["page_time"] = [page_time,page_time]
  987. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  988. _rule = {"confidence":confidence,
  989. "query":_query,
  990. "singleNum_keys":singleNum_keys,
  991. "contain_keys":[],
  992. "multiNum_keys":[]}
  993. list_rules.append(_rule)
  994. if tenderee!="" and agency!="":
  995. confidence=80
  996. _dict = {document_tmp_docchannel:docchannel,
  997. "tenderee":tenderee,
  998. "agency":agency,
  999. "product":product,
  1000. }
  1001. _dict.update(base_dict)
  1002. _dict["page_time"] = [page_time,page_time]
  1003. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1004. _rule = {"confidence":confidence,
  1005. "query":_query,
  1006. "singleNum_keys":singleNum_keys,
  1007. "contain_keys":[],
  1008. "multiNum_keys":[]}
  1009. list_rules.append(_rule)
  1010. if win_tenderer!="" and bidding_budget!="":
  1011. confidence=80
  1012. _dict = {document_tmp_docchannel:docchannel,
  1013. "win_tenderer":win_tenderer,
  1014. "bidding_budget":bidding_budget,
  1015. "product":product,
  1016. }
  1017. _dict.update(base_dict)
  1018. _dict["page_time"] = [page_time,page_time]
  1019. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1020. _rule = {"confidence":confidence,
  1021. "query":_query,
  1022. "singleNum_keys":singleNum_keys,
  1023. "contain_keys":[],
  1024. "multiNum_keys":[]}
  1025. list_rules.append(_rule)
  1026. if win_bid_price!="" and bidding_budget!="":
  1027. confidence=80
  1028. _dict = {document_tmp_docchannel:docchannel,
  1029. "win_bid_price":win_bid_price,
  1030. "bidding_budget":bidding_budget,
  1031. "product":product,
  1032. }
  1033. _dict.update(base_dict)
  1034. _dict["page_time"] = [page_time,page_time]
  1035. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1036. _rule = {"confidence":confidence,
  1037. "query":_query,
  1038. "singleNum_keys":singleNum_keys,
  1039. "contain_keys":[],
  1040. "multiNum_keys":[]}
  1041. list_rules.append(_rule)
  1042. if agency!="" and bidding_budget!="":
  1043. confidence=80
  1044. _dict = {document_tmp_docchannel:docchannel,
  1045. "agency":agency,
  1046. "bidding_budget":bidding_budget,
  1047. "product":product,
  1048. }
  1049. _dict.update(base_dict)
  1050. _dict["page_time"] = [page_time,page_time]
  1051. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1052. _rule = {"confidence":confidence,
  1053. "query":_query,
  1054. "singleNum_keys":singleNum_keys,
  1055. "contain_keys":[],
  1056. "multiNum_keys":[]}
  1057. list_rules.append(_rule)
  1058. if win_tenderer!="" and win_bid_price!="":
  1059. confidence=80
  1060. _dict = {document_tmp_docchannel:docchannel,
  1061. "win_tenderer":win_tenderer,
  1062. "win_bid_price":win_bid_price,
  1063. "product":product,
  1064. }
  1065. _dict.update(base_dict)
  1066. _dict["page_time"] = [page_time,page_time]
  1067. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1068. _rule = {"confidence":confidence,
  1069. "query":_query,
  1070. "singleNum_keys":singleNum_keys,
  1071. "contain_keys":[],
  1072. "multiNum_keys":[]}
  1073. list_rules.append(_rule)
  1074. if win_tenderer!="" and agency!="":
  1075. confidence=80
  1076. _dict = {document_tmp_docchannel:docchannel,
  1077. "win_tenderer":win_tenderer,
  1078. "agency":agency,
  1079. "product":product,
  1080. }
  1081. _dict.update(base_dict)
  1082. _dict["page_time"] = [page_time,page_time]
  1083. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1084. _rule = {"confidence":confidence,
  1085. "query":_query,
  1086. "singleNum_keys":singleNum_keys,
  1087. "contain_keys":[],
  1088. "multiNum_keys":[]}
  1089. list_rules.append(_rule)
  1090. if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
  1091. confidence=80
  1092. _dict = {document_tmp_docchannel:docchannel,
  1093. "doctitle_refine":doctitle_refine,
  1094. "product":product,
  1095. }
  1096. _dict.update(base_dict)
  1097. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1098. _rule = {"confidence":confidence,
  1099. "query":_query,
  1100. "singleNum_keys":singleNum_keys,
  1101. "contain_keys":[],
  1102. "multiNum_keys":[]}
  1103. list_rules.append(_rule)
  1104. return list_rules
  1105. def dumplicate_fianl_check(self,base_list):
  1106. the_group = base_list
  1107. the_group.sort(key=lambda x:x["confidence"],reverse=True)
  1108. if len(the_group)>10:
  1109. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
  1110. else:
  1111. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
  1112. #置信度
  1113. list_key_index = []
  1114. for _k in keys:
  1115. if _k=="doctitle":
  1116. list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
  1117. else:
  1118. list_key_index.append(getDiffIndex(the_group,_k))
  1119. _index = min(list_key_index)
  1120. if _index>1:
  1121. return the_group[:_index]
  1122. return []
  1123. def get_best_docid(self,base_list):
  1124. if len(base_list)>0:
  1125. base_list.sort(key=lambda x:x["docid"])
  1126. base_list.sort(key=lambda x:x["extract_count"],reverse=True)
  1127. return base_list[0]["docid"]
  1128. def save_dumplicate(self,base_list,best_docid,status_from,status_to):
  1129. #best_docid need check while others can save directly
  1130. list_dict = []
  1131. for item in base_list:
  1132. docid = item["docid"]
  1133. _dict = {"partitionkey":item["partitionkey"],
  1134. "docid":item["docid"]}
  1135. if docid==best_docid:
  1136. if item.get("save",1)!=0:
  1137. _dict["save"] = 1
  1138. else:
  1139. _dict["save"] = 0
  1140. if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
  1141. _dict["status"] = random.randint(status_to[0],status_to[1])
  1142. list_dict.append(_dict)
  1143. for _dict in list_dict:
  1144. dtmp = Document_tmp(_dict)
  1145. dtmp.update_row(self.ots_client)
def flow_test(self,status_to=[1,10]):
    """Maintenance flow: requeue document_tmp rows for re-processing.

    Selects rows that carry at least one attachment (page_attachments
    with a fileMd5) but whose attachment extraction has not succeeded
    (attachment_extract_status != 1) and whose status is not already in
    the 1..11 range, then resets each row's status to a random value
    within ``status_to``.
    NOTE(review): mutable default argument; safe while callers never
    mutate it.
    """
    def producer():
        # Build the selection filter; the commented-out queries are
        # alternative filters kept from earlier runs.
        bool_query = BoolQuery(must_queries=[
            # ExistsQuery("docid"),
            # RangeQuery("crtime",range_to='2022-04-10'),
            # RangeQuery("status",61),
            NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
        ],
        must_not_queries=[
            # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
            TermQuery("attachment_extract_status",1),
            RangeQuery("status",1,11)
        ]
        )
        # first page, sorted by docid for a stable scan order
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
            ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
        log("flow_init producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_init.put(_dict)
        _count = len(list_dict)
        # page through the remaining results, capped at 1,000,000 rows
        while next_token and _count<1000000:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count += len(list_dict)
            print("%d/%d"%(_count,total_count))
    def comsumer():
        # 30 worker threads drain queue_init through comsumer_handle
        mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        # print(item)
        dtmp = Document_tmp(item)
        # status is randomized within status_to
        dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
        dtmp.update_row(ots_client)
        # dhtml = Document_html(item)
        # dhtml.update_row(ots_client)
        # dtmp.delete_row(ots_client)
        # dhtml.delete_row(ots_client)
    producer()
    comsumer()
def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
    """Deduplication stage of the pipeline.

    Pulls documents from document_tmp whose status is in ``status_from``,
    searches for duplicate candidates via the configured dedup rules, picks
    the best docid among the duplicates, and writes save/dup_docid/status
    back to document_tmp.

    :param process_count: upper bound on documents queued per run
    :param status_from: (low, high) status range selecting documents to process
    """
    def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        # Fill queue_dumplicate with documents in the target status range,
        # paging through the index with next_token.
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_dumplicate.put(_dict)
        _count = len(list_dict)
        while next_token and _count<flow_process_count:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_dumplicate.put(_dict)
            _count += len(list_dict)
    def comsumer():
        # Drain the queue with 10 worker threads; each worker gets the shared OTS client.
        mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        # Process one document: collect duplicate candidates by rule,
        # then decide whether this document is the one to keep.
        self.post_extract(item)
        base_list = []
        set_docid = set()
        list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
        # Apply the most confident rules first.
        list_rules.sort(key=lambda x:x["confidence"],reverse=True)
        for _rule in list_rules:
            _query = _rule["query"]
            confidence = _rule["confidence"]
            singleNum_keys = _rule["singleNum_keys"]
            contain_keys = _rule["contain_keys"]
            multiNum_keys = _rule["multiNum_keys"]
            self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
        # The document itself always participates with top confidence.
        item["confidence"] = 999
        if item.get(document_tmp_docid) not in set_docid:
            base_list.append(item)
        final_list = self.dumplicate_fianl_check(base_list)
        best_docid = self.get_best_docid(final_list)
        _d = {"partitionkey":item["partitionkey"],
              "docid":item["docid"],
              "status":random.randint(*flow_dumplicate_status_to),
              document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
              }
        dtmp = Document_tmp(_d)
        dup_docid = set()
        for _dict in final_list:
            dup_docid.add(_dict.get(document_tmp_docid))
        # Never list the current document among its own duplicates.
        if item.get(document_tmp_docid) in dup_docid:
            dup_docid.remove(item.get(document_tmp_docid))
        if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
            # This document wins: mark it saved and try to merge it into a project.
            dtmp.setValue(document_tmp_save,1,True)
            dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
            dmp_docid = ",".join([str(a) for a in list(dup_docid)])
        else:
            # Another document wins: mark this one as a duplicate and record
            # the winner first in dup_docid.
            dtmp.setValue(document_tmp_save,0,True)
            if best_docid in dup_docid:
                dup_docid.remove(best_docid)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                dmp_docid = "%d,%s"%(best_docid,dmp_docid)
            else:
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
        dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
        dtmp.update_row(self.ots_client)
        # Keep only the current announcement.
        # self.save_dumplicate(final_list,best_docid,status_from,status_to)
    producer()
    comsumer()
def merge_document(self,item,status_to=None):
    """Try to attach a saved document to an existing project (project2 table).

    Builds increasingly loose queries (project_code+tenderee, then
    code+name, then budget/price based) and, when exactly one project
    matches, records its uuid as ``merge_uuid``.

    :param item: document dict (must contain partitionkey/docid and extract columns)
    :param status_to: NOTE(review): currently unused in this body -- confirm
        whether it was meant to drive a status update here.
    :return: the matched project uuid, or "" when no unique match was found
    """
    self.post_extract(item)
    docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
    _d = {"partitionkey":item["partitionkey"],
          "docid":item["docid"],
          }
    dtmp = Document_tmp(_d)
    # Only documents marked (or defaulting to) save==1 are merge candidates.
    if item.get(document_tmp_save,1)==1:
        list_should_q = []
        if project_code!="" and tenderee!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("tenderee",tenderee)])
            list_should_q.append(_q)
        if project_name!="" and project_code!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("project_name",project_name)])
            list_should_q.append(_q)
        if len(list_should_q)>0:
            list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
            # Merge only when the match is unambiguous (exactly one project).
            if len(list_data)==1:
                dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                print(item["docid"],list_data[0]["uuid"])
            else:
                # Fallback: money-based queries when the code/name match was
                # not unique.  (Indentation reconstructed -- confirm this
                # else pairs with the len(list_data)==1 check.)
                list_should_q = []
                if bidding_budget!="" and project_code!="":
                    _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                                 TermQuery("bidding_budget",float(bidding_budget))])
                    list_should_q.append(_q)
                if tenderee!="" and bidding_budget!="" and project_name!="":
                    _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                                 TermQuery("bidding_budget",float(bidding_budget)),
                                                 TermQuery("project_name",project_name)])
                    list_should_q.append(_q)
                if tenderee!="" and win_bid_price!="" and project_name!="":
                    _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                                 TermQuery("win_bid_price",float(win_bid_price)),
                                                 TermQuery("project_name",project_name)])
                    list_should_q.append(_q)
                if len(list_should_q)>0:
                    list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                    if len(list_data)==1:
                        dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                        print(item["docid"],list_data[0]["uuid"])
    return dtmp.getProperties().get("merge_uuid","")
    # dtmp.update_row(self.ots_client)
def test_merge(self):
    """Ad-hoc evaluation of merge_document.

    Collects one day's saved result announcements (docchannel 101/119/120,
    page_time 2022-04-22), runs merge_document over them with 30 threads,
    and dumps docid/merge result columns to test_merge.xlsx.
    """
    import pandas as pd
    import queue
    def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        # Returns the full result set for the fixed test day.
        list_test_item = []
        should_q = BoolQuery(should_queries=[
            TermQuery("docchannel",101),
            TermQuery("docchannel",119),
            TermQuery("docchannel",120)
        ])
        bool_query = BoolQuery(must_queries=[
            TermQuery("page_time","2022-04-22"),
            should_q,
            TermQuery("save",1)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            list_test_item.append(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                list_test_item.append(_dict)
            _count += len(list_dict)
            print("%d/%d"%(_count,total_count))
        return list_test_item
    from BaseDataMaintenance.model.ots.project import Project
    def comsumer_handle(item,result_queue,ots_client):
        # Annotate each item with the merge result and whether the matched
        # project already has a tendering (zhao_biao) announcement.
        item["merge_uuid"] = self.merge_document(item)
        if item["merge_uuid"]!="":
            _dict = {"uuid":item["merge_uuid"]}
            _p = Project(_dict)
            _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
            if _p.getProperties().get("zhao_biao_page_time","")!="":
                item["是否有招标"] = "是"
    list_test_item = producer()
    task_queue = queue.Queue()
    for item in list_test_item:
        task_queue.put(item)
    mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
    mt.run()
    # Export the evaluation columns to Excel for manual review.
    keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
    df_data = {}
    for k in keys:
        df_data[k] = []
    for item in list_test_item:
        for k in keys:
            df_data[k].append(item.get(k,""))
    df = pd.DataFrame(df_data)
    df.to_excel("test_merge.xlsx",columns=keys)
def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
    """Merge stage (currently disabled).

    Defines the producer/consumer pair for merging documents in status
    ``status_from`` into projects, but the calls at the bottom are
    commented out, so invoking this method is a no-op.
    """
    def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        # Page documents in the status range into queue_merge, up to process_count.
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_merge producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_merge.put(_dict)
        _count = len(list_dict)
        while next_token and _count<process_count:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_merge.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        self.merge_document(item,status_to)
    # producer()
    # comsumer()
    pass
  1398. def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
  1399. pass
def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
    """Cleanup stage: delete finished documents from document_tmp/document_html.

    Selects rows whose status is in ``status_from`` and whose crtime is
    older than 4 days, then deletes them with 10 worker threads.

    NOTE(review): ``process_count`` is accepted but never used -- the
    producer pages through ALL matching rows (``while next_token:``).
    Confirm whether the count bound was intentionally dropped here.
    """
    def producer():
        current_date = getCurrent_date("%Y-%m-%d")
        # Only rows created before (today - 4 days) are eligible for removal.
        tmp_date = timeAdd(current_date,-4)
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                             RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                           ColumnsToGet(return_type=ColumnReturnType.NONE))
        log("flow_remove producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_remove.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                               ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        # Remove both the tmp row and its html counterpart.
        dtmp = Document_tmp(item)
        dtmp.delete_row(self.ots_client)
        dhtml = Document_html(item)
        dhtml.delete_row(self.ots_client)
    producer()
    comsumer()
  1432. def start_flow_dumplicate(self):
  1433. schedule = BlockingScheduler()
  1434. schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
  1435. schedule.start()
  1436. def start_flow_merge(self):
  1437. schedule = BlockingScheduler()
  1438. schedule.add_job(self.flow_merge,"cron",second="*/10")
  1439. schedule.start()
  1440. def start_flow_remove(self):
  1441. schedule = BlockingScheduler()
  1442. schedule.add_job(self.flow_remove,"cron",hour="20")
  1443. schedule.start()
def download_attachment():
    """One-off utility: bulk-download attachments from OSS to ./download.

    Scans document_tmp rows created in a fixed two-hour window
    (2022-03-29 15:00-17:00), resolves their attachment metadata from the
    attachment table, and downloads each file (unless oversized) from the
    attachment-hub bucket into <this file's dir>/download/<md5>.<type>.
    """
    ots_client = getConnect_ots()
    queue_attachment = Queue()
    auth = getAuth()
    # Tune the OSS client for many concurrent downloads.
    oss2.defaults.connection_pool_size = 100
    oss2.defaults.multiget_num_threads = 20
    attachment_bucket_name = "attachment-hub"
    # Use the internal endpoint when running inside the VPC.
    if is_internal:
        bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
    else:
        bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
    bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    current_path = os.path.dirname(__file__)
    def producer():
        # Queue every document in the fixed crtime window.
        columns = [document_tmp_attachment_path]
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                      SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                      ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_attachment producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            queue_attachment.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                          SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                          ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                queue_attachment.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
        mt.run()
    def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        # Batch-read attachment rows by md5; BatchGetRow caps at 100 rows,
        # this takes at most the first 50 md5s.
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def comsumer_handle(item,result_queue):
        # page_attachments is stored as a JSON array of {fileMd5, ...} dicts.
        page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
        if len(page_attachments)==0:
            pass
        else:
            list_fileMd5 = []
            for _atta in page_attachments:
                list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
            list_attach = getAttachments(list_fileMd5)
            for attach in list_attach:
                filemd5 = attach.getProperties().get(attachment_filemd5)
                _status = attach.getProperties().get(attachment_status)
                _filetype = attach.getProperties().get(attachment_filetype)
                _size = attach.getProperties().get(attachment_size)
                _path = attach.getProperties().get(attachment_path)
                _uuid = uuid4()
                objectPath = attach.getProperties().get(attachment_path)
                localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
                try:
                    # Skip files above the large-size threshold.
                    if _size>ATTACHMENT_LARGESIZE:
                        pass
                    else:
                        downloadFile(bucket,objectPath,localpath)
                except Exception as e:
                    traceback.print_exc()
    producer()
    comsumer()
def test_attachment_interface():
    """Stress-test the attachment-processing interface.

    Feeds up to 500 previously downloaded files from ./download through
    getAttachDealInterface with 10 threads and logs per-file latency.
    """
    current_path = os.path.dirname(__file__)
    task_queue = Queue()
    def producer():
        _count = 0
        list_filename = os.listdir(os.path.join(current_path,"download"))
        for _filename in list_filename:
            _count += 1
            # File type is taken from the first extension segment of the name.
            _type = _filename.split(".")[1]
            task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
            if _count>=500:
                break
    def comsumer():
        mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
        mt.run()
    def comsumer_handle(item,result_queue):
        _path = item.get("path")
        _type = item.get("file_type")
        _data_base64 = base64.b64encode(open(_path,"rb").read())
        # Call the interface and time the processing of one file.
        start_time = time.time()
        _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
        log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
    producer()
    comsumer()
class Dataflow_attachment(Dataflow):
    """Attachment stage of the dataflow.

    A producer pages attachment-bearing documents out of document_tmp into
    queue_attachment; a consumer resolves their attachment metadata and
    routes each document into an OCR or non-OCR queue; a pool of worker
    threads recognizes the attachments via the external interface and
    writes html/status back to OTS.
    """
    def __init__(self):
        Dataflow.__init__(self)
    def flow_attachment_process(self):
        """Entry point used by the scheduler: run the recognition workers."""
        self.process_comsumer()
    def process_comsumer(self):
        """Start 60 recognition worker threads and wait for them all."""
        list_thread = []
        thread_count = 60
        for i in range(thread_count):
            list_thread.append(Thread(target=self.process_comsumer_handle))
        for t in list_thread:
            t.start()
        for t in list_thread:
            t.join()
    def process_comsumer_handle(self):
        """Worker loop: drain both the OCR and non-OCR queues.

        A 0.2s timeout on each get() raises Empty (caught broadly below);
        when a pass produced no work the loop sleeps 2s before retrying.
        NOTE(review): `_flag = True and _flag` keeps the flag from the
        first try, so a hit on the second queue after a miss on the first
        still sleeps -- confirm whether resetting the flag was intended.
        """
        while 1:
            _flag = False
            try:
                item = self.queue_attachment_ocr.get(True,timeout=0.2)
                self.attachment_recognize(item,None)
            except Exception as e:
                _flag = True
                pass
            try:
                item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
                self.attachment_recognize(item,None)
            except Exception as e:
                _flag = True and _flag
                pass
            if _flag:
                time.sleep(2)
    def attachment_recognize(self,_dict,result_queue):
        """Recognize one document's attachments and persist the result.

        :param _dict: {"item": document row dict, "list_attach": attachment objects}
        :param result_queue: unused (kept for handler signature compatibility)
        """
        item = _dict.get("item")
        list_attach = _dict.get("list_attach")
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
        _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
        log(str(swf_urls))
        if not _succeed:
            item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
        else:
            # Success: attach recognized html/swf to the document and mark it done.
            dhtml.updateSWFImages(swf_urls)
            dhtml.updateAttachment(list_html)
            dhtml.update_row(self.ots_client)
            item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
            item[document_tmp_attachment_extract_status] = 1
        log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
        dtmp = Document_tmp(item)
        dtmp.update_row(self.ots_client)
    def flow_attachment(self):
        """Scheduler entry point: refill and then drain queue_attachment."""
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()
    def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        """Batch-read attachment rows by md5 (at most the first 50 md5s).

        :return: list of attachment model objects for the rows found
        """
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = self.ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        """Refill queue_attachment with documents awaiting attachment processing."""
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
        # Back-pressure: skip refilling while the downstream queues are loaded.
        if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
            return
        # Dedup: remember docids already queued, and shrink the tracking
        # lists to the entries still pending in each queue.
        set_docid = set()
        set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
        if qsize_ocr>0:
            self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
        else:
            self.list_attachment_ocr = []
        if qsize_not_ocr>0:
            self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
        else:
            self.list_attachment_not_ocr = []
        try:
            bool_query = BoolQuery(must_queries=[
                RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_attachment producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            _count = 0
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    continue
                self.queue_attachment.put(_dict,True)
                _count += 1
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                   SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                   ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        continue
                    self.queue_attachment.put(_dict,True)
                    _count += 1
            log("add attachment count:%d"%(_count))
        except Exception as e:
            log("flow attachment producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_attachment_producer_comsumer(self):
        """Drain queue_attachment with 10 worker threads."""
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()
    def set_queue(self,_dict):
        """Route a document to the OCR queue if any attachment is an
        image/swf/pdf/tif type, otherwise to the non-OCR queue."""
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
    def comsumer_handle(self,item,result_queue):
        """Resolve a document's attachment rows and enqueue it for recognition."""
        try:
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)==0:
                # No attachments: mark succeeded immediately.
                item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
                dtmp = Document_tmp(item)
                dtmp.update_row(self.ots_client)
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5)
                # Attachments not fully uploaded yet: within 2 hours of
                # creation, reset status to 1 and retry later.
                if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
                    item[document_tmp_status] = 1
                    dtmp = Document_tmp(item)
                    dtmp.update_row(self.ots_client)
                    return
                self.set_queue({"item":item,"list_attach":list_attach})
        except Exception as e:
            traceback.print_exc()
    def start_flow_attachment(self):
        """Schedule recognition every 20s and queue refill every 10s (blocking)."""
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.start()
class Dataflow_extract(Dataflow):
    """Extraction stage of the dataflow.

    A producer pages documents awaiting extraction into queue_extract; a
    pool of consumers posts each document's html to the other/extract/
    industry HTTP services, stores the JSON results in Document_extract,
    and advances the document's status.
    """
    def __init__(self):
        Dataflow.__init__(self)
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        """Refill queue_extract, skipping docids that are already queued."""
        q_size = self.queue_extract.qsize()
        # Back-pressure: do not refill while the queue is loaded.
        if q_size>100:
            return
        # list_extract tracks docids recently queued; keep only the tail
        # that corresponds to entries still pending.
        set_docid = set(self.list_extract)
        if q_size>0:
            self.list_extract = self.list_extract[-q_size:]
        else:
            self.list_extract = []
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_extract producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    # Already queued: keep tracking it at the head of the list.
                    self.list_extract.insert(0,docid)
                    continue
                else:
                    self.queue_extract.put(_dict)
                    self.list_extract.append(docid)
            _count = len(list_dict)
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                   SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                   ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        self.list_extract.insert(0,docid)
                        continue
                    else:
                        self.queue_extract.put(_dict)
                        self.list_extract.append(docid)
                _count += len(list_dict)
        except Exception as e:
            log("flow extract producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_extract(self,):
        """Scheduler entry point: run the extraction consumers."""
        self.comsumer()
    def comsumer(self):
        """Drain queue_extract with 35 worker threads."""
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
        mt.run()
    def comsumer_handle(self,item,result_queue):
        """Extract one document via the three HTTP services and persist results.

        all_done starts at 1 and is set to -1/-2/-3 when the other/extract/
        industry call fails.  NOTE(review): negative codes are still truthy,
        so the later ``if all_done:`` guards do not actually skip subsequent
        calls after a failure -- confirm whether ``all_done==1`` was intended.
        """
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
        _extract = Document_extract({})
        _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
        _extract.setValue(document_extract2_docid,item.get(document_docid))
        all_done = 1
        if all_done:
            data = item
            resp = requests.post(self.other_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
            else:
                all_done = -1
        # Build the payload for the extract/industry services: a copy of the
        # item with the html moved into "content" plus request metadata.
        data = {}
        for k,v in item.items():
            data[k] = v
        data["timeout"] = 240
        data["doc_id"] = data.get(document_tmp_docid)
        data["content"] = data.get(document_tmp_dochtmlcon,"")
        if document_tmp_dochtmlcon in data:
            data.pop(document_tmp_dochtmlcon)
        data["title"] = data.get(document_tmp_doctitle,"")
        data["web_source_no"] = item.get(document_tmp_web_source_no,"")
        data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
        if all_done:
            resp = requests.post(self.extract_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
            else:
                all_done = -2
        if all_done:
            resp = requests.post(self.industy_url,json=data,headers=self.header)
            if (resp.status_code >=200 and resp.status_code<=210):
                _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            else:
                all_done = -3
        _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
                 document_docid:item.get(document_tmp_docid),
                 }
        dtmp = Document_tmp(_dict)
        if all_done!=1:
            sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
            dtmp.update_row(self.ots_client)
        else:
            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
            dtmp.update_row(self.ots_client)
            # Insert into the extract interface table (enabled on release).
            _extract.setValue(document_extract2_status,random.randint(1,50),True)
            _extract.update_row(self.ots_client)
        log("process docid:%d %s"%(data["doc_id"],str(all_done)))
    def start_flow_extract(self):
        """Schedule producer and consumer every 10 seconds (blocking)."""
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
        schedule.add_job(self.flow_extract,"cron",second="*/10")
        schedule.start()
  1831. class Dataflow_dumplicate(Dataflow):
class DeleteListener():
    """Message-queue listener that forwards every frame from the delete queue
    to a handler callback.  The handler receives the raw frame and the
    connection so it can ack/nack the message itself."""
    def __init__(self,conn,_func,*args,**kwargs):
        # conn: an established MQ connection; _func: callback taking
        # (_dict={"frame":..., "conn":...}, result_queue=None)
        self.conn = conn
        self._func = _func
    def on_error(self, headers):
        # NOTE(review): assumes the error callback receives a frame object
        # exposing .body — confirm against the installed stomp client version
        log('received an error %s' % str(headers.body))
    def on_message(self, headers):
        try:
            message_id = headers.headers["message-id"]
            body = headers.body
            log("get message %s"%(message_id))
            # hand the whole frame plus the connection to the handler;
            # acknowledging the message is the handler's responsibility
            self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
        except Exception as e:
            # never let a handler error kill the listener thread
            traceback.print_exc()
            pass
    def __del__(self):
        # best-effort cleanup of the MQ connection on GC
        self.conn.disconnect()
  1849. def __init__(self,start_delete_listener=True):
  1850. Dataflow.__init__(self,)
  1851. self.c_f_get_extractCount = f_get_extractCount()
  1852. self.c_f_get_package = f_get_package()
  1853. logging.basicConfig(level = logging.info,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  1854. if start_delete_listener:
  1855. self.delete_comsumer_counts = 2
  1856. self.doc_delete_queue = "/queue/doc_delete_queue"
  1857. self.doc_delete_result = "/queue/doc_delete_result"
  1858. self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
  1859. for _ in range(self.delete_comsumer_counts):
  1860. conn = getConnect_activateMQ_ali()
  1861. listener = self.DeleteListener(conn,self.delete_doc_handle)
  1862. createComsumer(listener,self.doc_delete_queue)
  1863. def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart","time_release"]):
  1864. dict_time = {}
  1865. for k in keys:
  1866. dict_time[k] = _extract.get(k)
  1867. return dict_time
  1868. def post_extract(self,_dict):
  1869. win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  1870. _dict["win_tenderer"] = win_tenderer
  1871. _dict["bidding_budget"] = bidding_budget
  1872. _dict["win_bid_price"] = win_bid_price
  1873. extract_json = _dict.get(document_tmp_extract_json,"{}")
  1874. _extract = json.loads(extract_json)
  1875. _dict["product"] = ",".join(_extract.get("product",[]))
  1876. _dict["fingerprint"] = _extract.get("fingerprint","")
  1877. _dict["project_codes"] = _extract.get("code",[])
  1878. if len(_dict["project_codes"])>0:
  1879. _dict["project_code"] = _dict["project_codes"][0]
  1880. else:
  1881. _dict["project_code"] = ""
  1882. _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
  1883. _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
  1884. "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
  1885. _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
  1886. _dict["package"] = self.c_f_get_package.evaluate(extract_json)
  1887. _dict["project_name"] = _extract.get("name","")
  1888. _dict["dict_time"] = self.get_dict_time(_extract)
def dumplicate_fianl_check(self,base_list):
    """Final cross-validation of a duplicate group.

    Sorts the group by confidence (descending, in place — the caller's list
    is mutated) and walks it from the second element on, re-checking each
    candidate against up to 5 earlier members.  Stops at the first candidate
    that fails and returns the surviving prefix; returns [] when nothing
    beyond the head survives.
    """
    the_group = base_list
    # highest-confidence document becomes the group head
    the_group.sort(key=lambda x:x["confidence"],reverse=True)
    _index = 0
    base_fingerprint = "None"
    if len(base_list)>0:
        base_fingerprint = base_list[0]["fingerprint"]
    for _i in range(1,len(base_list)):
        _dict1 = base_list[_i]
        fingerprint_less = _dict1["fingerprint"]
        _pass = True
        if fingerprint_less==base_fingerprint:
            # identical fingerprint to the head: accept without re-checking
            _index = _i
            continue
        # compare against at most the first 5 accepted members
        for _j in range(min(_i,5)):
            _dict2 = base_list[_j]
            _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=False)
            # print("_prob:",_prob)
            if _prob<=0.1:
                _pass = False
                break
        log("checking index:%d"%(_i))
        _index = _i
        if not _pass:
            # current candidate failed: keep only what came before it
            _index -= 1
            break
    if _index>=1:
        return the_group[:_index+1]
    return []
def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
    """Estimate the probability that two extracted documents are duplicates.

    :param _dict1: first document (already enriched by post_extract)
    :param _dict2: second document (same shape)
    :param min_counts: size of the candidate set the pair came from; smaller
        sets get a higher base probability
    :param b_log: when True, log which sub-check failed
    :return: probability in [0,1]; 1 for identical fingerprints, 0 when a
        hard check rules the pair out
    """
    # ---- unpack the first ("less") document --------------------------------
    document_less = _dict1
    docid_less = _dict1["docid"]
    docchannel_less = document_less["docchannel"]
    page_time_less = document_less["page_time"]
    doctitle_refine_less = document_less["doctitle_refine"]
    project_codes_less = document_less["project_codes"]
    nlp_enterprise_less = document_less["nlp_enterprise"]
    tenderee_less = document_less["tenderee"]
    agency_less = document_less["agency"]
    win_tenderer_less = document_less["win_tenderer"]
    bidding_budget_less = document_less["bidding_budget"]
    win_bid_price_less = document_less["win_bid_price"]
    product_less = document_less["product"]
    package_less = document_less["package"]
    json_time_less = document_less["dict_time"]
    project_name_less = document_less["project_name"]
    fingerprint_less = document_less["fingerprint"]
    extract_count_less = document_less["extract_count"]
    # ---- unpack the second ("greater") document ----------------------------
    document_greater = _dict2
    docid_greater = _dict2["docid"]
    page_time_greater = document_greater["page_time"]
    doctitle_refine_greater = document_greater["doctitle_refine"]
    project_codes_greater = document_greater["project_codes"]
    nlp_enterprise_greater = document_greater["nlp_enterprise"]
    tenderee_greater = document_greater["tenderee"]
    agency_greater = document_greater["agency"]
    win_tenderer_greater = document_greater["win_tenderer"]
    bidding_budget_greater = document_greater["bidding_budget"]
    win_bid_price_greater = document_greater["win_bid_price"]
    product_greater = document_greater["product"]
    package_greater = document_greater["package"]
    json_time_greater = document_greater["dict_time"]
    project_name_greater = document_greater["project_name"]
    fingerprint_greater = document_greater["fingerprint"]
    extract_count_greater = document_greater["extract_count"]
    # identical content fingerprint => certain duplicate
    if fingerprint_less==fingerprint_greater:
        return 1
    # ---- base probability: fraction of 8 key fields that agree -------------
    same_count = 0
    all_count = 8
    if len(set(project_codes_less) & set(project_codes_greater))>0:
        same_count += 1
    if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
        same_count += 1
    if getLength(agency_less)>0 and agency_less==agency_greater:
        same_count += 1
    if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
        same_count += 1
    if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
        same_count += 1
    if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
        same_count += 1
    if getLength(project_name_less)>0 and project_name_less==project_name_greater:
        same_count += 1
    if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
        same_count += 1
    # smaller candidate sets carry a higher prior
    base_prob = 0
    if min_counts<3:
        base_prob = 0.9
    elif min_counts<5:
        base_prob = 0.8
    elif min_counts<8:
        base_prob = 0.7
    else:
        base_prob = 0.6
    _prob = base_prob*same_count/all_count
    # sparse extractions rarely agree on fields; give them a floor score
    if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
        _prob = 0.15
    if _prob<0.1:
        return _prob
    # ---- hard pairwise checks; each sets 0 (fail) / 1 (unknown) / 2 (match)
    check_result = {"pass":1}
    if docchannel_less in (51,102,103,104,115,116,117):
        if doctitle_refine_less!=doctitle_refine_greater:
            # NOTE(review): reconstructed else-pairing from flattened source —
            # here "different title on a different day" fails the pair, while
            # "different title, same day" is treated as a match; confirm
            # against the original layout
            if page_time_less!=page_time_greater:
                check_result["docchannel"] = 0
                check_result["pass"] = 0
            else:
                check_result["docchannel"] = 2
    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
        check_result["doctitle"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
    else:
        check_result["doctitle"] = 2
    #added check
    if not check_codes(project_codes_less,project_codes_greater):
        check_result["code"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
    else:
        # 2 only when both sides actually share a code; 1 means "no evidence"
        if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
            check_result["code"] = 2
        else:
            check_result["code"] = 1
    if not check_product(product_less,product_greater):
        check_result["product"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
    else:
        if getLength(product_less)>0 and getLength(product_greater)>0:
            check_result["product"] = 2
        else:
            check_result["product"] = 1
    if not check_demand():
        check_result["pass"] = 0
    if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                        tenderee_less,tenderee_greater,
                        agency_less,agency_greater,
                        win_tenderer_less,win_tenderer_greater):
        check_result["entity"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
    else:
        # which entity counts as decisive depends on the channel:
        # tender-type channels look at the tenderee, award-type at the winner
        if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
            check_result["entity"] = 2
        elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
            check_result["entity"] = 2
        else:
            check_result["entity"] = 1
    if not check_money(bidding_budget_less,bidding_budget_greater,
                       win_bid_price_less,win_bid_price_greater):
        if b_log:
            logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
        check_result["money"] = 0
        check_result["pass"] = 0
    else:
        # decisive amount mirrors the entity rule: budget for tender channels,
        # winning price for award channels
        if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
            check_result["money"] = 2
        elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
            check_result["money"] = 2
        else:
            check_result["money"] = 1
    #added check
    if not check_package(package_less,package_greater):
        if b_log:
            logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
        check_result["package"] = 0
        check_result["pass"] = 0
    else:
        if getLength(package_less)>0 and getLength(package_greater)>0:
            check_result["package"] = 2
        else:
            check_result["package"] = 1
    #added check
    if not check_time(json_time_less,json_time_greater):
        if b_log:
            logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
        # the time fields may arrive as a dict or as a json string
        if isinstance(json_time_less,dict):
            time_less = json_time_less
        else:
            time_less = json.loads(json_time_less)
        if isinstance(json_time_greater,dict):
            time_greater = json_time_greater
        else:
            time_greater = json.loads(json_time_greater)
        # log exactly which time key conflicts (diagnostic only)
        for k,v in time_less.items():
            if getLength(v)>0:
                v1 = time_greater.get(k,"")
                if getLength(v1)>0:
                    if v!=v1:
                        log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
        # NOTE(review): reconstructed indentation — fail flags set once for the
        # whole branch; confirm they were not nested under the v!=v1 test
        check_result["time"] = 0
        check_result["pass"] = 0
    else:
        if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
            check_result["time"] = 2
        else:
            check_result["time"] = 1
    # ---- final decision ----------------------------------------------------
    if check_result.get("pass",0)==0:
        if b_log:
            logging.info(str(check_result))
        # conflicting money is an absolute veto
        if check_result.get("money",1)==0:
            return 0
        # a soft failure is forgiven only when every strong signal matched
        if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
            return _prob
        else:
            return 0
    if check_result.get("time",1)==0:
        return 0
    return _prob
def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    """Run one dedup query against OTS and score each hit against *item*.

    :param item: the reference document (already enriched by post_extract)
    :param _query: an OTS query object, or a list of should-queries
    :param confidence: incoming rule confidence; NOTE it is overwritten per
        row by the dumplicate_check score when merge is False
    :param retry_times: number of attempts before giving up with []
    :param merge: when True, return all hits unscored
    :return: list of row dicts, each carrying "confidence" and "min_counts"
        (unless merge); [] after exhausting retries
    NOTE(review): singleNum_keys/contain_keys/multiNum_keys/notlike_keys are
    accepted for interface compatibility but unused here.
    """
    for _ in range(retry_times):
        try:
            _time = time.time()
            check_time = 0
            # a list of sub-queries is combined as a should (OR) query
            if isinstance(_query,list):
                bool_query = BoolQuery(should_queries=_query)
            else:
                bool_query = _query
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                               SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            list_data = []
            for _dict in list_dict:
                self.post_extract(_dict)
                _docid = _dict.get(document_tmp_docid)
                if merge:
                    list_data.append(_dict)
                else:
                    # never score the reference document against itself
                    if _docid!=item.get(document_tmp_docid):
                        _time1 = time.time()
                        confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
                        check_time+= time.time()-_time1
                        _dict["confidence"] = confidence
                        _dict["min_counts"] = total_count
                        list_data.append(_dict)
            all_time = time.time()-_time
            # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
            return list_data
        except Exception as e:
            # transient OTS failure: log and retry
            traceback.print_exc()
    return []
def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    """Run the query and append confident, unseen hits to *base_list*.

    Both *base_list* and *set_docid* are mutated in place; *set_docid*
    records every docid that has been considered.
    """
    list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
    for _dict in list_dict:
        _docid = _dict.get(document_tmp_docid)
        confidence = _dict["confidence"]
        if confidence>0.1:
            if _docid not in set_docid:
                base_list.append(_dict)
                set_docid.add(_docid)
        # NOTE(review): reconstructed indentation — this second add marks even
        # low-confidence docids as seen; confirm against the original layout
        set_docid.add(_docid)
  2145. def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
  2146. for k,v in _dict.items():
  2147. if getLength(v)==0:
  2148. return
  2149. _dict.update(base_dict)
  2150. if b_log:
  2151. log(str(_dict))
  2152. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  2153. _rule = {"confidence":confidence,
  2154. "item":item,
  2155. "query":_query,
  2156. "singleNum_keys":[],
  2157. "contain_keys":[],
  2158. "multiNum_keys":[]}
  2159. list_rules.append(_rule)
  2160. def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
  2161. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  2162. current_date = getCurrent_date("%Y-%m-%d")
  2163. if page_time=='':
  2164. page_time = current_date
  2165. if page_time>=timeAdd(current_date,-2):
  2166. table_name = "document_tmp"
  2167. table_index = "document_tmp_index"
  2168. base_dict = {
  2169. "docchannel":item["docchannel"],
  2170. "status":[status_from[0]],
  2171. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2172. }
  2173. must_not_dict = {"save":0,"docid":item.get("docid")}
  2174. doctitle_refine_name = "doctitle_refine"
  2175. else:
  2176. table_name = "document"
  2177. table_index = "document_index"
  2178. if get_all:
  2179. _status = [201,450]
  2180. else:
  2181. _status = [201,300]
  2182. base_dict = {
  2183. "docchannel":item["docchannel"],
  2184. "status":_status,
  2185. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2186. }
  2187. must_not_dict = {"docid":item.get("docid")}
  2188. doctitle_refine_name = "doctitle"
  2189. list_rules = []
  2190. singleNum_keys = ["tenderee","win_tenderer"]
  2191. confidence = 100
  2192. self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item)
  2193. confidence = 90
  2194. _dict = {document_tmp_agency:agency,
  2195. "win_tenderer":win_tenderer,
  2196. "win_bid_price":win_bid_price}
  2197. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2198. _dict = {document_tmp_agency:agency,
  2199. "win_tenderer":win_tenderer,
  2200. "bidding_budget":bidding_budget}
  2201. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2202. _dict = {document_tmp_agency:agency,
  2203. "win_bid_price":win_bid_price,
  2204. "bidding_budget":bidding_budget}
  2205. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2206. _dict = {win_tenderer:win_tenderer,
  2207. "win_bid_price":win_bid_price,
  2208. "bidding_budget":bidding_budget}
  2209. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2210. _dict = {"tenderee":tenderee,
  2211. "win_tenderer":win_tenderer,
  2212. "win_bid_price":win_bid_price}
  2213. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2214. _dict = {"tenderee":tenderee,
  2215. "win_tenderer":win_tenderer,
  2216. "bidding_budget":bidding_budget}
  2217. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2218. _dict = {"tenderee":tenderee,
  2219. "win_bid_price":win_bid_price,
  2220. "bidding_budget":bidding_budget}
  2221. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2222. _dict = {"tenderee":tenderee,
  2223. "agency":agency,
  2224. "win_tenderer":win_tenderer}
  2225. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2226. _dict = {"tenderee":tenderee,
  2227. "agency":agency,
  2228. "win_bid_price":win_bid_price}
  2229. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2230. _dict = {"tenderee":tenderee,
  2231. "agency":agency,
  2232. "bidding_budget":bidding_budget}
  2233. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2234. confidence=85
  2235. _dict = {"tenderee":tenderee,
  2236. "agency":agency
  2237. }
  2238. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2239. _dict = {"tenderee":tenderee,
  2240. "project_codes":project_code
  2241. }
  2242. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2243. _dict = {"tenderee":tenderee,
  2244. "project_name":project_name
  2245. }
  2246. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2247. if getLength(product)>0:
  2248. l_p = product.split(",")
  2249. _dict = {"tenderee":tenderee,
  2250. "product":l_p[0]
  2251. }
  2252. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2253. _dict = {"tenderee":tenderee,
  2254. "win_tenderer":win_tenderer
  2255. }
  2256. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2257. _dict = {"tenderee":tenderee,
  2258. "win_bid_price":win_bid_price
  2259. }
  2260. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2261. _dict = {"tenderee":tenderee,
  2262. "bidding_budget":bidding_budget
  2263. }
  2264. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2265. _dict = {"tenderee":tenderee,
  2266. doctitle_refine_name:doctitle_refine
  2267. }
  2268. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2269. _dict = {"agency":agency,
  2270. "project_codes":project_code
  2271. }
  2272. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2273. _dict = {"agency":agency,
  2274. "project_name":project_name
  2275. }
  2276. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2277. _dict = {"project_codes":project_code,
  2278. "project_name":project_name
  2279. }
  2280. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2281. _dict = {"project_codes":project_code,
  2282. "win_tenderer":win_tenderer
  2283. }
  2284. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2285. _dict = {"project_codes":project_code,
  2286. "win_bid_price":win_bid_price
  2287. }
  2288. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2289. _dict = {"project_codes":project_code,
  2290. "bidding_budget":bidding_budget
  2291. }
  2292. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2293. _dict = {"project_codes":project_code,
  2294. doctitle_refine_name:doctitle_refine
  2295. }
  2296. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2297. _dict = {"project_name":project_name,
  2298. "win_tenderer":win_tenderer
  2299. }
  2300. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2301. _dict = {"project_name":project_name,
  2302. "win_bid_price":win_bid_price
  2303. }
  2304. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2305. _dict = {"project_name":project_name,
  2306. "bidding_budget":bidding_budget
  2307. }
  2308. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2309. _dict = {"project_name":project_name,
  2310. doctitle_refine_name:doctitle_refine
  2311. }
  2312. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2313. _dict = {"win_tenderer":win_tenderer,
  2314. "win_bid_price":win_bid_price
  2315. }
  2316. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2317. _dict = {"win_tenderer":win_tenderer,
  2318. "bidding_budget":bidding_budget
  2319. }
  2320. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2321. _dict = {"win_tenderer":win_tenderer,
  2322. doctitle_refine_name:doctitle_refine
  2323. }
  2324. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2325. _dict = {"win_bid_price":win_bid_price,
  2326. "bidding_budget":bidding_budget
  2327. }
  2328. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2329. _dict = {"win_bid_price":win_bid_price,
  2330. doctitle_refine_name:doctitle_refine
  2331. }
  2332. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2333. _dict = {"bidding_budget":bidding_budget,
  2334. doctitle_refine_name:doctitle_refine
  2335. }
  2336. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2337. confidence=80
  2338. _dict = {doctitle_refine_name:doctitle_refine}
  2339. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2340. _dict = {"project_codes":project_code}
  2341. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2342. confidence=70
  2343. _dict = {"project_name":project_name}
  2344. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
  2345. return list_rules,table_name,table_index
  2346. def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
  2347. def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
  2348. bool_query = BoolQuery(must_queries=[
  2349. RangeQuery(document_tmp_status,*status_from,True,True),
  2350. # TermQuery("docid",271983871)
  2351. ])
  2352. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  2353. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  2354. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2355. log("flow_dumplicate producer total_count:%d"%total_count)
  2356. list_dict = getRow_ots(rows)
  2357. for _dict in list_dict:
  2358. self.queue_dumplicate.put(_dict)
  2359. _count = len(list_dict)
  2360. while next_token and _count<flow_process_count:
  2361. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  2362. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  2363. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  2364. list_dict = getRow_ots(rows)
  2365. for _dict in list_dict:
  2366. self.queue_dumplicate.put(_dict)
  2367. _count += len(list_dict)
  2368. def comsumer():
  2369. mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,50,1,ots_client=self.ots_client)
  2370. mt.run()
  2371. producer()
  2372. comsumer()
def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
    '''
    Fetch document contents by docid: try document_tmp first, then fall
    back to the permanent document table.
    :param list_docids: iterable of docids (anything int() accepts)
    :return: Document/Document_tmp objects sorted by page_time ascending
    '''
    list_docs = []
    for _docid in list_docids:
        docid = int(_docid)
        _dict = {document_partitionkey:getPartitionKey(docid),
                 document_docid:docid}
        _doc = Document_tmp(_dict)
        _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
        if not _exists:
            # already promoted out of the tmp table: read the permanent one
            _doc = Document(_dict)
            _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
        if _exists:
            list_docs.append(_doc)
    for _doc in list_docs:
        try:
            # expose the parsed sub_docs list for later package matching
            _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
            if _sub_docs_json is not None:
                _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
        except Exception as e:
            traceback.print_exc()
    list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
    return list_docs
  2400. def is_same_package(self,_dict1,_dict2):
  2401. sub_project_name1 = _dict1.get(project_sub_project_name,"")
  2402. if sub_project_name1=="Project":
  2403. sub_project_name1 = ""
  2404. win_tenderer1 = _dict1.get(project_win_tenderer,"")
  2405. win_bid_price1 = _dict1.get(project_win_bid_price,0)
  2406. bidding_budget1 = _dict1.get(project_bidding_budget,0)
  2407. sub_project_name2 = _dict2.get(project_sub_project_name,"")
  2408. if sub_project_name2=="Project":
  2409. sub_project_name2 = ""
  2410. win_tenderer2 = _dict2.get(project_win_tenderer,"")
  2411. win_bid_price2 = _dict2.get(project_win_bid_price,0)
  2412. bidding_budget2 = _dict2.get(project_bidding_budget,0)
  2413. _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
  2414. if len(_set)>1:
  2415. return False
  2416. _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
  2417. if len(_set)>1:
  2418. return False
  2419. _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
  2420. if len(_set)>1:
  2421. return False
  2422. _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
  2423. if len(_set)>1:
  2424. return False
  2425. return True
  2426. def getUpdate_dict(self,_dict):
  2427. update_dict = {}
  2428. for k,v in _dict.items():
  2429. if v is None:
  2430. continue
  2431. if isinstance(v,str):
  2432. if v=="":
  2433. continue
  2434. if isinstance(v,(float,int)):
  2435. if v==0:
  2436. continue
  2437. update_dict[k] = v
  2438. return update_dict
  2439. def update_projects_by_document(self,docid,projects):
  2440. '''
  2441. 更新projects中对应的document的属性
  2442. :param docid:
  2443. :param projects: 项目集合
  2444. :param action:add/delete add时附加唯一属性,delete时删除唯一属性
  2445. :return:
  2446. '''
  2447. list_docs = self.search_docs([docid])
  2448. docs = [_doc.getProperties() for _doc in list_docs]
  2449. project_dict = generate_common_properties(docs)
  2450. print("list_docs",project_dict)
  2451. list_package_properties = generate_packages_properties(docs)
  2452. _dict = {}
  2453. #更新公共属性
  2454. for k,v in project_dict.items():
  2455. if v is None or v=="":
  2456. continue
  2457. if k in (project_project_dynamics,project_product,project_project_codes,project_docids):
  2458. continue
  2459. for _proj in projects:
  2460. if k not in _proj:
  2461. _dict[k] = v
  2462. elif _proj.get(k,"未知")=="未知":
  2463. _dict[k] = v
  2464. for _proj in projects:
  2465. _proj.update(_dict)
  2466. #拼接属性
  2467. append_dict = {}
  2468. set_docid = set()
  2469. set_product = set()
  2470. set_code = set()
  2471. set_nlp_enterprise = set()
  2472. set_nlp_enterprise_attachment = set()
  2473. for _proj in projects:
  2474. _docids = _proj.get(project_docids,"")
  2475. _codes = _proj.get(project_project_codes,"")
  2476. _product = _proj.get(project_product,"")
  2477. set_docid = set_docid | set(_docids.split(","))
  2478. set_code = set_code | set(_codes.split(","))
  2479. set_product = set_product | set(_product.split(","))
  2480. try:
  2481. set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
  2482. set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
  2483. except Exception as e:
  2484. pass
  2485. set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
  2486. set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
  2487. set_product = set_product | set(project_dict.get(project_product,"").split(","))
  2488. try:
  2489. set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
  2490. set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
  2491. except Exception as e:
  2492. pass
  2493. append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
  2494. append_dict[project_docid_number] = len(set_docid)
  2495. append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
  2496. append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
  2497. append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
  2498. append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
  2499. dict_dynamic = {}
  2500. set_docid = set()
  2501. for _proj in projects:
  2502. _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
  2503. for _dy in _dynamic:
  2504. _docid = _dy.get("docid")
  2505. dict_dynamic[_docid] = _dy
  2506. _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
  2507. for _dy in _dynamic:
  2508. _docid = _dy.get("docid")
  2509. dict_dynamic[_docid] = _dy
  2510. list_dynamics = []
  2511. for k,v in dict_dynamic.items():
  2512. list_dynamics.append(v)
  2513. list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
  2514. append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
  2515. for _proj in projects:
  2516. _proj.update(append_dict)
  2517. dict_package = {}
  2518. for _pp in projects:
  2519. _counts = 0
  2520. sub_project_name = _pp.get(project_sub_project_name,"")
  2521. if sub_project_name=="Project":
  2522. sub_project_name = ""
  2523. win_tenderer = _pp.get(project_win_tenderer,"")
  2524. win_bid_price = _pp.get(project_win_bid_price,0)
  2525. bidding_budget = _pp.get(project_bidding_budget,0)
  2526. if win_tenderer!="" and bidding_budget!=0:
  2527. _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
  2528. dict_package[_key] = _pp
  2529. _counts += 1
  2530. if win_tenderer!="" and win_bid_price!=0:
  2531. _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
  2532. dict_package[_key] = _pp
  2533. _counts +=1
  2534. if _counts==0:
  2535. if win_tenderer!="":
  2536. _key = "%s-%s"%(sub_project_name,win_tenderer)
  2537. dict_package[_key] = _pp
  2538. _counts += 1
  2539. if bidding_budget!=0:
  2540. _key = "%s-%s"%(sub_project_name,str(bidding_budget))
  2541. dict_package[_key] = _pp
  2542. _counts += 1
  2543. #更新私有属性
  2544. for _pp in list_package_properties:
  2545. flag_update = False
  2546. sub_project_name = _pp.get(project_sub_project_name,"")
  2547. if sub_project_name=="Project":
  2548. sub_project_name = ""
  2549. win_tenderer = _pp.get(project_win_tenderer,"")
  2550. win_bid_price = _pp.get(project_win_bid_price,0)
  2551. bidding_budget = _pp.get(project_bidding_budget,0)
  2552. if win_tenderer!="" and bidding_budget!=0:
  2553. _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
  2554. if _key in dict_package:
  2555. if self.is_same_package(_pp,dict_package[_key]):
  2556. ud = self.getUpdate_dict(_pp)
  2557. self.set_project_uuid(ud,dict_package[_key].get("uuid"))
  2558. dict_package[_key].update(ud)
  2559. flag_update = True
  2560. continue
  2561. if win_tenderer!="" and win_bid_price!=0:
  2562. _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
  2563. if _key in dict_package:
  2564. if self.is_same_package(_pp,dict_package[_key]):
  2565. ud = self.getUpdate_dict(_pp)
  2566. self.set_project_uuid(ud,dict_package[_key].get("uuid"))
  2567. dict_package[_key].update(ud)
  2568. flag_update = True
  2569. continue
  2570. if win_tenderer!="":
  2571. _key = "%s-%s"%(sub_project_name,win_tenderer)
  2572. if _key in dict_package:
  2573. if self.is_same_package(_pp,dict_package[_key]):
  2574. ud = self.getUpdate_dict(_pp)
  2575. self.set_project_uuid(ud,dict_package[_key].get("uuid"))
  2576. dict_package[_key].update(ud)
  2577. flag_update = True
  2578. continue
  2579. if bidding_budget!=0:
  2580. _key = "%s-%s"%(sub_project_name,str(bidding_budget))
  2581. if _key in dict_package:
  2582. if self.is_same_package(_pp,dict_package[_key]):
  2583. ud = self.getUpdate_dict(_pp)
  2584. self.set_project_uuid(ud,dict_package[_key].get("uuid"))
  2585. dict_package[_key].update(ud)
  2586. flag_update = True
  2587. continue
  2588. if not flag_update:
  2589. _pp.update(project_dict)
  2590. projects.append(_pp)
  2591. _counts = 0
  2592. if win_tenderer!="" and bidding_budget!=0:
  2593. _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
  2594. dict_package[_key] = _pp
  2595. _counts += 1
  2596. if win_tenderer!="" and win_bid_price!=0:
  2597. _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
  2598. dict_package[_key] = _pp
  2599. _counts +=1
  2600. if _counts==0:
  2601. if win_tenderer!="":
  2602. _key = "%s-%s"%(sub_project_name,win_tenderer)
  2603. dict_package[_key] = _pp
  2604. _counts += 1
  2605. if bidding_budget!=0:
  2606. _key = "%s-%s"%(sub_project_name,str(bidding_budget))
  2607. dict_package[_key] = _pp
  2608. _counts += 1
    def delete_projects_by_document(self,docid):
        '''
        Rebuild the project set after removing one document.

        Finds every project row that currently references *docid*, marks each
        of those rows with a delete flag, then regenerates fresh deduplicated
        projects from the remaining documents of those projects.

        :param docid: id of the document being deleted
        :return: JSON string containing the regenerated projects plus the
                 delete-markers for the superseded project rows
        '''
        set_docid = set()
        list_delete_projects = []
        # every stored project referencing this docid gets a delete marker
        list_projects = self.search_projects_with_document([docid])
        for _proj in list_projects:
            _p = {}
            _docids = _proj.get(project_docids,"")
            print(_proj.get(project_uuid))
            _p["delete_uuid"] = _proj.get(project_uuid)
            _p["to_delete"] = True
            list_delete_projects.append(_p)
            if _docids!="":
                set_docid = set_docid | set(_docids.split(","))
        # drop the deleted document itself; keep all sibling docids
        if str(docid) in set_docid:
            set_docid.remove(str(docid))
        list_docid = list(set_docid)
        list_projects = []
        if len(list_docid)>0:
            # regenerate (and dedup) projects from the surviving documents
            list_docs = self.search_docs(list_docid)
            list_projects = self.generate_projects_from_document(list_docs)
            list_projects = dumplicate_projects(list_projects)
        list_projects.extend(list_delete_projects)
        project_json = to_project_json(list_projects)
        print("delete_json",project_json)
        return project_json
  2641. def delete_doc_handle(self,_dict,result_queue):
  2642. headers = _dict.get("frame")
  2643. conn = _dict.get("conn")
  2644. log("==========delete")
  2645. if headers is not None:
  2646. message_id = headers.headers["message-id"]
  2647. body = headers.body
  2648. item = json.loads(body)
  2649. docid = item.get("docid")
  2650. if docid is None:
  2651. return
  2652. delete_result = self.delete_projects_by_document(docid)
  2653. if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
  2654. ackMsg(conn,message_id)
    def generate_common_properties(self,list_docs):
        '''
        Derive the project-level ("common") properties shared by a group of
        documents that belong to one project.

        Scalar fields are chosen by majority vote across the documents;
        location fields fall through district -> city -> province -> area;
        docids, project codes, products and the dynamics timeline are
        aggregated across all documents.

        :param list_docs: document objects exposing ``getProperties()``
        :return: dict of aggregated project properties
        '''
        # vote counting: choose_dict[field][value] -> occurrence count
        choose_dict = {}
        project_dict = {}
        for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
            for _doc in list_docs:
                _value = _doc.getProperties().get(_key,"")
                if _value!="":
                    if _key not in choose_dict:
                        choose_dict[_key] = {}
                    if _value not in choose_dict[_key]:
                        choose_dict[_key][_value] = 0
                    choose_dict[_key][_value] += 1
        # pick location fields from the first granularity level that has a
        # concrete (non-"unknown"/"nationwide") value on some document
        _find = False
        for _key in [document_district,document_city,document_province,document_area]:
            area_dict = {}
            for _doc in list_docs:
                loc = _doc.getProperties().get(_key,"未知")
                if loc not in ('全国','未知',"0"):
                    if loc not in area_dict:
                        area_dict[loc] = 0
                    area_dict[loc] += 1
            list_loc = []
            for k,v in area_dict.items():
                list_loc.append([k,v])
            list_loc.sort(key=lambda x:x[1],reverse=True)
            if len(list_loc)>0:
                # NOTE(review): `_doc` here is simply the last document of the
                # inner loop, not necessarily the one holding the top-voted
                # location — confirm this is intended
                project_dict[document_district] = _doc.getProperties().get(document_district)
                project_dict[document_city] = _doc.getProperties().get(document_city)
                project_dict[document_province] = _doc.getProperties().get(document_province)
                project_dict[document_area] = _doc.getProperties().get(document_area)
                _find = True
                break
        if not _find:
            # no document carried a concrete location: fall back to the first doc
            if len(list_docs)>0:
                project_dict[document_district] = list_docs[0].getProperties().get(document_district)
                project_dict[document_city] = list_docs[0].getProperties().get(document_city)
                project_dict[document_province] = list_docs[0].getProperties().get(document_province)
                project_dict[document_area] = list_docs[0].getProperties().get(document_area)
        # majority vote per counted field, demoting "unknown"-style winners
        # to the runner-up when one exists
        for _key,_value in choose_dict.items():
            _l = []
            for k,v in _value.items():
                _l.append([k,v])
            _l.sort(key=lambda x:x[1],reverse=True)
            if len(_l)>0:
                _v = _l[0][0]
                if _v in ('全国','未知'):
                    if len(_l)>1:
                        _v = _l[1][0]
                project_dict[_key] = _v
        # aggregate docids / codes / products and build the dynamics timeline
        list_dynamics = []
        docid_number = 0
        visuable_docids = []
        zhao_biao_page_time = ""
        zhong_biao_page_time = ""
        list_codes = []
        list_product = []
        p_page_time = ""
        remove_docids = set()
        for _doc in list_docs:
            table_name = _doc.getProperties().get("table_name")
            status = _doc.getProperties().get(document_status,0)
            _save = _doc.getProperties().get(document_tmp_save,1)
            doctitle = _doc.getProperties().get(document_doctitle,"")
            docchannel = _doc.getProperties().get(document_docchannel)
            page_time = _doc.getProperties().get(document_page_time,"")
            _docid = _doc.getProperties().get(document_docid)
            _bidway = _doc.getProperties().get(document_bidway,"")
            _docchannel = _doc.getProperties().get(document_life_docchannel,0)
            project_codes = _doc.getProperties().get(document_project_codes)
            product = _doc.getProperties().get(document_product)
            sub_docs = _doc.getProperties().get("sub_docs",[])
            is_multipack = True if len(sub_docs)>1 else False
            extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
            if product is not None:
                list_product.extend(product.split(","))
            if project_codes is not None:
                _c = project_codes.split(",")
                list_codes.extend(_c)
            if p_page_time=="":
                p_page_time = page_time
            # channel ids 51,52,102,103,114 presumably mark tender
            # announcements; 101,118,119,120 award announcements — TODO confirm
            if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
                zhao_biao_page_time = page_time
            if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
                zhong_biao_page_time = page_time
            is_visuable = 0
            # a document counts as visible when its status (document table)
            # or its save flag (other tables) marks it published
            if table_name=="document":
                if status>=201 and status<=300:
                    docid_number +=1
                    visuable_docids.append(str(_docid))
                    is_visuable = 1
                else:
                    remove_docids.add(str(_docid))
            else:
                if _save==1:
                    docid_number +=1
                    visuable_docids.append(str(_docid))
                    is_visuable = 1
                else:
                    remove_docids.add(str(_docid))
            list_dynamics.append({document_docid:_docid,
                                  document_doctitle:doctitle,
                                  document_docchannel:_docchannel,
                                  document_bidway:_bidway,
                                  document_page_time:page_time,
                                  document_status:201 if is_visuable==1 else 401,
                                  "is_multipack":is_multipack,
                                  document_tmp_extract_count:extract_count
                                  }
                                 )
        project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
        project_dict[project_docid_number] = docid_number
        project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
        if zhao_biao_page_time !="":
            project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
        if zhong_biao_page_time !="":
            project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
        project_dict[project_project_codes] = ",".join(list(set(list_codes)))
        project_dict[project_page_time] = p_page_time
        project_dict[project_product] = ",".join(list(set(list_product)))
        return project_dict
  2781. def generate_packages_properties(self,list_docs):
  2782. '''
  2783. 生成分包属性
  2784. :param list_docs:
  2785. :return:
  2786. '''
  2787. list_properties = []
  2788. set_key = set()
  2789. for _doc in list_docs:
  2790. _dict = {}
  2791. sub_docs = _doc.getProperties().get("sub_docs")
  2792. if sub_docs is not None:
  2793. for _d in sub_docs:
  2794. sub_project_code = _d.get(project_sub_project_code,"")
  2795. sub_project_name = _d.get(project_sub_project_name,"")
  2796. win_tenderer = _d.get(project_win_tenderer,"")
  2797. win_bid_price = _d.get(project_win_bid_price,"")
  2798. _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
  2799. if _key in set_key:
  2800. continue
  2801. set_key.add(_key)
  2802. list_properties.append(_d)
  2803. return list_properties
  2804. def generate_projects_from_document(self,list_docs):
  2805. '''
  2806. #通过公告生成projects
  2807. :param list_docids:
  2808. :return:
  2809. '''
  2810. #判断标段数
  2811. list_projects = generate_projects([doc.getProperties() for doc in list_docs])
  2812. return list_projects
    def search_projects_with_document(self,list_docids):
        '''
        Query the project2 table for every project referencing any of the
        given document ids.

        :param list_docids: document ids to look up
        :return: list of project dicts carrying the full project column set
        '''
        print("==",list_docids)
        # OR together one TermQuery per docid
        list_should_q = []
        for _docid in list_docids:
            list_should_q.append(TermQuery("docids",_docid))
        bool_query = BoolQuery(should_queries=list_should_q)
        _query = {"query":bool_query,"limit":20}
        # fetch the complete project column set needed by the merge logic
        list_project_dict = getDocument(_query,self.ots_client,[
            project_uuid,project_docids,project_zhao_biao_page_time,
            project_zhong_biao_page_time,
            project_page_time,
            project_area,
            project_province,
            project_city,
            project_district,
            project_info_type,
            project_industry,
            project_qcodes,
            project_project_name,
            project_project_code,
            project_project_codes,
            project_project_addr,
            project_tenderee,
            project_tenderee_addr,
            project_tenderee_phone,
            project_tenderee_contact,
            project_agency,
            project_agency_phone,
            project_agency_contact,
            project_sub_project_name,
            project_sub_project_code,
            project_bidding_budget,
            project_win_tenderer,
            project_win_bid_price,
            project_win_tenderer_manager,
            project_win_tenderer_phone,
            project_second_tenderer,
            project_second_bid_price,
            project_second_tenderer_manager,
            project_second_tenderer_phone,
            project_third_tenderer,
            project_third_bid_price,
            project_third_tenderer_manager,
            project_third_tenderer_phone,
            project_procurement_system,
            project_bidway,
            project_dup_data,
            project_docid_number,
            project_project_dynamics,
            project_product,
            project_moneysource,
            project_service_time,
            project_time_bidclose,
            project_time_bidopen,
            project_time_bidstart,
            project_time_commencement,
            project_time_completion,
            project_time_earnest_money_start,
            project_time_earnest_money_end,
            project_time_get_file_end,
            project_time_get_file_start,
            project_time_publicity_end,
            project_time_publicity_start,
            project_time_registration_end,
            project_time_registration_start,
            project_time_release,
            project_dup_docid,
            project_info_source,
            project_nlp_enterprise,
            project_nlp_enterprise_attachment,
            ],sort="page_time",table_name="project2",table_index="project2_index")
        return list_project_dict
  2890. def set_project_uuid(self,_dict,_uuid):
  2891. if _uuid is not None and _uuid!="":
  2892. if "uuid" in _dict:
  2893. _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
  2894. else:
  2895. _dict["uuid"] = _uuid
    def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price):
        '''
        Build the list of candidate-match query rules used by merge_projects.

        Each entry is ``[query_list, confidence]`` where the queries in
        ``query_list`` are ANDed together by the caller.  Confidence-2 rules
        pair two strong attributes (tenderee/win_tenderer with codes, name,
        budget or price); confidence-1 rules match a single attribute.

        Note: ``page_time``, ``agency`` and ``sub_project_name`` are accepted
        for interface symmetry but not used in this body.

        :return: list of [query_list, confidence] pairs
        '''
        whole_time_start = time.time()
        _time = time.time()
        list_query = []
        list_code = [a for a in project_codes.split(",") if a!='']
        # cap should-query expansion at 20 alternatives each
        should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
        should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
        list_product = [a for a in product.split(",") if a!='']
        should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
        prepare_time = time.time()-_time
        _time = time.time()
        # log("list_code %s"%(str(list_code)))
        # log("list_product %s"%(str(list_product)))
        # log("tenderee %s"%(tenderee))
        # log("bidding_budget %s"%(bidding_budget))
        # log("win_tenderer %s"%(win_tenderer))
        # log("win_bid_price %s"%(win_bid_price))
        # log("project_name %s"%(project_name))
        log_time = time.time()-_time
        _time = time.time()
        # confidence-2 rules: tenderee combined with another strong attribute
        if tenderee!="" and len(list_code)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_code,
                      ]
            list_query.append([_query,2])
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_cod
                      ]
            list_query.append([_query,2])
        if tenderee!="" and len(list_product)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_product]
            list_query.append([_query,2])
        if tenderee!="" and project_name!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_project_name,project_name)]
            list_query.append([_query,2])
        if tenderee!="" and bidding_budget>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        if tenderee!="" and win_tenderer!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,2])
        # confidence-2 rules: winner combined with codes or price
        if win_tenderer!="" and len(list_code)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_code]
            list_query.append([_query,2])
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_cod]
            list_query.append([_query,2])
        if win_tenderer!="" and win_bid_price>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        # confidence-1 fallback rules on a single attribute
        if len(list_code)>0:
            _query = [
                should_q_code]
            list_query.append([_query,1])
            _query = [
                should_q_cod]
            list_query.append([_query,1])
        if project_name!="":
            _query = [
                TermQuery(project_project_name,project_name)]
            list_query.append([_query,1])
        generate_time = time.time()-_time
        whole_time = time.time()-whole_time_start
        log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
        return list_query
    def merge_projects(self,list_projects,b_log=False,columns=[project_tenderee,project_agency,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_sub_project_name,project_product,project_zhao_biao_page_time,project_zhong_biao_page_time,project_project_code,project_project_codes,project_docids]):
        '''
        Merge the given projects with similar projects already stored in OTS.

        For each project a set of match rules is built (getMerge_rules),
        candidate rows are fetched in batches of five rules, and every
        candidate passing check_merge_rule is folded into the project via
        update_projects_by_project.

        :param list_projects: projects to merge (updated in place)
        :param b_log: verbose logging switch for the rule check
        :param columns: OTS columns fetched for candidates
            (NOTE(review): mutable default argument — read-only in this body,
            but fragile if ever mutated)
        :return: the updated list_projects
        '''
        whole_time_start = time.time()
        # collect uuids already attached to the inputs so we never merge a
        # project with itself
        set_uuid = set()
        for _proj in list_projects:
            _uuid = _proj.get("uuid")
            if _uuid is not None:
                set_uuid = set_uuid | set(_uuid.split(","))
        must_not_q = []
        for _uuid in list(set_uuid):
            must_not_q.append(TermQuery("uuid",_uuid))
        projects_merge_count = 0
        projects_check_rule_time = 0
        projects_update_time = 0
        projects_query_time = 0
        projects_prepare_time = 0
        for _proj in list_projects:
            page_time = _proj.get(project_page_time,"")
            project_codes = _proj.get(project_project_codes,"")
            project_name = _proj.get(project_project_name,"")
            tenderee = _proj.get(project_tenderee,"")
            agency = _proj.get(project_agency,"")
            product = _proj.get(project_product,"")
            sub_project_name = _proj.get(project_sub_project_name,"")
            bidding_budget = _proj.get(project_bidding_budget,-1)
            win_tenderer = _proj.get(project_win_tenderer,"")
            win_bid_price = _proj.get(project_win_bid_price,-1)
            # candidate time window: 150 days before to 120 days after page_time
            page_time_less = timeAdd(page_time,-150)
            page_time_greater = timeAdd(page_time,120)
            # "Project" is the placeholder sub-package name; only filter on
            # sub_project_name when it carries real content
            sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
            _time = time.time()
            list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price)
            list_merge_data = []
            _step = 5
            _begin = 0
            must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
                            ]
            if sub_project_q is not None:
                must_queries.append(sub_project_q)
            projects_prepare_time += time.time()-_time
            _time = time.time()
            # run the rules in batches of _step, OR-ing the batch together
            while _begin<len(list_must_query):
                list_should_q = []
                _limit = 20
                for must_q,_count in list_must_query[_begin:_begin+_step]:
                    must_q1 = list(must_q)
                    must_q1.extend(must_queries)
                    list_should_q.append(BoolQuery(must_queries=must_q1))
                    # _limit += _count*5
                _query = BoolQuery(
                    should_queries=list_should_q,
                    must_not_queries=must_not_q[:100])
                rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
                                                                                          SearchQuery(_query,limit=_limit),
                                                                                          columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_data = getRow_ots(rows)
                list_merge_data.extend(list_data)
                # print(list_data)
                # exclude already-seen candidates from subsequent batches
                for _data in list_data:
                    must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
                _begin += _step
            projects_query_time += time.time()-_time
            # prefer candidates with a close bidding budget (sorted ascending)
            # NOTE(review): projects_merge_count is overwritten each outer
            # iteration rather than accumulated — confirm intent
            projects_merge_count = len(list_merge_data)
            list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
            for _data in list_merge_data:
                _time = time.time()
                _check = check_merge_rule(_proj,_data,b_log=b_log)
                projects_check_rule_time += time.time()-_time
                if _check:
                    _time = time.time()
                    update_projects_by_project(_data,[_proj])
                    projects_update_time += time.time()-_time
        whole_time = time.time()-whole_time_start
        log("merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
        return list_projects
    def merge_document_real(self,item,dup_docid,table_name,status_to=None,b_log=False):
        '''
        Real-time project merge for one document and its duplicate set.

        :param item: the document record being processed
        :param dup_docid: list of duplicate document ids (combined with item's id)
        :param table_name: source table name (not read in this body)
        :param status_to: not read in this body; kept for interface compatibility
        :param b_log: verbose logging switch passed to merge_projects
        :return: JSON string of the merged project list
        '''
        list_docids = []
        _docid = item.get(document_tmp_docid)
        list_docids.append(_docid)
        if isinstance(dup_docid,list):
            list_docids.extend(dup_docid)
        list_docids = [a for a in list_docids if a is not None]
        _time = time.time()
        list_projects = self.search_projects_with_document(list_docids)
        log("search projects takes:%.3f"%(time.time()-_time))
        if len(list_projects)==0:
            # no stored project yet: build projects straight from the documents
            _time = time.time()
            list_docs = self.search_docs(list_docids)
            log("search document takes:%.3f"%(time.time()-_time))
            _time = time.time()
            list_projects = self.generate_projects_from_document(list_docs)
            log("generate projects takes:%.3f"%(time.time()-_time))
        else:
            # refresh the stored projects with this document's attributes
            _time = time.time()
            self.update_projects_by_document(_docid,list_projects)
            log("update projects takes:%.3f"%(time.time()-_time))
        _time = time.time()
        list_projects = dumplicate_projects(list_projects)
        log("dumplicate projects takes:%.3f"%(time.time()-_time))
        _time = time.time()
        list_projects = self.merge_projects(list_projects,b_log)
        log("merge projects takes:%.3f"%(time.time()-_time))
        _time = time.time()
        dumplicate_document_in_merge(list_projects)
        log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
        _time = time.time()
        project_json = to_project_json(list_projects)
        log("json projects takes:%.3f"%(time.time()-_time))
        if b_log:
            log("project_json:%s"%project_json)
        return project_json
    def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
        '''
        Deduplicate one document: collect its duplicate group via ranked
        rules, pick the best docid, merge projects and (when upgrading)
        persist save/dup_docid/projects back to OTS.

        :param item: the document record to deduplicate
        :param result_queue: not read in this body; kept for handler signature
        :param ots_client: not read in this body (self.ots_client is used)
        :param get_all: forwarded to translate_dumplicate_rules
        :param upgrade: when True, write the dedup result back to OTS
        '''
        try:
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=True)
            # strongest rules first
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            _i = 0
            step = 5
            # the document itself always belongs to its own duplicate group
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            # query the rules in batches of `step`, excluding docids already found
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
                _i += step
            _time = time.time()
            log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list)
            log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            final_list_docid = [a["docid"] for a in final_list]
            log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            # dup_docid = the duplicate group minus the current document
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            remove_list = []
            if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
                # current doc wins: keep it (save=1), demote the others
                dtmp.setValue(document_tmp_save,1,True)
                # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
                # another doc wins: mark the current one as not saved
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
                    # prefix the winner's docid to the dup list
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
            list_docids = list(dup_docid)
            list_docids.append(best_docid)
            b_log = False if upgrade else True
            dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,flow_dumplicate_status_to,b_log),True)
            log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
            if upgrade:
                if table_name=="document_tmp":
                    self.changeSaveStatus(remove_list)
                # print(dtmp.getProperties())
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                dtmp.update_row(self.ots_client)
            # log("dump takes %.2f"%(time.time()-start_time))
        except Exception as e:
            traceback.print_exc()
            log("error on dumplicate of %s"%(str(item.get(document_tmp_docid))))
  3172. def start_flow_dumplicate(self):
  3173. schedule = BlockingScheduler()
  3174. schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
  3175. schedule.start()
  3176. def changeSaveStatus(self,list_dict):
  3177. for _dict in list_dict:
  3178. if _dict.get(document_tmp_save,1)==1:
  3179. _d = {"partitionkey":_dict["partitionkey"],
  3180. "docid":_dict["docid"],
  3181. document_tmp_save:0
  3182. }
  3183. _d_tmp = Document_tmp(_d)
  3184. _d_tmp.update_row(self.ots_client)
    def test_dumplicate(self,docid):
        '''
        Debug helper: run the dedup consumer for a single document id
        without upgrading (upgrade=False, so nothing is written back).
        '''
        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
        bool_query = BoolQuery(must_queries=[
            TermQuery("docid",docid)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        # fall back to the tmp table when the doc is not yet in `document`
        if total_count==0:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                               SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for item in list_dict:
            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
        return
  3202. def getRemainDoc(self,docid):
  3203. columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
  3204. bool_query = BoolQuery(must_queries=[
  3205. TermQuery("docid",docid)
  3206. ])
  3207. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  3208. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3209. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3210. list_dict = getRow_ots(rows)
  3211. if len(list_dict)>0:
  3212. item = list_dict[0]
  3213. start_time = time.time()
  3214. self.post_extract(item)
  3215. base_list = []
  3216. set_docid = set()
  3217. list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
  3218. list_rules.sort(key=lambda x:x["confidence"],reverse=True)
  3219. _i = 0
  3220. step = 5
  3221. item["confidence"] = 999
  3222. if item.get(document_tmp_docid) not in set_docid:
  3223. base_list.append(item)
  3224. set_docid.add(item.get(document_tmp_docid))
  3225. while _i<len(list_rules):
  3226. must_not_q = []
  3227. if len(base_list)>0:
  3228. must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
  3229. _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
  3230. must_not_queries=must_not_q)
  3231. _rule = list_rules[_i]
  3232. confidence = _rule["confidence"]
  3233. singleNum_keys = _rule["singleNum_keys"]
  3234. contain_keys = _rule["contain_keys"]
  3235. multiNum_keys = _rule["multiNum_keys"]
  3236. self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
  3237. _i += step
  3238. _time = time.time()
  3239. log("%d start final check with length:%d"%(item["docid"],len(base_list)))
  3240. final_list = self.dumplicate_fianl_check(base_list)
  3241. log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
  3242. best_docid = self.get_best_docid(final_list)
  3243. return best_docid
  3244. return None
  3245. if __name__ == '__main__':
  3246. # df = Dataflow()
  3247. # df.flow_init()
  3248. # df.flow_test()
  3249. # df.test_merge()
  3250. # df.start_flow_attachment()
  3251. # df.start_flow_extract()
  3252. # df.start_flow_dumplicate()
  3253. # # df.start_flow_merge()
  3254. # df.start_flow_remove()
  3255. # download_attachment()
  3256. # test_attachment_interface()
  3257. df_dump = Dataflow_dumplicate(start_delete_listener=False)
  3258. # df_dump.start_flow_dumplicate()
  3259. a = time.time()
  3260. df_dump.test_dumplicate(275459183)
  3261. print("takes",time.time()-a)
  3262. # df_dump.delete_projects_by_document(16288036)
  3263. # log("=======")
  3264. # for i in range(3):
  3265. # time.sleep(20)
  3266. #
  3267. # a = {"docid":16288036}
  3268. # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)