dataflow.py 197 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171
  1. # sys.path.append("/data")
  2. from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
  3. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  4. from BaseDataMaintenance.common.multiProcess import MultiHandler
  5. from queue import Queue
  6. from multiprocessing import Queue as PQueue
  7. from BaseDataMaintenance.model.ots.document_tmp import *
  8. from BaseDataMaintenance.model.ots.attachment import *
  9. from BaseDataMaintenance.model.ots.document_html import *
  10. from BaseDataMaintenance.model.ots.document_extract2 import *
  11. from BaseDataMaintenance.model.ots.project import *
  12. from BaseDataMaintenance.model.ots.project2_tmp import *
  13. from BaseDataMaintenance.model.ots.document import *
  14. from BaseDataMaintenance.model.ots.project_process import *
  15. import base64
  16. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
  17. from uuid import uuid4
  18. from BaseDataMaintenance.common.ossUtils import *
  19. from BaseDataMaintenance.dataSource.source import is_internal,getAuth
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. from BaseDataMaintenance.maintenance.dataflow_settings import *
  22. from threading import Thread
  23. import oss2
  24. from BaseDataMaintenance.maxcompute.documentDumplicate import *
  25. from BaseDataMaintenance.maxcompute.documentMerge import *
  26. from BaseDataMaintenance.common.otsUtils import *
  27. from BaseDataMaintenance.common.activateMQUtils import *
  28. from BaseDataMaintenance.dataMonitor.data_monitor import BaseDataMonitor
  29. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  30. def getSet(list_dict,key):
  31. _set = set()
  32. for item in list_dict:
  33. if key in item:
  34. if item[key]!='' and item[key] is not None:
  35. if re.search("^\d[\d\.]*$",item[key]) is not None:
  36. _set.add(str(float(item[key])))
  37. else:
  38. _set.add(str(item[key]))
  39. return _set
  40. def getSimilarityOfString(str1,str2):
  41. _set1 = set()
  42. _set2 = set()
  43. if str1 is not None:
  44. for i in range(1,len(str1)):
  45. _set1.add(str1[i-1:i+1])
  46. if str2 is not None:
  47. for i in range(1,len(str2)):
  48. _set2.add(str2[i-1:i+1])
  49. _len = max(1,min(len(_set1),len(_set2)))
  50. return len(_set1&_set2)/_len
  51. def getDiffIndex(list_dict,key,confidence=100):
  52. _set = set()
  53. for _i in range(len(list_dict)):
  54. item = list_dict[_i]
  55. if item["confidence"]>=confidence:
  56. continue
  57. if key in item:
  58. if item[key]!='' and item[key] is not None:
  59. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  60. _set.add(str(float(item[key])))
  61. else:
  62. _set.add(str(item[key]))
  63. if len(_set)>1:
  64. return _i
  65. return len(list_dict)
  66. def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
  67. swf_urls = []
  68. try:
  69. list_files = os.listdir(swf_dir)
  70. list_files.sort(key=lambda x:x)
  71. headers = dict()
  72. headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
  73. for _file in list_files:
  74. swf_localpath = "%s/%s"%(swf_dir,_file)
  75. swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
  76. uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
  77. _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
  78. swf_urls.append(_url)
  79. os.remove(swf_localpath)
  80. except Exception as e:
  81. traceback.print_exc()
  82. return swf_urls
class Dataflow():
    """Driver for the document pipeline: init -> attachment recognition ->
    extract -> dumplicate -> merge, backed by OTS tables and an OSS
    attachment bucket."""

    def __init__(self):
        # OTS (Tablestore) client used by most read/write operations.
        self.ots_client = getConnect_ots()
        # One queue per pipeline stage; dumplicate uses a multiprocessing
        # queue because it is consumed across processes.
        self.queue_init = Queue()
        self.queue_attachment = Queue()
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.list_attachment_ocr = []
        self.list_attachment_not_ocr = []
        self.queue_extract = Queue()
        self.list_extract = []
        self.queue_dumplicate = PQueue()
        # Keys currently queued for dedup (guards against double-enqueue).
        self.dumplicate_set = set()
        self.queue_merge = Queue()
        self.queue_syncho = Queue()
        self.queue_remove = Queue()
        self.queue_remove_project = Queue()
        self.attachment_rec_interface = ""
        # Separate client for the merge stage.
        self.ots_client_merge = getConnect_ots()
        # Prefer VPC-internal endpoints when running inside Alibaba Cloud.
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        if is_internal:
            self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        else:
            self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        # NOTE(review): hard-coded Authorization token checked into source --
        # consider moving it to configuration/secret storage.
        self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        self.auth = getAuth()
        # Widen the oss2 connection pool for the many concurrent workers.
        oss2.defaults.connection_pool_size = 100
        oss2.defaults.multiget_num_threads = 20
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        self.current_path = os.path.dirname(__file__)
  123. def flow_init(self):
  124. def producer():
  125. bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
  126. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  127. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  128. ColumnsToGet(return_type=ColumnReturnType.ALL))
  129. log("flow_init producer total_count:%d"%total_count)
  130. list_dict = getRow_ots(rows)
  131. for _dict in list_dict:
  132. self.queue_init.put(_dict)
  133. _count = len(list_dict)
  134. while next_token:
  135. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  136. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  137. ColumnsToGet(return_type=ColumnReturnType.ALL))
  138. list_dict = getRow_ots(rows)
  139. for _dict in list_dict:
  140. self.queue_init.put(_dict)
  141. _count += len(list_dict)
  142. def comsumer():
  143. mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
  144. mt.run()
  145. def comsumer_handle(item,result_queue,ots_client):
  146. _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
  147. if document_tmp_dochtmlcon in item:
  148. item.pop(document_tmp_dochtmlcon)
  149. if document_tmp_doctextcon in item:
  150. item.pop(document_tmp_doctextcon)
  151. if document_tmp_attachmenttextcon in item:
  152. item.pop(document_tmp_attachmenttextcon)
  153. _status = item.get(document_tmp_status)
  154. new_status = None
  155. if _status>=201 and _status<=300:
  156. item[document_tmp_save] = 1
  157. new_status = 81
  158. elif _status>=401 and _status<=450:
  159. item[document_tmp_save] = 0
  160. new_status = 81
  161. else:
  162. new_status = 1
  163. # new_status = 1
  164. item[document_tmp_status] = new_status
  165. dtmp = Document_tmp(item)
  166. dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
  167. document_tmp_docid:item.get(document_tmp_docid),
  168. document_tmp_dochtmlcon:_dochtmlcon})
  169. dtmp.update_row(ots_client)
  170. dhtml.update_row(ots_client)
  171. producer()
  172. comsumer()
  173. def getTitleFromHtml(self,filemd5,_html):
  174. _soup = BeautifulSoup(_html,"lxml")
  175. _find = _soup.find("a",attrs={"data":filemd5})
  176. _title = ""
  177. if _find is not None:
  178. _title = _find.get_text()
  179. return _title
  180. def getSourceLinkFromHtml(self,filemd5,_html):
  181. _soup = BeautifulSoup(_html,"lxml")
  182. _find = _soup.find("a",attrs={"filelink":filemd5})
  183. filelink = ""
  184. if _find is None:
  185. _find = _soup.find("img",attrs={"filelink":filemd5})
  186. if _find is not None:
  187. filelink = _find.attrs.get("src","")
  188. else:
  189. filelink = _find.attrs.get("href","")
  190. return filelink
    def request_attachment_interface(self, attach, _dochtmlcon):
        """Download one attachment from OSS, run it through the recognition
        interface, and write the recognized html/text back onto the row.

        :param attach: attachment row object (project model) to process/update
        :param _dochtmlcon: html of the owning document, used to recover the
            attachment's display title and source link
        :return: True when processed, skipped as too large, or missing from
            OSS; False when download or recognition failed.  On unexpected
            exceptions the traceback is printed and None falls through.
        """
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _size = attach.getProperties().get(attachment_size)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        localpath = os.path.join(self.current_path, "download", _uuid.hex)
        docids = attach.getProperties().get(attachment_docids)
        try:
            if _size > ATTACHMENT_LARGESIZE:
                # Oversized attachments are only flagged, never recognized.
                attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
                log("attachment :%s of path:%s to large" % (filemd5, _path))
                attach.update_row(self.ots_client)
                return True
            else:
                d_start_time = time.time()
                if downloadFile(self.bucket, objectPath, localpath):
                    time_download = time.time() - d_start_time
                    _data_base64 = base64.b64encode(open(localpath, "rb").read())
                    # Call the recognition interface and handle the result.
                    start_time = time.time()
                    _success, _html, swf_images = getAttachDealInterface(_data_base64, _filetype)
                    if _success:
                        log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d" % (filemd5, _filetype, round(_size/1024/1024, 4), time_download, time.time()-start_time, len(_html)))
                    else:
                        # Recognition failure: log, alert DingTalk, give up.
                        log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s" % (str(docids), filemd5, _filetype, round(_size/1024/1024, 4), str(_html)))
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s" % (str(docids), filemd5, _filetype, round(_size/1024/1024, 4), str(_html)))
                        _html = ""
                        return False
                    # NOTE(review): eval() on data returned by the interface is
                    # only safe if that service is fully trusted; json.loads
                    # would be preferable if the payload is JSON -- confirm.
                    swf_images = eval(swf_images)
                    if attach.getProperties().get(attachment_filetype) == "swf" and len(swf_images) > 0:
                        # swf pages come back as base64 pngs; upload them once
                        # and cache the resulting URLs on the row.
                        swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls, "[]"))
                        if len(swf_urls) == 0:
                            objectPath = attach.getProperties().get(attachment_path, "")
                            localpath = os.path.join(self.current_path, "download/%s.swf" % (uuid4().hex))
                            swf_dir = os.path.join(self.current_path, "swf_images", uuid4().hex)
                            if not os.path.exists(swf_dir):
                                os.mkdir(swf_dir)
                            for _i in range(len(swf_images)):
                                _base = swf_images[_i]
                                _base = base64.b64decode(_base)
                                filename = "swf_page_%d.png" % (_i)
                                filepath = os.path.join(swf_dir, filename)
                                with open(filepath, "wb") as f:
                                    f.write(_base)
                            swf_urls = transformSWF(self.bucket, self.attachment_hub_url, objectPath, None, swf_dir)
                            if os.path.exists(swf_dir):
                                os.rmdir(swf_dir)
                            attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)
                    if re.search("<td", _html) is not None:
                        attach.setValue(attachment_has_table, 1, True)
                    # Recover display title / source link from the document html.
                    _file_title = self.getTitleFromHtml(filemd5, _dochtmlcon)
                    filelink = self.getSourceLinkFromHtml(filemd5, _dochtmlcon)
                    if _file_title != "":
                        attach.setValue(attachment_file_title, _file_title, True)
                    if filelink != "":
                        attach.setValue(attachment_file_link, filelink, True)
                    attach.setValue(attachment_attachmenthtml, _html, True)
                    attach.setValue(attachment_attachmentcon, BeautifulSoup(_html, "lxml").get_text(), True)
                    attach.setValue(attachment_status, ATTACHMENT_PROCESSED, True)
                    attach.setValue(attachment_recsize, len(_html), True)
                    attach.setValue(attachment_process_time, getCurrent_date(format="%Y-%m-%d %H:%M:%S"), True)
                    attach.update_row(self.ots_client)  # enable the row update in production
                    return True
                else:
                    return False
        except oss2.exceptions.NotFound as e:
            # Object no longer exists in OSS: nothing to do, treat as done.
            return True
        except Exception as e:
            traceback.print_exc()
        finally:
            # Always drop the downloaded temp file, even on failure.
            try:
                os.remove(localpath)
            except:
                pass
  268. def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
  269. list_html = []
  270. swf_urls = []
  271. for _attach in list_attach:
  272. #测试全跑
  273. if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
  274. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  275. if _html is None:
  276. _html = ""
  277. list_html.append(_html)
  278. else:
  279. _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
  280. if not _succeed:
  281. return False,"",[]
  282. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  283. if _html is None:
  284. _html = ""
  285. list_html.append(_html)
  286. if _attach.getProperties().get(attachment_filetype)=="swf":
  287. swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
  288. return True,list_html,swf_urls
  289. def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
  290. set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
  291. set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
  292. list_must_queries = []
  293. list_must_no_queries = []
  294. for k,v in _dict.items():
  295. if k in set_match:
  296. if isinstance(v,str):
  297. l_s = []
  298. for s_v in v.split(","):
  299. l_s.append(MatchQuery(k,s_v))
  300. list_must_queries.append(BoolQuery(should_queries=l_s))
  301. elif k in set_nested:
  302. _v = v
  303. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  304. _v = float(_v)
  305. list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  306. elif k in set_term:
  307. list_must_queries.append(TermQuery(k,v))
  308. elif k in set_phrase:
  309. list_must_queries.append(MatchPhraseQuery(k,v))
  310. elif k in set_range:
  311. if len(v)==1:
  312. list_must_queries.append(RangeQuery(k,v[0]))
  313. elif len(v)==2:
  314. list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
  315. for k,v in _dict_must_not.items():
  316. if k in set_match:
  317. if isinstance(v,str):
  318. l_s = []
  319. for s_v in v.split(","):
  320. l_s.append(MatchQuery(k,s_v))
  321. list_must_no_queries.append(BoolQuery(should_queries=l_s))
  322. elif k in set_nested:
  323. _v = v
  324. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  325. _v = float(_v)
  326. list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  327. elif k in set_term:
  328. list_must_no_queries.append(TermQuery(k,v))
  329. elif k in set_range:
  330. if len(v)==1:
  331. list_must_no_queries.append(RangeQuery(k,v[0]))
  332. elif len(v)==2:
  333. list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
  334. return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
  335. def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
  336. columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
  337. extract_count = 0
  338. if project_code is not None and project_code!="":
  339. extract_count += 1
  340. if project_name is not None and project_name!="":
  341. extract_count += 1
  342. if tenderee is not None and tenderee!="":
  343. extract_count += 1
  344. if agency is not None and agency!="":
  345. extract_count += 1
  346. if sub_docs_json is not None:
  347. sub_docs = json.loads(sub_docs_json)
  348. sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
  349. sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
  350. # log("==%s"%(str(sub_docs)))
  351. for sub_docs in sub_docs:
  352. for _key_sub_docs in sub_docs.keys():
  353. extract_count += 1
  354. if _key_sub_docs in columns:
  355. if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
  356. if _key_sub_docs in ["bidding_budget","win_bid_price"]:
  357. if float(sub_docs[_key_sub_docs])>0:
  358. columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
  359. else:
  360. columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
  361. return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
  362. def post_extract(self,_dict):
  363. win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  364. _dict["win_tenderer"] = win_tenderer
  365. _dict["bidding_budget"] = bidding_budget
  366. _dict["win_bid_price"] = win_bid_price
  367. if "extract_count" not in _dict:
  368. _dict["extract_count"] = extract_count
  369. def get_dump_columns(self,_dict):
  370. docchannel = _dict.get(document_tmp_docchannel,0)
  371. project_code = _dict.get(document_tmp_project_code,"")
  372. project_name = _dict.get(document_tmp_project_name,"")
  373. tenderee = _dict.get(document_tmp_tenderee,"")
  374. agency = _dict.get(document_tmp_agency,"")
  375. doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
  376. win_tenderer = _dict.get("win_tenderer","")
  377. bidding_budget = _dict.get("bidding_budget","")
  378. if bidding_budget==0:
  379. bidding_budget = ""
  380. win_bid_price = _dict.get("win_bid_price","")
  381. if win_bid_price==0:
  382. win_bid_price = ""
  383. page_time = _dict.get(document_tmp_page_time,"")
  384. fingerprint = _dict.get(document_tmp_fingerprint,"")
  385. product = _dict.get(document_tmp_product,"")
  386. return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
    # Validate one candidate duplicate group; return the group unchanged when
    # every constraint holds, otherwise return [].
    #
    # item:           pivot document dict; only its "project_code" is read here
    # _split:         list of candidate-document dicts forming the group
    # singleNum_keys: columns that may hold at most one distinct value in the group
    # contain_keys:   columns whose shorter values must be substrings of the longest value
    # multiNum_keys:  columns that must hold more than one distinct value
    # notlike_keys:   columns whose values must not be similar-but-not-equal
    #                 to the pivot's project_code
    # NOTE(review): getSet and getSimilarityOfString are project helpers defined
    # elsewhere; presumably they return the distinct-value set of a column and a
    # 0..1 string-similarity score respectively -- confirm against their source.
    flag = True
    # single-value constraint: more than one distinct value disqualifies the group
    for _key in singleNum_keys:
        if len(getSet(_split,_key))>1:
            flag = False
            break
    # multi-value constraint: the column must differ across the group
    for _key in multiNum_keys:
        if len(getSet(_split,_key))<=1:
            flag = False
            break
    project_code = item.get("project_code","")
    # near-miss constraint: a value similar (0.7 < sim < 1) to the pivot's
    # project_code indicates a related-but-different project -> reject
    for _key in notlike_keys:
        if not flag:
            break
        for _d in _split:
            _key_v = _d.get(_key,"")
            _sim = getSimilarityOfString(project_code,_key_v)
            if _sim>0.7 and _sim<1:
                flag = False
                break
    # check that every document in the group contains / is contained by the
    # longest value of each contain-key (original comment translated)
    if flag:
        if len(contain_keys)>0:
            for _key in contain_keys:
                MAX_CONTAIN_COLUMN = None
                for _d in _split:
                    contain_column = _d.get(_key)
                    if contain_column is not None and contain_column !="":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                # a longer value must contain the running maximum
                                if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                # a shorter value must be contained in the maximum
                                if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                    flag = False
                                    break
    if flag:
        return _split
    return []
def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    # Run one duplicate-candidate query against the OTS search index and
    # return the hits after the group-level constraint check performed by
    # f_set_docid_limitNum_contain (which returns [] on any violation).
    #
    # _query:     a BoolQuery, or a list of queries combined as OR (should)
    # confidence: score attached to every returned hit
    list_data = []
    # a plain list is treated as a disjunction of alternative queries
    if isinstance(_query,list):
        bool_query = BoolQuery(should_queries=_query)
    else:
        bool_query = _query
    rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
        ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        # decode sub_docs_json into win_tenderer/bidding_budget/win_bid_price
        self.post_extract(_dict)
        _dict["confidence"] = confidence
        list_data.append(_dict)
    # NOTE(review): pagination below is commented out, so only the first page
    # (up to 50 rows) is ever examined -- confirm this is intentional.
    # _count = len(list_dict)
    # while next_token:
    #     rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
    #         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
    #         ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    #     list_dict = getRow_ots(rows)
    #     for _dict in list_dict:
    #         self.post_extract(_dict)
    #         _dict["confidence"] = confidence
    #         list_data.append(_dict)
    list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
    return list_dict
  456. def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  457. list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
  458. for _dict in list_dict:
  459. self.post_extract(_dict)
  460. _docid = _dict.get(document_tmp_docid)
  461. if _docid not in set_docid:
  462. base_list.append(_dict)
  463. set_docid.add(_docid)
  464. def translate_dumplicate_rules(self,status_from,item):
  465. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  466. if page_time=='':
  467. page_time = getCurrent_date("%Y-%m-%d")
  468. base_dict = {
  469. "status":[status_from[0]],
  470. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  471. }
  472. must_not_dict = {"save":0}
  473. list_rules = []
  474. singleNum_keys = ["tenderee","win_tenderer"]
  475. if fingerprint!="":
  476. _dict = {}
  477. confidence = 100
  478. _dict[document_tmp_fingerprint] = fingerprint
  479. _dict.update(base_dict)
  480. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  481. _rule = {"confidence":confidence,
  482. "item":item,
  483. "query":_query,
  484. "singleNum_keys":[],
  485. "contain_keys":[],
  486. "multiNum_keys":[]}
  487. list_rules.append(_rule)
  488. if docchannel in (52,118):
  489. if bidding_budget!="" and tenderee!="" and project_code!="":
  490. confidence = 90
  491. _dict = {document_tmp_docchannel:docchannel,
  492. "bidding_budget":item.get("bidding_budget"),
  493. document_tmp_tenderee:item.get(document_tmp_tenderee,""),
  494. document_tmp_project_code:item.get(document_tmp_project_code,"")
  495. }
  496. _dict.update(base_dict)
  497. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  498. _rule = {"confidence":confidence,
  499. "query":_query,
  500. "singleNum_keys":singleNum_keys,
  501. "contain_keys":[],
  502. "multiNum_keys":[document_tmp_web_source_no]}
  503. list_rules.append(_rule)
  504. if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  505. confidence = 80
  506. _dict = {document_tmp_docchannel:docchannel,
  507. "doctitle_refine":doctitle_refine,
  508. "tenderee":tenderee,
  509. bidding_budget:"bidding_budget"
  510. }
  511. _dict.update(base_dict)
  512. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  513. _rule = {"confidence":confidence,
  514. "query":_query,
  515. "singleNum_keys":singleNum_keys,
  516. "contain_keys":[],
  517. "multiNum_keys":[document_tmp_web_source_no]}
  518. list_rules.append(_rule)
  519. if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
  520. confidence = 90
  521. _dict = {document_tmp_docchannel:docchannel,
  522. "project_code":project_code,
  523. "doctitle_refine":doctitle_refine,
  524. "agency":agency,
  525. "bidding_budget":bidding_budget
  526. }
  527. _dict.update(base_dict)
  528. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  529. _rule = {"confidence":confidence,
  530. "query":_query,
  531. "singleNum_keys":singleNum_keys,
  532. "contain_keys":[],
  533. "multiNum_keys":[document_tmp_web_source_no]}
  534. list_rules.append(_rule)
  535. if project_code!="" and tenderee!="" and bidding_budget!="":
  536. confidence = 91
  537. _dict = {document_tmp_docchannel:docchannel,
  538. "project_code":project_code,
  539. "tenderee":tenderee,
  540. "bidding_budget":bidding_budget
  541. }
  542. _dict.update(base_dict)
  543. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  544. _rule = {"confidence":confidence,
  545. "query":_query,
  546. "singleNum_keys":singleNum_keys,
  547. "contain_keys":[],
  548. "multiNum_keys":[document_tmp_web_source_no]}
  549. list_rules.append(_rule)
  550. if doctitle_refine!="" and agency!="" and bidding_budget!="":
  551. confidence = 71
  552. _dict = {document_tmp_docchannel:docchannel,
  553. "doctitle_refine":doctitle_refine,
  554. "agency":agency,
  555. "bidding_budget":bidding_budget
  556. }
  557. _dict.update(base_dict)
  558. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  559. _rule = {"confidence":confidence,
  560. "query":_query,
  561. "singleNum_keys":singleNum_keys,
  562. "contain_keys":[],
  563. "multiNum_keys":[document_tmp_web_source_no]}
  564. list_rules.append(_rule)
  565. if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
  566. confidence = 91
  567. _dict = {document_tmp_docchannel:docchannel,
  568. "project_code":project_code,
  569. "project_name":project_name,
  570. "agency":agency,
  571. "bidding_budget":bidding_budget
  572. }
  573. _dict.update(base_dict)
  574. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  575. n_singleKeys = [i for i in singleNum_keys]
  576. n_singleKeys.append(document_tmp_web_source_no)
  577. _rule = {"confidence":confidence,
  578. "query":_query,
  579. "singleNum_keys":n_singleKeys,
  580. "contain_keys":[],
  581. "multiNum_keys":[]}
  582. list_rules.append(_rule)
  583. ##-- 5. 招标公告 - 同项目编号- 同[项目名称、标题] - 同[招标人、代理公司] - 同预算(!=0) - 同信息源=1
  584. if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
  585. confidence = 91
  586. _dict = {document_tmp_docchannel:docchannel,
  587. "project_code":project_code,
  588. "project_name":project_name,
  589. "tenderee":tenderee,
  590. "bidding_budget":bidding_budget
  591. }
  592. _dict.update(base_dict)
  593. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  594. n_singleKeys = [i for i in singleNum_keys]
  595. n_singleKeys.append(document_tmp_web_source_no)
  596. _rule = {"confidence":confidence,
  597. "query":_query,
  598. "singleNum_keys":n_singleKeys,
  599. "contain_keys":[],
  600. "multiNum_keys":[]}
  601. list_rules.append(_rule)
  602. if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  603. confidence = 71
  604. _dict = {document_tmp_docchannel:docchannel,
  605. "project_code":project_code,
  606. "doctitle_refine":doctitle_refine,
  607. "tenderee":tenderee,
  608. "bidding_budget":bidding_budget
  609. }
  610. _dict.update(base_dict)
  611. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  612. _rule = {"confidence":confidence,
  613. "query":_query,
  614. "singleNum_keys":singleNum_keys,
  615. "contain_keys":[],
  616. "multiNum_keys":[document_tmp_web_source_no]}
  617. list_rules.append(_rule)
  618. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  619. if project_name!="" and agency!="":
  620. tmp_bidding = 0
  621. if bidding_budget!="":
  622. tmp_bidding = bidding_budget
  623. confidence = 51
  624. _dict = {document_tmp_docchannel:docchannel,
  625. "project_name":project_name,
  626. "agency":agency,
  627. "bidding_budget":tmp_bidding
  628. }
  629. _dict.update(base_dict)
  630. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  631. _rule = {"confidence":confidence,
  632. "query":_query,
  633. "singleNum_keys":singleNum_keys,
  634. "contain_keys":[],
  635. "multiNum_keys":[document_tmp_web_source_no]}
  636. list_rules.append(_rule)
  637. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  638. if project_code!="" and agency!="":
  639. tmp_bidding = 0
  640. if bidding_budget!="":
  641. tmp_bidding = bidding_budget
  642. confidence = 51
  643. _dict = {document_tmp_docchannel:docchannel,
  644. "project_code":project_code,
  645. "agency":agency,
  646. "bidding_budget":tmp_bidding
  647. }
  648. _dict.update(base_dict)
  649. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  650. _rule = {"confidence":confidence,
  651. "query":_query,
  652. "singleNum_keys":singleNum_keys,
  653. "contain_keys":[],
  654. "multiNum_keys":[document_tmp_web_source_no]}
  655. list_rules.append(_rule)
  656. if docchannel not in (101,119,120):
  657. #-- 7. 非中标公告 - 同项目名称 - 同发布日期 - 同招标人 - 同预算 - 同类型 - 信息源>1 - 同项目编号
  658. if project_name!="" and tenderee!="" and project_code!="":
  659. tmp_bidding = 0
  660. if bidding_budget!="":
  661. tmp_bidding = bidding_budget
  662. confidence = 51
  663. _dict = {document_tmp_docchannel:docchannel,
  664. "project_name":project_name,
  665. "tenderee":tenderee,
  666. "project_code":project_code
  667. }
  668. _dict.update(base_dict)
  669. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  670. _rule = {"confidence":confidence,
  671. "query":_query,
  672. "singleNum_keys":singleNum_keys,
  673. "contain_keys":[],
  674. "multiNum_keys":[document_tmp_web_source_no]}
  675. list_rules.append(_rule)
  676. if docchannel in (101,119,120):
  677. #-- 3. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(==0)
  678. if project_code!="" and project_name!="" and win_tenderer!="":
  679. tmp_win = 0
  680. if win_bid_price!="":
  681. tmp_win = win_bid_price
  682. confidence = 61
  683. _dict = {document_tmp_docchannel:docchannel,
  684. "project_code":project_code,
  685. "project_name":project_name,
  686. "win_tenderer":win_tenderer,
  687. "win_bid_price":tmp_win
  688. }
  689. _dict.update(base_dict)
  690. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  691. _rule = {"confidence":confidence,
  692. "query":_query,
  693. "singleNum_keys":singleNum_keys,
  694. "contain_keys":[],
  695. "multiNum_keys":[]}
  696. list_rules.append(_rule)
  697. if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
  698. confidence = 72
  699. _dict = {document_tmp_docchannel:docchannel,
  700. "project_code":project_code,
  701. "project_name":project_name,
  702. "bidding_budget":bidding_budget,
  703. "product":product
  704. }
  705. _dict.update(base_dict)
  706. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  707. n_singleKeys = [i for i in singleNum_keys]
  708. n_singleKeys.append(document_tmp_web_source_no)
  709. _rule = {"confidence":confidence,
  710. "query":_query,
  711. "singleNum_keys":n_singleKeys,
  712. "contain_keys":[],
  713. "multiNum_keys":[]}
  714. list_rules.append(_rule)
  715. if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  716. confidence = 91
  717. _dict = {document_tmp_docchannel:docchannel,
  718. "project_code":project_code,
  719. "doctitle_refine":doctitle_refine,
  720. "win_tenderer":win_tenderer,
  721. "win_bid_price":win_bid_price
  722. }
  723. _dict.update(base_dict)
  724. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  725. n_singleKeys = [i for i in singleNum_keys]
  726. n_singleKeys.append(document_tmp_web_source_no)
  727. _rule = {"confidence":confidence,
  728. "query":_query,
  729. "singleNum_keys":n_singleKeys,
  730. "contain_keys":[],
  731. "multiNum_keys":[]}
  732. list_rules.append(_rule)
  733. ##-- 2. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(!=0) - 同信息源=1
  734. if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
  735. confidence = 91
  736. _dict = {document_tmp_docchannel:docchannel,
  737. "project_code":project_code,
  738. "project_name":project_name,
  739. "win_tenderer":win_tenderer,
  740. "win_bid_price":win_bid_price
  741. }
  742. _dict.update(base_dict)
  743. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  744. n_singleKeys = [i for i in singleNum_keys]
  745. n_singleKeys.append(document_tmp_web_source_no)
  746. _rule = {"confidence":confidence,
  747. "query":_query,
  748. "singleNum_keys":n_singleKeys,
  749. "contain_keys":[],
  750. "multiNum_keys":[]}
  751. list_rules.append(_rule)
  752. if project_name!="" and win_tenderer!="" and win_bid_price!="":
  753. confidence = 91
  754. _dict = {document_tmp_docchannel:docchannel,
  755. "project_name":project_name,
  756. "win_tenderer":win_tenderer,
  757. "win_bid_price":win_bid_price,
  758. }
  759. _dict.update(base_dict)
  760. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  761. _rule = {"confidence":confidence,
  762. "query":_query,
  763. "singleNum_keys":singleNum_keys,
  764. "contain_keys":[],
  765. "multiNum_keys":[document_tmp_web_source_no]}
  766. list_rules.append(_rule)
  767. if project_code!="" and win_tenderer!="" and win_bid_price!="":
  768. confidence = 91
  769. _dict = {document_tmp_docchannel:docchannel,
  770. "project_code":project_code,
  771. "win_tenderer":win_tenderer,
  772. "win_bid_price":win_bid_price,
  773. }
  774. _dict.update(base_dict)
  775. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  776. _rule = {"confidence":confidence,
  777. "query":_query,
  778. "singleNum_keys":singleNum_keys,
  779. "contain_keys":[],
  780. "multiNum_keys":[document_tmp_web_source_no]}
  781. list_rules.append(_rule)
  782. if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  783. confidence = 91
  784. _dict = {document_tmp_docchannel:docchannel,
  785. "project_code":project_code,
  786. "doctitle_refine":doctitle_refine,
  787. "win_tenderer":win_tenderer,
  788. "win_bid_price":win_bid_price
  789. }
  790. _dict.update(base_dict)
  791. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  792. n_singleKeys = [i for i in singleNum_keys]
  793. n_singleKeys.append(document_tmp_web_source_no)
  794. _rule = {"confidence":confidence,
  795. "query":_query,
  796. "singleNum_keys":n_singleKeys,
  797. "contain_keys":[],
  798. "multiNum_keys":[]}
  799. list_rules.append(_rule)
  800. if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  801. confidence=90
  802. _dict = {document_tmp_docchannel:docchannel,
  803. "doctitle_refine":doctitle_refine,
  804. "win_tenderer":win_tenderer,
  805. "win_bid_price":win_bid_price
  806. }
  807. _dict.update(base_dict)
  808. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  809. _rule = {"confidence":confidence,
  810. "query":_query,
  811. "singleNum_keys":singleNum_keys,
  812. "contain_keys":[],
  813. "multiNum_keys":[document_tmp_web_source_no]}
  814. list_rules.append(_rule)
  815. if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
  816. confidence=95
  817. _dict = {document_tmp_docchannel:docchannel,
  818. "project_name":project_name,
  819. "win_tenderer":win_tenderer,
  820. "win_bid_price":win_bid_price,
  821. "project_code":project_code
  822. }
  823. _dict.update(base_dict)
  824. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  825. _rule = {"confidence":confidence,
  826. "query":_query,
  827. "singleNum_keys":singleNum_keys,
  828. "contain_keys":[],
  829. "multiNum_keys":[document_tmp_web_source_no]}
  830. list_rules.append(_rule)
  831. if docchannel in (51,103,115,116):
  832. #9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  833. if doctitle_refine!="" and tenderee!="":
  834. tmp_budget = 0
  835. if bidding_budget!="":
  836. tmp_budget = bidding_budget
  837. confidence=81
  838. _dict = {document_tmp_docchannel:docchannel,
  839. "doctitle_refine":doctitle_refine,
  840. "tenderee":tenderee,
  841. "bidding_budget":tmp_budget,
  842. }
  843. _dict.update(base_dict)
  844. _dict["page_time"] = [page_time,page_time]
  845. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  846. _rule = {"confidence":confidence,
  847. "query":_query,
  848. "singleNum_keys":singleNum_keys,
  849. "contain_keys":[],
  850. "multiNum_keys":[document_tmp_web_source_no]}
  851. list_rules.append(_rule)
  852. #-- 9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  853. if project_code!="" and tenderee!="":
  854. confidence=81
  855. tmp_budget = 0
  856. if bidding_budget!="":
  857. tmp_budget = bidding_budget
  858. _dict = {document_tmp_docchannel:docchannel,
  859. "project_code":project_code,
  860. "tenderee":tenderee,
  861. "bidding_budget":tmp_budget,
  862. }
  863. _dict.update(base_dict)
  864. _dict["page_time"] = [page_time,page_time]
  865. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  866. _rule = {"confidence":confidence,
  867. "query":_query,
  868. "singleNum_keys":singleNum_keys,
  869. "contain_keys":[],
  870. "multiNum_keys":[document_tmp_web_source_no]}
  871. list_rules.append(_rule)
  872. if project_name!="" and tenderee!="":
  873. confidence=81
  874. tmp_budget = 0
  875. if bidding_budget!="":
  876. tmp_budget = bidding_budget
  877. _dict = {document_tmp_docchannel:docchannel,
  878. "project_name":project_name,
  879. "tenderee":tenderee,
  880. "bidding_budget":tmp_budget,
  881. }
  882. _dict.update(base_dict)
  883. _dict["page_time"] = [page_time,page_time]
  884. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  885. _rule = {"confidence":confidence,
  886. "query":_query,
  887. "singleNum_keys":singleNum_keys,
  888. "contain_keys":[],
  889. "multiNum_keys":[document_tmp_web_source_no]}
  890. list_rules.append(_rule)
  891. if agency!="" and tenderee!="":
  892. confidence=81
  893. tmp_budget = 0
  894. if bidding_budget!="":
  895. tmp_budget = bidding_budget
  896. _dict = {document_tmp_docchannel:docchannel,
  897. "agency":agency,
  898. "tenderee":tenderee,
  899. "bidding_budget":tmp_budget,
  900. "product":product
  901. }
  902. _dict.update(base_dict)
  903. _dict["page_time"] = [page_time,page_time]
  904. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  905. _rule = {"confidence":confidence,
  906. "query":_query,
  907. "singleNum_keys":singleNum_keys,
  908. "contain_keys":[],
  909. "multiNum_keys":[document_tmp_web_source_no]}
  910. list_rules.append(_rule)
  911. if agency!="" and project_code!="":
  912. confidence=81
  913. tmp_budget = 0
  914. if bidding_budget!="":
  915. tmp_budget = bidding_budget
  916. _dict = {document_tmp_docchannel:docchannel,
  917. "agency":agency,
  918. "project_code":project_code,
  919. "bidding_budget":tmp_budget,
  920. "product":product
  921. }
  922. _dict.update(base_dict)
  923. _dict["page_time"] = [page_time,page_time]
  924. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  925. _rule = {"confidence":confidence,
  926. "query":_query,
  927. "singleNum_keys":singleNum_keys,
  928. "contain_keys":[],
  929. "multiNum_keys":[document_tmp_web_source_no]}
  930. list_rules.append(_rule)
  931. if agency!="" and project_name!="":
  932. confidence=81
  933. tmp_budget = 0
  934. if bidding_budget!="":
  935. tmp_budget = bidding_budget
  936. _dict = {document_tmp_docchannel:docchannel,
  937. "agency":agency,
  938. "project_name":project_name,
  939. "bidding_budget":tmp_budget,
  940. "product":product
  941. }
  942. _dict.update(base_dict)
  943. _dict["page_time"] = [page_time,page_time]
  944. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  945. _rule = {"confidence":confidence,
  946. "query":_query,
  947. "singleNum_keys":singleNum_keys,
  948. "contain_keys":[],
  949. "multiNum_keys":[document_tmp_web_source_no]}
  950. list_rules.append(_rule)
  951. #五选二
  952. if tenderee!="" and bidding_budget!="" and product!="":
  953. confidence=80
  954. _dict = {document_tmp_docchannel:docchannel,
  955. "tenderee":tenderee,
  956. "bidding_budget":bidding_budget,
  957. "product":product,
  958. }
  959. _dict.update(base_dict)
  960. _dict["page_time"] = [page_time,page_time]
  961. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  962. _rule = {"confidence":confidence,
  963. "query":_query,
  964. "singleNum_keys":singleNum_keys,
  965. "contain_keys":[],
  966. "multiNum_keys":[]}
  967. list_rules.append(_rule)
  968. if tenderee!="" and win_tenderer!="" and product!="":
  969. confidence=80
  970. _dict = {document_tmp_docchannel:docchannel,
  971. "tenderee":tenderee,
  972. "win_tenderer":win_tenderer,
  973. "product":product,
  974. }
  975. _dict.update(base_dict)
  976. _dict["page_time"] = [page_time,page_time]
  977. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  978. _rule = {"confidence":confidence,
  979. "query":_query,
  980. "singleNum_keys":singleNum_keys,
  981. "contain_keys":[],
  982. "multiNum_keys":[]}
  983. list_rules.append(_rule)
  984. if tenderee!="" and win_bid_price!="":
  985. confidence=80
  986. _dict = {document_tmp_docchannel:docchannel,
  987. "tenderee":tenderee,
  988. "win_bid_price":win_bid_price,
  989. "product":product,
  990. }
  991. _dict.update(base_dict)
  992. _dict["page_time"] = [page_time,page_time]
  993. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  994. _rule = {"confidence":confidence,
  995. "query":_query,
  996. "singleNum_keys":singleNum_keys,
  997. "contain_keys":[],
  998. "multiNum_keys":[]}
  999. list_rules.append(_rule)
  1000. if tenderee!="" and agency!="":
  1001. confidence=80
  1002. _dict = {document_tmp_docchannel:docchannel,
  1003. "tenderee":tenderee,
  1004. "agency":agency,
  1005. "product":product,
  1006. }
  1007. _dict.update(base_dict)
  1008. _dict["page_time"] = [page_time,page_time]
  1009. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1010. _rule = {"confidence":confidence,
  1011. "query":_query,
  1012. "singleNum_keys":singleNum_keys,
  1013. "contain_keys":[],
  1014. "multiNum_keys":[]}
  1015. list_rules.append(_rule)
  1016. if win_tenderer!="" and bidding_budget!="":
  1017. confidence=80
  1018. _dict = {document_tmp_docchannel:docchannel,
  1019. "win_tenderer":win_tenderer,
  1020. "bidding_budget":bidding_budget,
  1021. "product":product,
  1022. }
  1023. _dict.update(base_dict)
  1024. _dict["page_time"] = [page_time,page_time]
  1025. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1026. _rule = {"confidence":confidence,
  1027. "query":_query,
  1028. "singleNum_keys":singleNum_keys,
  1029. "contain_keys":[],
  1030. "multiNum_keys":[]}
  1031. list_rules.append(_rule)
  1032. if win_bid_price!="" and bidding_budget!="":
  1033. confidence=80
  1034. _dict = {document_tmp_docchannel:docchannel,
  1035. "win_bid_price":win_bid_price,
  1036. "bidding_budget":bidding_budget,
  1037. "product":product,
  1038. }
  1039. _dict.update(base_dict)
  1040. _dict["page_time"] = [page_time,page_time]
  1041. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1042. _rule = {"confidence":confidence,
  1043. "query":_query,
  1044. "singleNum_keys":singleNum_keys,
  1045. "contain_keys":[],
  1046. "multiNum_keys":[]}
  1047. list_rules.append(_rule)
  1048. if agency!="" and bidding_budget!="":
  1049. confidence=80
  1050. _dict = {document_tmp_docchannel:docchannel,
  1051. "agency":agency,
  1052. "bidding_budget":bidding_budget,
  1053. "product":product,
  1054. }
  1055. _dict.update(base_dict)
  1056. _dict["page_time"] = [page_time,page_time]
  1057. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1058. _rule = {"confidence":confidence,
  1059. "query":_query,
  1060. "singleNum_keys":singleNum_keys,
  1061. "contain_keys":[],
  1062. "multiNum_keys":[]}
  1063. list_rules.append(_rule)
  1064. if win_tenderer!="" and win_bid_price!="":
  1065. confidence=80
  1066. _dict = {document_tmp_docchannel:docchannel,
  1067. "win_tenderer":win_tenderer,
  1068. "win_bid_price":win_bid_price,
  1069. "product":product,
  1070. }
  1071. _dict.update(base_dict)
  1072. _dict["page_time"] = [page_time,page_time]
  1073. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1074. _rule = {"confidence":confidence,
  1075. "query":_query,
  1076. "singleNum_keys":singleNum_keys,
  1077. "contain_keys":[],
  1078. "multiNum_keys":[]}
  1079. list_rules.append(_rule)
  1080. if win_tenderer!="" and agency!="":
  1081. confidence=80
  1082. _dict = {document_tmp_docchannel:docchannel,
  1083. "win_tenderer":win_tenderer,
  1084. "agency":agency,
  1085. "product":product,
  1086. }
  1087. _dict.update(base_dict)
  1088. _dict["page_time"] = [page_time,page_time]
  1089. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1090. _rule = {"confidence":confidence,
  1091. "query":_query,
  1092. "singleNum_keys":singleNum_keys,
  1093. "contain_keys":[],
  1094. "multiNum_keys":[]}
  1095. list_rules.append(_rule)
  1096. if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
  1097. confidence=80
  1098. _dict = {document_tmp_docchannel:docchannel,
  1099. "doctitle_refine":doctitle_refine,
  1100. "product":product,
  1101. }
  1102. _dict.update(base_dict)
  1103. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1104. _rule = {"confidence":confidence,
  1105. "query":_query,
  1106. "singleNum_keys":singleNum_keys,
  1107. "contain_keys":[],
  1108. "multiNum_keys":[]}
  1109. list_rules.append(_rule)
  1110. return list_rules
  1111. def dumplicate_fianl_check(self,base_list):
  1112. the_group = base_list
  1113. the_group.sort(key=lambda x:x["confidence"],reverse=True)
  1114. if len(the_group)>10:
  1115. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
  1116. else:
  1117. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
  1118. #置信度
  1119. list_key_index = []
  1120. for _k in keys:
  1121. if _k=="doctitle":
  1122. list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
  1123. else:
  1124. list_key_index.append(getDiffIndex(the_group,_k))
  1125. _index = min(list_key_index)
  1126. if _index>1:
  1127. return the_group[:_index]
  1128. return []
  1129. def get_best_docid(self,base_list):
  1130. to_reverse = False
  1131. dict_source_count = {}
  1132. for _item in base_list:
  1133. _web_source = _item.get(document_tmp_web_source_no)
  1134. _fingerprint = _item.get(document_tmp_fingerprint)
  1135. if _web_source is not None:
  1136. if _web_source not in dict_source_count:
  1137. dict_source_count[_web_source] = set()
  1138. dict_source_count[_web_source].add(_fingerprint)
  1139. if len(dict_source_count[_web_source])>=2:
  1140. to_reverse=True
  1141. if len(base_list)>0:
  1142. base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
  1143. base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
  1144. base_list.sort(key=lambda x:x["extract_count"],reverse=True)
  1145. return base_list[0]["docid"]
  1146. def save_dumplicate(self,base_list,best_docid,status_from,status_to):
  1147. #best_docid need check while others can save directly
  1148. list_dict = []
  1149. for item in base_list:
  1150. docid = item["docid"]
  1151. _dict = {"partitionkey":item["partitionkey"],
  1152. "docid":item["docid"]}
  1153. if docid==best_docid:
  1154. if item.get("save",1)!=0:
  1155. _dict["save"] = 1
  1156. else:
  1157. _dict["save"] = 0
  1158. if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
  1159. _dict["status"] = random.randint(status_to[0],status_to[1])
  1160. list_dict.append(_dict)
  1161. for _dict in list_dict:
  1162. dtmp = Document_tmp(_dict)
  1163. dtmp.update_row(self.ots_client)
    def flow_test(self,status_to=[1,10]):
        """Debug flow: reset the status of document_tmp rows that carry
        attachments but whose attachment extraction has not succeeded,
        so they re-enter the attachment pipeline.

        :param status_to: inclusive [low, high] range the status is reset
            into (mutable default — never mutated here, so harmless)
        """
        def producer():
            # rows with at least one attachment md5, excluding rows already
            # extracted (attachment_extract_status=1) or in status 1..11
            bool_query = BoolQuery(must_queries=[
                # ExistsQuery("docid"),
                # RangeQuery("crtime",range_to='2022-04-10'),
                # RangeQuery("status",61),
                NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
                ],
                must_not_queries=[
                    # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
                    TermQuery("attachment_extract_status",1),
                    RangeQuery("status",1,11)
                ]
            )
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            # page through the remaining results, hard-capped at 1,000,000 rows
            while next_token and _count<1000000:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
        def comsumer():
            # 30 worker threads drain queue_init
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # print(item)
            # reset status to a random value in status_to so the row re-enters the flow
            dtmp = Document_tmp(item)
            dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
            dtmp.update_row(ots_client)
            # dhtml = Document_html(item)
            # dhtml.update_row(ots_client)
            # dtmp.delete_row(ots_client)
            # dhtml.delete_row(ots_client)
        producer()
        comsumer()
    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
        """Deduplication flow over document_tmp.

        The producer pages rows whose status is within `status_from` into
        queue_dumplicate; the consumer applies the translated duplicate
        rules to each document, elects the best docid of the resulting
        group and writes save / dup_docid / merge_uuid / status back on
        the current document.

        :param process_count: NOTE(review): accepted but unused — the
            producer loop caps on module-level flow_process_count instead
        :param status_from: inclusive [low, high] status range to pull
        """
        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_dumplicate producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_dumplicate.put(_dict)
            _count = len(list_dict)
            # page through remaining results up to the module-level cap
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_dumplicate.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # NOTE(review): uses the module-level flow_dumplicate_status_from
            # here rather than the `status_from` parameter — confirm intended
            list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
            # apply the highest-confidence rules first
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            # print(item,"len_rules",len(list_rules))
            for _rule in list_rules:
                _query = _rule["query"]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
            # the current document always belongs to its own group, with
            # maximal confidence so it sorts first
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
            final_list = self.dumplicate_fianl_check(base_list)
            best_docid = self.get_best_docid(final_list)
            # log(str(final_list))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            # docids of the confirmed duplicates, excluding the current doc
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
                # current doc wins (or has no duplicates): keep it and try
                # to merge it into an existing project
                dtmp.setValue(document_tmp_save,1,True)
                dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            else:
                # another doc wins: mark the current one as discarded and
                # put the winner first in the dup list
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
            dtmp.update_row(self.ots_client)
            # keep only the current announcement
            # self.save_dumplicate(final_list,best_docid,status_from,status_to)
            #
            # print("=base=",item)
            # if len(final_list)>=1:
            #     print("==================")
            #     for _dict in final_list:
            #         print(_dict)
            #     print("========>>>>>>>>>>")
        producer()
        comsumer()
    def merge_document(self,item,status_to=None):
        """Try to attach the document to an existing project in project2.

        Builds should-queries from the extracted columns (different query
        sets depending on whether the document is a kept one, save==1, or
        a discarded duplicate) and accepts a project only when exactly one
        candidate matches.

        :param item: document dict; must contain partitionkey and docid
        :param status_to: NOTE(review): accepted but never used here
        :return: the matched project uuid, or "" when no unique match
        """
        self.post_extract(item)
        docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
        _d = {"partitionkey":item["partitionkey"],
              "docid":item["docid"],
              }
        dtmp = Document_tmp(_d)
        if item.get(document_tmp_save,1)==1:
            # kept document: match on project_code combined with
            # tenderee or project_name
            list_should_q = []
            if project_code!="" and tenderee!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("tenderee",tenderee)])
                list_should_q.append(_q)
            if project_name!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                # only merge when the project is unambiguous
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
        else:
            # discarded duplicate: match on money amounts plus code/name
            list_should_q = []
            if bidding_budget!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("bidding_budget",float(bidding_budget))])
                list_should_q.append(_q)
            if tenderee!="" and bidding_budget!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("bidding_budget",float(bidding_budget)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if tenderee!="" and win_bid_price!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("win_bid_price",float(win_bid_price)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
        return dtmp.getProperties().get("merge_uuid","")
        # dtmp.update_row(self.ots_client)
    def test_merge(self):
        """Manual evaluation of merge_document: fetches one day's saved
        documents from certain channels, runs the merge for each, and
        dumps the results (including whether the matched project has a
        tender announcement) to test_merge.xlsx.
        """
        import pandas as pd
        import queue
        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            list_test_item = []
            # restrict to a fixed set of channels
            should_q = BoolQuery(should_queries=[
                TermQuery("docchannel",101),
                TermQuery("docchannel",119),
                TermQuery("docchannel",120)
            ])
            # saved documents of a single, hard-coded day
            bool_query = BoolQuery(must_queries=[
                TermQuery("page_time","2022-04-22"),
                should_q,
                TermQuery("save",1)
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_dumplicate producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                list_test_item.append(_dict)
            _count = len(list_dict)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    list_test_item.append(_dict)
                _count += len(list_dict)
                print("%d/%d"%(_count,total_count))
            return list_test_item
        from BaseDataMaintenance.model.ots.project import Project
        def comsumer_handle(item,result_queue,ots_client):
            item["merge_uuid"] = self.merge_document(item)
            if item["merge_uuid"]!="":
                # check whether the matched project already has a tender
                # announcement page_time
                _dict = {"uuid":item["merge_uuid"]}
                _p = Project(_dict)
                _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
                if _p.getProperties().get("zhao_biao_page_time","")!="":
                    item["是否有招标"] = "是"
        list_test_item = producer()
        task_queue = queue.Queue()
        for item in list_test_item:
            task_queue.put(item)
        mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
        mt.run()
        # export the evaluation columns to Excel
        keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
        df_data = {}
        for k in keys:
            df_data[k] = []
        for item in list_test_item:
            for k in keys:
                df_data[k].append(item.get(k,""))
        df = pd.DataFrame(df_data)
        df.to_excel("test_merge.xlsx",columns=keys)
    def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
        """Merge flow skeleton.

        NOTE(review): currently a no-op — the producer()/comsumer() calls
        at the bottom are commented out, so only the inner functions are
        defined and nothing runs.

        :param process_count: cap on rows the producer enqueues
        :param status_from: inclusive [low, high] status range to pull
        :param status_to: passed through to merge_document (which ignores it)
        """
        def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_merge producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_merge.put(_dict)
            _count = len(list_dict)
            while next_token and _count<process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_merge.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            self.merge_document(item,status_to)
        # producer()
        # comsumer()
        pass
  1416. def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
  1417. pass
    def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
        """Delete processed document_tmp rows (and their document_html
        counterparts) created more than 4 days ago whose status lies in
        `status_from`.

        :param process_count: NOTE(review): unused — the pagination loop
            has no cap here
        :param status_from: inclusive [low, high] status range to remove
        """
        def producer():
            current_date = getCurrent_date("%Y-%m-%d")
            # only rows created before 4 days ago qualify for removal
            tmp_date = timeAdd(current_date,-4)
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                                 RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.NONE))
            log("flow_remove producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove.put(_dict)
            _count = len(list_dict)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_remove.put(_dict)
                _count += len(list_dict)
        def comsumer():
            mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # remove both the tmp row and its html row
            dtmp = Document_tmp(item)
            dtmp.delete_row(self.ots_client)
            dhtml = Document_html(item)
            dhtml.delete_row(self.ots_client)
        producer()
        comsumer()
  1450. def flow_remove_project_tmp(self,process_count=flow_process_count):
  1451. def producer():
  1452. current_date = getCurrent_date("%Y-%m-%d")
  1453. tmp_date = timeAdd(current_date,-6*31)
  1454. bool_query = BoolQuery(must_queries=[
  1455. RangeQuery(project_page_time,range_to="%s"%(tmp_date))])
  1456. rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2_tmp","project2_tmp_index",
  1457. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
  1458. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1459. log("flow_remove project2_tmp producer total_count:%d"%total_count)
  1460. list_dict = getRow_ots(rows)
  1461. for _dict in list_dict:
  1462. self.queue_remove_project.put(_dict)
  1463. _count = len(list_dict)
  1464. while next_token:
  1465. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1466. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1467. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1468. list_dict = getRow_ots(rows)
  1469. for _dict in list_dict:
  1470. self.queue_remove_project.put(_dict)
  1471. _count += len(list_dict)
  1472. def comsumer():
  1473. mt = MultiThreadHandler(self.queue_remove_project,comsumer_handle,None,10,1,ots_client=self.ots_client)
  1474. mt.run()
  1475. def comsumer_handle(item,result_queue,ots_client):
  1476. ptmp = Project_tmp(item)
  1477. ptmp.delete_row(self.ots_client)
  1478. producer()
  1479. comsumer()
  1480. def start_flow_dumplicate(self):
  1481. schedule = BlockingScheduler()
  1482. schedule.add_job(self.flow_remove,"cron",hour="20")
  1483. schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
  1484. schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
  1485. schedule.start()
  1486. def start_flow_merge(self):
  1487. schedule = BlockingScheduler()
  1488. schedule.add_job(self.flow_merge,"cron",second="*/10")
  1489. schedule.start()
def download_attachment():
    """One-off utility: download the attachments of document_tmp rows
    created in a hard-coded two-hour window from OSS into the local
    ``download`` directory (skipping oversized files).
    """
    ots_client = getConnect_ots()
    queue_attachment = Queue()
    auth = getAuth()
    # tune the OSS client for many parallel downloads
    oss2.defaults.connection_pool_size = 100
    oss2.defaults.multiget_num_threads = 20
    attachment_bucket_name = "attachment-hub"
    # use the internal endpoint when running inside the VPC
    if is_internal:
        bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
    else:
        bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
    bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    current_path = os.path.dirname(__file__)
    def producer():
        columns = [document_tmp_attachment_path]
        # hard-coded creation window of the incident being replayed
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_attachment producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            queue_attachment.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                queue_attachment.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
        mt.run()
    def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        # batch-read attachment rows by md5 (OTS BatchGetRow caps at 100
        # rows; only the first 50 md5s are fetched here)
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def comsumer_handle(item,result_queue):
        page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
        if len(page_attachments)==0:
            pass
        else:
            list_fileMd5 = []
            for _atta in page_attachments:
                list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
            list_attach = getAttachments(list_fileMd5)
            for attach in list_attach:
                filemd5 = attach.getProperties().get(attachment_filemd5)
                _status = attach.getProperties().get(attachment_status)
                _filetype = attach.getProperties().get(attachment_filetype)
                _size = attach.getProperties().get(attachment_size)
                _path = attach.getProperties().get(attachment_path)
                # NOTE(review): _uuid is computed but never used
                _uuid = uuid4()
                objectPath = attach.getProperties().get(attachment_path)
                localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
                try:
                    # skip files above the large-size threshold
                    if _size>ATTACHMENT_LARGESIZE:
                        pass
                    else:
                        downloadFile(bucket,objectPath,localpath)
                except Exception as e:
                    traceback.print_exc()
    producer()
    comsumer()
def test_attachment_interface():
    """Manual load test: feed up to 500 previously-downloaded attachment
    files through the attachment-processing interface and log timings.
    """
    current_path = os.path.dirname(__file__)
    task_queue = Queue()
    def producer():
        _count = 0
        list_filename = os.listdir(os.path.join(current_path,"download"))
        for _filename in list_filename:
            _count += 1
            # NOTE(review): assumes names of the form "<md5>.<ext>" —
            # split(".")[1] raises IndexError for dotless names and picks
            # the wrong part for names with multiple dots; confirm inputs
            _type = _filename.split(".")[1]
            task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
            if _count>=500:
                break
    def comsumer():
        mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
        mt.run()
    def comsumer_handle(item,result_queue):
        _path = item.get("path")
        _type = item.get("file_type")
        _data_base64 = base64.b64encode(open(_path,"rb").read())
        # call the interface and log the outcome/latency
        start_time = time.time()
        _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
        log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
    producer()
    comsumer()
class Dataflow_attachment(Dataflow):
    """Dataflow specialised for the attachment stage: pulls document_tmp
    rows that carry attachments, fetches the attachment rows by md5,
    routes them to an OCR or non-OCR queue, runs recognition, and writes
    the resulting html/status back.
    """
    def __init__(self):
        Dataflow.__init__(self)
    def flow_attachment_process(self):
        """Entry point for the scheduled processing job."""
        self.process_comsumer()
    def monitor_attachment_process(self):
        """Log how many processing threads are still alive.

        NOTE(review): assumes process_comsumer() ran first and created
        self.process_list_thread — raises AttributeError otherwise.
        """
        alive_count = 0
        for _t in self.process_list_thread:
            if _t.is_alive():
                alive_count += 1
        log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
    def process_comsumer(self):
        """Spawn 60 worker threads running process_comsumer_handle and
        block until they finish (they loop forever, so this blocks)."""
        self.process_list_thread = []
        thread_count = 60
        for i in range(thread_count):
            self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
        for t in self.process_list_thread:
            t.start()
        for t in self.process_list_thread:
            t.join()
    def process_comsumer_handle(self):
        """Worker loop: alternately drain one item from the OCR queue and
        one from the non-OCR queue; back off for 2s when the first queue
        is empty."""
        while 1:
            _flag = False
            try:
                item = self.queue_attachment_ocr.get(True,timeout=0.2)
                log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
                self.attachment_recognize(item,None)
                log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
            except Exception as e:
                # queue empty (or recognition error): remember it
                _flag = True
                pass
            try:
                item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
                log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
                self.attachment_recognize(item,None)
                log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
            except Exception as e:
                # NOTE(review): `True and _flag` is just `_flag`, so this
                # assignment is a no-op — presumably `_flag and True` /
                # "both queues empty" was intended; confirm
                _flag = True and _flag
                pass
            if _flag:
                time.sleep(2)
    def attachment_recognize(self,_dict,result_queue):
        """Run interface-based recognition for one document's attachments
        and persist the outcome.

        :param _dict: {"item": document row dict, "list_attach": attachment objects}
        :param result_queue: unused (kept for handler signature)
        """
        item = _dict.get("item")
        list_attach = _dict.get("list_attach")
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
        _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
        _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
        log(str(swf_urls))
        if not _succeed:
            # failed: only move the status into the "failed" range
            item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
        else:
            # succeeded: store html/swf results and mark extraction done
            dhtml.updateSWFImages(swf_urls)
            dhtml.updateAttachment(list_html)
            dhtml.update_row(self.ots_client)
            item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
            item[document_tmp_attachment_extract_status] = 1
        log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
        dtmp = Document_tmp(item)
        dtmp.update_row(self.ots_client)
    def flow_attachment(self):
        """Entry point for the scheduled producer job."""
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()
    def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
        """Batch-read attachment rows by md5 (first 50 md5s only).

        :param list_filemd5: md5 strings; None entries are skipped
        :param columns_to_get: attachment columns to fetch
        :return: list of `attachment` model objects (empty on error)
        """
        list_attachment = []
        rows_to_get = []
        for _md5 in list_filemd5[:50]:
            if _md5 is None:
                continue
            primary_key = [(attachment_filemd5,_md5)]
            rows_to_get.append(primary_key)
        req = BatchGetRowRequest()
        req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
        try:
            result = self.ots_client.batch_get_row(req)
            attach_result = result.get_result_by_table(attachment_table_name)
            for item in attach_result:
                if item.is_ok:
                    _dict = getRow_ots_primary(item.row)
                    if _dict is not None:
                        list_attachment.append(attachment(_dict))
        except Exception as e:
            log(str(list_filemd5))
            log("attachProcess comsumer error %s"%str(e))
        return list_attachment
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        """Refill queue_attachment with document_tmp rows in the
        attachment status range, skipping docids already in flight."""
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
        # decide whether more data should be enqueued at all
        if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
            return
        # dedup: docids still tracked in either in-flight list
        set_docid = set()
        set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
        # keep only the tail matching what is still queued
        if qsize_ocr>0:
            self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
        else:
            self.list_attachment_ocr = []
        if qsize_not_ocr>0:
            self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
        else:
            self.list_attachment_not_ocr = []
        try:
            bool_query = BoolQuery(must_queries=[
                RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
                # TermQuery(document_tmp_docid,234925191),
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_attachment producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            _count = 0
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    continue
                self.queue_attachment.put(_dict,True)
                _count += 1
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        continue
                    self.queue_attachment.put(_dict,True)
                    _count += 1
            log("add attachment count:%d"%(_count))
        except Exception as e:
            log("flow attachment producer error:%s"%(str(e)))
            traceback.print_exc()
    def flow_attachment_producer_comsumer(self):
        """Dispatch queue_attachment items to comsumer_handle with 10 threads."""
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()
    def set_queue(self,_dict):
        """Route a document+attachments bundle to the OCR queue when any
        attachment filetype needs image/pdf processing, else to the
        non-OCR queue."""
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
            # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
            # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
    def comsumer_handle(self,item,result_queue):
        """Resolve a row's attachment md5s and enqueue it for recognition;
        rows without attachments are marked succeeded immediately."""
        try:
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)==0:
                item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
                dtmp = Document_tmp(item)
                dtmp.update_row(self.ots_client)
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5)
                # attachments not yet uploaded: leave the row alone for up
                # to 2 hours after creation (status back to 1)
                if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
                    item[document_tmp_status] = 1
                    dtmp = Document_tmp(item)
                    dtmp.update_row(self.ots_client)
                    return
                self.set_queue({"item":item,"list_attach":list_attach})
        except Exception as e:
            traceback.print_exc()
    def start_flow_attachment(self):
        """Schedule the attachment flow: processing every 20s, producing
        every 10s. Blocks forever."""
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.start()
  1778. class Dataflow_extract(Dataflow):
  1779. def __init__(self):
  1780. Dataflow.__init__(self)
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        """Refill queue_extract with document_tmp rows in the extract
        status range, tracking enqueued docids in self.list_extract to
        avoid re-queuing rows that are still in flight.

        :param columns: document_tmp columns fetched for each row
        """
        q_size = self.queue_extract.qsize()
        # enough work queued already
        if q_size>100:
            return
        set_docid = set(self.list_extract)
        # keep only the tail of the tracking list matching what is queued
        if q_size>0:
            self.list_extract = self.list_extract[-q_size:]
        else:
            self.list_extract = []
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            log("flow_extract producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in set_docid:
                    # still in flight: keep tracking it at the head
                    self.list_extract.insert(0,docid)
                    continue
                else:
                    self.queue_extract.put(_dict)
                    self.list_extract.append(docid)
            _count = len(list_dict)
            while next_token and _count<flow_process_count:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    docid = _dict.get(document_tmp_docid)
                    if docid in set_docid:
                        self.list_extract.insert(0,docid)
                        continue
                    else:
                        self.queue_extract.put(_dict)
                        self.list_extract.append(docid)
                _count += len(list_dict)
        except Exception as e:
            log("flow extract producer error:%s"%(str(e)))
            traceback.print_exc()
  1823. def flow_extract(self,):
  1824. self.comsumer()
  1825. def comsumer(self):
  1826. mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
  1827. mt.run()
  1828. def comsumer_handle(self,item,result_queue):
  1829. dhtml = Document_html({"partitionkey":item.get("partitionkey"),
  1830. "docid":item.get("docid")})
  1831. dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
  1832. item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
  1833. _extract = Document_extract({})
  1834. _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
  1835. _extract.setValue(document_extract2_docid,item.get(document_docid))
  1836. all_done = 1
  1837. if all_done:
  1838. data = item
  1839. resp = requests.post(self.other_url,json=data,headers=self.header)
  1840. if (resp.status_code >=200 and resp.status_code<=210):
  1841. _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
  1842. else:
  1843. all_done = -1
  1844. data = {}
  1845. for k,v in item.items():
  1846. data[k] = v
  1847. data["timeout"] = 240
  1848. data["doc_id"] = data.get(document_tmp_docid)
  1849. data["content"] = data.get(document_tmp_dochtmlcon,"")
  1850. if document_tmp_dochtmlcon in data:
  1851. data.pop(document_tmp_dochtmlcon)
  1852. data["title"] = data.get(document_tmp_doctitle,"")
  1853. data["web_source_no"] = item.get(document_tmp_web_source_no,"")
  1854. data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
  1855. if all_done:
  1856. resp = requests.post(self.extract_url,json=data,headers=self.header)
  1857. if (resp.status_code >=200 and resp.status_code<=210):
  1858. _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
  1859. else:
  1860. all_done = -2
  1861. if all_done:
  1862. resp = requests.post(self.industy_url,json=data,headers=self.header)
  1863. if (resp.status_code >=200 and resp.status_code<=210):
  1864. _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
  1865. else:
  1866. all_done = -3
  1867. _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
  1868. document_docid:item.get(document_tmp_docid),
  1869. }
  1870. dtmp = Document_tmp(_dict)
  1871. if all_done!=1:
  1872. sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
  1873. dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
  1874. dtmp.update_row(self.ots_client)
  1875. else:
  1876. dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
  1877. dtmp.update_row(self.ots_client)
  1878. # 插入接口表,上线放开
  1879. _extract.setValue(document_extract2_status,random.randint(1,50),True)
  1880. _extract.update_row(self.ots_client)
  1881. log("process docid:%d %s"%(data["doc_id"],str(all_done)))
  1882. def start_flow_extract(self):
  1883. schedule = BlockingScheduler()
  1884. schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
  1885. schedule.add_job(self.flow_extract,"cron",second="*/10")
  1886. schedule.start()
  1887. class Dataflow_dumplicate(Dataflow):
  1888. class DeleteListener():
  1889. def __init__(self,conn,_func,*args,**kwargs):
  1890. self.conn = conn
  1891. self._func = _func
  1892. def on_error(self, headers,*args,**kwargs):
  1893. log('received an error %s' % str(headers.body))
  1894. def on_message(self, headers,*args,**kwargs):
  1895. try:
  1896. message_id = headers.headers["message-id"]
  1897. body = headers.body
  1898. log("get message %s"%(message_id))
  1899. self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
  1900. except Exception as e:
  1901. traceback.print_exc()
  1902. pass
  1903. def __del__(self):
  1904. self.conn.disconnect()
  1905. def __init__(self,start_delete_listener=True):
  1906. Dataflow.__init__(self,)
  1907. self.c_f_get_extractCount = f_get_extractCount()
  1908. self.c_f_get_package = f_get_package()
  1909. logging.basicConfig(level = logging.info,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  1910. self.fix_doc_docid = None
  1911. self.bdm = BaseDataMonitor()
  1912. if start_delete_listener:
  1913. self.delete_comsumer_counts = 2
  1914. self.doc_delete_queue = "/queue/doc_delete_queue"
  1915. self.doc_delete_result = "/queue/doc_delete_result"
  1916. self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
  1917. for _ in range(self.delete_comsumer_counts):
  1918. conn = getConnect_activateMQ_ali()
  1919. listener = self.DeleteListener(conn,self.delete_doc_handle)
  1920. createComsumer(listener,self.doc_delete_queue)
  1921. def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart","time_release"]):
  1922. dict_time = {}
  1923. for k in keys:
  1924. dict_time[k] = _extract.get(k)
  1925. return dict_time
  1926. def post_extract(self,_dict):
  1927. win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  1928. _dict["win_tenderer"] = win_tenderer
  1929. _dict["bidding_budget"] = bidding_budget
  1930. _dict["win_bid_price"] = win_bid_price
  1931. extract_json = _dict.get(document_tmp_extract_json,"{}")
  1932. _extract = json.loads(extract_json)
  1933. _dict["product"] = ",".join(_extract.get("product",[]))
  1934. _dict["fingerprint"] = _extract.get("fingerprint","")
  1935. _dict["project_codes"] = _extract.get("code",[])
  1936. if len(_dict["project_codes"])>0:
  1937. _dict["project_code"] = _dict["project_codes"][0]
  1938. else:
  1939. _dict["project_code"] = ""
  1940. _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
  1941. _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
  1942. "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
  1943. _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
  1944. _dict["package"] = self.c_f_get_package.evaluate(extract_json)
  1945. _dict["project_name"] = _extract.get("name","")
  1946. _dict["dict_time"] = self.get_dict_time(_extract)
    def dumplicate_fianl_check(self,base_list):
        """Final pass over a candidate duplicate group.

        Sorts the group by descending confidence, then walks it and keeps the
        longest prefix in which every member still looks like a duplicate of
        the earlier (higher-confidence) members. Members sharing the top
        member's fingerprint are always kept.

        Returns:
            The kept prefix (at least 2 members), or [] when no candidate
            beyond the base document survives.

        NOTE(review): indentation reconstructed from a whitespace-mangled
        source — verify nesting against VCS history.
        """
        the_group = base_list
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        _index = 0
        base_fingerprint = "None"
        if len(base_list)>0:
            base_fingerprint = base_list[0]["fingerprint"]
        for _i in range(1,len(base_list)):
            _dict1 = base_list[_i]
            fingerprint_less = _dict1["fingerprint"]
            _pass = True
            # identical fingerprint to the base document: keep unconditionally
            if fingerprint_less==base_fingerprint:
                _index = _i
                continue
            # re-check this candidate against up to 10 earlier group members
            for _j in range(min(_i,10)):
                _dict2 = base_list[_j]
                _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=False)
                if _prob<=0.1:
                    _pass = False
                    break
            log("checking index:%d"%(_i))
            _index = _i
            if not _pass:
                # this member failed: keep only everything before it
                _index -= 1
                break
        if _index>=1:
            return the_group[:_index+1]
        return []
  1987. def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
  1988. document_less = _dict1
  1989. docid_less = _dict1["docid"]
  1990. docchannel_less = document_less["docchannel"]
  1991. page_time_less = document_less["page_time"]
  1992. doctitle_refine_less = document_less["doctitle_refine"]
  1993. project_codes_less = document_less["project_codes"]
  1994. nlp_enterprise_less = document_less["nlp_enterprise"]
  1995. tenderee_less = document_less["tenderee"]
  1996. agency_less = document_less["agency"]
  1997. win_tenderer_less = document_less["win_tenderer"]
  1998. bidding_budget_less = document_less["bidding_budget"]
  1999. win_bid_price_less = document_less["win_bid_price"]
  2000. product_less = document_less["product"]
  2001. package_less = document_less["package"]
  2002. json_time_less = document_less["dict_time"]
  2003. project_name_less = document_less["project_name"]
  2004. fingerprint_less = document_less["fingerprint"]
  2005. extract_count_less = document_less["extract_count"]
  2006. web_source_no_less = document_less.get("web_source_no")
  2007. document_greater = _dict2
  2008. docid_greater = _dict2["docid"]
  2009. page_time_greater = document_greater["page_time"]
  2010. docchannel_greater = document_greater["docchannel"]
  2011. doctitle_refine_greater = document_greater["doctitle_refine"]
  2012. project_codes_greater = document_greater["project_codes"]
  2013. nlp_enterprise_greater = document_greater["nlp_enterprise"]
  2014. tenderee_greater = document_greater["tenderee"]
  2015. agency_greater = document_greater["agency"]
  2016. win_tenderer_greater = document_greater["win_tenderer"]
  2017. bidding_budget_greater = document_greater["bidding_budget"]
  2018. win_bid_price_greater = document_greater["win_bid_price"]
  2019. product_greater = document_greater["product"]
  2020. package_greater = document_greater["package"]
  2021. json_time_greater = document_greater["dict_time"]
  2022. project_name_greater = document_greater["project_name"]
  2023. fingerprint_greater = document_greater["fingerprint"]
  2024. extract_count_greater = document_greater["extract_count"]
  2025. web_source_no_greater = document_greater.get("web_source_no")
  2026. hard_level=1
  2027. if web_source_no_less==web_source_no_greater=="17397-3":
  2028. hard_level=2
  2029. return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=b_log,hard_level=hard_level)
    def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
        """Old duplicate-scoring routine (kept as backup to check_dumplicate_rule).

        Computes a base probability from how many of 8 key fields agree,
        scaled by the candidate-set size, then runs a chain of field-level
        veto checks (title, codes, product, entities, money, package, times).

        Args:
            _dict1, _dict2: post_extract-enriched document dicts.
            min_counts: candidate-set size; fewer candidates -> higher trust.
            b_log: emit diagnostics for failed checks.

        Returns:
            float in [0,1]; 1 means identical fingerprints, 0 means vetoed.

        NOTE(review): indentation reconstructed from a whitespace-mangled
        source — verify nesting against VCS history.
        """
        document_less = _dict1
        docid_less = _dict1["docid"]
        docchannel_less = document_less["docchannel"]
        page_time_less = document_less["page_time"]
        doctitle_refine_less = document_less["doctitle_refine"]
        project_codes_less = document_less["project_codes"]
        nlp_enterprise_less = document_less["nlp_enterprise"]
        tenderee_less = document_less["tenderee"]
        agency_less = document_less["agency"]
        win_tenderer_less = document_less["win_tenderer"]
        bidding_budget_less = document_less["bidding_budget"]
        win_bid_price_less = document_less["win_bid_price"]
        product_less = document_less["product"]
        package_less = document_less["package"]
        json_time_less = document_less["dict_time"]
        project_name_less = document_less["project_name"]
        fingerprint_less = document_less["fingerprint"]
        extract_count_less = document_less["extract_count"]
        document_greater = _dict2
        docid_greater = _dict2["docid"]
        page_time_greater = document_greater["page_time"]
        doctitle_refine_greater = document_greater["doctitle_refine"]
        project_codes_greater = document_greater["project_codes"]
        nlp_enterprise_greater = document_greater["nlp_enterprise"]
        tenderee_greater = document_greater["tenderee"]
        agency_greater = document_greater["agency"]
        win_tenderer_greater = document_greater["win_tenderer"]
        bidding_budget_greater = document_greater["bidding_budget"]
        win_bid_price_greater = document_greater["win_bid_price"]
        product_greater = document_greater["product"]
        package_greater = document_greater["package"]
        json_time_greater = document_greater["dict_time"]
        project_name_greater = document_greater["project_name"]
        fingerprint_greater = document_greater["fingerprint"]
        extract_count_greater = document_greater["extract_count"]
        # identical fingerprints are treated as a certain duplicate
        if fingerprint_less==fingerprint_greater:
            return 1
        # count how many of the 8 key fields agree (empty fields never count)
        same_count = 0
        all_count = 8
        if len(set(project_codes_less) & set(project_codes_greater))>0:
            same_count += 1
        if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
            same_count += 1
        if getLength(agency_less)>0 and agency_less==agency_greater:
            same_count += 1
        if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
            same_count += 1
        if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
            same_count += 1
        if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
            same_count += 1
        if getLength(project_name_less)>0 and project_name_less==project_name_greater:
            same_count += 1
        if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
            same_count += 1
        # the smaller the candidate set, the more a field match is trusted
        base_prob = 0
        if min_counts<3:
            base_prob = 0.9
        elif min_counts<5:
            base_prob = 0.8
        elif min_counts<8:
            base_prob = 0.7
        else:
            base_prob = 0.6
        _prob = base_prob*same_count/all_count
        # sparse extractions cannot match many fields: give them a small floor
        if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
            _prob = 0.15
        if _prob<0.1:
            return _prob
        # veto chain; per check: 0 = failed, 1 = unknown, 2 = confirmed match
        check_result = {"pass":1}
        if docchannel_less in (51,102,103,104,115,116,117):
            # for these channels a differing title on a different day is fatal
            if doctitle_refine_less!=doctitle_refine_greater:
                if page_time_less!=page_time_greater:
                    check_result["docchannel"] = 0
                    check_result["pass"] = 0
                else:
                    check_result["docchannel"] = 2
        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
            check_result["doctitle"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
        else:
            check_result["doctitle"] = 2
        #added check
        if not check_codes(project_codes_less,project_codes_greater):
            check_result["code"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
        else:
            if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
                check_result["code"] = 2
            else:
                check_result["code"] = 1
        if not check_product(product_less,product_greater):
            check_result["product"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
        else:
            if getLength(product_less)>0 and getLength(product_greater)>0:
                check_result["product"] = 2
            else:
                check_result["product"] = 1
        if not check_demand():
            check_result["pass"] = 0
        if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                            tenderee_less,tenderee_greater,
                            agency_less,agency_greater,
                            win_tenderer_less,win_tenderer_greater):
            check_result["entity"] = 0
            check_result["pass"] = 0
            if b_log:
                logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
        else:
            # which entity confirms a match depends on the document channel
            if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
                check_result["entity"] = 2
            elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
                check_result["entity"] = 2
            else:
                check_result["entity"] = 1
        if not check_money(bidding_budget_less,bidding_budget_greater,
                           win_bid_price_less,win_bid_price_greater):
            if b_log:
                logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
            check_result["money"] = 0
            check_result["pass"] = 0
        else:
            # which money field confirms a match also depends on the channel
            if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
                check_result["money"] = 2
            elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
                check_result["money"] = 2
            else:
                check_result["money"] = 1
        #added check
        if not check_package(package_less,package_greater):
            if b_log:
                logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
            check_result["package"] = 0
            check_result["pass"] = 0
        else:
            if getLength(package_less)>0 and getLength(package_greater)>0:
                check_result["package"] = 2
            else:
                check_result["package"] = 1
        #added check
        if not check_time(json_time_less,json_time_greater):
            if b_log:
                logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
                # diagnostics only: report which specific time key disagrees
                if isinstance(json_time_less,dict):
                    time_less = json_time_less
                else:
                    time_less = json.loads(json_time_less)
                if isinstance(json_time_greater,dict):
                    time_greater = json_time_greater
                else:
                    time_greater = json.loads(json_time_greater)
                for k,v in time_less.items():
                    if getLength(v)>0:
                        v1 = time_greater.get(k,"")
                        if getLength(v1)>0:
                            if v!=v1:
                                log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
            check_result["time"] = 0
            check_result["pass"] = 0
        else:
            if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
                check_result["time"] = 2
            else:
                check_result["time"] = 1
        if check_result.get("pass",0)==0:
            if b_log:
                logging.info(str(check_result))
            # a hard money mismatch is always fatal
            if check_result.get("money",1)==0:
                return 0
            # tolerate a single soft failure only when every strong field confirmed
            if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
                return _prob
            else:
                return 0
        if check_result.get("time",1)==0:
            return 0
        return _prob
    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
        """Run one dedup query against OTS and score the returned candidates.

        In merge mode every row is returned as-is; otherwise each row (except
        item itself) is scored with dumplicate_check and only rows with
        confidence >= 0.1 are kept, annotated with "confidence"/"min_counts".
        Retries the whole search up to retry_times on any exception.

        NOTE(review): the `confidence` parameter is shadowed by the per-row
        score inside the loop, and singleNum_keys/contain_keys/multiNum_keys/
        notlike_keys are accepted but unused in this body — confirm intent.

        Returns:
            list of candidate dicts, or [] after exhausting retries.
        """
        for _ in range(retry_times):
            try:
                _time = time.time()
                check_time = 0
                # a list of queries is combined as a should-match (OR) query
                if isinstance(_query,list):
                    bool_query = BoolQuery(should_queries=_query)
                else:
                    bool_query = _query
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                list_dict = getRow_ots(rows)
                list_data = []
                for _dict in list_dict:
                    self.post_extract(_dict)
                    _docid = _dict.get(document_tmp_docid)
                    if merge:
                        list_data.append(_dict)
                    else:
                        # never score the document against itself
                        if _docid!=item.get(document_tmp_docid):
                            _time1 = time.time()
                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
                            check_time+= time.time()-_time1
                            _dict["confidence"] = confidence
                            _dict["min_counts"] = total_count
                            print("check====",item.get("docid"),_dict.get("docid"),confidence)
                            if not confidence<0.1:
                                list_data.append(_dict)
                all_time = time.time()-_time
                # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
                return list_data
            except Exception as e:
                traceback.print_exc()
        return []
    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
        """Run one dedup rule and merge qualifying candidates into base_list.

        Candidates scoring confidence > 0.1 that have not been collected by an
        earlier rule are appended; set_docid tracks docids already seen so the
        same row is not added twice across rules.
        """
        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
        for _dict in list_dict:
            _docid = _dict.get(document_tmp_docid)
            confidence = _dict["confidence"]
            print("confidence",_docid,confidence)
            if confidence>0.1:
                if _docid not in set_docid:
                    base_list.append(_dict)
                    set_docid.add(_docid)
                # NOTE(review): duplicated add is harmless (set.add is
                # idempotent); the original nesting is ambiguous in the
                # whitespace-mangled source — confirm against VCS history
                set_docid.add(_docid)
  2260. def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
  2261. for k,v in _dict.items():
  2262. if getLength(v)==0:
  2263. return
  2264. _dict.update(base_dict)
  2265. if b_log:
  2266. log(str(_dict))
  2267. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  2268. _rule = {"confidence":confidence,
  2269. "item":item,
  2270. "query":_query,
  2271. "singleNum_keys":[],
  2272. "contain_keys":[],
  2273. "multiNum_keys":[]}
  2274. list_rules.append(_rule)
  2275. def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
  2276. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  2277. current_date = getCurrent_date("%Y-%m-%d")
  2278. if page_time=='':
  2279. page_time = current_date
  2280. if page_time>=timeAdd(current_date,-2):
  2281. table_name = "document_tmp"
  2282. table_index = "document_tmp_index"
  2283. base_dict = {
  2284. "docchannel":item.get("docchannel",52),
  2285. "status":[status_from[0]],
  2286. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2287. }
  2288. must_not_dict = {"save":0,"docid":item.get("docid")}
  2289. doctitle_refine_name = "doctitle_refine"
  2290. else:
  2291. table_name = "document"
  2292. table_index = "document_index"
  2293. if get_all:
  2294. _status = [201,450]
  2295. else:
  2296. _status = [201,300]
  2297. base_dict = {
  2298. "docchannel":item["docchannel"],
  2299. "status":_status,
  2300. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2301. }
  2302. must_not_dict = {"docid":item.get("docid")}
  2303. doctitle_refine_name = "doctitle"
  2304. list_rules = []
  2305. singleNum_keys = ["tenderee","win_tenderer"]
  2306. confidence = 100
  2307. self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item,b_log=to_log)
  2308. confidence = 90
  2309. _dict = {document_tmp_agency:agency,
  2310. "win_tenderer":win_tenderer,
  2311. "win_bid_price":win_bid_price}
  2312. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2313. _dict = {document_tmp_agency:agency,
  2314. "win_tenderer":win_tenderer,
  2315. "bidding_budget":bidding_budget}
  2316. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2317. _dict = {document_tmp_agency:agency,
  2318. "win_bid_price":win_bid_price,
  2319. "bidding_budget":bidding_budget}
  2320. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2321. _dict = {win_tenderer:win_tenderer,
  2322. "win_bid_price":win_bid_price,
  2323. "bidding_budget":bidding_budget}
  2324. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2325. _dict = {"tenderee":tenderee,
  2326. "win_tenderer":win_tenderer,
  2327. "win_bid_price":win_bid_price}
  2328. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2329. _dict = {"tenderee":tenderee,
  2330. "win_tenderer":win_tenderer,
  2331. "bidding_budget":bidding_budget}
  2332. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2333. _dict = {"tenderee":tenderee,
  2334. "win_bid_price":win_bid_price,
  2335. "bidding_budget":bidding_budget}
  2336. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2337. _dict = {"tenderee":tenderee,
  2338. "agency":agency,
  2339. "win_tenderer":win_tenderer}
  2340. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2341. _dict = {"tenderee":tenderee,
  2342. "agency":agency,
  2343. "win_bid_price":win_bid_price}
  2344. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2345. _dict = {"tenderee":tenderee,
  2346. "agency":agency,
  2347. "bidding_budget":bidding_budget}
  2348. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2349. confidence=85
  2350. _dict = {"tenderee":tenderee,
  2351. "agency":agency
  2352. }
  2353. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2354. _dict = {"tenderee":tenderee,
  2355. "project_codes":project_code
  2356. }
  2357. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2358. _dict = {"tenderee":tenderee,
  2359. "project_name":project_name
  2360. }
  2361. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2362. if getLength(product)>0:
  2363. l_p = product.split(",")
  2364. _dict = {"tenderee":tenderee,
  2365. "product":l_p[0]
  2366. }
  2367. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2368. _dict = {"tenderee":tenderee,
  2369. "win_tenderer":win_tenderer
  2370. }
  2371. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2372. _dict = {"tenderee":tenderee,
  2373. "win_bid_price":win_bid_price
  2374. }
  2375. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2376. _dict = {"tenderee":tenderee,
  2377. "bidding_budget":bidding_budget
  2378. }
  2379. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2380. _dict = {"tenderee":tenderee,
  2381. doctitle_refine_name:doctitle_refine
  2382. }
  2383. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2384. _dict = {"agency":agency,
  2385. "project_codes":project_code
  2386. }
  2387. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2388. _dict = {"agency":agency,
  2389. "project_name":project_name
  2390. }
  2391. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2392. _dict = {"project_codes":project_code,
  2393. "project_name":project_name
  2394. }
  2395. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2396. _dict = {"project_codes":project_code,
  2397. "win_tenderer":win_tenderer
  2398. }
  2399. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2400. _dict = {"project_codes":project_code,
  2401. "win_bid_price":win_bid_price
  2402. }
  2403. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2404. _dict = {"project_codes":project_code,
  2405. "bidding_budget":bidding_budget
  2406. }
  2407. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2408. _dict = {"project_codes":project_code,
  2409. doctitle_refine_name:doctitle_refine
  2410. }
  2411. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2412. _dict = {"project_name":project_name,
  2413. "win_tenderer":win_tenderer
  2414. }
  2415. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2416. _dict = {"project_name":project_name,
  2417. "win_bid_price":win_bid_price
  2418. }
  2419. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2420. _dict = {"project_name":project_name,
  2421. "bidding_budget":bidding_budget
  2422. }
  2423. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2424. _dict = {"project_name":project_name,
  2425. doctitle_refine_name:doctitle_refine
  2426. }
  2427. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2428. _dict = {"win_tenderer":win_tenderer,
  2429. "win_bid_price":win_bid_price
  2430. }
  2431. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2432. _dict = {"win_tenderer":win_tenderer,
  2433. "bidding_budget":bidding_budget
  2434. }
  2435. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2436. _dict = {"win_tenderer":win_tenderer,
  2437. doctitle_refine_name:doctitle_refine
  2438. }
  2439. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2440. _dict = {"win_bid_price":win_bid_price,
  2441. "bidding_budget":bidding_budget
  2442. }
  2443. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2444. _dict = {"win_bid_price":win_bid_price,
  2445. doctitle_refine_name:doctitle_refine
  2446. }
  2447. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2448. _dict = {"bidding_budget":bidding_budget,
  2449. doctitle_refine_name:doctitle_refine
  2450. }
  2451. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2452. confidence=80
  2453. _dict = {doctitle_refine_name:doctitle_refine}
  2454. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2455. _dict = {"project_codes":project_code}
  2456. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2457. confidence=70
  2458. _dict = {"project_name":project_name}
  2459. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2460. return list_rules,table_name,table_index
def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
    '''
    Refill the dedup work queue with candidate rows from document_tmp.

    :param process_count: intended batch size.
        NOTE(review): the body uses the module-level flow_process_count
        everywhere instead of this parameter — confirm that is intended.
    :param status_from: inclusive (low, high) status range selecting candidates.
    :return: None; the side effect is pushing row dicts onto self.queue_dumplicate.
    '''
    def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document]):
        # Back-pressure: skip refilling while the queue is still over a third full.
        q_size = self.queue_dumplicate.qsize()
        log("dumplicate queue size %d"%(q_size))
        if q_size>flow_process_count//3:
            return
        bool_query = BoolQuery(must_queries=[
            RangeQuery(document_tmp_status,*status_from,True,True),
            # TermQuery("docid",271983871)
        ])
        # First page, sorted so the most recently updated / newest docs come first.
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            # Skip docids already queued during this process' lifetime.
            docid = _dict.get(document_tmp_docid)
            if docid in self.dumplicate_set:
                continue
            self.dumplicate_set.add(docid)
            self.queue_dumplicate.put(_dict)
        _count = len(list_dict)
        # Keep paging until roughly flow_process_count rows have been seen.
        while next_token and _count<flow_process_count:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                docid = _dict.get(document_tmp_docid)
                if docid in self.dumplicate_set:
                    continue
                self.dumplicate_set.add(docid)
                self.queue_dumplicate.put(_dict)
            _count += len(list_dict)
        # Bound the in-memory seen-set: keep only the largest and smallest docids.
        _l = list(self.dumplicate_set)
        _l.sort(key=lambda x:x,reverse=True)
        self.dumplicate_set = set(_l[:flow_process_count]) | set(_l[-flow_process_count:])
    def comsumer():
        mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
        mt.run()
    producer()
    # comsumer()
  2503. def flow_dumpcate_comsumer(self):
  2504. from multiprocessing import Process
  2505. process_count = 2
  2506. thread_count = 30
  2507. list_process = []
  2508. def start_thread():
  2509. mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,ots_client=self.ots_client)
  2510. mt.run()
  2511. for _ in range(process_count):
  2512. p = Process(target=start_thread)
  2513. list_process.append(p)
  2514. for p in list_process:
  2515. p.start()
  2516. for p in list_process:
  2517. p.join()
  2518. # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
  2519. # mt.run()
  2520. def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
  2521. '''
  2522. 根据docid查询公告内容,先查询document_tmp,再查询document
  2523. :param list_docids:
  2524. :return:
  2525. '''
  2526. list_docs = []
  2527. set_fingerprint = set()
  2528. for _docid in list_docids:
  2529. docid = int(_docid)
  2530. _dict = {document_partitionkey:getPartitionKey(docid),
  2531. document_docid:docid}
  2532. _doc = Document_tmp(_dict)
  2533. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2534. if not _exists:
  2535. _doc = Document(_dict)
  2536. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2537. if _exists:
  2538. _fingerprint = _doc.getProperties().get(document_fingerprint)
  2539. if _fingerprint in set_fingerprint:
  2540. continue
  2541. set_fingerprint.add(_fingerprint)
  2542. list_docs.append(_doc)
  2543. for _doc in list_docs:
  2544. try:
  2545. _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
  2546. if _sub_docs_json is not None:
  2547. _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
  2548. except Exception as e:
  2549. traceback.print_exc()
  2550. list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
  2551. return list_docs
  2552. def is_same_package(self,_dict1,_dict2):
  2553. sub_project_name1 = _dict1.get(project_sub_project_name,"")
  2554. if sub_project_name1=="Project":
  2555. sub_project_name1 = ""
  2556. win_tenderer1 = _dict1.get(project_win_tenderer,"")
  2557. win_bid_price1 = _dict1.get(project_win_bid_price,0)
  2558. bidding_budget1 = _dict1.get(project_bidding_budget,0)
  2559. sub_project_name2 = _dict2.get(project_sub_project_name,"")
  2560. if sub_project_name2=="Project":
  2561. sub_project_name2 = ""
  2562. win_tenderer2 = _dict2.get(project_win_tenderer,"")
  2563. win_bid_price2 = _dict2.get(project_win_bid_price,0)
  2564. bidding_budget2 = _dict2.get(project_bidding_budget,0)
  2565. _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
  2566. if len(_set)>1:
  2567. return False
  2568. _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
  2569. if len(_set)>1:
  2570. return False
  2571. _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
  2572. if len(_set)>1:
  2573. return False
  2574. _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
  2575. if len(_set)>1:
  2576. return False
  2577. return True
  2578. def getUpdate_dict(self,_dict):
  2579. update_dict = {}
  2580. for k,v in _dict.items():
  2581. if v is None:
  2582. continue
  2583. if isinstance(v,str):
  2584. if v=="":
  2585. continue
  2586. if isinstance(v,(float,int)):
  2587. if v==0:
  2588. continue
  2589. update_dict[k] = v
  2590. return update_dict
def update_projects_by_document(self,docid,save,projects):
    '''
    Update the given projects with the properties of one document.

    :param docid: docid of the document being attached (save==1) or detached
    :param save: 1 to add the docid to the projects, anything else to remove it
    :param projects: list of project dicts, updated in place (and possibly appended to)
    :return: None
    '''
    list_docs = self.search_docs([docid])
    docs = [_doc.getProperties() for _doc in list_docs]
    project_dict = generate_common_properties(docs)
    list_package_properties = generate_packages_properties(docs)
    _dict = {}
    # Update common properties.
    # Area fields are only propagated when the document carries a usable district.
    _replace_replace = False
    v = project_dict.get(document_district,"")
    if not (v is None or v=="" or v=="[]" or v=="未知"):
        _replace_replace = True
    for k,v in project_dict.items():
        if not _replace_replace:
            if k in [document_district,document_city,document_province,document_area]:
                continue
        if v is None or v=="" or v=="[]" or v=="未知":
            continue
        # List-like fields are merged separately below, not overwritten.
        if k in (project_project_dynamics,project_product,project_project_codes,project_docids):
            continue
        _dict[k] = v
    for _proj in projects:
        _proj.update(_dict)
    # Keep the latest page_time across document and projects.
    for _proj in projects:
        if _proj.get(project_page_time,"")<project_dict.get(project_page_time,""):
            _proj[project_page_time] = project_dict.get(project_page_time,"")
    # Concatenate list-like properties (docids, codes, products, enterprises, dynamics).
    append_dict = {}
    set_docid = set()
    set_product = set()
    set_code = set()
    set_nlp_enterprise = set()
    set_nlp_enterprise_attachment = set()
    # NOTE(review): append_dict and the code/product/enterprise sets accumulate
    # across loop iterations, so later projects also receive values gathered from
    # earlier ones — confirm this cross-project accumulation is intended.
    for _proj in projects:
        _docids = _proj.get(project_docids,"")
        _codes = _proj.get(project_project_codes,"")
        _product = _proj.get(project_product,"")
        set_docid = set(_docids.split(","))
        if save==1:
            set_docid.add(str(docid))
        else:
            if str(docid) in set_docid:
                set_docid.remove(str(docid))
        set_code = set_code | set(_codes.split(","))
        set_product = set_product | set(_product.split(","))
        try:
            set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
            set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
        except Exception as e:
            pass
        set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
        set_product = set_product | set(project_dict.get(project_product,"").split(","))
        try:
            set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
            set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
        except Exception as e:
            pass
        append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
        # NOTE(review): len(set_docid) may count an empty-string element produced
        # by splitting an empty docids string — possible off-by-one; confirm.
        append_dict[project_docid_number] = len(set_docid)
        append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
        append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
        append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
        append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
        # Merge project dynamics, keyed by docid, keeping at most 100 sorted by page_time.
        dict_dynamic = {}
        set_docid = set()
        _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
        for _dy in _dynamic:
            _docid = _dy.get("docid")
            dict_dynamic[_docid] = _dy
        _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
        for _dy in _dynamic:
            _docid = _dy.get("docid")
            dict_dynamic[_docid] = _dy
        list_dynamics = []
        for k,v in dict_dynamic.items():
            list_dynamics.append(v)
        list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
        append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
        _proj.update(append_dict)
    # Index existing projects by package keys (tenderer/budget/price combinations).
    dict_package = {}
    for _pp in projects:
        _counts = 0
        sub_project_name = _pp.get(project_sub_project_name,"")
        if sub_project_name=="Project":
            sub_project_name = ""
        win_tenderer = _pp.get(project_win_tenderer,"")
        win_bid_price = _pp.get(project_win_bid_price,0)
        bidding_budget = _pp.get(project_bidding_budget,0)
        if win_tenderer!="" and bidding_budget!=0:
            _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
            dict_package[_key] = _pp
            _counts += 1
        if win_tenderer!="" and win_bid_price!=0:
            _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
            dict_package[_key] = _pp
            _counts +=1
        # Weaker single-field keys only when no strong key exists.
        if _counts==0:
            if win_tenderer!="":
                _key = "%s-%s"%(sub_project_name,win_tenderer)
                dict_package[_key] = _pp
                _counts += 1
            if bidding_budget!=0:
                _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
    # Update package-private properties: merge each document package into a
    # matching existing project, or append it as a new project.
    for _pp in list_package_properties:
        flag_update = False
        sub_project_name = _pp.get(project_sub_project_name,"")
        if sub_project_name=="Project":
            sub_project_name = ""
        win_tenderer = _pp.get(project_win_tenderer,"")
        win_bid_price = _pp.get(project_win_bid_price,0)
        bidding_budget = _pp.get(project_bidding_budget,0)
        if win_tenderer!="" and bidding_budget!=0:
            _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if win_tenderer!="" and win_bid_price!=0:
            _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if win_tenderer!="":
            _key = "%s-%s"%(sub_project_name,win_tenderer)
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if bidding_budget!=0:
            _key = "%s-%s"%(sub_project_name,str(bidding_budget))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if not flag_update:
            # No existing package matched: promote this package to a new project
            # and register its keys so later packages can merge into it.
            _pp.update(project_dict)
            projects.append(_pp)
            _counts = 0
            if win_tenderer!="" and bidding_budget!=0:
                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
            if win_tenderer!="" and win_bid_price!=0:
                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                dict_package[_key] = _pp
                _counts +=1
            if _counts==0:
                if win_tenderer!="":
                    _key = "%s-%s"%(sub_project_name,win_tenderer)
                    dict_package[_key] = _pp
                    _counts += 1
                if bidding_budget!=0:
                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                    dict_package[_key] = _pp
                    _counts += 1
  2768. def delete_projects_by_document(self,docid):
  2769. '''
  2770. 更新projects中对应的document的属性
  2771. :param docid:
  2772. :param projects: 项目集合
  2773. :param action:add/delete add时附加唯一属性,delete时删除唯一属性
  2774. :return:
  2775. '''
  2776. set_docid = set()
  2777. list_delete_projects = []
  2778. list_projects = self.search_projects_with_document([docid])
  2779. for _proj in list_projects:
  2780. _p = {}
  2781. _docids = _proj.get(project_docids,"")
  2782. print(_proj.get(project_uuid))
  2783. _p["delete_uuid"] = _proj.get(project_uuid)
  2784. _p["to_delete"] = True
  2785. list_delete_projects.append(_p)
  2786. if _docids!="":
  2787. set_docid = set_docid | set(_docids.split(","))
  2788. if str(docid) in set_docid:
  2789. set_docid.remove(str(docid))
  2790. list_docid = list(set_docid)
  2791. list_projects = []
  2792. if len(list_docid)>0:
  2793. list_docs = self.search_docs(list_docid)
  2794. list_projects = self.generate_projects_from_document(list_docs)
  2795. list_projects = dumplicate_projects(list_projects)
  2796. list_projects.extend(list_delete_projects)
  2797. project_json = to_project_json(list_projects)
  2798. print("delete_json",project_json)
  2799. return project_json
  2800. def delete_doc_handle(self,_dict,result_queue):
  2801. headers = _dict.get("frame")
  2802. conn = _dict.get("conn")
  2803. log("==========delete")
  2804. if headers is not None:
  2805. message_id = headers.headers["message-id"]
  2806. body = headers.body
  2807. item = json.loads(body)
  2808. docid = item.get("docid")
  2809. if docid is None:
  2810. return
  2811. delete_result = self.delete_projects_by_document(docid)
  2812. _uuid = uuid4().hex
  2813. _d = {PROJECT_PROCESS_UUID:_uuid,
  2814. PROJECT_PROCESS_CRTIME:1,
  2815. PROJECT_PROCESS_PROJECTS:delete_result}
  2816. _pp = Project_process(_d)
  2817. if _pp.update_row(self.ots_client):
  2818. ackMsg(conn,message_id)
  2819. #取消插入结果队列,改成插入project_process表
  2820. # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
  2821. # ackMsg(conn,message_id)
def generate_common_properties(self,list_docs):
    '''
    Generate the common (project-level) properties shared by a group of documents.

    :param list_docs: document objects exposing getProperties()
    :return: dict of aggregated project properties
    '''
    # Majority vote: for every key, count the occurrences of each non-empty value.
    choose_dict = {}
    project_dict = {}
    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
        for _doc in list_docs:
            _value = _doc.getProperties().get(_key,"")
            if _value!="":
                if _key not in choose_dict:
                    choose_dict[_key] = {}
                if _value not in choose_dict[_key]:
                    choose_dict[_key][_value] = 0
                choose_dict[_key][_value] += 1
    # Pick the area fields from the first granularity (district→city→province→area)
    # that has any known location.
    _find = False
    for _key in [document_district,document_city,document_province,document_area]:
        area_dict = {}
        for _doc in list_docs:
            loc = _doc.getProperties().get(_key,"未知")
            if loc not in ('全国','未知',"0"):
                if loc not in area_dict:
                    area_dict[loc] = 0
                area_dict[loc] += 1
        list_loc = []
        for k,v in area_dict.items():
            list_loc.append([k,v])
        list_loc.sort(key=lambda x:x[1],reverse=True)
        if len(list_loc)>0:
            # NOTE(review): _doc here is whatever document the inner loop ended on,
            # not necessarily the one holding the most frequent location in
            # list_loc — looks like a leaked loop variable; confirm intended.
            project_dict[document_district] = _doc.getProperties().get(document_district)
            project_dict[document_city] = _doc.getProperties().get(document_city)
            project_dict[document_province] = _doc.getProperties().get(document_province)
            project_dict[document_area] = _doc.getProperties().get(document_area)
            _find = True
            break
    if not _find:
        # No document had a known location: fall back to the first document's area.
        if len(list_docs)>0:
            project_dict[document_district] = list_docs[0].getProperties().get(document_district)
            project_dict[document_city] = list_docs[0].getProperties().get(document_city)
            project_dict[document_province] = list_docs[0].getProperties().get(document_province)
            project_dict[document_area] = list_docs[0].getProperties().get(document_area)
    # Resolve the vote: most frequent value wins; avoid '全国'/'未知' when an
    # alternative exists.
    for _key,_value in choose_dict.items():
        _l = []
        for k,v in _value.items():
            _l.append([k,v])
        _l.sort(key=lambda x:x[1],reverse=True)
        if len(_l)>0:
            _v = _l[0][0]
            if _v in ('全国','未知'):
                if len(_l)>1:
                    _v = _l[1][0]
            project_dict[_key] = _v
    # Aggregate per-document data: dynamics, visible docids, codes, products,
    # and the earliest zhao/zhong biao page times.
    list_dynamics = []
    docid_number = 0
    visuable_docids = []
    zhao_biao_page_time = ""
    zhong_biao_page_time = ""
    list_codes = []
    list_product = []
    p_page_time = ""
    remove_docids = set()
    for _doc in list_docs:
        table_name = _doc.getProperties().get("table_name")
        status = _doc.getProperties().get(document_status,0)
        _save = _doc.getProperties().get(document_tmp_save,1)
        doctitle = _doc.getProperties().get(document_doctitle,"")
        docchannel = _doc.getProperties().get(document_docchannel)
        page_time = _doc.getProperties().get(document_page_time,"")
        _docid = _doc.getProperties().get(document_docid)
        _bidway = _doc.getProperties().get(document_bidway,"")
        _docchannel = _doc.getProperties().get(document_life_docchannel,0)
        project_codes = _doc.getProperties().get(document_project_codes)
        product = _doc.getProperties().get(document_product)
        sub_docs = _doc.getProperties().get("sub_docs",[])
        is_multipack = True if len(sub_docs)>1 else False
        extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
        if product is not None:
            list_product.extend(product.split(","))
        if project_codes is not None:
            _c = project_codes.split(",")
            list_codes.extend(_c)
        if p_page_time=="":
            p_page_time = page_time
        # Channel-id groups: presumably tender-notice channels vs award-notice
        # channels — TODO confirm the id meanings against the channel table.
        if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
            zhao_biao_page_time = page_time
        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
            zhong_biao_page_time = page_time
        # A document counts as visible if it is a published main-table row
        # (status 201-300) or a staging row flagged save==1.
        is_visuable = 0
        if table_name=="document":
            if status>=201 and status<=300:
                docid_number +=1
                visuable_docids.append(str(_docid))
                is_visuable = 1
            else:
                remove_docids.add(str(_docid))
        else:
            if _save==1:
                docid_number +=1
                visuable_docids.append(str(_docid))
                is_visuable = 1
            else:
                remove_docids.add(str(_docid))
        list_dynamics.append({document_docid:_docid,
                              document_doctitle:doctitle,
                              document_docchannel:_docchannel,
                              document_bidway:_bidway,
                              document_page_time:page_time,
                              document_status:201 if is_visuable==1 else 401,
                              "is_multipack":is_multipack,
                              document_tmp_extract_count:extract_count
                              }
                             )
    project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
    project_dict[project_docid_number] = docid_number
    project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
    if zhao_biao_page_time !="":
        project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
    if zhong_biao_page_time !="":
        project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
    project_dict[project_project_codes] = ",".join(list(set(list_codes)))
    project_dict[project_page_time] = p_page_time
    project_dict[project_product] = ",".join(list(set(list_product)))
    return project_dict
  2948. def generate_packages_properties(self,list_docs):
  2949. '''
  2950. 生成分包属性
  2951. :param list_docs:
  2952. :return:
  2953. '''
  2954. list_properties = []
  2955. set_key = set()
  2956. for _doc in list_docs:
  2957. _dict = {}
  2958. sub_docs = _doc.getProperties().get("sub_docs")
  2959. if sub_docs is not None:
  2960. for _d in sub_docs:
  2961. sub_project_code = _d.get(project_sub_project_code,"")
  2962. sub_project_name = _d.get(project_sub_project_name,"")
  2963. win_tenderer = _d.get(project_win_tenderer,"")
  2964. win_bid_price = _d.get(project_win_bid_price,"")
  2965. _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
  2966. if _key in set_key:
  2967. continue
  2968. set_key.add(_key)
  2969. list_properties.append(_d)
  2970. return list_properties
  2971. def generate_projects_from_document(self,list_docs):
  2972. '''
  2973. #通过公告生成projects
  2974. :param list_docids:
  2975. :return:
  2976. '''
  2977. #判断标段数
  2978. list_projects = generate_projects([doc.getProperties() for doc in list_docs])
  2979. return list_projects
  2980. def search_projects_with_document(self,list_docids):
  2981. '''
  2982. 通过docid集合查询对应的projects
  2983. :param list_docids:
  2984. :return:
  2985. '''
  2986. print("==",list_docids)
  2987. list_should_q = []
  2988. for _docid in list_docids:
  2989. list_should_q.append(TermQuery("docids",_docid))
  2990. bool_query = BoolQuery(should_queries=list_should_q)
  2991. _query = {"query":bool_query,"limit":20}
  2992. list_project_dict = getDocument(_query,self.ots_client,[
  2993. project_uuid,project_docids,project_zhao_biao_page_time,
  2994. project_zhong_biao_page_time,
  2995. project_page_time,
  2996. project_area,
  2997. project_province,
  2998. project_city,
  2999. project_district,
  3000. project_info_type,
  3001. project_industry,
  3002. project_qcodes,
  3003. project_project_name,
  3004. project_project_code,
  3005. project_project_codes,
  3006. project_project_addr,
  3007. project_tenderee,
  3008. project_tenderee_addr,
  3009. project_tenderee_phone,
  3010. project_tenderee_contact,
  3011. project_agency,
  3012. project_agency_phone,
  3013. project_agency_contact,
  3014. project_sub_project_name,
  3015. project_sub_project_code,
  3016. project_bidding_budget,
  3017. project_win_tenderer,
  3018. project_win_bid_price,
  3019. project_win_tenderer_manager,
  3020. project_win_tenderer_phone,
  3021. project_second_tenderer,
  3022. project_second_bid_price,
  3023. project_second_tenderer_manager,
  3024. project_second_tenderer_phone,
  3025. project_third_tenderer,
  3026. project_third_bid_price,
  3027. project_third_tenderer_manager,
  3028. project_third_tenderer_phone,
  3029. project_procurement_system,
  3030. project_bidway,
  3031. project_dup_data,
  3032. project_docid_number,
  3033. project_project_dynamics,
  3034. project_product,
  3035. project_moneysource,
  3036. project_service_time,
  3037. project_time_bidclose,
  3038. project_time_bidopen,
  3039. project_time_bidstart,
  3040. project_time_commencement,
  3041. project_time_completion,
  3042. project_time_earnest_money_start,
  3043. project_time_earnest_money_end,
  3044. project_time_get_file_end,
  3045. project_time_get_file_start,
  3046. project_time_publicity_end,
  3047. project_time_publicity_start,
  3048. project_time_registration_end,
  3049. project_time_registration_start,
  3050. project_time_release,
  3051. project_dup_docid,
  3052. project_info_source,
  3053. project_nlp_enterprise,
  3054. project_nlp_enterprise_attachment,
  3055. ],sort="page_time",table_name="project2",table_index="project2_index")
  3056. return list_project_dict
  3057. def set_project_uuid(self,_dict,_uuid):
  3058. if _uuid is not None and _uuid!="":
  3059. if "uuid" in _dict:
  3060. _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
  3061. else:
  3062. _dict["uuid"] = _uuid
def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
    '''
    Build the list of search-query rules used to find merge candidates.

    Each rule pairs two (or one) discriminative attributes of the project;
    the second element of every entry is a weight/priority (2 = strong
    two-field match, 1 = weak single-field match).

    :return: list of [list_of_queries, weight] pairs
    '''
    whole_time_start = time.time()
    _time = time.time()
    list_query = []
    # Shared sub-queries (first 20 codes/products only, to bound query size).
    list_code = [a for a in project_codes.split(",") if a!='']
    should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
    # print("should_q_code",[a for a in list_code[:20]])
    should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
    list_product = [a for a in product.split(",") if a!='']
    should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
    # Area sub-query from whichever of province/city/district is known.
    should_q_area = None
    if province!="" or city!="" or district!="":
        should_q = []
        if province not in ("","全国","未知") and province is not None:
            should_q.append(TermQuery(project_province,province))
        if city not in ("","全国","未知") and city is not None:
            should_q.append(TermQuery(project_city,city))
        if district not in ("","全国","未知") and district is not None:
            should_q.append(TermQuery(project_district,district))
        if len(should_q)>0:
            should_q_area = BoolQuery(should_queries=should_q)
    prepare_time = time.time()-_time
    _time = time.time()
    # log("list_code %s"%(str(list_code)))
    # log("list_product %s"%(str(list_product)))
    # log("tenderee %s"%(tenderee))
    # log("bidding_budget %s"%(bidding_budget))
    # log("win_tenderer %s"%(win_tenderer))
    # log("win_bid_price %s"%(win_bid_price))
    # log("project_name %s"%(project_name))
    log_time = time.time()-_time
    _time = time.time()
    # Two-field rules (weight 2) — combinations of tenderee/agency/codes/
    # product/name/budget/winner/price.
    if tenderee!="" and len(list_code)>0:
        _query = [TermQuery(project_tenderee,tenderee),
                  should_q_code,
                  ]
        list_query.append([_query,2])
        _query = [TermQuery(project_tenderee,tenderee),
                  should_q_cod
                  ]
        list_query.append([_query,2])
    if tenderee!="" and len(list_product)>0:
        _query = [TermQuery(project_tenderee,tenderee),
                  should_q_product]
        list_query.append([_query,2])
    if tenderee!="" and project_name!="":
        _query = [TermQuery(project_tenderee,tenderee),
                  TermQuery(project_project_name,project_name)]
        list_query.append([_query,2])
    if tenderee!="" and agency!="":
        _query = [TermQuery(project_tenderee,tenderee),
                  TermQuery(project_agency,agency)]
        list_query.append([_query,1])
    if tenderee!="" and float(bidding_budget)>0:
        _query = [TermQuery(project_tenderee,tenderee),
                  TermQuery(project_bidding_budget,bidding_budget)]
        list_query.append([_query,2])
    if float(bidding_budget)>0 and float(win_bid_price)>0:
        _query = [TermQuery(project_bidding_budget,bidding_budget),
                  TermQuery(project_win_bid_price,win_bid_price)]
        list_query.append([_query,2])
    if tenderee!="" and win_tenderer!="":
        _query = [TermQuery(project_tenderee,tenderee),
                  TermQuery(project_win_tenderer,win_tenderer)]
        list_query.append([_query,2])
    if agency!="" and win_tenderer!="":
        _query = [TermQuery(project_agency,agency),
                  TermQuery(project_win_tenderer,win_tenderer)]
        list_query.append([_query,2])
    if agency!="" and len(list_product)>0:
        _query = [TermQuery(project_agency,agency),
                  should_q_product]
        list_query.append([_query,2])
    if win_tenderer!="" and len(list_code)>0:
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  should_q_code]
        list_query.append([_query,2])
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  should_q_cod]
        list_query.append([_query,2])
    if win_tenderer!="" and float(win_bid_price)>0:
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  TermQuery(project_win_bid_price,win_bid_price)]
        list_query.append([_query,2])
    if win_tenderer!="" and float(bidding_budget)>0:
        _query = [TermQuery(project_win_tenderer,win_tenderer),
                  TermQuery(project_bidding_budget,bidding_budget)]
        list_query.append([_query,2])
    if len(list_code)>0 and len(list_product)>0:
        _query = [should_q_code,
                  should_q_product]
        list_query.append([_query,2])
    # Weak single-field rules (weight 1).
    if len(list_code)>0:
        _query = [
            should_q_code]
        list_query.append([_query,1])
        _query = [
            should_q_cod]
        list_query.append([_query,1])
    if project_name!="" and project_name is not None:
        _query = [
            TermQuery(project_project_name,project_name)]
        list_query.append([_query,1])
        _query_title = [MatchPhraseQuery(project_doctitles,project_name)]
        list_query.append([_query_title,1])
    if len(list_product)>0 and should_q_area is not None:
        _query = [should_q_area,
                  should_q_product]
        list_query.append([_query,1])
    generate_time = time.time()-_time
    whole_time = time.time()-whole_time_start
    log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
    return list_query
  3176. def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment,project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source],fix_columns=[project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_b
id_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source]):
  3177. '''
  3178. 对项目进行合并
  3179. :return:
  3180. '''
  3181. try:
  3182. whole_time_start = time.time()
  3183. set_uuid = set()
  3184. for _proj in list_projects:
  3185. _uuid = _proj.get("uuid")
  3186. if _uuid is not None:
  3187. set_uuid = set_uuid | set(_uuid.split(","))
  3188. must_not_q = []
  3189. for _uuid in list(set_uuid):
  3190. must_not_q.append(TermQuery("uuid",_uuid))
  3191. projects_merge_count = 0
  3192. projects_check_rule_time = 0
  3193. projects_update_time = 0
  3194. projects_query_time = 0
  3195. projects_prepare_time = 0
  3196. current_date = getCurrent_date("%Y-%m-%d")
  3197. min_date = timeAdd(current_date,-35,format="%Y-%m-%d")
  3198. search_table = "project2"
  3199. search_table_index = "project2_index_formerge"
  3200. project_cls = Project
  3201. docids = ""
  3202. for _proj in list_projects[:30]:
  3203. docids = _proj.get(project_docids,"")
  3204. page_time = _proj.get(project_page_time,"")
  3205. project_codes = _proj.get(project_project_codes,"")
  3206. project_name = _proj.get(project_project_name,"")
  3207. tenderee = _proj.get(project_tenderee,"")
  3208. agency = _proj.get(project_agency,"")
  3209. product = _proj.get(project_product,"")
  3210. sub_project_name = _proj.get(project_sub_project_name,"")
  3211. bidding_budget = _proj.get(project_bidding_budget,-1)
  3212. win_tenderer = _proj.get(project_win_tenderer,"")
  3213. win_bid_price = _proj.get(project_win_bid_price,-1)
  3214. province = _proj.get(project_province,"")
  3215. city = _proj.get(project_city,"")
  3216. district = _proj.get(project_district,"")
  3217. page_time_less = timeAdd(page_time,-150)
  3218. page_time_greater = timeAdd(page_time,120)
  3219. sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
  3220. _time = time.time()
  3221. list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
  3222. list_merge_data = []
  3223. search_table = "project2"
  3224. search_table_index = "project2_index_formerge"
  3225. project_cls = Project
  3226. print("page_time,min_date",page_time,min_date)
  3227. if page_time>=min_date:
  3228. search_table = "project2_tmp"
  3229. search_table_index = "project2_tmp_index"
  3230. project_cls = Project_tmp
  3231. _step = 4
  3232. _begin = 0
  3233. must_queries = []
  3234. if page_time_less is not None and page_time_greater is not None:
  3235. must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
  3236. ]
  3237. print("page_time_less,page_time_greater",page_time,page_time_less,page_time_greater)
  3238. #sub_project_name非必要条件
  3239. # if sub_project_q is not None:
  3240. # must_queries.append(sub_project_q)
  3241. projects_prepare_time += time.time()-_time
  3242. _time = time.time()
  3243. while _begin<len(list_must_query):
  3244. list_should_q = []
  3245. _limit = 20
  3246. for must_q,_count in list_must_query[_begin:_begin+_step]:
  3247. must_q1 = list(must_q)
  3248. must_q1.extend(must_queries)
  3249. list_should_q.append(BoolQuery(must_queries=must_q1))
  3250. # _limit += _count*5
  3251. _query = BoolQuery(
  3252. should_queries=list_should_q,
  3253. must_not_queries=must_not_q[:100]
  3254. )
  3255. # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
  3256. # SearchQuery(_query,limit=_limit),
  3257. # columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
  3258. rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
  3259. SearchQuery(_query,limit=_limit),
  3260. columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
  3261. list_data = getRow_ots(rows)
  3262. list_merge_data.extend(list_data)
  3263. # print(list_data)
  3264. for _data in list_data:
  3265. must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
  3266. _begin += _step
  3267. projects_query_time += time.time()-_time
  3268. #优先匹配招标金额相近的
  3269. projects_merge_count = len(list_merge_data)
  3270. list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
  3271. list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
  3272. # log(page_time_less+"=="+page_time_greater)
  3273. # log("list_merge_data:%s"%(str(list_merge_data)))
  3274. list_check_data = []
  3275. for _data in list_merge_data:
  3276. _time = time.time()
  3277. _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
  3278. if b_log:
  3279. log(str(_check))
  3280. projects_check_rule_time += time.time()-_time
  3281. if _check:
  3282. list_check_data.append([_data,_prob])
  3283. list_check_data.sort(key=lambda x:x[1],reverse=True)
  3284. for _data,_ in list_check_data:
  3285. _time = time.time()
  3286. _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
  3287. projects_check_rule_time += time.time()-_time
  3288. _time = time.time()
  3289. if _check:
  3290. # o_proj = project_cls(_data)
  3291. # o_proj.fix_columns(self.ots_client,fix_columns,True)
  3292. # for k in fix_columns:
  3293. # _data[k] = o_proj.getProperties().get(k)
  3294. update_projects_by_project(_data,[_proj])
  3295. projects_update_time += time.time()-_time
  3296. whole_time = time.time()-whole_time_start
  3297. log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
  3298. return list_projects
  3299. except Exception as e:
  3300. traceback.print_exc()
  3301. assert 1==2
  3302. def merge_document_real(self,item,dup_docid,table_name,save,status_to=None,b_log=False):
  3303. '''
  3304. 实时项目合并
  3305. :param item:
  3306. :param dup_docid:重复的公告集合
  3307. :param status_to:
  3308. :return:
  3309. '''
  3310. try:
  3311. list_docids = []
  3312. _docid = item.get(document_tmp_docid)
  3313. list_docids.append(_docid)
  3314. if isinstance(dup_docid,list):
  3315. list_docids.extend(dup_docid)
  3316. list_docids = [a for a in list_docids if a is not None]
  3317. _time = time.time()
  3318. list_projects = self.search_projects_with_document(list_docids)
  3319. # log("search projects takes:%.3f"%(time.time()-_time))
  3320. if len(list_projects)==0:
  3321. # _time = time.time()
  3322. list_docs = self.search_docs(list_docids)
  3323. # log("search document takes:%.3f"%(time.time()-_time))
  3324. # _time = time.time()
  3325. list_projects = self.generate_projects_from_document(list_docs)
  3326. # log("generate projects takes:%.3f"%(time.time()-_time))
  3327. else:
  3328. _time = time.time()
  3329. self.update_projects_by_document(_docid,save,list_projects)
  3330. # log("update projects takes:%.3f"%(time.time()-_time))
  3331. _time = time.time()
  3332. list_projects = dumplicate_projects(list_projects)
  3333. # log("dumplicate projects takes:%.3f"%(time.time()-_time))
  3334. _time = time.time()
  3335. list_projects = self.merge_projects(list_projects,b_log)
  3336. # log("merge projects takes:%.3f"%(time.time()-_time))
  3337. _time = time.time()
  3338. dumplicate_document_in_merge(list_projects)
  3339. log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
  3340. _time = time.time()
  3341. project_json = to_project_json(list_projects)
  3342. # log("json projects takes:%.3f"%(time.time()-_time))
  3343. if b_log:
  3344. log("project_json:%s"%project_json)
  3345. return project_json
  3346. except Exception as e:
  3347. raise RuntimeError("error on dumplicate")
  3348. def is_exist_fingerprint(self,final_list,_docid,_fingerprint,table_name):
  3349. set_fingerprint = set()
  3350. for _i in range(1,len(final_list)):
  3351. _dict = final_list[_i]
  3352. b_docid = _dict[document_tmp_docid]
  3353. _save = _dict.get(document_tmp_save,0)
  3354. _status = _dict.get(document_tmp_status,0)
  3355. if table_name=="document":
  3356. if _status>=201 and _status<=300:
  3357. _save = 1
  3358. fingerprint_less = _dict.get(document_tmp_fingerprint,"")
  3359. if b_docid==_docid:
  3360. pass
  3361. else:
  3362. if _save==1:
  3363. set_fingerprint.add(fingerprint_less)
  3364. print("_fingerprint",_fingerprint)
  3365. print(set_fingerprint)
  3366. if _fingerprint in set_fingerprint:
  3367. return True
  3368. return False
def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
    '''
    Consumer side of the dedup flow: find duplicates of `item`, pick the best
    docid of the group, merge the related projects and (when upgrade=True)
    persist the result back to document_tmp.

    :param item: document dict to deduplicate
    :param result_queue: unused here; kept for the handler signature
    :param ots_client: unused here; self.ots_client is used instead
    :param get_all: forwarded to translate_dumplicate_rules
    :param upgrade: when True, write results back to OTS
    '''
    try:
        start_time = time.time()
        # Flatten extract_json fields onto the item.
        self.post_extract(item)
        base_list = []
        set_docid = set()
        list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=False)
        list_rules.sort(key=lambda x:x["confidence"],reverse=True)
        _i = 0
        step = 5
        # Seed document always wins confidence ties.
        item["confidence"] = 999
        if item.get(document_tmp_docid) not in set_docid:
            base_list.append(item)
            set_docid.add(item.get(document_tmp_docid))
        # Run the rule queries in batches of `step`, excluding docids already collected.
        while _i<len(list_rules):
            must_not_q = []
            if len(base_list)>0:
                must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
            _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                               must_not_queries=must_not_q)
            # NOTE(review): only the first rule of the batch supplies
            # confidence/keys for the whole batch — presumably intentional
            # since rules are sorted by confidence.
            _rule = list_rules[_i]
            confidence = _rule["confidence"]
            singleNum_keys = _rule["singleNum_keys"]
            contain_keys = _rule["contain_keys"]
            multiNum_keys = _rule["multiNum_keys"]
            self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status])
            _i += step
        _time = time.time()
        # Final pairwise validation of the collected candidates.
        final_list = self.dumplicate_fianl_check(base_list)
        exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
        best_docid = self.get_best_docid(final_list)
        final_list_docid = [a["docid"] for a in final_list]
        _d = {"partitionkey":item["partitionkey"],
              "docid":item["docid"],
              "status":random.randint(*flow_dumplicate_status_to),
              document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
              }
        dtmp = Document_tmp(_d)
        # dup_docid = every docid of the group except the item's own.
        dup_docid = set()
        for _dict in final_list:
            dup_docid.add(_dict.get(document_tmp_docid))
        if item.get(document_tmp_docid) in dup_docid:
            dup_docid.remove(item.get(document_tmp_docid))
        remove_list = []
        if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
            # This item is the best of its group: keep it (save=1).
            dtmp.setValue(document_tmp_save,1,True)
            dmp_docid = ",".join([str(a) for a in list(dup_docid)])
            for _dict in final_list:
                if _dict.get(document_tmp_docid) in dup_docid:
                    remove_list.append(_dict)
        else:
            # Another document wins: mark this one as a duplicate (save=0).
            dtmp.setValue(document_tmp_save,0,True)
            if best_docid in dup_docid:
                dup_docid.remove(best_docid)
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                # Prepend the winner so it is first in dup_docid.
                dmp_docid = "%d,%s"%(best_docid,dmp_docid)
            else:
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
        list_docids = list(dup_docid)
        list_docids.append(best_docid)
        b_log = False if upgrade else True
        if item.get(document_update_document)=="true":
            # Explicit update requests are always kept.
            dtmp.setValue(document_tmp_save,1,True)
        if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
            # Identical fingerprint already saved elsewhere: skip project merge.
            log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
            dtmp.setValue(document_tmp_projects,"[]",True)
        else:
            dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log),True)
        log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
        if upgrade:
            if table_name=="document_tmp":
                self.changeSaveStatus(remove_list)
            dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
            dtmp.setValue(document_tmp_best_docid,best_docid,True)
            _flag = dtmp.update_row(self.ots_client)
            if not _flag:
                # Row too large for OTS: halve the projects payload until the
                # update succeeds (at most 10 attempts).
                for i in range(10):
                    list_proj_json = dtmp.getProperties().get(document_tmp_projects)
                    if list_proj_json is not None:
                        list_proj = json.loads(list_proj_json)
                        dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
                        if dtmp.update_row(self.ots_client):
                            break
    except Exception as e:
        traceback.print_exc()
        log("error on dumplicate of %s"%(str(item.get(document_tmp_docid))))
def fix_doc_which_not_in_project(self):
    '''
    Pull finished documents that are missing from project2 and push them back
    into document_tmp for another round of dedup and merge.

    Paginates saved documents (resuming from self.fix_doc_docid on later
    runs) and processes them with a 30-thread handler.
    :return:
    '''
    def fix_doc_handle(item,result_queue):
        # Re-queue a document when it has no entry in project2.
        _docid = item.get(document_tmp_docid)
        b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
                                                                            SearchQuery(b_q,get_total_count=True),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        if total_count==0:
            log("fix_doc:%s not in project2"%(str(_docid)))
            d_tmp = Document_tmp(item)
            # Reset status so the dedup flow picks it up again.
            d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
            d_tmp.update_row(self.ots_client)
    # First run: scan documents saved more than 5 minutes ago.
    # Later runs: resume after the last processed docid.
    if self.fix_doc_docid is None:
        current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
        before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
        bool_query = BoolQuery(must_queries=[
            TermQuery(document_tmp_save,1),
            RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
            RangeQuery(document_tmp_opertime,before_date)
        ])
    else:
        bool_query = BoolQuery(must_queries=[
            TermQuery(document_tmp_save,1),
            RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
            RangeQuery(document_tmp_docid,self.fix_doc_docid)
        ])
    list_data = []
    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
                                                                        ColumnsToGet(return_type=ColumnReturnType.NONE))
    list_d = getRow_ots(rows)
    list_data.extend(list_d)
    # Page through the remaining results.
    while next_token:
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_d = getRow_ots(rows)
        list_data.extend(list_d)
        print("%d/%d"%(len(list_data),total_count))
    if len(list_data)>0:
        # Remember where to resume on the next invocation.
        self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
        log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)
    mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
    mt.run()
  3519. def start_flow_dumplicate(self):
  3520. schedule = BlockingScheduler()
  3521. schedule.add_job(self.flow_dumplicate,"cron",second="*/40")
  3522. schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/10")
  3523. schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
  3524. # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
  3525. schedule.start()
  3526. def changeSaveStatus(self,list_dict):
  3527. for _dict in list_dict:
  3528. if _dict.get(document_tmp_save,1)==1:
  3529. _d = {"partitionkey":_dict["partitionkey"],
  3530. "docid":_dict["docid"],
  3531. document_tmp_save:0
  3532. }
  3533. _d_tmp = Document_tmp(_d)
  3534. _d_tmp.update_row(self.ots_client)
  3535. def test_dumplicate(self,docid):
  3536. columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
  3537. bool_query = BoolQuery(must_queries=[
  3538. TermQuery("docid",docid)
  3539. ])
  3540. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  3541. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3542. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3543. log("flow_dumplicate producer total_count:%d"%total_count)
  3544. if total_count==0:
  3545. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  3546. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3547. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3548. list_dict = getRow_ots(rows)
  3549. for item in list_dict:
  3550. self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
  3551. return
  3552. def test_merge(self,list_docid_less,list_docid_greater):
  3553. list_docs_less = self.search_docs(list_docid_less)
  3554. list_projects_less = self.generate_projects_from_document(list_docs_less)
  3555. list_docs_greater = self.search_docs(list_docid_greater)
  3556. list_projects_greater = self.generate_projects_from_document(list_docs_greater)
  3557. list_projects_less.extend(list_projects_greater)
  3558. list_projects = dumplicate_projects(list_projects_less,b_log=True)
  3559. project_json = to_project_json(list_projects)
  3560. log("project_json:%s"%project_json)
  3561. return project_json
def getRemainDoc(self,docid):
    '''
    Re-run duplicate detection for one document and return the docid that
    should remain (the "best" document of its duplicate group).

    :param docid: document id to look up in the `document` table
    :return: best docid of the duplicate group, or None when the document
        is not found
    '''
    columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
    bool_query = BoolQuery(must_queries=[
        TermQuery("docid",docid)
    ])
    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                        ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_dict = getRow_ots(rows)
    if len(list_dict)>0:
        item = list_dict[0]
        start_time = time.time()
        # Flatten extract_json fields onto the item.
        self.post_extract(item)
        base_list = []
        set_docid = set()
        # Build the dedup rule queries for this document.
        list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
        list_rules.sort(key=lambda x:x["confidence"],reverse=True)
        _i = 0
        step = 5
        # Seed document always wins confidence ties.
        item["confidence"] = 999
        if item.get(document_tmp_docid) not in set_docid:
            base_list.append(item)
            set_docid.add(item.get(document_tmp_docid))
        # Query rules in batches of `step`, excluding docids already collected.
        while _i<len(list_rules):
            must_not_q = []
            if len(base_list)>0:
                must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
            _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                               must_not_queries=must_not_q)
            _rule = list_rules[_i]
            confidence = _rule["confidence"]
            singleNum_keys = _rule["singleNum_keys"]
            contain_keys = _rule["contain_keys"]
            multiNum_keys = _rule["multiNum_keys"]
            self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
            _i += step
        _time = time.time()
        log("%d start final check with length:%d"%(item["docid"],len(base_list)))
        # Final pairwise validation of the candidate set.
        final_list = self.dumplicate_fianl_check(base_list)
        log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
        best_docid = self.get_best_docid(final_list)
        return best_docid
    return None
if __name__ == '__main__':
    # Manual entry point for ad-hoc runs. The commented lines are toggles for
    # the various flows; only one scenario is usually enabled at a time.
    # df = Dataflow()
    # df.flow_init()
    # df.flow_test()
    # df.test_merge()
    # df.start_flow_attachment()
    # df.start_flow_extract()
    # df.start_flow_dumplicate()
    # # df.start_flow_merge()
    # df.start_flow_remove()
    # download_attachment()
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    # df_dump.start_flow_dumplicate()
    a = time.time()
    # Run dedup for a single document without persisting (upgrade=False inside).
    df_dump.test_dumplicate(335604520)
    # df_dump.test_merge([292315564],[287890754])
    # df_dump.flow_remove_project_tmp()
    print("takes",time.time()-a)
    # df_dump.fix_doc_which_not_in_project()
    # df_dump.delete_projects_by_document(16288036)
    # log("=======")
    # for i in range(3):
    #     time.sleep(20)
    #
    # a = {"docid":74295123}
    # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)